// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

/// <reference path="../../core/internal.d.ts" />

import { core, primordials } from "ext:core/mod.js";
import {
  op_ws_check_permission_and_cancel_handle,
  op_ws_close,
  op_ws_create,
  op_ws_get_buffer,
  op_ws_get_buffer_as_string,
  op_ws_get_error,
  op_ws_next_event,
  op_ws_send_binary_async,
  op_ws_send_text_async,
} from "ext:core/ops";
const {
  ArrayPrototypeJoin,
  ArrayPrototypeMap,
  DateNow,
  Error,
  ObjectPrototypeIsPrototypeOf,
  PromisePrototypeCatch,
  PromisePrototypeThen,
  SafeSet,
  SetPrototypeGetSize,
  StringPrototypeEndsWith,
  StringPrototypeToLowerCase,
  Symbol,
  SymbolFor,
  TypeError,
  TypedArrayPrototypeGetByteLength,
  TypedArrayPrototypeGetSymbolToStringTag,
} = primordials;

import * as webidl from "ext:deno_webidl/00_webidl.js";
import { createFilteredInspectProxy } from "ext:deno_console/01_console.js";
import { Deferred, writableStreamClose } from "ext:deno_web/06_streams.js";
import { DOMException } from "ext:deno_web/01_dom_exception.js";
import { add, remove } from "ext:deno_web/03_abort_signal.js";
import {
  fillHeaders,
  headerListFromHeaders,
  headersFromHeaderList,
} from "ext:deno_fetch/20_headers.js";

webidl.converters.WebSocketStreamOptions = webidl.createDictionaryConverter(
  "WebSocketStreamOptions",
  [
    {
      key: "protocols",
      converter: webidl.converters["sequence<USVString>"],
      get defaultValue() {
        return [];
      },
    },
    {
      key: "signal",
      converter: webidl.converters.AbortSignal,
    },
    {
      key: "headers",
      converter: webidl.converters.HeadersInit,
    },
  ],
);
webidl.converters.WebSocketCloseInfo = webidl.createDictionaryConverter(
  "WebSocketCloseInfo",
  [
    {
      key: "code",
      converter: webidl.converters["unsigned short"],
    },
    {
      key: "reason",
      converter: webidl.converters.USVString,
      defaultValue: "",
    },
  ],
);

const CLOSE_RESPONSE_TIMEOUT = 5000;

const _rid = Symbol("[[rid]]");
const _url = Symbol("[[url]]");
const _opened = Symbol("[[opened]]");
const _closed = Symbol("[[closed]]");
const _earlyClose = Symbol("[[earlyClose]]");
const _closeSent = Symbol("[[closeSent]]");
class WebSocketStream {
  [_rid];

  [_url];
  get url() {
    webidl.assertBranded(this, WebSocketStreamPrototype);
    return this[_url];
  }

  constructor(url, options) {
    this[webidl.brand] = webidl.brand;
    const prefix = "Failed to construct 'WebSocketStream'";
    webidl.requiredArguments(arguments.length, 1, prefix);
    url = webidl.converters.USVString(url, prefix, "Argument 1");
    options = webidl.converters.WebSocketStreamOptions(
      options,
      prefix,
      "Argument 2",
    );

    const wsURL = new URL(url);

    if (wsURL.protocol !== "ws:" && wsURL.protocol !== "wss:") {
      throw new DOMException(
        "Only ws & wss schemes are allowed in a WebSocket URL.",
        "SyntaxError",
      );
    }

    if (wsURL.hash !== "" || StringPrototypeEndsWith(wsURL.href, "#")) {
      throw new DOMException(
        "Fragments are not allowed in a WebSocket URL.",
        "SyntaxError",
      );
    }

    this[_url] = wsURL.href;

    if (
      options.protocols.length !==
        SetPrototypeGetSize(
          new SafeSet(
            ArrayPrototypeMap(
              options.protocols,
              (p) => StringPrototypeToLowerCase(p),
            ),
          ),
        )
    ) {
      throw new DOMException(
        "Can't supply multiple times the same protocol.",
        "SyntaxError",
      );
    }

    const headers = headersFromHeaderList([], "request");
    if (options.headers !== undefined) {
      fillHeaders(headers, options.headers);
    }

    const cancelRid = op_ws_check_permission_and_cancel_handle(
      "WebSocketStream.abort()",
      this[_url],
      true,
    );

    if (options.signal?.aborted) {
      core.close(cancelRid);
      const err = options.signal.reason;
      this[_opened].reject(err);
      this[_closed].reject(err);
    } else {
      const abort = () => {
        core.close(cancelRid);
      };
      options.signal?.[add](abort);
      PromisePrototypeThen(
        op_ws_create(
          "new WebSocketStream()",
          this[_url],
          options.protocols ? ArrayPrototypeJoin(options.protocols, ", ") : "",
          cancelRid,
          headerListFromHeaders(headers),
        ),
        (create) => {
          options.signal?.[remove](abort);
          if (this[_earlyClose]) {
            PromisePrototypeThen(
              op_ws_close(create.rid),
              () => {
                PromisePrototypeThen(
                  (async () => {
                    while (true) {
                      const kind = await op_ws_next_event(create.rid);

                      if (kind > 5) {
                        /* close */
                        break;
                      }
                    }
                  })(),
                  () => {
                    const err = new DOMException(
                      "Closed while connecting",
                      "NetworkError",
                    );
                    this[_opened].reject(err);
                    this[_closed].reject(err);
                  },
                );
              },
              () => {
                const err = new DOMException(
                  "Closed while connecting",
                  "NetworkError",
                );
                this[_opened].reject(err);
                this[_closed].reject(err);
              },
            );
          } else {
            this[_rid] = create.rid;

            const writable = new WritableStream({
              write: async (chunk) => {
                if (typeof chunk === "string") {
                  await op_ws_send_text_async(this[_rid], chunk);
                } else if (
                  TypedArrayPrototypeGetSymbolToStringTag(chunk) ===
                    "Uint8Array"
                ) {
                  await op_ws_send_binary_async(this[_rid], chunk);
                } else {
                  throw new TypeError(
                    "A chunk may only be either a string or an Uint8Array",
                  );
                }
              },
              close: async (reason) => {
                try {
                  this.close(reason?.code !== undefined ? reason : {});
                } catch (_) {
                  this.close();
                }
                await this.closed;
              },
              abort: async (reason) => {
                try {
                  this.close(reason?.code !== undefined ? reason : {});
                } catch (_) {
                  this.close();
                }
                await this.closed;
              },
            });
            const pull = async (controller) => {
              // Remember that this pull method may be re-entered before it has completed
              const kind = await op_ws_next_event(this[_rid]);
              switch (kind) {
                case 0:
                  /* string */
                  controller.enqueue(op_ws_get_buffer_as_string(this[_rid]));
                  break;
                case 1: {
                  /* binary */
                  controller.enqueue(op_ws_get_buffer(this[_rid]));
                  break;
                }
                case 2: {
                  /* pong */
                  break;
                }
                case 3: {
                  /* error */
                  const err = new Error(op_ws_get_error(this[_rid]));
                  this[_closed].reject(err);
                  controller.error(err);
                  core.tryClose(this[_rid]);
                  break;
                }
                case 1005: {
                  /* closed */
                  this[_closed].resolve({ code: 1005, reason: "" });
                  core.tryClose(this[_rid]);
                  break;
                }
                default: {
                  /* close */
                  const reason = op_ws_get_error(this[_rid]);
                  this[_closed].resolve({
                    code: kind,
                    reason,
                  });
                  core.tryClose(this[_rid]);
                  break;
                }
              }

              if (
                this[_closeSent].state === "fulfilled" &&
                this[_closed].state === "pending"
              ) {
                if (
                  DateNow() - await this[_closeSent].promise <=
                    CLOSE_RESPONSE_TIMEOUT
                ) {
                  return pull(controller);
                }

                const error = op_ws_get_error(this[_rid]);
                this[_closed].reject(new Error(error));
                core.tryClose(this[_rid]);
              }
            };
            const readable = new ReadableStream({
              start: (controller) => {
                PromisePrototypeThen(this.closed, () => {
                  try {
                    controller.close();
                  } catch (_) {
                    // needed to ignore warnings & assertions
                  }
                  try {
                    PromisePrototypeCatch(
                      writableStreamClose(writable),
                      () => {},
                    );
                  } catch (_) {
                    // needed to ignore warnings & assertions
                  }
                });

                PromisePrototypeThen(this[_closeSent].promise, () => {
                  if (this[_closed].state === "pending") {
                    return pull(controller);
                  }
                });
              },
              pull,
              cancel: async (reason) => {
                try {
                  this.close(reason?.code !== undefined ? reason : {});
                } catch (_) {
                  this.close();
                }
                await this.closed;
              },
            });

            this[_opened].resolve({
              readable,
              writable,
              extensions: create.extensions ?? "",
              protocol: create.protocol ?? "",
            });
          }
        },
        (err) => {
          if (ObjectPrototypeIsPrototypeOf(core.InterruptedPrototype, err)) {
            // The signal was aborted.
            err = options.signal.reason;
          } else {
            core.tryClose(cancelRid);
          }
          this[_opened].reject(err);
          this[_closed].reject(err);
        },
      );
    }
  }

  [_opened] = new Deferred();
  get opened() {
    webidl.assertBranded(this, WebSocketStreamPrototype);
    return this[_opened].promise;
  }

  [_earlyClose] = false;
  [_closed] = new Deferred();
  [_closeSent] = new Deferred();
  get closed() {
    webidl.assertBranded(this, WebSocketStreamPrototype);
    return this[_closed].promise;
  }

  close(closeInfo) {
    webidl.assertBranded(this, WebSocketStreamPrototype);
    closeInfo = webidl.converters.WebSocketCloseInfo(
      closeInfo,
      "Failed to execute 'close' on 'WebSocketStream'",
      "Argument 1",
    );

    if (
      closeInfo.code &&
      !(closeInfo.code === 1000 ||
        (3000 <= closeInfo.code && closeInfo.code < 5000))
    ) {
      throw new DOMException(
        "The close code must be either 1000 or in the range of 3000 to 4999.",
        "InvalidAccessError",
      );
    }

    const encoder = new TextEncoder();
    if (
      closeInfo.reason &&
      TypedArrayPrototypeGetByteLength(encoder.encode(closeInfo.reason)) > 123
    ) {
      throw new DOMException(
        "The close reason may not be longer than 123 bytes.",
        "SyntaxError",
      );
    }

    let code = closeInfo.code;
    if (closeInfo.reason && code === undefined) {
      code = 1000;
    }

    if (this[_opened].state === "pending") {
      this[_earlyClose] = true;
    } else if (this[_closed].state === "pending") {
      PromisePrototypeThen(
        op_ws_close(this[_rid], code, closeInfo.reason),
        () => {
          setTimeout(() => {
            this[_closeSent].resolve(DateNow());
          }, 0);
        },
        (err) => {
          this[_rid] && core.tryClose(this[_rid]);
          this[_closed].reject(err);
        },
      );
    }
  }

  [SymbolFor("Deno.privateCustomInspect")](inspect, inspectOptions) {
    return inspect(
      createFilteredInspectProxy({
        object: this,
        evaluate: ObjectPrototypeIsPrototypeOf(WebSocketStreamPrototype, this),
        keys: [
          "closed",
          "opened",
          "url",
        ],
      }),
      inspectOptions,
    );
  }
}

const WebSocketStreamPrototype = WebSocketStream.prototype;

export { WebSocketStream };