mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
fix(ext/node): don't wait for end() call to send http client request (#24390)
Closes https://github.com/denoland/deno/issues/24232 Closes https://github.com/denoland/deno/issues/24215
This commit is contained in:
parent
dadc606419
commit
496ea5903b
3 changed files with 131 additions and 49 deletions
|
@ -623,50 +623,6 @@ class ClientRequest extends OutgoingMessage {
|
|||
client[internalRidSymbol],
|
||||
this._bodyWriteRid,
|
||||
);
|
||||
}
|
||||
|
||||
_implicitHeader() {
|
||||
if (this._header) {
|
||||
throw new ERR_HTTP_HEADERS_SENT("render");
|
||||
}
|
||||
this._storeHeader(
|
||||
this.method + " " + this.path + " HTTP/1.1\r\n",
|
||||
this[kOutHeaders],
|
||||
);
|
||||
}
|
||||
|
||||
_getClient(): Deno.HttpClient | undefined {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): handle error
|
||||
onSocket(socket, _err) {
|
||||
nextTick(() => {
|
||||
this.socket = socket;
|
||||
this.emit("socket", socket);
|
||||
});
|
||||
}
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
end(chunk?: any, encoding?: any, cb?: any): this {
|
||||
if (typeof chunk === "function") {
|
||||
cb = chunk;
|
||||
chunk = null;
|
||||
encoding = null;
|
||||
} else if (typeof encoding === "function") {
|
||||
cb = encoding;
|
||||
encoding = null;
|
||||
}
|
||||
|
||||
this.finished = true;
|
||||
if (chunk) {
|
||||
this.write_(chunk, encoding, null, true);
|
||||
} else if (!this._headerSent) {
|
||||
this._contentLength = 0;
|
||||
this._implicitHeader();
|
||||
this._send("", "latin1");
|
||||
}
|
||||
this._bodyWriter?.close();
|
||||
|
||||
(async () => {
|
||||
try {
|
||||
|
@ -674,11 +630,6 @@ class ClientRequest extends OutgoingMessage {
|
|||
if (this._req.cancelHandleRid !== null) {
|
||||
core.tryClose(this._req.cancelHandleRid);
|
||||
}
|
||||
try {
|
||||
cb?.();
|
||||
} catch (_) {
|
||||
//
|
||||
}
|
||||
if (this._timeout) {
|
||||
this._timeout.removeEventListener("abort", this._timeoutCb);
|
||||
webClearTimeout(this._timeout[timerId]);
|
||||
|
@ -788,6 +739,64 @@ class ClientRequest extends OutgoingMessage {
|
|||
})();
|
||||
}
|
||||
|
||||
_implicitHeader() {
|
||||
if (this._header) {
|
||||
throw new ERR_HTTP_HEADERS_SENT("render");
|
||||
}
|
||||
this._storeHeader(
|
||||
this.method + " " + this.path + " HTTP/1.1\r\n",
|
||||
this[kOutHeaders],
|
||||
);
|
||||
}
|
||||
|
||||
_getClient(): Deno.HttpClient | undefined {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
// TODO(bartlomieju): handle error
|
||||
onSocket(socket, _err) {
|
||||
nextTick(() => {
|
||||
this.socket = socket;
|
||||
this.emit("socket", socket);
|
||||
});
|
||||
}
|
||||
|
||||
// deno-lint-ignore no-explicit-any
|
||||
end(chunk?: any, encoding?: any, cb?: any): this {
|
||||
if (typeof chunk === "function") {
|
||||
cb = chunk;
|
||||
chunk = null;
|
||||
encoding = null;
|
||||
} else if (typeof encoding === "function") {
|
||||
cb = encoding;
|
||||
encoding = null;
|
||||
}
|
||||
|
||||
this.finished = true;
|
||||
if (chunk) {
|
||||
this.write_(chunk, encoding, null, true);
|
||||
} else if (!this._headerSent) {
|
||||
this._contentLength = 0;
|
||||
this._implicitHeader();
|
||||
this._send("", "latin1");
|
||||
}
|
||||
(async () => {
|
||||
try {
|
||||
await this._bodyWriter?.close();
|
||||
} catch (_) {
|
||||
// The readable stream resource is dropped right after
|
||||
// read is complete closing the writable stream resource.
|
||||
// If we try to close the writer again, it will result in an
|
||||
// error which we can safely ignore.
|
||||
}
|
||||
try {
|
||||
cb?.();
|
||||
} catch (_) {
|
||||
//
|
||||
}
|
||||
})();
|
||||
}
|
||||
|
||||
abort() {
|
||||
if (this.aborted) {
|
||||
return;
|
||||
|
|
|
@ -5,8 +5,11 @@ import http, { type RequestOptions } from "node:http";
|
|||
import url from "node:url";
|
||||
import https from "node:https";
|
||||
import net from "node:net";
|
||||
import fs from "node:fs";
|
||||
|
||||
import { assert, assertEquals, fail } from "@std/assert/mod.ts";
|
||||
import { assertSpyCalls, spy } from "@std/testing/mock.ts";
|
||||
import { fromFileUrl, relative } from "@std/path/mod.ts";
|
||||
|
||||
import { gzip } from "node:zlib";
|
||||
import { Buffer } from "node:buffer";
|
||||
|
@ -1179,3 +1182,72 @@ Deno.test("[node/http] client closing a streaming response doesn't terminate ser
|
|||
assertEquals(server.listening, false);
|
||||
clearInterval(interval!);
|
||||
});
|
||||
|
||||
Deno.test("[node/http] http.request() post streaming body works", async () => {
|
||||
const server = http.createServer((req, res) => {
|
||||
if (req.method === "POST") {
|
||||
let receivedBytes = 0;
|
||||
req.on("data", (chunk) => {
|
||||
receivedBytes += chunk.length;
|
||||
});
|
||||
req.on("end", () => {
|
||||
res.writeHead(200, { "Content-Type": "application/json" });
|
||||
res.end(JSON.stringify({ bytes: receivedBytes }));
|
||||
});
|
||||
} else {
|
||||
res.writeHead(405, { "Content-Type": "text/plain" });
|
||||
res.end("Method Not Allowed");
|
||||
}
|
||||
});
|
||||
|
||||
const deferred = Promise.withResolvers<void>();
|
||||
const timeout = setTimeout(() => {
|
||||
deferred.reject(new Error("timeout"));
|
||||
}, 5000);
|
||||
server.listen(0, () => {
|
||||
// deno-lint-ignore no-explicit-any
|
||||
const port = (server.address() as any).port;
|
||||
const filePath = relative(
|
||||
Deno.cwd(),
|
||||
fromFileUrl(new URL("./testdata/lorem_ipsum_512kb.txt", import.meta.url)),
|
||||
);
|
||||
const contentLength = 524289;
|
||||
|
||||
const options = {
|
||||
hostname: "localhost",
|
||||
port: port,
|
||||
path: "/",
|
||||
method: "POST",
|
||||
headers: {
|
||||
"Content-Type": "application/octet-stream",
|
||||
"Content-Length": contentLength,
|
||||
},
|
||||
};
|
||||
|
||||
const req = http.request(options, (res) => {
|
||||
let responseBody = "";
|
||||
res.on("data", (chunk) => {
|
||||
responseBody += chunk;
|
||||
});
|
||||
|
||||
res.on("end", () => {
|
||||
const response = JSON.parse(responseBody);
|
||||
assertEquals(res.statusCode, 200);
|
||||
assertEquals(response.bytes, contentLength);
|
||||
deferred.resolve();
|
||||
});
|
||||
});
|
||||
|
||||
req.on("error", (e) => {
|
||||
console.error(`Problem with request: ${e.message}`);
|
||||
});
|
||||
|
||||
const readStream = fs.createReadStream(filePath);
|
||||
readStream.pipe(req);
|
||||
});
|
||||
await deferred.promise;
|
||||
assertEquals(server.listening, true);
|
||||
server.close();
|
||||
clearTimeout(timeout);
|
||||
assertEquals(server.listening, false);
|
||||
});
|
||||
|
|
1
tests/unit_node/testdata/lorem_ipsum_512kb.txt
vendored
Normal file
1
tests/unit_node/testdata/lorem_ipsum_512kb.txt
vendored
Normal file
File diff suppressed because one or more lines are too long
Loading…
Reference in a new issue