mirror of
https://github.com/denoland/deno.git
synced 2024-12-25 16:49:18 -05:00
feat(ext/net): Deno.upgradeHttp handles unix connections (#13987)
This commit is contained in:
parent
e55dee7fd8
commit
52a6e9ef4a
3 changed files with 140 additions and 3 deletions
|
@ -5,7 +5,7 @@ import {
|
|||
BufWriter,
|
||||
} from "../../../test_util/std/io/buffer.ts";
|
||||
import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts";
|
||||
import { serve } from "../../../test_util/std/http/server.ts";
|
||||
import { serve, serveTls } from "../../../test_util/std/http/server.ts";
|
||||
import {
|
||||
assert,
|
||||
assertEquals,
|
||||
|
@ -1739,7 +1739,7 @@ Deno.test({
|
|||
},
|
||||
});
|
||||
|
||||
Deno.test("upgradeHttp", async () => {
|
||||
Deno.test("upgradeHttp tcp", async () => {
|
||||
async function client() {
|
||||
const tcpConn = await Deno.connect({ port: 4501 });
|
||||
await tcpConn.write(
|
||||
|
@ -1782,6 +1782,120 @@ Deno.test("upgradeHttp", async () => {
|
|||
await Promise.all([server, client()]);
|
||||
});
|
||||
|
||||
Deno.test(
|
||||
"upgradeHttp tls",
|
||||
{ permissions: { net: true, read: true } },
|
||||
async () => {
|
||||
async function client() {
|
||||
const caCerts = [
|
||||
await Deno.readTextFile("cli/tests/testdata/tls/RootCA.pem"),
|
||||
];
|
||||
const tlsConn = await Deno.connectTls({
|
||||
hostname: "localhost",
|
||||
port: 4502,
|
||||
caCerts,
|
||||
});
|
||||
await tlsConn.write(
|
||||
new TextEncoder().encode(
|
||||
"CONNECT server.example.com:80 HTTP/1.1\r\n\r\nbla bla bla\nbla bla\nbla\n",
|
||||
),
|
||||
);
|
||||
setTimeout(async () => {
|
||||
await tlsConn.write(
|
||||
new TextEncoder().encode(
|
||||
"bla bla bla\nbla bla\nbla\n",
|
||||
),
|
||||
);
|
||||
tlsConn.close();
|
||||
}, 500);
|
||||
}
|
||||
|
||||
const abortController = new AbortController();
|
||||
const signal = abortController.signal;
|
||||
const certFile = "cli/tests/testdata/tls/localhost.crt";
|
||||
const keyFile = "cli/tests/testdata/tls/localhost.key";
|
||||
|
||||
const server = serveTls((req) => {
|
||||
const p = Deno.upgradeHttp(req);
|
||||
|
||||
(async () => {
|
||||
const [conn, firstPacket] = await p;
|
||||
const buf = new Uint8Array(1024);
|
||||
const firstPacketText = new TextDecoder().decode(firstPacket);
|
||||
assertEquals(firstPacketText, "bla bla bla\nbla bla\nbla\n");
|
||||
const n = await conn.read(buf);
|
||||
assert(n != null);
|
||||
const secondPacketText = new TextDecoder().decode(buf.slice(0, n));
|
||||
assertEquals(secondPacketText, "bla bla bla\nbla bla\nbla\n");
|
||||
abortController.abort();
|
||||
conn.close();
|
||||
})();
|
||||
|
||||
return new Response(null, { status: 101 });
|
||||
}, { hostname: "localhost", port: 4502, signal, keyFile, certFile });
|
||||
|
||||
await Promise.all([server, client()]);
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test("upgradeHttp unix", {
|
||||
permissions: { read: true, write: true },
|
||||
ignore: Deno.build.os === "windows",
|
||||
}, async () => {
|
||||
const filePath = Deno.makeTempFileSync();
|
||||
const promise = deferred();
|
||||
|
||||
async function client() {
|
||||
const unixConn = await Deno.connect({ path: filePath, transport: "unix" });
|
||||
await unixConn.write(
|
||||
new TextEncoder().encode(
|
||||
"CONNECT server.example.com:80 HTTP/1.1\r\n\r\nbla bla bla\nbla bla\nbla\n",
|
||||
),
|
||||
);
|
||||
setTimeout(async () => {
|
||||
await unixConn.write(
|
||||
new TextEncoder().encode(
|
||||
"bla bla bla\nbla bla\nbla\n",
|
||||
),
|
||||
);
|
||||
unixConn.close();
|
||||
promise.resolve();
|
||||
}, 500);
|
||||
await promise;
|
||||
}
|
||||
|
||||
const server = (async () => {
|
||||
const listener = Deno.listen({ path: filePath, transport: "unix" });
|
||||
for await (const conn of listener) {
|
||||
const httpConn = Deno.serveHttp(conn);
|
||||
const maybeReq = await httpConn.nextRequest();
|
||||
assert(maybeReq);
|
||||
const { request, respondWith } = maybeReq;
|
||||
const p = Deno.upgradeHttp(request);
|
||||
|
||||
const promise = (async () => {
|
||||
const [conn, firstPacket] = await p;
|
||||
const buf = new Uint8Array(1024);
|
||||
const firstPacketText = new TextDecoder().decode(firstPacket);
|
||||
assertEquals(firstPacketText, "bla bla bla\nbla bla\nbla\n");
|
||||
const n = await conn.read(buf);
|
||||
assert(n != null);
|
||||
const secondPacketText = new TextDecoder().decode(buf.slice(0, n));
|
||||
assertEquals(secondPacketText, "bla bla bla\nbla bla\nbla\n");
|
||||
conn.close();
|
||||
})();
|
||||
|
||||
const resp = new Response(null, { status: 101 });
|
||||
await respondWith(resp);
|
||||
await promise;
|
||||
httpConn.close();
|
||||
break;
|
||||
}
|
||||
})();
|
||||
|
||||
await Promise.all([server, client()]);
|
||||
});
|
||||
|
||||
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
|
||||
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||
const tp = new TextProtoReader(r);
|
||||
|
|
|
@ -30,7 +30,7 @@
|
|||
_idleTimeoutTimeout,
|
||||
_serverHandleIdleTimeout,
|
||||
} = window.__bootstrap.webSocket;
|
||||
const { TcpConn } = window.__bootstrap.net;
|
||||
const { TcpConn, UnixConn } = window.__bootstrap.net;
|
||||
const { TlsConn } = window.__bootstrap.tls;
|
||||
const { Deferred } = window.__bootstrap.streams;
|
||||
const {
|
||||
|
@ -311,6 +311,8 @@
|
|||
conn = new TcpConn(res.connRid, remoteAddr, localAddr);
|
||||
} else if (res.connType === "tls") {
|
||||
conn = new TlsConn(res.connRid, remoteAddr, localAddr);
|
||||
} else if (res.connType === "unix") {
|
||||
conn = new UnixConn(res.connRid, remoteAddr, localAddr);
|
||||
} else {
|
||||
throw new Error("unreachable");
|
||||
}
|
||||
|
|
|
@ -14,11 +14,14 @@ use deno_http::http_create_conn_resource;
|
|||
use deno_http::HttpRequestReader;
|
||||
use deno_http::HttpStreamResource;
|
||||
use deno_net::io::TcpStreamResource;
|
||||
use deno_net::io::UnixStreamResource;
|
||||
use deno_net::ops_tls::TlsStream;
|
||||
use deno_net::ops_tls::TlsStreamResource;
|
||||
use hyper::upgrade::Parts;
|
||||
use serde::Serialize;
|
||||
use tokio::net::TcpStream;
|
||||
#[cfg(unix)]
|
||||
use tokio::net::UnixStream;
|
||||
|
||||
pub fn init() -> Extension {
|
||||
Extension::builder()
|
||||
|
@ -121,6 +124,24 @@ async fn op_http_upgrade(
|
|||
}
|
||||
Err(transport) => transport,
|
||||
};
|
||||
#[cfg(unix)]
|
||||
let transport = match transport.downcast::<UnixStream>() {
|
||||
Ok(Parts {
|
||||
io: unix_stream,
|
||||
read_buf,
|
||||
..
|
||||
}) => {
|
||||
return Ok(HttpUpgradeResult {
|
||||
conn_type: "unix",
|
||||
conn_rid: state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.add(UnixStreamResource::new(unix_stream.into_split())),
|
||||
read_buf: read_buf.to_vec().into(),
|
||||
});
|
||||
}
|
||||
Err(transport) => transport,
|
||||
};
|
||||
match transport.downcast::<TlsStream>() {
|
||||
Ok(Parts {
|
||||
io: tls_stream,
|
||||
|
|
Loading…
Reference in a new issue