mirror of
https://github.com/denoland/deno.git
synced 2024-11-25 15:29:32 -05:00
feat: support individual async handler for each op (#3690)
This commit is contained in:
parent
d8ad81d3fb
commit
fe5662058e
9 changed files with 37 additions and 84 deletions
|
@ -85,57 +85,12 @@ export function setPluginAsyncHandler(
|
||||||
PLUGIN_ASYNC_HANDLER_MAP.set(opId, handler);
|
PLUGIN_ASYNC_HANDLER_MAP.set(opId, handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void {
|
export function getAsyncHandler(opName: string): (msg: Uint8Array) => void {
|
||||||
switch (opId) {
|
switch (opName) {
|
||||||
case OP_WRITE:
|
case "OP_WRITE":
|
||||||
case OP_READ:
|
case "OP_READ":
|
||||||
minimal.asyncMsgFromRust(opId, ui8);
|
return minimal.asyncMsgFromRust;
|
||||||
break;
|
|
||||||
case OP_GET_DIR:
|
|
||||||
case OP_EXIT:
|
|
||||||
case OP_IS_TTY:
|
|
||||||
case OP_ENV:
|
|
||||||
case OP_EXEC_PATH:
|
|
||||||
case OP_UTIME:
|
|
||||||
case OP_OPEN:
|
|
||||||
case OP_SEEK:
|
|
||||||
case OP_FETCH:
|
|
||||||
case OP_REPL_START:
|
|
||||||
case OP_REPL_READLINE:
|
|
||||||
case OP_ACCEPT:
|
|
||||||
case OP_ACCEPT_TLS:
|
|
||||||
case OP_DIAL:
|
|
||||||
case OP_GLOBAL_TIMER:
|
|
||||||
case OP_HOST_GET_WORKER_CLOSED:
|
|
||||||
case OP_HOST_GET_MESSAGE:
|
|
||||||
case OP_WORKER_GET_MESSAGE:
|
|
||||||
case OP_RUN_STATUS:
|
|
||||||
case OP_MKDIR:
|
|
||||||
case OP_CHMOD:
|
|
||||||
case OP_CHOWN:
|
|
||||||
case OP_REMOVE:
|
|
||||||
case OP_COPY_FILE:
|
|
||||||
case OP_STAT:
|
|
||||||
case OP_REALPATH:
|
|
||||||
case OP_READ_DIR:
|
|
||||||
case OP_RENAME:
|
|
||||||
case OP_LINK:
|
|
||||||
case OP_SYMLINK:
|
|
||||||
case OP_READ_LINK:
|
|
||||||
case OP_TRUNCATE:
|
|
||||||
case OP_MAKE_TEMP_DIR:
|
|
||||||
case OP_DIAL_TLS:
|
|
||||||
case OP_FETCH_SOURCE_FILES:
|
|
||||||
case OP_COMPILE:
|
|
||||||
case OP_TRANSPILE:
|
|
||||||
json.asyncMsgFromRust(opId, ui8);
|
|
||||||
break;
|
|
||||||
default:
|
default:
|
||||||
const handler = PLUGIN_ASYNC_HANDLER_MAP.get(opId);
|
return json.asyncMsgFromRust;
|
||||||
if (handler) {
|
|
||||||
handler(ui8);
|
|
||||||
} else {
|
|
||||||
throw Error("bad async opId");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,7 @@ function unwrapResponse(res: JsonResponse): Ok {
|
||||||
return res.ok;
|
return res.ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
export function asyncMsgFromRust(opId: number, resUi8: Uint8Array): void {
|
export function asyncMsgFromRust(resUi8: Uint8Array): void {
|
||||||
const res = decode(resUi8);
|
const res = decode(resUi8);
|
||||||
util.assert(res.promiseId != null);
|
util.assert(res.promiseId != null);
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,6 @@ function nextPromiseId(): number {
|
||||||
|
|
||||||
export interface RecordMinimal {
|
export interface RecordMinimal {
|
||||||
promiseId: number;
|
promiseId: number;
|
||||||
opId: number; // Maybe better called dispatchId
|
|
||||||
arg: number;
|
arg: number;
|
||||||
result: number;
|
result: number;
|
||||||
err?: {
|
err?: {
|
||||||
|
@ -27,10 +26,7 @@ export interface RecordMinimal {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export function recordFromBufMinimal(
|
export function recordFromBufMinimal(ui8: Uint8Array): RecordMinimal {
|
||||||
opId: number,
|
|
||||||
ui8: Uint8Array
|
|
||||||
): RecordMinimal {
|
|
||||||
const header = ui8.slice(0, 12);
|
const header = ui8.slice(0, 12);
|
||||||
const buf32 = new Int32Array(
|
const buf32 = new Int32Array(
|
||||||
header.buffer,
|
header.buffer,
|
||||||
|
@ -52,7 +48,6 @@ export function recordFromBufMinimal(
|
||||||
|
|
||||||
return {
|
return {
|
||||||
promiseId,
|
promiseId,
|
||||||
opId,
|
|
||||||
arg,
|
arg,
|
||||||
result,
|
result,
|
||||||
err
|
err
|
||||||
|
@ -74,8 +69,8 @@ const scratchBytes = new Uint8Array(
|
||||||
);
|
);
|
||||||
util.assert(scratchBytes.byteLength === scratch32.length * 4);
|
util.assert(scratchBytes.byteLength === scratch32.length * 4);
|
||||||
|
|
||||||
export function asyncMsgFromRust(opId: number, ui8: Uint8Array): void {
|
export function asyncMsgFromRust(ui8: Uint8Array): void {
|
||||||
const record = recordFromBufMinimal(opId, ui8);
|
const record = recordFromBufMinimal(ui8);
|
||||||
const { promiseId } = record;
|
const { promiseId } = record;
|
||||||
const promise = promiseTableMin.get(promiseId);
|
const promise = promiseTableMin.get(promiseId);
|
||||||
promiseTableMin.delete(promiseId);
|
promiseTableMin.delete(promiseId);
|
||||||
|
@ -95,7 +90,7 @@ export async function sendAsyncMinimal(
|
||||||
const promise = util.createResolvable<RecordMinimal>();
|
const promise = util.createResolvable<RecordMinimal>();
|
||||||
const buf = core.dispatch(opId, scratchBytes, zeroCopy);
|
const buf = core.dispatch(opId, scratchBytes, zeroCopy);
|
||||||
if (buf) {
|
if (buf) {
|
||||||
const record = recordFromBufMinimal(opId, buf);
|
const record = recordFromBufMinimal(buf);
|
||||||
// Sync result.
|
// Sync result.
|
||||||
promise.resolve(record);
|
promise.resolve(record);
|
||||||
} else {
|
} else {
|
||||||
|
@ -115,6 +110,6 @@ export function sendSyncMinimal(
|
||||||
scratch32[0] = 0; // promiseId 0 indicates sync
|
scratch32[0] = 0; // promiseId 0 indicates sync
|
||||||
scratch32[1] = arg;
|
scratch32[1] = arg;
|
||||||
const res = core.dispatch(opId, scratchBytes, zeroCopy)!;
|
const res = core.dispatch(opId, scratchBytes, zeroCopy)!;
|
||||||
const resRecord = recordFromBufMinimal(opId, res);
|
const resRecord = recordFromBufMinimal(res);
|
||||||
return unwrapResponse(resRecord);
|
return unwrapResponse(resRecord);
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,7 +89,6 @@ interface Start {
|
||||||
// the runtime and the compiler environments.
|
// the runtime and the compiler environments.
|
||||||
// @internal
|
// @internal
|
||||||
export function start(preserveDenoNamespace = true, source?: string): Start {
|
export function start(preserveDenoNamespace = true, source?: string): Start {
|
||||||
core.setAsyncHandler(dispatch.asyncMsgFromRust);
|
|
||||||
const ops = core.ops();
|
const ops = core.ops();
|
||||||
// TODO(bartlomieju): this is a prototype, we should come up with
|
// TODO(bartlomieju): this is a prototype, we should come up with
|
||||||
// something a bit more sophisticated
|
// something a bit more sophisticated
|
||||||
|
@ -98,6 +97,7 @@ export function start(preserveDenoNamespace = true, source?: string): Start {
|
||||||
// Assign op ids to actual variables
|
// Assign op ids to actual variables
|
||||||
// TODO(ry) This type casting is gross and should be fixed.
|
// TODO(ry) This type casting is gross and should be fixed.
|
||||||
((dispatch as unknown) as { [key: string]: number })[opName] = opId;
|
((dispatch as unknown) as { [key: string]: number })[opName] = opId;
|
||||||
|
core.setAsyncHandler(opId, dispatch.getAsyncHandler(opName));
|
||||||
}
|
}
|
||||||
// First we send an empty `Start` message to let the privileged side know we
|
// First we send an empty `Start` message to let the privileged side know we
|
||||||
// are ready. The response should be a `StartRes` message containing the CLI
|
// are ready. The response should be a `StartRes` message containing the CLI
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import { sendSync } from "./dispatch_json.ts";
|
import { sendSync } from "./dispatch_json.ts";
|
||||||
import { OP_OPEN_PLUGIN, setPluginAsyncHandler } from "./dispatch.ts";
|
import { OP_OPEN_PLUGIN } from "./dispatch.ts";
|
||||||
import { core } from "./core.ts";
|
import { core } from "./core.ts";
|
||||||
|
|
||||||
export interface AsyncHandler {
|
export interface AsyncHandler {
|
||||||
|
@ -25,7 +25,7 @@ class PluginOpImpl implements PluginOp {
|
||||||
}
|
}
|
||||||
|
|
||||||
setAsyncHandler(handler: AsyncHandler): void {
|
setAsyncHandler(handler: AsyncHandler): void {
|
||||||
setPluginAsyncHandler(this.opId, handler);
|
core.setAsyncHandler(this.opId, handler);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -123,8 +123,10 @@ async function serve(rid) {
|
||||||
let ops;
|
let ops;
|
||||||
|
|
||||||
async function main() {
|
async function main() {
|
||||||
Deno.core.setAsyncHandler(handleAsyncMsgFromRust);
|
|
||||||
ops = Deno.core.ops();
|
ops = Deno.core.ops();
|
||||||
|
for (const opName in ops) {
|
||||||
|
Deno.core.setAsyncHandler(ops[opName], handleAsyncMsgFromRust);
|
||||||
|
}
|
||||||
|
|
||||||
Deno.core.print("http_bench.js start\n");
|
Deno.core.print("http_bench.js start\n");
|
||||||
|
|
||||||
|
|
|
@ -900,7 +900,7 @@ pub mod tests {
|
||||||
"setup2.js",
|
"setup2.js",
|
||||||
r#"
|
r#"
|
||||||
let nrecv = 0;
|
let nrecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => {
|
Deno.core.setAsyncHandler(1, (buf) => {
|
||||||
nrecv++;
|
nrecv++;
|
||||||
});
|
});
|
||||||
"#,
|
"#,
|
||||||
|
@ -1021,7 +1021,7 @@ pub mod tests {
|
||||||
"overflow_req_sync.js",
|
"overflow_req_sync.js",
|
||||||
r#"
|
r#"
|
||||||
let asyncRecv = 0;
|
let asyncRecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
|
Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
|
||||||
// Large message that will overflow the shared space.
|
// Large message that will overflow the shared space.
|
||||||
let control = new Uint8Array(100 * 1024 * 1024);
|
let control = new Uint8Array(100 * 1024 * 1024);
|
||||||
let response = Deno.core.dispatch(1, control);
|
let response = Deno.core.dispatch(1, control);
|
||||||
|
@ -1043,7 +1043,7 @@ pub mod tests {
|
||||||
"overflow_res_sync.js",
|
"overflow_res_sync.js",
|
||||||
r#"
|
r#"
|
||||||
let asyncRecv = 0;
|
let asyncRecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => { asyncRecv++ });
|
Deno.core.setAsyncHandler(1, (buf) => { asyncRecv++ });
|
||||||
// Large message that will overflow the shared space.
|
// Large message that will overflow the shared space.
|
||||||
let control = new Uint8Array([42]);
|
let control = new Uint8Array([42]);
|
||||||
let response = Deno.core.dispatch(1, control);
|
let response = Deno.core.dispatch(1, control);
|
||||||
|
@ -1064,8 +1064,7 @@ pub mod tests {
|
||||||
"overflow_req_async.js",
|
"overflow_req_async.js",
|
||||||
r#"
|
r#"
|
||||||
let asyncRecv = 0;
|
let asyncRecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => {
|
Deno.core.setAsyncHandler(1, (buf) => {
|
||||||
assert(opId == 1);
|
|
||||||
assert(buf.byteLength === 4);
|
assert(buf.byteLength === 4);
|
||||||
assert(buf[0] === 43);
|
assert(buf[0] === 43);
|
||||||
asyncRecv++;
|
asyncRecv++;
|
||||||
|
@ -1097,8 +1096,7 @@ pub mod tests {
|
||||||
"overflow_res_async.js",
|
"overflow_res_async.js",
|
||||||
r#"
|
r#"
|
||||||
let asyncRecv = 0;
|
let asyncRecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => {
|
Deno.core.setAsyncHandler(1, (buf) => {
|
||||||
assert(opId == 1);
|
|
||||||
assert(buf.byteLength === 100 * 1024 * 1024);
|
assert(buf.byteLength === 100 * 1024 * 1024);
|
||||||
assert(buf[0] === 4);
|
assert(buf[0] === 4);
|
||||||
asyncRecv++;
|
asyncRecv++;
|
||||||
|
@ -1126,8 +1124,7 @@ pub mod tests {
|
||||||
"overflow_res_multiple_dispatch_async.js",
|
"overflow_res_multiple_dispatch_async.js",
|
||||||
r#"
|
r#"
|
||||||
let asyncRecv = 0;
|
let asyncRecv = 0;
|
||||||
Deno.core.setAsyncHandler((opId, buf) => {
|
Deno.core.setAsyncHandler(1, (buf) => {
|
||||||
assert(opId === 1);
|
|
||||||
assert(buf.byteLength === 100 * 1024 * 1024);
|
assert(buf.byteLength === 100 * 1024 * 1024);
|
||||||
assert(buf[0] === 4);
|
assert(buf[0] === 4);
|
||||||
asyncRecv++;
|
asyncRecv++;
|
||||||
|
|
|
@ -38,6 +38,9 @@ SharedQueue Binary Layout
|
||||||
|
|
||||||
let sharedBytes;
|
let sharedBytes;
|
||||||
let shared32;
|
let shared32;
|
||||||
|
|
||||||
|
let asyncHandlers;
|
||||||
|
|
||||||
let initialized = false;
|
let initialized = false;
|
||||||
|
|
||||||
function maybeInit() {
|
function maybeInit() {
|
||||||
|
@ -54,6 +57,7 @@ SharedQueue Binary Layout
|
||||||
assert(shared32 == null);
|
assert(shared32 == null);
|
||||||
sharedBytes = new Uint8Array(shared);
|
sharedBytes = new Uint8Array(shared);
|
||||||
shared32 = new Int32Array(shared);
|
shared32 = new Int32Array(shared);
|
||||||
|
asyncHandlers = [];
|
||||||
// Callers should not call Deno.core.recv, use setAsyncHandler.
|
// Callers should not call Deno.core.recv, use setAsyncHandler.
|
||||||
Deno.core.recv(handleAsyncMsgFromRust);
|
Deno.core.recv(handleAsyncMsgFromRust);
|
||||||
}
|
}
|
||||||
|
@ -157,24 +161,24 @@ SharedQueue Binary Layout
|
||||||
return [opId, buf];
|
return [opId, buf];
|
||||||
}
|
}
|
||||||
|
|
||||||
let asyncHandler;
|
function setAsyncHandler(opId, cb) {
|
||||||
function setAsyncHandler(cb) {
|
|
||||||
maybeInit();
|
maybeInit();
|
||||||
assert(asyncHandler == null);
|
assert(opId != null);
|
||||||
asyncHandler = cb;
|
asyncHandlers[opId] = cb;
|
||||||
}
|
}
|
||||||
|
|
||||||
function handleAsyncMsgFromRust(opId, buf) {
|
function handleAsyncMsgFromRust(opId, buf) {
|
||||||
if (buf) {
|
if (buf) {
|
||||||
// This is the overflow_response case of deno::Isolate::poll().
|
// This is the overflow_response case of deno::Isolate::poll().
|
||||||
asyncHandler(opId, buf);
|
asyncHandlers[opId](buf);
|
||||||
} else {
|
} else {
|
||||||
while (true) {
|
while (true) {
|
||||||
const opIdBuf = shift();
|
const opIdBuf = shift();
|
||||||
if (opIdBuf == null) {
|
if (opIdBuf == null) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
asyncHandler(...opIdBuf);
|
assert(asyncHandlers[opIdBuf[0]] != null);
|
||||||
|
asyncHandlers[opIdBuf[0]](opIdBuf[1]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
6
deno_typescript/lib.deno_core.d.ts
vendored
6
deno_typescript/lib.deno_core.d.ts
vendored
|
@ -5,7 +5,7 @@
|
||||||
// Deno and therefore do not flow through to the runtime type library.
|
// Deno and therefore do not flow through to the runtime type library.
|
||||||
|
|
||||||
declare interface MessageCallback {
|
declare interface MessageCallback {
|
||||||
(opId: number, msg: Uint8Array): void;
|
(msg: Uint8Array): void;
|
||||||
}
|
}
|
||||||
|
|
||||||
interface EvalErrorInfo {
|
interface EvalErrorInfo {
|
||||||
|
@ -27,7 +27,7 @@ declare interface DenoCore {
|
||||||
control: Uint8Array,
|
control: Uint8Array,
|
||||||
zeroCopy?: ArrayBufferView | null
|
zeroCopy?: ArrayBufferView | null
|
||||||
): Uint8Array | null;
|
): Uint8Array | null;
|
||||||
setAsyncHandler(cb: MessageCallback): void;
|
setAsyncHandler(opId: number, cb: MessageCallback): void;
|
||||||
sharedQueue: {
|
sharedQueue: {
|
||||||
head(): number;
|
head(): number;
|
||||||
numRecords(): number;
|
numRecords(): number;
|
||||||
|
@ -39,7 +39,7 @@ declare interface DenoCore {
|
||||||
|
|
||||||
ops(): Record<string, number>;
|
ops(): Record<string, number>;
|
||||||
|
|
||||||
recv(cb: MessageCallback): void;
|
recv(cb: (opId: number, msg: Uint8Array) => void): void;
|
||||||
|
|
||||||
send(
|
send(
|
||||||
opId: number,
|
opId: number,
|
||||||
|
|
Loading…
Reference in a new issue