From 127c03550098db807419c9951d2a068cc18a360d Mon Sep 17 00:00:00 2001 From: snek Date: Thu, 21 Nov 2024 14:47:26 +0100 Subject: [PATCH] feat: instrument http.serve --- ext/http/00_serve.ts | 205 ++++++++++++++-------- runtime/js/telemetry.ts | 66 +++---- runtime/ops/otel.rs | 6 +- tests/specs/cli/otel_basic/__test__.jsonc | 12 -- tests/specs/cli/otel_basic/basic.out | 82 +++++++-- tests/specs/cli/otel_basic/main.ts | 35 ++-- 6 files changed, 254 insertions(+), 152 deletions(-) diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index fcdb87d092..766a6d2739 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -34,8 +34,11 @@ const { ObjectHasOwn, ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, + SafePromisePrototypeFinally, PromisePrototypeThen, + String, StringPrototypeIncludes, + StringPrototypeSlice, Symbol, TypeError, TypedArrayPrototypeGetSymbolToStringTag, @@ -513,91 +516,139 @@ function fastSyncResponseOrStream( * This function returns a promise that will only reject in the case of abnormal exit. */ function mapToCallback(context, callback, onError) { - return async function (req) { - const asyncContext = getAsyncContext(); - setAsyncContext(context.asyncContext); - + let mapped = async function (req, span) { + // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback + // 500 error. + let innerRequest; + let response; try { - // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback - // 500 error. - let innerRequest; - let response; - try { - innerRequest = new InnerRequest(req, context); - const request = fromInnerRequest(innerRequest, "immutable"); - innerRequest.request = request; - response = await callback( - request, - new ServeHandlerInfo(innerRequest), - ); + innerRequest = new InnerRequest(req, context); + const request = fromInnerRequest(innerRequest, "immutable"); + innerRequest.request = request; - // Throwing Error if the handler return value is not a Response class + if (span) { + span.updateName(request.method); + span.setAttribute("http.request.method", request.method); + const url = new URL(request.url); + span.setAttribute("url.full", request.url); + span.setAttribute( + "url.scheme", + StringPrototypeSlice(url.protocol, 0, -1), + ); + span.setAttribute("url.path", url.pathname); + span.setAttribute("url.query", StringPrototypeSlice(url.search, 1)); + } + + response = await callback( + request, + new ServeHandlerInfo(innerRequest), + ); + + // Throwing Error if the handler return value is not a Response class + if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { + throw new TypeError( + "Return value from serve handler must be a response or a promise resolving to a response", + ); + } + + if (response.type === "error") { + throw new TypeError( + "Return value from serve handler must not be an error response (like Response.error())", + ); + } + + if (response.bodyUsed) { + throw new TypeError( + "The body of the Response returned from the serve handler has already been consumed", + ); + } + } catch (error) { + try { + response = await onError(error); if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { throw new TypeError( - "Return value from serve handler must be a response or a promise resolving to a response", - ); - } - - if (response.type === "error") { - throw new TypeError( - "Return value from serve handler must not be an error response (like Response.error())", - ); - } - - if (response.bodyUsed) { - throw new TypeError( - "The body of the Response returned from the serve handler has already been consumed", + "Return value from onError handler must be a response or a promise resolving to a response", ); } } catch (error) { - try { - response = await onError(error); - if (!ObjectPrototypeIsPrototypeOf(ResponsePrototype, response)) { - throw new TypeError( - "Return value from onError handler must be a response or a promise resolving to a response", - ); - } - } catch (error) { - // deno-lint-ignore no-console - console.error("Exception in onError while handling exception", error); - response = internalServerError(); - } + // deno-lint-ignore no-console + console.error("Exception in onError while handling exception", error); + response = internalServerError(); } - const inner = toInnerResponse(response); - if (innerRequest?.[_upgraded]) { - // We're done here as the connection has been upgraded during the callback and no longer requires servicing. - if (response !== UPGRADE_RESPONSE_SENTINEL) { - // deno-lint-ignore no-console - console.error("Upgrade response was not returned from callback"); - context.close(); - } - innerRequest?.[_upgraded](); - return; - } - - // Did everything shut down while we were waiting? - if (context.closed) { - // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate - innerRequest?.close(); - op_http_set_promise_complete(req, 503); - return; - } - - const status = inner.status; - const headers = inner.headerList; - if (headers && headers.length > 0) { - if (headers.length == 1) { - op_http_set_response_header(req, headers[0][0], headers[0][1]); - } else { - op_http_set_response_headers(req, headers); - } - } - - fastSyncResponseOrStream(req, inner.body, status, innerRequest); - } finally { - setAsyncContext(asyncContext); } + + if (span) { + span.setAttribute( + "http.response.status_code", + String(response.status), + ); + } + + const inner = toInnerResponse(response); + if (innerRequest?.[_upgraded]) { + // We're done here as the connection has been upgraded during the callback and no longer requires servicing. + if (response !== UPGRADE_RESPONSE_SENTINEL) { + // deno-lint-ignore no-console + console.error("Upgrade response was not returned from callback"); + context.close(); + } + innerRequest?.[_upgraded](); + return; + } + + // Did everything shut down while we were waiting? + if (context.closed) { + // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate + innerRequest?.close(); + op_http_set_promise_complete(req, 503); + return; + } + + const status = inner.status; + const headers = inner.headerList; + if (headers && headers.length > 0) { + if (headers.length == 1) { + op_http_set_response_header(req, headers[0][0], headers[0][1]); + } else { + op_http_set_response_headers(req, headers); + } + } + + fastSyncResponseOrStream(req, inner.body, status, innerRequest); }; + + if (internals.telemetry.tracingEnabled) { + const { Span, enterSpan, endSpan } = internals.telemetry; + const origMapped = mapped; + mapped = function (req, _span) { + const oldCtx = getAsyncContext(); + setAsyncContext(context.asyncContext); + const span = new Span("deno.serve"); + try { + enterSpan(span); + return SafePromisePrototypeFinally( + origMapped(req, span), + () => endSpan(span), + ); + } finally { + // equiv to exitSpan. + setAsyncContext(oldCtx); + } + }; + } else { + const origMapped = mapped; + mapped = function (req, span) { + const oldCtx = getAsyncContext(); + setAsyncContext(context.asyncContext); + try { + return origMapped(req, span); + } finally { + setAsyncContext(oldCtx); + } + }; + } + + return mapped; } type RawHandler = ( @@ -795,7 +846,7 @@ function serveHttpOn(context, addr, callback) { // Attempt to pull as many requests out of the queue as possible before awaiting. This API is // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong. while ((req = op_http_try_wait(rid)) !== null) { - PromisePrototypeCatch(callback(req), promiseErrorHandler); + PromisePrototypeCatch(callback(req, undefined), promiseErrorHandler); } currentPromise = op_http_wait(rid); if (!ref) { @@ -815,7 +866,7 @@ function serveHttpOn(context, addr, callback) { if (req === null) { break; } - PromisePrototypeCatch(callback(req), promiseErrorHandler); + PromisePrototypeCatch(callback(req, undefined), promiseErrorHandler); } try { diff --git a/runtime/js/telemetry.ts b/runtime/js/telemetry.ts index ecef3b5e68..96c1c9369f 100644 --- a/runtime/js/telemetry.ts +++ b/runtime/js/telemetry.ts @@ -1,6 +1,6 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -import { core, primordials } from "ext:core/mod.js"; +import { core, internals, primordials } from "ext:core/mod.js"; import { op_crypto_get_random_values, op_otel_instrumentation_scope_create_and_enter, @@ -32,11 +32,9 @@ const { ObjectDefineProperty, WeakRefPrototypeDeref, String, + StringPrototypePadStart, ObjectPrototypeIsPrototypeOf, - DataView, - DataViewPrototypeSetUint32, SafeWeakRef, - TypedArrayPrototypeGetBuffer, } = primordials; const { AsyncVariable, setAsyncContext } = core; @@ -404,7 +402,7 @@ export class Span { span.#asyncContext = NO_ASYNC_CONTEXT; }; - exitSpan = (span: Span) => { + endSpan = (span: Span) => { const endTime = now(); submit( span.#spanId, @@ -449,39 +447,11 @@ export class Span { const currentSpan: Span | { spanContext(): { traceId: string; spanId: string }; } = CURRENT.get()?.getValue(SPAN_KEY); - if (!currentSpan) { - const buffer = new Uint8Array(TRACE_ID_BYTES + SPAN_ID_BYTES); + if (currentSpan) { if (DETERMINISTIC) { - DataViewPrototypeSetUint32( - new DataView(TypedArrayPrototypeGetBuffer(buffer)), - TRACE_ID_BYTES - 4, - COUNTER, - true, - ); - COUNTER += 1; - DataViewPrototypeSetUint32( - new DataView(TypedArrayPrototypeGetBuffer(buffer)), - TRACE_ID_BYTES + SPAN_ID_BYTES - 4, - COUNTER, - true, - ); - COUNTER += 1; - } else { - op_crypto_get_random_values(buffer); - } - this.#traceId = TypedArrayPrototypeSubarray(buffer, 0, TRACE_ID_BYTES); - this.#spanId = TypedArrayPrototypeSubarray(buffer, TRACE_ID_BYTES); - } else { - this.#spanId = new Uint8Array(SPAN_ID_BYTES); - if (DETERMINISTIC) { - DataViewPrototypeSetUint32( - new DataView(TypedArrayPrototypeGetBuffer(this.#spanId)), - SPAN_ID_BYTES - 4, - COUNTER, - true, - ); - COUNTER += 1; + this.#spanId = StringPrototypePadStart(String(COUNTER++), 16, "0"); } else { + this.#spanId = new Uint8Array(SPAN_ID_BYTES); op_crypto_get_random_values(this.#spanId); } // deno-lint-ignore prefer-primordials @@ -493,6 +463,16 @@ export class Span { this.#traceId = context.traceId; this.#parentSpanId = context.spanId; } + } else { + if (DETERMINISTIC) { + this.#traceId = StringPrototypePadStart(String(COUNTER++), 32, "0"); + this.#spanId = StringPrototypePadStart(String(COUNTER++), 16, "0"); + } else { + const buffer = new Uint8Array(TRACE_ID_BYTES + SPAN_ID_BYTES); + op_crypto_get_random_values(buffer); + this.#traceId = TypedArrayPrototypeSubarray(buffer, 0, TRACE_ID_BYTES); + this.#spanId = TypedArrayPrototypeSubarray(buffer, TRACE_ID_BYTES); + } } } @@ -717,4 +697,16 @@ export function bootstrap( } } -export const telemetry = { SpanExporter, ContextManager }; +export const telemetry = { + SpanExporter, + ContextManager, +}; +internals.telemetry = { + Span, + enterSpan, + exitSpan, + endSpan, + get tracingEnabled() { + return TRACING_ENABLED; + }, +}; diff --git a/runtime/ops/otel.rs b/runtime/ops/otel.rs index 61a7b0ef0d..19f09d9f6d 100644 --- a/runtime/ops/otel.rs +++ b/runtime/ops/otel.rs @@ -835,9 +835,9 @@ fn op_otel_span_set_dropped( #[smi] dropped_events_count: u32, ) { if let Some(temporary_span) = state.try_borrow_mut::() { - temporary_span.0.dropped_attributes_count = dropped_attributes_count; - temporary_span.0.links.dropped_count = dropped_links_count; - temporary_span.0.events.dropped_count = dropped_events_count; + temporary_span.0.dropped_attributes_count += dropped_attributes_count; + temporary_span.0.links.dropped_count += dropped_links_count; + temporary_span.0.events.dropped_count += dropped_events_count; } } diff --git a/tests/specs/cli/otel_basic/__test__.jsonc b/tests/specs/cli/otel_basic/__test__.jsonc index 5a27e92625..991413c3d5 100644 --- a/tests/specs/cli/otel_basic/__test__.jsonc +++ b/tests/specs/cli/otel_basic/__test__.jsonc @@ -2,30 +2,18 @@ "steps": [ { "args": "run -A main.ts basic.ts", - "envs": { - "DENO_UNSTABLE_OTEL_DETERMINISTIC": "1" - }, "output": "basic.out" }, { "args": "run -A main.ts natural_exit.ts", - "envs": { - "DENO_UNSTABLE_OTEL_DETERMINISTIC": "1" - }, "output": "natural_exit.out" }, { "args": "run -A main.ts deno_dot_exit.ts", - "envs": { - "DENO_UNSTABLE_OTEL_DETERMINISTIC": "1" - }, "output": "deno_dot_exit.out" }, { "args": "run -A main.ts uncaught.ts", - "envs": { - "DENO_UNSTABLE_OTEL_DETERMINISTIC": "1" - }, "output": "uncaught.out" } ] diff --git a/tests/specs/cli/otel_basic/basic.out b/tests/specs/cli/otel_basic/basic.out index 3745cb7f35..1883866a1d 100644 --- a/tests/specs/cli/otel_basic/basic.out +++ b/tests/specs/cli/otel_basic/basic.out @@ -1,12 +1,70 @@ { "spans": [ { - "traceId": "10000000000000000000000000000002", - "spanId": "1000000000000003", + "traceId": "00000000000000000000000000000001", + "spanId": "0000000000000002", "traceState": "", - "parentSpanId": "1000000000000001", + "parentSpanId": "", "flags": 1, - "name": "inner span", + "name": "GET", + "kind": 1, + "startTimeUnixNano": "[WILDCARD]", + "endTimeUnixNano": "[WILDCARD]", + "attributes": [ + { + "key": "http.request.method", + "value": { + "stringValue": "GET" + } + }, + { + "key": "url.full", + "value": { + "stringValue": "http://localhost:[WILDCARD]/" + } + }, + { + "key": "url.scheme", + "value": { + "stringValue": "http" + } + }, + { + "key": "url.path", + "value": { + "stringValue": "/" + } + }, + { + "key": "url.query", + "value": { + "stringValue": "" + } + }, + { + "key": "http.response.status_code", + "value": { + "stringValue": "200" + } + } + ], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "message": "", + "code": 0 + } + }, + { + "traceId": "00000000000000000000000000000001", + "spanId": "1000000000000001", + "traceState": "", + "parentSpanId": "0000000000000002", + "flags": 1, + "name": "outer span", "kind": 1, "startTimeUnixNano": "[WILDCARD]", "endTimeUnixNano": "[WILDCARD]", @@ -22,12 +80,12 @@ } }, { - "traceId": "10000000000000000000000000000002", - "spanId": "1000000000000001", + "traceId": "00000000000000000000000000000001", + "spanId": "1000000000000002", "traceState": "", - "parentSpanId": "", + "parentSpanId": "1000000000000001", "flags": 1, - "name": "outer span", + "name": "inner span", "kind": 1, "startTimeUnixNano": "[WILDCARD]", "endTimeUnixNano": "[WILDCARD]", @@ -55,8 +113,8 @@ "attributes": [], "droppedAttributesCount": 0, "flags": 1, - "traceId": "10000000000000000000000000000002", - "spanId": "1000000000000003" + "traceId": "00000000000000000000000000000001", + "spanId": "1000000000000002" }, { "timeUnixNano": "0", @@ -69,8 +127,8 @@ "attributes": [], "droppedAttributesCount": 0, "flags": 1, - "traceId": "10000000000000000000000000000002", - "spanId": "1000000000000003" + "traceId": "00000000000000000000000000000001", + "spanId": "1000000000000002" } ] } diff --git a/tests/specs/cli/otel_basic/main.ts b/tests/specs/cli/otel_basic/main.ts index 5415a7437d..bdbae0cc0e 100644 --- a/tests/specs/cli/otel_basic/main.ts +++ b/tests/specs/cli/otel_basic/main.ts @@ -12,26 +12,39 @@ const server = Deno.serve( const command = new Deno.Command(Deno.execPath(), { args: ["run", "-A", "-q", "--unstable-otel", Deno.args[0]], env: { + DENO_UNSTABLE_OTEL_DETERMINISTIC: "1", OTEL_EXPORTER_OTLP_PROTOCOL: "http/json", OTEL_EXPORTER_OTLP_ENDPOINT: `http://localhost:${port}`, }, stdout: "null", }); const child = command.spawn(); - child.output().then(() => { - server.shutdown(); - - console.log(JSON.stringify(data, null, 2)); - }); + child.output() + .then(() => server.shutdown()) + .then(() => { + data.logs.sort((a, b) => + Number( + BigInt(a.observedTimeUnixNano) - BigInt(b.observedTimeUnixNano), + ) + ); + data.spans.sort((a, b) => + Number(BigInt(`0x${a.spanId}`) - BigInt(`0x${b.spanId}`)) + ); + console.log(JSON.stringify(data, null, 2)); + }); }, async handler(req) { const body = await req.json(); - if (body.resourceLogs) { - data.logs.push(...body.resourceLogs[0].scopeLogs[0].logRecords); - } - if (body.resourceSpans) { - data.spans.push(...body.resourceSpans[0].scopeSpans[0].spans); - } + body.resourceLogs?.forEach((rLogs) => { + rLogs.scopeLogs.forEach((sLogs) => { + data.logs.push(...sLogs.logRecords); + }); + }); + body.resourceSpans?.forEach((rSpans) => { + rSpans.scopeSpans.forEach((sSpans) => { + data.spans.push(...sSpans.spans); + }); + }); return Response.json({ partialSuccess: {} }, { status: 200 }); }, },