1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-16 02:48:52 -05:00
denoland-deno/ext/net/ops_tls.rs
Bert Belder 6a96560986
fix(ext/net): fix TLS bugs and add 'op_tls_handshake' (#12501)
A bug was fixed that could cause a hang when a method was
called on a TlsConn object that had thrown an exception earlier.

Additionally, a bug was fixed that caused TlsConn.write() to not
completely flush large buffers (>64kB) to the socket.

The public `TlsConn.handshake()` API is scheduled for inclusion in the
next minor release. See https://github.com/denoland/deno/pull/12467.
2021-10-20 01:30:04 +02:00

1147 lines
31 KiB
Rust

// Copyright 2018-2021 the Deno authors. All rights reserved. MIT license.
use crate::io::TcpStreamResource;
use crate::ops::IpAddr;
use crate::ops::OpAddr;
use crate::ops::OpConn;
use crate::resolve_addr::resolve_addr;
use crate::resolve_addr::resolve_addr_sync;
use crate::DefaultTlsOptions;
use crate::NetPermissions;
use crate::UnsafelyIgnoreCertificateErrors;
use deno_core::error::bad_resource;
use deno_core::error::custom_error;
use deno_core::error::generic_error;
use deno_core::error::invalid_hostname;
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::ready;
use deno_core::futures::task::noop_waker_ref;
use deno_core::futures::task::AtomicWaker;
use deno_core::futures::task::Context;
use deno_core::futures::task::Poll;
use deno_core::futures::task::RawWaker;
use deno_core::futures::task::RawWakerVTable;
use deno_core::futures::task::Waker;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::parking_lot::Mutex;
use deno_core::AsyncRefCell;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::OpPair;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_tls::create_client_config;
use deno_tls::load_certs;
use deno_tls::load_private_keys;
use deno_tls::rustls::Certificate;
use deno_tls::rustls::ClientConfig;
use deno_tls::rustls::ClientSession;
use deno_tls::rustls::NoClientAuth;
use deno_tls::rustls::PrivateKey;
use deno_tls::rustls::ServerConfig;
use deno_tls::rustls::ServerSession;
use deno_tls::rustls::Session;
use deno_tls::webpki::DNSNameRef;
use io::Error;
use io::Read;
use io::Write;
use serde::Deserialize;
use std::borrow::Cow;
use std::cell::Cell;
use std::cell::RefCell;
use std::convert::From;
use std::fs::File;
use std::io;
use std::io::BufReader;
use std::io::ErrorKind;
use std::ops::Deref;
use std::ops::DerefMut;
use std::path::Path;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::sync::Weak;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWrite;
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::task::spawn_local;
#[derive(Debug)]
enum TlsSession {
Client(ClientSession),
Server(ServerSession),
}
impl Deref for TlsSession {
type Target = dyn Session;
fn deref(&self) -> &Self::Target {
match self {
TlsSession::Client(client_session) => client_session,
TlsSession::Server(server_session) => server_session,
}
}
}
impl DerefMut for TlsSession {
fn deref_mut(&mut self) -> &mut Self::Target {
match self {
TlsSession::Client(client_session) => client_session,
TlsSession::Server(server_session) => server_session,
}
}
}
impl From<ClientSession> for TlsSession {
fn from(client_session: ClientSession) -> Self {
TlsSession::Client(client_session)
}
}
impl From<ServerSession> for TlsSession {
fn from(server_session: ServerSession) -> Self {
TlsSession::Server(server_session)
}
}
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
enum Flow {
Handshake,
Read,
Write,
}
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum State {
StreamOpen,
StreamClosed,
TlsClosing,
TlsClosed,
TlsError,
TcpClosed,
}
#[derive(Debug)]
pub struct TlsStream(Option<TlsStreamInner>);
impl TlsStream {
fn new(tcp: TcpStream, tls: TlsSession) -> Self {
let inner = TlsStreamInner {
tcp,
tls,
rd_state: State::StreamOpen,
wr_state: State::StreamOpen,
};
Self(Some(inner))
}
pub fn new_client_side(
tcp: TcpStream,
tls_config: &Arc<ClientConfig>,
hostname: DNSNameRef,
) -> Self {
let tls = TlsSession::Client(ClientSession::new(tls_config, hostname));
Self::new(tcp, tls)
}
pub fn new_server_side(
tcp: TcpStream,
tls_config: &Arc<ServerConfig>,
) -> Self {
let tls = TlsSession::Server(ServerSession::new(tls_config));
Self::new(tcp, tls)
}
fn into_split(self) -> (ReadHalf, WriteHalf) {
let shared = Shared::new(self);
let rd = ReadHalf {
shared: shared.clone(),
};
let wr = WriteHalf { shared };
(rd, wr)
}
/// Tokio-rustls compatibility: returns a reference to the underlying TCP
/// stream, and a reference to the Rustls `Session` object.
pub fn get_ref(&self) -> (&TcpStream, &dyn Session) {
let inner = self.0.as_ref().unwrap();
(&inner.tcp, &*inner.tls)
}
fn inner_mut(&mut self) -> &mut TlsStreamInner {
self.0.as_mut().unwrap()
}
pub async fn handshake(&mut self) -> io::Result<()> {
poll_fn(|cx| self.inner_mut().poll_handshake(cx)).await
}
fn poll_handshake(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner_mut().poll_handshake(cx)
}
}
impl AsyncRead for TlsStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self.inner_mut().poll_read(cx, buf)
}
}
impl AsyncWrite for TlsStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner_mut().poll_write(cx, buf)
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.inner_mut().poll_io(cx, Flow::Write)
// The underlying TCP stream does not need to be flushed.
}
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self.inner_mut().poll_shutdown(cx)
}
}
impl Drop for TlsStream {
fn drop(&mut self) {
let mut inner = self.0.take().unwrap();
let mut cx = Context::from_waker(noop_waker_ref());
let use_linger_task = inner.poll_close(&mut cx).is_pending();
if use_linger_task {
spawn_local(poll_fn(move |cx| inner.poll_close(cx)));
} else if cfg!(debug_assertions) {
spawn_local(async {}); // Spawn dummy task to detect missing LocalSet.
}
}
}
#[derive(Debug)]
pub struct TlsStreamInner {
tls: TlsSession,
tcp: TcpStream,
rd_state: State,
wr_state: State,
}
impl TlsStreamInner {
fn poll_io(
&mut self,
cx: &mut Context<'_>,
flow: Flow,
) -> Poll<io::Result<()>> {
loop {
let wr_ready = loop {
match self.wr_state {
_ if self.tls.is_handshaking() && !self.tls.wants_write() => {
break true;
}
_ if self.tls.is_handshaking() => {}
State::StreamOpen if !self.tls.wants_write() => break true,
State::StreamClosed => {
// Rustls will enqueue the 'CloseNotify' alert and send it after
// flusing the data that is already in the queue.
self.tls.send_close_notify();
self.wr_state = State::TlsClosing;
continue;
}
State::TlsClosing if !self.tls.wants_write() => {
self.wr_state = State::TlsClosed;
continue;
}
// If a 'CloseNotify' alert sent by the remote end has been received,
// shut down the underlying TCP socket. Otherwise, consider polling
// done for the moment.
State::TlsClosed if self.rd_state < State::TlsClosed => break true,
State::TlsClosed
if Pin::new(&mut self.tcp).poll_shutdown(cx)?.is_pending() =>
{
break false;
}
State::TlsClosed => {
self.wr_state = State::TcpClosed;
continue;
}
State::TcpClosed => break true,
_ => {}
}
// Write ciphertext to the TCP socket.
let mut wrapped_tcp = ImplementWriteTrait(&mut self.tcp);
match self.tls.write_tls(&mut wrapped_tcp) {
Ok(0) => {} // Wait until the socket has enough buffer space.
Ok(_) => continue, // Try to send more more data immediately.
Err(err) if err.kind() == ErrorKind::WouldBlock => unreachable!(),
Err(err) => return Poll::Ready(Err(err)),
}
// Poll whether there is space in the socket send buffer so we can flush
// the remaining outgoing ciphertext.
if self.tcp.poll_write_ready(cx)?.is_pending() {
break false;
}
};
let rd_ready = loop {
match self.rd_state {
State::TcpClosed if self.tls.is_handshaking() => {
let err = Error::new(ErrorKind::UnexpectedEof, "tls handshake eof");
return Poll::Ready(Err(err));
}
State::TlsError => {}
_ if self.tls.is_handshaking() && !self.tls.wants_read() => {
break true;
}
_ if self.tls.is_handshaking() => {}
State::StreamOpen if !self.tls.wants_read() => break true,
State::StreamOpen => {}
State::StreamClosed if !self.tls.wants_read() => {
// Rustls has more incoming cleartext buffered up, but the TLS
// session is closing so this data will never be processed by the
// application layer. Just like what would happen if this were a raw
// TCP stream, don't gracefully end the TLS session, but abort it.
return Poll::Ready(Err(Error::from(ErrorKind::ConnectionReset)));
}
State::StreamClosed => {}
State::TlsClosed if self.wr_state == State::TcpClosed => {
// Wait for the remote end to gracefully close the TCP connection.
// TODO(piscisaureus): this is unnecessary; remove when stable.
}
_ => break true,
}
if self.rd_state < State::TlsClosed {
// Do a zero-length plaintext read so we can detect the arrival of
// 'CloseNotify' messages, even if only the write half is open.
// Actually reading data from the socket is done in `poll_read()`.
match self.tls.read(&mut []) {
Ok(0) => {}
Err(err) if err.kind() == ErrorKind::ConnectionAborted => {
// `Session::read()` returns `ConnectionAborted` when a
// 'CloseNotify' alert has been received, which indicates that
// the remote peer wants to gracefully end the TLS session.
self.rd_state = State::TlsClosed;
continue;
}
Err(err) => return Poll::Ready(Err(err)),
_ => unreachable!(),
}
}
if self.rd_state != State::TlsError {
// Receive ciphertext from the socket.
let mut wrapped_tcp = ImplementReadTrait(&mut self.tcp);
match self.tls.read_tls(&mut wrapped_tcp) {
Ok(0) => {
// End of TCP stream.
self.rd_state = State::TcpClosed;
continue;
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
// Get notified when more ciphertext becomes available in the
// socket receive buffer.
if self.tcp.poll_read_ready(cx)?.is_pending() {
break false;
} else {
continue;
}
}
Err(err) => return Poll::Ready(Err(err)),
_ => {}
}
}
// Interpret and decrypt TLS protocol data.
match self.tls.process_new_packets() {
Ok(_) => assert!(self.rd_state < State::TcpClosed),
Err(err) => {
self.rd_state = State::TlsError;
return Poll::Ready(Err(Error::new(ErrorKind::InvalidData, err)));
}
}
};
if wr_ready {
if self.rd_state >= State::TlsClosed
&& self.wr_state >= State::TlsClosed
&& self.wr_state < State::TcpClosed
{
continue;
}
if self.tls.wants_write() {
continue;
}
}
let io_ready = match flow {
_ if self.tls.is_handshaking() => false,
Flow::Handshake => true,
Flow::Read => rd_ready,
Flow::Write => wr_ready,
};
return match io_ready {
false => Poll::Pending,
true => Poll::Ready(Ok(())),
};
}
}
fn poll_handshake(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.tls.is_handshaking() {
ready!(self.poll_io(cx, Flow::Handshake))?;
}
Poll::Ready(Ok(()))
}
fn poll_read(
&mut self,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(self.poll_io(cx, Flow::Read))?;
if self.rd_state == State::StreamOpen {
let buf_slice =
unsafe { &mut *(buf.unfilled_mut() as *mut [_] as *mut [u8]) };
let bytes_read = self.tls.read(buf_slice)?;
assert_ne!(bytes_read, 0);
unsafe { buf.assume_init(bytes_read) };
buf.advance(bytes_read);
}
Poll::Ready(Ok(()))
}
fn poll_write(
&mut self,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if buf.is_empty() {
// Tokio-rustls compatibility: a zero byte write always succeeds.
Poll::Ready(Ok(0))
} else if self.wr_state == State::StreamOpen {
// Flush Rustls' ciphertext send queue.
ready!(self.poll_io(cx, Flow::Write))?;
// Copy data from `buf` to the Rustls cleartext send queue.
let bytes_written = self.tls.write(buf)?;
assert_ne!(bytes_written, 0);
// Try to flush as much ciphertext as possible. However, since we just
// handed off at least some bytes to rustls, so we can't return
// `Poll::Pending()` any more: this would tell the caller that it should
// try to send those bytes again.
let _ = self.poll_io(cx, Flow::Write)?;
Poll::Ready(Ok(bytes_written))
} else {
// Return error if stream has been shut down for writing.
Poll::Ready(Err(ErrorKind::BrokenPipe.into()))
}
}
fn poll_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.wr_state == State::StreamOpen {
self.wr_state = State::StreamClosed;
}
ready!(self.poll_io(cx, Flow::Write))?;
// At minimum, a TLS 'CloseNotify' alert should have been sent.
assert!(self.wr_state >= State::TlsClosed);
// If we received a TLS 'CloseNotify' alert from the remote end
// already, the TCP socket should be shut down at this point.
assert!(
self.rd_state < State::TlsClosed || self.wr_state == State::TcpClosed
);
Poll::Ready(Ok(()))
}
fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.rd_state == State::StreamOpen {
self.rd_state = State::StreamClosed;
}
// Send TLS 'CloseNotify' alert.
ready!(self.poll_shutdown(cx))?;
// Wait for 'CloseNotify', shut down TCP stream, wait for TCP FIN packet.
ready!(self.poll_io(cx, Flow::Read))?;
assert_eq!(self.rd_state, State::TcpClosed);
assert_eq!(self.wr_state, State::TcpClosed);
Poll::Ready(Ok(()))
}
}
#[derive(Debug)]
pub struct ReadHalf {
shared: Arc<Shared>,
}
impl ReadHalf {
pub fn reunite(self, wr: WriteHalf) -> TlsStream {
assert!(Arc::ptr_eq(&self.shared, &wr.shared));
drop(wr); // Drop `wr`, so only one strong reference to `shared` remains.
Arc::try_unwrap(self.shared)
.unwrap_or_else(|_| panic!("Arc::<Shared>::try_unwrap() failed"))
.tls_stream
.into_inner()
}
}
impl AsyncRead for ReadHalf {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
self
.shared
.poll_with_shared_waker(cx, Flow::Read, move |tls, cx| {
tls.poll_read(cx, buf)
})
}
}
#[derive(Debug)]
pub struct WriteHalf {
shared: Arc<Shared>,
}
impl WriteHalf {
pub async fn handshake(&mut self) -> io::Result<()> {
poll_fn(|cx| {
self
.shared
.poll_with_shared_waker(cx, Flow::Write, |mut tls, cx| {
tls.poll_handshake(cx)
})
})
.await
}
}
impl AsyncWrite for WriteHalf {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self
.shared
.poll_with_shared_waker(cx, Flow::Write, move |tls, cx| {
tls.poll_write(cx, buf)
})
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self
.shared
.poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_flush(cx))
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
self
.shared
.poll_with_shared_waker(cx, Flow::Write, |tls, cx| tls.poll_shutdown(cx))
}
}
#[derive(Debug)]
struct Shared {
tls_stream: Mutex<TlsStream>,
rd_waker: AtomicWaker,
wr_waker: AtomicWaker,
}
impl Shared {
fn new(tls_stream: TlsStream) -> Arc<Self> {
let self_ = Self {
tls_stream: Mutex::new(tls_stream),
rd_waker: AtomicWaker::new(),
wr_waker: AtomicWaker::new(),
};
Arc::new(self_)
}
fn poll_with_shared_waker<R>(
self: &Arc<Self>,
cx: &mut Context<'_>,
flow: Flow,
mut f: impl FnMut(Pin<&mut TlsStream>, &mut Context<'_>) -> R,
) -> R {
match flow {
Flow::Handshake => unreachable!(),
Flow::Read => self.rd_waker.register(cx.waker()),
Flow::Write => self.wr_waker.register(cx.waker()),
}
let shared_waker = self.new_shared_waker();
let mut cx = Context::from_waker(&shared_waker);
let mut tls_stream = self.tls_stream.lock();
f(Pin::new(&mut tls_stream), &mut cx)
}
const SHARED_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
Self::clone_shared_waker,
Self::wake_shared_waker,
Self::wake_shared_waker_by_ref,
Self::drop_shared_waker,
);
fn new_shared_waker(self: &Arc<Self>) -> Waker {
let self_weak = Arc::downgrade(self);
let self_ptr = self_weak.into_raw() as *const ();
let raw_waker = RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE);
unsafe { Waker::from_raw(raw_waker) }
}
fn clone_shared_waker(self_ptr: *const ()) -> RawWaker {
let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
let ptr1 = self_weak.clone().into_raw();
let ptr2 = self_weak.into_raw();
assert!(ptr1 == ptr2);
RawWaker::new(self_ptr, &Self::SHARED_WAKER_VTABLE)
}
fn wake_shared_waker(self_ptr: *const ()) {
Self::wake_shared_waker_by_ref(self_ptr);
Self::drop_shared_waker(self_ptr);
}
fn wake_shared_waker_by_ref(self_ptr: *const ()) {
let self_weak = unsafe { Weak::from_raw(self_ptr as *const Self) };
if let Some(self_arc) = Weak::upgrade(&self_weak) {
self_arc.rd_waker.wake();
self_arc.wr_waker.wake();
}
self_weak.into_raw();
}
fn drop_shared_waker(self_ptr: *const ()) {
let _ = unsafe { Weak::from_raw(self_ptr as *const Self) };
}
}
struct ImplementReadTrait<'a, T>(&'a mut T);
impl Read for ImplementReadTrait<'_, TcpStream> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.0.try_read(buf)
}
}
struct ImplementWriteTrait<'a, T>(&'a mut T);
impl Write for ImplementWriteTrait<'_, TcpStream> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.0.try_write(buf) {
Ok(n) => Ok(n),
Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(0),
Err(err) => Err(err),
}
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}
pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
vec![
("op_start_tls", op_async(op_start_tls::<P>)),
("op_connect_tls", op_async(op_connect_tls::<P>)),
("op_listen_tls", op_sync(op_listen_tls::<P>)),
("op_accept_tls", op_async(op_accept_tls)),
("op_tls_handshake", op_async(op_tls_handshake)),
]
}
#[derive(Debug)]
pub struct TlsStreamResource {
rd: AsyncRefCell<ReadHalf>,
wr: AsyncRefCell<WriteHalf>,
handshake_done: Cell<bool>,
cancel_handle: CancelHandle, // Only read and handshake ops get canceled.
}
impl TlsStreamResource {
pub fn new((rd, wr): (ReadHalf, WriteHalf)) -> Self {
Self {
rd: rd.into(),
wr: wr.into(),
handshake_done: Cell::new(false),
cancel_handle: Default::default(),
}
}
pub fn into_inner(self) -> (ReadHalf, WriteHalf) {
(self.rd.into_inner(), self.wr.into_inner())
}
pub async fn read(
self: &Rc<Self>,
buf: &mut [u8],
) -> Result<usize, AnyError> {
let mut rd = RcRef::map(self, |r| &r.rd).borrow_mut().await;
let cancel_handle = RcRef::map(self, |r| &r.cancel_handle);
let nread = rd.read(buf).try_or_cancel(cancel_handle).await?;
Ok(nread)
}
pub async fn write(self: &Rc<Self>, buf: &[u8]) -> Result<usize, AnyError> {
self.handshake().await?;
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
let nwritten = wr.write(buf).await?;
wr.flush().await?;
Ok(nwritten)
}
pub async fn shutdown(self: &Rc<Self>) -> Result<(), AnyError> {
self.handshake().await?;
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
wr.shutdown().await?;
Ok(())
}
pub async fn handshake(self: &Rc<Self>) -> Result<(), AnyError> {
if !self.handshake_done.get() {
let mut wr = RcRef::map(self, |r| &r.wr).borrow_mut().await;
let cancel_handle = RcRef::map(self, |r| &r.cancel_handle);
wr.handshake().try_or_cancel(cancel_handle).await?;
self.handshake_done.set(true);
}
Ok(())
}
}
impl Resource for TlsStreamResource {
fn name(&self) -> Cow<str> {
"tlsStream".into()
}
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectTlsArgs {
transport: String,
hostname: String,
port: u16,
cert_file: Option<String>,
ca_certs: Vec<String>,
cert_chain: Option<String>,
private_key: Option<String>,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct StartTlsArgs {
rid: ResourceId,
cert_file: Option<String>,
ca_certs: Vec<String>,
hostname: String,
}
async fn op_start_tls<NP>(
state: Rc<RefCell<OpState>>,
args: StartTlsArgs,
_: (),
) -> Result<OpConn, AnyError>
where
NP: NetPermissions + 'static,
{
let rid = args.rid;
let hostname = match &*args.hostname {
"" => "localhost",
n => n,
};
let cert_file = args.cert_file.as_deref();
{
super::check_unstable2(&state, "Deno.startTls");
let mut s = state.borrow_mut();
let permissions = s.borrow_mut::<NP>();
permissions.check_net(&(hostname, Some(0)))?;
if let Some(path) = cert_file {
permissions.check_read(Path::new(path))?;
}
}
let mut ca_certs = args
.ca_certs
.into_iter()
.map(|s| s.into_bytes())
.collect::<Vec<_>>();
if let Some(path) = cert_file {
let mut buf = Vec::new();
File::open(path)?.read_to_end(&mut buf)?;
ca_certs.push(buf);
};
let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
.map_err(|_| invalid_hostname(hostname))?;
let unsafely_ignore_certificate_errors = state
.borrow()
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
.and_then(|it| it.0.clone());
// TODO(@justinmchase): Ideally the certificate store is created once
// and not cloned. The store should be wrapped in Arc<T> to reduce
// copying memory unnecessarily.
let root_cert_store = state
.borrow()
.borrow::<DefaultTlsOptions>()
.root_cert_store
.clone();
let resource_rc = state
.borrow_mut()
.resource_table
.take::<TcpStreamResource>(rid)?;
let resource = Rc::try_unwrap(resource_rc)
.expect("Only a single use of this resource should happen");
let (read_half, write_half) = resource.into_inner();
let tcp_stream = read_half.reunite(write_half)?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let tls_config = Arc::new(create_client_config(
root_cert_store,
ca_certs,
unsafely_ignore_certificate_errors,
)?);
let tls_stream =
TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
let rid = {
let mut state_ = state.borrow_mut();
state_
.resource_table
.add(TlsStreamResource::new(tls_stream.into_split()))
};
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
}
pub async fn op_connect_tls<NP>(
state: Rc<RefCell<OpState>>,
args: ConnectTlsArgs,
_: (),
) -> Result<OpConn, AnyError>
where
NP: NetPermissions + 'static,
{
assert_eq!(args.transport, "tcp");
let hostname = match &*args.hostname {
"" => "localhost",
n => n,
};
let port = args.port;
let cert_file = args.cert_file.as_deref();
let unsafely_ignore_certificate_errors = state
.borrow()
.try_borrow::<UnsafelyIgnoreCertificateErrors>()
.and_then(|it| it.0.clone());
if args.cert_chain.is_some() {
super::check_unstable2(&state, "ConnectTlsOptions.certChain");
}
if args.private_key.is_some() {
super::check_unstable2(&state, "ConnectTlsOptions.privateKey");
}
{
let mut s = state.borrow_mut();
let permissions = s.borrow_mut::<NP>();
permissions.check_net(&(hostname, Some(port)))?;
if let Some(path) = cert_file {
permissions.check_read(Path::new(path))?;
}
}
let mut ca_certs = args
.ca_certs
.into_iter()
.map(|s| s.into_bytes())
.collect::<Vec<_>>();
if let Some(path) = cert_file {
let mut buf = Vec::new();
File::open(path)?.read_to_end(&mut buf)?;
ca_certs.push(buf);
};
let root_cert_store = state
.borrow()
.borrow::<DefaultTlsOptions>()
.root_cert_store
.clone();
let hostname_dns = DNSNameRef::try_from_ascii_str(hostname)
.map_err(|_| invalid_hostname(hostname))?;
let connect_addr = resolve_addr(hostname, port)
.await?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let tcp_stream = TcpStream::connect(connect_addr).await?;
let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?;
let mut tls_config = create_client_config(
root_cert_store,
ca_certs,
unsafely_ignore_certificate_errors,
)?;
if args.cert_chain.is_some() || args.private_key.is_some() {
let cert_chain = args
.cert_chain
.ok_or_else(|| type_error("No certificate chain provided"))?;
let private_key = args
.private_key
.ok_or_else(|| type_error("No private key provided"))?;
// The `remove` is safe because load_private_keys checks that there is at least one key.
let private_key = load_private_keys(private_key.as_bytes())?.remove(0);
tls_config.set_single_client_cert(
load_certs(&mut cert_chain.as_bytes())?,
private_key,
)?;
}
let tls_config = Arc::new(tls_config);
let tls_stream =
TlsStream::new_client_side(tcp_stream, &tls_config, hostname_dns);
let rid = {
let mut state_ = state.borrow_mut();
state_
.resource_table
.add(TlsStreamResource::new(tls_stream.into_split()))
};
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
}
fn load_certs_from_file(path: &str) -> Result<Vec<Certificate>, AnyError> {
let cert_file = File::open(path)?;
let reader = &mut BufReader::new(cert_file);
load_certs(reader)
}
fn load_private_keys_from_file(
path: &str,
) -> Result<Vec<PrivateKey>, AnyError> {
let key_bytes = std::fs::read(path)?;
load_private_keys(&key_bytes)
}
pub struct TlsListenerResource {
tcp_listener: AsyncRefCell<TcpListener>,
tls_config: Arc<ServerConfig>,
cancel_handle: CancelHandle,
}
impl Resource for TlsListenerResource {
fn name(&self) -> Cow<str> {
"tlsListener".into()
}
fn close(self: Rc<Self>) {
self.cancel_handle.cancel();
}
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListenTlsArgs {
transport: String,
hostname: String,
port: u16,
cert_file: String,
key_file: String,
alpn_protocols: Option<Vec<String>>,
}
fn op_listen_tls<NP>(
state: &mut OpState,
args: ListenTlsArgs,
_: (),
) -> Result<OpConn, AnyError>
where
NP: NetPermissions + 'static,
{
assert_eq!(args.transport, "tcp");
let hostname = &*args.hostname;
let port = args.port;
let cert_file = &*args.cert_file;
let key_file = &*args.key_file;
{
let permissions = state.borrow_mut::<NP>();
permissions.check_net(&(hostname, Some(port)))?;
permissions.check_read(Path::new(cert_file))?;
permissions.check_read(Path::new(key_file))?;
}
let mut tls_config = ServerConfig::new(NoClientAuth::new());
if let Some(alpn_protocols) = args.alpn_protocols {
super::check_unstable(state, "Deno.listenTls#alpn_protocols");
tls_config.alpn_protocols =
alpn_protocols.into_iter().map(|s| s.into_bytes()).collect();
}
tls_config
.set_single_cert(
load_certs_from_file(cert_file)?,
load_private_keys_from_file(key_file)?.remove(0),
)
.expect("invalid key or certificate");
let bind_addr = resolve_addr_sync(hostname, port)?
.next()
.ok_or_else(|| generic_error("No resolved address found"))?;
let std_listener = std::net::TcpListener::bind(bind_addr)?;
std_listener.set_nonblocking(true)?;
let tcp_listener = TcpListener::from_std(std_listener)?;
let local_addr = tcp_listener.local_addr()?;
let tls_listener_resource = TlsListenerResource {
tcp_listener: AsyncRefCell::new(tcp_listener),
tls_config: Arc::new(tls_config),
cancel_handle: Default::default(),
};
let rid = state.resource_table.add(tls_listener_resource);
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: None,
})
}
async fn op_accept_tls(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_: (),
) -> Result<OpConn, AnyError> {
let resource = state
.borrow()
.resource_table
.get::<TlsListenerResource>(rid)
.map_err(|_| bad_resource("Listener has been closed"))?;
let cancel_handle = RcRef::map(&resource, |r| &r.cancel_handle);
let tcp_listener = RcRef::map(&resource, |r| &r.tcp_listener)
.try_borrow_mut()
.ok_or_else(|| custom_error("Busy", "Another accept task is ongoing"))?;
let (tcp_stream, remote_addr) =
match tcp_listener.accept().try_or_cancel(&cancel_handle).await {
Ok(tuple) => tuple,
Err(err) if err.kind() == ErrorKind::Interrupted => {
// FIXME(bartlomieju): compatibility with current JS implementation.
return Err(bad_resource("Listener has been closed"));
}
Err(err) => return Err(err.into()),
};
let local_addr = tcp_stream.local_addr()?;
let tls_stream = TlsStream::new_server_side(tcp_stream, &resource.tls_config);
let rid = {
let mut state_ = state.borrow_mut();
state_
.resource_table
.add(TlsStreamResource::new(tls_stream.into_split()))
};
Ok(OpConn {
rid,
local_addr: Some(OpAddr::Tcp(IpAddr {
hostname: local_addr.ip().to_string(),
port: local_addr.port(),
})),
remote_addr: Some(OpAddr::Tcp(IpAddr {
hostname: remote_addr.ip().to_string(),
port: remote_addr.port(),
})),
})
}
async fn op_tls_handshake(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
_: (),
) -> Result<(), AnyError> {
let resource = state
.borrow()
.resource_table
.get::<TlsStreamResource>(rid)?;
resource.handshake().await
}