import {
  put,
  select,
  call,
  take,
  takeEvery,
  takeLatest,
  delay,
  all,
  fork,
  race
} from "redux-saga/effects";
import { eventChannel } from "redux-saga";
import {
  socketOpened,
  socketError,
  socketClosed,
  networkConnected,
  delayedNetworkDisonnected
} from "./actions";
import {
  WS_SOCKET_CLOSED,
  WS_SOCKET_OPENED,
  WS_SOCKET_ERROR,
  WS_INITIATE,
  ENTITIES,
  EXERCISE,
  LOCKCODES
} from "../../../constants";
import { CMS_SSO_TOKEN } from "../../constants/fetch";
import { getCookie } from "../../services/cookies";
import {
  actionUpdateLock,
  FORCE_UNLOCK_POST_BROADCAST,
  LOCK_POST_BROADCAST,
  UNLOCK_POST_BROADCAST
} from "../../../posts/sagas/Locks/actions";
import { selectEntityId } from "../../../store/selectors";

let socketChannel;
let webSocket;
let reconnectAttempts = 0;
let keepAliveId;
const MAX_RECONNECT_INTERVAL_MS = 15000;
const RECONNECT_INCREASE_STEP_MS = 250;

const generateReconnectInterval = attempt => {
  const newReconnectInterval =
    (Math.pow(2, attempt) - 1) * RECONNECT_INCREASE_STEP_MS;
  return newReconnectInterval <= MAX_RECONNECT_INTERVAL_MS
    ? newReconnectInterval
    : MAX_RECONNECT_INTERVAL_MS;
};

/**
 * Creates the websocket eventChannel and listens for socket closed events
 * @param socket the WebSocket object
 */
const createSocketChannel = socket => {
  return eventChannel(emit => {
    socket.onmessage = event => {
      emit(event.data);
    };

    socket.onclose = () => {
      window.console.log("WebSocket closed");
      emit(socketClosed());
    };

    return () => {
      socket.close();
    };
  });
};

/**
 * Handles opening of websocket
 * @param socket the WebSocket object
 */
const open = (token, entityId, wsUrl) => {
  return new Promise((resolve, reject) => {
    webSocket = new WebSocket(
      `${wsUrl}?entityId=${entityId}&accessToken=${token}`
    );
    webSocket.onopen = () => {
      window.console.log("WebSocket successfully opened!");
      reconnectAttempts = 0;
      resolve(webSocket);
    };
    webSocket.onerror = err => {
      window.console.log("Failed to open socket");
      reject(err);
    };
  });
};

/**
 * Sends a ping to the websocket server if connection is open.
 */
const keepAlive = () => {
  if (webSocket.readyState === webSocket.OPEN) {
    webSocket.send("ping");
  }
};

/**
 * Saga to init websocket. Should be set as a contraint of the application.
 */
function* connect() {
  yield call(connectSocketSaga);
}

/**
 * Worker saga to connect websocket
 */
function* connectSocketSaga() {
  const token = getCookie(CMS_SSO_TOKEN);
  const entityId = yield select(selectEntityId);
  const wsProtocol = window.location.protocol === "https:" ? "wss://" : "ws://";
  let wsUrl = process.env.REACT_APP_WS_URL || `${wsProtocol}localhost:8081/ws`;
  if (window.location.host.substring(0, "localhost".length) !== "localhost") {
    wsUrl = `${wsProtocol}${window.location.host}/ws`;
  }

  try {
    const socket = yield call(open, token, entityId, wsUrl);
    socketChannel = yield call(createSocketChannel, socket);
    yield put(socketOpened());
    yield put(networkConnected());
    yield call(keepAliveSocketSaga);
  } catch (e) {
    yield put(socketError());
    yield put(delayedNetworkDisonnected(e));
    yield call(cancelKeepAliveSocketSaga);
  }
}

/**
 * Worker saga to reconnect websocket
 */
function* reconnectSocketSaga() {
  yield call(cancelKeepAliveSocketSaga);

  const interval = generateReconnectInterval(reconnectAttempts);
  window.console.log(
    `Will try to reconnect to socket in ${interval}ms (attempt ${reconnectAttempts})`
  );
  yield delay(interval);
  reconnectAttempts++;
  yield call(connectSocketSaga);
}

/**
 * Worker saga to start a keep alive loop that pings the websocket server every 20 s.
 */
function* keepAliveSocketSaga() {
  if (!keepAliveId) {
    const interval = process.env.REACT_APP_SOCKET_KEEPALIVE_INTERVAL || 30000;
    keepAliveId = yield call(setInterval, keepAlive, interval);
  }
}

/**
 * Worker saga to cancel the keep alive loop on socket errors.
 */
function* cancelKeepAliveSocketSaga() {
  if (keepAliveId) {
    yield call(clearInterval, keepAliveId);
    keepAliveId = 0;
  }
}

/**
 * Watcher saga to handle socket closed actions
 */
function* watchSocketClosedSaga() {
  yield takeEvery(WS_SOCKET_CLOSED, reconnectSocketSaga);
}

/**
 * Watcher saga to handle socket error actions
 */
function* watchSocketErrorSaga() {
  yield takeEvery(WS_SOCKET_ERROR, reconnectSocketSaga);
}

/**
 * Watcher saga to initiate websocket
 */
function* watchInitiateWebSocket() {
  yield takeLatest(WS_INITIATE, connect);
}

/**
 * Watcher saga to send lock status
 */
function* watchLockStatusChanges() {
  yield takeEvery(LOCK_POST_BROADCAST, sendMessage);
  yield takeEvery(UNLOCK_POST_BROADCAST, sendMessage);
  yield takeEvery(FORCE_UNLOCK_POST_BROADCAST, sendMessage);
}

/**
 * Send message
 */
const sendMessage = action => {
  const object = {
    type: action.type,
    data: { entityId: action.entityId, postId: action.postId }
  };
  if (webSocket.readyState === webSocket.OPEN) {
    webSocket.send(JSON.stringify(object));
  }
};

/**
 * Watcher saga for the socket eventChannel
 */
function* watchSocketChannelSaga() {
  while (true) {
    yield take(WS_SOCKET_OPENED);
    try {
      const { cancel } = yield race({
        task: all([call(serverListener)]),
        cancel: take(WS_SOCKET_CLOSED)
      });
      if (cancel) {
        socketChannel.close();
        webSocket.close();
      }
    } catch (e) {
      yield put(socketError());
      break;
    }
  }
}

function* serverListener() {
  while (true) {
    const action = yield take(socketChannel);
    const loggedInUserId = yield select(st => st.viewer.get("author"));

    if (action.type === WS_SOCKET_CLOSED) {
      yield put(socketClosed());
      continue;
    }

    if (action === "pong") {
      continue;
    }
    try {
      const message = JSON.parse(action);

      // Error handling
      if (message.type === "error") {
        handleErrorBroadcast(message.message);
      } else {
        yield put(handleLockBrodcast(loggedInUserId, message));
      }
    } catch (e) {
      window.console.error("Failed to parse socket message", action);
      yield put(socketClosed());
      break;
    }
  }
}

/**
 * Error handling for incoming errors
 * @param {*} message
 */
const handleErrorBroadcast = message => {
  if (message === "user unknown") {
    // If refresh token fails, we force window to be reloaded to prevent looping.
    window.console.error("Refresh token has failed! Reloading page...");
    window.location.reload();
  }
};

/**
 * Prepares and returns an action for updating lock.
 * @param {*} loggedInUserId
 * @param {*} message
 */
const handleLockBrodcast = (loggedInUserId, message) => {
  if (message.type === "LOCK_POST_BROADCAST") {
    const data = mutateLockData(message, loggedInUserId);
    return actionUpdateLock(data, ENTITIES[EXERCISE], message.postId);
  }
  if (message.type === "UNLOCK_POST_BROADCAST") {
    return actionUpdateLock(message.data, ENTITIES[EXERCISE], message.postId);
  }
  if (message.type === "FORCE_UNLOCK_POST_BROADCAST") {
    return actionUpdateLock(message.data, ENTITIES[EXERCISE], message.postId);
  }
};

/**
 * Handle LOCK_POST here, mutate the param code here depending on usecase.
 * @param {*} message
 * @param {*} loggedInUserId
 */
const mutateLockData = (message, loggedInUserId) => {
  const { user_id } = message && message.data;

  if (user_id === 0) {
    return { ...message.data, code: LOCKCODES.NotLocked };
  } else if (user_id === loggedInUserId) {
    return { ...message.data, code: LOCKCODES.LockedByMe };
  } else {
    return { ...message.data, code: LOCKCODES.LockedByOther };
  }
};

export default function* rootSaga() {
  yield all([
    fork(watchInitiateWebSocket),
    fork(watchSocketChannelSaga),
    fork(watchSocketClosedSaga),
    fork(watchSocketErrorSaga),
    fork(watchLockStatusChanges)
  ]);
}
