mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
feat(ext/node): support http2session.socket (#24786)
Co-authored-by: Satya Rohith <me@satyarohith.com>
This commit is contained in:
parent
4eff1e8ec4
commit
875ee618d3
2 changed files with 130 additions and 19 deletions
|
@ -54,6 +54,7 @@ import {
|
||||||
ERR_HTTP2_INVALID_STREAM,
|
ERR_HTTP2_INVALID_STREAM,
|
||||||
ERR_HTTP2_NO_SOCKET_MANIPULATION,
|
ERR_HTTP2_NO_SOCKET_MANIPULATION,
|
||||||
ERR_HTTP2_SESSION_ERROR,
|
ERR_HTTP2_SESSION_ERROR,
|
||||||
|
ERR_HTTP2_SOCKET_UNBOUND,
|
||||||
ERR_HTTP2_STATUS_INVALID,
|
ERR_HTTP2_STATUS_INVALID,
|
||||||
ERR_HTTP2_STREAM_CANCEL,
|
ERR_HTTP2_STREAM_CANCEL,
|
||||||
ERR_HTTP2_STREAM_ERROR,
|
ERR_HTTP2_STREAM_ERROR,
|
||||||
|
@ -95,6 +96,8 @@ const kSentTrailers = Symbol("sent-trailers");
|
||||||
const kState = Symbol("state");
|
const kState = Symbol("state");
|
||||||
const kType = Symbol("type");
|
const kType = Symbol("type");
|
||||||
const kTimeout = Symbol("timeout");
|
const kTimeout = Symbol("timeout");
|
||||||
|
const kSocket = Symbol("socket");
|
||||||
|
const kProxySocket = Symbol("proxySocket");
|
||||||
|
|
||||||
const kDenoResponse = Symbol("kDenoResponse");
|
const kDenoResponse = Symbol("kDenoResponse");
|
||||||
const kDenoRid = Symbol("kDenoRid");
|
const kDenoRid = Symbol("kDenoRid");
|
||||||
|
@ -128,11 +131,76 @@ function debugHttp2(...args) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Http2Session extends EventEmitter {
|
const sessionProxySocketHandler = {
|
||||||
constructor(type, _options /* socket */) {
|
get(session, prop) {
|
||||||
super();
|
switch (prop) {
|
||||||
|
case "setTimeout":
|
||||||
|
case "ref":
|
||||||
|
case "unref":
|
||||||
|
return FunctionPrototypeBind(session[prop], session);
|
||||||
|
case "destroy":
|
||||||
|
case "emit":
|
||||||
|
case "end":
|
||||||
|
case "pause":
|
||||||
|
case "read":
|
||||||
|
case "resume":
|
||||||
|
case "write":
|
||||||
|
case "setEncoding":
|
||||||
|
case "setKeepAlive":
|
||||||
|
case "setNoDelay":
|
||||||
|
throw new ERR_HTTP2_NO_SOCKET_MANIPULATION();
|
||||||
|
default: {
|
||||||
|
const socket = session[kSocket];
|
||||||
|
if (socket === undefined) {
|
||||||
|
throw new ERR_HTTP2_SOCKET_UNBOUND();
|
||||||
|
}
|
||||||
|
const value = socket[prop];
|
||||||
|
return typeof value === "function"
|
||||||
|
? FunctionPrototypeBind(value, socket)
|
||||||
|
: value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
getPrototypeOf(session) {
|
||||||
|
const socket = session[kSocket];
|
||||||
|
if (socket === undefined) {
|
||||||
|
throw new ERR_HTTP2_SOCKET_UNBOUND();
|
||||||
|
}
|
||||||
|
return ReflectGetPrototypeOf(socket);
|
||||||
|
},
|
||||||
|
set(session, prop, value) {
|
||||||
|
switch (prop) {
|
||||||
|
case "setTimeout":
|
||||||
|
case "ref":
|
||||||
|
case "unref":
|
||||||
|
session[prop] = value;
|
||||||
|
return true;
|
||||||
|
case "destroy":
|
||||||
|
case "emit":
|
||||||
|
case "end":
|
||||||
|
case "pause":
|
||||||
|
case "read":
|
||||||
|
case "resume":
|
||||||
|
case "write":
|
||||||
|
case "setEncoding":
|
||||||
|
case "setKeepAlive":
|
||||||
|
case "setNoDelay":
|
||||||
|
throw new ERR_HTTP2_NO_SOCKET_MANIPULATION();
|
||||||
|
default: {
|
||||||
|
const socket = session[kSocket];
|
||||||
|
if (socket === undefined) {
|
||||||
|
throw new ERR_HTTP2_SOCKET_UNBOUND();
|
||||||
|
}
|
||||||
|
socket[prop] = value;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
// TODO(bartlomieju): Handle sockets here
|
export class Http2Session extends EventEmitter {
|
||||||
|
constructor(type, _options, socket) {
|
||||||
|
super();
|
||||||
|
|
||||||
this[kState] = {
|
this[kState] = {
|
||||||
destroyCode: constants.NGHTTP2_NO_ERROR,
|
destroyCode: constants.NGHTTP2_NO_ERROR,
|
||||||
|
@ -149,12 +217,11 @@ export class Http2Session extends EventEmitter {
|
||||||
this[kEncrypted] = undefined;
|
this[kEncrypted] = undefined;
|
||||||
this[kAlpnProtocol] = undefined;
|
this[kAlpnProtocol] = undefined;
|
||||||
this[kType] = type;
|
this[kType] = type;
|
||||||
|
this[kProxySocket] = null;
|
||||||
|
this[kSocket] = socket;
|
||||||
this[kTimeout] = null;
|
this[kTimeout] = null;
|
||||||
// this[kProxySocket] = null;
|
|
||||||
// this[kSocket] = socket;
|
|
||||||
// this[kHandle] = undefined;
|
|
||||||
|
|
||||||
// TODO(bartlomieju): connecting via socket
|
debugHttp2(type, "created");
|
||||||
}
|
}
|
||||||
|
|
||||||
get encrypted(): boolean {
|
get encrypted(): boolean {
|
||||||
|
@ -206,9 +273,12 @@ export class Http2Session extends EventEmitter {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
get socket(): Socket /*| TlsSocket*/ {
|
get socket(): Socket {
|
||||||
warnNotImplemented("Http2Session.socket");
|
const proxySocket = this[kProxySocket];
|
||||||
return {};
|
if (proxySocket === null) {
|
||||||
|
return this[kProxySocket] = new Proxy(this, sessionProxySocketHandler);
|
||||||
|
}
|
||||||
|
return proxySocket;
|
||||||
}
|
}
|
||||||
|
|
||||||
get type(): number {
|
get type(): number {
|
||||||
|
@ -274,7 +344,7 @@ export class Http2Session extends EventEmitter {
|
||||||
if (this.closed || this.destroyed) {
|
if (this.closed || this.destroyed) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
debugHttp2(this, "marking session closed");
|
||||||
this[kState].flags |= SESSION_FLAGS_CLOSED;
|
this[kState].flags |= SESSION_FLAGS_CLOSED;
|
||||||
if (typeof callback === "function") {
|
if (typeof callback === "function") {
|
||||||
this.once("close", callback);
|
this.once("close", callback);
|
||||||
|
@ -306,6 +376,10 @@ export class Http2Session extends EventEmitter {
|
||||||
warnNotImplemented("Http2Session.unref");
|
warnNotImplemented("Http2Session.unref");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_onTimeout() {
|
||||||
|
callTimeout(this, this);
|
||||||
|
}
|
||||||
|
|
||||||
setTimeout(msecs: number, callback?: () => void) {
|
setTimeout(msecs: number, callback?: () => void) {
|
||||||
setStreamTimeout.call(this, msecs, callback);
|
setStreamTimeout.call(this, msecs, callback);
|
||||||
}
|
}
|
||||||
|
@ -357,7 +431,8 @@ function closeSession(session: Http2Session, code?: number, error?: Error) {
|
||||||
|
|
||||||
export class ServerHttp2Session extends Http2Session {
|
export class ServerHttp2Session extends Http2Session {
|
||||||
constructor() {
|
constructor() {
|
||||||
super(constants.NGHTTP2_SESSION_SERVER, {});
|
// TODO(satyarohith): pass socket instead of undefined
|
||||||
|
super(constants.NGHTTP2_SESSION_SERVER, {}, undefined);
|
||||||
}
|
}
|
||||||
|
|
||||||
altsvc(
|
altsvc(
|
||||||
|
@ -395,7 +470,7 @@ export class ClientHttp2Session extends Http2Session {
|
||||||
url: string,
|
url: string,
|
||||||
options: Record<string, unknown>,
|
options: Record<string, unknown>,
|
||||||
) {
|
) {
|
||||||
super(constants.NGHTTP2_SESSION_CLIENT, options);
|
super(constants.NGHTTP2_SESSION_CLIENT, options, socket);
|
||||||
this[kPendingRequestCalls] = null;
|
this[kPendingRequestCalls] = null;
|
||||||
this[kDenoClientRid] = undefined;
|
this[kDenoClientRid] = undefined;
|
||||||
this[kDenoConnRid] = undefined;
|
this[kDenoConnRid] = undefined;
|
||||||
|
@ -2072,16 +2147,14 @@ const kStream = Symbol("stream");
|
||||||
const kResponse = Symbol("response");
|
const kResponse = Symbol("response");
|
||||||
const kHeaders = Symbol("headers");
|
const kHeaders = Symbol("headers");
|
||||||
const kRawHeaders = Symbol("rawHeaders");
|
const kRawHeaders = Symbol("rawHeaders");
|
||||||
const kSocket = Symbol("socket");
|
|
||||||
const kTrailers = Symbol("trailers");
|
const kTrailers = Symbol("trailers");
|
||||||
const kRawTrailers = Symbol("rawTrailers");
|
const kRawTrailers = Symbol("rawTrailers");
|
||||||
const kSetHeader = Symbol("setHeader");
|
const kSetHeader = Symbol("setHeader");
|
||||||
const kAppendHeader = Symbol("appendHeader");
|
const kAppendHeader = Symbol("appendHeader");
|
||||||
const kAborted = Symbol("aborted");
|
const kAborted = Symbol("aborted");
|
||||||
const kProxySocket = Symbol("proxySocket");
|
|
||||||
const kRequest = Symbol("request");
|
const kRequest = Symbol("request");
|
||||||
|
|
||||||
const proxySocketHandler = {
|
const streamProxySocketHandler = {
|
||||||
has(stream, prop) {
|
has(stream, prop) {
|
||||||
const ref = stream.session !== undefined ? stream.session[kSocket] : stream;
|
const ref = stream.session !== undefined ? stream.session[kSocket] : stream;
|
||||||
return (prop in stream) || (prop in ref);
|
return (prop in stream) || (prop in ref);
|
||||||
|
@ -2280,7 +2353,7 @@ class Http2ServerRequest extends Readable {
|
||||||
const stream = this[kStream];
|
const stream = this[kStream];
|
||||||
const proxySocket = stream[kProxySocket];
|
const proxySocket = stream[kProxySocket];
|
||||||
if (proxySocket === null) {
|
if (proxySocket === null) {
|
||||||
return stream[kProxySocket] = new Proxy(stream, proxySocketHandler);
|
return stream[kProxySocket] = new Proxy(stream, streamProxySocketHandler);
|
||||||
}
|
}
|
||||||
return proxySocket;
|
return proxySocket;
|
||||||
}
|
}
|
||||||
|
@ -2484,7 +2557,7 @@ class Http2ServerResponse extends Stream {
|
||||||
const stream = this[kStream];
|
const stream = this[kStream];
|
||||||
const proxySocket = stream[kProxySocket];
|
const proxySocket = stream[kProxySocket];
|
||||||
if (proxySocket === null) {
|
if (proxySocket === null) {
|
||||||
return stream[kProxySocket] = new Proxy(stream, proxySocketHandler);
|
return stream[kProxySocket] = new Proxy(stream, streamProxySocketHandler);
|
||||||
}
|
}
|
||||||
return proxySocket;
|
return proxySocket;
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,3 +275,41 @@ Deno.test("[node/http2 client] deno doesn't panic on uppercase headers", async (
|
||||||
await endPromise.promise;
|
await endPromise.promise;
|
||||||
assertEquals(receivedData, "hello world\n");
|
assertEquals(receivedData, "hello world\n");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
Deno.test("[node/http2 ClientHttp2Session.socket]", async () => {
|
||||||
|
const url = "http://127.0.0.1:4246";
|
||||||
|
const client = http2.connect(url);
|
||||||
|
client.on("error", (err) => console.error(err));
|
||||||
|
|
||||||
|
const req = client.request({ ":method": "POST", ":path": "/" });
|
||||||
|
const endPromise = Promise.withResolvers<void>();
|
||||||
|
|
||||||
|
// test that we can access session.socket
|
||||||
|
client.socket.setTimeout(10000);
|
||||||
|
// nodejs allows setting arbitrary properties
|
||||||
|
// deno-lint-ignore no-explicit-any
|
||||||
|
(client.socket as any).nonExistant = 9001;
|
||||||
|
// deno-lint-ignore no-explicit-any
|
||||||
|
assertEquals((client.socket as any).nonExistant, 9001);
|
||||||
|
|
||||||
|
// regular request dance to make sure it keeps working
|
||||||
|
let receivedData = "";
|
||||||
|
req.write("hello");
|
||||||
|
req.setEncoding("utf8");
|
||||||
|
|
||||||
|
req.on("data", (chunk) => {
|
||||||
|
receivedData += chunk;
|
||||||
|
});
|
||||||
|
req.on("end", () => {
|
||||||
|
req.close();
|
||||||
|
client.close();
|
||||||
|
endPromise.resolve();
|
||||||
|
});
|
||||||
|
req.end();
|
||||||
|
await endPromise.promise;
|
||||||
|
assertEquals(client.socket.remoteAddress, "127.0.0.1");
|
||||||
|
assertEquals(client.socket.remotePort, 4246);
|
||||||
|
assertEquals(client.socket.remoteFamily, "IPv4");
|
||||||
|
client.socket.setTimeout(0);
|
||||||
|
assertEquals(receivedData, "hello world\n");
|
||||||
|
});
|
||||||
|
|
Loading…
Reference in a new issue