// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. use deno_core::error::bad_resource_id; use deno_core::error::invalid_hostname; use deno_core::error::null_opbuf; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::futures::stream::SplitSink; use deno_core::futures::stream::SplitStream; use deno_core::futures::SinkExt; use deno_core::futures::StreamExt; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::url; use deno_core::AsyncRefCell; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::Extension; use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::ZeroCopyBuf; use http::{Method, Request, Uri}; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; use std::io::BufReader; use std::io::Cursor; use std::path::PathBuf; use std::rc::Rc; use std::sync::Arc; use tokio::net::TcpStream; use tokio_rustls::{rustls::ClientConfig, TlsConnector}; use tokio_tungstenite::tungstenite::{ handshake::client::Response, protocol::frame::coding::CloseCode, protocol::CloseFrame, Message, }; use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::{client_async, WebSocketStream}; use webpki::DNSNameRef; pub use tokio_tungstenite; // Re-export tokio_tungstenite #[derive(Clone)] pub struct WsCaData(pub Vec); #[derive(Clone)] pub struct WsUserAgent(pub String); pub trait WebSocketPermissions { fn check_net_url(&mut self, _url: &url::Url) -> Result<(), AnyError>; } /// For use with `op_websocket_*` when the user does not want permissions. pub struct NoWebSocketPermissions; impl WebSocketPermissions for NoWebSocketPermissions { fn check_net_url(&mut self, _url: &url::Url) -> Result<(), AnyError> { Ok(()) } } type WsStream = WebSocketStream>; pub enum WebSocketStreamType { Client { tx: AsyncRefCell>, rx: AsyncRefCell>, }, Server { tx: AsyncRefCell< SplitSink, Message>, >, rx: AsyncRefCell>>, }, } pub struct WsStreamResource { pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are // canceled, while 'write' ops are allowed to complete. Therefore only // 'read' futures are attached to this cancel handle. pub cancel: CancelHandle, } impl WsStreamResource { async fn send(self: &Rc, message: Message) -> Result<(), AnyError> { match self.stream { WebSocketStreamType::Client { .. } => { let mut tx = RcRef::map(self, |r| match &r.stream { WebSocketStreamType::Client { tx, .. } => tx, WebSocketStreamType::Server { .. } => unreachable!(), }) .borrow_mut() .await; tx.send(message).await?; } WebSocketStreamType::Server { .. } => { let mut tx = RcRef::map(self, |r| match &r.stream { WebSocketStreamType::Client { .. } => unreachable!(), WebSocketStreamType::Server { tx, .. } => tx, }) .borrow_mut() .await; tx.send(message).await?; } } Ok(()) } async fn next_message( self: &Rc, cancel: RcRef, ) -> Result< Option>, AnyError, > { match &self.stream { WebSocketStreamType::Client { .. } => { let mut rx = RcRef::map(self, |r| match &r.stream { WebSocketStreamType::Client { rx, .. } => rx, WebSocketStreamType::Server { .. } => unreachable!(), }) .borrow_mut() .await; rx.next().or_cancel(cancel).await.map_err(AnyError::from) } WebSocketStreamType::Server { .. } => { let mut rx = RcRef::map(self, |r| match &r.stream { WebSocketStreamType::Client { .. } => unreachable!(), WebSocketStreamType::Server { rx, .. } => rx, }) .borrow_mut() .await; rx.next().or_cancel(cancel).await.map_err(AnyError::from) } } } } impl Resource for WsStreamResource { fn name(&self) -> Cow { "webSocketStream".into() } } // This op is needed because creating a WS instance in JavaScript is a sync // operation and should throw error when permissions are not fulfilled, // but actual op that connects WS is async. pub fn op_ws_check_permission( state: &mut OpState, url: String, _: (), ) -> Result<(), AnyError> where WP: WebSocketPermissions + 'static, { state .borrow_mut::() .check_net_url(&url::Url::parse(&url)?)?; Ok(()) } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] pub struct CreateArgs { url: String, protocols: String, } #[derive(Serialize)] #[serde(rename_all = "camelCase")] pub struct CreateResponse { rid: ResourceId, protocol: String, extensions: String, } pub async fn op_ws_create( state: Rc>, args: CreateArgs, _: (), ) -> Result where WP: WebSocketPermissions + 'static, { { let mut s = state.borrow_mut(); s.borrow_mut::() .check_net_url(&url::Url::parse(&args.url)?) .expect( "Permission check should have been done in op_ws_check_permission", ); } let ws_ca_data = state.borrow().try_borrow::().cloned(); let user_agent = state.borrow().borrow::().0.clone(); let uri: Uri = args.url.parse()?; let mut request = Request::builder().method(Method::GET).uri(&uri); request = request.header("User-Agent", user_agent); if !args.protocols.is_empty() { request = request.header("Sec-WebSocket-Protocol", args.protocols); } let request = request.body(())?; let domain = &uri.host().unwrap().to_string(); let port = &uri.port_u16().unwrap_or(match uri.scheme_str() { Some("wss") => 443, Some("ws") => 80, _ => unreachable!(), }); let addr = format!("{}:{}", domain, port); let tcp_socket = TcpStream::connect(addr).await?; let socket: MaybeTlsStream = match uri.scheme_str() { Some("ws") => MaybeTlsStream::Plain(tcp_socket), Some("wss") => { let mut config = ClientConfig::new(); config .root_store .add_server_trust_anchors(&webpki_roots::TLS_SERVER_ROOTS); if let Some(ws_ca_data) = ws_ca_data { let reader = &mut BufReader::new(Cursor::new(ws_ca_data.0)); config.root_store.add_pem_file(reader).unwrap(); } let tls_connector = TlsConnector::from(Arc::new(config)); let dnsname = DNSNameRef::try_from_ascii_str(domain) .map_err(|_| invalid_hostname(domain))?; let tls_socket = tls_connector.connect(dnsname, tcp_socket).await?; MaybeTlsStream::Rustls(tls_socket) } _ => unreachable!(), }; let (stream, response): (WsStream, Response) = client_async(request, socket).await.map_err(|err| { type_error(format!( "failed to connect to WebSocket: {}", err.to_string() )) })?; let (ws_tx, ws_rx) = stream.split(); let resource = WsStreamResource { stream: WebSocketStreamType::Client { rx: AsyncRefCell::new(ws_rx), tx: AsyncRefCell::new(ws_tx), }, cancel: Default::default(), }; let mut state = state.borrow_mut(); let rid = state.resource_table.add(resource); let protocol = match response.headers().get("Sec-WebSocket-Protocol") { Some(header) => header.to_str().unwrap(), None => "", }; let extensions = response .headers() .get_all("Sec-WebSocket-Extensions") .iter() .map(|header| header.to_str().unwrap()) .collect::(); Ok(CreateResponse { rid, protocol: protocol.to_string(), extensions, }) } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] pub struct SendArgs { rid: ResourceId, kind: String, text: Option, } pub async fn op_ws_send( state: Rc>, args: SendArgs, buf: Option, ) -> Result<(), AnyError> { let msg = match args.kind.as_str() { "text" => Message::Text(args.text.unwrap()), "binary" => Message::Binary(buf.ok_or_else(null_opbuf)?.to_vec()), "pong" => Message::Pong(vec![]), _ => unreachable!(), }; let resource = state .borrow_mut() .resource_table .get::(args.rid) .ok_or_else(bad_resource_id)?; resource.send(msg).await?; Ok(()) } #[derive(Deserialize)] #[serde(rename_all = "camelCase")] pub struct CloseArgs { rid: ResourceId, code: Option, reason: Option, } pub async fn op_ws_close( state: Rc>, args: CloseArgs, _: (), ) -> Result<(), AnyError> { let rid = args.rid; let msg = Message::Close(args.code.map(|c| CloseFrame { code: CloseCode::from(c), reason: match args.reason { Some(reason) => Cow::from(reason), None => Default::default(), }, })); let resource = state .borrow_mut() .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; resource.send(msg).await?; Ok(()) } #[derive(Serialize)] #[serde(tag = "kind", content = "value", rename_all = "camelCase")] pub enum NextEventResponse { String(String), Binary(ZeroCopyBuf), Close { code: u16, reason: String }, Ping, Pong, Error(String), Closed, } pub async fn op_ws_next_event( state: Rc>, rid: ResourceId, _: (), ) -> Result { let resource = state .borrow_mut() .resource_table .get::(rid) .ok_or_else(bad_resource_id)?; let cancel = RcRef::map(&resource, |r| &r.cancel); let val = resource.next_message(cancel).await?; let res = match val { Some(Ok(Message::Text(text))) => NextEventResponse::String(text), Some(Ok(Message::Binary(data))) => NextEventResponse::Binary(data.into()), Some(Ok(Message::Close(Some(frame)))) => NextEventResponse::Close { code: frame.code.into(), reason: frame.reason.to_string(), }, Some(Ok(Message::Close(None))) => NextEventResponse::Close { code: 1005, reason: String::new(), }, Some(Ok(Message::Ping(_))) => NextEventResponse::Ping, Some(Ok(Message::Pong(_))) => NextEventResponse::Pong, Some(Err(e)) => NextEventResponse::Error(e.to_string()), None => { state.borrow_mut().resource_table.close(rid).unwrap(); NextEventResponse::Closed } }; Ok(res) } pub fn init( user_agent: String, ca_data: Option>, ) -> Extension { Extension::builder() .js(include_js_files!( prefix "deno:extensions/websocket", "01_websocket.js", )) .ops(vec![ ( "op_ws_check_permission", op_sync(op_ws_check_permission::

), ), ("op_ws_create", op_async(op_ws_create::

)), ("op_ws_send", op_async(op_ws_send)), ("op_ws_close", op_async(op_ws_close)), ("op_ws_next_event", op_async(op_ws_next_event)), ]) .state(move |state| { state.put::(WsUserAgent(user_agent.clone())); if let Some(ca_data) = ca_data.clone() { state.put::(WsCaData(ca_data)); } Ok(()) }) .build() } pub fn get_declaration() -> PathBuf { PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("lib.deno_websocket.d.ts") }