import deepEqual from 'fast-deep-equal';
import { type Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { catchError } from 'rxjs/operators/catchError';
import { concatMap } from 'rxjs/operators/concatMap';
import { delay } from 'rxjs/operators/delay';
import { distinctUntilChanged } from 'rxjs/operators/distinctUntilChanged';
import { filter } from 'rxjs/operators/filter';
import { map } from 'rxjs/operators/map';
import { skipWhile } from 'rxjs/operators/skipWhile';
import { switchMap } from 'rxjs/operators/switchMap';
import { timeInterval } from 'rxjs/operators/timeInterval';
import { timeout } from 'rxjs/operators/timeout';

import { type ObservableStatefulInput } from '@atlassian/rx-hooks';

import {
	CONVOAI_DEFAULT_ERROR_CODE,
	ConvoAIErrorMessage,
	type ConvoAIStreamingRequest,
} from '../api/ConvoAI/ConvoAIStreamingModels';
import {
	AnswerPartResponseMessage,
	ChannelIdResponseMessage,
	ErrorResponseMessage,
	FinalResponseMessage,
	FollowUpObject,
	FollowUpResponseMessage,
} from '../api/ConvoAI/ConvoAIStreamMessages';
import { createConvoAIRequestConfig } from '../api/ConvoAI/createConvoAIRequestConfig';
import { getDefaultConvoAIConfigWithUserSettings } from '../api/ConvoAI/getDefaultConvoAIConfigWithUserSettings';
import { makeConvoAIStreamRequest } from '../api/ConvoAI/makeConvoAIStreamRequest';
import { parsePartialConvoAIJsonIfNeeded } from '../api/ConvoAI/parsePartialConvoAIJsonIfNeeded';
import { logConvoAIInfo } from '../logs/logConvoAIInfo';
import { type UserConvoAIStreamingConfig } from '../models/ConvoAIStreamingConfig';
import { ConvoAIResponseState, type ConvoAIStreamingState } from '../models/ConvoAIStreamingState';

/**
 * Observable effect that turns a stream of `[request, userConfig, refetchId]`
 * inputs into a stream of ConvoAI streaming states.
 *
 * Pipeline, in order:
 * 1. Leading inputs are dropped while the user config has a truthy `skip`.
 * 2. Consecutive inputs are de-duplicated: an input is ignored when its request
 *    and config deep-equal the previous ones and the refetch id is unchanged.
 * 3. The user config is passed through `getDefaultConvoAIConfigWithUserSettings`
 *    and a request config is built with `createConvoAIRequestConfig`.
 * 4. A streaming request is started via `makeConvoAIStreamRequest`. Because
 *    `switchMap` is used, a newer input unsubscribes from the previous request.
 *    When `config.timeoutDelay` is truthy, `timeout` is applied to the request.
 * 5. `FollowUp` states are dropped unless `config.enableFollowUp` is truthy.
 * 6. Each state is passed through `parsePartialConvoAIJsonIfNeeded`.
 * 7. When `config.delayChunkUpdate` is set, a state that arrives less than
 *    `gap` ms after the previous one is delayed by `time` ms; ordering is
 *    preserved by `concatMap`.
 * 8. Any error raised upstream is converted into a single `Error` state
 *    (`TIMEOUT` template for `TimeoutError`, `UNEXPECTED` otherwise).
 *
 * @param input$ Stream of stateful inputs whose `inputs` tuple is
 *   `[request, userConfig, refetchId]`.
 * @returns Stream of streaming states, ending with an error state if the
 *   pipeline fails.
 */
export function convoAIStreamingEffect<
	TConvoAIStreamingRequest extends ConvoAIStreamingRequest,
	TFinalResponse extends FinalResponseMessage = FinalResponseMessage<string>,
	TError extends ErrorResponseMessage = ErrorResponseMessage<`${ConvoAIErrorMessage}`>,
	TFollowUp extends FollowUpResponseMessage = FollowUpResponseMessage<FollowUpObject[]>,
	TAnswerPart extends AnswerPartResponseMessage = TFinalResponse,
	TChannelId extends ChannelIdResponseMessage = TFinalResponse,
>(
	input$: Observable<
		ObservableStatefulInput<
			[TConvoAIStreamingRequest, UserConvoAIStreamingConfig, string],
			ConvoAIStreamingState<TFinalResponse, TError, TFollowUp, TAnswerPart, TChannelId>
		>
	>,
): Observable<ConvoAIStreamingState<TFinalResponse, TError, TFollowUp, TAnswerPart, TChannelId>> {
	return (
		input$
			.pipe(
				// NOTE(review): skipWhile only suppresses the *leading* run of skipped
				// inputs; once one non-skipped input is seen, later `skip: true` inputs
				// pass through. Confirm this is intended rather than a `filter`.
				skipWhile(({ inputs: [_, { skip }] }) => !!skip),
				// Only react when the request, the config, or the refetch id changes.
				distinctUntilChanged(
					(
						{ inputs: [prevRequest, prevConfig, prevRefetchId] },
						{ inputs: [nextRequest, nextConfig, nextRefetchId] },
					) =>
						deepEqual(prevRequest, nextRequest) &&
						deepEqual(prevConfig, nextConfig) &&
						prevRefetchId === nextRefetchId,
				),
				map(({ inputs: [request, config] }) => ({
					config: getDefaultConvoAIConfigWithUserSettings(config),
					request,
				})),
				map(({ request, config }) => ({
					requestConfig: createConvoAIRequestConfig(request, config),
					config,
				})),
				// switchMap: a newer input unsubscribes from the previous request stream.
				switchMap(({ requestConfig, config }) => {
					logConvoAIInfo('pre request data', config, requestConfig);
					const request$ = makeConvoAIStreamRequest(requestConfig);
					// The timeout is optional; a falsy `timeoutDelay` leaves the request unbounded.
					const guarded$ = config.timeoutDelay
						? request$.pipe(timeout(config.timeoutDelay))
						: request$;

					// Carry the resolved config alongside every state for the later stages.
					return guarded$.pipe(
						map((state) => ({
							config,
							state,
						})),
					);
				}),
				// Drop follow-up states unless follow-ups are enabled.
				filter(
					({ config, state }) =>
						!(!config.enableFollowUp && state.responseState === ConvoAIResponseState.FollowUp),
				),
				map(({ config, state }) => {
					logConvoAIInfo('response data', config, state);
					const nextState = parsePartialConvoAIJsonIfNeeded<TFinalResponse>(state, config);
					// Log the parsed result (not the raw input) so this entry reflects its label.
					logConvoAIInfo('post json parsing content', config, nextState);
					return { state: nextState, config };
				}),
				// Attach the elapsed time since the previous emission.
				timeInterval(),
				// concatMap keeps states in order even when some of them are delayed.
				concatMap(({ value: { state, config }, interval }) => {
					if (!config.delayChunkUpdate) {
						return of(state);
					}

					const { time, gap } = config.delayChunkUpdate;

					// A state arriving sooner than `gap` after the previous one is held back by `time`.
					if (interval < gap) {
						return of(state).pipe(delay(time));
					}
					return of(state);
				}),
			)
			// pipe() has an apparent limit of 9 operators
			// https://github.com/ReactiveX/rxjs/issues/4221#issuecomment-426774631
			.pipe(
				// NOTE(review): this handler sits on the outer stream, so after the error
				// state is emitted the returned observable completes and later input
				// changes no longer trigger requests unless the consumer resubscribes.
				// Confirm that is the intended lifecycle.
				catchError((error: Error) => {
					if (error.name === 'TimeoutError') {
						// NOTE(review): only this branch sets `answerChunkIndex` explicitly;
						// the unexpected-error branch below omits the key. Confirm whether
						// the difference matters to consumers.
						const timeoutErrorState: ConvoAIStreamingState<TError> = {
							responseState: ConvoAIResponseState.Error,
							content: null,
							answerChunkIndex: undefined,
							error: {
								type: 'ERROR',
								message: {
									content: error.message,
									message_template: ConvoAIErrorMessage.TIMEOUT,
									status_code: CONVOAI_DEFAULT_ERROR_CODE,
								},
							},
						};

						return of(timeoutErrorState);
					}

					const unexpectedErrorState: ConvoAIStreamingState<TError> = {
						responseState: ConvoAIResponseState.Error,
						content: null,
						error: {
							type: 'ERROR',
							message: {
								content: error.message,
								message_template: ConvoAIErrorMessage.UNEXPECTED,
								status_code: CONVOAI_DEFAULT_ERROR_CODE,
							},
						},
					};

					return of(unexpectedErrorState);
				}),
			)
	);
}
