mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
fix: dd-trace http message compat (#25021)
make this http incoming constructor match with node, and also handle arbitrary duplex inputs
This commit is contained in:
parent
8537c4537b
commit
fd1f8234f4
1 changed files with 33 additions and 21 deletions
|
@ -30,6 +30,7 @@ import {
|
|||
} from "ext:deno_node/internal/validators.mjs";
|
||||
import {
|
||||
addAbortSignal,
|
||||
Duplex as NodeDuplex,
|
||||
finished,
|
||||
Readable as NodeReadable,
|
||||
Writable as NodeWritable,
|
||||
|
@ -290,12 +291,14 @@ class FakeSocket extends EventEmitter {
|
|||
encrypted?: boolean | undefined;
|
||||
remotePort?: number | undefined;
|
||||
remoteAddress?: string | undefined;
|
||||
reader?: ReadableStreamDefaultReader | undefined;
|
||||
} = {},
|
||||
) {
|
||||
super();
|
||||
this.remoteAddress = opts.remoteAddress;
|
||||
this.remotePort = opts.remotePort;
|
||||
this.encrypted = opts.encrypted;
|
||||
this.reader = opts.reader;
|
||||
this.writable = true;
|
||||
this.readable = true;
|
||||
}
|
||||
|
@ -1613,17 +1616,17 @@ export class ServerResponse extends NodeWritable {
|
|||
|
||||
// TODO(@AaronO): optimize
|
||||
export class IncomingMessageForServer extends NodeReadable {
|
||||
#req: Request;
|
||||
#headers: Record<string, string>;
|
||||
url: string;
|
||||
method: string;
|
||||
// Polyfills part of net.Socket object.
|
||||
// These properties are used by `npm:forwarded` for example.
|
||||
socket: { remoteAddress: string; remotePort: number };
|
||||
socket: Socket | FakeSocket;
|
||||
|
||||
constructor(req: Request, socket: FakeSocket) {
|
||||
// Check if no body (GET/HEAD/OPTIONS/...)
|
||||
const reader = req.body?.getReader();
|
||||
constructor(socket: FakeSocket | Socket) {
|
||||
const reader = socket instanceof FakeSocket
|
||||
? socket.reader
|
||||
: socket instanceof Socket
|
||||
? NodeDuplex.toWeb(socket).readable.getReader()
|
||||
: null;
|
||||
super({
|
||||
autoDestroy: true,
|
||||
emitClose: true,
|
||||
|
@ -1646,12 +1649,11 @@ export class IncomingMessageForServer extends NodeReadable {
|
|||
}).finally(nextTick(onError, this, err, cb));
|
||||
},
|
||||
});
|
||||
// TODO(@bartlomieju): consider more robust path extraction, e.g:
|
||||
// url: (new URL(request.url).pathname),
|
||||
this.url = req.url?.slice(req.url.indexOf("/", 8));
|
||||
this.method = req.method;
|
||||
this.url = "";
|
||||
this.method = "";
|
||||
this.socket = socket;
|
||||
this.#req = req;
|
||||
this.upgrade = null;
|
||||
this.rawHeaders = [];
|
||||
}
|
||||
|
||||
get aborted() {
|
||||
|
@ -1669,7 +1671,7 @@ export class IncomingMessageForServer extends NodeReadable {
|
|||
get headers() {
|
||||
if (!this.#headers) {
|
||||
this.#headers = {};
|
||||
const entries = headersEntries(this.#req.headers);
|
||||
const entries = headersEntries(this.rawHeaders);
|
||||
for (let i = 0; i < entries.length; i++) {
|
||||
const entry = entries[i];
|
||||
this.#headers[entry[0]] = entry[1];
|
||||
|
@ -1682,17 +1684,18 @@ export class IncomingMessageForServer extends NodeReadable {
|
|||
this.#headers = val;
|
||||
}
|
||||
|
||||
get upgrade(): boolean {
|
||||
return Boolean(
|
||||
this.#req.headers.get("connection")?.toLowerCase().includes("upgrade") &&
|
||||
this.#req.headers.get("upgrade"),
|
||||
);
|
||||
}
|
||||
|
||||
// connection is deprecated, but still tested in unit test.
|
||||
get connection() {
|
||||
return this.socket;
|
||||
}
|
||||
|
||||
setTimeout(msecs, callback) {
|
||||
if (callback) {
|
||||
this.on("timeout", callback);
|
||||
}
|
||||
this.socket.setTimeout(msecs);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
export type ServerHandler = (
|
||||
|
@ -1774,8 +1777,17 @@ export class ServerImpl extends EventEmitter {
|
|||
remoteAddress: info.remoteAddr.hostname,
|
||||
remotePort: info.remoteAddr.port,
|
||||
encrypted: this._encrypted,
|
||||
reader: request.body?.getReader(),
|
||||
});
|
||||
const req = new IncomingMessageForServer(request, socket);
|
||||
|
||||
const req = new IncomingMessageForServer(socket);
|
||||
req.url = request.url?.slice(req.url.indexOf("/", 8));
|
||||
req.method = request.method;
|
||||
req.upgrade =
|
||||
request.headers.get("connection")?.toLowerCase().includes("upgrade") &&
|
||||
request.headers.get("upgrade");
|
||||
req.rawHeaders = request.headers;
|
||||
|
||||
if (req.upgrade && this.listenerCount("upgrade") > 0) {
|
||||
const { conn, response } = upgradeHttpRaw(request);
|
||||
const socket = new Socket({
|
||||
|
|
Loading…
Reference in a new issue