1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-11-29 16:30:56 -05:00
denoland-deno/ext/http/http_next.rs

1245 lines
35 KiB
Rust
Raw Normal View History

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::compressible::is_content_compressible;
use crate::extract_network_stream;
feat(ext/http): Upgrade to hyper1.0-rc4 (#19987) Includes a lightly-modified version of hyper-util's `TokioIo` utility. Hyper changes: v1.0.0-rc.4 (2023-07-10) Bug Fixes http1: http1 server graceful shutdown fix (#3261) ([f4b51300](https://github.com/hyperium/hyper/commit/f4b513009d81083081d1c60c1981847bbb17dd5d)) send error on Incoming body when connection errors (#3256) ([52f19259](https://github.com/hyperium/hyper/commit/52f192593fb9ebcf6d3894e0c85cbf710da4decd), closes https://github.com/hyperium/hyper/issues/3253) properly end chunked bodies when it was known to be empty (#3254) ([fec64cf0](https://github.com/hyperium/hyper/commit/fec64cf0abdc678e30ca5f1b310c5118b2e01999), closes https://github.com/hyperium/hyper/issues/3252) Features client: Make clients able to use non-Send executor (#3184) ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc), closes https://github.com/hyperium/hyper/issues/3017) rt: replace IO traits with hyper::rt ones (#3230) ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2), closes https://github.com/hyperium/hyper/issues/3110) add downcast on Sleep trait (#3125) ([d92d3917](https://github.com/hyperium/hyper/commit/d92d3917d950e4c61c37c2170f3ce273d2a0f7d1), closes https://github.com/hyperium/hyper/issues/3027) service: change Service::call to take &self (#3223) ([d894439e](https://github.com/hyperium/hyper/commit/d894439e009aa75103f6382a7ba98fb17da72f02), closes https://github.com/hyperium/hyper/issues/3040) Breaking Changes Any IO transport type provided must not implement hyper::rt::{Read, Write} instead of tokio::io traits. You can grab a helper type from hyper-util to wrap Tokio types, or implement the traits yourself, if it's a custom type. ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2)) client::conn::http2 types now use another generic for an Executor. 
Code that names Connection needs to include the additional generic parameter. ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc)) The Service::call function no longer takes a mutable reference to self. The FnMut trait bound on the service::util::service_fn function and the trait bound on the impl for the ServiceFn struct were changed from FnMut to Fn.
2023-07-31 09:34:53 -04:00
use crate::hyper_util_tokioio::TokioIo;
use crate::network_buffered_stream::NetworkStreamPrefixCheck;
use crate::request_body::HttpRequestBody;
use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
use crate::response_body::Compression;
use crate::response_body::ResponseBytes;
use crate::response_body::ResponseBytesInner;
use crate::slab::new_slab_future;
use crate::slab::slab_get;
use crate::slab::slab_init;
fix(ext/http): ensure request body resource lives as long as response is alive (#20206) Deno.serve's fast streaming implementation was not keeping the request body resource ID alive. We were taking the `Rc<Resource>` from the resource table during the response, so a hairpin duplex response that fed back the request body would work. However, if any JS code attempted to read from the request body (which requires the resource ID to be valid), the response would fail with a difficult-to-diagnose "EOF" error. This was affecting more complex duplex uses of `Deno.fetch` (though as far as I can tell was unreported). Simple test: ```ts const reader = request.body.getReader(); return new Response( new ReadableStream({ async pull(controller) { const { done, value } = await reader.read(); if (done) { controller.close(); } else { controller.enqueue(value); } }, }), ``` And then attempt to use the stream in duplex mode: ```ts async function testDuplex( reader: ReadableStreamDefaultReader<Uint8Array>, writable: WritableStreamDefaultWriter<Uint8Array>, ) { await writable.write(new Uint8Array([1])); const chunk1 = await reader.read(); assert(!chunk1.done); assertEquals(chunk1.value, new Uint8Array([1])); await writable.write(new Uint8Array([2])); const chunk2 = await reader.read(); assert(!chunk2.done); assertEquals(chunk2.value, new Uint8Array([2])); await writable.close(); const chunk3 = await reader.read(); assert(chunk3.done); } ``` In older versions of Deno, this would just lock up. I believe after 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f, it started throwing a more explicit error: ``` httpServerStreamDuplexJavascript => ./cli/tests/unit/serve_test.ts:1339:6 error: TypeError: request or response body error: error reading a body from connection: Connection reset by peer (os error 54) at async Object.pull (ext:deno_web/06_streams.js:810:27) ```
2023-08-20 21:35:26 -04:00
use crate::slab::HttpRequestBodyAutocloser;
use crate::slab::RefCount;
use crate::slab::SlabId;
use crate::websocket_upgrade::WebSocketUpgrade;
use crate::LocalExecutor;
use cache_control::CacheControl;
use deno_core::error::AnyError;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::serde_v8::from_v8;
use deno_core::unsync::spawn;
use deno_core::unsync::JoinHandle;
use deno_core::v8;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::BufView;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_net::ops_tls::TlsStream;
use deno_net::raw::NetworkStream;
use deno_websocket::ws_create_server_stream;
use fly_accept_encoding::Encoding;
use http::header::ACCEPT_ENCODING;
use http::header::CACHE_CONTROL;
use http::header::CONTENT_ENCODING;
use http::header::CONTENT_LENGTH;
use http::header::CONTENT_RANGE;
use http::header::CONTENT_TYPE;
use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::header::COOKIE;
use hyper1::http::HeaderName;
use hyper1::http::HeaderValue;
use hyper1::server::conn::http1;
use hyper1::server::conn::http2;
use hyper1::service::service_fn;
use hyper1::service::HttpService;
use hyper1::StatusCode;
use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::borrow::Cow;
use std::cell::RefCell;
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::rc::Rc;
use std::time::Duration;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
/// The request type handled by this module: a hyper request carrying an
/// [`Incoming`] body.
type Request = hyper1::Request<Incoming>;

/// Flag read from the `DENO_USE_WRITEV` environment variable.
///
/// Evaluated lazily on first access. It is `true` only when the variable is
/// set to a non-empty value that could be read as a string; otherwise `false`.
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
  matches!(std::env::var("DENO_USE_WRITEV"), Ok(val) if !val.is_empty())
});

/// Feature name string exported by this module (`"http"`).
pub const UNSTABLE_FEATURE_NAME: &str = "http";
/// All HTTP/2 connections start with this byte string.
///
/// In HTTP/2, each endpoint is required to send a connection preface as a final confirmation
/// of the protocol in use and to establish the initial settings for the HTTP/2 connection. The
/// client and server each send a different connection preface.
///
/// The client connection preface starts with a sequence of 24 octets, which in hex notation is:
///
/// 0x505249202a20485454502f322e300d0a0d0a534d0d0a0d0a
///
/// That is, the connection preface starts with the string `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n`. This sequence
/// MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty.
const HTTP2_PREFIX: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
2023-06-26 09:10:27 -04:00
/// ALPN protocol identifier for "h2". When the TLS handshake reports this
/// value, the connection is served as HTTP/2 without prefix auto-detection.
const TLS_ALPN_HTTP_2: &[u8] = b"h2";
2023-06-26 09:10:27 -04:00
/// ALPN protocol identifier for "http/1.1". When the TLS handshake reports
/// this value, the connection is served as HTTP/1.1 without prefix auto-detection.
const TLS_ALPN_HTTP_11: &[u8] = b"http/1.1";
/// Shorthand bound for any stream HTTP can be served over: an async
/// readable and writable, `Unpin + Send + 'static` type.
trait HttpServeStream:
  tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static
{
}

/// Blanket implementation: every type meeting the bounds is an
/// `HttpServeStream`, so callers never implement the trait by hand.
impl<S> HttpServeStream for S where
  S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static
{
}
#[op2(fast)]
#[smi]
pub fn op_http_upgrade_raw(
state: &mut OpState,
#[smi] slab_id: SlabId,
) -> Result<ResourceId, AnyError> {
// Stage 1: extract the upgrade future
let upgrade = slab_get(slab_id).upgrade()?;
let (read, write) = tokio::io::duplex(1024);
let (read_rx, write_tx) = tokio::io::split(read);
let (mut write_rx, mut read_tx) = tokio::io::split(write);
spawn(async move {
let mut upgrade_stream = WebSocketUpgrade::<ResponseBytes>::default();
// Stage 2: Extract the Upgraded connection
let mut buf = [0; 1024];
let upgraded = loop {
let read = Pin::new(&mut write_rx).read(&mut buf).await?;
match upgrade_stream.write(&buf[..read]) {
Ok(None) => continue,
Ok(Some((response, bytes))) => {
let mut http = slab_get(slab_id);
*http.response() = response;
http.complete();
feat(ext/http): Upgrade to hyper1.0-rc4 (#19987) Includes a lightly-modified version of hyper-util's `TokioIo` utility. Hyper changes: v1.0.0-rc.4 (2023-07-10) Bug Fixes http1: http1 server graceful shutdown fix (#3261) ([f4b51300](https://github.com/hyperium/hyper/commit/f4b513009d81083081d1c60c1981847bbb17dd5d)) send error on Incoming body when connection errors (#3256) ([52f19259](https://github.com/hyperium/hyper/commit/52f192593fb9ebcf6d3894e0c85cbf710da4decd), closes https://github.com/hyperium/hyper/issues/3253) properly end chunked bodies when it was known to be empty (#3254) ([fec64cf0](https://github.com/hyperium/hyper/commit/fec64cf0abdc678e30ca5f1b310c5118b2e01999), closes https://github.com/hyperium/hyper/issues/3252) Features client: Make clients able to use non-Send executor (#3184) ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc), closes https://github.com/hyperium/hyper/issues/3017) rt: replace IO traits with hyper::rt ones (#3230) ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2), closes https://github.com/hyperium/hyper/issues/3110) add downcast on Sleep trait (#3125) ([d92d3917](https://github.com/hyperium/hyper/commit/d92d3917d950e4c61c37c2170f3ce273d2a0f7d1), closes https://github.com/hyperium/hyper/issues/3027) service: change Service::call to take &self (#3223) ([d894439e](https://github.com/hyperium/hyper/commit/d894439e009aa75103f6382a7ba98fb17da72f02), closes https://github.com/hyperium/hyper/issues/3040) Breaking Changes Any IO transport type provided must not implement hyper::rt::{Read, Write} instead of tokio::io traits. You can grab a helper type from hyper-util to wrap Tokio types, or implement the traits yourself, if it's a custom type. ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2)) client::conn::http2 types now use another generic for an Executor. 
Code that names Connection needs to include the additional generic parameter. ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc)) The Service::call function no longer takes a mutable reference to self. The FnMut trait bound on the service::util::service_fn function and the trait bound on the impl for the ServiceFn struct were changed from FnMut to Fn.
2023-07-31 09:34:53 -04:00
let mut upgraded = TokioIo::new(upgrade.await?);
upgraded.write_all(&bytes).await?;
break upgraded;
}
Err(err) => return Err(err),
}
};
// Stage 3: Pump the data
let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded);
spawn(async move {
let mut buf = [0; 1024];
loop {
let read = upgraded_rx.read(&mut buf).await?;
if read == 0 {
break;
}
read_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
});
spawn(async move {
let mut buf = [0; 1024];
loop {
let read = write_rx.read(&mut buf).await?;
if read == 0 {
break;
}
upgraded_tx.write_all(&buf[..read]).await?;
}
Ok::<_, AnyError>(())
});
Ok(())
});
Ok(
state
.resource_table
.add(UpgradeStream::new(read_rx, write_tx)),
)
}
/// Upgrades the request in slab slot `slab_id` to a WebSocket and returns the
/// resource id of the resulting server stream.
///
/// `headers` are appended to the `101 Switching Protocols` response that is
/// sent before the upgrade completes.
///
/// # Errors
///
/// Returns an error if a header name or value is not valid, if the upgrade
/// future cannot be extracted, if the upgrade itself fails, or if the
/// WebSocket server stream cannot be created.
#[op2(async)]
#[smi]
pub async fn op_http_upgrade_websocket_next(
  state: Rc<RefCell<OpState>>,
  #[smi] slab_id: SlabId,
  #[serde] headers: Vec<(ByteString, ByteString)>,
) -> Result<ResourceId, AnyError> {
  // Validate every header before touching the request, so that bad input is
  // reported as an error (rather than a panic) and leaves the slab entry
  // unmodified.
  let headers = headers
    .iter()
    .map(|(name, value)| {
      Ok::<_, AnyError>((
        HeaderName::from_bytes(name)?,
        HeaderValue::from_bytes(value)?,
      ))
    })
    .collect::<Result<Vec<_>, _>>()?;

  let mut http = slab_get(slab_id);
  // Stage 1: set the response to 101 Switching Protocols and send it
  let upgrade = http.upgrade()?;
  let response = http.response();
  *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS;
  for (name, value) in headers {
    response.headers_mut().append(name, value);
  }
  http.complete();

  // Stage 2: wait for the request to finish upgrading
  let upgraded = upgrade.await?;

  // Stage 3: take the extracted raw network stream and upgrade it to a websocket, then return it
  let (stream, bytes) = extract_network_stream(upgraded);
  ws_create_server_stream(&mut state.borrow_mut(), stream, bytes)
}
/// Marks the response for slab slot `slab_id` as complete, applying `status`
/// first when it is a valid HTTP status code.
#[op2(fast)]
pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) {
  let mut http = slab_get(slab_id);
  // JavaScript is not expected to pass an invalid status (see 23_response.js),
  // so an unparseable value is quietly ignored and the status is left as-is.
  let parsed = StatusCode::from_u16(status);
  if let Ok(code) = parsed {
    *http.response().status_mut() = code;
  }
  http.complete();
}
#[op2]
pub fn op_http_get_request_method_and_url<'scope, HTTP>(
scope: &mut v8::HandleScope<'scope>,
#[smi] slab_id: SlabId,
) -> v8::Local<'scope, v8::Array>
where
HTTP: HttpPropertyExtractor,
{
let http = slab_get(slab_id);
let request_info = http.request_info();
let request_parts = http.request_parts();
let request_properties = HTTP::request_properties(
request_info,
&request_parts.uri,
&request_parts.headers,
);
let method: v8::Local<v8::Value> = v8::String::new_from_utf8(
scope,
request_parts.method.as_str().as_bytes(),
v8::NewStringType::Normal,
)
.unwrap()
.into();
let authority: v8::Local<v8::Value> = match request_properties.authority {
Some(authority) => v8::String::new_from_utf8(
scope,
authority.as_ref(),
v8::NewStringType::Normal,
)
.unwrap()
.into(),
None => v8::undefined(scope).into(),
};
// Only extract the path part - we handle authority elsewhere
let path = match &request_parts.uri.path_and_query() {
Some(path_and_query) => path_and_query.to_string(),
None => "".to_owned(),
};
let path: v8::Local<v8::Value> =
v8::String::new_from_utf8(scope, path.as_ref(), v8::NewStringType::Normal)
.unwrap()
.into();
let peer_address: v8::Local<v8::Value> = v8::String::new_from_utf8(
scope,
request_info.peer_address.as_bytes(),
v8::NewStringType::Normal,
)
.unwrap()
.into();
let port: v8::Local<v8::Value> = match request_info.peer_port {
Some(port) => v8::Integer::new(scope, port.into()).into(),
None => v8::undefined(scope).into(),
};
let vec = [method, authority, path, peer_address, port];
v8::Array::new_with_elements(scope, vec.as_slice())
}
/// Looks up a request header by `name` for the request in slab slot
/// `slab_id`, returning its raw bytes, or `None` when the header is absent.
#[op2]
#[serde]
pub fn op_http_get_request_header(
  #[smi] slab_id: SlabId,
  #[string] name: String,
) -> Option<ByteString> {
  let http = slab_get(slab_id);
  http
    .request_parts()
    .headers
    .get(name)
    .map(|header| header.as_bytes().into())
}
/// Returns all request headers for slab slot `slab_id` as a flat V8 array of
/// alternating name/value strings.
///
/// All `Cookie` headers are merged into a single entry (joined with `"; "`)
/// that is emitted last.
#[op2]
pub fn op_http_get_request_headers<'scope>(
  scope: &mut v8::HandleScope<'scope>,
  #[smi] slab_id: SlabId,
) -> v8::Local<'scope, v8::Array> {
  let http = slab_get(slab_id);
  let headers = &http.request_parts().headers;

  // Each header contributes two elements: its name and its value.
  let mut elements: SmallVec<[v8::Local<v8::Value>; 32]> =
    SmallVec::with_capacity(headers.len() * 2);
  let mut cookie_values: Vec<&[u8]> = Vec::new();

  for (name, value) in headers {
    if name == COOKIE {
      // Collected separately and merged below.
      cookie_values.push(value.as_bytes());
      continue;
    }
    let name_bytes: &[u8] = name.as_ref();
    for bytes in [name_bytes, value.as_bytes()] {
      elements.push(
        v8::String::new_from_one_byte(scope, bytes, v8::NewStringType::Normal)
          .unwrap()
          .into(),
      );
    }
  }

  // Cookies get special treatment because we don't want them to be mangled
  // by the `Headers` object in JS: every cookie header is concatenated into
  // one cookie header, separated by semicolons.
  // TODO(mmastrac): This should probably happen on the JS side on-demand
  if !cookie_values.is_empty() {
    let joined = cookie_values.join("; ".as_bytes());
    elements.push(
      v8::String::new_external_onebyte_static(scope, COOKIE.as_ref())
        .unwrap()
        .into(),
    );
    elements.push(
      v8::String::new_from_one_byte(scope, &joined, v8::NewStringType::Normal)
        .unwrap()
        .into(),
    );
  }

  v8::Array::new_with_elements(scope, elements.as_slice())
}
#[op2(fast)]
#[smi]
pub fn op_http_read_request_body(
fix(ext/http): ensure request body resource lives as long as response is alive (#20206) Deno.serve's fast streaming implementation was not keeping the request body resource ID alive. We were taking the `Rc<Resource>` from the resource table during the response, so a hairpin duplex response that fed back the request body would work. However, if any JS code attempted to read from the request body (which requires the resource ID to be valid), the response would fail with a difficult-to-diagnose "EOF" error. This was affecting more complex duplex uses of `Deno.fetch` (though as far as I can tell was unreported). Simple test: ```ts const reader = request.body.getReader(); return new Response( new ReadableStream({ async pull(controller) { const { done, value } = await reader.read(); if (done) { controller.close(); } else { controller.enqueue(value); } }, }), ``` And then attempt to use the stream in duplex mode: ```ts async function testDuplex( reader: ReadableStreamDefaultReader<Uint8Array>, writable: WritableStreamDefaultWriter<Uint8Array>, ) { await writable.write(new Uint8Array([1])); const chunk1 = await reader.read(); assert(!chunk1.done); assertEquals(chunk1.value, new Uint8Array([1])); await writable.write(new Uint8Array([2])); const chunk2 = await reader.read(); assert(!chunk2.done); assertEquals(chunk2.value, new Uint8Array([2])); await writable.close(); const chunk3 = await reader.read(); assert(chunk3.done); } ``` In older versions of Deno, this would just lock up. I believe after 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f, it started throwing a more explicit error: ``` httpServerStreamDuplexJavascript => ./cli/tests/unit/serve_test.ts:1339:6 error: TypeError: request or response body error: error reading a body from connection: Connection reset by peer (os error 54) at async Object.pull (ext:deno_web/06_streams.js:810:27) ```
2023-08-20 21:35:26 -04:00
state: Rc<RefCell<OpState>>,
#[smi] slab_id: SlabId,
) -> ResourceId {
let mut http = slab_get(slab_id);
fix(ext/http): ensure request body resource lives as long as response is alive (#20206) Deno.serve's fast streaming implementation was not keeping the request body resource ID alive. We were taking the `Rc<Resource>` from the resource table during the response, so a hairpin duplex response that fed back the request body would work. However, if any JS code attempted to read from the request body (which requires the resource ID to be valid), the response would fail with a difficult-to-diagnose "EOF" error. This was affecting more complex duplex uses of `Deno.fetch` (though as far as I can tell was unreported). Simple test: ```ts const reader = request.body.getReader(); return new Response( new ReadableStream({ async pull(controller) { const { done, value } = await reader.read(); if (done) { controller.close(); } else { controller.enqueue(value); } }, }), ``` And then attempt to use the stream in duplex mode: ```ts async function testDuplex( reader: ReadableStreamDefaultReader<Uint8Array>, writable: WritableStreamDefaultWriter<Uint8Array>, ) { await writable.write(new Uint8Array([1])); const chunk1 = await reader.read(); assert(!chunk1.done); assertEquals(chunk1.value, new Uint8Array([1])); await writable.write(new Uint8Array([2])); const chunk2 = await reader.read(); assert(!chunk2.done); assertEquals(chunk2.value, new Uint8Array([2])); await writable.close(); const chunk3 = await reader.read(); assert(chunk3.done); } ``` In older versions of Deno, this would just lock up. I believe after 23ff0e722e3c4b0827940853c53c5ee2ede5ec9f, it started throwing a more explicit error: ``` httpServerStreamDuplexJavascript => ./cli/tests/unit/serve_test.ts:1339:6 error: TypeError: request or response body error: error reading a body from connection: Connection reset by peer (os error 54) at async Object.pull (ext:deno_web/06_streams.js:810:27) ```
2023-08-20 21:35:26 -04:00
let rid = if let Some(incoming) = http.take_body() {
let body_resource = Rc::new(HttpRequestBody::new(incoming));
state.borrow_mut().resource_table.add_rc(body_resource)
} else {
// This should not be possible, but rather than panicking we'll return an invalid
// resource value to JavaScript.
ResourceId::MAX
};
http.put_resource(HttpRequestBodyAutocloser::new(rid, state.clone()));
rid
}
/// Appends one header to the response for slab slot `slab_id`.
///
/// Panics if `name` is not a valid header name, or if a borrowed `value` is
/// not a valid header value.
#[op2(fast)]
pub fn op_http_set_response_header(
  #[smi] slab_id: SlabId,
  #[string(onebyte)] name: Cow<[u8]>,
  #[string(onebyte)] value: Cow<[u8]>,
) {
  let mut http = slab_get(slab_id);
  let response_headers = http.response().headers_mut();

  // Both inputs arrive as one-byte (latin-1) strings.
  let header_name = HeaderName::from_bytes(&name).unwrap();
  let header_value = match value {
    // An owned buffer is moved into the header value without re-validation.
    // SAFETY: These are valid latin-1 strings
    Cow::Owned(owned) => unsafe {
      HeaderValue::from_maybe_shared_unchecked(bytes::Bytes::from(owned))
    },
    Cow::Borrowed(borrowed) => HeaderValue::from_bytes(borrowed).unwrap(),
  };

  response_headers.append(header_name, header_value);
}
/// Appends a batch of headers to the response for slab slot `slab_id`.
///
/// `headers` is a V8 array whose elements are themselves two-element arrays
/// of `[name, value]`.
///
/// Panics if an element is not such a pair, or if a name is not a valid
/// header name.
#[op2]
pub fn op_http_set_response_headers(
  scope: &mut v8::HandleScope,
  #[smi] slab_id: SlabId,
  headers: v8::Local<v8::Array>,
) {
  let mut http = slab_get(slab_id);
  // TODO(mmastrac): Invalid headers should be handled?
  let resp_headers = http.response().headers_mut();

  let len = headers.length();
  // Each array element is one complete header (a name/value pair), so `len`
  // additional slots are exactly what is needed; doubling it would only
  // over-allocate.
  resp_headers.reserve(len.try_into().unwrap());

  for i in 0..len {
    let item = headers.get_index(scope, i).unwrap();
    let pair = v8::Local::<v8::Array>::try_from(item).unwrap();
    let name = pair.get_index(scope, 0).unwrap();
    let value = pair.get_index(scope, 1).unwrap();
    let v8_name: ByteString = from_v8(scope, name).unwrap();
    let v8_value: ByteString = from_v8(scope, value).unwrap();
    let header_name = HeaderName::from_bytes(&v8_name).unwrap();
    let header_value =
      // SAFETY: These are valid latin-1 strings
      unsafe { HeaderValue::from_maybe_shared_unchecked(v8_value) };
    resp_headers.append(header_name, header_value);
  }
}
/// Stores `trailers` as the trailer map for the response in slab slot
/// `slab_id`, replacing whatever was stored before.
///
/// Panics if a trailer name is not a valid header name.
#[op2]
pub fn op_http_set_response_trailers(
  #[smi] slab_id: SlabId,
  #[serde] trailers: Vec<(ByteString, ByteString)>,
) {
  let mut http = slab_get(slab_id);
  let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len());
  trailers.into_iter().for_each(|(raw_name, raw_value)| {
    // These are valid latin-1 strings
    let header_name = HeaderName::from_bytes(&raw_name).unwrap();
    let header_value =
      // SAFETY: These are valid latin-1 strings
      unsafe { HeaderValue::from_maybe_shared_unchecked(raw_value) };
    trailer_map.append(header_name, header_value);
  });
  *http.trailers().borrow_mut() = Some(trailer_map);
}
/// Chooses the compression to apply to a response, based on the response
/// body `length` (when known) and the request's `Accept-Encoding` header.
///
/// Returns `Compression::None` when the body is known to be shorter than 64
/// bytes, when the request has no `Accept-Encoding` header, or when neither
/// gzip nor brotli is the preferred encoding.
fn is_request_compressible(
  length: Option<usize>,
  headers: &HeaderMap,
) -> Compression {
  if let Some(length) = length {
    // By the time we add compression headers and Accept-Encoding, it probably doesn't make sense
    // to compress stuff that's smaller than this.
    if length < 64 {
      return Compression::None;
    }
  }

  let Some(accept_encoding) = headers.get(ACCEPT_ENCODING) else {
    return Compression::None;
  };

  // The header value comes from the client and may contain bytes for which
  // `to_str` fails; in that case we must not panic and simply skip the fast
  // path, letting the parser below decide.
  match accept_encoding.to_str() {
    // Firefox and Chrome send this -- no need to parse
    Ok("gzip, deflate, br") => return Compression::Brotli,
    Ok("gzip") => return Compression::GZip,
    Ok("br") => return Compression::Brotli,
    _ => (),
  }

  // Fall back to the expensive parser
  let accepted = fly_accept_encoding::encodings_iter(headers).filter(|r| {
    matches!(
      r,
      Ok((
        Some(Encoding::Identity | Encoding::Gzip | Encoding::Brotli),
        _
      ))
    )
  });
  match fly_accept_encoding::preferred(accepted) {
    Ok(Some(fly_accept_encoding::Encoding::Gzip)) => Compression::GZip,
    Ok(Some(fly_accept_encoding::Encoding::Brotli)) => Compression::Brotli,
    _ => Compression::None,
  }
}
/// Decides from the response `headers` alone whether the body may be
/// compressed.
///
/// Compression is refused when there is no `Content-Type`, when the content
/// type is not compressible, when `Content-Encoding` or `Content-Range` is
/// already present, or when `Cache-Control` carries `no-transform`.
fn is_response_compressible(headers: &HeaderMap) -> bool {
  let Some(content_type) = headers.get(CONTENT_TYPE) else {
    return false;
  };
  if !is_content_compressible(content_type) {
    return false;
  }
  if headers.contains_key(CONTENT_ENCODING)
    || headers.contains_key(CONTENT_RANGE)
  {
    return false;
  }

  // A Cache-Control value that is not UTF-8 or does not parse is ignored.
  let no_transform = headers
    .get(CACHE_CONTROL)
    .and_then(|raw| std::str::from_utf8(raw.as_bytes()).ok())
    .and_then(|text| CacheControl::from_value(text))
    .map_or(false, |directives| directives.no_transform);

  !no_transform
}
/// Reconciles the requested `compression` with the response `headers` and
/// returns the compression that will actually be used.
///
/// `Vary: Accept-Encoding` is always ensured. When compression goes ahead,
/// the ETag is weakened, `Content-Length` is removed and `Content-Encoding`
/// is set to the chosen encoding.
fn modify_compressibility_from_response(
  compression: Compression,
  headers: &mut HeaderMap,
) -> Compression {
  ensure_vary_accept_encoding(headers);

  let encoding = match compression {
    Compression::None => return Compression::None,
    _ if !is_response_compressible(headers) => return Compression::None,
    Compression::Brotli => "br",
    Compression::GZip => "gzip",
    _ => unreachable!(),
  };

  weaken_etag(headers);
  headers.remove(CONTENT_LENGTH);
  headers.insert(CONTENT_ENCODING, HeaderValue::from_static(encoding));
  compression
}
/// If the user provided a ETag header for uncompressed data, we need to ensure it is a
/// weak Etag header ("W/").
fn weaken_etag(hmap: &mut HeaderMap) {
if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
if !etag.as_bytes().starts_with(b"W/") {
let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
v.extend(b"W/");
v.extend(etag.as_bytes());
*etag = v.try_into().unwrap();
}
}
}
/// Makes sure the response carries `Vary: Accept-Encoding` for direct body
/// responses.
///
/// The header is set regardless of whether the data ends up compressed, so
/// that cache services do not serve uncompressed data to clients that support
/// compression. An existing readable `Vary` value gets `Accept-Encoding`
/// prepended unless it already mentions it (case-insensitively); a missing or
/// unreadable value is replaced with plain `Accept-Encoding`.
fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
  if let Some(existing) = hmap.get_mut(hyper::header::VARY) {
    if let Ok(current) = existing.to_str() {
      let already_listed = current.to_lowercase().contains("accept-encoding");
      if !already_listed {
        let merged = format!("Accept-Encoding, {current}");
        *existing = merged.try_into().unwrap();
      }
      return;
    }
  }
  hmap.insert(
    hyper::header::VARY,
    HeaderValue::from_static("Accept-Encoding"),
  );
}
/// Installs the response body for slab slot `slab_id` and completes the
/// response.
///
/// `response_fn` builds the body for the negotiated compression. If the
/// request was already cancelled the body is not built, unless
/// `force_instantiate_body` is set, in which case it is built with
/// `Compression::None` and immediately aborted so that it is cleaned up
/// correctly (eg: for resources).
fn set_response(
  slab_id: SlabId,
  length: Option<usize>,
  status: u16,
  force_instantiate_body: bool,
  response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) {
  let mut http = slab_get(slab_id);

  if http.cancelled() {
    // The request is gone, so there is no point doing the work of sending a
    // response; only instantiate the body when the caller needs it torn down.
    if force_instantiate_body {
      response_fn(Compression::None).abort();
    }
  } else {
    let resource = http.take_resource();
    let requested =
      is_request_compressible(length, &http.request_parts().headers);
    let response = http.response();
    let negotiated =
      modify_compressibility_from_response(requested, response.headers_mut());
    let body = response_fn(negotiated);
    response.body_mut().initialize(body, resource);

    // JavaScript is not expected to pass an invalid status (see
    // 23_response.js), so an unparseable value is quietly ignored.
    if let Ok(code) = StatusCode::from_u16(status) {
      *response.status_mut() = code;
    }
  }

  http.complete();
}
/// Uses the resource `stream_rid` as the response body for slab slot
/// `slab_id`.
///
/// With `auto_close` the resource is removed from the resource table and
/// handed to the response; otherwise it is only looked up and stays in the
/// table.
///
/// # Errors
///
/// Returns an error if `stream_rid` is not present in the resource table.
#[op2(fast)]
pub fn op_http_set_response_body_resource(
  state: Rc<RefCell<OpState>>,
  #[smi] slab_id: SlabId,
  #[smi] stream_rid: ResourceId,
  auto_close: bool,
  status: u16,
) -> Result<(), AnyError> {
  // IMPORTANT: set_response may itself need the OpState lock (to drop the
  // request body resource), so the borrow below _must_ be released before it
  // is called.
  //
  // For an auto_close stream we hold the last ref to it until the response
  // is complete.
  // TODO(mmastrac): We should be using the same auto-close functionality rather than removing autoclose resources.
  // It's possible things could fail elsewhere if code expects the rid to continue existing after the response has been
  // returned.
  let resource = {
    let mut op_state = state.borrow_mut();
    let table = &mut op_state.resource_table;
    if auto_close {
      table.take_any(stream_rid)
    } else {
      table.get_any(stream_rid)
    }
  }?;

  // Upper bound of the size hint, if the resource reports one.
  let length = resource.size_hint().1.map(|upper| upper as usize);
  set_response(slab_id, length, status, true, move |compression| {
    ResponseBytesInner::from_resource(compression, resource, auto_close)
  });

  Ok(())
}
/// Sets a text body on the response for slab slot `slab_id` and completes it.
///
/// An empty `text` skips body setup entirely and only applies the status and
/// completes the response.
#[op2(fast)]
pub fn op_http_set_response_body_text(
  #[smi] slab_id: SlabId,
  #[string] text: String,
  status: u16,
) {
  if text.is_empty() {
    op_http_set_promise_complete::call(slab_id, status);
    return;
  }
  let length = Some(text.len());
  set_response(slab_id, length, status, false, |compression| {
    ResponseBytesInner::from_vec(compression, text.into_bytes())
  });
}
/// Sets a byte-buffer body on the response for slab slot `slab_id` and
/// completes it.
///
/// An empty `buffer` skips body setup entirely and only applies the status
/// and completes the response.
#[op2(fast)]
pub fn op_http_set_response_body_bytes(
  #[smi] slab_id: SlabId,
  #[buffer] buffer: JsBuffer,
  status: u16,
) {
  if buffer.is_empty() {
    op_http_set_promise_complete::call(slab_id, status);
    return;
  }
  let length = Some(buffer.len());
  set_response(slab_id, length, status, false, |compression| {
    ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
  });
}
/// Waits for the body promise of the request in slab slot `slab_id`, or
/// until the wait is cancelled through the connection cancel handle of the
/// server resource `server_rid`.
///
/// # Errors
///
/// Returns an error if `server_rid` is not an `HttpJoinHandle` resource, or
/// if the body promise resolves to `false` ("connection closed before
/// message completed"). A cancelled wait is reported as success.
#[op2(async)]
pub async fn op_http_track(
  state: Rc<RefCell<OpState>>,
  #[smi] slab_id: SlabId,
  #[smi] server_rid: ResourceId,
) -> Result<(), AnyError> {
  let http = slab_get(slab_id);
  let handle = http.body_promise();
  let join_handle = state
    .borrow_mut()
    .resource_table
    .get::<HttpJoinHandle>(server_rid)?;

  let outcome = handle
    .or_cancel(join_handle.connection_cancel_handle())
    .await;
  match outcome {
    Ok(false) => {
      Err(AnyError::msg("connection closed before message completed"))
    }
    // Either the promise resolved to `true`, or the wait was cancelled.
    Ok(true) | Err(_) => Ok(()),
  }
}
/// Serves `io` as an HTTP/1.1 connection (keep-alive on, upgrades enabled,
/// `writev` controlled by `USE_WRITEV`) until it finishes.
///
/// If `cancel` interrupts the connection first, a graceful shutdown is
/// requested and the connection is then driven to completion.
fn serve_http11_unconditional(
  io: impl HttpServeStream,
  svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
  cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
  let mut builder = http1::Builder::new();
  builder.keep_alive(true).writev(*USE_WRITEV);
  let conn = builder
    .serve_connection(TokioIo::new(io), svc)
    .with_upgrades();

  async {
    let mut conn = match conn.or_abort(cancel).await {
      Ok(finished) => return finished,
      // Aborted: the unfinished connection is handed back to us.
      Err(conn) => conn,
    };
    Pin::new(&mut conn).graceful_shutdown();
    conn.await
  }
}
fn serve_http2_unconditional(
io: impl HttpServeStream,
svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
cancel: Rc<CancelHandle>,
) -> impl Future<Output = Result<(), hyper1::Error>> + 'static {
feat(ext/http): Upgrade to hyper1.0-rc4 (#19987) Includes a lightly-modified version of hyper-util's `TokioIo` utility. Hyper changes: v1.0.0-rc.4 (2023-07-10) Bug Fixes http1: http1 server graceful shutdown fix (#3261) ([f4b51300](https://github.com/hyperium/hyper/commit/f4b513009d81083081d1c60c1981847bbb17dd5d)) send error on Incoming body when connection errors (#3256) ([52f19259](https://github.com/hyperium/hyper/commit/52f192593fb9ebcf6d3894e0c85cbf710da4decd), closes https://github.com/hyperium/hyper/issues/3253) properly end chunked bodies when it was known to be empty (#3254) ([fec64cf0](https://github.com/hyperium/hyper/commit/fec64cf0abdc678e30ca5f1b310c5118b2e01999), closes https://github.com/hyperium/hyper/issues/3252) Features client: Make clients able to use non-Send executor (#3184) ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc), closes https://github.com/hyperium/hyper/issues/3017) rt: replace IO traits with hyper::rt ones (#3230) ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2), closes https://github.com/hyperium/hyper/issues/3110) add downcast on Sleep trait (#3125) ([d92d3917](https://github.com/hyperium/hyper/commit/d92d3917d950e4c61c37c2170f3ce273d2a0f7d1), closes https://github.com/hyperium/hyper/issues/3027) service: change Service::call to take &self (#3223) ([d894439e](https://github.com/hyperium/hyper/commit/d894439e009aa75103f6382a7ba98fb17da72f02), closes https://github.com/hyperium/hyper/issues/3040) Breaking Changes Any IO transport type provided must not implement hyper::rt::{Read, Write} instead of tokio::io traits. You can grab a helper type from hyper-util to wrap Tokio types, or implement the traits yourself, if it's a custom type. ([f9f65b7a](https://github.com/hyperium/hyper/commit/f9f65b7aa67fa3ec0267fe015945973726285bc2)) client::conn::http2 types now use another generic for an Executor. 
Code that names Connection needs to include the additional generic parameter. ([d977f209](https://github.com/hyperium/hyper/commit/d977f209bc6068d8f878b22803fc42d90c887fcc)) The Service::call function no longer takes a mutable reference to self. The FnMut trait bound on the service::util::service_fn function and the trait bound on the impl for the ServiceFn struct were changed from FnMut to Fn.
2023-07-31 09:34:53 -04:00
let conn =
http2::Builder::new(LocalExecutor).serve_connection(TokioIo::new(io), svc);
async {
match conn.or_abort(cancel).await {
Err(mut conn) => {
Pin::new(&mut conn).graceful_shutdown();
conn.await
}
Ok(res) => res,
}
}
}
/// Serves a connection whose protocol was not negotiated ahead of time.
///
/// The stream is checked against `HTTP2_PREFIX`: when the prefix matches, the
/// connection is served by `serve_http2_unconditional`, otherwise by
/// `serve_http11_unconditional`. Errors from the prefix check and from either
/// server are converted into `AnyError`.
async fn serve_http2_autodetect(
  io: impl HttpServeStream,
  svc: impl HttpService<Incoming, ResBody = ResponseBytes> + 'static,
  cancel: Rc<CancelHandle>,
) -> Result<(), AnyError> {
  // The check hands the stream back so it can be passed on to the server.
  let (is_http2, io) = NetworkStreamPrefixCheck::new(io, HTTP2_PREFIX)
    .match_prefix()
    .await?;

  if is_http2 {
    serve_http2_unconditional(io, svc, cancel).await?;
  } else {
    serve_http11_unconditional(io, svc, cancel).await?;
  }
  Ok(())
}
fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
lifetime: HttpLifetime,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
let HttpLifetime {
refcount,
connection_cancel_handle,
listen_cancel_handle,
} = lifetime;
let svc = service_fn(move |req: Request| {
new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone())
});
spawn(
async {
io.handshake().await?;
// If the client specifically negotiates a protocol, we will use it. If not, we'll auto-detect
// based on the prefix bytes
let handshake = io.get_ref().1.alpn_protocol();
if handshake == Some(TLS_ALPN_HTTP_2) {
serve_http2_unconditional(io, svc, listen_cancel_handle)
.await
.map_err(|e| e.into())
} else if handshake == Some(TLS_ALPN_HTTP_11) {
serve_http11_unconditional(io, svc, listen_cancel_handle)
.await
.map_err(|e| e.into())
} else {
serve_http2_autodetect(io, svc, listen_cancel_handle).await
}
}
.try_or_cancel(connection_cancel_handle),
)
}
/// Spawns a task that serves HTTP over a non-TLS stream.
///
/// The protocol is always auto-detected via `serve_http2_autodetect`, and the
/// task is raced against the connection cancel handle of `lifetime`.
fn serve_http(
  io: impl HttpServeStream,
  request_info: HttpConnectionProperties,
  lifetime: HttpLifetime,
  tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>> {
  let refcount = lifetime.refcount;
  let connection_cancel = lifetime.connection_cancel_handle;
  let listen_cancel = lifetime.listen_cancel_handle;

  // Every incoming request becomes a slab-backed future.
  let svc = service_fn(move |request: Request| {
    new_slab_future(
      request,
      request_info.clone(),
      refcount.clone(),
      tx.clone(),
    )
  });

  let serve = serve_http2_autodetect(io, svc, listen_cancel);
  spawn(serve.try_or_cancel(connection_cancel))
}
/// Starts serving a single accepted connection and returns its task handle.
///
/// The connection properties are extracted before the connection is
/// converted into a `NetworkStream`; TLS streams go to `serve_https`, all
/// other stream kinds go to `serve_http`.
fn serve_http_on<HTTP>(
  connection: HTTP::Connection,
  listen_properties: &HttpListenProperties,
  lifetime: HttpLifetime,
  tx: tokio::sync::mpsc::Sender<SlabId>,
) -> JoinHandle<Result<(), AnyError>>
where
  HTTP: HttpPropertyExtractor,
{
  let properties = HTTP::connection_properties(listen_properties, &connection);

  match HTTP::to_network_stream_from_connection(connection) {
    NetworkStream::Tcp(tcp) => serve_http(tcp, properties, lifetime, tx),
    NetworkStream::Tls(tls) => serve_https(tls, properties, lifetime, tx),
    #[cfg(unix)]
    NetworkStream::Unix(unix) => serve_http(unix, properties, lifetime, tx),
  }
}
/// Cancellation handles and refcount handed to every connection served on
/// behalf of one `HttpJoinHandle`.
#[derive(Clone)]
struct HttpLifetime {
/// Raced against each spawned connection task (see the `try_or_cancel`
/// calls in `serve_http` and `serve_https`).
connection_cancel_handle: Rc<CancelHandle>,
/// Raced against the accept loop and passed to the protocol servers as
/// their `cancel` handle.
listen_cancel_handle: Rc<CancelHandle>,
/// Cloned into every request future; `op_http_close` waits until its strong
/// count drops back to one.
refcount: RefCount,
}
/// Resource representing one running HTTP server (registered under the
/// resource name "http").
struct HttpJoinHandle {
/// Handle of the spawned serving task. Starts out as `None`, is filled in
/// right after the task is spawned, and is taken again by `op_http_wait` or
/// `op_http_close`.
join_handle: AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
/// Cancels the tasks of individual connections.
connection_cancel_handle: Rc<CancelHandle>,
/// Cancels the accept loop and pending `op_http_wait` calls.
listen_cancel_handle: Rc<CancelHandle>,
/// Receiving end of the channel on which request `SlabId`s arrive.
rx: AsyncRefCell<tokio::sync::mpsc::Receiver<SlabId>>,
/// Refcount shared with in-flight requests via `HttpLifetime`.
refcount: RefCount,
}
impl HttpJoinHandle {
  /// Creates a handle around the request receiver `rx` with fresh cancel
  /// handles, a default refcount and no serving task attached yet.
  fn new(rx: tokio::sync::mpsc::Receiver<SlabId>) -> Self {
    let connection_cancel_handle = CancelHandle::new_rc();
    let listen_cancel_handle = CancelHandle::new_rc();
    Self {
      rx: AsyncRefCell::new(rx),
      join_handle: AsyncRefCell::new(None),
      refcount: RefCount::default(),
      connection_cancel_handle,
      listen_cancel_handle,
    }
  }

  /// Bundles clones of both cancel handles and the refcount for a connection.
  fn lifetime(self: &Rc<Self>) -> HttpLifetime {
    let connection_cancel_handle = self.connection_cancel_handle();
    let listen_cancel_handle = self.listen_cancel_handle();
    let refcount = self.refcount.clone();
    HttpLifetime {
      connection_cancel_handle,
      listen_cancel_handle,
      refcount,
    }
  }

  /// Returns a new reference to the handle that cancels connections.
  fn connection_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
    Rc::clone(&self.connection_cancel_handle)
  }

  /// Returns a new reference to the handle that cancels listening.
  fn listen_cancel_handle(self: &Rc<Self>) -> Rc<CancelHandle> {
    Rc::clone(&self.listen_cancel_handle)
  }
}
impl Resource for HttpJoinHandle {
  /// Resource name reported for this server handle.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("http")
  }

  /// Closing the resource cancels both the connections and the listener.
  fn close(self: Rc<Self>) {
    let Self {
      connection_cancel_handle,
      listen_cancel_handle,
      ..
    } = &*self;
    connection_cancel_handle.cancel();
    listen_cancel_handle.cancel();
  }
}
impl Drop for HttpJoinHandle {
  /// The handle may be dropped without `close` ever being called, so both
  /// cancel handles are triggered here as well.
  fn drop(&mut self) {
    let handles = [&self.connection_cancel_handle, &self.listen_cancel_handle];
    for handle in handles {
      handle.cancel();
    }
  }
}
/// Starts an HTTP server on the listener resource `listener_rid`.
///
/// Spawns an accept loop that hands each accepted connection to
/// `serve_http_on`, and registers an `HttpJoinHandle` resource for it.
/// Returns the new resource id together with the listener's scheme and
/// fallback host.
#[op2]
#[serde]
pub fn op_http_serve<HTTP>(
  state: Rc<RefCell<OpState>>,
  #[smi] listener_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
where
  HTTP: HttpPropertyExtractor,
{
  slab_init();

  let listener =
    HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
  let listen_properties = HTTP::listen_properties_from_listener(&listener)?;

  let (tx, rx) = tokio::sync::mpsc::channel(10);
  let resource = Rc::new(HttpJoinHandle::new(rx));
  let accept_cancel = resource.listen_cancel_handle();
  let lifetime = resource.lifetime();
  let accept_properties: HttpListenProperties = listen_properties.clone();

  // Accept connections until accepting fails or the listen handle is
  // cancelled; the per-connection task handles are intentionally not kept.
  let accept_loop = async move {
    loop {
      let conn = HTTP::accept_connection_from_listener(&listener)
        .try_or_cancel(accept_cancel.clone())
        .await?;
      serve_http_on::<HTTP>(
        conn,
        &accept_properties,
        lifetime.clone(),
        tx.clone(),
      );
    }
    #[allow(unreachable_code)]
    Ok::<_, AnyError>(())
  };
  let handle = spawn(accept_loop);

  // Set the handle after we start the future. The borrow guard must be
  // released before the resource is handed to the resource table.
  {
    let mut slot = RcRef::map(&resource, |this| &this.join_handle)
      .try_borrow_mut()
      .unwrap();
    *slot = Some(handle);
  }

  let rid = state.borrow_mut().resource_table.add_rc(resource);
  Ok((
    rid,
    listen_properties.scheme,
    listen_properties.fallback_host,
  ))
}
/// Starts serving HTTP on the single connection resource `connection_rid`.
///
/// Unlike `op_http_serve` there is no accept loop: the task stored in the
/// new `HttpJoinHandle` resource is the connection's own serving task.
/// Returns the new resource id together with the scheme and fallback host
/// derived from the connection.
#[op2]
#[serde]
pub fn op_http_serve_on<HTTP>(
  state: Rc<RefCell<OpState>>,
  #[smi] connection_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError>
where
  HTTP: HttpPropertyExtractor,
{
  slab_init();

  let connection =
    HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
  let listen_properties = HTTP::listen_properties_from_connection(&connection)?;

  let (tx, rx) = tokio::sync::mpsc::channel(10);
  let resource = Rc::new(HttpJoinHandle::new(rx));
  let lifetime = resource.lifetime();
  let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
    serve_http_on::<HTTP>(connection, &listen_properties, lifetime, tx);

  // Set the handle after we start the future. The borrow guard must be
  // released before the resource is handed to the resource table.
  {
    let mut slot = RcRef::map(&resource, |this| &this.join_handle)
      .try_borrow_mut()
      .unwrap();
    *slot = Some(handle);
  }

  let rid = state.borrow_mut().resource_table.add_rc(resource);
  Ok((
    rid,
    listen_properties.scheme,
    listen_properties.fallback_host,
  ))
}
/// Synchronous, non-blocking check for a pending HTTP request on `rid`.
///
/// Returns the next queued [`SlabId`] if one is available. In every other
/// case (unknown resource, receiver currently borrowed, nothing queued) this
/// returns [`SlabId::MAX`] and leaves it to the async handler to surface the
/// real error.
#[op2(fast)]
#[smi]
pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId {
  // The resource needs to exist.
  let join_handle = match state.resource_table.get::<HttpJoinHandle>(rid) {
    Ok(join_handle) => join_handle,
    Err(_) => return SlabId::MAX,
  };

  // If the receiver is somehow locked, just abort.
  let mut receiver =
    match RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut() {
      Some(receiver) => receiver,
      None => return SlabId::MAX,
    };

  // Hand out a waiting request, if there is one on the channel.
  receiver.try_recv().unwrap_or(SlabId::MAX)
}
/// Waits for the next HTTP request on the server resource `rid`.
///
/// Resolves to the request's [`SlabId`] when one arrives. When the listen
/// handle is cancelled or the request channel is closed, the serving task is
/// joined instead and [`SlabId::MAX`] is returned to signal shutdown.
///
/// # Errors
///
/// Fails if `rid` is not an `HttpJoinHandle`, if joining the serving task
/// fails, or if that task ended with an error other than a
/// "not connected" I/O error (which is treated as a normal shutdown).
#[op2(async)]
#[smi]
pub async fn op_http_wait(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<SlabId, AnyError> {
  // We will get the join handle initially, as we might be consuming requests still
  let join_handle = state
    .borrow_mut()
    .resource_table
    .get::<HttpJoinHandle>(rid)?;

  let cancel = join_handle.listen_cancel_handle();
  let next = async {
    let mut recv = RcRef::map(&join_handle, |this| &this.rx).borrow_mut().await;
    recv.recv().await
  }
  .or_cancel(cancel)
  .unwrap_or_else(|_| None)
  .await;

  // Do we have a request?
  if let Some(req) = next {
    return Ok(req);
  }

  // No - we're shutting down. The borrow is held while the task is joined so
  // that other ops touching the task handle wait for the join to finish.
  let mut task_slot = RcRef::map(join_handle, |this| &this.join_handle)
    .borrow_mut()
    .await;
  // The task may already have been taken (an earlier `op_http_wait` that
  // observed the shutdown, or `op_http_close`). There is nothing left to
  // join then, so report shutdown instead of panicking.
  let Some(task) = task_slot.take() else {
    return Ok(SlabId::MAX);
  };
  let res = task.await?;

  // Filter out shutdown (ENOTCONN) errors
  if let Err(err) = res {
    let is_not_connected = err
      .source()
      .and_then(|source| source.downcast_ref::<io::Error>())
      .map_or(false, |io_err| io_err.kind() == io::ErrorKind::NotConnected);
    if is_not_connected {
      return Ok(SlabId::MAX);
    }
    return Err(err);
  }

  Ok(SlabId::MAX)
}
/// Cancels the HTTP server behind `rid` without removing the resource.
///
/// A graceful cancel only stops listening so that the remaining connections
/// can drain; a forceful cancel additionally cancels every connection.
#[op2(fast)]
pub fn op_http_cancel(
  state: &mut OpState,
  #[smi] rid: ResourceId,
  graceful: bool,
) -> Result<(), AnyError> {
  let join_handle = state.resource_table.get::<HttpJoinHandle>(rid)?;

  // Both shutdown flavours close the listener first.
  join_handle.listen_cancel_handle().cancel();

  // Only a forceful shutdown also tears down the open connections.
  if !graceful {
    join_handle.connection_cancel_handle().cancel();
  }

  Ok(())
}
/// Removes the HTTP server resource `rid` and waits for it to wind down.
///
/// A graceful close (gated behind the unstable feature check) only stops
/// listening and lets the remaining connections drain; a forceful close also
/// cancels every connection. Afterwards this waits for the refcount to drop
/// back to one and then joins the serving task, propagating its error.
#[op2(async)]
pub async fn op_http_close(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  graceful: bool,
) -> Result<(), AnyError> {
  let join_handle = state
    .borrow_mut()
    .resource_table
    .take::<HttpJoinHandle>(rid)?;

  if graceful {
    // TODO(bartlomieju): replace with `state.feature_checker.check_or_exit`
    // once we phase out `check_or_exit_with_legacy_fallback`
    state
      .borrow()
      .feature_checker
      .check_or_exit_with_legacy_fallback(
        UNSTABLE_FEATURE_NAME,
        "Deno.Server.shutdown",
      );
  }

  // Both shutdown flavours close the listener; only the forceful one also
  // cancels the connections that are still open.
  join_handle.listen_cancel_handle().cancel();
  if !graceful {
    join_handle.connection_cancel_handle().cancel();
  }

  // Async spin on the refcount while we wait for everything to drain
  loop {
    if Rc::strong_count(&join_handle.refcount.0) <= 1 {
      break;
    }
    tokio::time::sleep(Duration::from_millis(10)).await;
  }

  // Join the serving task unless somebody else already took it.
  let mut task_slot = RcRef::map(&join_handle, |this| &this.join_handle)
    .borrow_mut()
    .await;
  match task_slot.take() {
    Some(task) => task.await?,
    None => Ok(()),
  }
}
/// Resource wrapping the two halves of a `tokio::io::DuplexStream`
/// (registered under the resource name "httpRawUpgradeStream").
struct UpgradeStream {
/// Read half; the async cell serialises concurrent readers.
read: AsyncRefCell<tokio::io::ReadHalf<tokio::io::DuplexStream>>,
/// Write half; the async cell serialises concurrent writers.
write: AsyncRefCell<tokio::io::WriteHalf<tokio::io::DuplexStream>>,
/// Cancelled when the resource is closed; raced against `read` and `write`.
cancel_handle: CancelHandle,
}
impl UpgradeStream {
  /// Wraps the two halves of a duplex stream and creates a fresh cancel
  /// handle for them.
  pub fn new(
    read: tokio::io::ReadHalf<tokio::io::DuplexStream>,
    write: tokio::io::WriteHalf<tokio::io::DuplexStream>,
  ) -> Self {
    Self {
      read: AsyncRefCell::new(read),
      write: AsyncRefCell::new(write),
      cancel_handle: CancelHandle::new(),
    }
  }

  /// Reads into `buf` and returns the number of bytes read. The operation is
  /// raced against the stream's cancel handle.
  async fn read(self: Rc<Self>, buf: &mut [u8]) -> Result<usize, AnyError> {
    let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
    async {
      let read = RcRef::map(self, |this| &this.read);
      let mut read = read.borrow_mut().await;
      Ok(Pin::new(&mut *read).read(buf).await?)
    }
    .try_or_cancel(cancel_handle)
    .await
  }

  /// Writes from `buf` and returns the number of bytes written (which may be
  /// fewer than `buf.len()`). The operation is raced against the stream's
  /// cancel handle.
  async fn write(self: Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
    let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle);
    async {
      let write = RcRef::map(self, |this| &this.write);
      let mut write = write.borrow_mut().await;
      Ok(Pin::new(&mut *write).write(buf).await?)
    }
    .try_or_cancel(cancel_handle)
    .await
  }

  /// Writes `buf1` followed by `buf2` in full and returns the combined
  /// length.
  ///
  /// # Errors
  ///
  /// Propagates I/O errors from the underlying writer. A `WriteZero` error is
  /// returned if the writer stops accepting bytes before everything has been
  /// written.
  async fn write_vectored(
    self: Rc<Self>,
    buf1: &[u8],
    buf2: &[u8],
  ) -> Result<usize, AnyError> {
    let mut wr = RcRef::map(self, |r| &r.write).borrow_mut().await;

    let total = buf1.len() + buf2.len();
    let mut bufs = [std::io::IoSlice::new(buf1), std::io::IoSlice::new(buf2)];
    let mut nwritten = wr.write_vectored(&bufs).await?;
    if nwritten == total {
      return Ok(nwritten);
    }

    // Slightly more optimized than (unstable) write_all_vectored for 2 iovecs.
    // Loop only while part of `buf1` is still pending: once `buf1` is fully
    // written there may be nothing left to send (empty `buf2`), and retrying
    // with two empty slices would never make progress.
    while nwritten < buf1.len() {
      bufs[0] = std::io::IoSlice::new(&buf1[nwritten..]);
      let n = wr.write_vectored(&bufs).await?;
      if n == 0 {
        // The writer accepts no more bytes; fail like `write_all` does
        // instead of spinning forever.
        return Err(
          std::io::Error::new(
            std::io::ErrorKind::WriteZero,
            "failed to write whole buffer",
          )
          .into(),
        );
      }
      nwritten += n;
    }

    // First buffer out of the way; flush whatever remains of the second one.
    if nwritten < total {
      wr.write_all(&buf2[nwritten - buf1.len()..]).await?;
    }

    Ok(total)
  }
}
impl Resource for UpgradeStream {
  /// Resource name reported for raw upgraded streams.
  fn name(&self) -> Cow<str> {
    Cow::Borrowed("httpRawUpgradeStream")
  }

  // Macro-generated resource read/write entry points.
  deno_core::impl_readable_byob!();
  deno_core::impl_writable!();

  /// Closing the resource triggers the cancel handle that `read` and `write`
  /// are raced against.
  fn close(self: Rc<Self>) {
    let Self { cancel_handle, .. } = &*self;
    cancel_handle.cancel();
  }
}
/// Reports whether `rid` refers to an `UpgradeStream`, i.e. a resource that
/// `op_raw_write_vectored` can write to.
#[op2(fast)]
pub fn op_can_write_vectored(
  state: &mut OpState,
  #[smi] rid: ResourceId,
) -> bool {
  let lookup = state.resource_table.get::<UpgradeStream>(rid);
  matches!(lookup, Ok(_))
}
/// Writes `buf1` followed by `buf2` to the `UpgradeStream` resource `rid`
/// and returns the number of bytes written.
///
/// Fails if `rid` is not an `UpgradeStream` or if the write itself fails.
#[op2(async)]
#[number]
pub async fn op_raw_write_vectored(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[buffer] buf1: JsBuffer,
  #[buffer] buf2: JsBuffer,
) -> Result<usize, AnyError> {
  // The state borrow ends with this statement, before the write is awaited.
  let stream = state.borrow().resource_table.get::<UpgradeStream>(rid)?;
  stream.write_vectored(&buf1, &buf2).await
}