1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-08 07:08:27 -05:00

refactor(ext/http): generic abstract listeners (#19132)

Improve abstractions around listeners to support listener + connection
network stream combinations not previously possible (for example a
listener exposed as a Tcp, creating Unix network streams).
This commit is contained in:
Luca Casonato 2023-05-15 16:55:47 +02:00 committed by Levente Kurusa
parent 6e9c95785d
commit f7ef08b6ef
No known key found for this signature in database
GPG key ID: 9F72F3C05BA137C4
5 changed files with 118 additions and 72 deletions

1
Cargo.lock generated
View file

@ -1026,6 +1026,7 @@ name = "deno_http"
version = "0.99.0" version = "0.99.0"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"async-trait",
"base64 0.13.1", "base64 0.13.1",
"bencher", "bencher",
"brotli", "brotli",

View file

@ -22,6 +22,7 @@ harness = false
[dependencies] [dependencies]
async-compression = { version = "0.3.12", features = ["tokio", "brotli", "gzip"] } async-compression = { version = "0.3.12", features = ["tokio", "brotli", "gzip"] }
async-trait.workspace = true
base64.workspace = true base64.workspace = true
brotli = "3.3.4" brotli = "3.3.4"
bytes.workspace = true bytes.workspace = true

View file

@ -33,7 +33,6 @@ use deno_core::ZeroCopyBuf;
use deno_net::ops_tls::TlsStream; use deno_net::ops_tls::TlsStream;
use deno_net::raw::put_network_stream_resource; use deno_net::raw::put_network_stream_resource;
use deno_net::raw::NetworkStream; use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress;
use fly_accept_encoding::Encoding; use fly_accept_encoding::Encoding;
use http::header::ACCEPT_ENCODING; use http::header::ACCEPT_ENCODING;
use http::header::CACHE_CONTROL; use http::header::CACHE_CONTROL;
@ -61,9 +60,6 @@ use std::borrow::Cow;
use std::cell::RefCell; use std::cell::RefCell;
use std::future::Future; use std::future::Future;
use std::io; use std::io;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::pin::Pin; use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
@ -825,7 +821,7 @@ fn serve_http(
} }
fn serve_http_on<HTTP>( fn serve_http_on<HTTP>(
network_stream: NetworkStream, connection: HTTP::Connection,
listen_properties: &HttpListenProperties, listen_properties: &HttpListenProperties,
cancel: Rc<CancelHandle>, cancel: Rc<CancelHandle>,
tx: tokio::sync::mpsc::Sender<u32>, tx: tokio::sync::mpsc::Sender<u32>,
@ -833,15 +829,10 @@ fn serve_http_on<HTTP>(
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
// We always want some sort of peer address. If we can't get one, just make up one.
let peer_address = network_stream.peer_address().unwrap_or_else(|_| {
NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(0, 0, 0, 0),
0,
)))
});
let connection_properties: HttpConnectionProperties = let connection_properties: HttpConnectionProperties =
HTTP::connection_properties(listen_properties, &peer_address); HTTP::connection_properties(listen_properties, &connection);
let network_stream = HTTP::to_network_stream_from_connection(connection);
match network_stream { match network_stream {
NetworkStream::Tcp(conn) => { NetworkStream::Tcp(conn) => {
@ -895,14 +886,10 @@ pub fn op_http_serve<HTTP>(
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
let listener = HTTP::get_network_stream_listener_for_rid( let listener =
&mut state.borrow_mut(), HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?;
listener_rid,
)?;
let local_address = listener.listen_address()?; let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
let listen_properties =
HTTP::listen_properties(listener.stream(), &local_address);
let (tx, rx) = tokio::sync::mpsc::channel(10); let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle( let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
@ -915,8 +902,7 @@ where
let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let listen_properties_clone: HttpListenProperties = listen_properties.clone();
let handle = spawn(async move { let handle = spawn(async move {
loop { loop {
let conn = listener let conn = HTTP::accept_connection_from_listener(&listener)
.accept()
.try_or_cancel(cancel_clone.clone()) .try_or_cancel(cancel_clone.clone())
.await?; .await?;
serve_http_on::<HTTP>( serve_http_on::<HTTP>(
@ -945,17 +931,15 @@ where
#[op(v8)] #[op(v8)]
pub fn op_http_serve_on<HTTP>( pub fn op_http_serve_on<HTTP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
conn: ResourceId, connection_rid: ResourceId,
) -> Result<(ResourceId, &'static str, String), AnyError> ) -> Result<(ResourceId, &'static str, String), AnyError>
where where
HTTP: HttpPropertyExtractor, HTTP: HttpPropertyExtractor,
{ {
let network_stream: NetworkStream = let connection =
HTTP::get_network_stream_for_rid(&mut state.borrow_mut(), conn)?; HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?;
let local_address = network_stream.local_address()?; let listen_properties = HTTP::listen_properties_from_connection(&connection)?;
let listen_properties =
HTTP::listen_properties(network_stream.stream(), &local_address);
let (tx, rx) = tokio::sync::mpsc::channel(10); let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle( let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle(
@ -966,7 +950,7 @@ where
let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> = let handle: JoinHandle<Result<(), deno_core::anyhow::Error>> =
serve_http_on::<HTTP>( serve_http_on::<HTTP>(
network_stream, connection,
&listen_properties, &listen_properties,
resource.cancel_handle(), resource.cancel_handle(),
tx, tx,

View file

@ -1,10 +1,10 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_core::error::AnyError; use deno_core::error::AnyError;
use deno_core::OpState; use deno_core::OpState;
use deno_core::ResourceId; use deno_core::ResourceId;
use deno_net::raw::NetworkStream;
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use deno_net::raw::take_network_stream_listener_resource; use deno_net::raw::take_network_stream_listener_resource;
use deno_net::raw::take_network_stream_resource; use deno_net::raw::take_network_stream_resource;
use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress; use deno_net::raw::NetworkStreamAddress;
use deno_net::raw::NetworkStreamListener; use deno_net::raw::NetworkStreamListener;
use deno_net::raw::NetworkStreamType; use deno_net::raw::NetworkStreamType;
@ -12,23 +12,26 @@ use hyper::HeaderMap;
use hyper::Uri; use hyper::Uri;
use hyper1::header::HOST; use hyper1::header::HOST;
use std::borrow::Cow; use std::borrow::Cow;
use std::net::Ipv4Addr;
use std::net::SocketAddr;
use std::net::SocketAddrV4;
use std::rc::Rc; use std::rc::Rc;
// TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup // TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup
#[derive(Clone)] #[derive(Clone)]
pub struct HttpListenProperties { pub struct HttpListenProperties {
pub stream_type: NetworkStreamType,
pub scheme: &'static str, pub scheme: &'static str,
pub fallback_host: String, pub fallback_host: String,
pub local_port: Option<u16>, pub local_port: Option<u16>,
pub stream_type: NetworkStreamType,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct HttpConnectionProperties { pub struct HttpConnectionProperties {
pub stream_type: NetworkStreamType,
pub peer_address: Rc<str>, pub peer_address: Rc<str>,
pub peer_port: Option<u16>, pub peer_port: Option<u16>,
pub local_port: Option<u16>, pub local_port: Option<u16>,
pub stream_type: NetworkStreamType,
} }
pub struct HttpRequestProperties { pub struct HttpRequestProperties {
@ -37,31 +40,49 @@ pub struct HttpRequestProperties {
/// Pluggable trait to determine listen, connection and request properties /// Pluggable trait to determine listen, connection and request properties
/// for embedders that wish to provide alternative routes for incoming HTTP. /// for embedders that wish to provide alternative routes for incoming HTTP.
#[async_trait::async_trait(?Send)]
pub trait HttpPropertyExtractor { pub trait HttpPropertyExtractor {
/// Given a listener [`ResourceId`], returns the [`NetworkStreamListener`]. type Listener: 'static;
fn get_network_stream_listener_for_rid( type Connection;
/// Given a listener [`ResourceId`], returns the [`HttpPropertyExtractor::Listener`].
fn get_listener_for_rid(
state: &mut OpState, state: &mut OpState,
listener_rid: ResourceId, listener_rid: ResourceId,
) -> Result<NetworkStreamListener, AnyError>; ) -> Result<Self::Listener, AnyError>;
/// Given a connection [`ResourceId`], returns the [`NetworkStream`]. /// Given a connection [`ResourceId`], returns the [`HttpPropertyExtractor::Connection`].
fn get_network_stream_for_rid( fn get_connection_for_rid(
state: &mut OpState, state: &mut OpState,
rid: ResourceId, connection_rid: ResourceId,
) -> Result<NetworkStream, AnyError>; ) -> Result<Self::Connection, AnyError>;
/// Determines the listener properties. /// Determines the listener properties.
fn listen_properties( fn listen_properties_from_listener(
stream_type: NetworkStreamType, listener: &Self::Listener,
local_address: &NetworkStreamAddress, ) -> Result<HttpListenProperties, std::io::Error>;
) -> HttpListenProperties;
/// Determines the listener properties given a [`HttpPropertyExtractor::Connection`].
fn listen_properties_from_connection(
connection: &Self::Connection,
) -> Result<HttpListenProperties, std::io::Error>;
/// Accept a new [`HttpPropertyExtractor::Connection`] from the given listener [`HttpPropertyExtractor::Listener`].
async fn accept_connection_from_listener(
listener: &Self::Listener,
) -> Result<Self::Connection, AnyError>;
/// Determines the connection properties. /// Determines the connection properties.
fn connection_properties( fn connection_properties(
listen_properties: &HttpListenProperties, listen_properties: &HttpListenProperties,
peer_address: &NetworkStreamAddress, connection: &Self::Connection,
) -> HttpConnectionProperties; ) -> HttpConnectionProperties;
/// Turn a given [`HttpPropertyExtractor::Connection`] into a [`NetworkStream`].
fn to_network_stream_from_connection(
connection: Self::Connection,
) -> NetworkStream;
/// Determines the request properties. /// Determines the request properties.
fn request_properties( fn request_properties(
connection_properties: &HttpConnectionProperties, connection_properties: &HttpConnectionProperties,
@ -72,15 +93,13 @@ pub trait HttpPropertyExtractor {
pub struct DefaultHttpPropertyExtractor {} pub struct DefaultHttpPropertyExtractor {}
#[async_trait::async_trait(?Send)]
impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
fn get_network_stream_for_rid( type Listener = NetworkStreamListener;
state: &mut OpState,
rid: ResourceId,
) -> Result<NetworkStream, AnyError> {
take_network_stream_resource(&mut state.resource_table, rid)
}
fn get_network_stream_listener_for_rid( type Connection = NetworkStream;
fn get_listener_for_rid(
state: &mut OpState, state: &mut OpState,
listener_rid: ResourceId, listener_rid: ResourceId,
) -> Result<NetworkStreamListener, AnyError> { ) -> Result<NetworkStreamListener, AnyError> {
@ -90,30 +109,52 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
) )
} }
fn listen_properties( fn get_connection_for_rid(
stream_type: NetworkStreamType, state: &mut OpState,
local_address: &NetworkStreamAddress, stream_rid: ResourceId,
) -> HttpListenProperties { ) -> Result<NetworkStream, AnyError> {
let scheme = req_scheme_from_stream_type(stream_type); take_network_stream_resource(&mut state.resource_table, stream_rid)
let fallback_host = req_host_from_addr(stream_type, local_address); }
let local_port: Option<u16> = match local_address {
NetworkStreamAddress::Ip(ip) => Some(ip.port()),
#[cfg(unix)]
NetworkStreamAddress::Unix(_) => None,
};
HttpListenProperties { async fn accept_connection_from_listener(
scheme, listener: &NetworkStreamListener,
fallback_host, ) -> Result<NetworkStream, AnyError> {
local_port, listener.accept().await.map_err(Into::into)
stream_type, }
}
fn listen_properties_from_listener(
listener: &NetworkStreamListener,
) -> Result<HttpListenProperties, std::io::Error> {
let stream_type = listener.stream();
let local_address = listener.listen_address()?;
listener_properties(stream_type, local_address)
}
fn listen_properties_from_connection(
connection: &Self::Connection,
) -> Result<HttpListenProperties, std::io::Error> {
let stream_type = connection.stream();
let local_address = connection.local_address()?;
listener_properties(stream_type, local_address)
}
fn to_network_stream_from_connection(
connection: Self::Connection,
) -> NetworkStream {
connection
} }
fn connection_properties( fn connection_properties(
listen_properties: &HttpListenProperties, listen_properties: &HttpListenProperties,
peer_address: &NetworkStreamAddress, connection: &NetworkStream,
) -> HttpConnectionProperties { ) -> HttpConnectionProperties {
// We always want some sort of peer address. If we can't get one, just make up one.
let peer_address = connection.peer_address().unwrap_or_else(|_| {
NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(0, 0, 0, 0),
0,
)))
});
let peer_port: Option<u16> = match peer_address { let peer_port: Option<u16> = match peer_address {
NetworkStreamAddress::Ip(ip) => Some(ip.port()), NetworkStreamAddress::Ip(ip) => Some(ip.port()),
#[cfg(unix)] #[cfg(unix)]
@ -128,10 +169,10 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
let stream_type = listen_properties.stream_type; let stream_type = listen_properties.stream_type;
HttpConnectionProperties { HttpConnectionProperties {
stream_type,
peer_address, peer_address,
peer_port, peer_port,
local_port, local_port,
stream_type,
} }
} }
@ -152,6 +193,25 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
} }
} }
fn listener_properties(
stream_type: NetworkStreamType,
local_address: NetworkStreamAddress,
) -> Result<HttpListenProperties, std::io::Error> {
let scheme = req_scheme_from_stream_type(stream_type);
let fallback_host = req_host_from_addr(stream_type, &local_address);
let local_port: Option<u16> = match local_address {
NetworkStreamAddress::Ip(ip) => Some(ip.port()),
#[cfg(unix)]
NetworkStreamAddress::Unix(_) => None,
};
Ok(HttpListenProperties {
scheme,
fallback_host,
local_port,
stream_type,
})
}
/// Compute the fallback address from the [`NetworkStreamListenAddress`]. If the request has no authority/host in /// Compute the fallback address from the [`NetworkStreamListenAddress`]. If the request has no authority/host in
/// its URI, and there is no [`HeaderName::HOST`] header, we fall back to this. /// its URI, and there is no [`HeaderName::HOST`] header, we fall back to this.
fn req_host_from_addr( fn req_host_from_addr(

View file

@ -179,7 +179,7 @@ pub enum NetworkStreamAddress {
impl NetworkStreamListener { impl NetworkStreamListener {
/// Accepts a connection on this listener. /// Accepts a connection on this listener.
pub async fn accept(&self) -> Result<NetworkStream, AnyError> { pub async fn accept(&self) -> Result<NetworkStream, std::io::Error> {
Ok(match self { Ok(match self {
Self::Tcp(tcp) => { Self::Tcp(tcp) => {
let (stream, _addr) = tcp.accept().await?; let (stream, _addr) = tcp.accept().await?;