1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-12 09:03:42 -05:00

refactor(ext/http): remove op_http_read (#16096)

We can use Resource::read_return & op_read instead. This allows HTTP
request bodies to participate in FastStream.

To make this work, `readableStreamForRid` required a change to allow non
auto-closing resources to be handled. This required some minor changes
in our FastStream paths in ext/http and ext/flash.
This commit is contained in:
Luca Casonato 2022-09-30 07:54:12 +02:00 committed by GitHub
parent 38f544538b
commit 20c7300412
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 91 additions and 119 deletions

View file

@ -165,7 +165,7 @@ async fn op_read(
buf: ZeroCopyBuf, buf: ZeroCopyBuf,
) -> Result<u32, Error> { ) -> Result<u32, Error> {
let resource = state.borrow().resource_table.get_any(rid)?; let resource = state.borrow().resource_table.get_any(rid)?;
resource.read(buf).await.map(|n| n as u32) resource.read_return(buf).await.map(|(n, _)| n as u32)
} }
#[op] #[op]

View file

@ -35,14 +35,7 @@ pub trait Resource: Any + 'static {
type_name::<Self>().into() type_name::<Self>().into()
} }
/// Resources may implement `read()` to be a readable stream /// Resources may implement `read_return()` to be a readable stream
fn read(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(async move {
let (nread, _) = self.read_return(buf).await?;
Ok(nread)
})
}
fn read_return( fn read_return(
self: Rc<Self>, self: Rc<Self>,
_buf: ZeroCopyBuf, _buf: ZeroCopyBuf,

View file

@ -10,7 +10,7 @@
const { const {
ReadableStream, ReadableStream,
ReadableStreamPrototype, ReadableStreamPrototype,
getReadableStreamRid, getReadableStreamResourceBacking,
readableStreamClose, readableStreamClose,
_state, _state,
} = window.__bootstrap.streams; } = window.__bootstrap.streams;
@ -333,8 +333,8 @@
} }
if (isStreamingResponseBody === true) { if (isStreamingResponseBody === true) {
const resourceRid = getReadableStreamRid(respBody); const resourceBacking = getReadableStreamResourceBacking(respBody);
if (resourceRid) { if (resourceBacking) {
if (respBody.locked) { if (respBody.locked) {
throw new TypeError("ReadableStream is locked."); throw new TypeError("ReadableStream is locked.");
} }
@ -352,7 +352,8 @@
), ),
serverId, serverId,
i, i,
resourceRid, resourceBacking.rid,
resourceBacking.autoClose,
).then(() => { ).then(() => {
// Release JS lock. // Release JS lock.
readableStreamClose(respBody); readableStreamClose(respBody);

View file

@ -205,16 +205,20 @@ async fn op_flash_write_resource(
server_id: u32, server_id: u32,
token: u32, token: u32,
resource_id: deno_core::ResourceId, resource_id: deno_core::ResourceId,
auto_close: bool,
) -> Result<(), AnyError> { ) -> Result<(), AnyError> {
let resource = op_state.borrow_mut().resource_table.take_any(resource_id)?; let (resource, sock) = {
let sock = {
let op_state = &mut op_state.borrow_mut(); let op_state = &mut op_state.borrow_mut();
let resource = if auto_close {
op_state.resource_table.take_any(resource_id)?
} else {
op_state.resource_table.get_any(resource_id)?
};
let flash_ctx = op_state.borrow_mut::<FlashContext>(); let flash_ctx = op_state.borrow_mut::<FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap(); let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
ctx.requests.remove(&token).unwrap().socket() (resource, ctx.requests.remove(&token).unwrap().socket())
}; };
drop(op_state);
let _ = sock.write(&response); let _ = sock.write(&response);
#[cfg(unix)] #[cfg(unix)]

View file

@ -17,8 +17,7 @@
} = window.__bootstrap.fetch; } = window.__bootstrap.fetch;
const core = window.Deno.core; const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core; const { BadResourcePrototype, InterruptedPrototype, ops } = core;
const { ReadableStream, ReadableStreamPrototype } = const { ReadableStreamPrototype } = window.__bootstrap.streams;
window.__bootstrap.streams;
const abortSignal = window.__bootstrap.abortSignal; const abortSignal = window.__bootstrap.abortSignal;
const { const {
WebSocket, WebSocket,
@ -33,8 +32,12 @@
} = window.__bootstrap.webSocket; } = window.__bootstrap.webSocket;
const { TcpConn, UnixConn } = window.__bootstrap.net; const { TcpConn, UnixConn } = window.__bootstrap.net;
const { TlsConn } = window.__bootstrap.tls; const { TlsConn } = window.__bootstrap.tls;
const { Deferred, getReadableStreamRid, readableStreamClose } = const {
window.__bootstrap.streams; Deferred,
getReadableStreamResourceBacking,
readableStreamForRid,
readableStreamClose,
} = window.__bootstrap.streams;
const { const {
ArrayPrototypeIncludes, ArrayPrototypeIncludes,
ArrayPrototypePush, ArrayPrototypePush,
@ -50,7 +53,6 @@
StringPrototypeSplit, StringPrototypeSplit,
Symbol, Symbol,
SymbolAsyncIterator, SymbolAsyncIterator,
TypedArrayPrototypeSubarray,
TypeError, TypeError,
Uint8Array, Uint8Array,
Uint8ArrayPrototype, Uint8ArrayPrototype,
@ -121,7 +123,7 @@
// It will be closed automatically once the request has been handled and // It will be closed automatically once the request has been handled and
// the response has been sent. // the response has been sent.
if (method !== "GET" && method !== "HEAD") { if (method !== "GET" && method !== "HEAD") {
body = createRequestBodyStream(streamRid); body = readableStreamForRid(streamRid, false);
} }
const innerRequest = newInnerRequest( const innerRequest = newInnerRequest(
@ -170,10 +172,6 @@
} }
} }
function readRequest(streamRid, buf) {
return core.opAsync("op_http_read", streamRid, buf);
}
function createRespondWith( function createRespondWith(
httpConn, httpConn,
streamRid, streamRid,
@ -270,9 +268,9 @@
) { ) {
throw new TypeError("Unreachable"); throw new TypeError("Unreachable");
} }
const resourceRid = getReadableStreamRid(respBody); const resourceBacking = getReadableStreamResourceBacking(respBody);
let reader; let reader;
if (resourceRid) { if (resourceBacking) {
if (respBody.locked) { if (respBody.locked) {
throw new TypeError("ReadableStream is locked."); throw new TypeError("ReadableStream is locked.");
} }
@ -281,9 +279,9 @@
await core.opAsync( await core.opAsync(
"op_http_write_resource", "op_http_write_resource",
streamRid, streamRid,
resourceRid, resourceBacking.rid,
); );
core.tryClose(resourceRid); if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
readableStreamClose(respBody); // Release JS lock. readableStreamClose(respBody); // Release JS lock.
} catch (error) { } catch (error) {
const connError = httpConn[connErrorSymbol]; const connError = httpConn[connErrorSymbol];
@ -379,32 +377,6 @@
}; };
} }
function createRequestBodyStream(streamRid) {
return new ReadableStream({
type: "bytes",
async pull(controller) {
try {
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest(streamRid, chunk);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
}
},
});
}
const _ws = Symbol("[[associated_ws]]"); const _ws = Symbol("[[associated_ws]]");
function upgradeWebSocket(request, options = {}) { function upgradeWebSocket(request, options = {}) {

View file

@ -78,7 +78,6 @@ pub fn init() -> Extension {
)) ))
.ops(vec![ .ops(vec![
op_http_accept::decl(), op_http_accept::decl(),
op_http_read::decl(),
op_http_write_headers::decl(), op_http_write_headers::decl(),
op_http_headers::decl(), op_http_headers::decl(),
op_http_write::decl(), op_http_write::decl(),
@ -329,11 +328,63 @@ impl HttpStreamResource {
} }
} }
impl HttpStreamResource {
async fn read(
self: Rc<Self>,
mut buf: ZeroCopyBuf,
) -> Result<(usize, ZeroCopyBuf), AnyError> {
let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await;
let body = loop {
match &mut *rd {
HttpRequestReader::Headers(_) => {}
HttpRequestReader::Body(_, body) => break body,
HttpRequestReader::Closed => return Ok((0, buf)),
}
match take(&mut *rd) {
HttpRequestReader::Headers(request) => {
let (parts, body) = request.into_parts();
*rd = HttpRequestReader::Body(parts.headers, body.peekable());
}
_ => unreachable!(),
};
};
let fut = async {
let mut body = Pin::new(body);
loop {
match body.as_mut().peek_mut().await {
Some(Ok(chunk)) if !chunk.is_empty() => {
let len = min(buf.len(), chunk.len());
buf[..len].copy_from_slice(&chunk.split_to(len));
break Ok((len, buf));
}
Some(_) => match body.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()),
Err(err) => break Err(AnyError::from(err)),
},
None => break Ok((0, buf)),
}
}
};
let cancel_handle = RcRef::map(&self, |r| &r.cancel_handle);
fut.try_or_cancel(cancel_handle).await
}
}
impl Resource for HttpStreamResource { impl Resource for HttpStreamResource {
fn name(&self) -> Cow<str> { fn name(&self) -> Cow<str> {
"httpStream".into() "httpStream".into()
} }
fn read_return(
self: Rc<Self>,
_buf: ZeroCopyBuf,
) -> deno_core::AsyncResult<(usize, ZeroCopyBuf)> {
Box::pin(self.read(_buf))
}
fn close(self: Rc<Self>) { fn close(self: Rc<Self>) {
self.cancel_handle.cancel(); self.cancel_handle.cancel();
} }
@ -816,55 +867,6 @@ async fn op_http_shutdown(
Ok(()) Ok(())
} }
#[op]
async fn op_http_read(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
mut buf: ZeroCopyBuf,
) -> Result<usize, AnyError> {
let stream = state
.borrow_mut()
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let body = loop {
match &mut *rd {
HttpRequestReader::Headers(_) => {}
HttpRequestReader::Body(_, body) => break body,
HttpRequestReader::Closed => return Ok(0),
}
match take(&mut *rd) {
HttpRequestReader::Headers(request) => {
let (parts, body) = request.into_parts();
*rd = HttpRequestReader::Body(parts.headers, body.peekable());
}
_ => unreachable!(),
};
};
let fut = async {
let mut body = Pin::new(body);
loop {
match body.as_mut().peek_mut().await {
Some(Ok(chunk)) if !chunk.is_empty() => {
let len = min(buf.len(), chunk.len());
buf[..len].copy_from_slice(&chunk.split_to(len));
break Ok(len);
}
Some(_) => match body.as_mut().next().await.unwrap() {
Ok(chunk) => assert!(chunk.is_empty()),
Err(err) => break Err(AnyError::from(err)),
},
None => break Ok(0),
}
}
};
let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle);
fut.try_or_cancel(cancel_handle).await
}
#[op] #[op]
fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> { fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
let digest = ring::digest::digest( let digest = ring::digest::digest(

View file

@ -654,11 +654,12 @@
* read directly from the underlying resource if they so choose (FastStream). * read directly from the underlying resource if they so choose (FastStream).
* *
* @param {number} rid The resource ID to read from. * @param {number} rid The resource ID to read from.
* @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true.
* @returns {ReadableStream<Uint8Array>} * @returns {ReadableStream<Uint8Array>}
*/ */
function readableStreamForRid(rid) { function readableStreamForRid(rid, autoClose = true) {
const stream = webidl.createBranded(ReadableStream); const stream = webidl.createBranded(ReadableStream);
stream[_maybeRid] = rid; stream[_resourceBacking] = { rid, autoClose };
const underlyingSource = { const underlyingSource = {
type: "bytes", type: "bytes",
async pull(controller) { async pull(controller) {
@ -666,7 +667,7 @@
try { try {
const bytesRead = await core.read(rid, v); const bytesRead = await core.read(rid, v);
if (bytesRead === 0) { if (bytesRead === 0) {
core.tryClose(rid); if (autoClose) core.tryClose(rid);
controller.close(); controller.close();
controller.byobRequest.respond(0); controller.byobRequest.respond(0);
} else { } else {
@ -674,11 +675,11 @@
} }
} catch (e) { } catch (e) {
controller.error(e); controller.error(e);
core.tryClose(rid); if (autoClose) core.tryClose(rid);
} }
}, },
cancel() { cancel() {
core.tryClose(rid); if (autoClose) core.tryClose(rid);
}, },
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
}; };
@ -761,8 +762,8 @@
} }
} }
function getReadableStreamRid(stream) { function getReadableStreamResourceBacking(stream) {
return stream[_maybeRid]; return stream[_resourceBacking];
} }
/** /**
@ -4424,7 +4425,7 @@
WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size); WeakMapPrototypeSet(countSizeFunctionWeakMap, globalObject, size);
} }
const _maybeRid = Symbol("[[maybeRid]]"); const _resourceBacking = Symbol("[[resourceBacking]]");
/** @template R */ /** @template R */
class ReadableStream { class ReadableStream {
/** @type {ReadableStreamDefaultController | ReadableByteStreamController} */ /** @type {ReadableStreamDefaultController | ReadableByteStreamController} */
@ -4439,8 +4440,8 @@
[_state]; [_state];
/** @type {any} */ /** @type {any} */
[_storedError]; [_storedError];
/** @type {number | null} */ /** @type {{ rid: number, autoClose: boolean } | null} */
[_maybeRid] = null; [_resourceBacking] = null;
/** /**
* @param {UnderlyingSource<R>=} underlyingSource * @param {UnderlyingSource<R>=} underlyingSource
@ -5986,7 +5987,7 @@
readableStreamForRidUnrefable, readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef, readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref, readableStreamForRidUnrefableUnref,
getReadableStreamRid, getReadableStreamResourceBacking,
Deferred, Deferred,
// Exposed in global runtime scope // Exposed in global runtime scope
ByteLengthQueuingStrategy, ByteLengthQueuingStrategy,

View file

@ -90,7 +90,6 @@
"op_funlock_async": ["unlock a file", "awaiting the result of a `Deno.funlock` call"], "op_funlock_async": ["unlock a file", "awaiting the result of a `Deno.funlock` call"],
"op_futime_async": ["change file timestamps", "awaiting the result of a `Deno.futime` call"], "op_futime_async": ["change file timestamps", "awaiting the result of a `Deno.futime` call"],
"op_http_accept": ["accept a HTTP request", "closing a `Deno.HttpConn`"], "op_http_accept": ["accept a HTTP request", "closing a `Deno.HttpConn`"],
"op_http_read": ["read the body of a HTTP request", "consuming the entire request body"],
"op_http_shutdown": ["shutdown a HTTP connection", "awaiting `Deno.HttpEvent#respondWith`"], "op_http_shutdown": ["shutdown a HTTP connection", "awaiting `Deno.HttpEvent#respondWith`"],
"op_http_upgrade_websocket": ["upgrade a HTTP connection to a WebSocket", "awaiting `Deno.HttpEvent#respondWith`"], "op_http_upgrade_websocket": ["upgrade a HTTP connection to a WebSocket", "awaiting `Deno.HttpEvent#respondWith`"],
"op_http_write_headers": ["write HTTP response headers", "awaiting `Deno.HttpEvent#respondWith`"], "op_http_write_headers": ["write HTTP response headers", "awaiting `Deno.HttpEvent#respondWith`"],