// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

pub use impl_::*;

/// Raw OS identifier of the child IPC pipe.
///
/// `op_node_child_ipc_pipe` looks this value up in `OpState` and passes the
/// wrapped integer to `IpcJsonStreamResource::new`, where it is cast to a raw
/// file descriptor on Unix and to a raw handle on Windows.
pub struct ChildPipeFd(pub i64);

mod impl_ {
  use std::cell::RefCell;
  use std::future::Future;
  use std::io;
  use std::mem;
  #[cfg(unix)]
  use std::os::fd::FromRawFd;
  #[cfg(unix)]
  use std::os::fd::RawFd;
  use std::pin::Pin;
  use std::rc::Rc;
  use std::task::Context;
  use std::task::Poll;

  use deno_core::error::bad_resource_id;
  use deno_core::error::AnyError;
  use deno_core::op2;
  use deno_core::serde_json;
  use deno_core::AsyncRefCell;
  use deno_core::CancelFuture;
  use deno_core::CancelHandle;
  use deno_core::OpState;
  use deno_core::RcRef;
  use deno_core::ResourceId;
  use pin_project_lite::pin_project;
  use tokio::io::AsyncBufRead;
  use tokio::io::AsyncWriteExt;
  use tokio::io::BufReader;

  #[cfg(unix)]
  use tokio::net::unix::OwnedReadHalf;
  #[cfg(unix)]
  use tokio::net::unix::OwnedWriteHalf;
  #[cfg(unix)]
  use tokio::net::UnixStream;

  #[cfg(windows)]
  type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;

  // Open IPC pipe from bootstrap options.
  //
  // Returns `None` when no `ChildPipeFd` has been stored in the op state;
  // otherwise wraps the pipe in an `IpcJsonStreamResource`, registers it in the
  // resource table and returns its resource id.
  #[op2]
  #[smi]
  pub fn op_node_child_ipc_pipe(
    state: &mut OpState,
  ) -> Result<Option<ResourceId>, AnyError> {
    let Some(fd) = state
      .try_borrow_mut::<crate::ChildPipeFd>()
      .map(|child_pipe_fd| child_pipe_fd.0)
    else {
      return Ok(None);
    };

    let resource = IpcJsonStreamResource::new(fd)?;
    let rid = state.resource_table.add(resource);
    Ok(Some(rid))
  }

  // Writes `value` as a single JSON message to the IPC stream resource `rid`.
  //
  // Fails with a bad-resource-id error when `rid` does not refer to an
  // `IpcJsonStreamResource`.
  #[op2(async)]
  pub async fn op_node_ipc_write(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
    #[serde] value: serde_json::Value,
  ) -> Result<(), AnyError> {
    // The `OpState` borrow is a temporary of this statement, so it is released
    // before the first `.await`.
    let lookup = state
      .borrow()
      .resource_table
      .get::<IpcJsonStreamResource>(rid);
    let resource = lookup.map_err(|_| bad_resource_id())?;
    resource.write_msg(value).await
  }

  // Reads the next JSON message from the IPC stream resource `rid`.
  //
  // Fails with a bad-resource-id error when `rid` does not refer to an
  // `IpcJsonStreamResource`. The pending read is tied to the resource's cancel
  // handle, so it is aborted with an error once that handle is cancelled.
  #[op2(async)]
  #[serde]
  pub async fn op_node_ipc_read(
    state: Rc<RefCell<OpState>>,
    #[smi] rid: ResourceId,
  ) -> Result<serde_json::Value, AnyError> {
    // The `OpState` borrow is a temporary of this statement, so it is released
    // before the first `.await`.
    let lookup = state
      .borrow()
      .resource_table
      .get::<IpcJsonStreamResource>(rid);
    let resource = lookup.map_err(|_| bad_resource_id())?;

    let cancel_handle = Rc::clone(&resource.cancel);
    let mut reader =
      RcRef::map(resource, |r| &r.read_half).borrow_mut().await;
    // Outer `?`: cancellation error; inner `?`: read or parse error.
    let msg = reader.read_msg().or_cancel(cancel_handle).await??;
    Ok(msg)
  }

  /// Resource holding both directions of the IPC pipe.
  ///
  /// Each half lives in its own `AsyncRefCell`; the ops borrow the relevant
  /// half mutably for the duration of one read or one write.
  pub struct IpcJsonStreamResource {
    // Newline-delimited JSON reader over the read half of the pipe.
    read_half: AsyncRefCell<IpcJsonStream>,
    // Write half of the Unix socket.
    #[cfg(unix)]
    write_half: AsyncRefCell<OwnedWriteHalf>,
    // Write half of the Windows named pipe client.
    #[cfg(windows)]
    write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
    // Cancelled when the resource is closed; `op_node_ipc_read` attaches it to
    // pending reads.
    cancel: Rc<CancelHandle>,
  }

  impl deno_core::Resource for IpcJsonStreamResource {
    // Closing the resource triggers the cancel handle, which aborts reads that
    // were started with `or_cancel` on it (see `op_node_ipc_read`).
    fn close(self: Rc<Self>) {
      self.cancel.cancel();
    }
  }

  // Wraps the raw socket descriptor `stream` in a tokio `UnixStream` and splits
  // it into owned read and write halves.
  //
  // Takes ownership of `stream`: the descriptor is closed when both halves are
  // dropped, or immediately if setting it up fails.
  //
  // Returns an `io::Error` when the descriptor cannot be switched to
  // non-blocking mode or cannot be registered with the tokio reactor.
  #[cfg(unix)]
  fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
    // Safety: The fd is part of a pair of connected sockets created by the
    // child process implementation.
    let std_stream =
      unsafe { std::os::unix::net::UnixStream::from_raw_fd(stream) };
    // `UnixStream::from_std` requires the socket to already be in non-blocking
    // mode; otherwise every I/O call blocks the runtime thread. An inherited
    // descriptor gives no such guarantee, so enforce it here (idempotent when
    // the flag is already set).
    std_stream.set_nonblocking(true)?;
    let unix_stream = UnixStream::from_std(std_stream)?;
    Ok(unix_stream.into_split())
  }

  // Wraps the raw named-pipe `handle` in a tokio `NamedPipeClient` and splits
  // it into read and write halves.
  //
  // Returns the `io::Error` reported by `NamedPipeClient::from_raw_handle` when
  // the handle cannot be adopted.
  #[cfg(windows)]
  fn pipe(
    handle: i64,
  ) -> Result<
    (
      tokio::io::ReadHalf<NamedPipeClient>,
      tokio::io::WriteHalf<NamedPipeClient>,
    ),
    io::Error,
  > {
    // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
    // fd handle map will be the same.
    let client = unsafe { NamedPipeClient::from_raw_handle(handle as _) };
    client.map(tokio::io::split)
  }

  impl IpcJsonStreamResource {
    /// Builds the resource from a raw pipe identifier.
    ///
    /// `stream` is cast to the platform's raw type and handed to `pipe`; any
    /// I/O error from `pipe` is returned unchanged.
    pub fn new(stream: i64) -> Result<Self, std::io::Error> {
      pipe(stream as _).map(|(reader, writer)| Self {
        read_half: AsyncRefCell::new(IpcJsonStream::new(reader)),
        write_half: AsyncRefCell::new(writer),
        cancel: Default::default(),
      })
    }

    // Test-only constructor from an already connected tokio Unix stream.
    #[cfg(unix)]
    #[cfg(test)]
    fn from_stream(stream: UnixStream) -> Self {
      let (reader, writer) = stream.into_split();
      Self {
        read_half: AsyncRefCell::new(IpcJsonStream::new(reader)),
        write_half: AsyncRefCell::new(writer),
        cancel: Default::default(),
      }
    }

    // Test-only constructor from an already connected named pipe client.
    #[cfg(windows)]
    #[cfg(test)]
    fn from_stream(pipe: NamedPipeClient) -> Self {
      let (reader, writer) = tokio::io::split(pipe);
      Self {
        read_half: AsyncRefCell::new(IpcJsonStream::new(reader)),
        write_half: AsyncRefCell::new(writer),
        cancel: Default::default(),
      }
    }

    // Serializes `msg` to JSON, appends the `\n` delimiter and writes the
    // whole line to the write half.
    //
    // The write half is borrowed mutably until the line has been written.
    // Returns serialization and I/O errors.
    async fn write_msg(
      self: Rc<Self>,
      msg: serde_json::Value,
    ) -> Result<(), AnyError> {
      let mut writer = RcRef::map(self, |r| &r.write_half).borrow_mut().await;
      // Perf note: We do not benefit from writev here because
      // we are always allocating a buffer for serialization anyways.
      let mut line = serde_json::to_vec(&msg)?;
      line.push(b'\n');
      writer.write_all(&line).await?;
      Ok(())
    }
  }

  // Returns the index of the first occurrence of `needle` in `haystack`, or
  // `None` when it does not occur.
  //
  // Uses the NEON implementation on aarch64 macOS and a plain iterator search
  // everywhere else.
  #[inline]
  fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
    #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
    {
      // Safety: haystack of valid length. neon_memchr can handle unaligned
      // data.
      unsafe { neon::neon_memchr(haystack, needle, haystack.len()) }
    }

    #[cfg(not(all(target_os = "macos", target_arch = "aarch64")))]
    {
      haystack.iter().position(|byte| *byte == needle)
    }
  }

  // Initial capacity of the buffered reader and the JSON backing buffer.
  //
  // `IpcJsonStream::new` uses this value both for `BufReader::with_capacity`
  // and for the `Vec` that backs message parsing.
  //
  // This is a tradeoff between memory usage and performance on large messages.
  //
  // 64kb has been chosen after benchmarking 64 to 66536 << 6 - 1 bytes per message.
  const INITIAL_CAPACITY: usize = 1024 * 64;

  // JSON serialization stream over IPC pipe.
  //
  // `\n` is used as a delimiter between messages.
  struct IpcJsonStream {
    // Buffered read half of the Unix socket.
    #[cfg(unix)]
    pipe: BufReader<OwnedReadHalf>,
    // Buffered read half of the Windows named pipe client.
    #[cfg(windows)]
    pipe: BufReader<tokio::io::ReadHalf<NamedPipeClient>>,
    // Backing buffer for message bytes copied out of `pipe`; `read_msg` parses
    // JSON from it and empties it between messages.
    buffer: Vec<u8>,
  }

  impl IpcJsonStream {
    // Wraps the read half of the Unix socket in a buffered reader and
    // allocates the backing buffer, both with `INITIAL_CAPACITY`.
    #[cfg(unix)]
    fn new(pipe: OwnedReadHalf) -> Self {
      Self {
        pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
        buffer: Vec::with_capacity(INITIAL_CAPACITY),
      }
    }

    // Wraps the read half of the named pipe in a buffered reader and
    // allocates the backing buffer, both with `INITIAL_CAPACITY`.
    #[cfg(windows)]
    fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
      Self {
        pipe: BufReader::with_capacity(INITIAL_CAPACITY, pipe),
        buffer: Vec::with_capacity(INITIAL_CAPACITY),
      }
    }

    // Reads and parses the next `\n`-delimited JSON message.
    //
    // Returns `serde_json::Value::Null` when the pipe reaches EOF before any
    // byte of a new message was read. Returns an error on I/O failure or when
    // the message is not valid JSON.
    async fn read_msg(&mut self) -> Result<serde_json::Value, AnyError> {
      // Every call starts a fresh message (the inner future counts from zero),
      // so the backing buffer must be empty here. A previous call that failed
      // or was dropped mid-message may have left bytes behind; parsing
      // `buffer[..nread]` with such a stale prefix would decode the wrong
      // span. `clear` keeps the capacity, so no reallocation is caused.
      self.buffer.clear();

      let mut json = None;
      let nread =
        read_msg_inner(&mut self.pipe, &mut self.buffer, &mut json).await?;
      if nread == 0 {
        // EOF.
        return Ok(serde_json::Value::Null);
      }

      let json = match json {
        // The message was parsed directly while reading.
        Some(v) => v,
        None => {
          // Took more than a single read and some buffering.
          simd_json::from_slice(&mut self.buffer[..nread])?
        }
      };

      Ok(json)
    }
  }

  // Future that reads one `\n`-terminated message from `reader`; created by
  // `read_msg_inner` and driven by `read_msg_internal`.
  pin_project! {
      #[must_use = "futures do nothing unless you `.await` or poll them"]
      struct ReadMsgInner<'a, R: ?Sized> {
          // Buffered source the message is read from.
          reader: &'a mut R,
          // Accumulates message bytes when the message cannot be parsed
          // directly out of the reader's buffer.
          buf: &'a mut Vec<u8>,
          // Receives the parsed value when the message is parsed while reading.
          json: &'a mut Option<serde_json::Value>,
          // The number of bytes consumed from the reader for the current
          // message so far. It persists across polls and is reset to 0 when
          // the future completes. This can be less than buf.len() if the
          // buffer was not empty when the operation was started.
          read: usize,
      }
  }

  // Creates a `ReadMsgInner` future that reads one message from `reader`.
  //
  // `buf` and `json` are the two places the result can end up in (raw bytes or
  // an already parsed value); the consumed-byte counter starts at 0.
  fn read_msg_inner<'a, R>(
    reader: &'a mut R,
    buf: &'a mut Vec<u8>,
    json: &'a mut Option<serde_json::Value>,
  ) -> ReadMsgInner<'a, R>
  where
    R: AsyncBufRead + ?Sized + Unpin,
  {
    ReadMsgInner {
      reader,
      buf,
      json,
      read: 0,
    }
  }

  // Polls `reader` until one complete `\n`-terminated message has been
  // consumed, the reader reports EOF (an empty fill), an error occurs, or the
  // reader is pending.
  //
  // * If the whole message is found in the first fill (`*read == 0`), it is
  //   parsed right away and stored in `json`; `buf` keeps its original length.
  // * Otherwise the bytes are accumulated in `buf` and `json` is left
  //   untouched, so the caller has to parse `buf`.
  //
  // On completion returns the number of bytes consumed for this message and
  // resets `*read` to 0; a result of 0 means EOF before any byte was read.
  // Invalid JSON on the direct-parse path is reported as
  // `io::ErrorKind::InvalidData`, and the offending bytes are not consumed.
  fn read_msg_internal<R: AsyncBufRead + ?Sized>(
    mut reader: Pin<&mut R>,
    cx: &mut Context<'_>,
    buf: &mut Vec<u8>,
    json: &mut Option<serde_json::Value>,
    read: &mut usize,
  ) -> Poll<io::Result<usize>> {
    loop {
      let (done, used) = {
        let available = match reader.as_mut().poll_fill_buf(cx) {
          Poll::Ready(filled) => filled?,
          Poll::Pending => return Poll::Pending,
        };
        match memchr(b'\n', available) {
          Some(i) => {
            let msg_end = i + 1;
            if *read == 0 {
              // Fast path: the whole message is in this fill, so parse it now
              // and put it into the json slot directly.
              //
              // simd_json parses in place and needs `&mut [u8]`, but
              // `poll_fill_buf` only lends the reader's buffer as `&[u8]`.
              // Writing through a pointer derived from that shared reference
              // is undefined behaviour, so the message is copied into the
              // tail of `buf`, parsed there, and `buf` is truncated back to
              // its previous length.
              let scratch_start = buf.len();
              buf.extend_from_slice(&available[..msg_end]);
              let parsed: Result<serde_json::Value, _> =
                simd_json::from_slice(&mut buf[scratch_start..]);
              buf.truncate(scratch_start);
              json.replace(
                parsed
                  .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
              );
            } else {
              // This is not the first read, so we have to copy the data
              // to make it contiguous.
              buf.extend_from_slice(&available[..msg_end]);
            }
            (true, msg_end)
          }
          None => {
            // No delimiter yet: keep everything and ask for more.
            buf.extend_from_slice(available);
            (false, available.len())
          }
        }
      };

      reader.as_mut().consume(used);
      *read += used;
      // `used == 0` means the reader had nothing left to give (EOF).
      if done || used == 0 {
        return Poll::Ready(Ok(mem::replace(read, 0)));
      }
    }
  }

  // Polling the future delegates to `read_msg_internal`, passing the projected
  // fields so that the consumed-byte counter survives across polls.
  impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadMsgInner<'_, R> {
    type Output = io::Result<usize>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
      let this = self.project();
      // `R: Unpin`, so a reborrowed `&mut R` can be pinned directly.
      let reader = Pin::new(&mut **this.reader);
      read_msg_internal(reader, cx, this.buf, this.json, this.read)
    }
  }

  // NEON-accelerated byte search, only compiled on aarch64 macOS.
  #[cfg(all(target_os = "macos", target_arch = "aarch64"))]
  mod neon {
    use std::arch::aarch64::*;

    /// Returns the offset of the first byte equal to `c` within the first
    /// `length` bytes of `str`, or `None` if there is none.
    ///
    /// # Safety
    ///
    /// Bytes are read through raw pointers from `str.as_ptr()` up to (but not
    /// including) `str.as_ptr() + length`, so `length` must not exceed
    /// `str.len()`.
    pub unsafe fn neon_memchr(
      str: &[u8],
      c: u8,
      length: usize,
    ) -> Option<usize> {
      // One-past-the-end pointer of the searched range.
      let end = str.as_ptr().wrapping_add(length);

      // Alignment handling
      // Scan byte by byte until `ptr` is 16-byte aligned or the range ends.
      let mut ptr = str.as_ptr();
      while ptr < end && (ptr as usize) & 0xF != 0 {
        if *ptr == c {
          return Some(ptr as usize - str.as_ptr() as usize);
        }
        ptr = ptr.wrapping_add(1);
      }

      // Vector with every lane set to the byte being searched for.
      let search_char = vdupq_n_u8(c);

      // Process full 16-byte chunks while one still fits in the range.
      while ptr.wrapping_add(16) <= end {
        let chunk = vld1q_u8(ptr);
        // Lane-wise equality test against the search byte.
        let comparison = vceqq_u8(chunk, search_char);

        // Check first 64 bits
        let result0 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 0);
        if result0 != 0 {
          // The index of the lowest set bit divided by 8 is the byte offset
          // of the first match within this half.
          // NOTE(review): relies on little-endian lane order - confirm.
          return Some(
            (ptr as usize - str.as_ptr() as usize)
              + result0.trailing_zeros() as usize / 8,
          );
        }

        // Check second 64 bits
        let result1 = vgetq_lane_u64(vreinterpretq_u64_u8(comparison), 1);
        if result1 != 0 {
          // Same as above, offset by the 8 bytes of the first half.
          return Some(
            (ptr as usize - str.as_ptr() as usize)
              + 8
              + result1.trailing_zeros() as usize / 8,
          );
        }

        ptr = ptr.wrapping_add(16);
      }

      // Handle remaining unaligned characters
      // (fewer than 16 bytes are left at this point).
      while ptr < end {
        if *ptr == c {
          return Some(ptr as usize - str.as_ptr() as usize);
        }
        ptr = ptr.wrapping_add(1);
      }

      None
    }
  }

  #[cfg(test)]
  mod tests {
    use super::IpcJsonStreamResource;
    use deno_core::serde_json;
    use deno_core::serde_json::json;
    use deno_core::RcRef;
    use std::rc::Rc;

    // Creates a connected Unix socket pair: one end wrapped in an
    // `IpcJsonStreamResource`, the other returned as a plain tokio stream.
    // Async only to share its signature with the Windows variant.
    #[allow(clippy::unused_async)]
    #[cfg(unix)]
    pub async fn pair() -> (Rc<IpcJsonStreamResource>, tokio::net::UnixStream) {
      let (ours, theirs) = tokio::net::UnixStream::pair().unwrap();

      /* Similar to how ops would use the resource */
      (Rc::new(IpcJsonStreamResource::from_stream(ours)), theirs)
    }

    // Creates a connected named pipe under a randomly suffixed name: the
    // client end wrapped in an `IpcJsonStreamResource`, the server end
    // returned as is.
    #[cfg(windows)]
    pub async fn pair() -> (
      Rc<IpcJsonStreamResource>,
      tokio::net::windows::named_pipe::NamedPipeServer,
    ) {
      use tokio::net::windows::named_pipe::ClientOptions;
      use tokio::net::windows::named_pipe::ServerOptions;

      let pipe_name =
        format!(r"\\.\pipe\deno-named-pipe-test-{}", rand::random::<u32>());

      let server_end = ServerOptions::new().create(&pipe_name).unwrap();
      let client_end = ClientOptions::new().open(&pipe_name).unwrap();

      server_end.connect().await.unwrap();
      /* Similar to how ops would use the resource */
      let resource = Rc::new(IpcJsonStreamResource::from_stream(client_end));
      (resource, server_end)
    }

    // A simple round trip benchmark for quick dev feedback.
    //
    // Only ran when the `BENCH_IPC_DENO` env var is set. A spawned task writes
    // 100 messages of 1 MiB each; the test reads them back and prints the
    // observed throughput.
    #[allow(clippy::print_stdout)]
    #[tokio::test]
    async fn bench_ipc() -> Result<(), Box<dyn std::error::Error>> {
      if std::env::var_os("BENCH_IPC_DENO").is_none() {
        return Ok(());
      }

      let (ipc, mut peer) = pair().await;
      let writer_task = tokio::spawn(async move {
        use tokio::io::AsyncWriteExt;

        let payload = format!("\"{}\"\n", "x".repeat(1024 * 1024));
        for _ in 0..100 {
          peer.write_all(payload.as_bytes()).await?;
        }
        Ok::<_, std::io::Error>(())
      });

      let started = std::time::Instant::now();
      let mut total_bytes = 0;

      let mut reader = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
      loop {
        let msg = reader.read_msg().await?;
        // `Null` signals EOF.
        if msg.is_null() {
          break;
        }
        total_bytes += msg.as_str().unwrap().len();
        // Give up after roughly five seconds.
        if started.elapsed().as_secs() > 5 {
          break;
        }
      }
      let elapsed = started.elapsed();
      let mb = total_bytes as f64 / 1024.0 / 1024.0;
      println!("{} mb/s", mb / elapsed.as_secs_f64());

      writer_task.await??;

      Ok(())
    }

    #[tokio::test]
    async fn unix_ipc_json() -> Result<(), Box<dyn std::error::Error>> {
      let (ipc, mut fd2) = pair().await;
      let child = tokio::spawn(async move {
        use tokio::io::AsyncReadExt;
        use tokio::io::AsyncWriteExt;

        const EXPECTED: &[u8] = b"\"hello\"\n";
        let mut buf = [0u8; EXPECTED.len()];
        let n = fd2.read_exact(&mut buf).await?;
        assert_eq!(&buf[..n], EXPECTED);
        fd2.write_all(b"\"world\"\n").await?;

        Ok::<_, std::io::Error>(())
      });

      ipc.clone().write_msg(json!("hello")).await?;

      let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
      let msgs = ipc.read_msg().await?;
      assert_eq!(msgs, json!("world"));

      child.await??;

      Ok(())
    }

    // Two messages in each direction. The peer sends both replies in a single
    // write, so the second `read_msg` has to pick up the message that follows
    // the first delimiter instead of dropping it.
    #[tokio::test]
    async fn unix_ipc_json_multi() -> Result<(), Box<dyn std::error::Error>> {
      let (ipc, mut fd2) = pair().await;
      let child = tokio::spawn(async move {
        use tokio::io::AsyncReadExt;
        use tokio::io::AsyncWriteExt;

        const EXPECTED: &[u8] = b"\"hello\"\n\"world\"\n";
        let mut buf = [0u8; EXPECTED.len()];
        let n = fd2.read_exact(&mut buf).await?;
        assert_eq!(&buf[..n], EXPECTED);
        fd2.write_all(b"\"foo\"\n\"bar\"\n").await?;
        Ok::<_, std::io::Error>(())
      });

      ipc.clone().write_msg(json!("hello")).await?;
      ipc.clone().write_msg(json!("world")).await?;

      let mut ipc = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
      let msgs = ipc.read_msg().await?;
      assert_eq!(msgs, json!("foo"));

      // The second reply must be delivered as its own message.
      let msgs = ipc.read_msg().await?;
      assert_eq!(msgs, json!("bar"));

      child.await??;

      Ok(())
    }

    // An empty line is not valid JSON, so reading it must fail.
    #[tokio::test]
    async fn unix_ipc_json_invalid() -> Result<(), Box<dyn std::error::Error>> {
      let (ipc, mut peer) = pair().await;
      let peer_task = tokio::spawn(async move {
        use tokio::io::AsyncWriteExt;

        peer.write_all(b"\n\n").await?;
        Ok::<_, std::io::Error>(())
      });

      let mut reader = RcRef::map(ipc, |r| &r.read_half).borrow_mut().await;
      let outcome = reader.read_msg().await;
      let _err = outcome.unwrap_err();

      peer_task.await??;

      Ok(())
    }

    // Checks `memchr` on a short haystack, on an empty one, and on haystacks
    // long enough (and variously offset) to hit a platform-specific
    // implementation's chunked path as well as its leading and trailing
    // byte-wise scans.
    #[test]
    fn memchr() {
      let str = b"hello world";
      assert_eq!(super::memchr(b'h', str), Some(0));
      assert_eq!(super::memchr(b'w', str), Some(6));
      assert_eq!(super::memchr(b'd', str), Some(10));
      assert_eq!(super::memchr(b'x', str), None);

      let empty = b"";
      assert_eq!(super::memchr(b'\n', empty), None);

      // Long haystacks: try every start offset (to vary the alignment of the
      // slice) and every position of the needle within the slice.
      let mut long = vec![b'a'; 100];
      for offset in 0..17 {
        assert_eq!(super::memchr(b'\n', &long[offset..]), None);
        for pos in offset..long.len() {
          long[pos] = b'\n';
          assert_eq!(
            super::memchr(b'\n', &long[offset..]),
            Some(pos - offset)
          );
          long[pos] = b'a';
        }
      }

      // With several matches the first one wins.
      long[20] = b'\n';
      long[70] = b'\n';
      assert_eq!(super::memchr(b'\n', &long), Some(20));
    }
  }
}