// @ts-strict-ignore
import moment from 'moment-timezone';
import _ from 'lodash';
import { MESSAGES } from '@/services/socket.constants';
import { logError, logInfo, logTrace, logWarn } from '@/utilities/logger';
import { formatMessage } from '@/utilities/logger.utilities';
import { sqMonitorsApi } from '@/sdk/api/MonitorsApi';
import { sqSubscriptionsApi } from '@/sdk/api/SubscriptionsApi';
import { SeeqNames } from '@/main/app.constants.seeqnames';
import { APPSERVER_PATHS, SOCKET_LIVELINESS_TIMEOUT, TEST_ENV } from '@/main/app.constants';
import { subscribeToAsyncResponse } from '@/core/asyncResponses.utilities';
import { subscribeToPendingRequests } from '@/requests/pendingRequests.utilities';
import { goTo } from '@/main/routing.utilities';
import { decompress } from 'fzstd';
import SocketWorker from '@/webWorkers/socket.worker.shim.js?worker';

interface ChannelSubscription {
  /**
   * The URI that identifies the message channel on which to listen
   */
  channelId: string;
  /**
   * The callback to invoke when a message of the specified channel is received
   */
  onMessage: (message: any) => void;
  /**
   * Determines if this subscription should use the subscription API to subscribe and unsubscribe to the channel.
   * Defaults to true because there are only a few channels that the backend auto-subscribes a client to, such as
   * the async response channel. If this is true then when adding a callback the API will also be subscribed and
   * when the last callback for a channel is removed the API will be unsubscribed.
   */
  useSubscriptionsApi?: boolean;
  /**
   * Used when useSubscriptionsApi is true this callback is invoked after any successful API request to subscribe to
   * the channel.
   */
  onSubscribe?: (isReconnect: boolean) => void;
  /**
   * This callback is invoked when a socket closed event is received.
   */
  onClose?: (event: CloseEvent) => void;
  /**
   * This callback is invoked if an error occurs when either subscribing or unsubscribing from the API or if the
   * socket never opens successfully.
   */
  onError?: (error: any, action: string) => void;

  /**
   * A list of parameters associated with this subscription.
   */
  subscriberParameters?: Record<string, any>;
}

interface ChannelSubscriptionInput extends Omit<ChannelSubscription, 'channelId'> {
  channelId: string[];
}

// JSON structure of client-originated messages.
export type ClientMessage = {
  channelId: string;
  data: any;
};

let webWorker;

export function initialize(onMessageCallback) {
  webWorker = new SocketWorker();
  webWorker.onmessage = onMessageCallback;
}

// We do not want to create the SocketWorker when running Frontend IT tests as it will fail the plugin tests.
process.env.NODE_ENV !== TEST_ENV && initialize(onWorkerMessage);

let livelinessRetry;
let reconnectStart = moment.invalid();
let hasMessageBeenReceived = false;
let errorCount = 0;
let connectionTimer;
let isCreated = false;
let isOpen = false;
const errorCountMax = 2;
const channelSubscriptions: ChannelSubscription[] = [];
const openCallbacks: ((evt: Event) => void)[] = [];
const closeCallbacks: ((evt: CloseEvent) => void)[] = [];
const errorStateCallbacks: (() => void)[] = [];

/**
 * Opens a websocket connection using the specified interactiveId. Any previous websocket connection is automatically
 * closed.
 *
 * @param {string} interactiveId - Interactive ID to use when opening connection. This ID is used to associate HTTP
 *   requests with the websocket from the same client connection.
 * @param {string} csrfToken - The cross-site request forgery token. Necessary because websockets are vulnerable
 * to cross-site attacks since their authentication id is sent via cookie.
 * @returns {Promise} that resolves when the socket request to open has been sent, or rejects if the open fails
 */
export function open(interactiveId, csrfToken) {
  if (isCreated) {
    return Promise.resolve();
  }

  const interactiveIdParamName = SeeqNames.API.QueryParams.ClientInteractiveSessionId;

  const url =
    `${window.location.protocol === 'https:' ? 'wss' : 'ws'}://${window.location.hostname}:${window.location.port}` +
    `${APPSERVER_PATHS.WEBSOCKET_EVENTS}?${interactiveIdParamName}=${encodeURIComponent(interactiveId)}`;
  close();

  // Register this wait prior to sending the open attempt, so that there's no possibility of missing an open or error
  // response
  const waitingForOpen = waitForOpen().then(() => {
    // These channels are automatically created by the backend for the unique interactiveSessionId and should be
    // subscribed for all users
    subscribeToAsyncResponse(interactiveId, subscribe);
    subscribeToPendingRequests(interactiveId, subscribe);
  });

  sendToWorkerThread(MESSAGES.COMMAND_OPEN, { url, csrfToken });
  isCreated = true;

  livelinessRetry = () => {
    sqMonitorsApi
      .postMeterData({
        monitor: 'Network/ClientWebSocket/LivelinessTimeoutExpired',
      })
      .catch((e) => logError(formatMessage`sqMonitorsApi.postMeterData Error: ${e}`));
    close();
    open(interactiveId, csrfToken);
  };

  restartConnectionTimer();
  return waitingForOpen;
}

/**
 * Main HTML5 message handler for messages from the WebWorker thread. All messages are routed to the correct handler
 * function based on the type specified in the data.
 *
 * @param {Object} message - HTML5 message received from the WebWorker thread
 * @param {Object} message.data - Data payload for this message, as sent from the WebWorker thread
 * @param {String} message.data.type - Type of message, to use for routing to the appropriate handler
 * @param {String} message.data.payload - Data payload for the message (varies by type)
 */
export function onWorkerMessage({ data }: MessageEvent) {
  switch (data.type) {
    case MESSAGES.ON_WEBSOCKET_DATA:
      onMessage(data.payload);
      break;
    case MESSAGES.ON_OPEN_COMPLETE:
      onOpenCallback(data.payload);
      break;
    case MESSAGES.ON_CLOSE_COMPLETE:
      onCloseCallback(data.payload);
      break;
    case MESSAGES.ON_ERROR:
      onErrorCallback();
      break;
    case MESSAGES.COMMAND_LOG:
      logInfo(data.payload);
      break;
    default:
      logWarn(`Unknown message from webworker: ${JSON.stringify(data)}`);
      break;
  }
}

/**
 * Service handler for socket onOpen notification
 *
 * @param {Object} event - the event
 */
function onOpenCallback(event) {
  isOpen = true;
  if (reconnectStart.isValid()) {
    sqMonitorsApi
      .postMeterData({ monitor: 'Network/ClientWebSocket/Reconnect' })
      .catch((e) => logError(formatMessage`sqMonitorsApi.postMeterData Error: ${e}`));
  }

  reconnectStart = moment.invalid();
  _.forEach(_.clone(openCallbacks), (callback) => callback(event));
}

/**
 * Internal processor that calls each registered handler when a message received via the websocket.
 *
 * @param {Blob|String} stringOrBinaryMessage - Gzipped or string JSON message
 */
function onMessage(stringOrBinaryMessage: Blob | string) {
  hasMessageBeenReceived = true;
  restartConnectionTimer();

  Promise.resolve(stringOrBinaryMessage instanceof Blob ? unzipBlob(stringOrBinaryMessage) : stringOrBinaryMessage)
    .then((message) => {
      const jsonMessage = _.attempt(JSON.parse, message);

      if (_.isError(jsonMessage)) {
        logError(formatMessage`sqSocket message parse error: ${jsonMessage.toString()}, message: ${message}`);
        return;
      }

      const channelId = jsonMessage.channelId;

      if (channelId) {
        _.chain(channelSubscriptions)
          .filter((subscription) => channelId === subscription.channelId)
          .forEach((subscription) => subscription.onMessage(jsonMessage))
          .value();
      }
    })
    .catch((e) => {
      logError(e);
    });
}

/**
 * Closes the existing websocket connection, if open. Does not clear existing subscriptions since they are
 * expected to persist across network interruptions.
 */
export function close() {
  sendToWorkerThread(MESSAGES.COMMAND_CLOSE);

  isCreated = false;
  isOpen = false;
  errorCount = 0;
  stopConnectionTimer();
}

/**** CONNECTION HANDLERS ****/

/**
 * Starts a timer that will fire if no data is received via the socket within SOCKET_LIVELINESS_TIMEOUT. If
 * triggered, the timer will close the existing socket and attempt to open a new one.
 */
function restartConnectionTimer() {
  stopConnectionTimer();
  connectionTimer = setTimeout(livelinessRetry, SOCKET_LIVELINESS_TIMEOUT);
}

/**
 * Cancels any existing connection timer created with restartConnectionTimer.
 */
function stopConnectionTimer() {
  clearTimeout(connectionTimer);
  connectionTimer = undefined;
}

/**
 * Service handler for socket onClose notification
 *
 * @param {CloseEvent} event
 * @param {Number} event.code - status code for the close event
 * @param {boolean} event.wasClean - true if socket was closed 'cleanly', otherwise false
 */
function onCloseCallback(event) {
  const { code, wasClean } = event;
  // See https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent for what codes mean
  if (code !== 1000) {
    // 1000 is a 'normal' close code; anything else indicates we'll try to reconnect
    reconnectStart = moment.utc();
  } else {
    reconnectStart = moment.invalid();
  }

  if (!wasClean) {
    logTrace(`sqSocket closed unexpectedly (${code})`);
    // TODO: CRAB-28087 right now we're missing the track dependency so we're only logging ...
    // errorToast({ messageKey: 'NOT_CONNECTED' });
    close();
  }
  _.forEach(_.clone(closeCallbacks), (callback) => callback(event));
}

/**
 * Service handler for socket onError notification.
 */
function onErrorCallback() {
  errorCount += 1;
  logError(`sqSocket error (count: ${errorCount})`);
  // If we haven't successfully received any data over this socket before we get multiple errors, then we assume
  // that this is a result of a "failure to communicate", aka a late-arriving failure of the open request.
  if (!hasMessageBeenReceived && errorCount >= errorCountMax) {
    close();
    redirectToErrorState();
  }
}

/**
 * Permanently redirect to the error state with "Websocket unsupported" error
 */
function redirectToErrorState() {
  goTo('/no-websocket');
  _.forEach(_.clone(errorStateCallbacks), (callback) => callback());
}

/**** HELPER FUNCTIONS ****/

/**
 * Helper method that converts a Blob to an ArrayBuffer and then unzips it.
 *
 * @param {Blob} blob - The blob to convert
 * @return {Promise<String>} - Promise that resolves with the unzipped string
 */
function unzipBlob(blob: Blob): Promise<string> {
  return new Promise((resolve, reject) => {
    const reader = new FileReader();
    reader.readAsArrayBuffer(blob);
    reader.onerror = reject;
    reader.onabort = reject;
    reader.onloadend = () => {
      try {
        const decompressed = decompress(new Uint8Array(reader.result as ArrayBuffer));
        resolve(new TextDecoder().decode(decompressed));
      } catch (e) {
        reject(formatMessage`sqSocket message decompress error: ${e}`);
      }
    };
  });
}

/**
 * Helper function to send messages to the worker thread
 *
 * @param {string} type - One of the defined message types
 * @param {object|string} [payload] - data payload to send
 */
function sendToWorkerThread(type: MESSAGES, payload?) {
  if (webWorker) {
    webWorker.postMessage({ type, payload });
  }
}

/**
 * Wait for the socket to be created and/or opened.
 *
 * @returns {Promise} resolves when the websocket is open or rejects if the websocket permanently fails to open
 */
export function waitForOpen() {
  if (isOpen || !isCreated) {
    return Promise.resolve({});
  }
  return new Promise((resolve, reject) => {
    const unsubscribes = [
      onOpen(() => {
        _.forEach(unsubscribes, (u) => u());
        resolve(undefined);
      }),
      onErrorState(() => {
        _.forEach(unsubscribes, (u) => u());
        reject();
      }),
    ];
  });
}

/**
 * Registers a callback that is called whenever the socket is opened, either during the initial connect or when there
 * is a re-connection after a broken connection.
 *
 * @param {Function} callback - the callback that handles the open event. The callback takes one parameter: the
 * event sent to WebSocket.onopen (https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/onopen)
 * @returns {Function} that unregisters the listener when invoked
 */
function onOpen(callback) {
  openCallbacks.push(callback);
  return () => _.pull(openCallbacks, callback);
}

/**
 * Registers a callback that is called when we navigate to the error page (when the websocket permanently fails).
 *
 * @param {Function} callback - a callback function (e.g. function())
 * @returns {Function} that unregisters the listener when invoked
 */
function onErrorState(callback) {
  errorStateCallbacks.push(callback);
  return () => _.pull(errorStateCallbacks, callback);
}

/**
 * Subscribes to a particular message channel, returning a function that is used to unsubscribe. It is assumed that
 * socket callbacks should persist across network interruptions. For that reason, if useSubscriptionsApi is
 * true then the user will be subscribed initially and also if the socket disconnects and reconnects. It is the
 * responsibility of the caller to ensure that the unsubscribe callback that is returned is invoked when the
 * subscription is no longer needed. If useSubscriptionsApi is true and the last callback for a channel is being
 * removed, the subscription will also be unsubscribed from the API. If called multiple times with the same
 * definition, the callback will only be registered once.
 *
 * @param definition - The channel subscription definition
 * @returns A callback that unregisters the listener locally and, if useSubscriptionsApi is true, also
 *   from the API.
 */
export function subscribe(definition: ChannelSubscriptionInput): () => void {
  const channelId = constructChannel(definition.channelId);
  const channelSubscription: ChannelSubscription = { ...definition, channelId };
  const useSubscriptionsApi = _.defaultTo(definition.useSubscriptionsApi, true);
  const onError = _.defaultTo(definition.onError, _.noop);
  let unsubscribeOnOpen = () => {};
  let unsubscribeOnClose = () => {};

  const errorHandler = (action) => (error) => {
    const errorMessage = formatMessage`sqSocket utilities failed to ${action} to: ${channelId}, message: ${error}`;
    // If the caller has already unsubscribed, don't try to unsubscribe again and don't call the onError callback
    if (!_.some(channelSubscriptions, channelSubscription) && action !== 'unsubscribe') {
      logWarn(`${errorMessage} after unsubscribe`);
      return;
    }

    if (action !== 'unsubscribe') {
      unsubscribeCallback();
    }

    logError(errorMessage);
    onError(error, action);
  };

  const onOpenChannel = (isReconnect) => {
    if (useSubscriptionsApi && _.some(channelSubscriptions, { channelId })) {
      sqSubscriptionsApi
        .subscribe({
          channelId,
          subscriberParameters: channelSubscription.subscriberParameters || {},
        })
        .then(() => {
          if (_.isFunction(channelSubscription.onSubscribe)) {
            return channelSubscription.onSubscribe(isReconnect);
          }
        })
        .catch(errorHandler('subscribe'));
    }
  };

  const unsubscribeCallback = () => {
    _.remove(channelSubscriptions, channelSubscription);
    unsubscribeOnOpen();
    unsubscribeOnClose();
    if (useSubscriptionsApi && !_.some(channelSubscriptions, { channelId })) {
      sqSubscriptionsApi.unsubscribe({ channelId }).catch(errorHandler('unsubscribe'));
    }
  };

  if (_.some(channelSubscriptions, channelSubscription)) {
    return unsubscribeCallback;
  }

  channelSubscriptions.push(channelSubscription);
  Promise.resolve(isOpen || waitForOpen())
    .then(() => {
      // If the subscription was unsubscribed while we were waiting for open, don't take any further action
      if (_.some(channelSubscriptions, channelSubscription)) {
        onOpenChannel(false);

        unsubscribeOnOpen = onOpen(() => onOpenChannel(true));
        if (_.isFunction(channelSubscription.onClose)) {
          unsubscribeOnClose = onClose((event) => channelSubscription.onClose(event));
        }
      }
    })
    .catch(errorHandler('open'));

  return unsubscribeCallback;
}

/**
 * Registers a callback that is called whenever the socket is closed.
 *
 * @param {Function} callback - the callback that handle the close event. The callback takes one parameter: the
 * event sent to WebSocket.onclose (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
 * @returns {Function} that unregisters the listener when invoked
 */
function onClose(callback) {
  closeCallbacks.push(callback);
  return () => _.pull(closeCallbacks, callback);
}

/**
 * Construct a channelId by joining encoded parts
 *
 * @param channelIdTokens - Token of channelId to be constructed
 */
function constructChannel(channelIdTokens: string[]) {
  return `channel://${_.map(channelIdTokens, encodeURIComponent).join('/')}`;
}

/**
 * Send a message over the socket
 *
 * @param channelId - URI tokens that identifies the message channel
 * @param data - Data to send
 */
export function emit(channelId: string[], data: string | object) {
  emitClientMessage({ data, channelId: constructChannel(channelId) });
}

function emitClientMessage(message: ClientMessage) {
  // Important! If socket isn't connected then this message will be lost!
  // Currently the messages emitted by the client don't contain critical
  // data. If this changes in the future, a queue/buffer may be needed to
  // store messages until the socket is connected
  sendToWorkerThread(MESSAGES.COMMAND_TRANSMIT, JSON.stringify(message));
}
