mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
fix(ext/http): internal upgradeHttpRaw works with "Deno.serve()" API (#18859)
Fix internal "upgradeHttpRaw" API restoring capability to upgrade HTTP connection in polyfilles "node:http" API.
This commit is contained in:
parent
a8b4e346b4
commit
e2761df3fe
9 changed files with 264 additions and 248 deletions
12
cli/bench/testdata/deno_upgrade_http.js
vendored
12
cli/bench/testdata/deno_upgrade_http.js
vendored
|
@ -1,12 +0,0 @@
|
|||
const { serve, upgradeHttpRaw } = Deno;
|
||||
const u8 = Deno[Deno.internal].core.encode(
|
||||
"HTTP/1.1 101 Switching Protocols\r\n\r\n",
|
||||
);
|
||||
|
||||
async function handler(req) {
|
||||
const [conn, _firstPacket] = upgradeHttpRaw(req);
|
||||
await conn.write(u8);
|
||||
await conn.close();
|
||||
}
|
||||
|
||||
serve({ hostname: "127.0.0.1", port: 9000 }, handler);
|
|
@ -17,6 +17,11 @@ import {
|
|||
} from "./test_util.ts";
|
||||
import { consoleSize } from "../../../runtime/js/40_tty.js";
|
||||
|
||||
const {
|
||||
upgradeHttpRaw,
|
||||
// @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
|
||||
} = Deno[Deno.internal];
|
||||
|
||||
function createOnErrorCb(ac: AbortController): (err: unknown) => Response {
|
||||
return (err) => {
|
||||
console.error(err);
|
||||
|
@ -803,6 +808,85 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() {
|
|||
await server;
|
||||
});
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
async function httpServerWebSocketRaw() {
|
||||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
const server = Deno.serve({
|
||||
handler: async (request) => {
|
||||
const { conn, response } = upgradeHttpRaw(request);
|
||||
const buf = new Uint8Array(1024);
|
||||
let read;
|
||||
|
||||
// Write our fake HTTP upgrade
|
||||
await conn.write(
|
||||
new TextEncoder().encode(
|
||||
"HTTP/1.1 101 Switching Protocols\r\nConnection: Upgraded\r\n\r\nExtra",
|
||||
),
|
||||
);
|
||||
|
||||
// Upgrade data
|
||||
read = await conn.read(buf);
|
||||
assertEquals(
|
||||
new TextDecoder().decode(buf.subarray(0, read!)),
|
||||
"Upgrade data",
|
||||
);
|
||||
// Read the packet to echo
|
||||
read = await conn.read(buf);
|
||||
// Echo
|
||||
await conn.write(buf.subarray(0, read!));
|
||||
|
||||
conn.close();
|
||||
return response;
|
||||
},
|
||||
port: 4501,
|
||||
signal: ac.signal,
|
||||
onListen: onListen(listeningPromise),
|
||||
onError: createOnErrorCb(ac),
|
||||
});
|
||||
|
||||
await listeningPromise;
|
||||
|
||||
const conn = await Deno.connect({ port: 4501 });
|
||||
await conn.write(
|
||||
new TextEncoder().encode(
|
||||
"GET / HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\nUpgrade data",
|
||||
),
|
||||
);
|
||||
const buf = new Uint8Array(1024);
|
||||
let len;
|
||||
|
||||
// Headers
|
||||
let headers = "";
|
||||
for (let i = 0; i < 2; i++) {
|
||||
len = await conn.read(buf);
|
||||
headers += new TextDecoder().decode(buf.subarray(0, len!));
|
||||
if (headers.endsWith("Extra")) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertMatch(
|
||||
headers,
|
||||
/HTTP\/1\.1 101 Switching Protocols[ ,.A-Za-z:0-9\r\n]*Extra/im,
|
||||
);
|
||||
|
||||
// Data to echo
|
||||
await conn.write(new TextEncoder().encode("buffer data"));
|
||||
|
||||
// Echo
|
||||
len = await conn.read(buf);
|
||||
assertEquals(
|
||||
new TextDecoder().decode(buf.subarray(0, len!)),
|
||||
"buffer data",
|
||||
);
|
||||
|
||||
conn.close();
|
||||
ac.abort();
|
||||
await server;
|
||||
},
|
||||
);
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { net: true } },
|
||||
async function httpServerWebSocketUpgradeTwice() {
|
||||
|
|
19
cli/tsc/dts/lib.deno.unstable.d.ts
vendored
19
cli/tsc/dts/lib.deno.unstable.d.ts
vendored
|
@ -1516,25 +1516,6 @@ declare namespace Deno {
|
|||
request: Request,
|
||||
): Promise<[Deno.Conn, Uint8Array]>;
|
||||
|
||||
/** **UNSTABLE**: New API, yet to be vetted.
|
||||
*
|
||||
* Allows "hijacking" the connection that the request is associated with.
|
||||
* This can be used to implement protocols that build on top of HTTP (eg.
|
||||
* {@linkcode WebSocket}).
|
||||
*
|
||||
* Unlike {@linkcode Deno.upgradeHttp} this function does not require that you
|
||||
* respond to the request with a {@linkcode Response} object. Instead this
|
||||
* function returns the underlying connection and first packet received
|
||||
* immediately, and then the caller is responsible for writing the response to
|
||||
* the connection.
|
||||
*
|
||||
* This method can only be called on requests originating the
|
||||
* {@linkcode Deno.serve} server.
|
||||
*
|
||||
* @category HTTP Server
|
||||
*/
|
||||
export function upgradeHttpRaw(request: Request): [Deno.Conn, Uint8Array];
|
||||
|
||||
/** **UNSTABLE**: New API, yet to be vetted.
|
||||
*
|
||||
* Open a new {@linkcode Deno.Kv} connection to persist data.
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||
const core = globalThis.Deno.core;
|
||||
const primordials = globalThis.__bootstrap.primordials;
|
||||
const internals = globalThis.__bootstrap.internals;
|
||||
|
||||
const { BadResourcePrototype } = core;
|
||||
import { InnerBody } from "ext:deno_fetch/22_body.js";
|
||||
|
@ -10,7 +11,7 @@ import {
|
|||
newInnerResponse,
|
||||
toInnerResponse,
|
||||
} from "ext:deno_fetch/23_response.js";
|
||||
import { fromInnerRequest } from "ext:deno_fetch/23_request.js";
|
||||
import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js";
|
||||
import { AbortController } from "ext:deno_web/03_abort_signal.js";
|
||||
import {
|
||||
_eventLoop,
|
||||
|
@ -32,6 +33,7 @@ import {
|
|||
readableStreamForRid,
|
||||
ReadableStreamPrototype,
|
||||
} from "ext:deno_web/06_streams.js";
|
||||
import { TcpConn } from "ext:deno_net/01_net.js";
|
||||
const {
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
SafeSet,
|
||||
|
@ -82,6 +84,14 @@ const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse(
|
|||
"immutable",
|
||||
);
|
||||
|
||||
function upgradeHttpRaw(req, conn) {
|
||||
const inner = toInnerRequest(req);
|
||||
if (inner._wantsUpgrade) {
|
||||
return inner._wantsUpgrade("upgradeHttpRaw", conn);
|
||||
}
|
||||
throw new TypeError("upgradeHttpRaw may only be used with Deno.serve");
|
||||
}
|
||||
|
||||
class InnerRequest {
|
||||
#slabId;
|
||||
#context;
|
||||
|
@ -122,10 +132,26 @@ class InnerRequest {
|
|||
throw "upgradeHttp is unavailable in Deno.serve at this time";
|
||||
}
|
||||
|
||||
// upgradeHttpRaw is async
|
||||
// TODO(mmastrac)
|
||||
// upgradeHttpRaw is sync
|
||||
if (upgradeType == "upgradeHttpRaw") {
|
||||
throw "upgradeHttp is unavailable in Deno.serve at this time";
|
||||
const slabId = this.#slabId;
|
||||
const underlyingConn = originalArgs[0];
|
||||
|
||||
this.url();
|
||||
this.headerList;
|
||||
this.close();
|
||||
|
||||
this.#upgraded = () => {};
|
||||
|
||||
const upgradeRid = core.ops.op_upgrade_raw(slabId);
|
||||
|
||||
const conn = new TcpConn(
|
||||
upgradeRid,
|
||||
underlyingConn?.remoteAddr,
|
||||
underlyingConn?.localAddr,
|
||||
);
|
||||
|
||||
return { response: UPGRADE_RESPONSE_SENTINEL, conn };
|
||||
}
|
||||
|
||||
// upgradeWebSocket is sync
|
||||
|
@ -623,4 +649,6 @@ async function serve(arg1, arg2) {
|
|||
}
|
||||
}
|
||||
|
||||
export { serve };
|
||||
internals.upgradeHttpRaw = upgradeHttpRaw;
|
||||
|
||||
export { serve, upgradeHttpRaw };
|
||||
|
|
|
@ -64,7 +64,6 @@ const {
|
|||
} = primordials;
|
||||
|
||||
const connErrorSymbol = Symbol("connError");
|
||||
const streamRid = Symbol("streamRid");
|
||||
const _deferred = Symbol("upgradeHttpDeferred");
|
||||
|
||||
class HttpConn {
|
||||
|
@ -482,16 +481,6 @@ function upgradeHttp(req) {
|
|||
return req[_deferred].promise;
|
||||
}
|
||||
|
||||
async function upgradeHttpRaw(req, tcpConn) {
|
||||
const inner = toInnerRequest(req);
|
||||
if (inner._wantsUpgrade) {
|
||||
return inner._wantsUpgrade("upgradeHttpRaw", arguments);
|
||||
}
|
||||
|
||||
const res = await core.opAsync("op_http_upgrade_early", inner[streamRid]);
|
||||
return new TcpConn(res, tcpConn.remoteAddr, tcpConn.localAddr);
|
||||
}
|
||||
|
||||
const spaceCharCode = StringPrototypeCharCodeAt(" ", 0);
|
||||
const tabCharCode = StringPrototypeCharCodeAt("\t", 0);
|
||||
const commaCharCode = StringPrototypeCharCodeAt(",", 0);
|
||||
|
@ -566,4 +555,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) {
|
|||
internals.buildCaseInsensitiveCommaValueFinder =
|
||||
buildCaseInsensitiveCommaValueFinder;
|
||||
|
||||
export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket };
|
||||
export { _ws, HttpConn, serve, upgradeHttp, upgradeWebSocket };
|
||||
|
|
|
@ -10,11 +10,13 @@ use crate::response_body::CompletionHandle;
|
|||
use crate::response_body::ResponseBytes;
|
||||
use crate::response_body::ResponseBytesInner;
|
||||
use crate::response_body::V8StreamHttpResponseBody;
|
||||
use crate::websocket_upgrade::WebSocketUpgrade;
|
||||
use crate::LocalExecutor;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::futures::TryFutureExt;
|
||||
use deno_core::op;
|
||||
use deno_core::AsyncRefCell;
|
||||
use deno_core::AsyncResult;
|
||||
use deno_core::BufView;
|
||||
use deno_core::ByteString;
|
||||
use deno_core::CancelFuture;
|
||||
|
@ -39,6 +41,7 @@ use hyper1::server::conn::http2;
|
|||
use hyper1::service::service_fn;
|
||||
use hyper1::service::HttpService;
|
||||
use hyper1::upgrade::OnUpgrade;
|
||||
|
||||
use hyper1::StatusCode;
|
||||
use pin_project::pin_project;
|
||||
use pin_project::pinned_drop;
|
||||
|
@ -52,6 +55,10 @@ use std::net::SocketAddr;
|
|||
use std::net::SocketAddrV4;
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use tokio::task::spawn_local;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
|
@ -228,7 +235,79 @@ fn slab_insert(
|
|||
}
|
||||
|
||||
#[op]
|
||||
pub fn op_upgrade_raw(_index: usize) {}
|
||||
pub fn op_upgrade_raw(
|
||||
state: &mut OpState,
|
||||
index: u32,
|
||||
) -> Result<ResourceId, AnyError> {
|
||||
// Stage 1: extract the upgrade future
|
||||
let upgrade = with_http_mut(index, |http| {
|
||||
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
|
||||
http
|
||||
.request_parts
|
||||
.extensions
|
||||
.remove::<OnUpgrade>()
|
||||
.ok_or_else(|| AnyError::msg("upgrade unavailable"))
|
||||
})?;
|
||||
|
||||
let (read, write) = tokio::io::duplex(1024);
|
||||
let (read_rx, write_tx) = tokio::io::split(read);
|
||||
let (mut write_rx, mut read_tx) = tokio::io::split(write);
|
||||
|
||||
spawn_local(async move {
|
||||
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
|
||||
|
||||
// Stage 2: Extract the Upgraded connection
|
||||
let mut buf = [0; 1024];
|
||||
let upgraded = loop {
|
||||
let read = Pin::new(&mut write_rx).read(&mut buf).await?;
|
||||
match upgrade_stream.write(&buf[..read]) {
|
||||
Ok(None) => continue,
|
||||
Ok(Some((response, bytes))) => {
|
||||
with_resp_mut(index, |resp| *resp = Some(response));
|
||||
with_promise_mut(index, |promise| promise.complete(true));
|
||||
let mut upgraded = upgrade.await?;
|
||||
upgraded.write_all(&bytes).await?;
|
||||
break upgraded;
|
||||
}
|
||||
Err(err) => return Err(err),
|
||||
}
|
||||
};
|
||||
|
||||
// Stage 3: Pump the data
|
||||
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
|
||||
|
||||
spawn_local(async move {
|
||||
let mut buf = [0; 1024];
|
||||
loop {
|
||||
let read = upgraded_rx.read(&mut buf).await?;
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
read_tx.write_all(&buf[..read]).await?;
|
||||
}
|
||||
Ok::<_, AnyError>(())
|
||||
});
|
||||
spawn_local(async move {
|
||||
let mut buf = [0; 1024];
|
||||
loop {
|
||||
let read = write_rx.read(&mut buf).await?;
|
||||
if read == 0 {
|
||||
break;
|
||||
}
|
||||
upgraded_tx.write_all(&buf[..read]).await?;
|
||||
}
|
||||
Ok::<_, AnyError>(())
|
||||
});
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
Ok(
|
||||
state
|
||||
.resource_table
|
||||
.add(UpgradeStream::new(read_rx, write_tx)),
|
||||
)
|
||||
}
|
||||
|
||||
#[op]
|
||||
pub async fn op_upgrade(
|
||||
|
@ -825,3 +904,57 @@ pub async fn op_http_wait(
|
|||
|
||||
Ok(u32::MAX)
|
||||
}
|
||||
|
||||
struct UpgradeStream {
|
||||
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
|
||||
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
|
||||
cancel_handle: CancelHandle,
|
||||
}
|
||||
|
||||
impl UpgradeStream {
|
||||
pub fn new(
|
||||
read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
|
||||
write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
|
||||
) -> Self {
|
||||
Self {
|
||||
read: AsyncRefCell::new(read),
|
||||
write: AsyncRefCell::new(write),
|
||||
cancel_handle: CancelHandle::new(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
|
||||
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
|
||||
async {
|
||||
let read = RcRef::map(self, |this| &this.read);
|
||||
let mut read = read.borrow_mut().await;
|
||||
Ok(Pin::new(&mut *read).read(buf).await?)
|
||||
}
|
||||
.try_or_cancel(cancel_handle)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
|
||||
let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
|
||||
async {
|
||||
let write = RcRef::map(self, |this| &this.write);
|
||||
let mut write = write.borrow_mut().await;
|
||||
Ok(Pin::new(&mut *write).write(buf).await?)
|
||||
}
|
||||
.try_or_cancel(cancel_handle)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
impl Resource for UpgradeStream {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"httpRawUpgradeStream".into()
|
||||
}
|
||||
|
||||
deno_core::impl_readable_byob!();
|
||||
deno_core::impl_writable!();
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.cancel_handle.cancel();
|
||||
}
|
||||
}
|
||||
|
|
190
ext/http/lib.rs
190
ext/http/lib.rs
|
@ -32,7 +32,6 @@ use deno_core::RcRef;
|
|||
use deno_core::Resource;
|
||||
use deno_core::ResourceId;
|
||||
use deno_core::StringOrBuffer;
|
||||
use deno_core::WriteOutcome;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use deno_net::raw::NetworkStream;
|
||||
use deno_websocket::ws_create_server_stream;
|
||||
|
@ -67,11 +66,9 @@ use std::sync::Arc;
|
|||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::io::AsyncWrite;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use tokio::task::spawn_local;
|
||||
use websocket_upgrade::WebSocketUpgrade;
|
||||
|
||||
use crate::network_buffered_stream::NetworkBufferedStream;
|
||||
use crate::reader_stream::ExternallyAbortableReaderStream;
|
||||
|
@ -97,7 +94,6 @@ deno_core::extension!(
|
|||
op_http_write_resource,
|
||||
op_http_shutdown,
|
||||
op_http_websocket_accept_header,
|
||||
op_http_upgrade_early,
|
||||
op_http_upgrade_websocket,
|
||||
http_next::op_serve_http,
|
||||
http_next::op_serve_http_on,
|
||||
|
@ -967,192 +963,6 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
|
|||
Ok(base64::encode(digest))
|
||||
}
|
||||
|
||||
struct EarlyUpgradeSocket(AsyncRefCell<EarlyUpgradeSocketInner>, CancelHandle);
|
||||
|
||||
enum EarlyUpgradeSocketInner {
|
||||
PreResponse(
|
||||
Rc<HttpStreamResource>,
|
||||
WebSocketUpgrade,
|
||||
// Readers need to block in this state, so they can wait here for the broadcast.
|
||||
tokio::sync::broadcast::Sender<
|
||||
Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>,
|
||||
>,
|
||||
),
|
||||
PostResponse(
|
||||
Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>,
|
||||
Rc<AsyncRefCell<tokio::io::WriteHalf<hyper::upgrade::Upgraded>>>,
|
||||
),
|
||||
}
|
||||
|
||||
impl EarlyUpgradeSocket {
|
||||
/// Gets a reader without holding the lock.
|
||||
async fn get_reader(
|
||||
self: Rc<Self>,
|
||||
) -> Result<
|
||||
Rc<AsyncRefCell<tokio::io::ReadHalf<hyper::upgrade::Upgraded>>>,
|
||||
AnyError,
|
||||
> {
|
||||
let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await;
|
||||
let cancel = RcRef::map(self, |x| &x.1);
|
||||
let inner = &mut *borrow;
|
||||
match inner {
|
||||
EarlyUpgradeSocketInner::PreResponse(_, _, tx) => {
|
||||
let mut rx = tx.subscribe();
|
||||
// Ensure we're not borrowing self here
|
||||
drop(borrow);
|
||||
Ok(
|
||||
rx.recv()
|
||||
.map_err(AnyError::from)
|
||||
.try_or_cancel(&cancel)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
EarlyUpgradeSocketInner::PostResponse(rx, _) => Ok(rx.clone()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn read(self: Rc<Self>, data: &mut [u8]) -> Result<usize, AnyError> {
|
||||
let reader = self.clone().get_reader().await?;
|
||||
let cancel = RcRef::map(self, |x| &x.1);
|
||||
Ok(
|
||||
reader
|
||||
.borrow_mut()
|
||||
.await
|
||||
.read(data)
|
||||
.try_or_cancel(&cancel)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
|
||||
/// Write all the data provided, only holding the lock while we see if the connection needs to be
|
||||
/// upgraded.
|
||||
async fn write_all(self: Rc<Self>, buf: &[u8]) -> Result<(), AnyError> {
|
||||
let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await;
|
||||
let cancel = RcRef::map(self, |x| &x.1);
|
||||
let inner = &mut *borrow;
|
||||
match inner {
|
||||
EarlyUpgradeSocketInner::PreResponse(stream, upgrade, rx_tx) => {
|
||||
if let Some((resp, extra)) = upgrade.write(buf)? {
|
||||
let new_wr = HttpResponseWriter::Closed;
|
||||
let mut old_wr =
|
||||
RcRef::map(stream.clone(), |r| &r.wr).borrow_mut().await;
|
||||
let response_tx = match replace(&mut *old_wr, new_wr) {
|
||||
HttpResponseWriter::Headers(response_tx) => response_tx,
|
||||
_ => return Err(http_error("response headers already sent")),
|
||||
};
|
||||
|
||||
if response_tx.send(resp).is_err() {
|
||||
stream.conn.closed().await?;
|
||||
return Err(http_error("connection closed while sending response"));
|
||||
};
|
||||
|
||||
let mut old_rd =
|
||||
RcRef::map(stream.clone(), |r| &r.rd).borrow_mut().await;
|
||||
let new_rd = HttpRequestReader::Closed;
|
||||
let upgraded = match replace(&mut *old_rd, new_rd) {
|
||||
HttpRequestReader::Headers(request) => {
|
||||
hyper::upgrade::on(request)
|
||||
.map_err(AnyError::from)
|
||||
.try_or_cancel(&cancel)
|
||||
.await?
|
||||
}
|
||||
_ => {
|
||||
return Err(http_error("response already started"));
|
||||
}
|
||||
};
|
||||
|
||||
let (rx, tx) = tokio::io::split(upgraded);
|
||||
let rx = Rc::new(AsyncRefCell::new(rx));
|
||||
let tx = Rc::new(AsyncRefCell::new(tx));
|
||||
|
||||
// Take the tx and rx lock before we allow anything else to happen because we want to control
|
||||
// the order of reads and writes.
|
||||
let mut tx_lock = tx.clone().borrow_mut().await;
|
||||
let rx_lock = rx.clone().borrow_mut().await;
|
||||
|
||||
// Allow all the pending readers to go now. We still have the lock on inner, so no more
|
||||
// pending readers can show up. We intentionally ignore errors here, as there may be
|
||||
// nobody waiting on a read.
|
||||
_ = rx_tx.send(rx.clone());
|
||||
|
||||
// We swap out inner here, so once the lock is gone, readers will acquire rx directly.
|
||||
// We also fully release our lock.
|
||||
*inner = EarlyUpgradeSocketInner::PostResponse(rx, tx);
|
||||
drop(borrow);
|
||||
|
||||
// We've updated inner and unlocked it, reads are free to go in-order.
|
||||
drop(rx_lock);
|
||||
|
||||
// If we had extra data after the response, write that to the upgraded connection
|
||||
if !extra.is_empty() {
|
||||
tx_lock.write_all(&extra).try_or_cancel(&cancel).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
EarlyUpgradeSocketInner::PostResponse(_, tx) => {
|
||||
let tx = tx.clone();
|
||||
drop(borrow);
|
||||
tx.borrow_mut()
|
||||
.await
|
||||
.write_all(buf)
|
||||
.try_or_cancel(&cancel)
|
||||
.await?;
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Resource for EarlyUpgradeSocket {
|
||||
fn name(&self) -> Cow<str> {
|
||||
"upgradedHttpConnection".into()
|
||||
}
|
||||
|
||||
deno_core::impl_readable_byob!();
|
||||
|
||||
fn write(
|
||||
self: Rc<Self>,
|
||||
buf: BufView,
|
||||
) -> AsyncResult<deno_core::WriteOutcome> {
|
||||
Box::pin(async move {
|
||||
let nwritten = buf.len();
|
||||
Self::write_all(self, &buf).await?;
|
||||
Ok(WriteOutcome::Full { nwritten })
|
||||
})
|
||||
}
|
||||
|
||||
fn write_all(self: Rc<Self>, buf: BufView) -> AsyncResult<()> {
|
||||
Box::pin(async move { Self::write_all(self, &buf).await })
|
||||
}
|
||||
|
||||
fn close(self: Rc<Self>) {
|
||||
self.1.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_upgrade_early(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
rid: ResourceId,
|
||||
) -> Result<ResourceId, AnyError> {
|
||||
let stream = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<HttpStreamResource>(rid)?;
|
||||
let resources = &mut state.borrow_mut().resource_table;
|
||||
let (tx, _rx) = tokio::sync::broadcast::channel(1);
|
||||
let socket = EarlyUpgradeSocketInner::PreResponse(
|
||||
stream,
|
||||
WebSocketUpgrade::default(),
|
||||
tx,
|
||||
);
|
||||
let rid = resources.add(EarlyUpgradeSocket(
|
||||
AsyncRefCell::new(socket),
|
||||
CancelHandle::new(),
|
||||
));
|
||||
Ok(rid)
|
||||
}
|
||||
|
||||
#[op]
|
||||
async fn op_http_upgrade_websocket(
|
||||
state: Rc<RefCell<OpState>>,
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use std::marker::PhantomData;
|
||||
|
||||
use bytes::Bytes;
|
||||
use bytes::BytesMut;
|
||||
use deno_core::error::AnyError;
|
||||
use httparse::Status;
|
||||
use hyper::http::HeaderName;
|
||||
use hyper::http::HeaderValue;
|
||||
use hyper::Body;
|
||||
use hyper::Response;
|
||||
use memmem::Searcher;
|
||||
use memmem::TwoWaySearcher;
|
||||
|
@ -15,14 +16,14 @@ use once_cell::sync::OnceCell;
|
|||
use crate::http_error;
|
||||
|
||||
/// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request<Body>`].
|
||||
fn parse_response(
|
||||
fn parse_response<T: Default>(
|
||||
header_bytes: &[u8],
|
||||
) -> Result<(usize, Response<Body>), AnyError> {
|
||||
) -> Result<(usize, Response<T>), AnyError> {
|
||||
let mut headers = [httparse::EMPTY_HEADER; 16];
|
||||
let status = httparse::parse_headers(header_bytes, &mut headers)?;
|
||||
match status {
|
||||
Status::Complete((index, parsed)) => {
|
||||
let mut resp = Response::builder().status(101).body(Body::empty())?;
|
||||
let mut resp = Response::builder().status(101).body(T::default())?;
|
||||
for header in parsed.iter() {
|
||||
resp.headers_mut().append(
|
||||
HeaderName::from_bytes(header.name.as_bytes())?,
|
||||
|
@ -59,12 +60,13 @@ static HEADER_SEARCHER: OnceCell<TwoWaySearcher> = OnceCell::new();
|
|||
static HEADER_SEARCHER2: OnceCell<TwoWaySearcher> = OnceCell::new();
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct WebSocketUpgrade {
|
||||
pub struct WebSocketUpgrade<T: Default> {
|
||||
state: WebSocketUpgradeState,
|
||||
buf: BytesMut,
|
||||
_t: PhantomData<T>,
|
||||
}
|
||||
|
||||
impl WebSocketUpgrade {
|
||||
impl<T: Default> WebSocketUpgrade<T> {
|
||||
/// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js
|
||||
/// WebSocket libraries that are known. We don't care about the trailing status text.
|
||||
fn validate_status(&self, status: &[u8]) -> Result<(), AnyError> {
|
||||
|
@ -80,7 +82,7 @@ impl WebSocketUpgrade {
|
|||
pub fn write(
|
||||
&mut self,
|
||||
bytes: &[u8],
|
||||
) -> Result<Option<(Response<Body>, Bytes)>, AnyError> {
|
||||
) -> Result<Option<(Response<T>, Bytes)>, AnyError> {
|
||||
use WebSocketUpgradeState::*;
|
||||
|
||||
match self.state {
|
||||
|
@ -153,6 +155,7 @@ impl WebSocketUpgrade {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use hyper::Body;
|
||||
|
||||
type ExpectedResponseAndHead = Option<(Response<Body>, &'static [u8])>;
|
||||
|
||||
|
|
|
@ -16,7 +16,7 @@ import { Agent } from "ext:deno_node/_http_agent.mjs";
|
|||
import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts";
|
||||
import { urlToHttpOptions } from "ext:deno_node/internal/url.ts";
|
||||
import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts";
|
||||
import * as denoHttp from "ext:deno_http/01_http.js";
|
||||
import { upgradeHttpRaw } from "ext:deno_http/00_serve.js";
|
||||
import * as httpRuntime from "ext:runtime/40_http.js";
|
||||
import { connResetException } from "ext:deno_node/internal/errors.ts";
|
||||
|
||||
|
@ -704,7 +704,7 @@ class ServerImpl extends EventEmitter {
|
|||
}
|
||||
const req = new IncomingMessageForServer(reqEvent.request, tcpConn);
|
||||
if (req.upgrade && this.listenerCount("upgrade") > 0) {
|
||||
const conn = await denoHttp.upgradeHttpRaw(
|
||||
const conn = await upgradeHttpRaw(
|
||||
reqEvent.request,
|
||||
tcpConn,
|
||||
) as Deno.Conn;
|
||||
|
|
Loading…
Reference in a new issue