1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-06 22:35:51 -05:00
denoland-deno/ext/net/01_net.js
Bartek Iwańczuk 8d5c0112fb
feat: don't require --unstable flag for npm programs (#16520)
This PR adds copies of several unstable APIs that are available
in "Deno[Deno.internal].nodeUnstable" namespace.

These copies do not perform unstable check (ie. don't require
"--unstable" flag to be present). Otherwise they work exactly
the same, including permission checks.

These APIs are not meant to be used by users directly and
can change at any time.

Copies of following APIs are available in that namespace:
- Deno.spawnChild
- Deno.spawn
- Deno.spawnSync
- Deno.serve
- Deno.upgradeHttpRaw
- Deno.listenDatagram
2022-11-10 22:03:28 +01:00

400 lines
9 KiB
JavaScript

// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streams;
const {
Error,
ObjectPrototypeIsPrototypeOf,
PromiseResolve,
SymbolAsyncIterator,
SymbolFor,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
} = window.__bootstrap.primordials;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
async function read(
rid,
buffer,
) {
if (buffer.length === 0) {
return 0;
}
const nread = await core.read(rid, buffer);
return nread === 0 ? null : nread;
}
async function write(rid, data) {
return await core.write(rid, data);
}
function shutdown(rid) {
return core.shutdown(rid);
}
function resolveDns(query, recordType, options) {
return core.opAsync("op_dns_resolve", { query, recordType, options });
}
class Conn {
#rid = 0;
#remoteAddr = null;
#localAddr = null;
#readable;
#writable;
constructor(rid, remoteAddr, localAddr) {
this.#rid = rid;
this.#remoteAddr = remoteAddr;
this.#localAddr = localAddr;
}
get rid() {
return this.#rid;
}
get remoteAddr() {
return this.#remoteAddr;
}
get localAddr() {
return this.#localAddr;
}
write(p) {
return write(this.rid, p);
}
read(p) {
return read(this.rid, p);
}
close() {
core.close(this.rid);
}
closeWrite() {
return shutdown(this.rid);
}
get readable() {
if (this.#readable === undefined) {
this.#readable = readableStreamForRid(this.rid);
}
return this.#readable;
}
get writable() {
if (this.#writable === undefined) {
this.#writable = writableStreamForRid(this.rid);
}
return this.#writable;
}
}
class TcpConn extends Conn {
setNoDelay(nodelay = true) {
return ops.op_set_nodelay(this.rid, nodelay);
}
setKeepAlive(keepalive = true) {
return ops.op_set_keepalive(this.rid, keepalive);
}
}
class UnixConn extends Conn {}
class Listener {
#rid = 0;
#addr = null;
#unref = false;
#promiseId = null;
constructor(rid, addr) {
this.#rid = rid;
this.#addr = addr;
}
get rid() {
return this.#rid;
}
get addr() {
return this.#addr;
}
async accept() {
let promise;
switch (this.addr.transport) {
case "tcp":
promise = core.opAsync("op_net_accept_tcp", this.rid);
break;
case "unix":
promise = core.opAsync("op_net_accept_unix", this.rid);
break;
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
this.#promiseId = promise[promiseIdSymbol];
if (this.#unref) core.unrefOp(this.#promiseId);
const [rid, localAddr, remoteAddr] = await promise;
this.#promiseId = null;
if (this.addr.transport == "tcp") {
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
} else if (this.addr.transport == "unix") {
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
} else {
throw new Error("unreachable");
}
}
async next() {
let conn;
try {
conn = await this.accept();
} catch (error) {
if (
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error) ||
ObjectPrototypeIsPrototypeOf(InterruptedPrototype, error)
) {
return { value: undefined, done: true };
}
throw error;
}
return { value: conn, done: false };
}
return(value) {
this.close();
return PromiseResolve({ value, done: true });
}
close() {
core.close(this.rid);
}
[SymbolAsyncIterator]() {
return this;
}
ref() {
this.#unref = false;
if (typeof this.#promiseId === "number") {
core.refOp(this.#promiseId);
}
}
unref() {
this.#unref = true;
if (typeof this.#promiseId === "number") {
core.unrefOp(this.#promiseId);
}
}
}
class Datagram {
#rid = 0;
#addr = null;
constructor(rid, addr, bufSize = 1024) {
this.#rid = rid;
this.#addr = addr;
this.bufSize = bufSize;
}
get rid() {
return this.#rid;
}
get addr() {
return this.#addr;
}
async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
let nread;
let remoteAddr;
switch (this.addr.transport) {
case "udp": {
[nread, remoteAddr] = await core.opAsync(
"op_net_recv_udp",
this.rid,
buf,
);
remoteAddr.transport = "udp";
break;
}
case "unixpacket": {
let path;
[nread, path] = await core.opAsync(
"op_net_recv_unixpacket",
this.rid,
buf,
);
remoteAddr = { transport: "unixpacket", path };
break;
}
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
const sub = TypedArrayPrototypeSubarray(buf, 0, nread);
return [sub, remoteAddr];
}
async send(p, opts) {
switch (this.addr.transport) {
case "udp":
return await core.opAsync(
"op_net_send_udp",
this.rid,
{ hostname: opts.hostname ?? "127.0.0.1", port: opts.port },
p,
);
case "unixpacket":
return await core.opAsync(
"op_net_send_unixpacket",
this.rid,
opts.path,
p,
);
default:
throw new Error(`Unsupported transport: ${this.addr.transport}`);
}
}
close() {
core.close(this.rid);
}
async *[SymbolAsyncIterator]() {
while (true) {
try {
yield await this.receive();
} catch (err) {
if (
ObjectPrototypeIsPrototypeOf(BadResourcePrototype, err) ||
ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)
) {
break;
}
throw err;
}
}
}
}
function listen(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
const [rid, addr] = ops.op_net_listen_tcp({
hostname: args.hostname ?? "0.0.0.0",
port: args.port,
}, args.reusePort);
addr.transport = "tcp";
return new Listener(rid, addr);
}
case "unix": {
const [rid, path] = ops.op_net_listen_unix(args.path);
const addr = {
transport: "unix",
path,
};
return new Listener(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}
function createListenDatagram(udpOpFn, unixOpFn) {
return function listenDatagram(args) {
switch (args.transport) {
case "udp": {
const [rid, addr] = udpOpFn(
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
args.reuseAddress ?? false,
);
addr.transport = "udp";
return new Datagram(rid, addr);
}
case "unixpacket": {
const [rid, path] = unixOpFn(args.path);
const addr = {
transport: "unixpacket",
path,
};
return new Datagram(rid, addr);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
};
}
async function connect(args) {
switch (args.transport ?? "tcp") {
case "tcp": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_tcp",
{
hostname: args.hostname ?? "127.0.0.1",
port: args.port,
},
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
}
case "unix": {
const [rid, localAddr, remoteAddr] = await core.opAsync(
"op_net_connect_unix",
args.path,
);
return new UnixConn(
rid,
{ transport: "unix", path: remoteAddr },
{ transport: "unix", path: localAddr },
);
}
default:
throw new TypeError(`Unsupported transport: '${transport}'`);
}
}
function setup(unstable) {
if (!unstable) {
delete Listener.prototype.ref;
delete Listener.prototype.unref;
}
}
window.__bootstrap.net = {
setup,
connect,
Conn,
TcpConn,
UnixConn,
listen,
createListenDatagram,
Listener,
shutdown,
Datagram,
resolveDns,
};
})(this);