mirror of
https://github.com/denoland/deno.git
synced 2024-12-24 08:09:08 -05:00
feat(ext/http): add support for unix domain sockets (#13628)
This commit is contained in:
parent
2dc5dba8ba
commit
074f53234a
5 changed files with 130 additions and 30 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -966,6 +966,7 @@ dependencies = [
|
||||||
"deno_core",
|
"deno_core",
|
||||||
"deno_websocket",
|
"deno_websocket",
|
||||||
"hyper",
|
"hyper",
|
||||||
|
"percent-encoding",
|
||||||
"ring",
|
"ring",
|
||||||
"serde",
|
"serde",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -1142,6 +1142,49 @@ Deno.test(
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// https://github.com/denoland/deno/pull/13628
|
||||||
|
Deno.test(
|
||||||
|
{
|
||||||
|
ignore: Deno.build.os === "windows",
|
||||||
|
permissions: { read: true, write: true },
|
||||||
|
},
|
||||||
|
async function httpServerOnUnixSocket() {
|
||||||
|
const filePath = Deno.makeTempFileSync();
|
||||||
|
|
||||||
|
const promise = (async () => {
|
||||||
|
const listener = Deno.listen({ path: filePath, transport: "unix" });
|
||||||
|
for await (const conn of listener) {
|
||||||
|
const httpConn = Deno.serveHttp(conn);
|
||||||
|
for await (const { request, respondWith } of httpConn) {
|
||||||
|
const url = new URL(request.url);
|
||||||
|
assertEquals(url.protocol, "http+unix:");
|
||||||
|
assertEquals(decodeURIComponent(url.host), filePath);
|
||||||
|
assertEquals(url.pathname, "/path/name");
|
||||||
|
await respondWith(new Response("", { headers: {} }));
|
||||||
|
httpConn.close();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
// fetch() does not supports unix domain sockets yet https://github.com/denoland/deno/issues/8821
|
||||||
|
const conn = await Deno.connect({ path: filePath, transport: "unix" });
|
||||||
|
const encoder = new TextEncoder();
|
||||||
|
// The Host header must be present and empty if it is not a Internet host name (RFC2616, Section 14.23)
|
||||||
|
const body = `GET /path/name HTTP/1.1\r\nHost:\r\n\r\n`;
|
||||||
|
const writeResult = await conn.write(encoder.encode(body));
|
||||||
|
assertEquals(body.length, writeResult);
|
||||||
|
|
||||||
|
const resp = new Uint8Array(200);
|
||||||
|
const readResult = await conn.read(resp);
|
||||||
|
assertEquals(readResult, 115);
|
||||||
|
|
||||||
|
conn.close();
|
||||||
|
|
||||||
|
await promise;
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
|
function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
|
||||||
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
// Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
|
||||||
const tp = new TextProtoReader(r);
|
const tp = new TextProtoReader(r);
|
||||||
|
|
|
@ -19,6 +19,7 @@ bytes = "1"
|
||||||
deno_core = { version = "0.118.0", path = "../../core" }
|
deno_core = { version = "0.118.0", path = "../../core" }
|
||||||
deno_websocket = { version = "0.41.0", path = "../websocket" }
|
deno_websocket = { version = "0.41.0", path = "../websocket" }
|
||||||
hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
|
hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] }
|
||||||
|
percent-encoding = "2.1.0"
|
||||||
ring = "0.16.20"
|
ring = "0.16.20"
|
||||||
serde = { version = "1.0.129", features = ["derive"] }
|
serde = { version = "1.0.129", features = ["derive"] }
|
||||||
tokio = { version = "1.10.1", features = ["full"] }
|
tokio = { version = "1.10.1", features = ["full"] }
|
||||||
|
|
|
@ -39,6 +39,7 @@ use hyper::service::Service;
|
||||||
use hyper::Body;
|
use hyper::Body;
|
||||||
use hyper::Request;
|
use hyper::Request;
|
||||||
use hyper::Response;
|
use hyper::Response;
|
||||||
|
use percent_encoding::percent_encode;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
@ -49,7 +50,6 @@ use std::future::Future;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem::replace;
|
use std::mem::replace;
|
||||||
use std::mem::take;
|
use std::mem::take;
|
||||||
use std::net::SocketAddr;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::rc::Rc;
|
use std::rc::Rc;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -83,8 +83,27 @@ pub fn init() -> Extension {
|
||||||
.build()
|
.build()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub enum HttpSocketAddr {
|
||||||
|
IpSocket(std::net::SocketAddr),
|
||||||
|
#[cfg(unix)]
|
||||||
|
UnixSocket(tokio::net::unix::SocketAddr),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<std::net::SocketAddr> for HttpSocketAddr {
|
||||||
|
fn from(addr: std::net::SocketAddr) -> Self {
|
||||||
|
Self::IpSocket(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
impl From<tokio::net::unix::SocketAddr> for HttpSocketAddr {
|
||||||
|
fn from(addr: tokio::net::unix::SocketAddr) -> Self {
|
||||||
|
Self::UnixSocket(addr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct HttpConnResource {
|
struct HttpConnResource {
|
||||||
addr: SocketAddr,
|
addr: HttpSocketAddr,
|
||||||
scheme: &'static str,
|
scheme: &'static str,
|
||||||
acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
|
acceptors_tx: mpsc::UnboundedSender<HttpAcceptor>,
|
||||||
closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>,
|
closed_fut: Shared<RemoteHandle<Result<(), Arc<hyper::Error>>>>,
|
||||||
|
@ -92,7 +111,7 @@ struct HttpConnResource {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HttpConnResource {
|
impl HttpConnResource {
|
||||||
fn new<S>(io: S, scheme: &'static str, addr: SocketAddr) -> Self
|
fn new<S>(io: S, scheme: &'static str, addr: HttpSocketAddr) -> Self
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
{
|
{
|
||||||
|
@ -172,8 +191,8 @@ impl HttpConnResource {
|
||||||
self.scheme
|
self.scheme
|
||||||
}
|
}
|
||||||
|
|
||||||
fn addr(&self) -> SocketAddr {
|
fn addr(&self) -> &HttpSocketAddr {
|
||||||
self.addr
|
&self.addr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -188,16 +207,17 @@ impl Resource for HttpConnResource {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Creates a new HttpConn resource which uses `io` as its transport.
|
/// Creates a new HttpConn resource which uses `io` as its transport.
|
||||||
pub fn http_create_conn_resource<S>(
|
pub fn http_create_conn_resource<S, A>(
|
||||||
state: &mut OpState,
|
state: &mut OpState,
|
||||||
io: S,
|
io: S,
|
||||||
addr: SocketAddr,
|
addr: A,
|
||||||
scheme: &'static str,
|
scheme: &'static str,
|
||||||
) -> Result<ResourceId, AnyError>
|
) -> Result<ResourceId, AnyError>
|
||||||
where
|
where
|
||||||
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
|
||||||
|
A: Into<HttpSocketAddr>,
|
||||||
{
|
{
|
||||||
let conn = HttpConnResource::new(io, scheme, addr);
|
let conn = HttpConnResource::new(io, scheme, addr.into());
|
||||||
let rid = state.resource_table.add(conn);
|
let rid = state.resource_table.add(conn);
|
||||||
Ok(rid)
|
Ok(rid)
|
||||||
}
|
}
|
||||||
|
@ -375,30 +395,49 @@ async fn op_http_accept(
|
||||||
fn req_url(
|
fn req_url(
|
||||||
req: &hyper::Request<hyper::Body>,
|
req: &hyper::Request<hyper::Body>,
|
||||||
scheme: &'static str,
|
scheme: &'static str,
|
||||||
addr: SocketAddr,
|
addr: &HttpSocketAddr,
|
||||||
) -> String {
|
) -> String {
|
||||||
let host: Cow<str> = if let Some(auth) = req.uri().authority() {
|
let host: Cow<str> = match addr {
|
||||||
match addr.port() {
|
HttpSocketAddr::IpSocket(addr) => {
|
||||||
443 if scheme == "https" => Cow::Borrowed(auth.host()),
|
if let Some(auth) = req.uri().authority() {
|
||||||
80 if scheme == "http" => Cow::Borrowed(auth.host()),
|
match addr.port() {
|
||||||
_ => Cow::Borrowed(auth.as_str()), // Includes port number.
|
443 if scheme == "https" => Cow::Borrowed(auth.host()),
|
||||||
|
80 if scheme == "http" => Cow::Borrowed(auth.host()),
|
||||||
|
_ => Cow::Borrowed(auth.as_str()), // Includes port number.
|
||||||
|
}
|
||||||
|
} else if let Some(host) = req.uri().host() {
|
||||||
|
Cow::Borrowed(host)
|
||||||
|
} else if let Some(host) = req.headers().get("HOST") {
|
||||||
|
match host.to_str() {
|
||||||
|
Ok(host) => Cow::Borrowed(host),
|
||||||
|
Err(_) => Cow::Owned(
|
||||||
|
host
|
||||||
|
.as_bytes()
|
||||||
|
.iter()
|
||||||
|
.cloned()
|
||||||
|
.map(char::from)
|
||||||
|
.collect::<String>(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Cow::Owned(addr.to_string())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if let Some(host) = req.uri().host() {
|
// There is no standard way for unix domain socket URLs
|
||||||
Cow::Borrowed(host)
|
// nginx and nodejs request use http://unix:[socket_path]:/ but it is not a valid URL
|
||||||
} else if let Some(host) = req.headers().get("HOST") {
|
// httpie uses http+unix://[percent_encoding_of_path]/ which we follow
|
||||||
match host.to_str() {
|
#[cfg(unix)]
|
||||||
Ok(host) => Cow::Borrowed(host),
|
HttpSocketAddr::UnixSocket(addr) => Cow::Owned(
|
||||||
Err(_) => Cow::Owned(
|
percent_encode(
|
||||||
host
|
addr
|
||||||
.as_bytes()
|
.as_pathname()
|
||||||
.iter()
|
.and_then(|x| x.to_str())
|
||||||
.cloned()
|
.unwrap_or_default()
|
||||||
.map(char::from)
|
.as_bytes(),
|
||||||
.collect::<String>(),
|
percent_encoding::NON_ALPHANUMERIC,
|
||||||
),
|
)
|
||||||
}
|
.to_string(),
|
||||||
} else {
|
),
|
||||||
Cow::Owned(addr.to_string())
|
|
||||||
};
|
};
|
||||||
let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
|
let path = req.uri().path_and_query().map_or("/", |p| p.as_str());
|
||||||
[scheme, "://", &host, path].concat()
|
[scheme, "://", &host, path].concat()
|
||||||
|
|
|
@ -8,6 +8,7 @@ use deno_core::OpState;
|
||||||
use deno_core::ResourceId;
|
use deno_core::ResourceId;
|
||||||
use deno_http::http_create_conn_resource;
|
use deno_http::http_create_conn_resource;
|
||||||
use deno_net::io::TcpStreamResource;
|
use deno_net::io::TcpStreamResource;
|
||||||
|
use deno_net::io::UnixStreamResource;
|
||||||
use deno_net::ops_tls::TlsStreamResource;
|
use deno_net::ops_tls::TlsStreamResource;
|
||||||
|
|
||||||
pub fn init() -> Extension {
|
pub fn init() -> Extension {
|
||||||
|
@ -45,5 +46,20 @@ fn op_http_start(
|
||||||
return http_create_conn_resource(state, tls_stream, addr, "https");
|
return http_create_conn_resource(state, tls_stream, addr, "https");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(unix)]
|
||||||
|
if let Ok(resource_rc) = state
|
||||||
|
.resource_table
|
||||||
|
.take::<UnixStreamResource>(tcp_stream_rid)
|
||||||
|
{
|
||||||
|
super::check_unstable(state, "Deno.serveHttp");
|
||||||
|
|
||||||
|
let resource = Rc::try_unwrap(resource_rc)
|
||||||
|
.expect("Only a single use of this resource should happen");
|
||||||
|
let (read_half, write_half) = resource.into_inner();
|
||||||
|
let unix_stream = read_half.reunite(write_half)?;
|
||||||
|
let addr = unix_stream.local_addr()?;
|
||||||
|
return http_create_conn_resource(state, unix_stream, addr, "http+unix");
|
||||||
|
}
|
||||||
|
|
||||||
Err(bad_resource_id())
|
Err(bad_resource_id())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue