diff --git a/cli/js/body.ts b/cli/js/body.ts index e00cd30b94..a67655ba8b 100644 --- a/cli/js/body.ts +++ b/cli/js/body.ts @@ -83,11 +83,14 @@ function bufferFromStream(stream: ReadableStreamReader): Promise { .then( ({ done, value }): void => { if (done) { + stream.releaseLock(); return resolve(concatenate(...parts)); } if (typeof value === "string") { parts.push(encoder.encode(value)); + } else if (value instanceof Uint8Array) { + parts.push(value); } else if (value instanceof ArrayBuffer) { parts.push(new Uint8Array(value)); } else if (!value) { @@ -131,6 +134,7 @@ export const BodyUsedError = export class Body implements domTypes.Body { protected _stream: domTypes.ReadableStream | null; + protected _bodyUsed = false; constructor(protected _bodySource: BodySource, readonly contentType: string) { validateBodyType(this, _bodySource); @@ -160,14 +164,14 @@ export class Body implements domTypes.Body { } get bodyUsed(): boolean { - if (this.body && this.body.locked) { + if (this.body && this._bodyUsed) { return true; } return false; } public async blob(): Promise { - return new Blob([await this.arrayBuffer()]); + return new Blob([await this.arrayBuffer()], { type: this.contentType }); } // ref: https://fetch.spec.whatwg.org/#body-mixin @@ -314,6 +318,7 @@ export class Body implements domTypes.Body { } public async arrayBuffer(): Promise { + this._bodyUsed = true; if ( this._bodySource instanceof Int8Array || this._bodySource instanceof Int16Array || @@ -332,7 +337,6 @@ export class Body implements domTypes.Body { const enc = new TextEncoder(); return enc.encode(this._bodySource).buffer as ArrayBuffer; } else if (this._bodySource instanceof ReadableStream) { - // @ts-ignore return bufferFromStream(this._bodySource.getReader()); } else if (this._bodySource instanceof FormData) { const enc = new TextEncoder(); diff --git a/cli/js/fetch.ts b/cli/js/fetch.ts index 0a5f793a88..75ebed4d14 100644 --- a/cli/js/fetch.ts +++ b/cli/js/fetch.ts @@ -1,254 +1,95 @@ // Copyright 2018-2019 the Deno authors. All rights reserved. MIT license. -import { - assert, - createResolvable, - notImplemented, - isTypedArray -} from "./util.ts"; +import { createResolvable, notImplemented, isTypedArray } from "./util.ts"; +import * as body from "./body.ts"; import * as domTypes from "./dom_types.ts"; -import { TextDecoder, TextEncoder } from "./text_encoding.ts"; +import { TextEncoder } from "./text_encoding.ts"; import { DenoBlob, bytesSymbol as blobBytesSymbol } from "./blob.ts"; import { Headers } from "./headers.ts"; -import * as io from "./io.ts"; +import { EOF } from "./io.ts"; import { read, close } from "./files.ts"; -import { Buffer } from "./buffer.ts"; -import { FormData } from "./form_data.ts"; import { URLSearchParams } from "./url_search_params.ts"; import * as dispatch from "./dispatch.ts"; import { sendAsync } from "./dispatch_json.ts"; +import { ReadableStream } from "./streams/mod.ts"; -function getHeaderValueParams(value: string): Map { - const params = new Map(); - // Forced to do so for some Map constructor param mismatch - value - .split(";") - .slice(1) - .map((s): string[] => s.trim().split("=")) - .filter((arr): boolean => arr.length > 1) - .map(([k, v]): [string, string] => [k, v.replace(/^"([^"]*)"$/, "$1")]) - .forEach(([k, v]): Map => params.set(k, v)); - return params; +interface ReadableStreamController { + enqueue(chunk: string | ArrayBuffer): void; + close(): void; } -function hasHeaderValueOf(s: string, value: string): boolean { - return new RegExp(`^${value}[\t\s]*;?`).test(s); -} - -class Body implements domTypes.Body, domTypes.ReadableStream, io.ReadCloser { - private _bodyUsed = false; - private _bodyPromise: null | Promise = null; - private _data: ArrayBuffer | null = null; - readonly locked: boolean = false; // TODO - readonly body: null | Body = this; - - constructor(private rid: number, readonly contentType: string) {} - - private async _bodyBuffer(): Promise { - assert(this._bodyPromise == null); - const buf = new Buffer(); - try { - const nread = await buf.readFrom(this); - const ui8 = buf.bytes(); - assert(ui8.byteLength === nread); - this._data = ui8.buffer.slice( - ui8.byteOffset, - ui8.byteOffset + nread - ) as ArrayBuffer; - assert(this._data.byteLength === nread); - } finally { - this.close(); - } - - return this._data; +class UnderlyingRIDSource implements domTypes.UnderlyingSource { + constructor(private rid: number) { + this.rid = rid; } - async arrayBuffer(): Promise { - // If we've already bufferred the response, just return it. - if (this._data != null) { - return this._data; - } - - // If there is no _bodyPromise yet, start it. - if (this._bodyPromise == null) { - this._bodyPromise = this._bodyBuffer(); - } - - return this._bodyPromise; + start(controller: ReadableStreamController): Promise { + const buff: Uint8Array = new Uint8Array(32 * 1024); + const pump = (): Promise => { + return read(this.rid, buff).then(value => { + if (value == EOF) { + close(this.rid); + return controller.close(); + } + controller.enqueue(buff.slice(0, value)); + return pump(); + }); + }; + return pump(); } - async blob(): Promise { - const arrayBuffer = await this.arrayBuffer(); - return new DenoBlob([arrayBuffer], { - type: this.contentType - }); - } - - // ref: https://fetch.spec.whatwg.org/#body-mixin - async formData(): Promise { - const formData = new FormData(); - const enc = new TextEncoder(); - if (hasHeaderValueOf(this.contentType, "multipart/form-data")) { - const params = getHeaderValueParams(this.contentType); - if (!params.has("boundary")) { - // TypeError is required by spec - throw new TypeError("multipart/form-data must provide a boundary"); - } - // ref: https://tools.ietf.org/html/rfc2046#section-5.1 - const boundary = params.get("boundary")!; - const dashBoundary = `--${boundary}`; - const delimiter = `\r\n${dashBoundary}`; - const closeDelimiter = `${delimiter}--`; - - const body = await this.text(); - let bodyParts: string[]; - const bodyEpilogueSplit = body.split(closeDelimiter); - if (bodyEpilogueSplit.length < 2) { - bodyParts = []; - } else { - // discard epilogue - const bodyEpilogueTrimmed = bodyEpilogueSplit[0]; - // first boundary treated special due to optional prefixed \r\n - const firstBoundaryIndex = bodyEpilogueTrimmed.indexOf(dashBoundary); - if (firstBoundaryIndex < 0) { - throw new TypeError("Invalid boundary"); - } - const bodyPreambleTrimmed = bodyEpilogueTrimmed - .slice(firstBoundaryIndex + dashBoundary.length) - .replace(/^[\s\r\n\t]+/, ""); // remove transport-padding CRLF - // trimStart might not be available - // Be careful! body-part allows trailing \r\n! - // (as long as it is not part of `delimiter`) - bodyParts = bodyPreambleTrimmed - .split(delimiter) - .map((s): string => s.replace(/^[\s\r\n\t]+/, "")); - // TODO: LWSP definition is actually trickier, - // but should be fine in our case since without headers - // we should just discard the part - } - for (const bodyPart of bodyParts) { - const headers = new Headers(); - const headerOctetSeperatorIndex = bodyPart.indexOf("\r\n\r\n"); - if (headerOctetSeperatorIndex < 0) { - continue; // Skip unknown part - } - const headerText = bodyPart.slice(0, headerOctetSeperatorIndex); - const octets = bodyPart.slice(headerOctetSeperatorIndex + 4); - - // TODO: use textproto.readMIMEHeader from deno_std - const rawHeaders = headerText.split("\r\n"); - for (const rawHeader of rawHeaders) { - const sepIndex = rawHeader.indexOf(":"); - if (sepIndex < 0) { - continue; // Skip this header - } - const key = rawHeader.slice(0, sepIndex); - const value = rawHeader.slice(sepIndex + 1); - headers.set(key, value); - } - if (!headers.has("content-disposition")) { - continue; // Skip unknown part - } - // Content-Transfer-Encoding Deprecated - const contentDisposition = headers.get("content-disposition")!; - const partContentType = headers.get("content-type") || "text/plain"; - // TODO: custom charset encoding (needs TextEncoder support) - // const contentTypeCharset = - // getHeaderValueParams(partContentType).get("charset") || ""; - if (!hasHeaderValueOf(contentDisposition, "form-data")) { - continue; // Skip, might not be form-data - } - const dispositionParams = getHeaderValueParams(contentDisposition); - if (!dispositionParams.has("name")) { - continue; // Skip, unknown name - } - const dispositionName = dispositionParams.get("name")!; - if (dispositionParams.has("filename")) { - const filename = dispositionParams.get("filename")!; - const blob = new DenoBlob([enc.encode(octets)], { - type: partContentType - }); - // TODO: based on spec - // https://xhr.spec.whatwg.org/#dom-formdata-append - // https://xhr.spec.whatwg.org/#create-an-entry - // Currently it does not mention how I could pass content-type - // to the internally created file object... - formData.append(dispositionName, blob, filename); - } else { - formData.append(dispositionName, octets); - } - } - return formData; - } else if ( - hasHeaderValueOf(this.contentType, "application/x-www-form-urlencoded") - ) { - // From https://github.com/github/fetch/blob/master/fetch.js - // Copyright (c) 2014-2016 GitHub, Inc. MIT License - const body = await this.text(); - try { - body - .trim() - .split("&") - .forEach( - (bytes): void => { - if (bytes) { - const split = bytes.split("="); - const name = split.shift()!.replace(/\+/g, " "); - const value = split.join("=").replace(/\+/g, " "); - formData.append( - decodeURIComponent(name), - decodeURIComponent(value) - ); - } - } - ); - } catch (e) { - throw new TypeError("Invalid form urlencoded format"); - } - return formData; - } else { - throw new TypeError("Invalid form data"); - } - } - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - async json(): Promise { - const text = await this.text(); - return JSON.parse(text); - } - - async text(): Promise { - const ab = await this.arrayBuffer(); - const decoder = new TextDecoder("utf-8"); - return decoder.decode(ab); - } - - read(p: Uint8Array): Promise { - this._bodyUsed = true; - return read(this.rid, p); - } - - close(): void { + cancel(controller: ReadableStreamController): void { close(this.rid); + return controller.close(); } +} +class Body extends body.Body implements domTypes.ReadableStream { async cancel(): Promise { - return notImplemented(); + if (this._stream) { + return this._stream.cancel(); + } + throw new Error("no stream present"); } getReader(): domTypes.ReadableStreamReader { - return notImplemented(); + if (this._stream) { + return this._stream.getReader(); + } + throw new Error("no stream present"); + } + + get locked(): boolean { + if (this._stream) { + return this._stream.locked; + } + throw new Error("no stream present"); } tee(): [domTypes.ReadableStream, domTypes.ReadableStream] { - return notImplemented(); + if (this._stream) { + const streams = this._stream.tee(); + return [streams[0], streams[1]]; + } + throw new Error("no stream present"); } [Symbol.asyncIterator](): AsyncIterableIterator { - return io.toAsyncIterator(this); - } + //@ts-ignore + const reader = this.body.getReader(); - get bodyUsed(): boolean { - return this._bodyUsed; + return { + [Symbol.asyncIterator](): AsyncIterableIterator { + return this; + }, + + async next() { + return reader.read(); + }, + + return() { + return reader.releaseLock(); + } + } as AsyncIterableIterator; } } @@ -257,7 +98,7 @@ export class Response implements domTypes.Response { readonly redirected: boolean; headers: domTypes.Headers; readonly trailer: Promise; - readonly body: Body; + protected _body: Body; constructor( readonly url: string, @@ -266,40 +107,48 @@ export class Response implements domTypes.Response { headersList: Array<[string, string]>, rid: number, redirected_: boolean, - body_: null | Body = null + readableStream_: domTypes.ReadableStream | null = null ) { this.trailer = createResolvable(); this.headers = new Headers(headersList); const contentType = this.headers.get("content-type") || ""; - if (body_ == null) { - this.body = new Body(rid, contentType); + if (readableStream_ == null) { + const underlyingSource = new UnderlyingRIDSource(rid); + const rs = new ReadableStream(underlyingSource); + this._body = new Body(rs, contentType); } else { - this.body = body_; + this._body = new Body(readableStream_, contentType); } this.redirected = redirected_; } + get body(): domTypes.ReadableStream | null { + return this._body; + } + async arrayBuffer(): Promise { - return this.body.arrayBuffer(); + return this._body.arrayBuffer(); } async blob(): Promise { - return this.body.blob(); + return this._body.blob().then(blob => { + return blob; + }); } async formData(): Promise { - return this.body.formData(); + return this._body.formData(); } // eslint-disable-next-line @typescript-eslint/no-explicit-any async json(): Promise { - return this.body.json(); + return this._body.json(); } async text(): Promise { - return this.body.text(); + return this._body.text(); } get ok(): boolean { @@ -307,7 +156,7 @@ export class Response implements domTypes.Response { } get bodyUsed(): boolean { - return this.body.bodyUsed; + return this._body.bodyUsed; } clone(): domTypes.Response { @@ -323,6 +172,13 @@ export class Response implements domTypes.Response { headersList.push(header); } + let clonedStream: domTypes.ReadableStream | null = null; + if (this._body.body) { + const streams = this._body.body.tee(); + clonedStream = streams[1]; + this._body = new Body(streams[0], this._body.contentType); + } + return new Response( this.url, this.status, @@ -330,7 +186,7 @@ export class Response implements domTypes.Response { headersList, -1, this.redirected, - this.body + clonedStream ); } }