1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-22 15:24:46 -05:00

fix(ext/http): Make Deno.serveHttp() work when proxying (#23269)

Closes https://github.com/denoland/deno/issues/21900
This commit is contained in:
Bartek Iwańczuk 2024-04-08 22:02:49 +01:00 committed by GitHub
parent cb12a93503
commit d3b63bb315
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 143 additions and 49 deletions

View file

@ -137,8 +137,10 @@ class HttpConn {
return null;
}
const { 0: streamRid, 1: method, 2: url } = nextRequest;
SetPrototypeAdd(this.#managedResources, streamRid);
const { 0: readStreamRid, 1: writeStreamRid, 2: method, 3: url } =
nextRequest;
SetPrototypeAdd(this.#managedResources, readStreamRid);
SetPrototypeAdd(this.#managedResources, writeStreamRid);
/** @type {ReadableStream<Uint8Array> | undefined} */
let body = null;
@ -146,17 +148,16 @@ class HttpConn {
// It will be closed automatically once the request has been handled and
// the response has been sent.
if (method !== "GET" && method !== "HEAD") {
body = readableStreamForRid(streamRid, false);
body = readableStreamForRid(readStreamRid, false);
}
const innerRequest = newInnerRequest(
method,
url,
() => op_http_headers(streamRid),
() => op_http_headers(readStreamRid),
body !== null ? new InnerBody(body) : null,
false,
);
innerRequest[streamRid] = streamRid;
const abortController = new AbortController();
const request = fromInnerRequest(
innerRequest,
@ -167,7 +168,8 @@ class HttpConn {
const respondWith = createRespondWith(
this,
streamRid,
readStreamRid,
writeStreamRid,
abortController,
);
@ -178,10 +180,10 @@ class HttpConn {
close() {
if (!this.#closed) {
this.#closed = true;
core.close(this.#rid);
core.tryClose(this.#rid);
for (const rid of new SafeSetIterator(this.#managedResources)) {
SetPrototypeDelete(this.#managedResources, rid);
core.close(rid);
core.tryClose(rid);
}
}
}
@ -209,7 +211,8 @@ class HttpConn {
function createRespondWith(
httpConn,
streamRid,
readStreamRid,
writeStreamRid,
abortController,
) {
return async function respondWith(resp) {
@ -270,7 +273,7 @@ function createRespondWith(
);
try {
await op_http_write_headers(
streamRid,
writeStreamRid,
innerResp.status ?? 200,
innerResp.headerList,
isStreamingResponseBody ? null : respBody,
@ -310,7 +313,7 @@ function createRespondWith(
reader = respBody.getReader(); // Acquire JS lock.
try {
await op_http_write_resource(
streamRid,
writeStreamRid,
resourceBacking.rid,
);
if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
@ -340,7 +343,7 @@ function createRespondWith(
break;
}
try {
await op_http_write(streamRid, value);
await op_http_write(writeStreamRid, value);
} catch (error) {
const connError = httpConn[connErrorSymbol];
if (
@ -359,7 +362,7 @@ function createRespondWith(
if (success) {
try {
await op_http_shutdown(streamRid);
await op_http_shutdown(writeStreamRid);
} catch (error) {
await reader.cancel(error);
throw error;
@ -370,7 +373,7 @@ function createRespondWith(
const ws = resp[_ws];
if (ws) {
const wsRid = await op_http_upgrade_websocket(
streamRid,
readStreamRid,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
@ -395,8 +398,11 @@ function createRespondWith(
abortController.abort(error);
throw error;
} finally {
if (deleteManagedResource(httpConn, streamRid)) {
core.close(streamRid);
if (deleteManagedResource(httpConn, readStreamRid)) {
core.tryClose(readStreamRid);
}
if (deleteManagedResource(httpConn, writeStreamRid)) {
core.tryClose(writeStreamRid);
}
}
};

View file

@ -209,7 +209,15 @@ impl HttpConnResource {
// Accepts a new incoming HTTP request.
async fn accept(
self: &Rc<Self>,
) -> Result<Option<(HttpStreamResource, String, String)>, AnyError> {
) -> Result<
Option<(
HttpStreamReadResource,
HttpStreamWriteResource,
String,
String,
)>,
AnyError,
> {
let fut = async {
let (request_tx, request_rx) = oneshot::channel();
let (response_tx, response_rx) = oneshot::channel();
@ -218,7 +226,6 @@ impl HttpConnResource {
self.acceptors_tx.unbounded_send(acceptor).ok()?;
let request = request_rx.await.ok()?;
let accept_encoding = {
let encodings =
fly_accept_encoding::encodings_iter_http_02(request.headers())
@ -234,9 +241,10 @@ impl HttpConnResource {
let method = request.method().to_string();
let url = req_url(&request, self.scheme, &self.addr);
let stream =
HttpStreamResource::new(self, request, response_tx, accept_encoding);
Some((stream, method, url))
let read_stream = HttpStreamReadResource::new(self, request);
let write_stream =
HttpStreamWriteResource::new(self, response_tx, accept_encoding);
Some((read_stream, write_stream, method, url))
};
async {
@ -348,38 +356,34 @@ impl HttpAcceptor {
}
}
/// A resource representing a single HTTP request/response stream.
pub struct HttpStreamResource {
conn: Rc<HttpConnResource>,
pub struct HttpStreamReadResource {
_conn: Rc<HttpConnResource>,
pub rd: AsyncRefCell<HttpRequestReader>,
wr: AsyncRefCell<HttpResponseWriter>,
accept_encoding: Encoding,
cancel_handle: CancelHandle,
size: SizeHint,
}
impl HttpStreamResource {
fn new(
conn: &Rc<HttpConnResource>,
request: Request<Body>,
response_tx: oneshot::Sender<Response<Body>>,
accept_encoding: Encoding,
) -> Self {
pub struct HttpStreamWriteResource {
conn: Rc<HttpConnResource>,
wr: AsyncRefCell<HttpResponseWriter>,
accept_encoding: Encoding,
}
impl HttpStreamReadResource {
fn new(conn: &Rc<HttpConnResource>, request: Request<Body>) -> Self {
let size = request.body().size_hint();
Self {
conn: conn.clone(),
_conn: conn.clone(),
rd: HttpRequestReader::Headers(request).into(),
wr: HttpResponseWriter::Headers(response_tx).into(),
accept_encoding,
size,
cancel_handle: CancelHandle::new(),
}
}
}
impl Resource for HttpStreamResource {
impl Resource for HttpStreamReadResource {
fn name(&self) -> Cow<str> {
"httpStream".into()
"httpReadStream".into()
}
fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
@ -440,6 +444,26 @@ impl Resource for HttpStreamResource {
}
}
impl HttpStreamWriteResource {
fn new(
conn: &Rc<HttpConnResource>,
response_tx: oneshot::Sender<Response<Body>>,
accept_encoding: Encoding,
) -> Self {
Self {
conn: conn.clone(),
wr: HttpResponseWriter::Headers(response_tx).into(),
accept_encoding,
}
}
}
impl Resource for HttpStreamWriteResource {
fn name(&self) -> Cow<str> {
"httpWriteStream".into()
}
}
/// The read half of an HTTP stream.
pub enum HttpRequestReader {
Headers(Request<Body>),
@ -504,7 +528,9 @@ impl Drop for BodyUncompressedSender {
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct NextRequestResponse(
// stream_rid:
// read_stream_rid:
ResourceId,
// write_stream_rid:
ResourceId,
// method:
// This is a String rather than a ByteString because reqwest will only return
@ -523,10 +549,17 @@ async fn op_http_accept(
let conn = state.borrow().resource_table.get::<HttpConnResource>(rid)?;
match conn.accept().await {
Ok(Some((stream, method, url))) => {
let stream_rid =
state.borrow_mut().resource_table.add_rc(Rc::new(stream));
let r = NextRequestResponse(stream_rid, method, url);
Ok(Some((read_stream, write_stream, method, url))) => {
let read_stream_rid = state
.borrow_mut()
.resource_table
.add_rc(Rc::new(read_stream));
let write_stream_rid = state
.borrow_mut()
.resource_table
.add_rc(Rc::new(write_stream));
let r =
NextRequestResponse(read_stream_rid, write_stream_rid, method, url);
Ok(Some(r))
}
Ok(None) => Ok(None),
@ -628,7 +661,7 @@ async fn op_http_write_headers(
let stream = state
.borrow_mut()
.resource_table
.get::<HttpStreamResource>(rid)?;
.get::<HttpStreamWriteResource>(rid)?;
// Track supported encoding
let encoding = stream.accept_encoding;
@ -693,7 +726,7 @@ fn op_http_headers(
state: &mut OpState,
#[smi] rid: u32,
) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
let stream = state.resource_table.get::<HttpStreamResource>(rid)?;
let stream = state.resource_table.get::<HttpStreamReadResource>(rid)?;
let rd = RcRef::map(&stream, |r| &r.rd)
.try_borrow()
.ok_or_else(|| http_error("already in use"))?;
@ -849,7 +882,7 @@ async fn op_http_write_resource(
let http_stream = state
.borrow()
.resource_table
.get::<HttpStreamResource>(rid)?;
.get::<HttpStreamWriteResource>(rid)?;
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?;
loop {
@ -908,7 +941,7 @@ async fn op_http_write(
let stream = state
.borrow()
.resource_table
.get::<HttpStreamResource>(rid)?;
.get::<HttpStreamWriteResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
match &mut *wr {
@ -960,7 +993,7 @@ async fn op_http_shutdown(
let stream = state
.borrow()
.resource_table
.get::<HttpStreamResource>(rid)?;
.get::<HttpStreamWriteResource>(rid)?;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let wr = take(&mut *wr);
match wr {
@ -1008,7 +1041,7 @@ async fn op_http_upgrade_websocket(
let stream = state
.borrow_mut()
.resource_table
.get::<HttpStreamResource>(rid)?;
.get::<HttpStreamReadResource>(rid)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let request = match &mut *rd {

View file

@ -2668,6 +2668,61 @@ Deno.test(
},
);
Deno.test("proxy with fetch", async () => {
const listener = Deno.listen({ port: listenPort });
const deferred = Promise.withResolvers<void>();
const server = Deno.serve({ port: listenPort + 1 }, (_req) => {
return new Response("Hello world");
});
let httpConn: Deno.HttpConn;
async function handleHttp(conn: Deno.Conn) {
httpConn = Deno.serveHttp(conn);
for await (const e of httpConn) {
await e.respondWith(serve(e.request));
break;
}
}
async function serve(req: Request) {
return await fetch(`http://localhost:${listenPort + 1}/`, req);
}
const originServer = (async () => {
for await (const conn of listener) {
handleHttp(conn);
break;
}
})();
const proxiedRequest = (async () => {
const conn = await Deno.connect({ port: listenPort });
const payload = new TextEncoder().encode(
"POST /api/sessions HTTP/1.1\x0d\x0aConnection: keep-alive\x0d\x0aContent-Length: 2\x0d\x0a\x0d\x0a{}",
);
const n = await conn.write(payload);
assertEquals(n, 76);
const buf = new Uint8Array(1000);
const nread = await conn.read(buf);
assertEquals(nread, 150);
const respText = new TextDecoder().decode(buf);
assert(respText.includes("HTTP/1.1 200 OK"));
assert(respText.includes("content-type: text/plain;charset=UTF-8"));
assert(respText.includes("vary: Accept-Encoding"));
assert(respText.includes("content-length: 11"));
assert(respText.includes("Hello world"));
conn.close();
deferred.resolve();
})();
await proxiedRequest;
await originServer;
await deferred.promise;
await server.shutdown();
await server.finished;
httpConn!.close();
});
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
const tp = new TextProtoReader(r);