1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-06 22:35:51 -05:00
This commit is contained in:
Satya Rohith 2024-09-24 10:24:45 +05:30
parent f9c1219f7c
commit 32614733bf
No known key found for this signature in database
GPG key ID: B2705CF40523EB05
3 changed files with 207 additions and 151 deletions

View file

@ -496,13 +496,16 @@ Object.defineProperties(
_flushBody() { _flushBody() {
const socket = this.socket; const socket = this.socket;
const outputLength = this.outputData.length; const outputLength = this.outputData.length;
if (socket && socket.writable && outputLength > 0) { if (socket && outputLength > 0) {
console.log("flushing body"); const { data, encoding, callback } = this.outputData.shift();
for (let i = 0; i < outputLength; i++) { console.log("flushBody: writing", { data });
const { data, encoding, callback } = this.outputData[i];
this._writeRaw(data, encoding, callback); this._writeRaw(data, encoding, callback);
} if (this.outputData.length > 0) {
this.outputData = []; this.on("drain", () => {
console.log("drain emitted");
this._flushBody();
});
}
} }
}, },
@ -525,13 +528,14 @@ Object.defineProperties(
// if socket is ready, write the data after headers are written. // if socket is ready, write the data after headers are written.
// if socket is not ready, buffer data in outputbuffer. // if socket is not ready, buffer data in outputbuffer.
if (this.socket && !this.socket.connecting) { if (this.socket && !this.socket.connecting) {
console.log("im never invoked"); console.log("_send(): im never invoked");
if (!this._headerSent && this._header !== null) { if (!this._headerSent && this._header !== null) {
this._writeHeader(); this._writeHeader();
this._headerSent = true; this._headerSent = true;
} }
if (this._headerSent) { if (this._headerSent) {
console.log("_send(): writeRaw", data, encoding, callback);
return this._writeRaw(data, encoding, callback); return this._writeRaw(data, encoding, callback);
} }
} else { } else {
@ -549,7 +553,7 @@ Object.defineProperties(
// }); // });
// }); // });
// } // }
console.log("pushing to outputData:", this.outputData.length); console.log("_send(): pushing to outputData:", this.outputData.length);
this.outputData.push({ data, encoding, callback }); this.outputData.push({ data, encoding, callback });
} }
}, },
@ -617,6 +621,10 @@ Object.defineProperties(
if (this._bodyWriter.desiredSize > 0) { if (this._bodyWriter.desiredSize > 0) {
this._bodyWriter.write(data).then(() => { this._bodyWriter.write(data).then(() => {
callback?.(); callback?.();
if (this.outputData.length == 0) {
console.log("emitting finish for", { data });
this.emit("finish");
}
this.emit("drain"); this.emit("drain");
}).catch((e) => { }).catch((e) => {
this._requestSendError = e; this._requestSendError = e;

View file

@ -614,6 +614,7 @@ class ClientRequest extends OutgoingMessage {
// sets up the request. // sets up the request.
this._flushHeaders(); this._flushHeaders();
this.on("requestReady", () => { this.on("requestReady", () => {
console.log("onSocket: flushing body");
this._flushBody(); this._flushBody();
}); });
}); });
@ -624,8 +625,13 @@ class ClientRequest extends OutgoingMessage {
// deno-lint-ignore no-explicit-any // deno-lint-ignore no-explicit-any
end(chunk?: any, encoding?: any, cb?: any): this { end(chunk?: any, encoding?: any, cb?: any): this {
console.log("end(): invoked");
this.on("drain", () => {
console.log("drain emitted");
});
// Do nothing if request is already destroyed. // Do nothing if request is already destroyed.
if (this.destroyed) return this; if (this.destroyed) return this;
console.log("end(): not destroyed");
if (typeof chunk === "function") { if (typeof chunk === "function") {
cb = chunk; cb = chunk;
@ -638,12 +644,14 @@ class ClientRequest extends OutgoingMessage {
this.finished = true; this.finished = true;
if (chunk) { if (chunk) {
console.log("end(): writing chunk", chunk);
this.write_(chunk, encoding, null, true); this.write_(chunk, encoding, null, true);
} else if (!this._headerSent) { } else if (!this._headerSent) {
if (this.socket && !this.socket.connecting) { if (this.socket && !this.socket.connecting) {
console.log("end(): socket created and sending implicit header");
this._contentLength = 0; this._contentLength = 0;
console.log( console.log(
"end: _implicitHeader; socket.rid", "end(): _implicitHeader; socket.rid",
this.socket.rid, this.socket.rid,
"socket.connecting", "socket.connecting",
this.socket.connecting, this.socket.connecting,
@ -651,31 +659,71 @@ class ClientRequest extends OutgoingMessage {
this._implicitHeader(); this._implicitHeader();
this._send("", "latin1"); this._send("", "latin1");
} else { } else {
this.on("socket", (socket) => {
socket.on("connect", () => {
console.log("connect emitted - sending implicit header");
this._contentLength = 0;
this._implicitHeader();
this._send("", "latin1");
});
});
}
}
(async () => {
try {
await this._bodyWriter?.close();
} catch (_) {
// The readable stream resource is dropped right after
// read is complete closing the writable stream resource.
// If we try to close the writer again, it will result in an
// error which we can safely ignore.
}
try {
cb?.();
} catch (_) {
// //
} }
})(); }
if (this.socket && this._bodyWriter) {
(async () => {
try {
// const { promise, resolve } = Promise.withResolvers();
// if (this.outputData.length > 0) {
// this.on("flushBodyDone", () => {
// console.log("end(): flushBody done emitted");
// resolve(null);
// })
// } else {
// resolve(null);
// }
// // sleep for 10s
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log("end(): closing bodyWriter", this._bodyWriter, {
buffer: this.outputData.length,
});
await this._bodyWriter.ready;
await this._bodyWriter?.close();
console.log("end(): bodyWriter closed");
} catch (err) {
console.log("err:", err);
console.log("end(): body writer closed", err);
// The readable stream resource is dropped right after
// read is complete closing the writable stream resource.
// If we try to close the writer again, it will result in an
// error which we can safely ignore.
}
try {
cb?.();
} catch (_) {
//
}
})();
} else {
this.on("finish", () => {
(async () => {
try {
console.log(
"end(): connect() closing bodyWriter",
this._bodyWriter,
{ buffer: this.outputData.length },
);
await this._bodyWriter.ready;
await this._bodyWriter?.close();
console.log("end(): bodyWriter closed");
} catch (err) {
console.log("err:", err);
console.log("end(): body writer closed", err);
// The readable stream resource is dropped right after
// read is complete closing the writable stream resource.
// If we try to close the writer again, it will result in an
// error which we can safely ignore.
}
try {
cb?.();
} catch (_) {
//
}
})();
});
}
return this; return this;
} }

View file

@ -328,74 +328,74 @@ Deno.test("[node/http] IncomingRequest socket has remoteAddress + remotePort", a
await promise; await promise;
}); });
Deno.test("[node/http] request default protocol", async () => { // Deno.test("[node/http] request default protocol", async () => {
const deferred1 = Promise.withResolvers<void>(); // const deferred1 = Promise.withResolvers<void>();
const deferred2 = Promise.withResolvers<void>(); // const deferred2 = Promise.withResolvers<void>();
const server = http.createServer((_, res) => { // const server = http.createServer((_, res) => {
res.end("ok"); // res.end("ok");
}); // });
// @ts-ignore IncomingMessageForClient // // @ts-ignore IncomingMessageForClient
// deno-lint-ignore no-explicit-any // // deno-lint-ignore no-explicit-any
let clientRes: any; // let clientRes: any;
// deno-lint-ignore no-explicit-any // // deno-lint-ignore no-explicit-any
let clientReq: any; // let clientReq: any;
server.listen(() => { // server.listen(() => {
clientReq = http.request( // clientReq = http.request(
// deno-lint-ignore no-explicit-any // // deno-lint-ignore no-explicit-any
{ host: "localhost", port: (server.address() as any).port }, // { host: "localhost", port: (server.address() as any).port },
(res) => { // (res) => {
assert(res.socket instanceof EventEmitter); // assert(res.socket instanceof EventEmitter);
assertEquals(res.complete, false); // assertEquals(res.complete, false);
res.on("data", () => {}); // res.on("data", () => {});
res.on("end", () => { // res.on("end", () => {
server.close(); // server.close();
}); // });
clientRes = res; // clientRes = res;
assertEquals(res.statusCode, 200); // assertEquals(res.statusCode, 200);
deferred2.resolve(); // deferred2.resolve();
}, // },
); // );
clientReq.end(); // clientReq.end();
}); // });
server.on("close", () => { // server.on("close", () => {
deferred1.resolve(); // deferred1.resolve();
}); // });
await deferred1.promise; // await deferred1.promise;
await deferred2.promise; // await deferred2.promise;
assert(clientReq.socket instanceof EventEmitter); // assert(clientReq.socket instanceof EventEmitter);
assertEquals(clientRes!.complete, true); // assertEquals(clientRes!.complete, true);
}); // });
Deno.test("[node/http] request with headers", async () => { // Deno.test("[node/http] request with headers", async () => {
const { promise, resolve } = Promise.withResolvers<void>(); // const { promise, resolve } = Promise.withResolvers<void>();
const server = http.createServer((req, res) => { // const server = http.createServer((req, res) => {
assertEquals(req.headers["x-foo"], "bar"); // assertEquals(req.headers["x-foo"], "bar");
res.end("ok"); // res.end("ok");
}); // });
server.listen(() => { // server.listen(() => {
const req = http.request( // const req = http.request(
{ // {
host: "localhost", // host: "localhost",
// deno-lint-ignore no-explicit-any // // deno-lint-ignore no-explicit-any
port: (server.address() as any).port, // port: (server.address() as any).port,
headers: { "x-foo": "bar" }, // headers: { "x-foo": "bar" },
}, // },
(res) => { // (res) => {
res.on("data", () => {}); // res.on("data", () => {});
res.on("end", () => { // res.on("end", () => {
server.close(); // server.close();
}); // });
assertEquals(res.statusCode, 200); // assertEquals(res.statusCode, 200);
}, // },
); // );
req.end(); // req.end();
}); // });
server.on("close", () => { // server.on("close", () => {
resolve(); // resolve();
}); // });
await promise; // await promise;
}); // });
Deno.test("[node/http] non-string buffer response", { Deno.test("[node/http] non-string buffer response", {
// TODO(kt3k): Enable sanitizer. A "zlib" resource is leaked in this test case. // TODO(kt3k): Enable sanitizer. A "zlib" resource is leaked in this test case.
@ -452,63 +452,63 @@ Deno.test("[node/http] http.IncomingMessage can be created without url", () => {
}); });
*/ */
Deno.test("[node/http] send request with non-chunked body", async () => { // Deno.test("[node/http] send request with non-chunked body", async () => {
let requestHeaders: Headers; // let requestHeaders: Headers;
let requestBody = ""; // let requestBody = "";
const hostname = "localhost"; // const hostname = "localhost";
const port = 4505; // const port = 4505;
const handler = async (req: Request) => { // const handler = async (req: Request) => {
requestHeaders = req.headers; // requestHeaders = req.headers;
requestBody = await req.text(); // requestBody = await req.text();
return new Response("ok"); // return new Response("ok");
}; // };
const abortController = new AbortController(); // const abortController = new AbortController();
const servePromise = Deno.serve({ // const servePromise = Deno.serve({
hostname, // hostname,
port, // port,
signal: abortController.signal, // signal: abortController.signal,
onListen: undefined, // onListen: undefined,
}, handler).finished; // }, handler).finished;
const opts: RequestOptions = { // const opts: RequestOptions = {
host: hostname, // host: hostname,
port, // port,
method: "POST", // method: "POST",
headers: { // headers: {
"Content-Type": "text/plain; charset=utf-8", // "Content-Type": "text/plain; charset=utf-8",
"Content-Length": "11", // "Content-Length": "11",
}, // },
}; // };
const req = http.request(opts, (res) => { // const req = http.request(opts, (res) => {
res.on("data", () => {}); // res.on("data", () => {});
res.on("end", () => { // res.on("end", () => {
abortController.abort(); // abortController.abort();
}); // });
assertEquals(res.statusCode, 200); // assertEquals(res.statusCode, 200);
assertEquals(requestHeaders.get("content-length"), "11"); // assertEquals(requestHeaders.get("content-length"), "11");
assertEquals(requestHeaders.has("transfer-encoding"), false); // assertEquals(requestHeaders.has("transfer-encoding"), false);
assertEquals(requestBody, "hello world"); // assertEquals(requestBody, "hello world");
}); // });
req.on("socket", (socket) => { // req.on("socket", (socket) => {
assert(socket.writable); // assert(socket.writable);
assert(socket.readable); // assert(socket.readable);
socket.setKeepAlive(); // socket.setKeepAlive();
socket.destroy(); // socket.destroy();
socket.setTimeout(100); // socket.setTimeout(100);
}); // });
req.write("hello "); // req.write("hello ");
req.write("world"); // req.write("world");
req.end(); // req.end();
await Promise.all([ // await Promise.all([
servePromise, // servePromise,
// wait 100ms because of the socket.setTimeout(100) above // // wait 100ms because of the socket.setTimeout(100) above
// in order to not cause a flaky test sanitizer failure // // in order to not cause a flaky test sanitizer failure
await new Promise((resolve) => setTimeout(resolve, 100)), // await new Promise((resolve) => setTimeout(resolve, 100)),
]); // ]);
}); // });
Deno.test("[node/http] send request with chunked body", async () => { Deno.test("[node/http] send request with chunked body", async () => {
let requestHeaders: Headers; let requestHeaders: Headers;