mirror of
https://github.com/denoland/deno.git
synced 2024-11-24 15:19:26 -05:00
fix(ext/node): close upgraded socket when the underlying http connection is closed (#25387)
This change fixes the handling of upgraded socket from `node:http` module. In `op_node_http_fetch_response_upgrade`, we create DuplexStream paired with `hyper::upgrade::Upgraded`. When the connection is closed from the server, the read result from `Upgraded` becomes 0. However because we don't close the paired DuplexStream at that point, the Socket object in JS side keeps alive even after the server closed. That caused the issue #20179 This change fixes it by closing the paired DuplexStream when the `Upgraded` stream returns 0 read result. closes #20179
This commit is contained in:
parent
dd208a6df0
commit
186f7484da
2 changed files with 69 additions and 0 deletions
|
@ -272,6 +272,7 @@ pub async fn op_node_http_fetch_response_upgrade(
|
|||
loop {
|
||||
let read = upgraded_rx.read(&mut buf).await?;
|
||||
if read == 0 {
|
||||
read_tx.shutdown().await?;
|
||||
break;
|
||||
}
|
||||
read_tx.write_all(&buf[..read]).await?;
|
||||
|
|
|
@ -13,6 +13,7 @@ import { text } from "node:stream/consumers";
|
|||
import { assert, assertEquals, fail } from "@std/assert";
|
||||
import { assertSpyCalls, spy } from "@std/testing/mock";
|
||||
import { fromFileUrl, relative } from "@std/path";
|
||||
import { retry } from "@std/async/retry";
|
||||
|
||||
import { gzip } from "node:zlib";
|
||||
import { Buffer } from "node:buffer";
|
||||
|
@ -1604,3 +1605,70 @@ Deno.test("[node/http] In ClientRequest, option.hostname has precedence over opt
|
|||
|
||||
await responseReceived.promise;
|
||||
});
|
||||
|
||||
Deno.test("[node/http] upgraded socket closes when the server closed without closing handshake", async () => {
|
||||
const clientSocketClosed = Promise.withResolvers<void>();
|
||||
const serverProcessClosed = Promise.withResolvers<void>();
|
||||
|
||||
// Uses the server in different process to shutdown it without closing handshake
|
||||
const server = `
|
||||
Deno.serve({ port: 1337 }, (req) => {
|
||||
if (req.headers.get("upgrade") != "websocket") {
|
||||
return new Response("ok");
|
||||
}
|
||||
console.log("upgrade on server");
|
||||
const { socket, response } = Deno.upgradeWebSocket(req);
|
||||
socket.addEventListener("message", (event) => {
|
||||
console.log("server received", event.data);
|
||||
socket.send("pong");
|
||||
});
|
||||
return response;
|
||||
});
|
||||
`;
|
||||
|
||||
const p = new Deno.Command("deno", { args: ["eval", server] }).spawn();
|
||||
|
||||
// Wait for the server to respond
|
||||
await retry(async () => {
|
||||
const resp = await fetch("http://localhost:1337");
|
||||
const _text = await resp.text();
|
||||
});
|
||||
|
||||
const options = {
|
||||
port: 1337,
|
||||
host: "127.0.0.1",
|
||||
headers: {
|
||||
"Connection": "Upgrade",
|
||||
"Upgrade": "websocket",
|
||||
"Sec-WebSocket-Key": "dGhlIHNhbXBsZSBub25jZQ==",
|
||||
},
|
||||
};
|
||||
|
||||
http.request(options).on("upgrade", (_res, socket) => {
|
||||
socket.on("close", () => {
|
||||
console.log("client socket closed");
|
||||
clientSocketClosed.resolve();
|
||||
});
|
||||
socket.on("data", async (data) => {
|
||||
// receives pong message
|
||||
assertEquals(data, Buffer.from("8104706f6e67", "hex"));
|
||||
|
||||
p.kill();
|
||||
await p.status;
|
||||
|
||||
console.log("process closed");
|
||||
serverProcessClosed.resolve();
|
||||
|
||||
// sending some additional message
|
||||
socket.write(Buffer.from("81847de88e01", "hex"));
|
||||
socket.write(Buffer.from("0d81e066", "hex"));
|
||||
});
|
||||
|
||||
// sending ping message
|
||||
socket.write(Buffer.from("81847de88e01", "hex"));
|
||||
socket.write(Buffer.from("0d81e066", "hex"));
|
||||
}).end();
|
||||
|
||||
await clientSocketClosed.promise;
|
||||
await serverProcessClosed.promise;
|
||||
});
|
||||
|
|
Loading…
Reference in a new issue