mirror of
https://github.com/denoland/deno.git
synced 2024-11-01 09:24:20 -04:00
20c7300412
We can use Resource::read_return & op_read instead. This allows HTTP request bodies to participate in FastStream. To make this work, `readableStreamForRid` required a change to allow non auto-closing resources to be handled. This required some minor changes in our FastStream paths in ext/http and ext/flash.
1484 lines
42 KiB
Rust
1484 lines
42 KiB
Rust
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
|
|
|
// False positive lint for explicit drops.
|
|
// https://github.com/rust-lang/rust-clippy/issues/6446
|
|
#![allow(clippy::await_holding_lock)]
|
|
// https://github.com/rust-lang/rust-clippy/issues/6353
|
|
#![allow(clippy::await_holding_refcell_ref)]
|
|
|
|
use deno_core::error::generic_error;
|
|
use deno_core::error::type_error;
|
|
use deno_core::error::AnyError;
|
|
use deno_core::op;
|
|
use deno_core::serde_v8;
|
|
use deno_core::v8;
|
|
use deno_core::v8::fast_api;
|
|
use deno_core::ByteString;
|
|
use deno_core::CancelFuture;
|
|
use deno_core::CancelHandle;
|
|
use deno_core::Extension;
|
|
use deno_core::OpState;
|
|
use deno_core::StringOrBuffer;
|
|
use deno_core::ZeroCopyBuf;
|
|
use deno_core::V8_WRAPPER_OBJECT_INDEX;
|
|
use deno_tls::load_certs;
|
|
use deno_tls::load_private_keys;
|
|
use http::header::HeaderName;
|
|
use http::header::CONNECTION;
|
|
use http::header::CONTENT_LENGTH;
|
|
use http::header::EXPECT;
|
|
use http::header::TRANSFER_ENCODING;
|
|
use http::HeaderValue;
|
|
use log::trace;
|
|
use mio::net::TcpListener;
|
|
use mio::Events;
|
|
use mio::Interest;
|
|
use mio::Poll;
|
|
use mio::Token;
|
|
use serde::Deserialize;
|
|
use serde::Serialize;
|
|
use socket2::Socket;
|
|
use std::cell::RefCell;
|
|
use std::cell::UnsafeCell;
|
|
use std::collections::HashMap;
|
|
use std::ffi::c_void;
|
|
use std::future::Future;
|
|
use std::intrinsics::transmute;
|
|
use std::io::BufReader;
|
|
use std::io::Read;
|
|
use std::io::Write;
|
|
use std::mem::replace;
|
|
use std::net::SocketAddr;
|
|
use std::net::ToSocketAddrs;
|
|
use std::pin::Pin;
|
|
use std::rc::Rc;
|
|
use std::sync::Arc;
|
|
use std::sync::Mutex;
|
|
use std::task::Context;
|
|
use std::time::Duration;
|
|
use tokio::sync::mpsc;
|
|
use tokio::task::JoinHandle;
|
|
|
|
mod chunked;
|
|
mod request;
|
|
#[cfg(unix)]
|
|
mod sendfile;
|
|
mod socket;
|
|
|
|
use request::InnerRequest;
|
|
use request::Request;
|
|
use socket::InnerStream;
|
|
use socket::Stream;
|
|
|
|
pub struct FlashContext {
|
|
next_server_id: u32,
|
|
join_handles: HashMap<u32, JoinHandle<Result<(), AnyError>>>,
|
|
pub servers: HashMap<u32, ServerContext>,
|
|
}
|
|
|
|
pub struct ServerContext {
|
|
_addr: SocketAddr,
|
|
tx: mpsc::Sender<Request>,
|
|
rx: mpsc::Receiver<Request>,
|
|
requests: HashMap<u32, Request>,
|
|
next_token: u32,
|
|
listening_rx: Option<mpsc::Receiver<u16>>,
|
|
close_tx: mpsc::Sender<()>,
|
|
cancel_handle: Rc<CancelHandle>,
|
|
}
|
|
|
|
#[derive(Debug, Eq, PartialEq)]
|
|
pub enum ParseStatus {
|
|
None,
|
|
Ongoing(usize),
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_respond(
|
|
op_state: &mut OpState,
|
|
server_id: u32,
|
|
token: u32,
|
|
response: StringOrBuffer,
|
|
shutdown: bool,
|
|
) -> u32 {
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
flash_respond(ctx, token, shutdown, &response)
|
|
}
|
|
|
|
#[op]
|
|
async fn op_flash_respond_async(
|
|
state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
token: u32,
|
|
response: StringOrBuffer,
|
|
shutdown: bool,
|
|
) -> Result<(), AnyError> {
|
|
trace!("op_flash_respond_async");
|
|
|
|
let mut close = false;
|
|
let sock = {
|
|
let mut op_state = state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
|
|
match shutdown {
|
|
true => {
|
|
let tx = ctx.requests.remove(&token).unwrap();
|
|
close = !tx.keep_alive;
|
|
tx.socket()
|
|
}
|
|
// In case of a websocket upgrade or streaming response.
|
|
false => {
|
|
let tx = ctx.requests.get(&token).unwrap();
|
|
tx.socket()
|
|
}
|
|
}
|
|
};
|
|
|
|
sock
|
|
.with_async_stream(|stream| {
|
|
Box::pin(async move {
|
|
Ok(tokio::io::AsyncWriteExt::write(stream, &response).await?)
|
|
})
|
|
})
|
|
.await?;
|
|
// server is done writing and request doesn't want to kept alive.
|
|
if shutdown && close {
|
|
sock.shutdown();
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
#[op]
|
|
async fn op_flash_respond_chuncked(
|
|
op_state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
token: u32,
|
|
response: Option<ZeroCopyBuf>,
|
|
shutdown: bool,
|
|
) -> Result<(), AnyError> {
|
|
let mut op_state = op_state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
let sock = match shutdown {
|
|
true => {
|
|
let tx = ctx.requests.remove(&token).unwrap();
|
|
tx.socket()
|
|
}
|
|
// In case of a websocket upgrade or streaming response.
|
|
false => {
|
|
let tx = ctx.requests.get(&token).unwrap();
|
|
tx.socket()
|
|
}
|
|
};
|
|
|
|
drop(op_state);
|
|
sock
|
|
.with_async_stream(|stream| {
|
|
Box::pin(async move {
|
|
use tokio::io::AsyncWriteExt;
|
|
if let Some(response) = response {
|
|
stream
|
|
.write_all(format!("{:x}\r\n", response.len()).as_bytes())
|
|
.await?;
|
|
stream.write_all(&response).await?;
|
|
stream.write_all(b"\r\n").await?;
|
|
}
|
|
|
|
// The last chunk
|
|
if shutdown {
|
|
stream.write_all(b"0\r\n\r\n").await?;
|
|
}
|
|
|
|
Ok(())
|
|
})
|
|
})
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[op]
|
|
async fn op_flash_write_resource(
|
|
op_state: Rc<RefCell<OpState>>,
|
|
response: StringOrBuffer,
|
|
server_id: u32,
|
|
token: u32,
|
|
resource_id: deno_core::ResourceId,
|
|
auto_close: bool,
|
|
) -> Result<(), AnyError> {
|
|
let (resource, sock) = {
|
|
let op_state = &mut op_state.borrow_mut();
|
|
let resource = if auto_close {
|
|
op_state.resource_table.take_any(resource_id)?
|
|
} else {
|
|
op_state.resource_table.get_any(resource_id)?
|
|
};
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
(resource, ctx.requests.remove(&token).unwrap().socket())
|
|
};
|
|
|
|
let _ = sock.write(&response);
|
|
|
|
#[cfg(unix)]
|
|
{
|
|
use std::os::unix::io::AsRawFd;
|
|
if let InnerStream::Tcp(stream_handle) = &sock.inner {
|
|
let stream_handle = stream_handle.as_raw_fd();
|
|
if let Some(fd) = resource.clone().backing_fd() {
|
|
// SAFETY: all-zero byte-pattern is a valid value for libc::stat.
|
|
let mut stat: libc::stat = unsafe { std::mem::zeroed() };
|
|
// SAFETY: call to libc::fstat.
|
|
if unsafe { libc::fstat(fd, &mut stat) } >= 0 {
|
|
let _ = sock.write(
|
|
format!("Content-Length: {}\r\n\r\n", stat.st_size).as_bytes(),
|
|
);
|
|
let tx = sendfile::SendFile {
|
|
io: (fd, stream_handle),
|
|
written: 0,
|
|
};
|
|
tx.await?;
|
|
return Ok(());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
sock
|
|
.with_async_stream(|stream| {
|
|
Box::pin(async move {
|
|
use tokio::io::AsyncWriteExt;
|
|
stream
|
|
.write_all(b"Transfer-Encoding: chunked\r\n\r\n")
|
|
.await?;
|
|
loop {
|
|
let vec = vec![0u8; 64 * 1024]; // 64KB
|
|
let buf = ZeroCopyBuf::new_temp(vec);
|
|
let (nread, buf) = resource.clone().read_return(buf).await?;
|
|
if nread == 0 {
|
|
stream.write_all(b"0\r\n\r\n").await?;
|
|
break;
|
|
}
|
|
|
|
let response = &buf[..nread];
|
|
// TODO(@littledivy): use vectored writes.
|
|
stream
|
|
.write_all(format!("{:x}\r\n", response.len()).as_bytes())
|
|
.await?;
|
|
stream.write_all(response).await?;
|
|
stream.write_all(b"\r\n").await?;
|
|
}
|
|
resource.close();
|
|
Ok(())
|
|
})
|
|
})
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
pub struct RespondFast;
|
|
|
|
impl fast_api::FastFunction for RespondFast {
|
|
fn function(&self) -> *const c_void {
|
|
op_flash_respond_fast as *const c_void
|
|
}
|
|
|
|
fn args(&self) -> &'static [fast_api::Type] {
|
|
&[
|
|
fast_api::Type::V8Value,
|
|
fast_api::Type::Uint32,
|
|
fast_api::Type::TypedArray(fast_api::CType::Uint8),
|
|
fast_api::Type::Bool,
|
|
]
|
|
}
|
|
|
|
fn return_type(&self) -> fast_api::CType {
|
|
fast_api::CType::Uint32
|
|
}
|
|
}
|
|
|
|
fn flash_respond(
|
|
ctx: &mut ServerContext,
|
|
token: u32,
|
|
shutdown: bool,
|
|
response: &[u8],
|
|
) -> u32 {
|
|
let tx = ctx.requests.get(&token).unwrap();
|
|
let sock = tx.socket();
|
|
|
|
sock.read_tx.take();
|
|
sock.read_rx.take();
|
|
|
|
let nwritten = sock.try_write(response);
|
|
|
|
if shutdown && nwritten == response.len() {
|
|
if !tx.keep_alive {
|
|
sock.shutdown();
|
|
}
|
|
ctx.requests.remove(&token).unwrap();
|
|
}
|
|
|
|
nwritten as u32
|
|
}
|
|
|
|
unsafe fn op_flash_respond_fast(
|
|
recv: v8::Local<v8::Object>,
|
|
token: u32,
|
|
response: *const fast_api::FastApiTypedArray<u8>,
|
|
shutdown: bool,
|
|
) -> u32 {
|
|
let ptr =
|
|
recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
|
|
let ctx = &mut *(ptr as *mut ServerContext);
|
|
|
|
let response = &*response;
|
|
if let Some(response) = response.get_storage_if_aligned() {
|
|
flash_respond(ctx, token, shutdown, response)
|
|
} else {
|
|
todo!();
|
|
}
|
|
}
|
|
|
|
macro_rules! get_request {
|
|
($op_state: ident, $token: ident) => {
|
|
get_request!($op_state, 0, $token)
|
|
};
|
|
($op_state: ident, $server_id: expr, $token: ident) => {{
|
|
let flash_ctx = $op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&$server_id).unwrap();
|
|
ctx.requests.get_mut(&$token).unwrap()
|
|
}};
|
|
}
|
|
|
|
#[repr(u32)]
|
|
pub enum Method {
|
|
GET = 0,
|
|
HEAD,
|
|
CONNECT,
|
|
PUT,
|
|
DELETE,
|
|
OPTIONS,
|
|
TRACE,
|
|
POST,
|
|
PATCH,
|
|
}
|
|
|
|
#[inline]
|
|
fn get_method(req: &mut Request) -> u32 {
|
|
let method = match req.method() {
|
|
"GET" => Method::GET,
|
|
"POST" => Method::POST,
|
|
"PUT" => Method::PUT,
|
|
"DELETE" => Method::DELETE,
|
|
"OPTIONS" => Method::OPTIONS,
|
|
"HEAD" => Method::HEAD,
|
|
"PATCH" => Method::PATCH,
|
|
"TRACE" => Method::TRACE,
|
|
"CONNECT" => Method::CONNECT,
|
|
_ => Method::GET,
|
|
};
|
|
method as u32
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_method(state: &mut OpState, server_id: u32, token: u32) -> u32 {
|
|
let req = get_request!(state, server_id, token);
|
|
get_method(req)
|
|
}
|
|
|
|
#[op]
|
|
async fn op_flash_close_server(state: Rc<RefCell<OpState>>, server_id: u32) {
|
|
let close_tx = {
|
|
let mut op_state = state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
ctx.cancel_handle.cancel();
|
|
ctx.close_tx.clone()
|
|
};
|
|
let _ = close_tx.send(()).await;
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_path(
|
|
state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
token: u32,
|
|
) -> String {
|
|
let mut op_state = state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
ctx
|
|
.requests
|
|
.get(&token)
|
|
.unwrap()
|
|
.inner
|
|
.req
|
|
.path
|
|
.unwrap()
|
|
.to_string()
|
|
}
|
|
|
|
#[inline]
|
|
fn next_request_sync(ctx: &mut ServerContext) -> u32 {
|
|
let offset = ctx.next_token;
|
|
|
|
while let Ok(token) = ctx.rx.try_recv() {
|
|
ctx.requests.insert(ctx.next_token, token);
|
|
ctx.next_token += 1;
|
|
}
|
|
|
|
ctx.next_token - offset
|
|
}
|
|
|
|
pub struct NextRequestFast;
|
|
|
|
impl fast_api::FastFunction for NextRequestFast {
|
|
fn function(&self) -> *const c_void {
|
|
op_flash_next_fast as *const c_void
|
|
}
|
|
|
|
fn args(&self) -> &'static [fast_api::Type] {
|
|
&[fast_api::Type::V8Value]
|
|
}
|
|
|
|
fn return_type(&self) -> fast_api::CType {
|
|
fast_api::CType::Uint32
|
|
}
|
|
}
|
|
|
|
unsafe fn op_flash_next_fast(recv: v8::Local<v8::Object>) -> u32 {
|
|
let ptr =
|
|
recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
|
|
let ctx = &mut *(ptr as *mut ServerContext);
|
|
next_request_sync(ctx)
|
|
}
|
|
|
|
pub struct GetMethodFast;
|
|
|
|
impl fast_api::FastFunction for GetMethodFast {
|
|
fn function(&self) -> *const c_void {
|
|
op_flash_get_method_fast as *const c_void
|
|
}
|
|
|
|
fn args(&self) -> &'static [fast_api::Type] {
|
|
&[fast_api::Type::V8Value, fast_api::Type::Uint32]
|
|
}
|
|
|
|
fn return_type(&self) -> fast_api::CType {
|
|
fast_api::CType::Uint32
|
|
}
|
|
}
|
|
|
|
unsafe fn op_flash_get_method_fast(
|
|
recv: v8::Local<v8::Object>,
|
|
token: u32,
|
|
) -> u32 {
|
|
let ptr =
|
|
recv.get_aligned_pointer_from_internal_field(V8_WRAPPER_OBJECT_INDEX);
|
|
let ctx = &mut *(ptr as *mut ServerContext);
|
|
let req = ctx.requests.get_mut(&token).unwrap();
|
|
get_method(req)
|
|
}
|
|
|
|
// Fast calls
|
|
#[op(v8)]
|
|
fn op_flash_make_request<'scope>(
|
|
scope: &mut v8::HandleScope<'scope>,
|
|
state: &mut OpState,
|
|
) -> serde_v8::Value<'scope> {
|
|
let object_template = v8::ObjectTemplate::new(scope);
|
|
assert!(object_template
|
|
.set_internal_field_count((V8_WRAPPER_OBJECT_INDEX + 1) as usize));
|
|
let obj = object_template.new_instance(scope).unwrap();
|
|
let ctx = {
|
|
let flash_ctx = state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&0).unwrap();
|
|
ctx as *mut ServerContext
|
|
};
|
|
obj.set_aligned_pointer_in_internal_field(V8_WRAPPER_OBJECT_INDEX, ctx as _);
|
|
|
|
// nextRequest
|
|
{
|
|
let builder = v8::FunctionTemplate::builder(
|
|
|_: &mut v8::HandleScope,
|
|
args: v8::FunctionCallbackArguments,
|
|
mut rv: v8::ReturnValue| {
|
|
let external: v8::Local<v8::External> =
|
|
args.data().unwrap().try_into().unwrap();
|
|
// SAFETY: This external is guaranteed to be a pointer to a ServerContext
|
|
let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
|
|
rv.set_uint32(next_request_sync(ctx));
|
|
},
|
|
)
|
|
.data(v8::External::new(scope, ctx as *mut _).into());
|
|
|
|
let func = builder.build_fast(scope, &NextRequestFast, None);
|
|
let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
|
|
|
|
let key = v8::String::new(scope, "nextRequest").unwrap();
|
|
obj.set(scope, key.into(), func).unwrap();
|
|
}
|
|
|
|
// getMethod
|
|
{
|
|
let builder = v8::FunctionTemplate::builder(
|
|
|scope: &mut v8::HandleScope,
|
|
args: v8::FunctionCallbackArguments,
|
|
mut rv: v8::ReturnValue| {
|
|
let external: v8::Local<v8::External> =
|
|
args.data().unwrap().try_into().unwrap();
|
|
// SAFETY: This external is guaranteed to be a pointer to a ServerContext
|
|
let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
|
|
let token = args.get(0).uint32_value(scope).unwrap();
|
|
let req = ctx.requests.get_mut(&token).unwrap();
|
|
rv.set_uint32(get_method(req));
|
|
},
|
|
)
|
|
.data(v8::External::new(scope, ctx as *mut _).into());
|
|
|
|
let func = builder.build_fast(scope, &GetMethodFast, None);
|
|
let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
|
|
|
|
let key = v8::String::new(scope, "getMethod").unwrap();
|
|
obj.set(scope, key.into(), func).unwrap();
|
|
}
|
|
|
|
// respond
|
|
{
|
|
let builder = v8::FunctionTemplate::builder(
|
|
|scope: &mut v8::HandleScope,
|
|
args: v8::FunctionCallbackArguments,
|
|
mut rv: v8::ReturnValue| {
|
|
let external: v8::Local<v8::External> =
|
|
args.data().unwrap().try_into().unwrap();
|
|
// SAFETY: This external is guaranteed to be a pointer to a ServerContext
|
|
let ctx = unsafe { &mut *(external.value() as *mut ServerContext) };
|
|
|
|
let token = args.get(0).uint32_value(scope).unwrap();
|
|
|
|
let response: v8::Local<v8::ArrayBufferView> =
|
|
args.get(1).try_into().unwrap();
|
|
let ab = response.buffer(scope).unwrap();
|
|
let store = ab.get_backing_store();
|
|
let (offset, len) = (response.byte_offset(), response.byte_length());
|
|
// SAFETY: v8::SharedRef<v8::BackingStore> is similar to Arc<[u8]>,
|
|
// it points to a fixed continuous slice of bytes on the heap.
|
|
// We assume it's initialized and thus safe to read (though may not contain meaningful data)
|
|
let response = unsafe {
|
|
&*(&store[offset..offset + len] as *const _ as *const [u8])
|
|
};
|
|
|
|
let shutdown = args.get(2).boolean_value(scope);
|
|
|
|
rv.set_uint32(flash_respond(ctx, token, shutdown, response));
|
|
},
|
|
)
|
|
.data(v8::External::new(scope, ctx as *mut _).into());
|
|
|
|
let func = builder.build_fast(scope, &RespondFast, None);
|
|
let func: v8::Local<v8::Value> = func.get_function(scope).unwrap().into();
|
|
|
|
let key = v8::String::new(scope, "respond").unwrap();
|
|
obj.set(scope, key.into(), func).unwrap();
|
|
}
|
|
|
|
let value: v8::Local<v8::Value> = obj.into();
|
|
value.into()
|
|
}
|
|
|
|
#[inline]
|
|
fn has_body_stream(req: &Request) -> bool {
|
|
let sock = req.socket();
|
|
sock.read_rx.is_some()
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_has_body_stream(
|
|
op_state: &mut OpState,
|
|
server_id: u32,
|
|
token: u32,
|
|
) -> bool {
|
|
let req = get_request!(op_state, server_id, token);
|
|
has_body_stream(req)
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_headers(
|
|
state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
token: u32,
|
|
) -> Result<Vec<(ByteString, ByteString)>, AnyError> {
|
|
let mut op_state = state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx
|
|
.servers
|
|
.get_mut(&server_id)
|
|
.ok_or_else(|| type_error("server closed"))?;
|
|
let inner_req = &ctx
|
|
.requests
|
|
.get(&token)
|
|
.ok_or_else(|| type_error("request closed"))?
|
|
.inner
|
|
.req;
|
|
Ok(
|
|
inner_req
|
|
.headers
|
|
.iter()
|
|
.map(|h| (h.name.as_bytes().into(), h.value.into()))
|
|
.collect(),
|
|
)
|
|
}
|
|
|
|
// Remember the first packet we read? It probably also has some body data. This op quickly copies it into
|
|
// a buffer and sets up channels for streaming the rest.
|
|
#[op]
|
|
fn op_flash_first_packet(
|
|
op_state: &mut OpState,
|
|
server_id: u32,
|
|
token: u32,
|
|
) -> Result<Option<ZeroCopyBuf>, AnyError> {
|
|
let tx = get_request!(op_state, server_id, token);
|
|
let sock = tx.socket();
|
|
|
|
if !tx.te_chunked && tx.content_length.is_none() {
|
|
return Ok(None);
|
|
}
|
|
|
|
if tx.expect_continue {
|
|
let _ = sock.write(b"HTTP/1.1 100 Continue\r\n\r\n");
|
|
tx.expect_continue = false;
|
|
}
|
|
|
|
let buffer = &tx.inner.buffer[tx.inner.body_offset..tx.inner.body_len];
|
|
// Oh there is nothing here.
|
|
if buffer.is_empty() {
|
|
return Ok(Some(ZeroCopyBuf::empty()));
|
|
}
|
|
|
|
if tx.te_chunked {
|
|
let mut buf = vec![0; 1024];
|
|
let mut offset = 0;
|
|
let mut decoder = chunked::Decoder::new(
|
|
std::io::Cursor::new(buffer),
|
|
tx.remaining_chunk_size,
|
|
);
|
|
|
|
loop {
|
|
match decoder.read(&mut buf[offset..]) {
|
|
Ok(n) => {
|
|
tx.remaining_chunk_size = decoder.remaining_chunks_size;
|
|
offset += n;
|
|
|
|
if n == 0 {
|
|
tx.te_chunked = false;
|
|
buf.truncate(offset);
|
|
return Ok(Some(buf.into()));
|
|
}
|
|
|
|
if offset < buf.len()
|
|
&& decoder.source.position() < buffer.len() as u64
|
|
{
|
|
continue;
|
|
}
|
|
|
|
buf.truncate(offset);
|
|
return Ok(Some(buf.into()));
|
|
}
|
|
Err(e) => {
|
|
return Err(type_error(format!("{}", e)));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
tx.content_length
|
|
.ok_or_else(|| type_error("no content-length"))?;
|
|
tx.content_read += buffer.len();
|
|
|
|
Ok(Some(buffer.to_vec().into()))
|
|
}
|
|
|
|
#[op]
|
|
async fn op_flash_read_body(
|
|
state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
token: u32,
|
|
mut buf: ZeroCopyBuf,
|
|
) -> usize {
|
|
// SAFETY: we cannot hold op_state borrow across the await point. The JS caller
|
|
// is responsible for ensuring this is not called concurrently.
|
|
let ctx = unsafe {
|
|
{
|
|
let op_state = &mut state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
flash_ctx.servers.get_mut(&server_id).unwrap() as *mut ServerContext
|
|
}
|
|
.as_mut()
|
|
.unwrap()
|
|
};
|
|
let tx = ctx.requests.get_mut(&token).unwrap();
|
|
|
|
if tx.te_chunked {
|
|
let mut decoder =
|
|
chunked::Decoder::new(tx.socket(), tx.remaining_chunk_size);
|
|
loop {
|
|
let sock = tx.socket();
|
|
|
|
let _lock = sock.read_lock.lock().unwrap();
|
|
match decoder.read(&mut buf) {
|
|
Ok(n) => {
|
|
tx.remaining_chunk_size = decoder.remaining_chunks_size;
|
|
return n;
|
|
}
|
|
Err(e) if e.kind() == std::io::ErrorKind::InvalidInput => {
|
|
panic!("chunked read error: {}", e);
|
|
}
|
|
Err(_) => {
|
|
drop(_lock);
|
|
sock.read_rx.as_mut().unwrap().recv().await.unwrap();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Some(content_length) = tx.content_length {
|
|
let sock = tx.socket();
|
|
let l = sock.read_lock.clone();
|
|
|
|
loop {
|
|
let _lock = l.lock().unwrap();
|
|
if tx.content_read >= content_length as usize {
|
|
return 0;
|
|
}
|
|
match sock.read(&mut buf) {
|
|
Ok(n) => {
|
|
tx.content_read += n;
|
|
return n;
|
|
}
|
|
_ => {
|
|
drop(_lock);
|
|
sock.read_rx.as_mut().unwrap().recv().await.unwrap();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
0
|
|
}
|
|
|
|
// https://github.com/hyperium/hyper/blob/0c8ee93d7f557afc63ca2a5686d19071813ab2b7/src/headers.rs#L67
|
|
#[inline]
|
|
fn from_digits(bytes: &[u8]) -> Option<u64> {
|
|
// cannot use FromStr for u64, since it allows a signed prefix
|
|
let mut result = 0u64;
|
|
const RADIX: u64 = 10;
|
|
if bytes.is_empty() {
|
|
return None;
|
|
}
|
|
for &b in bytes {
|
|
// can't use char::to_digit, since we haven't verified these bytes
|
|
// are utf-8.
|
|
match b {
|
|
b'0'..=b'9' => {
|
|
result = result.checked_mul(RADIX)?;
|
|
result = result.checked_add((b - b'0') as u64)?;
|
|
}
|
|
_ => {
|
|
return None;
|
|
}
|
|
}
|
|
}
|
|
Some(result)
|
|
}
|
|
|
|
#[inline]
|
|
fn connection_has(value: &HeaderValue, needle: &str) -> bool {
|
|
if let Ok(s) = value.to_str() {
|
|
for val in s.split(',') {
|
|
if val.trim().eq_ignore_ascii_case(needle) {
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
false
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct ListenOpts {
|
|
cert: Option<String>,
|
|
key: Option<String>,
|
|
hostname: String,
|
|
port: u16,
|
|
reuseport: bool,
|
|
}
|
|
|
|
fn run_server(
|
|
tx: mpsc::Sender<Request>,
|
|
listening_tx: mpsc::Sender<u16>,
|
|
mut close_rx: mpsc::Receiver<()>,
|
|
addr: SocketAddr,
|
|
maybe_cert: Option<String>,
|
|
maybe_key: Option<String>,
|
|
reuseport: bool,
|
|
) -> Result<(), AnyError> {
|
|
let domain = if addr.is_ipv4() {
|
|
socket2::Domain::IPV4
|
|
} else {
|
|
socket2::Domain::IPV6
|
|
};
|
|
let socket = Socket::new(domain, socket2::Type::STREAM, None)?;
|
|
|
|
#[cfg(not(windows))]
|
|
socket.set_reuse_address(true)?;
|
|
if reuseport {
|
|
#[cfg(target_os = "linux")]
|
|
socket.set_reuse_port(true)?;
|
|
}
|
|
|
|
let socket_addr = socket2::SockAddr::from(addr);
|
|
socket.bind(&socket_addr)?;
|
|
socket.listen(128)?;
|
|
socket.set_nonblocking(true)?;
|
|
let std_listener: std::net::TcpListener = socket.into();
|
|
let mut listener = TcpListener::from_std(std_listener);
|
|
|
|
let mut poll = Poll::new()?;
|
|
let token = Token(0);
|
|
poll
|
|
.registry()
|
|
.register(&mut listener, token, Interest::READABLE)
|
|
.unwrap();
|
|
|
|
let tls_context: Option<Arc<rustls::ServerConfig>> = {
|
|
if let Some(cert) = maybe_cert {
|
|
let key = maybe_key.unwrap();
|
|
let certificate_chain: Vec<rustls::Certificate> =
|
|
load_certs(&mut BufReader::new(cert.as_bytes()))?;
|
|
let private_key = load_private_keys(key.as_bytes())?.remove(0);
|
|
|
|
let config = rustls::ServerConfig::builder()
|
|
.with_safe_defaults()
|
|
.with_no_client_auth()
|
|
.with_single_cert(certificate_chain, private_key)
|
|
.expect("invalid key or certificate");
|
|
Some(Arc::new(config))
|
|
} else {
|
|
None
|
|
}
|
|
};
|
|
|
|
listening_tx
|
|
.blocking_send(listener.local_addr().unwrap().port())
|
|
.unwrap();
|
|
let mut sockets = HashMap::with_capacity(1000);
|
|
let mut counter: usize = 1;
|
|
let mut events = Events::with_capacity(1024);
|
|
'outer: loop {
|
|
let result = close_rx.try_recv();
|
|
if result.is_ok() {
|
|
break 'outer;
|
|
}
|
|
// FIXME(bartlomieju): how does Tokio handle it? I just put random 100ms
|
|
// timeout here to handle close signal.
|
|
match poll.poll(&mut events, Some(Duration::from_millis(100))) {
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
|
Err(e) => panic!("{}", e),
|
|
Ok(()) => (),
|
|
}
|
|
'events: for event in &events {
|
|
if close_rx.try_recv().is_ok() {
|
|
break 'outer;
|
|
}
|
|
let token = event.token();
|
|
match token {
|
|
Token(0) => loop {
|
|
match listener.accept() {
|
|
Ok((mut socket, _)) => {
|
|
counter += 1;
|
|
let token = Token(counter);
|
|
poll
|
|
.registry()
|
|
.register(&mut socket, token, Interest::READABLE)
|
|
.unwrap();
|
|
|
|
let socket = match tls_context {
|
|
Some(ref tls_conf) => {
|
|
let connection =
|
|
rustls::ServerConnection::new(tls_conf.clone()).unwrap();
|
|
InnerStream::Tls(Box::new(rustls::StreamOwned::new(
|
|
connection, socket,
|
|
)))
|
|
}
|
|
None => InnerStream::Tcp(socket),
|
|
};
|
|
let stream = Box::pin(Stream {
|
|
inner: socket,
|
|
detached: false,
|
|
read_rx: None,
|
|
read_tx: None,
|
|
read_lock: Arc::new(Mutex::new(())),
|
|
parse_done: ParseStatus::None,
|
|
buffer: UnsafeCell::new(vec![0_u8; 1024]),
|
|
});
|
|
|
|
trace!("New connection: {}", token.0);
|
|
sockets.insert(token, stream);
|
|
}
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => break,
|
|
Err(_) => break,
|
|
}
|
|
},
|
|
token => {
|
|
let socket = sockets.get_mut(&token).unwrap();
|
|
// SAFETY: guarantee that we will never move the data out of the mutable reference.
|
|
let socket = unsafe {
|
|
let mut_ref: Pin<&mut Stream> = Pin::as_mut(socket);
|
|
Pin::get_unchecked_mut(mut_ref)
|
|
};
|
|
let sock_ptr = socket as *mut _;
|
|
|
|
if socket.detached {
|
|
match &mut socket.inner {
|
|
InnerStream::Tcp(ref mut socket) => {
|
|
poll.registry().deregister(socket).unwrap();
|
|
}
|
|
InnerStream::Tls(_) => {
|
|
todo!("upgrade tls not implemented");
|
|
}
|
|
}
|
|
|
|
let boxed = sockets.remove(&token).unwrap();
|
|
std::mem::forget(boxed);
|
|
trace!("Socket detached: {}", token.0);
|
|
continue;
|
|
}
|
|
|
|
debug_assert!(event.is_readable());
|
|
|
|
trace!("Socket readable: {}", token.0);
|
|
if let Some(tx) = &socket.read_tx {
|
|
{
|
|
let _l = socket.read_lock.lock().unwrap();
|
|
}
|
|
trace!("Sending readiness notification: {}", token.0);
|
|
let _ = tx.blocking_send(());
|
|
|
|
continue;
|
|
}
|
|
|
|
let mut headers = vec![httparse::EMPTY_HEADER; 40];
|
|
let mut req = httparse::Request::new(&mut headers);
|
|
let body_offset;
|
|
let body_len;
|
|
loop {
|
|
// SAFETY: It is safe for the read buf to be mutable here.
|
|
let buffer = unsafe { &mut *socket.buffer.get() };
|
|
let offset = match socket.parse_done {
|
|
ParseStatus::None => 0,
|
|
ParseStatus::Ongoing(offset) => offset,
|
|
};
|
|
if offset >= buffer.len() {
|
|
buffer.resize(offset * 2, 0);
|
|
}
|
|
let nread = socket.read(&mut buffer[offset..]);
|
|
|
|
match nread {
|
|
Ok(0) => {
|
|
trace!("Socket closed: {}", token.0);
|
|
// FIXME: don't remove while JS is writing!
|
|
// sockets.remove(&token);
|
|
continue 'events;
|
|
}
|
|
Ok(read) => {
|
|
match req.parse(&buffer[..offset + read]) {
|
|
Ok(httparse::Status::Complete(n)) => {
|
|
body_offset = n;
|
|
body_len = offset + read;
|
|
socket.parse_done = ParseStatus::None;
|
|
// On Windows, We must keep calling socket.read() until it fails with WouldBlock.
|
|
//
|
|
// Mio tries to emulate edge triggered events on Windows.
|
|
// AFAICT it only rearms the event on WouldBlock, but it doesn't when a partial read happens.
|
|
// https://github.com/denoland/deno/issues/15549
|
|
#[cfg(target_os = "windows")]
|
|
match &mut socket.inner {
|
|
InnerStream::Tcp(ref mut socket) => {
|
|
poll
|
|
.registry()
|
|
.reregister(socket, token, Interest::READABLE)
|
|
.unwrap();
|
|
}
|
|
InnerStream::Tls(ref mut socket) => {
|
|
poll
|
|
.registry()
|
|
.reregister(
|
|
&mut socket.sock,
|
|
token,
|
|
Interest::READABLE,
|
|
)
|
|
.unwrap();
|
|
}
|
|
};
|
|
break;
|
|
}
|
|
Ok(httparse::Status::Partial) => {
|
|
socket.parse_done = ParseStatus::Ongoing(offset + read);
|
|
continue;
|
|
}
|
|
Err(_) => {
|
|
let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
|
|
continue 'events;
|
|
}
|
|
}
|
|
}
|
|
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
|
|
break 'events
|
|
}
|
|
Err(_) => break 'events,
|
|
}
|
|
}
|
|
|
|
debug_assert_eq!(socket.parse_done, ParseStatus::None);
|
|
if let Some(method) = &req.method {
|
|
if method == &"POST" || method == &"PUT" {
|
|
let (tx, rx) = mpsc::channel(100);
|
|
socket.read_tx = Some(tx);
|
|
socket.read_rx = Some(rx);
|
|
}
|
|
}
|
|
|
|
// SAFETY: It is safe for the read buf to be mutable here.
|
|
let buffer = unsafe { &mut *socket.buffer.get() };
|
|
let inner_req = InnerRequest {
|
|
// SAFETY: backing buffer is pinned and lives as long as the request.
|
|
req: unsafe { transmute::<httparse::Request<'_, '_>, _>(req) },
|
|
// SAFETY: backing buffer is pinned and lives as long as the request.
|
|
_headers: unsafe {
|
|
transmute::<Vec<httparse::Header<'_>>, _>(headers)
|
|
},
|
|
buffer: Pin::new(
|
|
replace(buffer, vec![0_u8; 1024]).into_boxed_slice(),
|
|
),
|
|
body_offset,
|
|
body_len,
|
|
};
|
|
// h1
|
|
// https://github.com/tiny-http/tiny-http/blob/master/src/client.rs#L177
|
|
// https://github.com/hyperium/hyper/blob/4545c3ef191ce9b5f5d250ee27c4c96f9b71d2c6/src/proto/h1/role.rs#L127
|
|
let mut keep_alive = inner_req.req.version.unwrap() == 1;
|
|
let mut expect_continue = false;
|
|
let mut te = false;
|
|
let mut te_chunked = false;
|
|
let mut content_length = None;
|
|
for header in inner_req.req.headers.iter() {
|
|
match HeaderName::from_bytes(header.name.as_bytes()) {
|
|
Ok(CONNECTION) => {
|
|
// SAFETY: illegal bytes are validated by httparse.
|
|
let value = unsafe {
|
|
HeaderValue::from_maybe_shared_unchecked(header.value)
|
|
};
|
|
if keep_alive {
|
|
// 1.1
|
|
keep_alive = !connection_has(&value, "close");
|
|
} else {
|
|
// 1.0
|
|
keep_alive = connection_has(&value, "keep-alive");
|
|
}
|
|
}
|
|
Ok(TRANSFER_ENCODING) => {
|
|
// https://tools.ietf.org/html/rfc7230#section-3.3.3
|
|
debug_assert!(inner_req.req.version.unwrap() == 1);
|
|
// Two states for Transfer-Encoding because we want to make sure Content-Length handling knows it.
|
|
te = true;
|
|
content_length = None;
|
|
// SAFETY: illegal bytes are validated by httparse.
|
|
let value = unsafe {
|
|
HeaderValue::from_maybe_shared_unchecked(header.value)
|
|
};
|
|
if let Ok(Some(encoding)) =
|
|
value.to_str().map(|s| s.rsplit(',').next())
|
|
{
|
|
// Chunked must always be the last encoding
|
|
if encoding.trim().eq_ignore_ascii_case("chunked") {
|
|
te_chunked = true;
|
|
}
|
|
}
|
|
}
|
|
// Transfer-Encoding overrides the Content-Length.
|
|
Ok(CONTENT_LENGTH) if !te => {
|
|
if let Some(len) = from_digits(header.value) {
|
|
if let Some(prev) = content_length {
|
|
if prev != len {
|
|
let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
|
|
continue 'events;
|
|
}
|
|
continue;
|
|
}
|
|
content_length = Some(len);
|
|
} else {
|
|
let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
|
|
continue 'events;
|
|
}
|
|
}
|
|
Ok(EXPECT) if inner_req.req.version.unwrap() != 0 => {
|
|
expect_continue =
|
|
header.value.eq_ignore_ascii_case(b"100-continue");
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
|
|
// There is Transfer-Encoding but its not chunked.
|
|
if te && !te_chunked {
|
|
let _ = socket.write(b"HTTP/1.1 400 Bad Request\r\n\r\n");
|
|
continue 'events;
|
|
}
|
|
|
|
tx.blocking_send(Request {
|
|
socket: sock_ptr,
|
|
// SAFETY: headers backing buffer outlives the mio event loop ('static)
|
|
inner: inner_req,
|
|
keep_alive,
|
|
te_chunked,
|
|
remaining_chunk_size: None,
|
|
content_read: 0,
|
|
content_length,
|
|
expect_continue,
|
|
})
|
|
.ok();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn make_addr_port_pair(hostname: &str, port: u16) -> (&str, u16) {
|
|
// Default to localhost if given just the port. Example: ":80"
|
|
if hostname.is_empty() {
|
|
return ("0.0.0.0", port);
|
|
}
|
|
|
|
// If this looks like an ipv6 IP address. Example: "[2001:db8::1]"
|
|
// Then we remove the brackets.
|
|
let addr = hostname.trim_start_matches('[').trim_end_matches(']');
|
|
(addr, port)
|
|
}
|
|
|
|
/// Resolve network address *synchronously*.
|
|
pub fn resolve_addr_sync(
|
|
hostname: &str,
|
|
port: u16,
|
|
) -> Result<impl Iterator<Item = SocketAddr>, AnyError> {
|
|
let addr_port_pair = make_addr_port_pair(hostname, port);
|
|
let result = addr_port_pair.to_socket_addrs()?;
|
|
Ok(result)
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_serve<P>(
|
|
state: &mut OpState,
|
|
opts: ListenOpts,
|
|
) -> Result<u32, AnyError>
|
|
where
|
|
P: FlashPermissions + 'static,
|
|
{
|
|
check_unstable(state, "Deno.serve");
|
|
state
|
|
.borrow_mut::<P>()
|
|
.check_net(&(&opts.hostname, Some(opts.port)), "Deno.serve()")?;
|
|
|
|
let addr = resolve_addr_sync(&opts.hostname, opts.port)?
|
|
.next()
|
|
.ok_or_else(|| generic_error("No resolved address found"))?;
|
|
let (tx, rx) = mpsc::channel(100);
|
|
let (close_tx, close_rx) = mpsc::channel(1);
|
|
let (listening_tx, listening_rx) = mpsc::channel(1);
|
|
let ctx = ServerContext {
|
|
_addr: addr,
|
|
tx,
|
|
rx,
|
|
requests: HashMap::with_capacity(1000),
|
|
next_token: 0,
|
|
close_tx,
|
|
listening_rx: Some(listening_rx),
|
|
cancel_handle: CancelHandle::new_rc(),
|
|
};
|
|
let tx = ctx.tx.clone();
|
|
let maybe_cert = opts.cert;
|
|
let maybe_key = opts.key;
|
|
let reuseport = opts.reuseport;
|
|
let join_handle = tokio::task::spawn_blocking(move || {
|
|
run_server(
|
|
tx,
|
|
listening_tx,
|
|
close_rx,
|
|
addr,
|
|
maybe_cert,
|
|
maybe_key,
|
|
reuseport,
|
|
)
|
|
});
|
|
let flash_ctx = state.borrow_mut::<FlashContext>();
|
|
let server_id = flash_ctx.next_server_id;
|
|
flash_ctx.next_server_id += 1;
|
|
flash_ctx.join_handles.insert(server_id, join_handle);
|
|
flash_ctx.servers.insert(server_id, ctx);
|
|
Ok(server_id)
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_wait_for_listening(
|
|
state: &mut OpState,
|
|
server_id: u32,
|
|
) -> Result<impl Future<Output = Result<u16, AnyError>> + 'static, AnyError> {
|
|
let mut listening_rx = {
|
|
let flash_ctx = state.borrow_mut::<FlashContext>();
|
|
let server_ctx = flash_ctx
|
|
.servers
|
|
.get_mut(&server_id)
|
|
.ok_or_else(|| type_error("server not found"))?;
|
|
server_ctx.listening_rx.take().unwrap()
|
|
};
|
|
Ok(async move {
|
|
if let Some(port) = listening_rx.recv().await {
|
|
Ok(port)
|
|
} else {
|
|
Err(generic_error("This error will be discarded"))
|
|
}
|
|
})
|
|
}
|
|
|
|
#[op]
|
|
fn op_flash_drive_server(
|
|
state: &mut OpState,
|
|
server_id: u32,
|
|
) -> Result<impl Future<Output = Result<(), AnyError>> + 'static, AnyError> {
|
|
let join_handle = {
|
|
let flash_ctx = state.borrow_mut::<FlashContext>();
|
|
flash_ctx
|
|
.join_handles
|
|
.remove(&server_id)
|
|
.ok_or_else(|| type_error("server not found"))?
|
|
};
|
|
Ok(async move {
|
|
join_handle
|
|
.await
|
|
.map_err(|_| type_error("server join error"))??;
|
|
Ok(())
|
|
})
|
|
}
|
|
|
|
// Asychronous version of op_flash_next. This can be a bottleneck under
|
|
// heavy load, it should be used as a fallback if there are no buffered
|
|
// requests i.e `op_flash_next() == 0`.
|
|
#[op]
|
|
async fn op_flash_next_async(
|
|
op_state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
) -> u32 {
|
|
let ctx = {
|
|
let mut op_state = op_state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
ctx as *mut ServerContext
|
|
};
|
|
// SAFETY: we cannot hold op_state borrow across the await point. The JS caller
|
|
// is responsible for ensuring this is not called concurrently.
|
|
let ctx = unsafe { &mut *ctx };
|
|
let cancel_handle = &ctx.cancel_handle;
|
|
|
|
if let Ok(Some(req)) = ctx.rx.recv().or_cancel(cancel_handle).await {
|
|
ctx.requests.insert(ctx.next_token, req);
|
|
ctx.next_token += 1;
|
|
return 1;
|
|
}
|
|
|
|
0
|
|
}
|
|
|
|
// Synchronous version of op_flash_next_async. Under heavy load,
|
|
// this can collect buffered requests from rx channel and return tokens in a single batch.
|
|
//
|
|
// perf: please do not add any arguments to this op. With optimizations enabled,
|
|
// the ContextScope creation is optimized away and the op is as simple as:
|
|
// f(info: *const v8::FunctionCallbackInfo) { let rv = ...; rv.set_uint32(op_flash_next()); }
|
|
#[op]
|
|
fn op_flash_next(state: &mut OpState) -> u32 {
|
|
let flash_ctx = state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&0).unwrap();
|
|
next_request_sync(ctx)
|
|
}
|
|
|
|
// Syncrhonous version of op_flash_next_async. Under heavy load,
|
|
// this can collect buffered requests from rx channel and return tokens in a single batch.
|
|
#[op]
|
|
fn op_flash_next_server(state: &mut OpState, server_id: u32) -> u32 {
|
|
let flash_ctx = state.borrow_mut::<FlashContext>();
|
|
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
|
|
next_request_sync(ctx)
|
|
}
|
|
|
|
// Wrapper type for tokio::net::TcpStream that implements
|
|
// deno_websocket::UpgradedStream
|
|
struct UpgradedStream(tokio::net::TcpStream);
|
|
impl tokio::io::AsyncRead for UpgradedStream {
|
|
fn poll_read(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context,
|
|
buf: &mut tokio::io::ReadBuf,
|
|
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
|
|
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
|
|
}
|
|
}
|
|
|
|
impl tokio::io::AsyncWrite for UpgradedStream {
|
|
fn poll_write(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context,
|
|
buf: &[u8],
|
|
) -> std::task::Poll<Result<usize, std::io::Error>> {
|
|
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
|
|
}
|
|
fn poll_flush(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context,
|
|
) -> std::task::Poll<Result<(), std::io::Error>> {
|
|
Pin::new(&mut self.get_mut().0).poll_flush(cx)
|
|
}
|
|
fn poll_shutdown(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut Context,
|
|
) -> std::task::Poll<Result<(), std::io::Error>> {
|
|
Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
|
|
}
|
|
}
|
|
|
|
impl deno_websocket::Upgraded for UpgradedStream {}
|
|
|
|
#[inline]
|
|
pub fn detach_socket(
|
|
ctx: &mut ServerContext,
|
|
token: u32,
|
|
) -> Result<tokio::net::TcpStream, AnyError> {
|
|
// Two main 'hacks' to get this working:
|
|
// * make server thread forget about the socket. `detach_ownership` prevents the socket from being
|
|
// dropped on the server thread.
|
|
// * conversion from mio::net::TcpStream -> tokio::net::TcpStream. There is no public API so we
|
|
// use raw fds.
|
|
let tx = ctx
|
|
.requests
|
|
.remove(&token)
|
|
.ok_or_else(|| type_error("request closed"))?;
|
|
let stream = tx.socket();
|
|
// prevent socket from being dropped on server thread.
|
|
// TODO(@littledivy): Box-ify, since there is no overhead.
|
|
stream.detach_ownership();
|
|
|
|
#[cfg(unix)]
|
|
let std_stream = {
|
|
use std::os::unix::prelude::AsRawFd;
|
|
use std::os::unix::prelude::FromRawFd;
|
|
let fd = match stream.inner {
|
|
InnerStream::Tcp(ref tcp) => tcp.as_raw_fd(),
|
|
_ => todo!(),
|
|
};
|
|
// SAFETY: `fd` is a valid file descriptor.
|
|
unsafe { std::net::TcpStream::from_raw_fd(fd) }
|
|
};
|
|
#[cfg(windows)]
|
|
let std_stream = {
|
|
use std::os::windows::prelude::AsRawSocket;
|
|
use std::os::windows::prelude::FromRawSocket;
|
|
let fd = match stream.inner {
|
|
InnerStream::Tcp(ref tcp) => tcp.as_raw_socket(),
|
|
_ => todo!(),
|
|
};
|
|
// SAFETY: `fd` is a valid file descriptor.
|
|
unsafe { std::net::TcpStream::from_raw_socket(fd) }
|
|
};
|
|
let stream = tokio::net::TcpStream::from_std(std_stream)?;
|
|
Ok(stream)
|
|
}
|
|
|
|
#[op]
|
|
async fn op_flash_upgrade_websocket(
|
|
state: Rc<RefCell<OpState>>,
|
|
server_id: u32,
|
|
token: u32,
|
|
) -> Result<deno_core::ResourceId, AnyError> {
|
|
let stream = {
|
|
let op_state = &mut state.borrow_mut();
|
|
let flash_ctx = op_state.borrow_mut::<FlashContext>();
|
|
detach_socket(flash_ctx.servers.get_mut(&server_id).unwrap(), token)?
|
|
};
|
|
deno_websocket::ws_create_server_stream(
|
|
&state,
|
|
Box::pin(UpgradedStream(stream)),
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub struct Unstable(pub bool);
|
|
|
|
fn check_unstable(state: &OpState, api_name: &str) {
|
|
let unstable = state.borrow::<Unstable>();
|
|
|
|
if !unstable.0 {
|
|
eprintln!(
|
|
"Unstable API '{}'. The --unstable flag must be provided.",
|
|
api_name
|
|
);
|
|
std::process::exit(70);
|
|
}
|
|
}
|
|
|
|
pub trait FlashPermissions {
|
|
fn check_net<T: AsRef<str>>(
|
|
&mut self,
|
|
_host: &(T, Option<u16>),
|
|
_api_name: &str,
|
|
) -> Result<(), AnyError>;
|
|
}
|
|
|
|
pub fn init<P: FlashPermissions + 'static>(unstable: bool) -> Extension {
|
|
Extension::builder()
|
|
.js(deno_core::include_js_files!(
|
|
prefix "deno:ext/flash",
|
|
"01_http.js",
|
|
))
|
|
.ops(vec![
|
|
op_flash_serve::decl::<P>(),
|
|
op_flash_respond::decl(),
|
|
op_flash_respond_async::decl(),
|
|
op_flash_respond_chuncked::decl(),
|
|
op_flash_method::decl(),
|
|
op_flash_path::decl(),
|
|
op_flash_headers::decl(),
|
|
op_flash_next::decl(),
|
|
op_flash_next_server::decl(),
|
|
op_flash_next_async::decl(),
|
|
op_flash_read_body::decl(),
|
|
op_flash_upgrade_websocket::decl(),
|
|
op_flash_drive_server::decl(),
|
|
op_flash_wait_for_listening::decl(),
|
|
op_flash_first_packet::decl(),
|
|
op_flash_has_body_stream::decl(),
|
|
op_flash_close_server::decl(),
|
|
op_flash_make_request::decl(),
|
|
op_flash_write_resource::decl(),
|
|
])
|
|
.state(move |op_state| {
|
|
op_state.put(Unstable(unstable));
|
|
op_state.put(FlashContext {
|
|
next_server_id: 0,
|
|
join_handles: HashMap::default(),
|
|
servers: HashMap::default(),
|
|
});
|
|
Ok(())
|
|
})
|
|
.build()
|
|
}
|