1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-11 08:33:43 -05:00

Revert "refactor(ext/net): clean up variadic network ops (#16392)" (#16417)

Should fix https://github.com/denoland/deno_std/issues/2807
This commit is contained in:
Bartek Iwańczuk 2022-10-25 20:32:51 +02:00 committed by GitHub
parent 1f6aeb430b
commit 8e3f825c92
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 727 additions and 471 deletions

View file

@ -10,10 +10,10 @@
Error, Error,
ObjectPrototypeIsPrototypeOf, ObjectPrototypeIsPrototypeOf,
PromiseResolve, PromiseResolve,
Symbol,
SymbolAsyncIterator, SymbolAsyncIterator,
SymbolFor, SymbolFor,
TypedArrayPrototypeSubarray, TypedArrayPrototypeSubarray,
TypeError,
Uint8Array, Uint8Array,
} = window.__bootstrap.primordials; } = window.__bootstrap.primordials;
@ -38,6 +38,30 @@
return core.shutdown(rid); return core.shutdown(rid);
} }
function opAccept(rid, transport) {
return core.opAsync("op_net_accept", { rid, transport });
}
function opListen(args) {
return ops.op_net_listen(args);
}
function opConnect(args) {
return core.opAsync("op_net_connect", args);
}
function opReceive(rid, transport, zeroCopy) {
return core.opAsync(
"op_dgram_recv",
{ rid, transport },
zeroCopy,
);
}
function opSend(args, zeroCopy) {
return core.opAsync("op_dgram_send", args, zeroCopy);
}
function resolveDns(query, recordType, options) { function resolveDns(query, recordType, options) {
return core.opAsync("op_dns_resolve", { query, recordType, options }); return core.opAsync("op_dns_resolve", { query, recordType, options });
} }
@ -111,6 +135,11 @@
class UnixConn extends Conn {} class UnixConn extends Conn {}
// Use symbols for method names to hide these in stable API.
// TODO(kt3k): Remove these symbols when ref/unref become stable.
const listenerRef = Symbol("listenerRef");
const listenerUnref = Symbol("listenerUnref");
class Listener { class Listener {
#rid = 0; #rid = 0;
#addr = null; #addr = null;
@ -130,35 +159,21 @@
return this.#addr; return this.#addr;
} }
async accept() { accept() {
let promise; const promise = opAccept(this.rid, this.addr.transport);
switch (this.addr.transport) {
case "tcp":
promise = core.opAsync("op_net_accept_tcp", this.rid);
break;
case "unix":
promise = core.opAsync("op_net_accept_unix", this.rid);
break;
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
this.#promiseId = promise[promiseIdSymbol]; this.#promiseId = promise[promiseIdSymbol];
if (this.#unref) core.unrefOp(this.#promiseId); if (this.#unref) {
const [rid, localAddr, remoteAddr] = await promise; this.#unrefOpAccept();
this.#promiseId = null;
if (this.addr.transport == "tcp") {
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
} else {
throw new Error("unreachable");
} }
return promise.then((res) => {
if (this.addr.transport == "tcp") {
return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
} else {
throw new Error("unreachable");
}
});
} }
async next() { async next() {
@ -190,15 +205,22 @@
return this; return this;
} }
ref() { [listenerRef]() {
this.#unref = false; this.#unref = false;
this.#refOpAccept();
}
[listenerUnref]() {
this.#unref = true;
this.#unrefOpAccept();
}
#refOpAccept() {
if (typeof this.#promiseId === "number") { if (typeof this.#promiseId === "number") {
core.refOp(this.#promiseId); core.refOp(this.#promiseId);
} }
} }
#unrefOpAccept() {
unref() {
this.#unref = true;
if (typeof this.#promiseId === "number") { if (typeof this.#promiseId === "number") {
core.unrefOp(this.#promiseId); core.unrefOp(this.#promiseId);
} }
@ -225,54 +247,18 @@
async receive(p) { async receive(p) {
const buf = p || new Uint8Array(this.bufSize); const buf = p || new Uint8Array(this.bufSize);
let nread; const { size, remoteAddr } = await opReceive(
let remoteAddr; this.rid,
switch (this.addr.transport) { this.addr.transport,
case "udp": { buf,
[nread, remoteAddr] = await core.opAsync( );
"op_net_recv_udp", const sub = TypedArrayPrototypeSubarray(buf, 0, size);
this.rid,
buf,
);
remoteAddr.transport = "udp";
break;
}
case "unixpacket": {
let path;
[nread, path] = await core.opAsync(
"op_net_recv_unixpacket",
this.rid,
buf,
);
remoteAddr = { transport: "unixpacket", path };
break;
}
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
const sub = TypedArrayPrototypeSubarray(buf, 0, nread);
return [sub, remoteAddr]; return [sub, remoteAddr];
} }
async send(p, opts) { send(p, addr) {
switch (this.addr.transport) { const args = { hostname: "127.0.0.1", ...addr, rid: this.rid };
case "udp": return opSend(args, p);
return await core.opAsync(
"op_net_send_udp",
this.rid,
{ hostname: opts.hostname ?? "127.0.0.1", port: opts.port },
p,
);
case "unixpacket":
return await core.opAsync(
"op_net_send_unixpacket",
this.rid,
opts.path,
p,
);
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
} }
close() { close() {
@ -296,100 +282,40 @@
} }
} }
function listen(args) { function listen({ hostname, ...options }, constructor = Listener) {
switch (args.transport ?? "tcp") { const res = opListen({
case "tcp": { transport: "tcp",
const [rid, addr] = ops.op_net_listen_tcp({ hostname: typeof hostname === "undefined" ? "0.0.0.0" : hostname,
hostname: args.hostname ?? "0.0.0.0", ...options,
port: args.port, });
});
addr.transport = "tcp"; return new constructor(res.rid, res.localAddr);
return new Listener(rid, addr);
}
case "unix": {
const [rid, path] = ops.op_net_listen_unix(args.path);
const addr = {
transport: "unix",
path,
};
return new Listener(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
} }
function listenDatagram(args) { async function connect(options) {
switch (args.transport) { if (options.transport === "unix") {
case "udp": { const res = await opConnect(options);
const [rid, addr] = ops.op_net_listen_udp( return new UnixConn(res.rid, res.remoteAddr, res.localAddr);
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
args.reuseAddress ?? false,
);
addr.transport = "udp";
return new Datagram(rid, addr);
}
case "unixpacket": {
const [rid, path] = ops.op_net_listen_unixpacket(args.path);
const addr = {
transport: "unixpacket",
path,
};
return new Datagram(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
} }
}
async function connect(args) { const res = await opConnect({
switch (args.transport ?? "tcp") { transport: "tcp",
case "tcp": { hostname: "127.0.0.1",
const [rid, localAddr, remoteAddr] = await core.opAsync( ...options,
"op_net_connect_tcp", });
{ return new TcpConn(res.rid, res.remoteAddr, res.localAddr);
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
}
case "unix": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_unix",
args.path,
);
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}
function setup(unstable) {
if (!unstable) {
delete Listener.prototype.ref;
delete Listener.prototype.unref;
}
} }
window.__bootstrap.net = { window.__bootstrap.net = {
setup,
connect, connect,
Conn, Conn,
TcpConn, TcpConn,
UnixConn, UnixConn,
opConnect,
listen, listen,
listenDatagram, listenerRef,
listenerUnref,
opListen,
Listener, Listener,
shutdown, shutdown,
Datagram, Datagram,

View file

@ -5,7 +5,20 @@
const core = window.Deno.core; const core = window.Deno.core;
const ops = core.ops; const ops = core.ops;
const { Listener, Conn } = window.__bootstrap.net; const { Listener, Conn } = window.__bootstrap.net;
const { TypeError } = window.__bootstrap.primordials;
function opConnectTls(
args,
) {
return core.opAsync("op_tls_connect", args);
}
function opAcceptTLS(rid) {
return core.opAsync("op_tls_accept", rid);
}
function opListenTls(args) {
return ops.op_tls_listen(args);
}
function opStartTls(args) { function opStartTls(args) {
return core.opAsync("op_tls_start", args); return core.opAsync("op_tls_start", args);
@ -31,28 +44,23 @@
privateKey = undefined, privateKey = undefined,
alpnProtocols = undefined, alpnProtocols = undefined,
}) { }) {
if (transport !== "tcp") { const res = await opConnectTls({
throw new TypeError(`Unsupported transport: '${transport}'`); port,
} hostname,
const [rid, localAddr, remoteAddr] = await core.opAsync( transport,
"op_net_connect_tls", certFile,
{ hostname, port }, caCerts,
{ certFile, caCerts, certChain, privateKey, alpnProtocols }, certChain,
); privateKey,
localAddr.transport = "tcp"; alpnProtocols,
remoteAddr.transport = "tcp"; });
return new TlsConn(rid, remoteAddr, localAddr); return new TlsConn(res.rid, res.remoteAddr, res.localAddr);
} }
class TlsListener extends Listener { class TlsListener extends Listener {
async accept() { async accept() {
const [rid, localAddr, remoteAddr] = await core.opAsync( const res = await opAcceptTLS(this.rid);
"op_net_accept_tls", return new TlsConn(res.rid, res.remoteAddr, res.localAddr);
this.rid,
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TlsConn(rid, remoteAddr, localAddr);
} }
} }
@ -66,14 +74,17 @@
transport = "tcp", transport = "tcp",
alpnProtocols = undefined, alpnProtocols = undefined,
}) { }) {
if (transport !== "tcp") { const res = opListenTls({
throw new TypeError(`Unsupported transport: '${transport}'`); port,
} cert,
const [rid, localAddr] = ops.op_net_listen_tls( certFile,
{ hostname, port }, key,
{ cert, certFile, key, keyFile, alpnProtocols }, keyFile,
); hostname,
return new TlsListener(rid, localAddr); transport,
alpnProtocols,
});
return new TlsListener(res.rid, res.localAddr);
} }
async function startTls( async function startTls(
@ -85,14 +96,14 @@
alpnProtocols = undefined, alpnProtocols = undefined,
} = {}, } = {},
) { ) {
const [rid, localAddr, remoteAddr] = await opStartTls({ const res = await opStartTls({
rid: conn.rid, rid: conn.rid,
hostname, hostname,
certFile, certFile,
caCerts, caCerts,
alpnProtocols, alpnProtocols,
}); });
return new TlsConn(rid, remoteAddr, localAddr); return new TlsConn(res.rid, res.remoteAddr, res.localAddr);
} }
window.__bootstrap.tls = { window.__bootstrap.tls = {

View file

@ -0,0 +1,60 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const net = window.__bootstrap.net;
function listen(options) {
if (options.transport === "unix") {
const res = net.opListen(options);
return new Listener(res.rid, res.localAddr);
} else {
return net.listen(options, Listener);
}
}
function listenDatagram(
options,
) {
let res;
if (options.transport === "unixpacket") {
res = net.opListen(options);
} else {
res = net.opListen({
transport: "udp",
hostname: "127.0.0.1",
...options,
});
}
return new net.Datagram(res.rid, res.localAddr);
}
async function connect(
options,
) {
if (options.transport === "unix") {
const res = await net.opConnect(options);
return new net.Conn(res.rid, res.remoteAddr, res.localAddr);
} else {
return net.connect(options);
}
}
class Listener extends net.Listener {
ref() {
this[net.listenerRef]();
}
unref() {
this[net.listenerUnref]();
}
}
window.__bootstrap.netUnstable = {
connect,
listenDatagram,
listen,
Listener,
};
})(this);

View file

@ -9,21 +9,14 @@ This crate depends on following extensions:
Following ops are provided: Following ops are provided:
- "op_net_accept_tcp" - "op_net_accept"
- "op_net_accept_unix" - "op_net_connect"
- "op_net_connect_tcp" - "op_net_listen"
- "op_net_connect_unix" - "op_dgram_recv"
- "op_net_listen_tcp" - "op_dgram_send"
- "op_net_listen_udp"
- "op_net_listen_unix"
- "op_net_listen_unixpacket"
- "op_net_recv_udp"
- "op_net_recv_unixpacket"
- "op_net_send_udp"
- "op_net_send_unixpacket"
- "op_dns_resolve" - "op_dns_resolve"
- "op_net_connect_tls"
- "op_net_listen_tls"
- "op_net_accept_tls"
- "op_tls_start" - "op_tls_start"
- "op_tls_connect"
- "op_tls_listen"
- "op_tls_accept"
- "op_tls_handshake" - "op_tls_handshake"

View file

@ -90,6 +90,7 @@ pub fn init<P: NetPermissions + 'static>(
prefix "deno:ext/net", prefix "deno:ext/net",
"01_net.js", "01_net.js",
"02_tls.js", "02_tls.js",
"04_net_unstable.js",
)) ))
.ops(ops) .ops(ops)
.state(move |state| { .state(move |state| {

View file

@ -7,6 +7,7 @@ use crate::NetPermissions;
use deno_core::error::bad_resource; use deno_core::error::bad_resource;
use deno_core::error::custom_error; use deno_core::error::custom_error;
use deno_core::error::generic_error; use deno_core::error::generic_error;
use deno_core::error::type_error;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::op; use deno_core::op;
@ -20,6 +21,7 @@ use deno_core::RcRef;
use deno_core::Resource; use deno_core::Resource;
use deno_core::ResourceId; use deno_core::ResourceId;
use deno_core::ZeroCopyBuf; use deno_core::ZeroCopyBuf;
use log::debug;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
use socket2::Domain; use socket2::Domain;
@ -43,51 +45,69 @@ use trust_dns_resolver::error::ResolveErrorKind;
use trust_dns_resolver::system_conf; use trust_dns_resolver::system_conf;
use trust_dns_resolver::AsyncResolver; use trust_dns_resolver::AsyncResolver;
#[cfg(unix)]
use super::ops_unix as net_unix;
#[cfg(unix)]
use crate::io::UnixStreamResource;
#[cfg(unix)]
use std::path::Path;
pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> { pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> {
vec![ vec![
op_net_accept_tcp::decl(), op_net_accept::decl(),
#[cfg(unix)] op_net_connect::decl::<P>(),
crate::ops_unix::op_net_accept_unix::decl(), op_net_listen::decl::<P>(),
op_net_connect_tcp::decl::<P>(), op_dgram_recv::decl(),
#[cfg(unix)] op_dgram_send::decl::<P>(),
crate::ops_unix::op_net_connect_unix::decl::<P>(),
op_net_listen_tcp::decl::<P>(),
op_net_listen_udp::decl::<P>(),
#[cfg(unix)]
crate::ops_unix::op_net_listen_unix::decl::<P>(),
#[cfg(unix)]
crate::ops_unix::op_net_listen_unixpacket::decl::<P>(),
op_net_recv_udp::decl(),
#[cfg(unix)]
crate::ops_unix::op_net_recv_unixpacket::decl(),
op_net_send_udp::decl::<P>(),
#[cfg(unix)]
crate::ops_unix::op_net_send_unixpacket::decl::<P>(),
op_dns_resolve::decl::<P>(), op_dns_resolve::decl::<P>(),
op_set_nodelay::decl::<P>(), op_set_nodelay::decl::<P>(),
op_set_keepalive::decl::<P>(), op_set_keepalive::decl::<P>(),
] ]
} }
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct OpConn {
pub rid: ResourceId,
pub remote_addr: Option<OpAddr>,
pub local_addr: Option<OpAddr>,
}
#[derive(Serialize)]
#[serde(tag = "transport", rename_all = "lowercase")]
pub enum OpAddr {
Tcp(IpAddr),
Udp(IpAddr),
#[cfg(unix)]
Unix(net_unix::UnixAddr),
#[cfg(unix)]
UnixPacket(net_unix::UnixAddr),
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
/// A received datagram packet (from udp or unixpacket)
pub struct OpPacket {
pub size: usize,
pub remote_addr: OpAddr,
}
#[derive(Serialize, Clone, Debug)] #[derive(Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct TlsHandshakeInfo { pub struct TlsHandshakeInfo {
pub alpn_protocol: Option<ByteString>, pub alpn_protocol: Option<ByteString>,
} }
#[derive(Deserialize, Serialize)] #[derive(Serialize)]
pub struct IpAddr { pub struct IpAddr {
pub hostname: String, pub hostname: String,
pub port: u16, pub port: u16,
} }
impl From<SocketAddr> for IpAddr { #[derive(Deserialize)]
fn from(addr: SocketAddr) -> Self { pub(crate) struct AcceptArgs {
Self { pub rid: ResourceId,
hostname: addr.ip().to_string(), pub transport: String,
port: addr.port(),
}
}
} }
pub(crate) fn accept_err(e: std::io::Error) -> AnyError { pub(crate) fn accept_err(e: std::io::Error) -> AnyError {
@ -99,11 +119,13 @@ pub(crate) fn accept_err(e: std::io::Error) -> AnyError {
} }
} }
#[op] async fn accept_tcp(
async fn op_net_accept_tcp(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, args: AcceptArgs,
) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> { _: (),
) -> Result<OpConn, AnyError> {
let rid = args.rid;
let resource = state let resource = state
.borrow() .borrow()
.resource_table .resource_table
@ -125,15 +147,51 @@ async fn op_net_accept_tcp(
let rid = state let rid = state
.resource_table .resource_table
.add(TcpStreamResource::new(tcp_stream.into_split())); .add(TcpStreamResource::new(tcp_stream.into_split()));
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
} }
#[op] #[op]
async fn op_net_recv_udp( async fn op_net_accept(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, args: AcceptArgs,
mut buf: ZeroCopyBuf, ) -> Result<OpConn, AnyError> {
) -> Result<(usize, IpAddr), AnyError> { match args.transport.as_str() {
"tcp" => accept_tcp(state, args, ()).await,
#[cfg(unix)]
"unix" => net_unix::accept_unix(state, args, ()).await,
other => Err(bad_transport(other)),
}
}
fn bad_transport(transport: &str) -> AnyError {
generic_error(format!("Unsupported transport protocol {}", transport))
}
#[derive(Deserialize)]
pub(crate) struct ReceiveArgs {
pub rid: ResourceId,
pub transport: String,
}
async fn receive_udp(
state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
zero_copy: ZeroCopyBuf,
) -> Result<OpPacket, AnyError> {
let mut zero_copy = zero_copy.clone();
let rid = args.rid;
let resource = state let resource = state
.borrow_mut() .borrow_mut()
.resource_table .resource_table
@ -141,75 +199,192 @@ async fn op_net_recv_udp(
.map_err(|_| bad_resource("Socket has been closed"))?; .map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await; let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let cancel_handle = RcRef::map(&resource, |r| &r.cancel); let cancel_handle = RcRef::map(&resource, |r| &r.cancel);
let (nread, remote_addr) = socket let (size, remote_addr) = socket
.recv_from(&mut buf) .recv_from(&mut zero_copy)
.try_or_cancel(cancel_handle) .try_or_cancel(cancel_handle)
.await?; .await?;
Ok((nread, IpAddr::from(remote_addr))) Ok(OpPacket {
size,
remote_addr: OpAddr::Udp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
}),
})
} }
#[op] #[op]
async fn op_net_send_udp<NP>( async fn op_dgram_recv(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
args: ReceiveArgs,
zero_copy: ZeroCopyBuf,
) -> Result<OpPacket, AnyError> {
match args.transport.as_str() {
"udp" => receive_udp(state, args, zero_copy).await,
#[cfg(unix)]
"unixpacket" => net_unix::receive_unix_packet(state, args, zero_copy).await,
other => Err(bad_transport(other)),
}
}
#[derive(Deserialize)]
struct SendArgs {
rid: ResourceId, rid: ResourceId,
addr: IpAddr, transport: String,
#[serde(flatten)]
transport_args: ArgsEnum,
}
#[op]
async fn op_dgram_send<NP>(
state: Rc<RefCell<OpState>>,
args: SendArgs,
zero_copy: ZeroCopyBuf, zero_copy: ZeroCopyBuf,
) -> Result<usize, AnyError> ) -> Result<usize, AnyError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
{ {
{ let zero_copy = zero_copy.clone();
let mut s = state.borrow_mut();
s.borrow_mut::<NP>().check_net( match args {
&(&addr.hostname, Some(addr.port)), SendArgs {
"Deno.DatagramConn.send()", rid,
)?; transport,
transport_args: ArgsEnum::Ip(args),
} if transport == "udp" => {
{
let mut s = state.borrow_mut();
s.borrow_mut::<NP>().check_net(
&(&args.hostname, Some(args.port)),
"Deno.DatagramConn.send()",
)?;
}
let addr = resolve_addr(&args.hostname, args.port)
.await?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let byte_length = socket.send_to(&zero_copy, &addr).await?;
Ok(byte_length)
}
#[cfg(unix)]
SendArgs {
rid,
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unixpacket" => {
let address_path = Path::new(&args.path);
{
let mut s = state.borrow_mut();
s.borrow_mut::<NP>()
.check_write(address_path, "Deno.DatagramConn.send()")?;
}
let resource = state
.borrow()
.resource_table
.get::<net_unix::UnixDatagramResource>(rid)
.map_err(|_| custom_error("NotConnected", "Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let byte_length = socket.send_to(&zero_copy, address_path).await?;
Ok(byte_length)
}
_ => Err(type_error("Wrong argument format!")),
} }
let addr = resolve_addr(&addr.hostname, addr.port) }
.await?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let resource = state #[derive(Deserialize)]
.borrow_mut() pub struct ConnectArgs {
.resource_table transport: String,
.get::<UdpSocketResource>(rid) #[serde(flatten)]
.map_err(|_| bad_resource("Socket has been closed"))?; transport_args: ArgsEnum,
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let nwritten = socket.send_to(&zero_copy, &addr).await?;
Ok(nwritten)
} }
#[op] #[op]
pub async fn op_net_connect_tcp<NP>( pub async fn op_net_connect<NP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
addr: IpAddr, args: ConnectArgs,
) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> ) -> Result<OpConn, AnyError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
{ {
{ match args {
let mut state_ = state.borrow_mut(); ConnectArgs {
state_ transport,
.borrow_mut::<NP>() transport_args: ArgsEnum::Ip(args),
.check_net(&(&addr.hostname, Some(addr.port)), "Deno.connect()")?; } if transport == "tcp" => {
{
let mut state_ = state.borrow_mut();
state_
.borrow_mut::<NP>()
.check_net(&(&args.hostname, Some(args.port)), "Deno.connect()")?;
}
let addr = resolve_addr(&args.hostname, args.port)
.await?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
let rid = state_
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split()));
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
}
#[cfg(unix)]
ConnectArgs {
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" => {
let address_path = Path::new(&args.path);
super::check_unstable2(&state, "Deno.connect");
{
let mut state_ = state.borrow_mut();
state_
.borrow_mut::<NP>()
.check_read(address_path, "Deno.connect()")?;
state_
.borrow_mut::<NP>()
.check_write(address_path, "Deno.connect()")?;
}
let path = args.path;
let unix_stream = net_unix::UnixStream::connect(Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
let resource = UnixStreamResource::new(unix_stream.into_split());
let rid = state_.resource_table.add(resource);
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
path: local_addr.as_pathname().and_then(net_unix::pathstring),
})),
remote_addr: Some(OpAddr::Unix(net_unix::UnixAddr {
path: remote_addr.as_pathname().and_then(net_unix::pathstring),
})),
})
}
_ => Err(type_error("Wrong argument format!")),
} }
let addr = resolve_addr(&addr.hostname, addr.port)
.await?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let tcp_stream = TcpStream::connect(&addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut state_ = state.borrow_mut();
let rid = state_
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split()));
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr)))
} }
pub struct TcpListenerResource { pub struct TcpListenerResource {
@ -242,20 +417,33 @@ impl Resource for UdpSocketResource {
} }
} }
#[op] #[derive(Deserialize)]
fn op_net_listen_tcp<NP>( #[serde(rename_all = "camelCase")]
struct IpListenArgs {
hostname: String,
port: u16,
reuse_address: Option<bool>,
}
#[derive(Deserialize)]
#[serde(untagged)]
enum ArgsEnum {
Ip(IpListenArgs),
#[cfg(unix)]
Unix(net_unix::UnixListenArgs),
}
#[derive(Deserialize)]
struct ListenArgs {
transport: String,
#[serde(flatten)]
transport_args: ArgsEnum,
}
fn listen_tcp(
state: &mut OpState, state: &mut OpState,
addr: IpAddr, addr: SocketAddr,
) -> Result<(ResourceId, IpAddr), AnyError> ) -> Result<(u32, SocketAddr), AnyError> {
where
NP: NetPermissions + 'static,
{
state
.borrow_mut::<NP>()
.check_net(&(&addr.hostname, Some(addr.port)), "Deno.listen()")?;
let addr = resolve_addr_sync(&addr.hostname, addr.port)?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let domain = if addr.is_ipv4() { let domain = if addr.is_ipv4() {
Domain::IPV4 Domain::IPV4
} else { } else {
@ -277,33 +465,21 @@ where
}; };
let rid = state.resource_table.add(listener_resource); let rid = state.resource_table.add(listener_resource);
Ok((rid, IpAddr::from(local_addr))) Ok((rid, local_addr))
} }
#[op] fn listen_udp(
fn op_net_listen_udp<NP>(
state: &mut OpState, state: &mut OpState,
addr: IpAddr, addr: SocketAddr,
reuse_address: bool, reuse_address: Option<bool>,
) -> Result<(ResourceId, IpAddr), AnyError> ) -> Result<(u32, SocketAddr), AnyError> {
where
NP: NetPermissions + 'static,
{
super::check_unstable(state, "Deno.listenDatagram");
state
.borrow_mut::<NP>()
.check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenDatagram()")?;
let addr = resolve_addr_sync(&addr.hostname, addr.port)?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let domain = if addr.is_ipv4() { let domain = if addr.is_ipv4() {
Domain::IPV4 Domain::IPV4
} else { } else {
Domain::IPV6 Domain::IPV6
}; };
let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?; let socket_tmp = Socket::new(domain, Type::DGRAM, Some(Protocol::UDP))?;
if reuse_address { if reuse_address.unwrap_or(false) {
// This logic is taken from libuv: // This logic is taken from libuv:
// //
// On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional // On the BSDs, SO_REUSEPORT implies SO_REUSEADDR but with some additional
@ -332,7 +508,110 @@ where
}; };
let rid = state.resource_table.add(socket_resource); let rid = state.resource_table.add(socket_resource);
Ok((rid, IpAddr::from(local_addr))) Ok((rid, local_addr))
}
#[op]
fn op_net_listen<NP>(
state: &mut OpState,
args: ListenArgs,
) -> Result<OpConn, AnyError>
where
NP: NetPermissions + 'static,
{
match args {
ListenArgs {
transport,
transport_args: ArgsEnum::Ip(args),
} => {
{
if transport == "udp" {
super::check_unstable(state, "Deno.listenDatagram");
}
state.borrow_mut::<NP>().check_net(
&(&args.hostname, Some(args.port)),
"Deno.listenDatagram()",
)?;
}
let addr = resolve_addr_sync(&args.hostname, args.port)?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let (rid, local_addr) = if transport == "tcp" {
if args.reuse_address.is_some() {
return Err(generic_error(
"The reuseAddress option is not supported for TCP",
));
}
listen_tcp(state, addr)?
} else {
listen_udp(state, addr, args.reuse_address)?
};
debug!(
"New listener {} {}:{}",
rid,
local_addr.ip().to_string(),
local_addr.port()
);
let ip_addr = IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
};
Ok(OpConn {
rid,
local_addr: Some(match transport.as_str() {
"udp" => OpAddr::Udp(ip_addr),
"tcp" => OpAddr::Tcp(ip_addr),
// NOTE: This could be unreachable!()
other => return Err(bad_transport(other)),
}),
remote_addr: None,
})
}
#[cfg(unix)]
ListenArgs {
transport,
transport_args: ArgsEnum::Unix(args),
} if transport == "unix" || transport == "unixpacket" => {
let address_path = Path::new(&args.path);
{
if transport == "unix" {
super::check_unstable(state, "Deno.listen");
}
if transport == "unixpacket" {
super::check_unstable(state, "Deno.listenDatagram");
}
let api_name = if transport == "unix" {
"Deno.listen()"
} else {
"Deno.listenDatagram()"
};
let permissions = state.borrow_mut::<NP>();
permissions.check_read(address_path, api_name)?;
permissions.check_write(address_path, api_name)?;
}
let (rid, local_addr) = if transport == "unix" {
net_unix::listen_unix(state, address_path)?
} else {
net_unix::listen_unix_packet(state, address_path)?
};
debug!("New listener {} {:?}", rid, local_addr);
let unix_addr = net_unix::UnixAddr {
path: local_addr.as_pathname().and_then(net_unix::pathstring),
};
Ok(OpConn {
rid,
local_addr: Some(match transport.as_str() {
"unix" => OpAddr::Unix(unix_addr),
"unixpacket" => OpAddr::UnixPacket(unix_addr),
other => return Err(bad_transport(other)),
}),
remote_addr: None,
})
}
#[cfg(unix)]
_ => Err(type_error("Wrong argument format!")),
}
} }
#[derive(Serialize, Eq, PartialEq, Debug)] #[derive(Serialize, Eq, PartialEq, Debug)]
@ -849,15 +1128,21 @@ mod tests {
let conn_state = runtime.op_state(); let conn_state = runtime.op_state();
let server_addr: Vec<&str> = clone_addr.split(':').collect(); let server_addr: Vec<&str> = clone_addr.split(':').collect();
let ip_addr = IpAddr { let ip_args = IpListenArgs {
hostname: String::from(server_addr[0]), hostname: String::from(server_addr[0]),
port: server_addr[1].parse().unwrap(), port: server_addr[1].parse().unwrap(),
reuse_address: None,
};
let connect_args = ConnectArgs {
transport: String::from("tcp"),
transport_args: ArgsEnum::Ip(ip_args),
}; };
let connect_fut = let connect_fut =
op_net_connect_tcp::call::<TestPermission>(conn_state, ip_addr); op_net_connect::call::<TestPermission>(conn_state, connect_args);
let (rid, _, _) = connect_fut.await.unwrap(); let conn = connect_fut.await.unwrap();
let rid = conn.rid;
let state = runtime.op_state(); let state = runtime.op_state();
set_sockopt_fn(&mut state.borrow_mut(), rid); set_sockopt_fn(&mut state.borrow_mut(), rid);

View file

@ -2,6 +2,8 @@
use crate::io::TcpStreamResource; use crate::io::TcpStreamResource;
use crate::ops::IpAddr; use crate::ops::IpAddr;
use crate::ops::OpAddr;
use crate::ops::OpConn;
use crate::ops::TlsHandshakeInfo; use crate::ops::TlsHandshakeInfo;
use crate::resolve_addr::resolve_addr; use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync; use crate::resolve_addr::resolve_addr_sync;
@ -656,9 +658,9 @@ impl Write for ImplementWriteTrait<'_, TcpStream> {
pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> { pub fn init<P: NetPermissions + 'static>() -> Vec<OpDecl> {
vec![ vec![
op_tls_start::decl::<P>(), op_tls_start::decl::<P>(),
op_net_connect_tls::decl::<P>(), op_tls_connect::decl::<P>(),
op_net_listen_tls::decl::<P>(), op_tls_listen::decl::<P>(),
op_net_accept_tls::decl(), op_tls_accept::decl(),
op_tls_handshake::decl(), op_tls_handshake::decl(),
] ]
} }
@ -749,6 +751,9 @@ impl Resource for TlsStreamResource {
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ConnectTlsArgs { pub struct ConnectTlsArgs {
transport: String,
hostname: String,
port: u16,
cert_file: Option<String>, cert_file: Option<String>,
ca_certs: Vec<String>, ca_certs: Vec<String>,
cert_chain: Option<String>, cert_chain: Option<String>,
@ -769,7 +774,7 @@ pub struct StartTlsArgs {
pub async fn op_tls_start<NP>( pub async fn op_tls_start<NP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
args: StartTlsArgs, args: StartTlsArgs,
) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> ) -> Result<OpConn, AnyError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
{ {
@ -848,18 +853,33 @@ where
.add(TlsStreamResource::new(tls_stream.into_split())) .add(TlsStreamResource::new(tls_stream.into_split()))
}; };
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
} }
#[op] #[op]
pub async fn op_net_connect_tls<NP>( pub async fn op_tls_connect<NP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
addr: IpAddr,
args: ConnectTlsArgs, args: ConnectTlsArgs,
) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> ) -> Result<OpConn, AnyError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
{ {
assert_eq!(args.transport, "tcp");
let hostname = match &*args.hostname {
"" => "localhost",
n => n,
};
let port = args.port;
let cert_file = args.cert_file.as_deref(); let cert_file = args.cert_file.as_deref();
let unsafely_ignore_certificate_errors = state let unsafely_ignore_certificate_errors = state
.borrow() .borrow()
@ -876,8 +896,7 @@ where
{ {
let mut s = state.borrow_mut(); let mut s = state.borrow_mut();
let permissions = s.borrow_mut::<NP>(); let permissions = s.borrow_mut::<NP>();
permissions permissions.check_net(&(hostname, Some(port)), "Deno.connectTls()")?;
.check_net(&(&addr.hostname, Some(addr.port)), "Deno.connectTls()")?;
if let Some(path) = cert_file { if let Some(path) = cert_file {
permissions.check_read(Path::new(path), "Deno.connectTls()")?; permissions.check_read(Path::new(path), "Deno.connectTls()")?;
} }
@ -900,9 +919,10 @@ where
.borrow::<DefaultTlsOptions>() .borrow::<DefaultTlsOptions>()
.root_cert_store .root_cert_store
.clone(); .clone();
let hostname_dns = ServerName::try_from(&*addr.hostname) let hostname_dns =
.map_err(|_| invalid_hostname(&addr.hostname))?; ServerName::try_from(hostname).map_err(|_| invalid_hostname(hostname))?;
let connect_addr = resolve_addr(&addr.hostname, addr.port)
let connect_addr = resolve_addr(hostname, port)
.await? .await?
.next() .next()
.ok_or_else(|| generic_error("No resolved address found"))?; .ok_or_else(|| generic_error("No resolved address found"))?;
@ -948,7 +968,17 @@ where
.add(TlsStreamResource::new(tls_stream.into_split())) .add(TlsStreamResource::new(tls_stream.into_split()))
}; };
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
} }
fn load_certs_from_file(path: &str) -> Result<Vec<Certificate>, AnyError> { fn load_certs_from_file(path: &str) -> Result<Vec<Certificate>, AnyError> {
@ -983,6 +1013,9 @@ impl Resource for TlsListenerResource {
#[derive(Deserialize)] #[derive(Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ListenTlsArgs { pub struct ListenTlsArgs {
transport: String,
hostname: String,
port: u16,
cert: Option<String>, cert: Option<String>,
// TODO(kt3k): Remove this option at v2.0. // TODO(kt3k): Remove this option at v2.0.
cert_file: Option<String>, cert_file: Option<String>,
@ -993,14 +1026,16 @@ pub struct ListenTlsArgs {
} }
#[op] #[op]
pub fn op_net_listen_tls<NP>( pub fn op_tls_listen<NP>(
state: &mut OpState, state: &mut OpState,
addr: IpAddr,
args: ListenTlsArgs, args: ListenTlsArgs,
) -> Result<(ResourceId, IpAddr), AnyError> ) -> Result<OpConn, AnyError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
{ {
assert_eq!(args.transport, "tcp");
let hostname = &*args.hostname;
let port = args.port;
let cert_file = args.cert_file.as_deref(); let cert_file = args.cert_file.as_deref();
let key_file = args.key_file.as_deref(); let key_file = args.key_file.as_deref();
let cert = args.cert.as_deref(); let cert = args.cert.as_deref();
@ -1008,8 +1043,7 @@ where
{ {
let permissions = state.borrow_mut::<NP>(); let permissions = state.borrow_mut::<NP>();
permissions permissions.check_net(&(hostname, Some(port)), "Deno.listenTls()")?;
.check_net(&(&addr.hostname, Some(addr.port)), "Deno.listenTls()")?;
if let Some(path) = cert_file { if let Some(path) = cert_file {
permissions.check_read(Path::new(path), "Deno.listenTls()")?; permissions.check_read(Path::new(path), "Deno.listenTls()")?;
} }
@ -1050,7 +1084,7 @@ where
alpn_protocols.into_iter().map(|s| s.into_bytes()).collect(); alpn_protocols.into_iter().map(|s| s.into_bytes()).collect();
} }
let bind_addr = resolve_addr_sync(&addr.hostname, addr.port)? let bind_addr = resolve_addr_sync(hostname, port)?
.next() .next()
.ok_or_else(|| generic_error("No resolved address found"))?; .ok_or_else(|| generic_error("No resolved address found"))?;
let domain = if bind_addr.is_ipv4() { let domain = if bind_addr.is_ipv4() {
@ -1077,14 +1111,21 @@ where
let rid = state.resource_table.add(tls_listener_resource); let rid = state.resource_table.add(tls_listener_resource);
Ok((rid, IpAddr::from(local_addr))) Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: None,
})
} }
#[op] #[op]
pub async fn op_net_accept_tls( pub async fn op_tls_accept(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, rid: ResourceId,
) -> Result<(ResourceId, IpAddr, IpAddr), AnyError> { ) -> Result<OpConn, AnyError> {
let resource = state let resource = state
.borrow() .borrow()
.resource_table .resource_table
@ -1118,7 +1159,17 @@ pub async fn op_net_accept_tls(
.add(TlsStreamResource::new(tls_stream.into_split())) .add(TlsStreamResource::new(tls_stream.into_split()))
}; };
Ok((rid, IpAddr::from(local_addr), IpAddr::from(remote_addr))) Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
} }
#[op] #[op]

View file

@ -1,18 +1,20 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use crate::io::UnixStreamResource; use crate::io::UnixStreamResource;
use crate::NetPermissions; use crate::ops::AcceptArgs;
use crate::ops::OpAddr;
use crate::ops::OpConn;
use crate::ops::OpPacket;
use crate::ops::ReceiveArgs;
use deno_core::error::bad_resource; use deno_core::error::bad_resource;
use deno_core::error::custom_error; use deno_core::error::custom_error;
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::op;
use deno_core::AsyncRefCell; use deno_core::AsyncRefCell;
use deno_core::CancelHandle; use deno_core::CancelHandle;
use deno_core::CancelTryFuture; use deno_core::CancelTryFuture;
use deno_core::OpState; use deno_core::OpState;
use deno_core::RcRef; use deno_core::RcRef;
use deno_core::Resource; use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf; use deno_core::ZeroCopyBuf;
use serde::Deserialize; use serde::Deserialize;
use serde::Serialize; use serde::Serialize;
@ -72,11 +74,13 @@ pub struct UnixListenArgs {
pub path: String, pub path: String,
} }
#[op] pub(crate) async fn accept_unix(
pub async fn op_net_accept_unix(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
rid: ResourceId, args: AcceptArgs,
) -> Result<(ResourceId, Option<String>, Option<String>), AnyError> { _: (),
) -> Result<OpConn, AnyError> {
let rid = args.rid;
let resource = state let resource = state
.borrow() .borrow()
.resource_table .resource_table
@ -94,52 +98,27 @@ pub async fn op_net_accept_unix(
let local_addr = unix_stream.local_addr()?; let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?; let remote_addr = unix_stream.peer_addr()?;
let local_addr_path = local_addr.as_pathname().map(pathstring).transpose()?;
let remote_addr_path =
remote_addr.as_pathname().map(pathstring).transpose()?;
let resource = UnixStreamResource::new(unix_stream.into_split()); let resource = UnixStreamResource::new(unix_stream.into_split());
let mut state = state.borrow_mut(); let mut state = state.borrow_mut();
let rid = state.resource_table.add(resource); let rid = state.resource_table.add(resource);
Ok((rid, local_addr_path, remote_addr_path)) Ok(OpConn {
rid,
local_addr: Some(OpAddr::Unix(UnixAddr {
path: local_addr.as_pathname().and_then(pathstring),
})),
remote_addr: Some(OpAddr::Unix(UnixAddr {
path: remote_addr.as_pathname().and_then(pathstring),
})),
})
} }
#[op] pub(crate) async fn receive_unix_packet(
pub async fn op_net_connect_unix<NP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
path: String, args: ReceiveArgs,
) -> Result<(ResourceId, Option<String>, Option<String>), AnyError>
where
NP: NetPermissions + 'static,
{
let address_path = Path::new(&path);
super::check_unstable2(&state, "Deno.connect");
{
let mut state_ = state.borrow_mut();
state_
.borrow_mut::<NP>()
.check_read(address_path, "Deno.connect()")?;
state_
.borrow_mut::<NP>()
.check_write(address_path, "Deno.connect()")?;
}
let unix_stream = UnixStream::connect(Path::new(&path)).await?;
let local_addr = unix_stream.local_addr()?;
let remote_addr = unix_stream.peer_addr()?;
let local_addr_path = local_addr.as_pathname().map(pathstring).transpose()?;
let remote_addr_path =
remote_addr.as_pathname().map(pathstring).transpose()?;
let mut state_ = state.borrow_mut();
let resource = UnixStreamResource::new(unix_stream.into_split());
let rid = state_.resource_table.add(resource);
Ok((rid, local_addr_path, remote_addr_path))
}
#[op]
pub async fn op_net_recv_unixpacket(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
mut buf: ZeroCopyBuf, mut buf: ZeroCopyBuf,
) -> Result<(usize, Option<String>), AnyError> { ) -> Result<OpPacket, AnyError> {
let rid = args.rid;
let resource = state let resource = state
.borrow() .borrow()
.resource_table .resource_table
@ -149,90 +128,46 @@ pub async fn op_net_recv_unixpacket(
.try_borrow_mut() .try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?; .ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let cancel = RcRef::map(resource, |r| &r.cancel); let cancel = RcRef::map(resource, |r| &r.cancel);
let (nread, remote_addr) = let (size, remote_addr) =
socket.recv_from(&mut buf).try_or_cancel(cancel).await?; socket.recv_from(&mut buf).try_or_cancel(cancel).await?;
let path = remote_addr.as_pathname().map(pathstring).transpose()?; Ok(OpPacket {
Ok((nread, path)) size,
remote_addr: OpAddr::UnixPacket(UnixAddr {
path: remote_addr.as_pathname().and_then(pathstring),
}),
})
} }
#[op] pub fn listen_unix(
async fn op_net_send_unixpacket<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
path: String,
zero_copy: ZeroCopyBuf,
) -> Result<usize, AnyError>
where
NP: NetPermissions + 'static,
{
let address_path = Path::new(&path);
{
let mut s = state.borrow_mut();
s.borrow_mut::<NP>()
.check_write(address_path, "Deno.DatagramConn.send()")?;
}
let resource = state
.borrow()
.resource_table
.get::<UnixDatagramResource>(rid)
.map_err(|_| custom_error("NotConnected", "Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Socket already in use"))?;
let nwritten = socket.send_to(&zero_copy, address_path).await?;
Ok(nwritten)
}
#[op]
pub fn op_net_listen_unix<NP>(
state: &mut OpState, state: &mut OpState,
path: String, addr: &Path,
) -> Result<(ResourceId, Option<String>), AnyError> ) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
where let listener = UnixListener::bind(&addr)?;
NP: NetPermissions + 'static,
{
let address_path = Path::new(&path);
super::check_unstable(state, "Deno.listen");
let permissions = state.borrow_mut::<NP>();
permissions.check_read(address_path, "Deno.listen()")?;
permissions.check_write(address_path, "Deno.listen()")?;
let listener = UnixListener::bind(&address_path)?;
let local_addr = listener.local_addr()?; let local_addr = listener.local_addr()?;
let pathname = local_addr.as_pathname().map(pathstring).transpose()?;
let listener_resource = UnixListenerResource { let listener_resource = UnixListenerResource {
listener: AsyncRefCell::new(listener), listener: AsyncRefCell::new(listener),
cancel: Default::default(), cancel: Default::default(),
}; };
let rid = state.resource_table.add(listener_resource); let rid = state.resource_table.add(listener_resource);
Ok((rid, pathname))
Ok((rid, local_addr))
} }
#[op] pub fn listen_unix_packet(
pub fn op_net_listen_unixpacket<NP>(
state: &mut OpState, state: &mut OpState,
path: String, addr: &Path,
) -> Result<(ResourceId, Option<String>), AnyError> ) -> Result<(u32, tokio::net::unix::SocketAddr), AnyError> {
where let socket = UnixDatagram::bind(&addr)?;
NP: NetPermissions + 'static,
{
let address_path = Path::new(&path);
super::check_unstable(state, "Deno.listenDatagram");
let permissions = state.borrow_mut::<NP>();
permissions.check_read(address_path, "Deno.listenDatagram()")?;
permissions.check_write(address_path, "Deno.listenDatagram()")?;
let socket = UnixDatagram::bind(&address_path)?;
let local_addr = socket.local_addr()?; let local_addr = socket.local_addr()?;
let pathname = local_addr.as_pathname().map(pathstring).transpose()?;
let datagram_resource = UnixDatagramResource { let datagram_resource = UnixDatagramResource {
socket: AsyncRefCell::new(socket), socket: AsyncRefCell::new(socket),
cancel: Default::default(), cancel: Default::default(),
}; };
let rid = state.resource_table.add(datagram_resource); let rid = state.resource_table.add(datagram_resource);
Ok((rid, pathname))
Ok((rid, local_addr))
} }
pub fn pathstring(pathname: &Path) -> Result<String, AnyError> { pub fn pathstring(pathname: &Path) -> Option<String> {
into_string(pathname.into()) into_string(pathname.into()).ok()
} }

View file

@ -75,10 +75,8 @@
"op_crypto_sign_key": ["sign data", "awaiting the result of a `crypto.subtle.sign` call"], "op_crypto_sign_key": ["sign data", "awaiting the result of a `crypto.subtle.sign` call"],
"op_crypto_subtle_digest": ["digest data", "awaiting the result of a `crypto.subtle.digest` call"], "op_crypto_subtle_digest": ["digest data", "awaiting the result of a `crypto.subtle.digest` call"],
"op_crypto_verify_key": ["verify data", "awaiting the result of a `crypto.subtle.verify` call"], "op_crypto_verify_key": ["verify data", "awaiting the result of a `crypto.subtle.verify` call"],
"op_net_recv_udp": ["receive a datagram message via UDP", "awaiting the result of `Deno.DatagramConn#receive` call, or not breaking out of a for await loop looping over a `Deno.DatagramConn`"], "op_dgram_recv": ["receive a datagram message", "awaiting the result of `Deno.DatagramConn#receive` call, or not breaking out of a for await loop looping over a `Deno.DatagramConn`"],
"op_net_recv_unixpacket": ["receive a datagram message via Unixpacket", "awaiting the result of `Deno.DatagramConn#receive` call, or not breaking out of a for await loop looping over a `Deno.DatagramConn`"], "op_dgram_send": ["send a datagram message", "awaiting the result of `Deno.DatagramConn#send` call"],
"op_net_send_udp": ["send a datagram message via UDP", "awaiting the result of `Deno.DatagramConn#send` call"],
"op_net_send_unixpacket": ["send a datagram message via Unixpacket", "awaiting the result of `Deno.DatagramConn#send` call"],
"op_dns_resolve": ["resolve a DNS name", "awaiting the result of a `Deno.resolveDns` call"], "op_dns_resolve": ["resolve a DNS name", "awaiting the result of a `Deno.resolveDns` call"],
"op_fdatasync_async": ["flush pending data operations for a file to disk", "awaiting the result of a `Deno.fdatasync` call"], "op_fdatasync_async": ["flush pending data operations for a file to disk", "awaiting the result of a `Deno.fdatasync` call"],
"op_fetch_send": ["send a HTTP request", "awaiting the result of a `fetch` call"], "op_fetch_send": ["send a HTTP request", "awaiting the result of a `fetch` call"],
@ -101,10 +99,8 @@
"op_make_temp_file_async": ["create a temporary file", "awaiting the result of a `Deno.makeTempFile` call"], "op_make_temp_file_async": ["create a temporary file", "awaiting the result of a `Deno.makeTempFile` call"],
"op_message_port_recv_message": ["receive a message from a MessagePort", "awaiting the result of not closing a `MessagePort`"], "op_message_port_recv_message": ["receive a message from a MessagePort", "awaiting the result of not closing a `MessagePort`"],
"op_mkdir_async": ["create a directory", "awaiting the result of a `Deno.mkdir` call"], "op_mkdir_async": ["create a directory", "awaiting the result of a `Deno.mkdir` call"],
"op_net_accept_tcp": ["accept a TCP stream", "closing a `Deno.Listener`"], "op_net_accept": ["accept a TCP connection", "closing a `Deno.Listener`"],
"op_net_accept_unix": ["accept a Unix stream", "closing a `Deno.Listener`"], "op_net_connect": ["connect to a TCP or UDP server", "awaiting a `Deno.connect` call"],
"op_net_connect_tcp": ["connect to a TCP server", "awaiting a `Deno.connect` call"],
"op_net_connect_unix": ["connect to a Unix server", "awaiting a `Deno.connect` call"],
"op_open_async": ["open a file", "awaiting the result of a `Deno.open` call"], "op_open_async": ["open a file", "awaiting the result of a `Deno.open` call"],
"op_read_dir_async": ["read a directory", "collecting all items in the async iterable returned from a `Deno.readDir` call"], "op_read_dir_async": ["read a directory", "collecting all items in the async iterable returned from a `Deno.readDir` call"],
"op_read_link_async": ["read a symlink", "awaiting the result of a `Deno.readLink` call"], "op_read_link_async": ["read a symlink", "awaiting the result of a `Deno.readLink` call"],
@ -117,8 +113,8 @@
"op_sleep": ["sleep for a duration", "cancelling a `setTimeout` or `setInterval` call"], "op_sleep": ["sleep for a duration", "cancelling a `setTimeout` or `setInterval` call"],
"op_stat_async": ["get file metadata", "awaiting the result of a `Deno.stat` call"], "op_stat_async": ["get file metadata", "awaiting the result of a `Deno.stat` call"],
"op_symlink_async": ["create a symlink", "awaiting the result of a `Deno.symlink` call"], "op_symlink_async": ["create a symlink", "awaiting the result of a `Deno.symlink` call"],
"op_net_accept_tls": ["accept a TLS stream", "closing a `Deno.TlsListener`"], "op_tls_accept": ["accept a TLS connection", "closing a `Deno.TlsListener`"],
"op_net_connect_tls": ["connect to a TLS server", "awaiting a `Deno.connectTls` call"], "op_tls_connect": ["connect to a TLS server", "awaiting a `Deno.connectTls` call"],
"op_tls_handshake": ["perform a TLS handshake", "awaiting a `Deno.TlsConn#handshake` call"], "op_tls_handshake": ["perform a TLS handshake", "awaiting a `Deno.TlsConn#handshake` call"],
"op_tls_start": ["start a TLS connection", "awaiting a `Deno.startTls` call"], "op_tls_start": ["start a TLS connection", "awaiting a `Deno.startTls` call"],
"op_truncate_async": ["truncate a file", "awaiting the result of a `Deno.truncate` call"], "op_truncate_async": ["truncate a file", "awaiting the result of a `Deno.truncate` call"],

View file

@ -128,7 +128,10 @@
networkInterfaces: __bootstrap.os.networkInterfaces, networkInterfaces: __bootstrap.os.networkInterfaces,
getGid: __bootstrap.os.getGid, getGid: __bootstrap.os.getGid,
getUid: __bootstrap.os.getUid, getUid: __bootstrap.os.getUid,
listenDatagram: __bootstrap.net.listenDatagram, listen: __bootstrap.netUnstable.listen,
connect: __bootstrap.netUnstable.connect,
listenDatagram: __bootstrap.netUnstable.listenDatagram,
Listener: __bootstrap.netUnstable.Listener,
umask: __bootstrap.fs.umask, umask: __bootstrap.fs.umask,
utime: __bootstrap.fs.utime, utime: __bootstrap.fs.utime,
utimeSync: __bootstrap.fs.utimeSync, utimeSync: __bootstrap.fs.utimeSync,

View file

@ -58,7 +58,6 @@ delete Intl.v8BreakIterator;
const worker = window.__bootstrap.worker; const worker = window.__bootstrap.worker;
const internals = window.__bootstrap.internals; const internals = window.__bootstrap.internals;
const performance = window.__bootstrap.performance; const performance = window.__bootstrap.performance;
const net = window.__bootstrap.net;
const crypto = window.__bootstrap.crypto; const crypto = window.__bootstrap.crypto;
const url = window.__bootstrap.url; const url = window.__bootstrap.url;
const urlPattern = window.__bootstrap.urlPattern; const urlPattern = window.__bootstrap.urlPattern;
@ -693,8 +692,6 @@ delete Intl.v8BreakIterator;
} }
performance.setTimeOrigin(DateNow()); performance.setTimeOrigin(DateNow());
net.setup(runtimeOptions.unstableFlag);
const consoleFromV8 = window.console; const consoleFromV8 = window.console;
const wrapConsole = window.__bootstrap.console.wrapConsole; const wrapConsole = window.__bootstrap.console.wrapConsole;
@ -792,8 +789,6 @@ delete Intl.v8BreakIterator;
} }
performance.setTimeOrigin(DateNow()); performance.setTimeOrigin(DateNow());
net.setup(runtimeOptions.unstableFlag);
const consoleFromV8 = window.console; const consoleFromV8 = window.console;
const wrapConsole = window.__bootstrap.console.wrapConsole; const wrapConsole = window.__bootstrap.console.wrapConsole;