diff --git a/cli/ops/io.rs b/cli/ops/io.rs index 1d832a70e4..8f69fd4440 100644 --- a/cli/ops/io.rs +++ b/cli/ops/io.rs @@ -7,14 +7,12 @@ use crate::state::ThreadSafeState; use deno::ErrBox; use deno::Resource; use deno::*; -use futures; use futures::future::FutureExt; -use std; +use futures::ready; use std::future::Future; use std::pin::Pin; use std::task::Context; use std::task::Poll; -use tokio; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio_rustls::client::TlsStream as ClientTlsStream; @@ -94,7 +92,7 @@ impl Resource for StreamResource {} /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncRead { fn poll_read( - self: Pin<&mut Self>, + &mut self, cx: &mut Context, buf: &mut [u8], ) -> Poll>; @@ -102,32 +100,25 @@ pub trait DenoAsyncRead { impl DenoAsyncRead for StreamResource { fn poll_read( - self: Pin<&mut Self>, + &mut self, cx: &mut Context, buf: &mut [u8], ) -> Poll> { - let inner = self.get_mut(); - let mut f: Box = match inner { - StreamResource::FsFile(f) => Box::new(f), - StreamResource::Stdin(f) => Box::new(f), - StreamResource::TcpStream(f) => Box::new(f), - StreamResource::ClientTlsStream(f) => Box::new(f), - StreamResource::ServerTlsStream(f) => Box::new(f), - StreamResource::ChildStdout(f) => Box::new(f), - StreamResource::ChildStderr(f) => Box::new(f), - StreamResource::HttpBody(f) => Box::new(f), - _ => { - return Poll::Ready(Err(bad_resource())); - } + use StreamResource::*; + let mut f: Pin> = match self { + FsFile(f) => Box::pin(f), + Stdin(f) => Box::pin(f), + TcpStream(f) => Box::pin(f), + ClientTlsStream(f) => Box::pin(f), + ServerTlsStream(f) => Box::pin(f), + ChildStdout(f) => Box::pin(f), + ChildStderr(f) => Box::pin(f), + HttpBody(f) => Box::pin(f), + _ => return Err(bad_resource()).into(), }; - let r = AsyncRead::poll_read(Pin::new(&mut f), cx, buf); - - match r { - Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), - Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), - Poll::Pending => Poll::Pending, - } + let v = ready!(f.as_mut().poll_read(cx, buf))?; + Ok(v).into() } } @@ -182,15 +173,7 @@ where let resource = table .get_mut::(inner.rid) .ok_or_else(bad_resource)?; - let nread = match DenoAsyncRead::poll_read( - Pin::new(resource), - cx, - &mut inner.buf.as_mut()[..], - ) { - Poll::Ready(Ok(v)) => v, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; + let nread = ready!(resource.poll_read(cx, &mut inner.buf.as_mut()[..]))?; inner.io_state = IoState::Done; Poll::Ready(Ok(nread as i32)) } @@ -218,82 +201,56 @@ pub fn op_read( /// but uses an `ErrBox` error instead of `std::io:Error` pub trait DenoAsyncWrite { fn poll_write( - self: Pin<&mut Self>, + &mut self, cx: &mut Context, buf: &[u8], ) -> Poll>; - fn poll_close( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll>; + fn poll_close(&mut self, cx: &mut Context) -> Poll>; - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll>; + fn poll_flush(&mut self, cx: &mut Context) -> Poll>; } impl DenoAsyncWrite for StreamResource { fn poll_write( - self: Pin<&mut Self>, + &mut self, cx: &mut Context, buf: &[u8], ) -> Poll> { - let inner = self.get_mut(); - let mut f: Box = match inner { - StreamResource::FsFile(f) => Box::new(f), - StreamResource::Stdout(f) => Box::new(f), - StreamResource::Stderr(f) => Box::new(f), - StreamResource::TcpStream(f) => Box::new(f), - StreamResource::ClientTlsStream(f) => Box::new(f), - StreamResource::ServerTlsStream(f) => Box::new(f), - StreamResource::ChildStdin(f) => Box::new(f), - _ => { - return Poll::Ready(Err(bad_resource())); - } + use StreamResource::*; + let mut f: Pin> = match self { + FsFile(f) => Box::pin(f), + Stdout(f) => Box::pin(f), + Stderr(f) => Box::pin(f), + TcpStream(f) => Box::pin(f), + ClientTlsStream(f) => Box::pin(f), + ServerTlsStream(f) => Box::pin(f), + ChildStdin(f) => Box::pin(f), + _ => return Err(bad_resource()).into(), }; - let r = AsyncWrite::poll_write(Pin::new(&mut f), cx, buf); - - match r { - Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), - Poll::Ready(Ok(v)) => Poll::Ready(Ok(v)), - Poll::Pending => Poll::Pending, - } + let v = ready!(f.as_mut().poll_write(cx, buf))?; + Ok(v).into() } - fn poll_flush( - self: Pin<&mut Self>, - cx: &mut Context, - ) -> Poll> { - let inner = self.get_mut(); - let mut f: Box = match inner { - StreamResource::FsFile(f) => Box::new(f), - StreamResource::Stdout(f) => Box::new(f), - StreamResource::Stderr(f) => Box::new(f), - StreamResource::TcpStream(f) => Box::new(f), - StreamResource::ClientTlsStream(f) => Box::new(f), - StreamResource::ServerTlsStream(f) => Box::new(f), - StreamResource::ChildStdin(f) => Box::new(f), - _ => { - return Poll::Ready(Err(bad_resource())); - } + fn poll_flush(&mut self, cx: &mut Context) -> Poll> { + use StreamResource::*; + let mut f: Pin> = match self { + FsFile(f) => Box::pin(f), + Stdout(f) => Box::pin(f), + Stderr(f) => Box::pin(f), + TcpStream(f) => Box::pin(f), + ClientTlsStream(f) => Box::pin(f), + ServerTlsStream(f) => Box::pin(f), + ChildStdin(f) => Box::pin(f), + _ => return Err(bad_resource()).into(), }; - let r = AsyncWrite::poll_flush(Pin::new(&mut f), cx); - - match r { - Poll::Ready(Err(e)) => Poll::Ready(Err(ErrBox::from(e))), - Poll::Ready(Ok(_)) => Poll::Ready(Ok(())), - Poll::Pending => Poll::Pending, - } + ready!(f.as_mut().poll_flush(cx))?; + Ok(()).into() } - fn poll_close( - self: Pin<&mut Self>, - _cx: &mut Context, - ) -> Poll> { + fn poll_close(&mut self, _cx: &mut Context) -> Poll> { unimplemented!() } } @@ -345,15 +302,7 @@ where .get_mut::(inner.rid) .ok_or_else(bad_resource)?; - let nwritten = match DenoAsyncWrite::poll_write( - Pin::new(resource), - cx, - inner.buf.as_ref(), - ) { - Poll::Ready(Ok(v)) => v, - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; + let nwritten = ready!(resource.poll_write(cx, inner.buf.as_ref()))?; inner.io_state = IoState::Flush; inner.nwritten = nwritten as i32; } @@ -367,13 +316,8 @@ where let resource = table .get_mut::(inner.rid) .ok_or_else(bad_resource)?; - match DenoAsyncWrite::poll_flush(Pin::new(resource), cx) { - Poll::Ready(Ok(_)) => { - inner.io_state = IoState::Done; - } - Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), - Poll::Pending => return Poll::Pending, - }; + ready!(resource.poll_flush(cx))?; + inner.io_state = IoState::Done; } Poll::Ready(Ok(inner.nwritten))