From d59bd5e8c9eac0dcf4dadce21a8e30542e80b876 Mon Sep 17 00:00:00 2001 From: snek Date: Mon, 25 Nov 2024 16:38:07 +0100 Subject: [PATCH] feat(unstable): Instrument fetch (#27057) Add basic tracing to `fetch`. Also fix span kinds so that we can differentiate fetch and serve. --- ext/fetch/26_fetch.js | 218 +++++++++++++++++---------- ext/fetch/lib.rs | 7 + ext/http/00_serve.ts | 4 +- runtime/js/telemetry.ts | 14 +- tests/specs/cli/otel_basic/basic.out | 70 ++++++++- 5 files changed, 223 insertions(+), 90 deletions(-) diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 8ac364a931..01be983a37 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -10,9 +10,10 @@ /// /// -import { core, primordials } from "ext:core/mod.js"; +import { core, internals, primordials } from "ext:core/mod.js"; import { op_fetch, + op_fetch_promise_is_settled, op_fetch_send, op_wasm_streaming_feed, op_wasm_streaming_set_url, @@ -28,7 +29,9 @@ const { PromisePrototypeThen, PromisePrototypeCatch, SafeArrayIterator, + SafePromisePrototypeFinally, String, + StringPrototypeSlice, StringPrototypeStartsWith, StringPrototypeToLowerCase, TypeError, @@ -307,93 +310,150 @@ function httpRedirectFetch(request, response, terminator) { * @param {RequestInit} init */ function fetch(input, init = { __proto__: null }) { - // There is an async dispatch later that causes a stack trace disconnect. - // We reconnect it by assigning the result of that dispatch to `opPromise`, - // awaiting `opPromise` in an inner function also named `fetch()` and - // returning the result from that. - let opPromise = undefined; - // 1. - const result = new Promise((resolve, reject) => { - const prefix = "Failed to execute 'fetch'"; - webidl.requiredArguments(arguments.length, 1, prefix); - // 2. - const requestObject = new Request(input, init); - // 3. - const request = toInnerRequest(requestObject); - // 4. - if (requestObject.signal.aborted) { - reject(abortFetch(request, null, requestObject.signal.reason)); - return; + let span; + try { + if (internals.telemetry?.tracingEnabled) { + span = new internals.telemetry.Span("fetch", { kind: 2 }); + internals.telemetry.enterSpan(span); } - // 7. - let responseObject = null; - // 9. - let locallyAborted = false; - // 10. - function onabort() { - locallyAborted = true; - reject( - abortFetch(request, responseObject, requestObject.signal.reason), - ); - } - requestObject.signal[abortSignal.add](onabort); + // There is an async dispatch later that causes a stack trace disconnect. + // We reconnect it by assigning the result of that dispatch to `opPromise`, + // awaiting `opPromise` in an inner function also named `fetch()` and + // returning the result from that. + let opPromise = undefined; + // 1. + const result = new Promise((resolve, reject) => { + const prefix = "Failed to execute 'fetch'"; + webidl.requiredArguments(arguments.length, 1, prefix); + // 2. + const requestObject = new Request(input, init); - if (!requestObject.headers.has("Accept")) { - ArrayPrototypePush(request.headerList, ["Accept", "*/*"]); - } + if (span) { + span.updateName(requestObject.method); + span.setAttribute("http.request.method", requestObject.method); + const url = new URL(requestObject.url); + span.setAttribute("url.full", requestObject.url); + span.setAttribute( + "url.scheme", + StringPrototypeSlice(url.protocol, 0, -1), + ); + span.setAttribute("url.path", url.pathname); + span.setAttribute("url.query", StringPrototypeSlice(url.search, 1)); + } - if (!requestObject.headers.has("Accept-Language")) { - ArrayPrototypePush(request.headerList, ["Accept-Language", "*"]); - } + // 3. + const request = toInnerRequest(requestObject); + // 4. + if (requestObject.signal.aborted) { + reject(abortFetch(request, null, requestObject.signal.reason)); + return; + } + // 7. + let responseObject = null; + // 9. + let locallyAborted = false; + // 10. + function onabort() { + locallyAborted = true; + reject( + abortFetch(request, responseObject, requestObject.signal.reason), + ); + } + requestObject.signal[abortSignal.add](onabort); - // 12. - opPromise = PromisePrototypeCatch( - PromisePrototypeThen( - mainFetch(request, false, requestObject.signal), - (response) => { - // 12.1. - if (locallyAborted) return; - // 12.2. - if (response.aborted) { - reject( - abortFetch( - request, - responseObject, - requestObject.signal.reason, - ), - ); + if (!requestObject.headers.has("Accept")) { + ArrayPrototypePush(request.headerList, ["Accept", "*/*"]); + } + + if (!requestObject.headers.has("Accept-Language")) { + ArrayPrototypePush(request.headerList, ["Accept-Language", "*"]); + } + + // 12. + opPromise = PromisePrototypeCatch( + PromisePrototypeThen( + mainFetch(request, false, requestObject.signal), + (response) => { + // 12.1. + if (locallyAborted) return; + // 12.2. + if (response.aborted) { + reject( + abortFetch( + request, + responseObject, + requestObject.signal.reason, + ), + ); + requestObject.signal[abortSignal.remove](onabort); + return; + } + // 12.3. + if (response.type === "error") { + const err = new TypeError( + "Fetch failed: " + (response.error ?? "unknown error"), + ); + reject(err); + requestObject.signal[abortSignal.remove](onabort); + return; + } + responseObject = fromInnerResponse(response, "immutable"); + + if (span) { + span.setAttribute( + "http.response.status_code", + String(responseObject.status), + ); + } + + resolve(responseObject); requestObject.signal[abortSignal.remove](onabort); - return; - } - // 12.3. - if (response.type === "error") { - const err = new TypeError( - "Fetch failed: " + (response.error ?? "unknown error"), - ); - reject(err); - requestObject.signal[abortSignal.remove](onabort); - return; - } - responseObject = fromInnerResponse(response, "immutable"); - resolve(responseObject); + }, + ), + (err) => { + reject(err); requestObject.signal[abortSignal.remove](onabort); }, - ), - (err) => { - reject(err); - requestObject.signal[abortSignal.remove](onabort); - }, - ); - }); - if (opPromise) { - PromisePrototypeCatch(result, () => {}); - return (async function fetch() { - await opPromise; - return result; - })(); + ); + }); + + if (opPromise) { + PromisePrototypeCatch(result, () => {}); + return (async function fetch() { + try { + await opPromise; + return result; + } finally { + if (span) { + internals.telemetry.endSpan(span); + } + } + })(); + } + // We need to end the span when the promise settles. + // WPT has a test that aborted fetch is settled in the same tick. + // This means we cannot wrap the promise if it is already settled. + // But this is OK, because we can just immediately end the span + // in that case. + if (span) { + // XXX: This should always be true, otherwise `opPromise` would be present. + if (op_fetch_promise_is_settled(result)) { + // It's already settled. + internals.telemetry.endSpan(span); + } else { + // Not settled yet, we can return a new wrapper promise. + return SafePromisePrototypeFinally(result, () => { + internals.telemetry.endSpan(span); + }); + } + } + return result; + } finally { + if (span) { + internals.telemetry.exitSpan(span); + } } - return result; } function abortFetch(request, responseObject, error) { diff --git a/ext/fetch/lib.rs b/ext/fetch/lib.rs index 303c955622..7a525053b3 100644 --- a/ext/fetch/lib.rs +++ b/ext/fetch/lib.rs @@ -27,6 +27,7 @@ use deno_core::futures::TryFutureExt; use deno_core::op2; use deno_core::url; use deno_core::url::Url; +use deno_core::v8; use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; @@ -141,6 +142,7 @@ deno_core::extension!(deno_fetch, op_fetch_send, op_utf8_to_byte_string, op_fetch_custom_client, + op_fetch_promise_is_settled, ], esm = [ "20_headers.js", @@ -1206,3 +1208,8 @@ pub fn extract_authority(url: &mut Url) -> Option<(String, Option)> { None } + +#[op2(fast)] +fn op_fetch_promise_is_settled(promise: v8::Local) -> bool { + promise.state() != v8::PromiseState::Pending +} diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 766a6d2739..027c2710b2 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -617,13 +617,13 @@ function mapToCallback(context, callback, onError) { fastSyncResponseOrStream(req, inner.body, status, innerRequest); }; - if (internals.telemetry.tracingEnabled) { + if (internals.telemetry?.tracingEnabled) { const { Span, enterSpan, endSpan } = internals.telemetry; const origMapped = mapped; mapped = function (req, _span) { const oldCtx = getAsyncContext(); setAsyncContext(context.asyncContext); - const span = new Span("deno.serve"); + const span = new Span("deno.serve", { kind: 1 }); try { enterSpan(span); return SafePromisePrototypeFinally( diff --git a/runtime/js/telemetry.ts b/runtime/js/telemetry.ts index 96c1c9369f..a98e5b0a19 100644 --- a/runtime/js/telemetry.ts +++ b/runtime/js/telemetry.ts @@ -41,6 +41,8 @@ const { AsyncVariable, setAsyncContext } = core; let TRACING_ENABLED = false; let DETERMINISTIC = false; +// Note: These start at 0 in the JS library, +// but start at 1 when serialized with JSON. enum SpanKind { INTERNAL = 0, SERVER = 1, @@ -91,6 +93,11 @@ interface Attributes { type SpanAttributes = Attributes; +interface SpanOptions { + attributes?: Attributes; + kind?: SpanKind; +} + interface Link { context: SpanContext; attributes?: SpanAttributes; @@ -354,7 +361,7 @@ export class Span { #recording = TRACING_ENABLED; - #kind: number = 0; + #kind: number = SpanKind.INTERNAL; #name: string; #startTime: number; #status: { code: number; message?: string } | null = null; @@ -429,7 +436,7 @@ export class Span { constructor( name: string, - attributes?: Attributes, + options?: SpanOptions, ) { if (!this.isRecording) { this.#name = ""; @@ -442,7 +449,8 @@ export class Span { this.#name = name; this.#startTime = now(); - this.#attributes = attributes ?? { __proto__: null } as never; + this.#attributes = options?.attributes ?? { __proto__: null } as never; + this.#kind = options?.kind ?? SpanKind.INTERNAL; const currentSpan: Span | { spanContext(): { traceId: string; spanId: string }; diff --git a/tests/specs/cli/otel_basic/basic.out b/tests/specs/cli/otel_basic/basic.out index 1883866a1d..88296a7c04 100644 --- a/tests/specs/cli/otel_basic/basic.out +++ b/tests/specs/cli/otel_basic/basic.out @@ -7,7 +7,7 @@ "parentSpanId": "", "flags": 1, "name": "GET", - "kind": 1, + "kind": 3, "startTimeUnixNano": "[WILDCARD]", "endTimeUnixNano": "[WILDCARD]", "attributes": [ @@ -59,10 +59,68 @@ } }, { - "traceId": "00000000000000000000000000000001", + "traceId": "00000000000000000000000000000003", + "spanId": "0000000000000004", + "traceState": "", + "parentSpanId": "", + "flags": 1, + "name": "GET", + "kind": 2, + "startTimeUnixNano": "[WILDCARD]", + "endTimeUnixNano": "[WILDCARD]", + "attributes": [ + { + "key": "http.request.method", + "value": { + "stringValue": "GET" + } + }, + { + "key": "url.full", + "value": { + "stringValue": "http://localhost:[WILDCARD]/" + } + }, + { + "key": "url.scheme", + "value": { + "stringValue": "http" + } + }, + { + "key": "url.path", + "value": { + "stringValue": "/" + } + }, + { + "key": "url.query", + "value": { + "stringValue": "" + } + }, + { + "key": "http.response.status_code", + "value": { + "stringValue": "200" + } + } + ], + "droppedAttributesCount": 0, + "events": [], + "droppedEventsCount": 0, + "links": [], + "droppedLinksCount": 0, + "status": { + "message": "", + "code": 0 + } + }, + { + "traceId": "00000000000000000000000000000003", "spanId": "1000000000000001", "traceState": "", - "parentSpanId": "0000000000000002", + "parentSpanId": "0000000000000004", "flags": 1, "name": "outer span", "kind": 1, @@ -80,7 +138,7 @@ } }, { - "traceId": "00000000000000000000000000000001", + "traceId": "00000000000000000000000000000003", "spanId": "1000000000000002", "traceState": "", "parentSpanId": "1000000000000001", @@ -113,7 +171,7 @@ "attributes": [], "droppedAttributesCount": 0, "flags": 1, - "traceId": "00000000000000000000000000000001", + "traceId": "00000000000000000000000000000003", "spanId": "1000000000000002" }, { @@ -127,7 +185,7 @@ "attributes": [], "droppedAttributesCount": 0, "flags": 1, - "traceId": "00000000000000000000000000000001", + "traceId": "00000000000000000000000000000003", "spanId": "1000000000000002" } ]