1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2024-12-27 01:29:14 -05:00
denoland-deno/ext/websocket/lib.rs

935 lines
25 KiB
Rust
Raw Normal View History

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use crate::stream::WebSocketStream;
use bytes::Bytes;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::unsync::spawn;
use deno_core::url;
use deno_core::AsyncMutFuture;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
use deno_net::raw::NetworkStream;
use deno_tls::create_client_config;
use deno_tls::rustls::ClientConfig;
use deno_tls::rustls::ClientConnection;
use deno_tls::RootCertStoreProvider;
use deno_tls::SocketUse;
use deno_tls::TlsKeys;
use http::header::CONNECTION;
use http::header::UPGRADE;
use http::HeaderName;
use http::HeaderValue;
use http::Method;
use http::Request;
use http::StatusCode;
use http::Uri;
use once_cell::sync::Lazy;
use rustls_tokio_stream::rustls::pki_types::ServerName;
use rustls_tokio_stream::rustls::RootCertStore;
use rustls_tokio_stream::TlsStream;
use serde::Serialize;
use std::borrow::Cow;
use std::cell::Cell;
use std::cell::RefCell;
use std::future::Future;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::rc::Rc;
use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::io::ReadHalf;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use fastwebsockets::CloseCode;
use fastwebsockets::FragmentCollectorRead;
use fastwebsockets::Frame;
use fastwebsockets::OpCode;
use fastwebsockets::Role;
use fastwebsockets::WebSocket;
use fastwebsockets::WebSocketWrite;
mod stream;
static USE_WRITEV: Lazy<bool> = Lazy::new(|| {
let enable = std::env::var("DENO_USE_WRITEV").ok();
if let Some(val) = enable {
return !val.is_empty();
}
false
});
#[derive(Debug, thiserror::Error)]
pub enum WebsocketError {
#[error(transparent)]
Url(url::ParseError),
#[error(transparent)]
Permission(deno_core::error::AnyError),
#[error(transparent)]
Resource(deno_core::error::AnyError),
#[error(transparent)]
Uri(#[from] http::uri::InvalidUri),
#[error("{0}")]
Io(#[from] std::io::Error),
#[error(transparent)]
WebSocket(#[from] fastwebsockets::WebSocketError),
#[error("failed to connect to WebSocket: {0}")]
ConnectionFailed(#[from] HandshakeError),
#[error(transparent)]
Canceled(#[from] deno_core::Canceled),
}
#[derive(Clone)]
pub struct WsRootStoreProvider(Option<Arc<dyn RootCertStoreProvider>>);
impl WsRootStoreProvider {
pub fn get_or_try_init(
&self,
) -> Result<Option<RootCertStore>, deno_core::error::AnyError> {
Ok(match &self.0 {
Some(provider) => Some(provider.get_or_try_init()?.clone()),
None => None,
})
}
}
#[derive(Clone)]
pub struct WsUserAgent(pub String);
pub trait WebSocketPermissions {
fn check_net_url(
&mut self,
_url: &url::Url,
_api_name: &str,
) -> Result<(), deno_core::error::AnyError>;
}
impl WebSocketPermissions for deno_permissions::PermissionsContainer {
#[inline(always)]
fn check_net_url(
&mut self,
url: &url::Url,
api_name: &str,
) -> Result<(), deno_core::error::AnyError> {
deno_permissions::PermissionsContainer::check_net_url(self, url, api_name)
}
}
/// `UnsafelyIgnoreCertificateErrors` is a wrapper struct so it can be placed inside `GothamState`;
/// using type alias for a `Option<Vec<String>>` could work, but there's a high chance
/// that there might be another type alias pointing to a `Option<Vec<String>>`, which
/// would override previously used alias.
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
pub struct WsCancelResource(Rc<CancelHandle>);
impl Resource for WsCancelResource {
fn name(&self) -> Cow<str> {
"webSocketCancel".into()
}
fn close(self: Rc<Self>) {
self.0.cancel()
}
}
// This op is needed because creating a WS instance in JavaScript is a sync
// operation and should throw error when permissions are not fulfilled,
// but actual op that connects WS is async.
#[op2]
#[smi]
pub fn op_ws_check_permission_and_cancel_handle<WP>(
state: &mut OpState,
#[string] api_name: String,
#[string] url: String,
cancel_handle: bool,
) -> Result<Option<ResourceId>, WebsocketError>
where
WP: WebSocketPermissions + 'static,
{
state
.borrow_mut::<WP>()
.check_net_url(
&url::Url::parse(&url).map_err(WebsocketError::Url)?,
&api_name,
)
.map_err(WebsocketError::Permission)?;
if cancel_handle {
let rid = state
.resource_table
.add(WsCancelResource(CancelHandle::new_rc()));
Ok(Some(rid))
} else {
Ok(None)
}
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateResponse {
rid: ResourceId,
protocol: String,
extensions: String,
}
#[derive(Debug, thiserror::Error)]
pub enum HandshakeError {
#[error("Missing path in url")]
MissingPath,
#[error("Invalid status code {0}")]
InvalidStatusCode(StatusCode),
#[error(transparent)]
Http(#[from] http::Error),
#[error(transparent)]
WebSocket(#[from] fastwebsockets::WebSocketError),
#[error("Didn't receive h2 alpn, aborting connection")]
NoH2Alpn,
#[error(transparent)]
Rustls(#[from] deno_tls::rustls::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
H2(#[from] h2::Error),
#[error("Invalid hostname: '{0}'")]
InvalidHostname(String),
#[error(transparent)]
RootStoreError(deno_core::error::AnyError),
#[error(transparent)]
Tls(deno_tls::TlsError),
#[error(transparent)]
HeaderName(#[from] http::header::InvalidHeaderName),
#[error(transparent)]
HeaderValue(#[from] http::header::InvalidHeaderValue),
}
async fn handshake_websocket(
state: &Rc<RefCell<OpState>>,
uri: &Uri,
protocols: &str,
headers: Option<Vec<(ByteString, ByteString)>>,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), HandshakeError> {
let mut request = Request::builder().method(Method::GET).uri(
uri
.path_and_query()
.ok_or(HandshakeError::MissingPath)?
.as_str(),
);
let authority = uri.authority().unwrap().as_str();
let host = authority
.find('@')
.map(|idx| authority.split_at(idx + 1).1)
.unwrap_or_else(|| authority);
request = request
.header("Host", host)
.header(UPGRADE, "websocket")
.header(CONNECTION, "Upgrade")
.header(
"Sec-WebSocket-Key",
fastwebsockets::handshake::generate_key(),
);
let user_agent = state.borrow().borrow::<WsUserAgent>().0.clone();
request =
populate_common_request_headers(request, &user_agent, protocols, &headers)?;
let request = request
.body(http_body_util::Empty::new())
.map_err(HandshakeError::Http)?;
let domain = &uri.host().unwrap().to_string();
let port = &uri.port_u16().unwrap_or(match uri.scheme_str() {
Some("wss") => 443,
Some("ws") => 80,
_ => unreachable!(),
});
let addr = format!("{domain}:{port}");
let res = match uri.scheme_str() {
Some("ws") => handshake_http1_ws(request, &addr).await?,
Some("wss") => {
match handshake_http1_wss(state, request, domain, &addr).await {
Ok(res) => res,
Err(_) => {
handshake_http2_wss(
state,
uri,
authority,
&user_agent,
protocols,
domain,
&headers,
&addr,
)
.await?
}
}
}
_ => unreachable!(),
};
Ok(res)
}
async fn handshake_http1_ws(
request: Request<http_body_util::Empty<Bytes>>,
addr: &String,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), HandshakeError> {
let tcp_socket = TcpStream::connect(addr).await?;
handshake_connection(request, tcp_socket).await
}
async fn handshake_http1_wss(
state: &Rc<RefCell<OpState>>,
request: Request<http_body_util::Empty<Bytes>>,
domain: &str,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), HandshakeError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http1Only)?;
let dnsname = ServerName::try_from(domain.to_string())
.map_err(|_| HandshakeError::InvalidHostname(domain.to_string()))?;
let mut tls_connector = TlsStream::new_client_side(
tcp_socket,
ClientConnection::new(tls_config.into(), dnsname)?,
NonZeroUsize::new(65536),
);
// If we can bail on an http/1.1 ALPN mismatch here, we can avoid doing extra work
tls_connector.handshake().await?;
handshake_connection(request, tls_connector).await
}
#[allow(clippy::too_many_arguments)]
async fn handshake_http2_wss(
state: &Rc<RefCell<OpState>>,
uri: &Uri,
authority: &str,
user_agent: &str,
protocols: &str,
domain: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
addr: &str,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), HandshakeError> {
let tcp_socket = TcpStream::connect(addr).await?;
let tls_config = create_ws_client_config(state, SocketUse::Http2Only)?;
let dnsname = ServerName::try_from(domain.to_string())
.map_err(|_| HandshakeError::InvalidHostname(domain.to_string()))?;
// We need to better expose the underlying errors here
let mut tls_connector = TlsStream::new_client_side(
tcp_socket,
ClientConnection::new(tls_config.into(), dnsname)?,
None,
);
let handshake = tls_connector.handshake().await?;
if handshake.alpn.is_none() {
return Err(HandshakeError::NoH2Alpn);
}
let h2 = h2::client::Builder::new();
let (mut send, conn) = h2.handshake::<_, Bytes>(tls_connector).await?;
spawn(conn);
let mut request = Request::builder();
request = request.method(Method::CONNECT);
let uri = Uri::builder()
.authority(authority)
.path_and_query(uri.path_and_query().unwrap().as_str())
.scheme("https")
.build()?;
request = request.uri(uri);
request =
populate_common_request_headers(request, user_agent, protocols, headers)?;
request = request.extension(h2::ext::Protocol::from("websocket"));
let (resp, send) = send.send_request(request.body(())?, false)?;
let resp = resp.await?;
if resp.status() != StatusCode::OK {
return Err(HandshakeError::InvalidStatusCode(resp.status()));
}
let (http::response::Parts { headers, .. }, recv) = resp.into_parts();
let mut stream = WebSocket::after_handshake(
WebSocketStream::new(stream::WsStreamKind::H2(send, recv), None),
Role::Client,
);
// We currently don't support vectored writes in the H2 streams
stream.set_writev(false);
// TODO(mmastrac): we should be able to use a zero masking key over HTTPS
// stream.set_auto_apply_mask(false);
Ok((stream, headers))
}
async fn handshake_connection<
S: AsyncRead + AsyncWrite + Send + Unpin + 'static,
>(
request: Request<http_body_util::Empty<Bytes>>,
socket: S,
) -> Result<(WebSocket<WebSocketStream>, http::HeaderMap), HandshakeError> {
let (upgraded, response) =
fastwebsockets::handshake::client(&LocalExecutor, request, socket).await?;
let upgraded = upgraded.into_inner();
let stream =
WebSocketStream::new(stream::WsStreamKind::Upgraded(upgraded), None);
let stream = WebSocket::after_handshake(stream, Role::Client);
Ok((stream, response.into_parts().0.headers))
}
pub fn create_ws_client_config(
state: &Rc<RefCell<OpState>>,
socket_use: SocketUse,
) -> Result<ClientConfig, HandshakeError> {
let unsafely_ignore_certificate_errors: Option<Vec<String>> = state
.borrow()
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
.and_then(|it| it.0.clone());
let root_cert_store = state
.borrow()
.borrow::<WsRootStoreProvider>()
.get_or_try_init()
.map_err(HandshakeError::RootStoreError)?;
create_client_config(
root_cert_store,
vec![],
unsafely_ignore_certificate_errors,
TlsKeys::Null,
socket_use,
)
.map_err(HandshakeError::Tls)
}
/// Headers common to both http/1.1 and h2 requests.
fn populate_common_request_headers(
mut request: http::request::Builder,
user_agent: &str,
protocols: &str,
headers: &Option<Vec<(ByteString, ByteString)>>,
) -> Result<http::request::Builder, HandshakeError> {
request = request
.header("User-Agent", user_agent)
.header("Sec-WebSocket-Version", "13");
if !protocols.is_empty() {
request = request.header("Sec-WebSocket-Protocol", protocols);
}
if let Some(headers) = headers {
for (key, value) in headers {
let name = HeaderName::from_bytes(key)?;
let v = HeaderValue::from_bytes(value)?;
let is_disallowed_header = matches!(
name,
http::header::HOST
| http::header::SEC_WEBSOCKET_ACCEPT
| http::header::SEC_WEBSOCKET_EXTENSIONS
| http::header::SEC_WEBSOCKET_KEY
| http::header::SEC_WEBSOCKET_PROTOCOL
| http::header::SEC_WEBSOCKET_VERSION
| http::header::UPGRADE
| http::header::CONNECTION
);
if !is_disallowed_header {
request = request.header(name, v);
}
}
}
Ok(request)
}
#[op2(async)]
#[serde]
pub async fn op_ws_create<WP>(
state: Rc<RefCell<OpState>>,
#[string] api_name: String,
#[string] url: String,
#[string] protocols: String,
#[smi] cancel_handle: Option<ResourceId>,
#[serde] headers: Option<Vec<(ByteString, ByteString)>>,
) -> Result<CreateResponse, WebsocketError>
where
WP: WebSocketPermissions + 'static,
{
{
let mut s = state.borrow_mut();
s.borrow_mut::<WP>()
.check_net_url(
&url::Url::parse(&url).map_err(WebsocketError::Url)?,
&api_name,
)
.expect(
"Permission check should have been done in op_ws_check_permission",
);
}
let cancel_resource = if let Some(cancel_rid) = cancel_handle {
let r = state
.borrow_mut()
.resource_table
.get::<WsCancelResource>(cancel_rid)
.map_err(WebsocketError::Resource)?;
Some(r.0.clone())
} else {
None
};
let uri: Uri = url.parse()?;
let handshake = handshake_websocket(&state, &uri, &protocols, headers)
.map_err(WebsocketError::ConnectionFailed);
let (stream, response) = match cancel_resource {
Some(rc) => handshake.try_or_cancel(rc).await?,
None => handshake.await?,
};
if let Some(cancel_rid) = cancel_handle {
if let Ok(res) = state.borrow_mut().resource_table.take_any(cancel_rid) {
res.close();
}
}
let mut state = state.borrow_mut();
let rid = state.resource_table.add(ServerWebSocket::new(stream));
let protocol = match response.get("Sec-WebSocket-Protocol") {
Some(header) => header.to_str().unwrap(),
None => "",
};
let extensions = response
.get_all("Sec-WebSocket-Extensions")
.iter()
.map(|header| header.to_str().unwrap())
.collect::<String>();
Ok(CreateResponse {
rid,
protocol: protocol.to_string(),
extensions,
})
}
#[repr(u16)]
pub enum MessageKind {
Text = 0,
Binary = 1,
Pong = 2,
Error = 3,
ClosedDefault = 1005,
}
/// To avoid locks, we keep as much as we can inside of [`Cell`]s.
pub struct ServerWebSocket {
buffered: Cell<usize>,
error: Cell<Option<String>>,
errored: Cell<bool>,
closed: Cell<bool>,
buffer: Cell<Option<Vec<u8>>>,
string: Cell<Option<String>>,
ws_read: AsyncRefCell<FragmentCollectorRead<ReadHalf<WebSocketStream>>>,
ws_write: AsyncRefCell<WebSocketWrite<WriteHalf<WebSocketStream>>>,
}
impl ServerWebSocket {
fn new(ws: WebSocket<WebSocketStream>) -> Self {
let (ws_read, ws_write) = ws.split(tokio::io::split);
Self {
buffered: Cell::new(0),
error: Cell::new(None),
errored: Cell::new(false),
closed: Cell::new(false),
buffer: Cell::new(None),
string: Cell::new(None),
ws_read: AsyncRefCell::new(FragmentCollectorRead::new(ws_read)),
ws_write: AsyncRefCell::new(ws_write),
}
}
fn set_error(&self, error: Option<String>) {
if let Some(error) = error {
self.error.set(Some(error));
self.errored.set(true);
} else {
self.error.set(None);
self.errored.set(false);
}
}
/// Reserve a lock, but don't wait on it. This gets us our place in line.
fn reserve_lock(
self: &Rc<Self>,
) -> AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>> {
RcRef::map(self, |r| &r.ws_write).borrow_mut()
}
#[inline]
async fn write_frame(
self: &Rc<Self>,
lock: AsyncMutFuture<WebSocketWrite<WriteHalf<WebSocketStream>>>,
frame: Frame<'_>,
) -> Result<(), WebsocketError> {
let mut ws = lock.await;
if ws.is_closed() {
return Ok(());
}
ws.write_frame(frame).await?;
Ok(())
}
}
impl Resource for ServerWebSocket {
fn name(&self) -> Cow<str> {
"serverWebSocket".into()
}
}
pub fn ws_create_server_stream(
state: &mut OpState,
transport: NetworkStream,
read_buf: Bytes,
) -> ResourceId {
let mut ws = WebSocket::after_handshake(
WebSocketStream::new(
stream::WsStreamKind::Network(transport),
Some(read_buf),
),
Role::Server,
);
ws.set_writev(*USE_WRITEV);
ws.set_auto_close(true);
ws.set_auto_pong(true);
state.resource_table.add(ServerWebSocket::new(ws))
}
fn send_binary(state: &mut OpState, rid: ResourceId, data: &[u8]) {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
let data = data.to_vec();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
let lock = resource.reserve_lock();
deno_core::unsync::spawn(async move {
if let Err(err) = resource
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data.into()))
.await
{
resource.set_error(Some(err.to_string()));
} else {
resource.buffered.set(resource.buffered.get() - len);
}
});
}
#[op2]
pub fn op_ws_send_binary(
state: &mut OpState,
#[smi] rid: ResourceId,
#[anybuffer] data: &[u8],
) {
send_binary(state, rid, data)
}
#[op2(fast)]
pub fn op_ws_send_binary_ab(
state: &mut OpState,
#[smi] rid: ResourceId,
#[arraybuffer] data: &[u8],
) {
send_binary(state, rid, data)
}
#[op2(fast)]
pub fn op_ws_send_text(
state: &mut OpState,
#[smi] rid: ResourceId,
#[string] data: String,
) {
let resource = state.resource_table.get::<ServerWebSocket>(rid).unwrap();
let len = data.len();
resource.buffered.set(resource.buffered.get() + len);
let lock = resource.reserve_lock();
deno_core::unsync::spawn(async move {
if let Err(err) = resource
.write_frame(
lock,
Frame::new(true, OpCode::Text, None, data.into_bytes().into()),
)
.await
{
resource.set_error(Some(err.to_string()));
} else {
resource.buffered.set(resource.buffered.get() - len);
}
});
}
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
#[op2(async)]
pub async fn op_ws_send_binary_async(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[buffer] data: JsBuffer,
) -> Result<(), WebsocketError> {
let resource = state
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)
.map_err(WebsocketError::Resource)?;
let data = data.to_vec();
let lock = resource.reserve_lock();
resource
.write_frame(lock, Frame::new(true, OpCode::Binary, None, data.into()))
.await
}
/// Async version of send. Does not update buffered amount as we rely on the socket itself for backpressure.
#[op2(async)]
pub async fn op_ws_send_text_async(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[string] data: String,
) -> Result<(), WebsocketError> {
let resource = state
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)
.map_err(WebsocketError::Resource)?;
let lock = resource.reserve_lock();
resource
.write_frame(
lock,
Frame::new(true, OpCode::Text, None, data.into_bytes().into()),
)
.await
}
const EMPTY_PAYLOAD: &[u8] = &[];
#[op2(fast)]
#[smi]
pub fn op_ws_get_buffered_amount(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> u32 {
state
.resource_table
.get::<ServerWebSocket>(rid)
.unwrap()
.buffered
.get() as u32
}
#[op2(async)]
pub async fn op_ws_send_ping(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> Result<(), WebsocketError> {
let resource = state
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)
.map_err(WebsocketError::Resource)?;
let lock = resource.reserve_lock();
resource
.write_frame(
lock,
Frame::new(true, OpCode::Ping, None, EMPTY_PAYLOAD.into()),
)
.await
}
#[op2(async(lazy))]
pub async fn op_ws_close(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
#[smi] code: Option<u16>,
#[string] reason: Option<String>,
) -> Result<(), WebsocketError> {
let Ok(resource) = state
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)
else {
return Ok(());
};
let frame = reason
.map(|reason| Frame::close(code.unwrap_or(1005), reason.as_bytes()))
.unwrap_or_else(|| Frame::close_raw(vec![].into()));
resource.closed.set(true);
let lock = resource.reserve_lock();
resource.write_frame(lock, frame).await
}
#[op2]
#[serde]
pub fn op_ws_get_buffer(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> Option<ToJsBuffer> {
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
return None;
};
resource.buffer.take().map(ToJsBuffer::from)
}
#[op2]
#[string]
pub fn op_ws_get_buffer_as_string(
state: &mut OpState,
#[smi] rid: ResourceId,
) -> Option<String> {
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
return None;
};
resource.string.take()
}
#[op2]
#[string]
pub fn op_ws_get_error(state: &mut OpState, #[smi] rid: ResourceId) -> String {
let Ok(resource) = state.resource_table.get::<ServerWebSocket>(rid) else {
return "Bad resource".into();
};
resource.errored.set(false);
resource.error.take().unwrap_or_default()
}
#[op2(async)]
pub async fn op_ws_next_event(
state: Rc<RefCell<OpState>>,
#[smi] rid: ResourceId,
) -> u16 {
let Ok(resource) = state
.borrow_mut()
.resource_table
.get::<ServerWebSocket>(rid)
else {
// op_ws_get_error will correctly handle a bad resource
return MessageKind::Error as u16;
};
// If there's a pending error, this always returns error
if resource.errored.get() {
return MessageKind::Error as u16;
}
let mut ws = RcRef::map(&resource, |r| &r.ws_read).borrow_mut().await;
let writer = RcRef::map(&resource, |r| &r.ws_write);
let mut sender = move |frame| {
let writer = writer.clone();
async move { writer.borrow_mut().await.write_frame(frame).await }
};
loop {
let res = ws.read_frame(&mut sender).await;
let val = match res {
Ok(val) => val,
Err(err) => {
// No message was received, socket closed while we waited.
// Report closed status to JavaScript.
if resource.closed.get() {
return MessageKind::ClosedDefault as u16;
}
resource.set_error(Some(err.to_string()));
return MessageKind::Error as u16;
}
};
break match val.opcode {
OpCode::Text => match String::from_utf8(val.payload.to_vec()) {
Ok(s) => {
resource.string.set(Some(s));
MessageKind::Text as u16
}
Err(_) => {
resource.set_error(Some("Invalid string data".into()));
MessageKind::Error as u16
}
},
OpCode::Binary => {
resource.buffer.set(Some(val.payload.to_vec()));
MessageKind::Binary as u16
}
OpCode::Close => {
// Close reason is returned through error
if val.payload.len() < 2 {
resource.set_error(None);
MessageKind::ClosedDefault as u16
} else {
let close_code = CloseCode::from(u16::from_be_bytes([
val.payload[0],
val.payload[1],
]));
let reason = String::from_utf8(val.payload[2..].to_vec()).ok();
resource.set_error(reason);
close_code.into()
}
}
OpCode::Pong => MessageKind::Pong as u16,
OpCode::Continuation | OpCode::Ping => {
continue;
}
};
}
}
deno_core::extension!(deno_websocket,
deps = [ deno_url, deno_webidl ],
parameters = [P: WebSocketPermissions],
ops = [
op_ws_check_permission_and_cancel_handle<P>,
op_ws_create<P>,
op_ws_close,
op_ws_next_event,
op_ws_get_buffer,
op_ws_get_buffer_as_string,
op_ws_get_error,
op_ws_send_binary,
op_ws_send_binary_ab,
op_ws_send_text,
op_ws_send_binary_async,
op_ws_send_text_async,
op_ws_send_ping,
op_ws_get_buffered_amount,
],
esm = [ "01_websocket.js", "02_websocketstream.js" ],
options = {
user_agent: String,
root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
unsafely_ignore_certificate_errors: Option<Vec<String>>
},
state = |state, options| {
state.put::<WsUserAgent>(WsUserAgent(options.user_agent));
state.put(UnsafelyIgnoreCertificateErrors(
options.unsafely_ignore_certificate_errors,
));
state.put::<WsRootStoreProvider>(WsRootStoreProvider(options.root_cert_store_provider));
},
);
pub fn get_declaration() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts")
}
// Needed so hyper can use non Send futures
#[derive(Clone)]
struct LocalExecutor;
impl<Fut> hyper::rt::Executor<Fut> for LocalExecutor
where
Fut: Future + 'static,
Fut::Output: 'static,
{
fn execute(&self, fut: Fut) {
deno_core::unsync::spawn(fut);
}
}