1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-14 18:08:52 -05:00
denoland-deno/runtime/ops/http.rs

198 lines
5.8 KiB
Rust

// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use std::cell::RefCell;
use std::rc::Rc;
use deno_core::error::bad_resource;
use deno_core::error::bad_resource_id;
use deno_core::error::custom_error;
use deno_core::error::AnyError;
use deno_core::op;
use deno_core::Extension;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::ResourceId;
use deno_core::ZeroCopyBuf;
use deno_http::http_create_conn_resource;
use deno_http::HttpRequestReader;
use deno_http::HttpStreamResource;
use deno_net::io::TcpStreamResource;
use deno_net::ops_tls::TlsStream;
use deno_net::ops_tls::TlsStreamResource;
use hyper::upgrade::Parts;
use serde::Serialize;
use tokio::net::TcpStream;
#[cfg(unix)]
use deno_net::io::UnixStreamResource;
#[cfg(unix)]
use tokio::net::UnixStream;
pub fn init() -> Extension {
Extension::builder("deno_http_runtime")
.ops(vec![
op_http_start::decl(),
op_http_upgrade::decl(),
op_flash_upgrade_http::decl(),
])
.build()
}
#[op]
fn op_http_start(
state: &mut OpState,
tcp_stream_rid: ResourceId,
) -> Result<ResourceId, AnyError> {
if let Ok(resource_rc) = state
.resource_table
.take::<TcpStreamResource>(tcp_stream_rid)
{
// This TCP connection might be used somewhere else. If it's the case, we cannot proceed with the
// process of starting a HTTP server on top of this TCP connection, so we just return a bad
// resource error. See also: https://github.com/denoland/deno/pull/16242
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("TCP stream is currently in use"))?;
let (read_half, write_half) = resource.into_inner();
let tcp_stream = read_half.reunite(write_half)?;
let addr = tcp_stream.local_addr()?;
return http_create_conn_resource(state, tcp_stream, addr, "http");
}
if let Ok(resource_rc) = state
.resource_table
.take::<TlsStreamResource>(tcp_stream_rid)
{
// This TLS connection might be used somewhere else. If it's the case, we cannot proceed with the
// process of starting a HTTP server on top of this TLS connection, so we just return a bad
// resource error. See also: https://github.com/denoland/deno/pull/16242
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("TLS stream is currently in use"))?;
let (read_half, write_half) = resource.into_inner();
let tls_stream = read_half.reunite(write_half);
let addr = tls_stream.get_ref().0.local_addr()?;
return http_create_conn_resource(state, tls_stream, addr, "https");
}
#[cfg(unix)]
if let Ok(resource_rc) = state
.resource_table
.take::<deno_net::io::UnixStreamResource>(tcp_stream_rid)
{
super::check_unstable(state, "Deno.serveHttp");
// This UNIX socket might be used somewhere else. If it's the case, we cannot proceed with the
// process of starting a HTTP server on top of this UNIX socket, so we just return a bad
// resource error. See also: https://github.com/denoland/deno/pull/16242
let resource = Rc::try_unwrap(resource_rc)
.map_err(|_| bad_resource("UNIX stream is currently in use"))?;
let (read_half, write_half) = resource.into_inner();
let unix_stream = read_half.reunite(write_half)?;
let addr = unix_stream.local_addr()?;
return http_create_conn_resource(state, unix_stream, addr, "http+unix");
}
Err(bad_resource_id())
}
#[op]
fn op_flash_upgrade_http(
state: &mut OpState,
token: u32,
server_id: u32,
) -> Result<deno_core::ResourceId, AnyError> {
let flash_ctx = state.borrow_mut::<deno_flash::FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tcp_stream = deno_flash::detach_socket(ctx, token)?;
Ok(
state
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split())),
)
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct HttpUpgradeResult {
conn_rid: ResourceId,
conn_type: &'static str,
read_buf: ZeroCopyBuf,
}
#[op]
async fn op_http_upgrade(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_: (),
) -> Result<HttpUpgradeResult, AnyError> {
let stream = state
.borrow_mut()
.resource_table
.get::<HttpStreamResource>(rid)?;
let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await;
let request = match &mut *rd {
HttpRequestReader::Headers(request) => request,
_ => {
return Err(custom_error(
"Http",
"cannot upgrade because request body was used",
))
}
};
let transport = hyper::upgrade::on(request).await?;
let transport = match transport.downcast::<TcpStream>() {
Ok(Parts {
io: tcp_stream,
read_buf,
..
}) => {
return Ok(HttpUpgradeResult {
conn_type: "tcp",
conn_rid: state
.borrow_mut()
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split())),
read_buf: read_buf.to_vec().into(),
});
}
Err(transport) => transport,
};
#[cfg(unix)]
let transport = match transport.downcast::<UnixStream>() {
Ok(Parts {
io: unix_stream,
read_buf,
..
}) => {
return Ok(HttpUpgradeResult {
conn_type: "unix",
conn_rid: state
.borrow_mut()
.resource_table
.add(UnixStreamResource::new(unix_stream.into_split())),
read_buf: read_buf.to_vec().into(),
});
}
Err(transport) => transport,
};
match transport.downcast::<TlsStream>() {
Ok(Parts {
io: tls_stream,
read_buf,
..
}) => Ok(HttpUpgradeResult {
conn_type: "tls",
conn_rid: state
.borrow_mut()
.resource_table
.add(TlsStreamResource::new(tls_stream.into_split())),
read_buf: read_buf.to_vec().into(),
}),
Err(_) => Err(custom_error(
"Http",
"encountered unsupported transport while upgrading",
)),
}
}