mirror of
https://github.com/denoland/deno.git
synced 2024-11-21 15:04:11 -05:00
feat: instrument http.serve
This commit is contained in:
parent
0b8df9f24d
commit
127c035500
6 changed files with 254 additions and 152 deletions
|
@ -34,8 +34,11 @@ const {
|
|||
ObjectHasOwn,
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
PromisePrototypeCatch,
|
||||
SafePromisePrototypeFinally,
|
||||
PromisePrototypeThen,
|
||||
String,
|
||||
StringPrototypeIncludes,
|
||||
StringPrototypeSlice,
|
||||
Symbol,
|
||||
TypeError,
|
||||
TypedArrayPrototypeGetSymbolToStringTag,
|
||||
|
@ -513,91 +516,139 @@ function fastSyncResponseOrStream(
|
|||
* This function returns a promise that will only reject in the case of abnormal exit.
|
||||
*/
|
||||
function mapToCallback(context, callback, onError) {
|
||||
return async function (req) {
|
||||
const asyncContext = getAsyncContext();
|
||||
setAsyncContext(context.asyncContext);
|
||||
|
||||
let mapped = async function (req, span) {
|
||||
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
|
||||
// 500 error.
|
||||
let innerRequest;
|
||||
let response;
|
||||
try {
|
||||
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
|
||||
// 500 error.
|
||||
let innerRequest;
|
||||
let response;
|
||||
try {
|
||||
innerRequest = new InnerRequest(req, context);
|
||||
const request = fromInnerRequest(innerRequest, "immutable");
|
||||
innerRequest.request = request;
|
||||
response = await callback(
|
||||
request,
|
||||
new ServeHandlerInfo(innerRequest),
|
||||
);
|
||||
innerRequest = new InnerRequest(req, context);
|
||||
const request = fromInnerRequest(innerRequest, "immutable");
|
||||
innerRequest.request = request;
|
||||
|
||||
// Throwing Error if the handler return value is not a Response class
|
||||
if (span) {
|
||||
span.updateName(request.method);
|
||||
span.setAttribute("http.request.method", request.method);
|
||||
const url = new URL(request.url);
|
||||
span.setAttribute("url.full", request.url);
|
||||
span.setAttribute(
|
||||
"url.scheme",
|
||||
StringPrototypeSlice(url.protocol, 0, -1),
|
||||
);
|
||||
span.setAttribute("url.path", url.pathname);
|
||||
span.setAttribute("url.query", StringPrototypeSlice(url.search, 1));
|
||||
}
|
||||
|
||||
response = await callback(
|
||||
request,
|
||||
new ServeHandlerInfo(innerRequest),
|
||||
);
|
||||
|
||||
// Throwing Error if the handler return value is not a Response class
|
||||
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
|
||||
throw new TypeError(
|
||||
"Return value from serve handler must be a response or a promise resolving to a response",
|
||||
);
|
||||
}
|
||||
|
||||
if (response.type === "error") {
|
||||
throw new TypeError(
|
||||
"Return value from serve handler must not be an error response (like Response.error())",
|
||||
);
|
||||
}
|
||||
|
||||
if (response.bodyUsed) {
|
||||
throw new TypeError(
|
||||
"The body of the Response returned from the serve handler has already been consumed",
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
try {
|
||||
response = await onError(error);
|
||||
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
|
||||
throw new TypeError(
|
||||
"Return value from serve handler must be a response or a promise resolving to a response",
|
||||
);
|
||||
}
|
||||
|
||||
if (response.type === "error") {
|
||||
throw new TypeError(
|
||||
"Return value from serve handler must not be an error response (like Response.error())",
|
||||
);
|
||||
}
|
||||
|
||||
if (response.bodyUsed) {
|
||||
throw new TypeError(
|
||||
"The body of the Response returned from the serve handler has already been consumed",
|
||||
"Return value from onError handler must be a response or a promise resolving to a response",
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
try {
|
||||
response = await onError(error);
|
||||
if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) {
|
||||
throw new TypeError(
|
||||
"Return value from onError handler must be a response or a promise resolving to a response",
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
// deno-lint-ignore no-console
|
||||
console.error("Exception in onError while handling exception", error);
|
||||
response = internalServerError();
|
||||
}
|
||||
// deno-lint-ignore no-console
|
||||
console.error("Exception in onError while handling exception", error);
|
||||
response = internalServerError();
|
||||
}
|
||||
const inner = toInnerResponse(response);
|
||||
if (innerRequest?.[_upgraded]) {
|
||||
// We're done here as the connection has been upgraded during the callback and no longer requires servicing.
|
||||
if (response !== UPGRADE_RESPONSE_SENTINEL) {
|
||||
// deno-lint-ignore no-console
|
||||
console.error("Upgrade response was not returned from callback");
|
||||
context.close();
|
||||
}
|
||||
innerRequest?.[_upgraded]();
|
||||
return;
|
||||
}
|
||||
|
||||
// Did everything shut down while we were waiting?
|
||||
if (context.closed) {
|
||||
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
|
||||
innerRequest?.close();
|
||||
op_http_set_promise_complete(req, 503);
|
||||
return;
|
||||
}
|
||||
|
||||
const status = inner.status;
|
||||
const headers = inner.headerList;
|
||||
if (headers && headers.length > 0) {
|
||||
if (headers.length == 1) {
|
||||
op_http_set_response_header(req, headers[0][0], headers[0][1]);
|
||||
} else {
|
||||
op_http_set_response_headers(req, headers);
|
||||
}
|
||||
}
|
||||
|
||||
fastSyncResponseOrStream(req, inner.body, status, innerRequest);
|
||||
} finally {
|
||||
setAsyncContext(asyncContext);
|
||||
}
|
||||
|
||||
if (span) {
|
||||
span.setAttribute(
|
||||
"http.response.status_code",
|
||||
String(response.status),
|
||||
);
|
||||
}
|
||||
|
||||
const inner = toInnerResponse(response);
|
||||
if (innerRequest?.[_upgraded]) {
|
||||
// We're done here as the connection has been upgraded during the callback and no longer requires servicing.
|
||||
if (response !== UPGRADE_RESPONSE_SENTINEL) {
|
||||
// deno-lint-ignore no-console
|
||||
console.error("Upgrade response was not returned from callback");
|
||||
context.close();
|
||||
}
|
||||
innerRequest?.[_upgraded]();
|
||||
return;
|
||||
}
|
||||
|
||||
// Did everything shut down while we were waiting?
|
||||
if (context.closed) {
|
||||
// We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate
|
||||
innerRequest?.close();
|
||||
op_http_set_promise_complete(req, 503);
|
||||
return;
|
||||
}
|
||||
|
||||
const status = inner.status;
|
||||
const headers = inner.headerList;
|
||||
if (headers && headers.length > 0) {
|
||||
if (headers.length == 1) {
|
||||
op_http_set_response_header(req, headers[0][0], headers[0][1]);
|
||||
} else {
|
||||
op_http_set_response_headers(req, headers);
|
||||
}
|
||||
}
|
||||
|
||||
fastSyncResponseOrStream(req, inner.body, status, innerRequest);
|
||||
};
|
||||
|
||||
if (internals.telemetry.tracingEnabled) {
|
||||
const { Span, enterSpan, endSpan } = internals.telemetry;
|
||||
const origMapped = mapped;
|
||||
mapped = function (req, _span) {
|
||||
const oldCtx = getAsyncContext();
|
||||
setAsyncContext(context.asyncContext);
|
||||
const span = new Span("deno.serve");
|
||||
try {
|
||||
enterSpan(span);
|
||||
return SafePromisePrototypeFinally(
|
||||
origMapped(req, span),
|
||||
() => endSpan(span),
|
||||
);
|
||||
} finally {
|
||||
// equiv to exitSpan.
|
||||
setAsyncContext(oldCtx);
|
||||
}
|
||||
};
|
||||
} else {
|
||||
const origMapped = mapped;
|
||||
mapped = function (req, span) {
|
||||
const oldCtx = getAsyncContext();
|
||||
setAsyncContext(context.asyncContext);
|
||||
try {
|
||||
return origMapped(req, span);
|
||||
} finally {
|
||||
setAsyncContext(oldCtx);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
return mapped;
|
||||
}
|
||||
|
||||
type RawHandler = (
|
||||
|
@ -795,7 +846,7 @@ function serveHttpOn(context, addr, callback) {
|
|||
// Attempt to pull as many requests out of the queue as possible before awaiting. This API is
|
||||
// a synchronous, non-blocking API that returns u32::MAX if anything goes wrong.
|
||||
while ((req = op_http_try_wait(rid)) !== null) {
|
||||
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
||||
PromisePrototypeCatch(callback(req, undefined), promiseErrorHandler);
|
||||
}
|
||||
currentPromise = op_http_wait(rid);
|
||||
if (!ref) {
|
||||
|
@ -815,7 +866,7 @@ function serveHttpOn(context, addr, callback) {
|
|||
if (req === null) {
|
||||
break;
|
||||
}
|
||||
PromisePrototypeCatch(callback(req), promiseErrorHandler);
|
||||
PromisePrototypeCatch(callback(req, undefined), promiseErrorHandler);
|
||||
}
|
||||
|
||||
try {
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
import { core, primordials } from "ext:core/mod.js";
|
||||
import { core, internals, primordials } from "ext:core/mod.js";
|
||||
import {
|
||||
op_crypto_get_random_values,
|
||||
op_otel_instrumentation_scope_create_and_enter,
|
||||
|
@ -32,11 +32,9 @@ const {
|
|||
ObjectDefineProperty,
|
||||
WeakRefPrototypeDeref,
|
||||
String,
|
||||
StringPrototypePadStart,
|
||||
ObjectPrototypeIsPrototypeOf,
|
||||
DataView,
|
||||
DataViewPrototypeSetUint32,
|
||||
SafeWeakRef,
|
||||
TypedArrayPrototypeGetBuffer,
|
||||
} = primordials;
|
||||
const { AsyncVariable, setAsyncContext } = core;
|
||||
|
||||
|
@ -404,7 +402,7 @@ export class Span {
|
|||
span.#asyncContext = NO_ASYNC_CONTEXT;
|
||||
};
|
||||
|
||||
exitSpan = (span: Span) => {
|
||||
endSpan = (span: Span) => {
|
||||
const endTime = now();
|
||||
submit(
|
||||
span.#spanId,
|
||||
|
@ -449,39 +447,11 @@ export class Span {
|
|||
const currentSpan: Span | {
|
||||
spanContext(): { traceId: string; spanId: string };
|
||||
} = CURRENT.get()?.getValue(SPAN_KEY);
|
||||
if (!currentSpan) {
|
||||
const buffer = new Uint8Array(TRACE_ID_BYTES + SPAN_ID_BYTES);
|
||||
if (currentSpan) {
|
||||
if (DETERMINISTIC) {
|
||||
DataViewPrototypeSetUint32(
|
||||
new DataView(TypedArrayPrototypeGetBuffer(buffer)),
|
||||
TRACE_ID_BYTES - 4,
|
||||
COUNTER,
|
||||
true,
|
||||
);
|
||||
COUNTER += 1;
|
||||
DataViewPrototypeSetUint32(
|
||||
new DataView(TypedArrayPrototypeGetBuffer(buffer)),
|
||||
TRACE_ID_BYTES + SPAN_ID_BYTES - 4,
|
||||
COUNTER,
|
||||
true,
|
||||
);
|
||||
COUNTER += 1;
|
||||
} else {
|
||||
op_crypto_get_random_values(buffer);
|
||||
}
|
||||
this.#traceId = TypedArrayPrototypeSubarray(buffer, 0, TRACE_ID_BYTES);
|
||||
this.#spanId = TypedArrayPrototypeSubarray(buffer, TRACE_ID_BYTES);
|
||||
} else {
|
||||
this.#spanId = new Uint8Array(SPAN_ID_BYTES);
|
||||
if (DETERMINISTIC) {
|
||||
DataViewPrototypeSetUint32(
|
||||
new DataView(TypedArrayPrototypeGetBuffer(this.#spanId)),
|
||||
SPAN_ID_BYTES - 4,
|
||||
COUNTER,
|
||||
true,
|
||||
);
|
||||
COUNTER += 1;
|
||||
this.#spanId = StringPrototypePadStart(String(COUNTER++), 16, "0");
|
||||
} else {
|
||||
this.#spanId = new Uint8Array(SPAN_ID_BYTES);
|
||||
op_crypto_get_random_values(this.#spanId);
|
||||
}
|
||||
// deno-lint-ignore prefer-primordials
|
||||
|
@ -493,6 +463,16 @@ export class Span {
|
|||
this.#traceId = context.traceId;
|
||||
this.#parentSpanId = context.spanId;
|
||||
}
|
||||
} else {
|
||||
if (DETERMINISTIC) {
|
||||
this.#traceId = StringPrototypePadStart(String(COUNTER++), 32, "0");
|
||||
this.#spanId = StringPrototypePadStart(String(COUNTER++), 16, "0");
|
||||
} else {
|
||||
const buffer = new Uint8Array(TRACE_ID_BYTES + SPAN_ID_BYTES);
|
||||
op_crypto_get_random_values(buffer);
|
||||
this.#traceId = TypedArrayPrototypeSubarray(buffer, 0, TRACE_ID_BYTES);
|
||||
this.#spanId = TypedArrayPrototypeSubarray(buffer, TRACE_ID_BYTES);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -717,4 +697,16 @@ export function bootstrap(
|
|||
}
|
||||
}
|
||||
|
||||
export const telemetry = { SpanExporter, ContextManager };
|
||||
export const telemetry = {
|
||||
SpanExporter,
|
||||
ContextManager,
|
||||
};
|
||||
internals.telemetry = {
|
||||
Span,
|
||||
enterSpan,
|
||||
exitSpan,
|
||||
endSpan,
|
||||
get tracingEnabled() {
|
||||
return TRACING_ENABLED;
|
||||
},
|
||||
};
|
||||
|
|
|
@ -835,9 +835,9 @@ fn op_otel_span_set_dropped(
|
|||
#[smi] dropped_events_count: u32,
|
||||
) {
|
||||
if let Some(temporary_span) = state.try_borrow_mut::<TemporarySpan>() {
|
||||
temporary_span.0.dropped_attributes_count = dropped_attributes_count;
|
||||
temporary_span.0.links.dropped_count = dropped_links_count;
|
||||
temporary_span.0.events.dropped_count = dropped_events_count;
|
||||
temporary_span.0.dropped_attributes_count += dropped_attributes_count;
|
||||
temporary_span.0.links.dropped_count += dropped_links_count;
|
||||
temporary_span.0.events.dropped_count += dropped_events_count;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,30 +2,18 @@
|
|||
"steps": [
|
||||
{
|
||||
"args": "run -A main.ts basic.ts",
|
||||
"envs": {
|
||||
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
|
||||
},
|
||||
"output": "basic.out"
|
||||
},
|
||||
{
|
||||
"args": "run -A main.ts natural_exit.ts",
|
||||
"envs": {
|
||||
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
|
||||
},
|
||||
"output": "natural_exit.out"
|
||||
},
|
||||
{
|
||||
"args": "run -A main.ts deno_dot_exit.ts",
|
||||
"envs": {
|
||||
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
|
||||
},
|
||||
"output": "deno_dot_exit.out"
|
||||
},
|
||||
{
|
||||
"args": "run -A main.ts uncaught.ts",
|
||||
"envs": {
|
||||
"DENO_UNSTABLE_OTEL_DETERMINISTIC": "1"
|
||||
},
|
||||
"output": "uncaught.out"
|
||||
}
|
||||
]
|
||||
|
|
|
@ -1,12 +1,70 @@
|
|||
{
|
||||
"spans": [
|
||||
{
|
||||
"traceId": "10000000000000000000000000000002",
|
||||
"spanId": "1000000000000003",
|
||||
"traceId": "00000000000000000000000000000001",
|
||||
"spanId": "0000000000000002",
|
||||
"traceState": "",
|
||||
"parentSpanId": "1000000000000001",
|
||||
"parentSpanId": "",
|
||||
"flags": 1,
|
||||
"name": "inner span",
|
||||
"name": "GET",
|
||||
"kind": 1,
|
||||
"startTimeUnixNano": "[WILDCARD]",
|
||||
"endTimeUnixNano": "[WILDCARD]",
|
||||
"attributes": [
|
||||
{
|
||||
"key": "http.request.method",
|
||||
"value": {
|
||||
"stringValue": "GET"
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "url.full",
|
||||
"value": {
|
||||
"stringValue": "http://localhost:[WILDCARD]/"
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "url.scheme",
|
||||
"value": {
|
||||
"stringValue": "http"
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "url.path",
|
||||
"value": {
|
||||
"stringValue": "/"
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "url.query",
|
||||
"value": {
|
||||
"stringValue": ""
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "http.response.status_code",
|
||||
"value": {
|
||||
"stringValue": "200"
|
||||
}
|
||||
}
|
||||
],
|
||||
"droppedAttributesCount": 0,
|
||||
"events": [],
|
||||
"droppedEventsCount": 0,
|
||||
"links": [],
|
||||
"droppedLinksCount": 0,
|
||||
"status": {
|
||||
"message": "",
|
||||
"code": 0
|
||||
}
|
||||
},
|
||||
{
|
||||
"traceId": "00000000000000000000000000000001",
|
||||
"spanId": "1000000000000001",
|
||||
"traceState": "",
|
||||
"parentSpanId": "0000000000000002",
|
||||
"flags": 1,
|
||||
"name": "outer span",
|
||||
"kind": 1,
|
||||
"startTimeUnixNano": "[WILDCARD]",
|
||||
"endTimeUnixNano": "[WILDCARD]",
|
||||
|
@ -22,12 +80,12 @@
|
|||
}
|
||||
},
|
||||
{
|
||||
"traceId": "10000000000000000000000000000002",
|
||||
"spanId": "1000000000000001",
|
||||
"traceId": "00000000000000000000000000000001",
|
||||
"spanId": "1000000000000002",
|
||||
"traceState": "",
|
||||
"parentSpanId": "",
|
||||
"parentSpanId": "1000000000000001",
|
||||
"flags": 1,
|
||||
"name": "outer span",
|
||||
"name": "inner span",
|
||||
"kind": 1,
|
||||
"startTimeUnixNano": "[WILDCARD]",
|
||||
"endTimeUnixNano": "[WILDCARD]",
|
||||
|
@ -55,8 +113,8 @@
|
|||
"attributes": [],
|
||||
"droppedAttributesCount": 0,
|
||||
"flags": 1,
|
||||
"traceId": "10000000000000000000000000000002",
|
||||
"spanId": "1000000000000003"
|
||||
"traceId": "00000000000000000000000000000001",
|
||||
"spanId": "1000000000000002"
|
||||
},
|
||||
{
|
||||
"timeUnixNano": "0",
|
||||
|
@ -69,8 +127,8 @@
|
|||
"attributes": [],
|
||||
"droppedAttributesCount": 0,
|
||||
"flags": 1,
|
||||
"traceId": "10000000000000000000000000000002",
|
||||
"spanId": "1000000000000003"
|
||||
"traceId": "00000000000000000000000000000001",
|
||||
"spanId": "1000000000000002"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
|
@ -12,26 +12,39 @@ const server = Deno.serve(
|
|||
const command = new Deno.Command(Deno.execPath(), {
|
||||
args: ["run", "-A", "-q", "--unstable-otel", Deno.args[0]],
|
||||
env: {
|
||||
DENO_UNSTABLE_OTEL_DETERMINISTIC: "1",
|
||||
OTEL_EXPORTER_OTLP_PROTOCOL: "http/json",
|
||||
OTEL_EXPORTER_OTLP_ENDPOINT: `http://localhost:${port}`,
|
||||
},
|
||||
stdout: "null",
|
||||
});
|
||||
const child = command.spawn();
|
||||
child.output().then(() => {
|
||||
server.shutdown();
|
||||
|
||||
console.log(JSON.stringify(data, null, 2));
|
||||
});
|
||||
child.output()
|
||||
.then(() => server.shutdown())
|
||||
.then(() => {
|
||||
data.logs.sort((a, b) =>
|
||||
Number(
|
||||
BigInt(a.observedTimeUnixNano) - BigInt(b.observedTimeUnixNano),
|
||||
)
|
||||
);
|
||||
data.spans.sort((a, b) =>
|
||||
Number(BigInt(`0x${a.spanId}`) - BigInt(`0x${b.spanId}`))
|
||||
);
|
||||
console.log(JSON.stringify(data, null, 2));
|
||||
});
|
||||
},
|
||||
async handler(req) {
|
||||
const body = await req.json();
|
||||
if (body.resourceLogs) {
|
||||
data.logs.push(...body.resourceLogs[0].scopeLogs[0].logRecords);
|
||||
}
|
||||
if (body.resourceSpans) {
|
||||
data.spans.push(...body.resourceSpans[0].scopeSpans[0].spans);
|
||||
}
|
||||
body.resourceLogs?.forEach((rLogs) => {
|
||||
rLogs.scopeLogs.forEach((sLogs) => {
|
||||
data.logs.push(...sLogs.logRecords);
|
||||
});
|
||||
});
|
||||
body.resourceSpans?.forEach((rSpans) => {
|
||||
rSpans.scopeSpans.forEach((sSpans) => {
|
||||
data.spans.push(...sSpans.spans);
|
||||
});
|
||||
});
|
||||
return Response.json({ partialSuccess: {} }, { status: 200 });
|
||||
},
|
||||
},
|
||||
|
|
Loading…
Reference in a new issue