// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use crate::io::UnixStreamResource; use crate::NetPermissions; use deno_core::error::bad_resource; use deno_core::error::custom_error; use deno_core::error::AnyError; use deno_core::op2; use deno_core::AsyncRefCell; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; use std::path::Path; use std::rc::Rc; use tokio::net::UnixDatagram; use tokio::net::UnixListener; pub use tokio::net::UnixStream; /// A utility function to map OsStrings to Strings pub fn into_string(s: std::ffi::OsString) -> Result { s.into_string().map_err(|s| { let message = format!("File name or path {s:?} is not valid UTF-8"); custom_error("InvalidData", message) }) } pub(crate) struct UnixListenerResource { pub listener: AsyncRefCell, cancel: CancelHandle, } impl Resource for UnixListenerResource { fn name(&self) -> Cow { "unixListener".into() } fn close(self: Rc) { self.cancel.cancel(); } } pub struct UnixDatagramResource { pub socket: AsyncRefCell, pub cancel: CancelHandle, } impl Resource for UnixDatagramResource { fn name(&self) -> Cow { "unixDatagram".into() } fn close(self: Rc) { self.cancel.cancel(); } } #[derive(Serialize)] pub struct UnixAddr { pub path: Option, } #[derive(Deserialize)] pub struct UnixListenArgs { pub path: String, } #[op2(async)] #[serde] pub async fn op_net_accept_unix( state: Rc>, #[smi] rid: ResourceId, ) -> Result<(ResourceId, Option, Option), AnyError> { let resource = state .borrow() .resource_table .get::(rid) .map_err(|_| bad_resource("Listener has been closed"))?; let listener = RcRef::map(&resource, |r| &r.listener) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Listener already in use"))?; let cancel = RcRef::map(resource, |r| &r.cancel); let (unix_stream, _socket_addr) = listener .accept() .try_or_cancel(cancel) .await .map_err(crate::ops::accept_err)?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; let local_addr_path = local_addr.as_pathname().map(pathstring).transpose()?; let remote_addr_path = remote_addr.as_pathname().map(pathstring).transpose()?; let resource = UnixStreamResource::new(unix_stream.into_split()); let mut state = state.borrow_mut(); let rid = state.resource_table.add(resource); Ok((rid, local_addr_path, remote_addr_path)) } #[op2(async)] #[serde] pub async fn op_net_connect_unix( state: Rc>, #[string] path: String, ) -> Result<(ResourceId, Option, Option), AnyError> where NP: NetPermissions + 'static, { let address_path = Path::new(&path); super::check_unstable(&state.borrow(), "Deno.connect"); { let mut state_ = state.borrow_mut(); state_ .borrow_mut::() .check_read(address_path, "Deno.connect()")?; state_ .borrow_mut::() .check_write(address_path, "Deno.connect()")?; } let unix_stream = UnixStream::connect(Path::new(&path)).await?; let local_addr = unix_stream.local_addr()?; let remote_addr = unix_stream.peer_addr()?; let local_addr_path = local_addr.as_pathname().map(pathstring).transpose()?; let remote_addr_path = remote_addr.as_pathname().map(pathstring).transpose()?; let mut state_ = state.borrow_mut(); let resource = UnixStreamResource::new(unix_stream.into_split()); let rid = state_.resource_table.add(resource); Ok((rid, local_addr_path, remote_addr_path)) } #[op2(async)] #[serde] pub async fn op_net_recv_unixpacket( state: Rc>, #[smi] rid: ResourceId, #[buffer] mut buf: JsBuffer, ) -> Result<(usize, Option), AnyError> { let resource = state .borrow() .resource_table .get::(rid) .map_err(|_| bad_resource("Socket has been closed"))?; let socket = RcRef::map(&resource, |r| &r.socket) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; let cancel = RcRef::map(resource, |r| &r.cancel); let (nread, remote_addr) = socket.recv_from(&mut buf).try_or_cancel(cancel).await?; let path = remote_addr.as_pathname().map(pathstring).transpose()?; Ok((nread, path)) } #[op2(async)] #[number] pub async fn op_net_send_unixpacket( state: Rc>, #[smi] rid: ResourceId, #[string] path: String, #[buffer] zero_copy: JsBuffer, ) -> Result where NP: NetPermissions + 'static, { let address_path = Path::new(&path); { let mut s = state.borrow_mut(); s.borrow_mut::() .check_write(address_path, "Deno.DatagramConn.send()")?; } let resource = state .borrow() .resource_table .get::(rid) .map_err(|_| custom_error("NotConnected", "Socket has been closed"))?; let socket = RcRef::map(&resource, |r| &r.socket) .try_borrow_mut() .ok_or_else(|| custom_error("Busy", "Socket already in use"))?; let nwritten = socket.send_to(&zero_copy, address_path).await?; Ok(nwritten) } #[op2] #[serde] pub fn op_net_listen_unix( state: &mut OpState, #[string] path: String, #[string] api_name: String, ) -> Result<(ResourceId, Option), AnyError> where NP: NetPermissions + 'static, { let address_path = Path::new(&path); super::check_unstable(state, &api_name); let permissions = state.borrow_mut::(); let api_call_expr = format!("{}()", api_name); permissions.check_read(address_path, &api_call_expr)?; permissions.check_write(address_path, &api_call_expr)?; let listener = UnixListener::bind(address_path)?; let local_addr = listener.local_addr()?; let pathname = local_addr.as_pathname().map(pathstring).transpose()?; let listener_resource = UnixListenerResource { listener: AsyncRefCell::new(listener), cancel: Default::default(), }; let rid = state.resource_table.add(listener_resource); Ok((rid, pathname)) } pub fn net_listen_unixpacket( state: &mut OpState, path: String, ) -> Result<(ResourceId, Option), AnyError> where NP: NetPermissions + 'static, { let address_path = Path::new(&path); let permissions = state.borrow_mut::(); permissions.check_read(address_path, "Deno.listenDatagram()")?; permissions.check_write(address_path, "Deno.listenDatagram()")?; let socket = UnixDatagram::bind(address_path)?; let local_addr = socket.local_addr()?; let pathname = local_addr.as_pathname().map(pathstring).transpose()?; let datagram_resource = UnixDatagramResource { socket: AsyncRefCell::new(socket), cancel: Default::default(), }; let rid = state.resource_table.add(datagram_resource); Ok((rid, pathname)) } #[op2] #[serde] pub fn op_net_listen_unixpacket( state: &mut OpState, #[string] path: String, ) -> Result<(ResourceId, Option), AnyError> where NP: NetPermissions + 'static, { super::check_unstable(state, "Deno.listenDatagram"); net_listen_unixpacket::(state, path) } #[op2] #[serde] pub fn op_node_unstable_net_listen_unixpacket( state: &mut OpState, #[string] path: String, ) -> Result<(ResourceId, Option), AnyError> where NP: NetPermissions + 'static, { net_listen_unixpacket::(state, path) } pub fn pathstring(pathname: &Path) -> Result { into_string(pathname.into()) }