mirror of
https://github.com/denoland/deno.git
synced 2025-01-18 03:44:05 -05:00
386d5c8310
Also removes permissions being passed in for node resolution. It was completely useless because we only checked it for reading package.json files, but Deno reading package.json files for resolution is perfectly fine. My guess is this is also a perf improvement because Deno is doing less work.
905 lines
24 KiB
Rust
905 lines
24 KiB
Rust
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
|
use crate::stream::WebSocketStream;
|
|
use bytes::Bytes;
|
|
use deno_core::anyhow::bail;
|
|
use deno_core::error::invalid_hostname;
|
|
use deno_core::error::type_error;
|
|
use deno_core::error::AnyError;
|
|
use deno_core::futures::TryFutureExt;
|
|
use deno_core::op2;
|
|
use deno_core::unsync::spawn;
|
|
use deno_core::url;
|
|
use deno_core::AsyncMutFuture;
|
|
use deno_core::AsyncRefCell;
|
|
use deno_core::ByteString;
|
|
use deno_core::CancelHandle;
|
|
use deno_core::CancelTryFuture;
|
|
use deno_core::JsBuffer;
|
|
use deno_core::OpState;
|
|
use deno_core::RcRef;
|
|
use deno_core::Resource;
|
|
use deno_core::ResourceId;
|
|
use deno_core::ToJsBuffer;
|
|
use deno_net::raw::NetworkStream;
|
|
use deno_tls::create_client_config;
|
|
use deno_tls::rustls::ClientConfig;
|
|
use deno_tls::rustls::ClientConnection;
|
|
use deno_tls::RootCertStoreProvider;
|
|
use deno_tls::SocketUse;
|
|
use deno_tls::TlsKeys;
|
|
use http::header::CONNECTION;
|
|
use http::header::UPGRADE;
|
|
use http::HeaderName;
|
|
use http::HeaderValue;
|
|
use http::Method;
|
|
use http::Request;
|
|
use http::StatusCode;
|
|
use http::Uri;
|
|
use once_cell::sync::Lazy;
|
|
use rustls_tokio_stream::rustls::RootCertStore;
|
|
use rustls_tokio_stream::rustls::ServerName;
|
|
use rustls_tokio_stream::TlsStream;
|
|
use serde::Serialize;
|
|
use std::borrow::Cow;
|
|
use std::cell::Cell;
|
|
use std::cell::RefCell;
|
|
use std::convert::TryFrom;
|
|
use std::fmt;
|
|
use std::future::Future;
|
|
use std::num::NonZeroUsize;
|
|
use std::path::PathBuf;
|
|
use std::rc::Rc;
|
|
use std::sync::Arc;
|
|
use tokio::io::AsyncRead;
|
|
use tokio::io::AsyncWrite;
|
|
use tokio::io::ReadHalf;
|
|
use tokio::io::WriteHalf;
|
|
use tokio::net::TcpStream;
|
|
|
|
use fastwebsockets::CloseCode;
|
|
use fastwebsockets::FragmentCollectorRead;
|
|
use fastwebsockets::Frame;
|
|
use fastwebsockets::OpCode;
|
|
use fastwebsockets::Role;
|
|
use fastwebsockets::WebSocket;
|
|
use fastwebsockets::WebSocketWrite;
|
|
|
|
mod stream;
|
|
|
|
/// Whether server-side WebSocket streams should use vectored writes.
///
/// Evaluated lazily on first access: `true` only when the `DENO_USE_WRITEV`
/// environment variable is set to a non-empty, valid Unicode value.
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
  matches!(std::env::var("DENO_USE_WRITEV"), Ok(val) if !val.is_empty())
});
|
|
|
|
#[derive(Clone)]
|
|
pub struct WsRootStoreProvider(Option<Arc<dyn RootCertStoreProvider>>);
|
|
|
|
impl WsRootStoreProvider {
|
|
pub fn get_or_try_init(&self) -> Result<Option<RootCertStore>, AnyError> {
|
|
Ok(match &self.0 {
|
|
Some(provider) => Some(provider.get_or_try_init()?.clone()),
|
|
None => None,
|
|
})
|
|
}
|
|
}
|
|
|
|
/// The value sent in the `User-Agent` header of outgoing WebSocket
/// handshake requests. Stored in the op state.
#[derive(Clone)]
pub struct WsUserAgent(pub String);
|
|
|
|
pub trait WebSocketPermissions {
|
|
fn check_net_url(
|
|
&mut self,
|
|
_url: &url::Url,
|
|
_api_name: &str,
|
|
) -> Result<(), AnyError>;
|
|
}
|
|
|
|
impl WebSocketPermissions for deno_permissions::PermissionsContainer {
|
|
#[inline(always)]
|
|
fn check_net_url(
|
|
&mut self,
|
|
url: &url::Url,
|
|
api_name: &str,
|
|
) -> Result<(), AnyError> {
|
|
deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
|
|
}
|
|
}
|
|
|
|
/// `UnsafelyIgnoreCertificateErrors` is a wrapper struct so the setting can
/// be stored in (and looked up from) the op state by its own type.
///
/// A type alias for `Option<Vec<String>>` could work, but there's a high
/// chance that another alias would point to the same `Option<Vec<String>>`,
/// in which case one stored value would override the other.
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
|
|
|
|
pub struct WsCancelResource(Rc<CancelHandle>);
|
|
|
|
impl Resource for WsCancelResource {
|
|
fn name(&self) -> Cow<str> {
|
|
"webSocketCancel".into()
|
|
}
|
|
|
|
fn close(self: Rc<Self>) {
|
|
self.0.cancel()
|
|
}
|
|
}
|
|
|
|
// This op is needed because creating a WS instance in JavaScript is a sync
|
|
// operation and should throw error when permissions are not fulfilled,
|
|
// but actual op that connects WS is async.
|
|
#[op2]
|
|
#[smi]
|
|
pub fn op_ws_check_permission_and_cancel_handle<WP>(
|
|
state: &mut OpState,
|
|
#[string] api_name: String,
|
|
#[string] url: String,
|
|
cancel_handle: bool,
|
|
) -> Result<Option<ResourceId>, AnyError>
|
|
where
|
|
WP: WebSocketPermissions + 'static,
|
|
{
|
|
state
|
|
.borrow_mut::<WP>()
|
|
.check_net_url(&url::Url::parse(&url)?, &api_name)?;
|
|
|
|
if cancel_handle {
|
|
let rid = state
|
|
.resource_table
|
|
.add(WsCancelResource(CancelHandle::new_rc()));
|
|
Ok(Some(rid))
|
|
} else {
|
|
Ok(None)
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize)]
|
|
#[serde(rename_all = "camelCase")]
|
|
pub struct CreateResponse {
|
|
rid: ResourceId,
|
|
protocol: String,
|
|
extensions: String,
|
|
}
|
|
|
|
async fn handshake_websocket(
|
|
state: &Rc<RefCell<OpState>>,
|
|
uri: &Uri,
|
|
protocols: &str,
|
|
headers: Option<Vec<(ByteString, ByteString)>>,
|
|
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
|
|
let mut request = Request::builder().method(Method::GET).uri(
|
|
uri
|
|
.path_and_query()
|
|
.ok_or(type_error("Missing path in url".to_string()))?
|
|
.as_str(),
|
|
);
|
|
|
|
let authority = uri.authority().unwrap().as_str();
|
|
let host = authority
|
|
.find('@')
|
|
.map(|idx| authority.split_at(idx + 1).1)
|
|
.unwrap_or_else(|| authority);
|
|
request = request
|
|
.header("Host", host)
|
|
.header(UPGRADE, "websocket")
|
|
.header(CONNECTION, "Upgrade")
|
|
.header(
|
|
"Sec-WebSocket-Key",
|
|
fastwebsockets::handshake::generate_key(),
|
|
);
|
|
|
|
let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
|
|
request =
|
|
populate_common_request_headers(request, &user_agent, protocols, &headers)?;
|
|
|
|
let request = request.body(http_body_util::Empty::new())?;
|
|
let domain = &uri.host().unwrap().to_string();
|
|
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
|
|
Some("wss") => 443,
|
|
Some("ws") => 80,
|
|
_ => unreachable!(),
|
|
});
|
|
let addr = format!("{domain}:{port}");
|
|
|
|
let res = match uri.scheme_str() {
|
|
Some("ws") => handshake_http1_ws(request, &addr).await?,
|
|
Some("wss") => {
|
|
match handshake_http1_wss(state, request, domain, &addr).await {
|
|
Ok(res) => res,
|
|
Err(_) => {
|
|
handshake_http2_wss(
|
|
state,
|
|
uri,
|
|
authority,
|
|
&user_agent,
|
|
protocols,
|
|
domain,
|
|
&headers,
|
|
&addr,
|
|
)
|
|
.await?
|
|
}
|
|
}
|
|
}
|
|
_ => unreachable!(),
|
|
};
|
|
Ok(res)
|
|
}
|
|
|
|
async fn handshake_http1_ws(
|
|
request: Request<http_body_util::Empty<Bytes>>,
|
|
addr: &String,
|
|
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
|
|
let tcp_socket = TcpStream::connect(addr).await?;
|
|
handshake_connection(request, tcp_socket).await
|
|
}
|
|
|
|
async fn handshake_http1_wss(
|
|
state: &Rc<RefCell<OpState>>,
|
|
request: Request<http_body_util::Empty<Bytes>>,
|
|
domain: &str,
|
|
addr: &str,
|
|
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
|
|
let tcp_socket = TcpStream::connect(addr).await?;
|
|
let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?;
|
|
let dnsname =
|
|
ServerName::try_from(domain).map_err(|_| invalid_hostname(domain))?;
|
|
let mut tls_connector = TlsStream::new_client_side(
|
|
tcp_socket,
|
|
ClientConnection::new(tls_config.into(), dnsname)?,
|
|
NonZeroUsize::new(65536),
|
|
);
|
|
// If we can bail on an http/1.1 ALPN mismatch here, we can avoid doing extra work
|
|
tls_connector.handshake().await?;
|
|
handshake_connection(request, tls_connector).await
|
|
}
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
async fn handshake_http2_wss(
|
|
state: &Rc<RefCell<OpState>>,
|
|
uri: &Uri,
|
|
authority: &str,
|
|
user_agent: &str,
|
|
protocols: &str,
|
|
domain: &str,
|
|
headers: &Option<Vec<(ByteString, ByteString)>>,
|
|
addr: &str,
|
|
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
|
|
let tcp_socket = TcpStream::connect(addr).await?;
|
|
let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?;
|
|
let dnsname =
|
|
ServerName::try_from(domain).map_err(|_| invalid_hostname(domain))?;
|
|
// We need to better expose the underlying errors here
|
|
let mut tls_connector = TlsStream::new_client_side(
|
|
tcp_socket,
|
|
ClientConnection::new(tls_config.into(), dnsname)?,
|
|
None,
|
|
);
|
|
let handshake = tls_connector.handshake().await?;
|
|
if handshake.alpn.is_none() {
|
|
bail!("Didn't receive h2 alpn, aborting connection");
|
|
}
|
|
let h2 = h2::client::Builder::new();
|
|
let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?;
|
|
spawn(conn);
|
|
let mut request = Request::builder();
|
|
request = request.method(Method::CONNECT);
|
|
let uri = Uri::builder()
|
|
.authority(authority)
|
|
.path_and_query(uri.path_and_query().unwrap().as_str())
|
|
.scheme("https")
|
|
.build()?;
|
|
request = request.uri(uri);
|
|
request =
|
|
populate_common_request_headers(request, user_agent, protocols, headers)?;
|
|
request = request.extension(h2::ext::Protocol::from("websocket"));
|
|
let (resp, send) = send.send_request(request.body(())?, false)?;
|
|
let resp = resp.await?;
|
|
if resp.status() != StatusCode::OK {
|
|
bail!("Invalid status code: {}", resp.status());
|
|
}
|
|
let (http::response::Parts { headers, .. }, recv) = resp.into_parts();
|
|
let mut stream = WebSocket::after_handshake(
|
|
WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None),
|
|
Role::Client,
|
|
);
|
|
// We currently don't support vectored writes in the H2 streams
|
|
stream.set_writev(false);
|
|
// TODO(mmastrac): we should be able to use a zero masking key over HTTPS
|
|
// stream.set_auto_apply_mask(false);
|
|
Ok((stream, headers))
|
|
}
|
|
|
|
async fn handshake_connection<
|
|
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
|
|
>(
|
|
request: Request<http_body_util::Empty<Bytes>>,
|
|
socket: S,
|
|
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), AnyError> {
|
|
let (upgraded, response) =
|
|
fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?;
|
|
|
|
let upgraded = upgraded.into_inner();
|
|
let stream =
|
|
WebSocketStream::new(stream::WsStreamKind::Upgraded(upgraded), None);
|
|
let stream = WebSocket::after_handshake(stream, Role::Client);
|
|
|
|
Ok((stream, response.into_parts().0.headers))
|
|
}
|
|
|
|
pub fn create_ws_client_config(
|
|
state: &Rc<RefCell<OpState>>,
|
|
socket_use: SocketUse,
|
|
) -> Result<ClientConfig, AnyError> {
|
|
let unsafely_ignore_certificate_errors: Option<Vec<String>> = state
|
|
.borrow()
|
|
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
|
|
.and_then(|it| it.0.clone());
|
|
let root_cert_store = state
|
|
.borrow()
|
|
.borrow::<WsRootStoreProvider>()
|
|
.get_or_try_init()?;
|
|
|
|
create_client_config(
|
|
root_cert_store,
|
|
vec![],
|
|
unsafely_ignore_certificate_errors,
|
|
TlsKeys::Null,
|
|
socket_use,
|
|
)
|
|
}
|
|
|
|
/// Headers common to both http/1.1 and h2 requests.
|
|
fn populate_common_request_headers(
|
|
mut request: http::request::Builder,
|
|
user_agent: &str,
|
|
protocols: &str,
|
|
headers: &Option<Vec<(ByteString, ByteString)>>,
|
|
) -> Result<http::request::Builder, AnyError> {
|
|
request = request
|
|
.header("User-Agent", user_agent)
|
|
.header("Sec-WebSocket-Version", "13");
|
|
|
|
if !protocols.is_empty() {
|
|
request = request.header("Sec-WebSocket-Protocol", protocols);
|
|
}
|
|
|
|
if let Some(headers) = headers {
|
|
for (key, value) in headers {
|
|
let name = HeaderName::from_bytes(key)
|
|
.map_err(|err| type_error(err.to_string()))?;
|
|
let v = HeaderValue::from_bytes(value)
|
|
.map_err(|err| type_error(err.to_string()))?;
|
|
|
|
let is_disallowed_header = matches!(
|
|
name,
|
|
http::header::HOST
|
|
| http::header::SEC_WEBSOCKET_ACCEPT
|
|
| http::header::SEC_WEBSOCKET_EXTENSIONS
|
|
| http::header::SEC_WEBSOCKET_KEY
|
|
| http::header::SEC_WEBSOCKET_PROTOCOL
|
|
| http::header::SEC_WEBSOCKET_VERSION
|
|
| http::header::UPGRADE
|
|
| http::header::CONNECTION
|
|
);
|
|
if !is_disallowed_header {
|
|
request = request.header(name, v);
|
|
}
|
|
}
|
|
}
|
|
Ok(request)
|
|
}
|
|
|
|
#[op2(async)]
|
|
#[serde]
|
|
pub async fn op_ws_create<WP>(
|
|
state: Rc<RefCell<OpState>>,
|
|
#[string] api_name: String,
|
|
#[string] url: String,
|
|
#[string] protocols: String,
|
|
#[smi] cancel_handle: Option<ResourceId>,
|
|
#[serde] headers: Option<Vec<(ByteString, ByteString)>>,
|
|
) -> Result<CreateResponse, AnyError>
|
|
where
|
|
WP: WebSocketPermissions + 'static,
|
|
{
|
|
{
|
|
let mut s = state.borrow_mut();
|
|
s.borrow_mut::<WP>()
|
|
.check_net_url(&url::Url::parse(&url)?, &api_name)
|
|
.expect(
|
|
"Permission check should have been done in op_ws_check_permission",
|
|
);
|
|
}
|
|
|
|
let cancel_resource = if let Some(cancel_rid) = cancel_handle {
|
|
let r = state
|
|
.borrow_mut()
|
|
.resource_table
|
|
.get::<WsCancelResource>(cancel_rid)?;
|
|
Some(r.0.clone())
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let uri: Uri = url.parse()?;
|
|
|
|
let handshake = handshake_websocket(&state, &uri, &protocols, headers)
|
|
.map_err(|err| {
|
|
AnyError::from(DomExceptionNetworkError::new(&format!(
|
|
"failed to connect to WebSocket: {err}"
|
|
)))
|
|
});
|
|
let (stream, response) = match cancel_resource {
|
|
Some(rc) => handshake.try_or_cancel(rc).await,
|
|
None => handshake.await,
|
|
}?;
|
|
|
|
if let Some(cancel_rid) = cancel_handle {
|
|
if let Ok(res) = state.borrow_mut().resource_table.take_any(cancel_rid) {
|
|
res.close();
|
|
}
|
|
}
|
|
|
|
let mut state = state.borrow_mut();
|
|
let rid = state.resource_table.add(ServerWebSocket::new(stream));
|
|
|
|
let protocol = match response.get("Sec-WebSocket-Protocol") {
|
|
Some(header) => header.to_str().unwrap(),
|
|
None => "",
|
|
};
|
|
let extensions = response
|
|
.get_all("Sec-WebSocket-Extensions")
|
|
.iter()
|
|
.map(|header| header.to_str().unwrap())
|
|
.collect::<String>();
|
|
Ok(CreateResponse {
|
|
rid,
|
|
protocol: protocol.to_string(),
|
|
extensions,
|
|
})
|
|
}
|
|
|
|
/// Event codes returned by `op_ws_next_event`.
///
/// The close-without-status case reuses the WebSocket close code 1005, so it
/// shares the numeric space with real close codes returned by that op.
#[repr(u16)]
pub enum MessageKind {
  /// A text message was received.
  Text = 0,
  /// A binary message was received.
  Binary = 1,
  /// A pong frame was received.
  Pong = 2,
  /// An error occurred; the message is retrieved via `op_ws_get_error`.
  Error = 3,
  /// The socket was closed without an explicit close code.
  ClosedDefault = 1005,
}
|
|
|
|
/// To avoid locks, we keep as much as we can inside of [`Cell`]s.
|
|
pub struct ServerWebSocket {
|
|
buffered: Cell<usize>,
|
|
error: Cell<Option<String>>,
|
|
errored: Cell<bool>,
|
|
closed: Cell<bool>,
|
|
buffer: Cell<Option<Vec<u8>>>,
|
|
string: Cell<Option<String>>,
|
|
ws_read: AsyncRefCell<FragmentCollectorRead<ReadHalf<WebSocketStream>>>,
|
|
ws_write: AsyncRefCell<WebSocketWrite<WriteHalf<WebSocketStream>>>,
|
|
}
|
|
|
|
impl ServerWebSocket {
|
|
fn new(ws: WebSocket<WebSocketStream>) -> Self {
|
|
let (ws_read, ws_write) = ws.split(tokio::io::split);
|
|
Self {
|
|
buffered: Cell::new(0),
|
|
error: Cell::new(None),
|
|
errored: Cell::new(false),
|
|
closed: Cell::new(false),
|
|
buffer: Cell::new(None),
|
|
string: Cell::new(None),
|
|
ws_read: AsyncRefCell::new(FragmentCollectorRead::new(ws_read)),
|
|
ws_write: AsyncRefCell::new(ws_write),
|
|
}
|
|
}
|
|
|
|
fn set_error(&self, error: Option<String>) {
|
|
if let Some(error) = error {
|
|
self.error.set(Some(error));
|
|
self.errored.set(true);
|
|
} else {
|
|
self.error.set(None);
|
|
self.errored.set(false);
|
|
}
|
|
}
|
|
|
|
/// Reserve a lock, but don't wait on it. This gets us our place in line.
|
|
fn reserve_lock(
|
|
self: &Rc<Self>,
|
|
) -> AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>> {
|
|
RcRef::map(self, |r| &r.ws_write).borrow_mut()
|
|
}
|
|
|
|
#[inline]
|
|
async fn write_frame(
|
|
self: &Rc<Self>,
|
|
lock: AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>>,
|
|
frame: Frame<'_>,
|
|
) -> Result<(), AnyError> {
|
|
let mut ws = lock.await;
|
|
if ws.is_closed() {
|
|
return Ok(());
|
|
}
|
|
ws.write_frame(frame)
|
|
.await
|
|
.map_err(|err| type_error(err.to_string()))?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Resource for ServerWebSocket {
|
|
fn name(&self) -> Cow<str> {
|
|
"serverWebSocket".into()
|
|
}
|
|
}
|
|
|
|
pub fn ws_create_server_stream(
|
|
state: &mut OpState,
|
|
transport: NetworkStream,
|
|
read_buf: Bytes,
|
|
) -> Result<ResourceId, AnyError> {
|
|
let mut ws = WebSocket::after_handshake(
|
|
WebSocketStream::new(
|
|
stream::WsStreamKind::Network(transport),
|
|
Some(read_buf),
|
|
),
|
|
Role::Server,
|
|
);
|
|
ws.set_writev(*USE_WRITEV);
|
|
ws.set_auto_close(true);
|
|
ws.set_auto_pong(true);
|
|
|
|
let rid = state.resource_table.add(ServerWebSocket::new(ws));
|
|
Ok(rid)
|
|
}
|
|
|
|
fn send_binary(state: &mut OpState, rid: ResourceId, data: &[u8]) {
|
|
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
|
let data = data.to_vec();
|
|
let len = data.len();
|
|
resource.buffered.set(resource.buffered.get() + len);
|
|
let lock = resource.reserve_lock();
|
|
deno_core::unsync::spawn(async move {
|
|
if let Err(err) = resource
|
|
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data.into()))
|
|
.await
|
|
{
|
|
resource.set_error(Some(err.to_string()));
|
|
} else {
|
|
resource.buffered.set(resource.buffered.get() - len);
|
|
}
|
|
});
|
|
}
|
|
|
|
#[op2]
|
|
pub fn op_ws_send_binary(
|
|
state: &mut OpState,
|
|
#[smi] rid: ResourceId,
|
|
#[anybuffer] data: &[u8],
|
|
) {
|
|
send_binary(state, rid, data)
|
|
}
|
|
|
|
#[op2(fast)]
|
|
pub fn op_ws_send_binary_ab(
|
|
state: &mut OpState,
|
|
#[smi] rid: ResourceId,
|
|
#[arraybuffer] data: &[u8],
|
|
) {
|
|
send_binary(state, rid, data)
|
|
}
|
|
|
|
#[op2(fast)]
|
|
pub fn op_ws_send_text(
|
|
state: &mut OpState,
|
|
#[smi] rid: ResourceId,
|
|
#[string] data: String,
|
|
) {
|
|
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
|
|
let len = data.len();
|
|
resource.buffered.set(resource.buffered.get() + len);
|
|
let lock = resource.reserve_lock();
|
|
deno_core::unsync::spawn(async move {
|
|
if let Err(err) = resource
|
|
.write_frame(
|
|
lock,
|
|
Frame::new(true, OpCode::Text, None, data.into_bytes().into()),
|
|
)
|
|
.await
|
|
{
|
|
resource.set_error(Some(err.to_string()));
|
|
} else {
|
|
resource.buffered.set(resource.buffered.get() - len);
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
|
|
#[op2(async)]
|
|
pub async fn op_ws_send_binary_async(
|
|
state: Rc<RefCell<OpState>>,
|
|
#[smi] rid: ResourceId,
|
|
#[buffer] data: JsBuffer,
|
|
) -> Result<(), AnyError> {
|
|
let resource = state
|
|
.borrow_mut()
|
|
.resource_table
|
|
.get::<ServerWebSocket>(rid)?;
|
|
let data = data.to_vec();
|
|
let lock = resource.reserve_lock();
|
|
resource
|
|
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data.into()))
|
|
.await
|
|
}
|
|
|
|
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
|
|
#[op2(async)]
|
|
pub async fn op_ws_send_text_async(
|
|
state: Rc<RefCell<OpState>>,
|
|
#[smi] rid: ResourceId,
|
|
#[string] data: String,
|
|
) -> Result<(), AnyError> {
|
|
let resource = state
|
|
.borrow_mut()
|
|
.resource_table
|
|
.get::<ServerWebSocket>(rid)?;
|
|
let lock = resource.reserve_lock();
|
|
resource
|
|
.write_frame(
|
|
lock,
|
|
Frame::new(true, OpCode::Text, None, data.into_bytes().into()),
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// Zero-length payload, used for ping frames that carry no data.
const EMPTY_PAYLOAD: &[u8] = &[];
|
|
|
|
#[op2(fast)]
|
|
#[smi]
|
|
pub fn op_ws_get_buffered_amount(
|
|
state: &mut OpState,
|
|
#[smi] rid: ResourceId,
|
|
) -> u32 {
|
|
state
|
|
.resource_table
|
|
.get::<ServerWebSocket>(rid)
|
|
.unwrap()
|
|
.buffered
|
|
.get() as u32
|
|
}
|
|
|
|
#[op2(async)]
|
|
pub async fn op_ws_send_ping(
|
|
state: Rc<RefCell<OpState>>,
|
|
#[smi] rid: ResourceId,
|
|
) -> Result<(), AnyError> {
|
|
let resource = state
|
|
.borrow_mut()
|
|
.resource_table
|
|
.get::<ServerWebSocket>(rid)?;
|
|
let lock = resource.reserve_lock();
|
|
resource
|
|
.write_frame(
|
|
lock,
|
|
Frame::new(true, OpCode::Ping, None, EMPTY_PAYLOAD.into()),
|
|
)
|
|
.await
|
|
}
|
|
|
|
#[op2(async(lazy))]
|
|
pub async fn op_ws_close(
|
|
state: Rc<RefCell<OpState>>,
|
|
#[smi] rid: ResourceId,
|
|
#[smi] code: Option<u16>,
|
|
#[string] reason: Option<String>,
|
|
) -> Result<(), AnyError> {
|
|
let resource = state
|
|
.borrow_mut()
|
|
.resource_table
|
|
.get::<ServerWebSocket>(rid)?;
|
|
let frame = reason
|
|
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
|
|
.unwrap_or_else(|| Frame::close_raw(vec![].into()));
|
|
|
|
resource.closed.set(true);
|
|
let lock = resource.reserve_lock();
|
|
resource.write_frame(lock, frame).await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[op2]
|
|
#[serde]
|
|
pub fn op_ws_get_buffer(
|
|
state: &mut OpState,
|
|
#[smi] rid: ResourceId,
|
|
) -> Option<ToJsBuffer> {
|
|
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
|
|
return None;
|
|
};
|
|
resource.buffer.take().map(ToJsBuffer::from)
|
|
}
|
|
|
|
#[op2]
|
|
#[string]
|
|
pub fn op_ws_get_buffer_as_string(
|
|
state: &mut OpState,
|
|
#[smi] rid: ResourceId,
|
|
) -> Option<String> {
|
|
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
|
|
return None;
|
|
};
|
|
resource.string.take()
|
|
}
|
|
|
|
#[op2]
|
|
#[string]
|
|
pub fn op_ws_get_error(state: &mut OpState, #[smi] rid: ResourceId) -> String {
|
|
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
|
|
return "Bad resource".into();
|
|
};
|
|
resource.errored.set(false);
|
|
resource.error.take().unwrap_or_default()
|
|
}
|
|
|
|
#[op2(async)]
|
|
pub async fn op_ws_next_event(
|
|
state: Rc<RefCell<OpState>>,
|
|
#[smi] rid: ResourceId,
|
|
) -> u16 {
|
|
let Ok(resource) = state
|
|
.borrow_mut()
|
|
.resource_table
|
|
.get::<ServerWebSocket>(rid)
|
|
else {
|
|
// op_ws_get_error will correctly handle a bad resource
|
|
return MessageKind::Error as u16;
|
|
};
|
|
|
|
// If there's a pending error, this always returns error
|
|
if resource.errored.get() {
|
|
return MessageKind::Error as u16;
|
|
}
|
|
|
|
let mut ws = RcRef::map(&resource, |r| &r.ws_read).borrow_mut().await;
|
|
let writer = RcRef::map(&resource, |r| &r.ws_write);
|
|
let mut sender = move |frame| {
|
|
let writer = writer.clone();
|
|
async move { writer.borrow_mut().await.write_frame(frame).await }
|
|
};
|
|
loop {
|
|
let res = ws.read_frame(&mut sender).await;
|
|
let val = match res {
|
|
Ok(val) => val,
|
|
Err(err) => {
|
|
// No message was received, socket closed while we waited.
|
|
// Report closed status to JavaScript.
|
|
if resource.closed.get() {
|
|
return MessageKind::ClosedDefault as u16;
|
|
}
|
|
|
|
resource.set_error(Some(err.to_string()));
|
|
return MessageKind::Error as u16;
|
|
}
|
|
};
|
|
|
|
break match val.opcode {
|
|
OpCode::Text => match String::from_utf8(val.payload.to_vec()) {
|
|
Ok(s) => {
|
|
resource.string.set(Some(s));
|
|
MessageKind::Text as u16
|
|
}
|
|
Err(_) => {
|
|
resource.set_error(Some("Invalid string data".into()));
|
|
MessageKind::Error as u16
|
|
}
|
|
},
|
|
OpCode::Binary => {
|
|
resource.buffer.set(Some(val.payload.to_vec()));
|
|
MessageKind::Binary as u16
|
|
}
|
|
OpCode::Close => {
|
|
// Close reason is returned through error
|
|
if val.payload.len() < 2 {
|
|
resource.set_error(None);
|
|
MessageKind::ClosedDefault as u16
|
|
} else {
|
|
let close_code = CloseCode::from(u16::from_be_bytes([
|
|
val.payload[0],
|
|
val.payload[1],
|
|
]));
|
|
let reason = String::from_utf8(val.payload[2..].to_vec()).ok();
|
|
resource.set_error(reason);
|
|
close_code.into()
|
|
}
|
|
}
|
|
OpCode::Pong => MessageKind::Pong as u16,
|
|
OpCode::Continuation | OpCode::Ping => {
|
|
continue;
|
|
}
|
|
};
|
|
}
|
|
}
|
|
|
|
// Declares the `deno_websocket` extension: its ops, its JavaScript sources
// and the values placed into the op state at startup (user agent, the
// certificate-error override list and the root certificate store provider).
deno_core::extension!(deno_websocket,
  deps = [ deno_url, deno_webidl ],
  parameters = [P: WebSocketPermissions],
  ops = [
    op_ws_check_permission_and_cancel_handle<P>,
    op_ws_create<P>,
    op_ws_close,
    op_ws_next_event,
    op_ws_get_buffer,
    op_ws_get_buffer_as_string,
    op_ws_get_error,
    op_ws_send_binary,
    op_ws_send_binary_ab,
    op_ws_send_text,
    op_ws_send_binary_async,
    op_ws_send_text_async,
    op_ws_send_ping,
    op_ws_get_buffered_amount,
  ],
  esm = [ "01_websocket.js", "02_websocketstream.js" ],
  options = {
    user_agent: String,
    root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
    unsafely_ignore_certificate_errors: Option<Vec<String>>
  },
  state = |state, options| {
    state.put::<WsUserAgent>(WsUserAgent(options.user_agent));
    state.put(UnsafelyIgnoreCertificateErrors(
      options.unsafely_ignore_certificate_errors,
    ));
    state.put::<WsRootStoreProvider>(WsRootStoreProvider(options.root_cert_store_provider));
  },
);
|
|
|
|
pub fn get_declaration() -> PathBuf {
|
|
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts")
|
|
}
|
|
|
|
/// Error reported when a WebSocket connection cannot be established.
///
/// `get_network_error_class_name` maps it to the `DOMExceptionNetworkError`
/// class name.
#[derive(Debug)]
pub struct DomExceptionNetworkError {
  /// Human-readable description of the failure.
  pub msg: String,
}

impl DomExceptionNetworkError {
  /// Creates an error carrying a copy of `msg`.
  pub fn new(msg: &str) -> Self {
    Self {
      msg: msg.to_owned(),
    }
  }
}

impl fmt::Display for DomExceptionNetworkError {
  /// Writes the message, honoring width / alignment / precision flags.
  fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
    // `str`'s `Display` pads the text according to the formatter's flags.
    <str as fmt::Display>::fmt(self.msg.as_str(), f)
  }
}

impl std::error::Error for DomExceptionNetworkError {}
|
|
|
|
pub fn get_network_error_class_name(e: &AnyError) -> Option<&'static str> {
|
|
e.downcast_ref::<DomExceptionNetworkError>()
|
|
.map(|_| "DOMExceptionNetworkError")
|
|
}
|
|
|
|
// Needed so hyper can use non Send futures
|
|
#[derive(Clone)]
|
|
struct LocalExecutor;
|
|
|
|
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
|
|
where
|
|
Fut: Future + 'static,
|
|
Fut::Output: 'static,
|
|
{
|
|
fn execute(&self, fut: Fut) {
|
|
deno_core::unsync::spawn(fut);
|
|
}
|
|
}
|