import { EventEmitter } from 'events';
import { watsonSingleton } from './watson-initalization';
import MicrophoneStream from 'microphone-stream';
import logger from 'go-modules/logger/logger';
import { FULLSTORY_EVENTS } from 'go-modules/services/fullstory/fullstory.events';
import { xhr } from 'go-modules/xhr/xhr';
import * as dayjs from 'dayjs';
import { LOCAL_STORAGE_WATSON_USAGE_KEY } from '../services/watson-usage-tracking/watson-usage-tracking.service';
import { RealtimeTranscriber, RealtimeTranscript } from 'assemblyai';
import { WritableStreamAdapter } from 'go-modules/real-time-captions/writable-stream-adapter';
import { AudioBufferTransformStream } from 'go-modules/real-time-captions/audio-buffer-transform-stream';
import { clientSettings } from 'go-modules/models/common/client.settings';

/**
 * Pipes a `MediaStream` through a `MicrophoneStream` into a speech-to-text
 * connection and re-emits the results.
 *
 * Two pipelines exist, selected once by the constructor flag:
 *  - flag on:  a `RealtimeTranscriber` (results arrive as `'transcript'` events)
 *  - flag off: a Watson `RecognizeStream` + `FormatStream` (results arrive as `'data'` events)
 *
 * Events emitted by this service:
 *  - `'data'`:  every transcript / recognition result from the active connection
 *  - `'error'`: every error raised while connecting or transcribing
 *
 * The service starts paused; nothing is requested until `setPaused(false)`.
 */
export class RealTimeCaptionsService extends EventEmitter {
	/** Stream handed to the previous `startTranscribingStream` call; used to detect a stream swap. */
	private previousStream: MediaStream;
	/** Stream handed to the most recent `startTranscribingStream` call. */
	private stream: MediaStream;
	private microphoneStream: MicrophoneStream;
	/** The currently active (opened) transcription connection, if any. */
	private transcribeStream: RealtimeTranscriber|Watson.Speech.FormatStream;
	/** Pending reconnect timer; `null`/`undefined` when no reconnect is scheduled. */
	private retryTimeout: ReturnType<typeof setTimeout>;
	/** Number of reconnects attempted since the last successful open. */
	private retries: number = 0;
	/** True once transcription has been stopped explicitly (or aborted before connecting). */
	private stopped: boolean = false;
	/** How long (ms) a paused connection is kept open before it is closed. */
	private pauseTimeoutDuration: number = 30000;
	private pausedTimeoutId: ReturnType<typeof setTimeout>;
	private paused: boolean = true;
	/** True while a connection request is in flight. */
	private isRequesting = false;
	private accesstoken: string;
	private trackTranscribeUsageIntervalRef: ReturnType<typeof setInterval>;
	private startedAt: Date;
	private totalSeconds: number;
	private mediaId: number;
	private heyGenEnabled: boolean = false;

	/**
	 * @param heyGen When true the `RealtimeTranscriber` pipeline is used,
	 * otherwise the Watson `RecognizeStream` pipeline.
	 */
	constructor (heyGen: boolean) {
		super();
		this.heyGenEnabled = heyGen;
	}

	/**
	 * Pauses or resumes captioning.
	 *
	 * With an active connection this only pauses/resumes the microphone and
	 * arms/disarms the timer that closes a connection left paused for too long.
	 * Without one, resuming starts a new connection for the last known stream.
	 */
	public setPaused (isPaused: boolean) {
		this.paused = isPaused;

		if (!this.transcribeStream) {
			if (!this.paused && this.stream) {
				this.clearPausedTimeout();
				this.startTranscribingStream(this.stream, this.mediaId)
					.catch((e) => logger.logError(e));
			}
			return;
		}

		if (this.paused) {
			this.startPausedTimerToStopTranscribingStream();
			this.microphoneStream?.pauseRecording();
		} else {
			this.microphoneStream?.playRecording();
			this.clearPausedTimeout();
		}
	}

	/** Pauses the microphone and closes the active transcription connection. */
	public stop () {
		this.paused = true;
		this.microphoneStream?.pauseRecording();
		this.stopTranscribingStream().catch((e) => logger.logError(e));
	}

	/**
	 * Remembers `stream`/`mediaId`, (re)creates the microphone stream, and, unless
	 * paused or already connecting, opens a new transcription connection.
	 *
	 * Failures are reported through the `'error'` event and trigger the
	 * reconnect backoff; the returned promise is not used to signal them.
	 *
	 * @param stream  Audio source to transcribe.
	 * @param mediaId Optional media id recorded with Watson usage tracking.
	 */
	public async startTranscribingStream (stream: MediaStream, mediaId?: number): Promise<void> {
		this.previousStream = this.stream;
		this.stream = stream;
		this.mediaId = mediaId;
		this.createMicrophoneStream(stream);

		if (this.paused || this.isRequesting) {
			return;
		}

		this.cancelReconnect();
		this.isRequesting = true;
		this.stopped = false;

		let newTranscribeStream: RealtimeTranscriber|Watson.Speech.FormatStream|null;
		try {
			newTranscribeStream = await this.recognizeStream();
		} catch (e /*Response | TypeError*/) {
			this.error(this.toError(e));
			return;
		}

		// recognizeStream() returns null when captions were switched off while connecting
		if (newTranscribeStream == null) {
			this.isRequesting = false;
			this.stopped = true;
			return;
		}

		if (this.heyGenEnabled) {
			const transcriber = newTranscribeStream as RealtimeTranscriber;

			transcriber.on('open', () => this.handleStreamOpened(transcriber, () => {
				transcriber.on('transcript', (data: RealtimeTranscript) => this.emit('data', data));
			}));
			transcriber.on('error', (error) => this.error(error, transcriber));
			transcriber.on('close', () => this.handleStreamClosed(transcriber));
			return;
		}

		const formatStream = newTranscribeStream as Watson.Speech.FormatStream;

		formatStream.recognizeStream.once('open', () => this.handleStreamOpened(formatStream, () => {
			formatStream.on('data', (data: Watson.Speech.RecognitionResults) => this.emit('data', data));
			// Fire-and-forget: a failed usage POST must not surface as an unhandled rejection
			this.trackTranscriptionUsage(mediaId).catch((e) => logger.logError(e));
		}));
		formatStream.on('error', (error) => this.error(error, formatStream));
		formatStream.recognizeStream.once('close', () => {
			formatStream.removeAllListeners();
			this.handleStreamClosed(formatStream);
		});
	}

	/**
	 * Flushes any previously recorded usage, then starts writing a usage record
	 * (`media_id`, `total_seconds`, `started_at`, `provider`) to local storage
	 * once per second.
	 *
	 * Rejects if flushing the previous usage fails; in that case no new
	 * interval is started.
	 */
	public async trackTranscriptionUsage (mediaId: number) {
		await this.stopTrackingTranscriptionUsage();
		// Another call may have started an interval while we were awaiting; never leak it
		clearInterval(this.trackTranscribeUsageIntervalRef);
		this.startedAt = dayjs().toDate();
		this.totalSeconds = 0;
		this.trackTranscribeUsageIntervalRef = setInterval(() => {
			this.totalSeconds++;
			localStorage.setItem(
				LOCAL_STORAGE_WATSON_USAGE_KEY,
				JSON.stringify({
					media_id: mediaId,
					total_seconds: this.totalSeconds,
					started_at: this.startedAt,
					provider: 'watson'
				})
			);
		}, 1000);
	}

	/**
	 * Stops the usage interval and POSTs the usage record found in local storage
	 * to `/transcription_usages`. The record is removed only after the POST succeeds.
	 *
	 * @returns `null` when there was nothing to send.
	 */
	public async stopTrackingTranscriptionUsage () {
		clearInterval(this.trackTranscribeUsageIntervalRef);
		const usageString = localStorage.getItem(LOCAL_STORAGE_WATSON_USAGE_KEY);
		if (usageString == null) return null;
		const usage = JSON.parse(usageString);
		await xhr('/transcription_usages', {
			method: 'POST',
			body: usage
		});
		localStorage.removeItem(LOCAL_STORAGE_WATSON_USAGE_KEY);
	}

	/** Marks the service stopped, cancels any pending reconnect and closes the active connection. */
	public async stopTranscribingStream (): Promise<void> {
		this.stopped = true;
		this.cancelReconnect();
		await this.destroyStream(this.transcribeStream);
	}

	/** Tears everything down: connection, microphone stream and all listeners on this service. */
	public async destroy (): Promise<void> {
		this.cancelReconnect();
		await this.stopTranscribingStream();
		this.stopMicrophoneStream();
		this.removeAllListeners();
	}

	/**
	 * Shared `open` handling for both pipelines: replaces the previous connection
	 * with the new one (or discards the new one if we were stopped meanwhile).
	 *
	 * @param newTranscribeStream The connection that just opened.
	 * @param attachListeners     Pipeline-specific wiring, run only when the connection is kept.
	 */
	private handleStreamOpened (
		newTranscribeStream: RealtimeTranscriber|Watson.Speech.FormatStream,
		attachListeners: () => void
	): void {
		this.retries = 0;
		this.destroyStreamQuietly(this.transcribeStream);

		if (this.stopped) {
			this.destroyStreamQuietly(newTranscribeStream);
		} else {
			this.transcribeStream = newTranscribeStream;
			// Analytics must never break captioning, so tolerate a missing FullStory global
			window.FS?.event(FULLSTORY_EVENTS.CAPTIONS_STARTED, {});
			attachListeners();
		}

		this.isRequesting = false;

		// The user may have turned captions on and then paused the video while the
		// connection was still being established. Arm the paused timer now so the
		// connection is closed if the video stays paused.
		if (this.paused) {
			this.startPausedTimerToStopTranscribingStream();
		}
	}

	/** Shared `close` handling: forgets the connection if it is still the active one. */
	private handleStreamClosed (closedStream: RealtimeTranscriber|Watson.Speech.FormatStream): void {
		this.isRequesting = false;

		if (this.transcribeStream === closedStream) {
			this.transcribeStream = null;
		}
	}

	/** Converts whatever a failed connection attempt threw into an `Error`. */
	private toError (thrown: unknown): Error {
		if (thrown instanceof Error) {
			return thrown;
		}

		if (typeof Response !== 'undefined' && thrown instanceof Response) {
			// statusText can be empty, so fall back to the numeric status
			return new Error(thrown.statusText || `Request failed with status ${thrown.status}`);
		}

		const message = (thrown as { message?: unknown } | null | undefined)?.message;
		return new Error(typeof message === 'string' ? message : String(thrown));
	}

	/** Fire-and-forget variant of `destroyStream` that logs instead of leaving a rejection unhandled. */
	private destroyStreamQuietly (transcribeStream?: RealtimeTranscriber|Watson.Speech.FormatStream|null): void {
		this.destroyStream(transcribeStream).catch((e) => logger.logError(e));
	}

	/**
	 * Closes a transcription connection and flushes usage tracking.
	 *
	 * For a Watson stream the promise resolves after the socket's `close` event
	 * (and the usage flush), unless the socket is already closed.
	 */
	private async destroyStream (transcribeStream: RealtimeTranscriber|Watson.Speech.FormatStream|null): Promise<void> {

		if (transcribeStream == null) {
			return;
		}

		if (transcribeStream instanceof RealtimeTranscriber) {
			await transcribeStream.close();
			// Not awaited: closing should not wait on the usage POST
			this.stopTrackingTranscriptionUsage().catch((e) => logger.logError(e));
			return;
		}

		transcribeStream.stop();

		// NOTE(review): when the socket is already CLOSED, usage tracking is not flushed here — confirm intended
		if (transcribeStream.recognizeStream.socket?.readyState !== WebSocket.CLOSED) {
			await new Promise<void>((whenClosed: () => void) => {
				transcribeStream.recognizeStream.once('close', () => {
					this.stopTrackingTranscriptionUsage()
						.catch((e) => logger.logError(e))
						.finally(whenClosed);
				});
			});
		}
	}

	/**
	 * Central error handler: re-emits the error, destroys the failing connection
	 * and schedules a reconnect with exponential backoff.
	 *
	 * No reconnect is scheduled when the error is Watson's silence timeout
	 * ("No speech detected for ..."), when the service has been stopped, or
	 * when a reconnect is already pending.
	 */
	private error (error: Error, transcribeStream?: RealtimeTranscriber|Watson.Speech.FormatStream): void {
		try {
			this.emit('error', error);
		} catch (e) {
			// If nobody is listening to the 'error' event,
			// the event emitter throws, but we do not want it to.
		}

		this.destroyStreamQuietly(transcribeStream);

		// Errors can arrive without a usable message (non-Error values); treat that as ''
		const message = typeof error?.message === 'string' ? error.message : '';

		// Unless there was silence for too long, we will attempt to reconnect
		if (message.startsWith('No speech detected for')) {
			this.cancelReconnect();
			return;
		}

		logger.logError(error);

		// A late error from a connection that was stopped on purpose must not
		// bring the microphone and a new connection back to life
		if (this.stopped) {
			return;
		}

		// We are already set up to reconnect in the future
		if (this.retryTimeout != null) {
			return;
		}

		// Reconnect backoff: 0s, 5s, 10s, 20s, 40s, 80s, etc...
		const exponentialBackoff = this.retries === 0 ? 0 : (2 ** (this.retries - 1)) * 5000;
		this.retries++;
		this.retryTimeout = setTimeout(() => {
			// The timer has fired; forget it so later errors can schedule a new one
			// even if the start below returns early (e.g. because we are paused)
			this.retryTimeout = null;
			this.isRequesting = false;
			this.startTranscribingStream(this.stream, this.mediaId)
				.catch((e) => logger.logError(e));
		}, exponentialBackoff);
	}

	/** Cancels a pending reconnect and clears the "request in flight" flag. */
	private cancelReconnect () {
		this.isRequesting = false;
		clearTimeout(this.retryTimeout);
		this.retryTimeout = null;
	}

	/**
	 * Fetches the token and speech package, builds the pipeline for the
	 * configured backend and wires the microphone stream into it.
	 *
	 * @returns The new connection, or `null` when the microphone stream was
	 * torn down while the token/package were loading.
	 */
	private async recognizeStream (): Promise<RealtimeTranscriber|Watson.Speech.FormatStream|null> {
		// Await the token and speech package
		const watsonToken = await watsonSingleton.tokenPromise(this.heyGenEnabled);
		const watsonSpeech = await watsonSingleton.speechPromise(this.heyGenEnabled);

		// Captions may have been turned off while the token request was in flight.
		// Bail out before opening a connection nobody wants.
		// This SAVES MONEY
		if (this.microphoneStream == null) {
			return null;
		}

		if (this.heyGenEnabled) {
			const realtimeTranscriber = new watsonSpeech.RealtimeTranscriber({
				token: watsonToken.access_token,
				endUtteranceSilenceThreshold: 1500
			});
			const writeStream = new WritableStreamAdapter(realtimeTranscriber.stream());

			const l16Stream = new watsonSpeech.WebAudioL16Stream({ writableObjectMode: true });

			this.microphoneStream.unpipe();

			// Deliberately not awaited: the caller must attach its listeners
			// before the connection opens
			realtimeTranscriber.connect().then(() => {
				// The microphone stream may have been torn down while connecting
				const connectedMicrophone = this.microphoneStream;
				if (connectedMicrophone == null) {
					return;
				}

				connectedMicrophone
					.pipe(l16Stream)
					.pipe(new AudioBufferTransformStream())
					.pipe(writeStream);
			}).catch((e) => logger.logError(e));

			return realtimeTranscriber;
		}

		const recognizeStream = new watsonSpeech.RecognizeStream({
			accessToken: watsonToken.access_token,
			url: watsonToken.transcribe_url,
			inactivityTimeout: clientSettings.watson?.speech?.inactivityTimeoutInSeconds || 1000,
			contentType: 'audio/l16;rate=16000;endianness=little-endian',
			interimResults: true,
			xWatsonLearningOptOut: true,
			objectMode: true,
			timestamps: true,
			smartFormatting: true
		});
		const l16Stream = new watsonSpeech.WebAudioL16Stream({ writableObjectMode: true });
		const formatStream = new watsonSpeech.FormatStream({objectMode: true});
		const microphoneStream = this.microphoneStream;

		microphoneStream.unpipe();
		microphoneStream.pipe(l16Stream).pipe(recognizeStream).pipe(formatStream);

		// Forward all errors to the format stream
		const forwardError = formatStream.emit.bind(formatStream, 'error');
		microphoneStream.on('error', forwardError);
		l16Stream.on('error', forwardError);
		recognizeStream.on('error', forwardError);

		// The microphone stream outlives this connection (it is reused across
		// reconnects), so detach our forwarder once the connection closes
		recognizeStream.once('close', () => {
			microphoneStream.removeListener('error', forwardError);
		});

		// Expose the recognize stream
		formatStream.stop = recognizeStream.stop.bind(recognizeStream);
		formatStream.recognizeStream = recognizeStream;

		return formatStream;
	}

	/**
	 * Ensures a `MicrophoneStream` exists for `stream` (rebuilding it when the
	 * stream changed) and puts it in the state matching the paused flag.
	 */
	private createMicrophoneStream (stream: MediaStream): MicrophoneStream {
		const streamChanged = this.isStreamValueChanged();

		if (streamChanged) {
			this.stopMicrophoneStream();
		}

		if (streamChanged || !this.microphoneStream) {
			// Create the MicrophoneStream synchronously to allow it to resume the context in Safari on iOS 11
			this.microphoneStream = new MicrophoneStream({
				objectMode: true
			});

			this.microphoneStream.setStream(stream);
		}

		if (this.paused) {
			this.microphoneStream.pauseRecording();
		} else {
			this.microphoneStream.playRecording();
		}

		return this.microphoneStream;
	}

	/** Stops and forgets the microphone stream, if there is one. */
	private stopMicrophoneStream () {
		this.microphoneStream?.stop();
		this.microphoneStream = null;
	}

	/** True when the current stream's id differs from the previous stream's id. */
	private isStreamValueChanged (): boolean {
		return this.previousStream?.id !== this.stream?.id;
	}

	/** Disarms the timer that closes a paused connection. */
	private clearPausedTimeout () {
		clearTimeout(this.pausedTimeoutId);
		this.pausedTimeoutId = undefined;
	}

	/**
	 * (Re)arms the timer that closes the connection if we are still paused
	 * after `pauseTimeoutDuration` milliseconds.
	 */
	private startPausedTimerToStopTranscribingStream () {
		this.clearPausedTimeout();

		this.pausedTimeoutId = setTimeout(() => {
			if (this.paused) {
				this.stopTranscribingStream().catch((e) => logger.logError(e));
			}

			this.clearPausedTimeout();
		}, this.pauseTimeoutDuration);
	}

}

