// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

use std::borrow::Cow;
use std::cell::RefCell;
use std::rc::Rc;

use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op2;

use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::DetachedBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;

enum Transferable {
  MessagePort(MessagePort),
  ArrayBuffer(u32),
}

type MessagePortMessage = (DetachedBuffer, Vec<Transferable>);

pub struct MessagePort {
  rx: RefCell<UnboundedReceiver<MessagePortMessage>>,
  tx: RefCell<Option<UnboundedSender<MessagePortMessage>>>,
}

impl MessagePort {
  pub fn send(
    &self,
    state: &mut OpState,
    data: JsMessageData,
  ) -> Result<(), AnyError> {
    let transferables =
      deserialize_js_transferables(state, data.transferables)?;

    // Swallow the failed to send error. It means the channel was disentangled,
    // but not cleaned up.
    if let Some(tx) = &*self.tx.borrow() {
      tx.send((data.data, transferables)).ok();
    }

    Ok(())
  }

  #[allow(clippy::await_holding_refcell_ref)] // TODO(ry) remove!
  pub async fn recv(
    &self,
    state: Rc<RefCell<OpState>>,
  ) -> Result<Option<JsMessageData>, AnyError> {
    let mut rx = self
      .rx
      .try_borrow_mut()
      .map_err(|_| type_error("Port receiver is already borrowed"))?;
    if let Some((data, transferables)) = rx.recv().await {
      let js_transferables =
        serialize_transferables(&mut state.borrow_mut(), transferables);
      return Ok(Some(JsMessageData {
        data,
        transferables: js_transferables,
      }));
    }
    Ok(None)
  }

  /// This forcefully disconnects the message port from its paired port. This
  /// will wake up the `.recv` on the paired port, which will return `Ok(None)`.
  pub fn disentangle(&self) {
    let mut tx = self.tx.borrow_mut();
    tx.take();
  }
}

pub fn create_entangled_message_port() -> (MessagePort, MessagePort) {
  let (port1_tx, port2_rx) = unbounded_channel::<MessagePortMessage>();
  let (port2_tx, port1_rx) = unbounded_channel::<MessagePortMessage>();

  let port1 = MessagePort {
    rx: RefCell::new(port1_rx),
    tx: RefCell::new(Some(port1_tx)),
  };

  let port2 = MessagePort {
    rx: RefCell::new(port2_rx),
    tx: RefCell::new(Some(port2_tx)),
  };

  (port1, port2)
}

pub struct MessagePortResource {
  port: MessagePort,
  cancel: CancelHandle,
}

impl Resource for MessagePortResource {
  fn name(&self) -> Cow<str> {
    "messagePort".into()
  }

  fn close(self: Rc<Self>) {
    self.cancel.cancel();
  }
}

#[op2]
#[serde]
pub fn op_message_port_create_entangled(
  state: &mut OpState,
) -> (ResourceId, ResourceId) {
  let (port1, port2) = create_entangled_message_port();

  let port1_id = state.resource_table.add(MessagePortResource {
    port: port1,
    cancel: CancelHandle::new(),
  });

  let port2_id = state.resource_table.add(MessagePortResource {
    port: port2,
    cancel: CancelHandle::new(),
  });

  (port1_id, port2_id)
}

#[derive(Deserialize, Serialize)]
#[serde(tag = "kind", content = "data", rename_all = "camelCase")]
pub enum JsTransferable {
  #[serde(rename_all = "camelCase")]
  MessagePort(ResourceId),
  ArrayBuffer(u32),
}

fn deserialize_js_transferables(
  state: &mut OpState,
  js_transferables: Vec<JsTransferable>,
) -> Result<Vec<Transferable>, AnyError> {
  let mut transferables = Vec::with_capacity(js_transferables.len());
  for js_transferable in js_transferables {
    match js_transferable {
      JsTransferable::MessagePort(id) => {
        let resource = state
          .resource_table
          .take::<MessagePortResource>(id)
          .map_err(|_| type_error("Invalid message port transfer"))?;
        resource.cancel.cancel();
        let resource = Rc::try_unwrap(resource)
          .map_err(|_| type_error("Message port is not ready for transfer"))?;
        transferables.push(Transferable::MessagePort(resource.port));
      }
      JsTransferable::ArrayBuffer(id) => {
        transferables.push(Transferable::ArrayBuffer(id));
      }
    }
  }
  Ok(transferables)
}

fn serialize_transferables(
  state: &mut OpState,
  transferables: Vec<Transferable>,
) -> Vec<JsTransferable> {
  let mut js_transferables = Vec::with_capacity(transferables.len());
  for transferable in transferables {
    match transferable {
      Transferable::MessagePort(port) => {
        let rid = state.resource_table.add(MessagePortResource {
          port,
          cancel: CancelHandle::new(),
        });
        js_transferables.push(JsTransferable::MessagePort(rid));
      }
      Transferable::ArrayBuffer(id) => {
        js_transferables.push(JsTransferable::ArrayBuffer(id));
      }
    }
  }
  js_transferables
}

#[derive(Deserialize, Serialize)]
pub struct JsMessageData {
  data: DetachedBuffer,
  transferables: Vec<JsTransferable>,
}

#[op2]
pub fn op_message_port_post_message(
  state: &mut OpState,
  #[smi] rid: ResourceId,
  #[serde] data: JsMessageData,
) -> Result<(), AnyError> {
  for js_transferable in &data.transferables {
    if let JsTransferable::MessagePort(id) = js_transferable {
      if *id == rid {
        return Err(type_error("Can not transfer self message port"));
      }
    }
  }

  let resource = state.resource_table.get::<MessagePortResource>(rid)?;

  resource.port.send(state, data)
}

#[op2(async)]
#[serde]
pub async fn op_message_port_recv_message(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
  let resource = {
    let state = state.borrow();
    match state.resource_table.get::<MessagePortResource>(rid) {
      Ok(resource) => resource,
      Err(_) => return Ok(None),
    }
  };
  let cancel = RcRef::map(resource.clone(), |r| &r.cancel);
  resource.port.recv(state).or_cancel(cancel).await?
}