From b1579460ced238820c9d3e3d8343d17a8c4b4e25 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Fri, 26 Oct 2018 12:14:06 -0400 Subject: [PATCH] Support streaming response bodies from fetch() Also Buffer.readFrom in fetch() to buffer response. --- js/deno.ts | 2 +- js/dom_types.ts | 6 +- js/fetch.ts | 177 ++++++++++++++++++++++++++++++----------------- js/io.ts | 2 +- src/msg.fbs | 2 +- src/ops.rs | 15 ++-- src/resources.rs | 15 ++++ 7 files changed, 141 insertions(+), 78 deletions(-) diff --git a/js/deno.ts b/js/deno.ts index bced9075ec..afcde70339 100644 --- a/js/deno.ts +++ b/js/deno.ts @@ -13,7 +13,7 @@ export { Writer, Closer, Seeker, - ReaderCloser, + ReadCloser, WriteCloser, ReadSeeker, WriteSeeker, diff --git a/js/dom_types.ts b/js/dom_types.ts index 19a3d5fe2a..22f704ee4d 100644 --- a/js/dom_types.ts +++ b/js/dom_types.ts @@ -225,7 +225,7 @@ interface AbortSignal extends EventTarget { ): void; } -interface ReadableStream { +export interface ReadableStream { readonly locked: boolean; cancel(): Promise; getReader(): ReadableStreamReader; @@ -235,7 +235,7 @@ interface EventListenerObject { handleEvent(evt: Event): void; } -interface ReadableStreamReader { +export interface ReadableStreamReader { cancel(): Promise; // tslint:disable-next-line:no-any read(): Promise; @@ -270,7 +270,7 @@ export interface Blob { slice(start?: number, end?: number, contentType?: string): Blob; } -interface Body { +export interface Body { /** A simple getter used to expose a `ReadableStream` of the body contents. */ readonly body: ReadableStream | null; /** Stores a `Boolean` that declares whether the body has been used in a diff --git a/js/fetch.ts b/js/fetch.ts index 16172b776f..bd50cdfdca 100644 --- a/js/fetch.ts +++ b/js/fetch.ts @@ -1,12 +1,5 @@ // Copyright 2018 the Deno authors. All rights reserved. MIT license. -import { - assert, - log, - createResolvable, - Resolvable, - typedArrayToArrayBuffer, - notImplemented -} from "./util"; +import { assert, log, createResolvable, notImplemented } from "./util"; import * as flatbuffers from "./flatbuffers"; import { sendAsync } from "./dispatch"; import * as msg from "gen/msg_generated"; @@ -14,52 +7,65 @@ import * as domTypes from "./dom_types"; import { TextDecoder } from "./text_encoding"; import { DenoBlob } from "./blob"; import { Headers } from "./headers"; +import * as io from "./io"; +import { read, close } from "./files"; +import { Buffer } from "./buffer"; -class FetchResponse implements domTypes.Response { - readonly url: string = ""; - body: null; - bodyUsed = false; // TODO - statusText = "FIXME"; // TODO - readonly type = "basic"; // TODO - redirected = false; // TODO - headers: domTypes.Headers; - readonly trailer: Promise; - //private bodyChunks: Uint8Array[] = []; - private first = true; - private bodyData: ArrayBuffer; - private bodyWaiter: Resolvable; +class Body implements domTypes.Body, domTypes.ReadableStream, io.ReadCloser { + bodyUsed = false; + private _bodyPromise: null | Promise = null; + private _data: ArrayBuffer | null = null; + readonly locked: boolean = false; // TODO + readonly body: null | Body = this; - constructor( - readonly status: number, - readonly body_: ArrayBuffer, - headersList: Array<[string, string]> - ) { - this.bodyWaiter = createResolvable(); - this.trailer = createResolvable(); - this.headers = new Headers(headersList); - this.bodyData = body_; - setTimeout(() => { - this.bodyWaiter.resolve(body_); - }, 0); + constructor(private rid: number, readonly contentType: string) {} + + private async _bodyBuffer(): Promise { + assert(this._bodyPromise == null); + const buf = new Buffer(); + try { + const nread = await buf.readFrom(this); + const ui8 = buf.bytes(); + assert(ui8.byteLength === nread); + this._data = ui8.buffer.slice( + ui8.byteOffset, + ui8.byteOffset + nread + ) as ArrayBuffer; + assert(this._data.byteLength === nread); + } finally { + this.close(); + } + + return this._data; } - arrayBuffer(): Promise { - return this.bodyWaiter; + async arrayBuffer(): Promise { + // If we've already bufferred the response, just return it. + if (this._data != null) { + return this._data; + } + + // If there is no _bodyPromise yet, start it. + if (this._bodyPromise == null) { + this._bodyPromise = this._bodyBuffer(); + } + + return this._bodyPromise; } async blob(): Promise { const arrayBuffer = await this.arrayBuffer(); return new DenoBlob([arrayBuffer], { - type: this.headers.get("content-type") || "" + type: this.contentType }); } async formData(): Promise { - notImplemented(); - return {} as domTypes.FormData; + return notImplemented(); } - async json(): Promise { + // tslint:disable-next-line:no-any + async json(): Promise { const text = await this.text(); return JSON.parse(text); } @@ -70,6 +76,71 @@ class FetchResponse implements domTypes.Response { return decoder.decode(ab); } + read(p: Uint8Array): Promise { + return read(this.rid, p); + } + + close(): void { + close(this.rid); + } + + async cancel(): Promise { + return notImplemented(); + } + + getReader(): domTypes.ReadableStreamReader { + return notImplemented(); + } +} + +class Response implements domTypes.Response { + readonly url: string = ""; + statusText = "FIXME"; // TODO + readonly type = "basic"; // TODO + redirected = false; // TODO + headers: domTypes.Headers; + readonly trailer: Promise; + bodyUsed = false; + readonly body: Body; + + constructor( + readonly status: number, + headersList: Array<[string, string]>, + rid: number, + body_: null | Body = null + ) { + this.trailer = createResolvable(); + this.headers = new Headers(headersList); + const contentType = this.headers.get("content-type") || ""; + + if (body_ == null) { + this.body = new Body(rid, contentType); + } else { + this.body = body_; + } + } + + async arrayBuffer(): Promise { + return this.body.arrayBuffer(); + } + + async blob(): Promise { + return this.body.blob(); + } + + async formData(): Promise { + return this.body.formData(); + } + + // tslint:disable-next-line:no-any + async json(): Promise { + return this.body.json(); + } + + async text(): Promise { + return this.body.text(); + } + get ok(): boolean { return 200 <= this.status && this.status < 300; } @@ -87,25 +158,7 @@ class FetchResponse implements domTypes.Response { headersList.push(header); } - return new FetchResponse(this.status, this.bodyData.slice(0), headersList); - } - - onHeader?: (res: FetchResponse) => void; - onError?: (error: Error) => void; - - onMsg(base: msg.Base) { - /* - const error = base.error(); - if (error != null) { - assert(this.onError != null); - this.onError!(new Error(error)); - return; - } - */ - - if (this.first) { - this.first = false; - } + return new Response(this.status, headersList, -1, this.body); } } @@ -113,7 +166,7 @@ class FetchResponse implements domTypes.Response { export async function fetch( input?: domTypes.Request | string, init?: domTypes.RequestInit -): Promise { +): Promise { const url = input as string; log("dispatch FETCH_REQ", url); @@ -134,9 +187,7 @@ export async function fetch( assert(resBase.inner(inner) != null); const status = inner.status(); - const bodyArray = inner.bodyArray(); - assert(bodyArray != null); - const body = typedArrayToArrayBuffer(bodyArray!); + const bodyRid = inner.bodyRid(); const headersList: Array<[string, string]> = []; const len = inner.headerKeyLength(); @@ -146,6 +197,6 @@ export async function fetch( headersList.push([key, value]); } - const response = new FetchResponse(status, body, headersList); + const response = new Response(status, headersList, bodyRid); return response; } diff --git a/js/io.ts b/js/io.ts index 10908b7244..735be19a1a 100644 --- a/js/io.ts +++ b/js/io.ts @@ -77,7 +77,7 @@ export interface Seeker { } // https://golang.org/pkg/io/#ReadCloser -export interface ReaderCloser extends Reader, Closer {} +export interface ReadCloser extends Reader, Closer {} // https://golang.org/pkg/io/#WriteCloser export interface WriteCloser extends Writer, Closer {} diff --git a/src/msg.fbs b/src/msg.fbs index d5ab926dd7..7fdc739465 100644 --- a/src/msg.fbs +++ b/src/msg.fbs @@ -206,7 +206,7 @@ table FetchRes { status: int; header_key: [string]; header_value: [string]; - body: [ubyte]; + body_rid: uint32; } table MakeTempDir { diff --git a/src/ops.rs b/src/ops.rs index b07c613858..ef03630f5e 100644 --- a/src/ops.rs +++ b/src/ops.rs @@ -18,7 +18,7 @@ use futures; use futures::future::poll_fn; use futures::Poll; use hyper; -use hyper::rt::{Future, Stream}; +use hyper::rt::Future; use remove_dir_all::remove_dir_all; use repl; use resources::table_entries; @@ -417,20 +417,17 @@ fn op_fetch( (keys, values) }; - // TODO Handle streaming body. - res - .into_body() - .concat2() - .map(move |body| (status, body, headers)) + let body = res.into_body(); + let body_resource = resources::add_hyper_body(body); + Ok((status, headers, body_resource)) }); let future = future.map_err(|err| -> DenoError { err.into() }).and_then( - move |(status, body, headers)| { + move |(status, headers, body_resource)| { debug!("fetch body "); let builder = &mut FlatBufferBuilder::new(); // Send the first message without a body. This is just to indicate // what status code. - let body_off = builder.create_vector(body.as_ref()); let header_keys: Vec<&str> = headers.0.iter().map(|s| &**s).collect(); let header_keys_off = builder.create_vector_of_strings(header_keys.as_slice()); @@ -443,7 +440,7 @@ fn op_fetch( &msg::FetchResArgs { id, status, - body: Some(body_off), + body_rid: body_resource.rid, header_key: Some(header_keys_off), header_value: Some(header_values_off), }, diff --git a/src/resources.rs b/src/resources.rs index 08de098897..90b7ce772b 100644 --- a/src/resources.rs +++ b/src/resources.rs @@ -13,6 +13,7 @@ use eager_unix as eager; use errors::bad_resource; use errors::DenoError; use errors::DenoResult; +use http_body::HttpBody; use repl::Repl; use tokio_util; use tokio_write; @@ -20,6 +21,7 @@ use tokio_write; use futures; use futures::future::{Either, FutureResult}; use futures::Poll; +use hyper; use std; use std::collections::HashMap; use std::io::{Error, Read, Write}; @@ -59,6 +61,7 @@ enum Repr { FsFile(tokio::fs::File), TcpListener(tokio::net::TcpListener), TcpStream(tokio::net::TcpStream), + HttpBody(HttpBody), Repl(Repl), } @@ -89,6 +92,7 @@ fn inspect_repr(repr: &Repr) -> String { Repr::FsFile(_) => "fsFile", Repr::TcpListener(_) => "tcpListener", Repr::TcpStream(_) => "tcpStream", + Repr::HttpBody(_) => "httpBody", Repr::Repl(_) => "repl", }; @@ -155,6 +159,7 @@ impl AsyncRead for Resource { Repr::FsFile(ref mut f) => f.poll_read(buf), Repr::Stdin(ref mut f) => f.poll_read(buf), Repr::TcpStream(ref mut f) => f.poll_read(buf), + Repr::HttpBody(ref mut f) => f.poll_read(buf), _ => panic!("Cannot read"), }, } @@ -222,6 +227,15 @@ pub fn add_tcp_stream(stream: tokio::net::TcpStream) -> Resource { Resource { rid } } +pub fn add_hyper_body(body: hyper::Body) -> Resource { + let rid = new_rid(); + let mut tg = RESOURCE_TABLE.lock().unwrap(); + let body = HttpBody::from(body); + let r = tg.insert(rid, Repr::HttpBody(body)); + assert!(r.is_none()); + Resource { rid } +} + pub fn add_repl(repl: Repl) -> Resource { let rid = new_rid(); let mut tg = RESOURCE_TABLE.lock().unwrap(); @@ -243,6 +257,7 @@ pub fn readline(rid: ResourceId, prompt: &str) -> DenoResult { } pub fn lookup(rid: ResourceId) -> Option { + debug!("resource lookup {}", rid); let table = RESOURCE_TABLE.lock().unwrap(); table.get(&rid).map(|_| Resource { rid }) }