From a42ac7f68672f49e81acc207ebc935e978d53aad Mon Sep 17 00:00:00 2001 From: Heyang Zhou Date: Tue, 22 Aug 2023 13:56:00 +0800 Subject: [PATCH] feat(ext/kv): connect to remote database (#20178) This patch adds a `remote` backend for `ext/kv`. This supports connection to Deno Deploy and potentially other services compatible with the KV Connect protocol. --- .github/workflows/bench_cron.yml | 6 + .github/workflows/ci.generate.ts | 6 + .github/workflows/ci.yml | 6 + Cargo.lock | 78 +++ Cargo.toml | 3 +- README.md | 5 + cli/Cargo.toml | 2 +- .../kv-metadata-exchange-response.v1.json | 54 ++ cli/tests/unit/kv_test.ts | 74 +++ ext/kv/Cargo.toml | 7 + ext/kv/README.md | 80 ++- ext/kv/build.rs | 19 + ext/kv/dynamic.rs | 216 +++++++ ext/kv/interface.rs | 7 +- ext/kv/lib.rs | 10 +- ext/kv/proto/datapath.proto | 96 +++ ext/kv/proto/mod.rs | 7 + ext/kv/remote.rs | 558 ++++++++++++++++++ ext/kv/sqlite.rs | 7 +- runtime/permissions/mod.rs | 16 + runtime/web_worker.rs | 4 +- runtime/worker.rs | 4 +- test_util/Cargo.toml | 4 + test_util/build.rs | 22 + test_util/src/kv_remote.rs | 7 + test_util/src/lib.rs | 204 +++++++ 26 files changed, 1490 insertions(+), 12 deletions(-) create mode 100644 cli/schemas/kv-metadata-exchange-response.v1.json create mode 100644 ext/kv/build.rs create mode 100644 ext/kv/dynamic.rs create mode 100644 ext/kv/proto/datapath.proto create mode 100644 ext/kv/proto/mod.rs create mode 100644 ext/kv/remote.rs create mode 100644 test_util/build.rs create mode 100644 test_util/src/kv_remote.rs diff --git a/.github/workflows/bench_cron.yml b/.github/workflows/bench_cron.yml index 3725f80a87..c62a0f267f 100644 --- a/.github/workflows/bench_cron.yml +++ b/.github/workflows/bench_cron.yml @@ -31,6 +31,12 @@ jobs: - uses: dsherret/rust-toolchain-file@v1 + - name: Install protoc + uses: arduino/setup-protoc@v2 + with: + version: "21.12" + repo-token: ${{ secrets.GITHUB_TOKEN }} + - name: Build release run: cargo build --release --locked --all-targets diff --git a/.github/workflows/ci.generate.ts b/.github/workflows/ci.generate.ts index 43190563da..648dc19732 100755 --- a/.github/workflows/ci.generate.ts +++ b/.github/workflows/ci.generate.ts @@ -168,6 +168,11 @@ const installNodeStep = { uses: "actions/setup-node@v3", with: { "node-version": 18 }, }; +const installProtocStep = { + name: "Install protoc", + uses: "arduino/setup-protoc@v2", + with: { "version": "21.12", "repo-token": "${{ secrets.GITHUB_TOKEN }}" }, +}; const installDenoStep = { name: "Install Deno", uses: "denoland/setup-deno@v1", @@ -434,6 +439,7 @@ const ci = { if: "matrix.job == 'bench'", ...installNodeStep, }, + installProtocStep, { if: [ "matrix.profile == 'release' &&", diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 87ecc7ecb0..813f912f66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -170,6 +170,12 @@ jobs: uses: actions/setup-node@v3 with: node-version: 18 + - name: Install protoc + uses: arduino/setup-protoc@v2 + with: + version: '21.12' + repo-token: '${{ secrets.GITHUB_TOKEN }}' + if: '!(github.event_name == ''pull_request'' && matrix.skip_pr)' - if: |- !(github.event_name == 'pull_request' && matrix.skip_pr) && (matrix.profile == 'release' && matrix.job == 'test' && diff --git a/Cargo.lock b/Cargo.lock index 65c07399b2..baefe36259 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -507,6 +507,7 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "time 0.1.45", "wasm-bindgen", "winapi", @@ -1237,15 +1238,20 @@ dependencies = [ "anyhow", "async-trait", "base64 0.13.1", + "chrono", "deno_core", "hex", "log", "num-bigint", + "prost", + "prost-build", "rand", + "reqwest", "rusqlite", "serde", "serde_json", "tokio", + "url", "uuid", ] @@ -3132,6 +3138,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4519a88847ba2d5ead3dc53f1060ec6a571de93f325d9c5c4968147382b1cbc3" +[[package]] +name = "multimap" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" + [[package]] name = "napi-build" version = "1.2.1" @@ -3703,6 +3715,16 @@ dependencies = [ "yansi", ] +[[package]] +name = "prettyplease" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +dependencies = [ + "proc-macro2 1.0.66", + "syn 1.0.109", +] + [[package]] name = "primeorder" version = "0.13.1" @@ -3770,6 +3792,60 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +dependencies = [ + "bytes", + "heck", + "itertools", + "lazy_static", + "log", + "multimap", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 1.0.109", + "tempfile", + "which", +] + +[[package]] +name = "prost-derive" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +dependencies = [ + "anyhow", + "itertools", + "proc-macro2 1.0.66", + "quote 1.0.32", + "syn 1.0.109", +] + +[[package]] +name = "prost-types" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +dependencies = [ + "prost", +] + [[package]] name = "psm" version = "0.1.21" @@ -5240,6 +5316,8 @@ dependencies = [ "os_pipe", "parking_lot 0.12.1", "pretty_assertions", + "prost", + "prost-build", "regex", "reqwest", "ring", diff --git a/Cargo.toml b/Cargo.toml index 021348ab77..226d5d62fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,6 +82,7 @@ brotli = "3.3.4" bytes = "1.4.0" cache_control = "=0.2.0" cbc = { version = "=0.1.2", features = ["alloc"] } +chrono = { version = "=0.4.26", default-features = false, features = ["std", "serde", "clock"] } console_static_text = "=0.8.1" data-url = "=0.2.0" dlopen = "0.1.8" @@ -115,7 +116,7 @@ prost-build = "0.11" rand = "=0.8.5" regex = "^1.7.0" lazy-regex = "2.5.0" -reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks"] } +reqwest = { version = "0.11.18", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks", "json"] } ring = "=0.16.20" rusqlite = { version = "=0.29.0", features = ["unlock_notify", "bundled"] } rustls = "0.21.0" diff --git a/README.md b/README.md index 47d811da7f..d1da25b7ff 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,11 @@ scoop install deno Build and install from source using [Cargo](https://crates.io/crates/deno): ```sh +# Install the Protobuf compiler +apt install -y protobuf-compiler # Linux +brew install protobuf # macOS + +# Build and install Deno cargo install deno --locked ``` diff --git a/cli/Cargo.toml b/cli/Cargo.toml index eff3634fc0..b3a6939acd 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -67,7 +67,7 @@ base32 = "=0.4.0" base64.workspace = true bincode = "=1.3.3" cache_control.workspace = true -chrono = { version = "=0.4.26", default-features = false, features = ["std"] } +chrono.workspace = true clap = { version = "=4.3.3", features = ["string"] } clap_complete = "=4.3.1" clap_complete_fig = "=4.3.1" diff --git a/cli/schemas/kv-metadata-exchange-response.v1.json b/cli/schemas/kv-metadata-exchange-response.v1.json new file mode 100644 index 0000000000..aa29242fbf --- /dev/null +++ b/cli/schemas/kv-metadata-exchange-response.v1.json @@ -0,0 +1,54 @@ +{ + "$id": "https://deno.land/x/deno/cli/schemas/kv-metadata-exchange-response.v1.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "definitions": { + "Uuid": { + "type": "string", + "pattern": "^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$" + }, + "DateTime": { + "type": "string", + "format": "date-time" + }, + "EndpointInfo": { + "type": "object", + "properties": { + "url": { + "type": "string" + }, + "consistency": { + "type": "string" + } + }, + "required": ["url", "consistency"], + "additionalProperties": false + }, + "DatabaseMetadata": { + "type": "object", + "properties": { + "version": { + "type": "integer", + "minimum": 0 + }, + "databaseId": { + "$ref": "#/definitions/Uuid" + }, + "endpoints": { + "type": "array", + "items": { + "$ref": "#/definitions/EndpointInfo" + } + }, + "token": { + "type": "string" + }, + "expiresAt": { + "$ref": "#/definitions/DateTime" + } + }, + "required": ["version", "databaseId", "endpoints", "token", "expiresAt"], + "additionalProperties": false + } + }, + "$ref": "#/definitions/DatabaseMetadata" +} diff --git a/cli/tests/unit/kv_test.ts b/cli/tests/unit/kv_test.ts index 438ebd7eec..acda9a0e2e 100644 --- a/cli/tests/unit/kv_test.ts +++ b/cli/tests/unit/kv_test.ts @@ -20,6 +20,9 @@ try { isCI = true; } +// Defined in test_util/src/lib.rs +Deno.env.set("DENO_KV_ACCESS_TOKEN", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"); + Deno.test({ name: "openKv :memory: no permissions", permissions: {}, @@ -1932,3 +1935,74 @@ Deno.test({ } }, }); + +Deno.test({ + name: "remote backend", + async fn() { + const db = await Deno.openKv("http://localhost:4545/kv_remote_authorize"); + try { + await db.set(["some-key"], 1); + const entry = await db.get(["some-key"]); + assertEquals(entry.value, null); + assertEquals(entry.versionstamp, null); + } finally { + db.close(); + } + }, +}); + +Deno.test({ + name: "remote backend invalid format", + async fn() { + const db = await Deno.openKv( + "http://localhost:4545/kv_remote_authorize_invalid_format", + ); + let ok = false; + try { + await db.set(["some-key"], 1); + } catch (e) { + if ( + e.name === "TypeError" && + e.message.startsWith("Metadata error: Failed to decode metadata: ") + ) { + ok = true; + } else { + throw e; + } + } finally { + db.close(); + } + + if (!ok) { + throw new Error("did not get expected error"); + } + }, +}); + +Deno.test({ + name: "remote backend invalid version", + async fn() { + const db = await Deno.openKv( + "http://localhost:4545/kv_remote_authorize_invalid_version", + ); + let ok = false; + try { + await db.set(["some-key"], 1); + } catch (e) { + if ( + e.name === "TypeError" && + e.message === "Metadata error: Unsupported metadata version: 2" + ) { + ok = true; + } else { + throw e; + } + } finally { + db.close(); + } + + if (!ok) { + throw new Error("did not get expected error"); + } + }, +}); diff --git a/ext/kv/Cargo.toml b/ext/kv/Cargo.toml index 81c01880c5..459555a89e 100644 --- a/ext/kv/Cargo.toml +++ b/ext/kv/Cargo.toml @@ -17,13 +17,20 @@ path = "lib.rs" anyhow.workspace = true async-trait.workspace = true base64.workspace = true +chrono.workspace = true deno_core.workspace = true hex.workspace = true log.workspace = true num-bigint.workspace = true +prost.workspace = true rand.workspace = true +reqwest.workspace = true rusqlite.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true +url.workspace = true uuid.workspace = true + +[build-dependencies] +prost-build.workspace = true diff --git a/ext/kv/README.md b/ext/kv/README.md index 32896da623..f5c2de9ed0 100644 --- a/ext/kv/README.md +++ b/ext/kv/README.md @@ -1,3 +1,81 @@ # deno_kv -This crate provides a key/value store for Deno. +This crate provides a key/value store for Deno. For an overview of Deno KV, +please read the [manual](https://deno.land/manual/runtime/kv). + +## Storage Backends + +Deno KV has a pluggable storage interface that supports multiple backends: + +- SQLite - backed by a local SQLite database. This backend is suitable for + development and is the default when running locally. +- Remote - backed by a remote service that implements the + [KV Connect](#kv-connect) protocol, for example + [Deno Deploy](https://deno.com/deploy). + +Additional backends can be added by implementing the `DatabaseHandler` trait. + +## KV Connect + +The KV Connect protocol has separate control and data planes to maximize +throughput and minimize latency. _Metadata Exchange_ and _Data Path_ are the two +sub-protocols that are used when talking to a KV Connect-compatible service. + +### Metadata Exchange + +To connect to a KV Connect service, the user provides an HTTP or HTTPS URL to +`Deno.openKv`. A background task is then spawned to periodically make HTTP POST +requests to the provided URL to refresh database metadata. + +The HTTP `Authorization` header is included and have the format +`Bearer `. The `` is a static token issued by the +service provider. For Deno Deploy, this is the personal access token generated +from the dashboard. You can specify the access token with the environment +variable `DENO_KV_ACCESS_TOKEN`. + +Request body is currently unused. The response is a JSON message that satisfies +the [JSON Schema](https://json-schema.org/) definition in +`cli/schemas/kv-metadata-exchange-response.v1.json`. + +Semantics of the response fields: + +- `version`: Protocol version. The only supported value is `1`. +- `databaseId`: UUID of the database. +- `endpoints`: Data plane endpoints that can serve requests to the database, + along with their consistency levels. +- `token`: An ephemeral authentication token that must be included in all + requests to the data plane. This value is an opaque string and the client + should not depend on its format. +- `expiresAt`: The time at which the token expires. Encoded as an ISO 8601 + string. + +### Data Path + +After the first metadata exchange has completed, the client can talk to the data +plane endpoints listed in the `endpoints` field using a Protobuf-over-HTTP +protocol called the _Data Path_. The Protobuf messages are defined in +`proto/datapath.proto`. + +Two sub-endpoints are available under a data plane endpoint URL: + +- `POST /snapshot_read`: Used for read operations: `kv.get()` and + `kv.getMany()`. + - **Request type**: `SnapshotRead` + - **Response type**: `SnapshotReadOutput` +- `POST /atomic_write`: Used for write operations: `kv.set()` and + `kv.atomic().commit()`. + - **Request type**: `AtomicWrite` + - **Response type**: `AtomicWriteOutput` + +An HTTP `Authorization` header in the format `Bearer ` must be +included in all requests to the data plane. The value of `` is +the `token` field from the metadata exchange response. + +### Error handling + +All non-client errors (i.e. network errors and HTTP 5xx status codes) are +handled by retrying the request. Randomized exponential backoff is applied to +each retry. + +Client errors cannot be recovered by retrying. A JavaScript exception is +generated for each of those errors. diff --git a/ext/kv/build.rs b/ext/kv/build.rs new file mode 100644 index 0000000000..eba8a20f71 --- /dev/null +++ b/ext/kv/build.rs @@ -0,0 +1,19 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::env; +use std::io; +use std::path::PathBuf; + +fn main() -> io::Result<()> { + println!("cargo:rerun-if-changed=./proto"); + + let descriptor_path = + PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin"); + + prost_build::Config::new() + .file_descriptor_set_path(&descriptor_path) + .compile_well_known_types() + .compile_protos(&["proto/datapath.proto"], &["proto/"])?; + + Ok(()) +} diff --git a/ext/kv/dynamic.rs b/ext/kv/dynamic.rs new file mode 100644 index 0000000000..f79c10f559 --- /dev/null +++ b/ext/kv/dynamic.rs @@ -0,0 +1,216 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::cell::RefCell; +use std::rc::Rc; + +use crate::remote::RemoteDbHandlerPermissions; +use crate::sqlite::SqliteDbHandler; +use crate::sqlite::SqliteDbHandlerPermissions; +use crate::AtomicWrite; +use crate::CommitResult; +use crate::Database; +use crate::DatabaseHandler; +use crate::QueueMessageHandle; +use crate::ReadRange; +use crate::ReadRangeOutput; +use crate::SnapshotReadOptions; +use async_trait::async_trait; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::OpState; + +pub struct MultiBackendDbHandler { + backends: Vec<(&'static [&'static str], Box)>, +} + +impl MultiBackendDbHandler { + pub fn new( + backends: Vec<(&'static [&'static str], Box)>, + ) -> Self { + Self { backends } + } + + pub fn remote_or_sqlite< + P: SqliteDbHandlerPermissions + RemoteDbHandlerPermissions + 'static, + >( + default_storage_dir: Option, + ) -> Self { + Self::new(vec![ + ( + &["https://", "http://"], + Box::new(crate::remote::RemoteDbHandler::

::new()), + ), + ( + &[""], + Box::new(SqliteDbHandler::

::new(default_storage_dir)), + ), + ]) + } +} + +#[async_trait(?Send)] +impl DatabaseHandler for MultiBackendDbHandler { + type DB = Box; + + async fn open( + &self, + state: Rc>, + path: Option, + ) -> Result { + for (prefixes, handler) in &self.backends { + for &prefix in *prefixes { + if prefix.is_empty() { + return handler.dyn_open(state.clone(), path.clone()).await; + } + let Some(path) = &path else { + continue; + }; + if path.starts_with(prefix) { + return handler.dyn_open(state.clone(), Some(path.clone())).await; + } + } + } + Err(type_error(format!( + "No backend supports the given path: {:?}", + path + ))) + } +} + +#[async_trait(?Send)] +pub trait DynamicDbHandler { + async fn dyn_open( + &self, + state: Rc>, + path: Option, + ) -> Result, AnyError>; +} + +#[async_trait(?Send)] +impl DatabaseHandler for Box { + type DB = Box; + + async fn open( + &self, + state: Rc>, + path: Option, + ) -> Result { + (**self).dyn_open(state, path).await + } +} + +#[async_trait(?Send)] +impl DynamicDbHandler for T +where + T: DatabaseHandler, + DB: Database + 'static, +{ + async fn dyn_open( + &self, + state: Rc>, + path: Option, + ) -> Result, AnyError> { + Ok(Box::new(self.open(state, path).await?)) + } +} + +#[async_trait(?Send)] +pub trait DynamicDb { + async fn dyn_snapshot_read( + &self, + state: Rc>, + requests: Vec, + options: SnapshotReadOptions, + ) -> Result, AnyError>; + + async fn dyn_atomic_write( + &self, + state: Rc>, + write: AtomicWrite, + ) -> Result, AnyError>; + + async fn dyn_dequeue_next_message( + &self, + state: Rc>, + ) -> Result, AnyError>; + + fn dyn_close(&self); +} + +#[async_trait(?Send)] +impl Database for Box { + type QMH = Box; + + async fn snapshot_read( + &self, + state: Rc>, + requests: Vec, + options: SnapshotReadOptions, + ) -> Result, AnyError> { + (**self).dyn_snapshot_read(state, requests, options).await + } + + async fn atomic_write( + &self, + state: Rc>, + write: AtomicWrite, + ) -> Result, AnyError> { + (**self).dyn_atomic_write(state, write).await + } + + async fn dequeue_next_message( + &self, + state: Rc>, + ) -> Result, AnyError> { + (**self).dyn_dequeue_next_message(state).await + } + + fn close(&self) { + (**self).dyn_close() + } +} + +#[async_trait(?Send)] +impl DynamicDb for T +where + T: Database, + QMH: QueueMessageHandle + 'static, +{ + async fn dyn_snapshot_read( + &self, + state: Rc>, + requests: Vec, + options: SnapshotReadOptions, + ) -> Result, AnyError> { + Ok(self.snapshot_read(state, requests, options).await?) + } + + async fn dyn_atomic_write( + &self, + state: Rc>, + write: AtomicWrite, + ) -> Result, AnyError> { + Ok(self.atomic_write(state, write).await?) + } + + async fn dyn_dequeue_next_message( + &self, + state: Rc>, + ) -> Result, AnyError> { + Ok(Box::new(self.dequeue_next_message(state).await?)) + } + + fn dyn_close(&self) { + self.close() + } +} + +#[async_trait(?Send)] +impl QueueMessageHandle for Box { + async fn take_payload(&mut self) -> Result, AnyError> { + (**self).take_payload().await + } + async fn finish(&self, success: bool) -> Result<(), AnyError> { + (**self).finish(success).await + } +} diff --git a/ext/kv/interface.rs b/ext/kv/interface.rs index 28b43f8d71..abeaf8dd51 100644 --- a/ext/kv/interface.rs +++ b/ext/kv/interface.rs @@ -29,16 +29,21 @@ pub trait Database { async fn snapshot_read( &self, + state: Rc>, requests: Vec, options: SnapshotReadOptions, ) -> Result, AnyError>; async fn atomic_write( &self, + state: Rc>, write: AtomicWrite, ) -> Result, AnyError>; - async fn dequeue_next_message(&self) -> Result; + async fn dequeue_next_message( + &self, + state: Rc>, + ) -> Result; fn close(&self); } diff --git a/ext/kv/lib.rs b/ext/kv/lib.rs index 7164a700bf..f226b11ae7 100644 --- a/ext/kv/lib.rs +++ b/ext/kv/lib.rs @@ -1,7 +1,10 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. pub mod codec; +pub mod dynamic; mod interface; +mod proto; +pub mod remote; pub mod sqlite; use std::borrow::Cow; @@ -285,7 +288,8 @@ where let opts = SnapshotReadOptions { consistency: consistency.into(), }; - let output_ranges = db.snapshot_read(read_ranges, opts).await?; + let output_ranges = + db.snapshot_read(state.clone(), read_ranges, opts).await?; let output_ranges = output_ranges .into_iter() .map(|x| { @@ -323,7 +327,7 @@ where resource.db.clone() }; - let mut handle = db.dequeue_next_message().await?; + let mut handle = db.dequeue_next_message(state.clone()).await?; let payload = handle.take_payload().await?.into(); let handle_rid = { let mut state = state.borrow_mut(); @@ -660,7 +664,7 @@ where enqueues, }; - let result = db.atomic_write(atomic_write).await?; + let result = db.atomic_write(state.clone(), atomic_write).await?; Ok(result.map(|res| hex::encode(res.versionstamp))) } diff --git a/ext/kv/proto/datapath.proto b/ext/kv/proto/datapath.proto new file mode 100644 index 0000000000..ea48f2385c --- /dev/null +++ b/ext/kv/proto/datapath.proto @@ -0,0 +1,96 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +syntax = "proto3"; + +package datapath; + +message SnapshotRead { + repeated ReadRange ranges = 1; +} + +message SnapshotReadOutput { + repeated ReadRangeOutput ranges = 1; + bool read_disabled = 2; + repeated string regions_if_read_disabled = 3; + bool read_is_strongly_consistent = 4; + string primary_if_not_strongly_consistent = 5; +} + +message ReadRange { + bytes start = 1; + bytes end = 2; + int32 limit = 3; + bool reverse = 4; +} + +message ReadRangeOutput { + repeated KvEntry values = 1; +} + +message AtomicWrite { + repeated KvCheck kv_checks = 1; + repeated KvMutation kv_mutations = 2; + repeated Enqueue enqueues = 3; +} + +message AtomicWriteOutput { + AtomicWriteStatus status = 1; + bytes versionstamp = 2; + string primary_if_write_disabled = 3; +} + +message KvCheck { + bytes key = 1; + bytes versionstamp = 2; // 10-byte raw versionstamp +} + +message KvMutation { + bytes key = 1; + KvValue value = 2; + KvMutationType mutation_type = 3; +} + +message KvValue { + bytes data = 1; + KvValueEncoding encoding = 2; +} + +message KvEntry { + bytes key = 1; + bytes value = 2; + KvValueEncoding encoding = 3; + bytes versionstamp = 4; +} + +enum KvMutationType { + M_UNSPECIFIED = 0; + M_SET = 1; + M_CLEAR = 2; + M_SUM = 3; + M_MAX = 4; + M_MIN = 5; +} + +enum KvValueEncoding { + VE_UNSPECIFIED = 0; + VE_V8 = 1; + VE_LE64 = 2; + VE_BYTES = 3; +} + +enum AtomicWriteStatus { + AW_UNSPECIFIED = 0; + AW_SUCCESS = 1; + AW_CHECK_FAILURE = 2; + AW_UNSUPPORTED_WRITE = 3; + AW_USAGE_LIMIT_EXCEEDED = 4; + AW_WRITE_DISABLED = 5; + AW_QUEUE_BACKLOG_LIMIT_EXCEEDED = 6; +} + +message Enqueue { + bytes payload = 1; + int64 deadline_ms = 2; + repeated bytes kv_keys_if_undelivered = 3; + repeated uint32 backoff_schedule = 4; +} diff --git a/ext/kv/proto/mod.rs b/ext/kv/proto/mod.rs new file mode 100644 index 0000000000..d258a05511 --- /dev/null +++ b/ext/kv/proto/mod.rs @@ -0,0 +1,7 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// Generated code, disable lints +#[allow(clippy::all, non_snake_case)] +pub mod datapath { + include!(concat!(env!("OUT_DIR"), "/datapath.rs")); +} diff --git a/ext/kv/remote.rs b/ext/kv/remote.rs new file mode 100644 index 0000000000..47528d15fb --- /dev/null +++ b/ext/kv/remote.rs @@ -0,0 +1,558 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::cell::RefCell; +use std::marker::PhantomData; +use std::rc::Rc; +use std::sync::Arc; +use std::time::Duration; + +use crate::proto::datapath as pb; +use crate::AtomicWrite; +use crate::CommitResult; +use crate::Database; +use crate::DatabaseHandler; +use crate::KvEntry; +use crate::MutationKind; +use crate::QueueMessageHandle; +use crate::ReadRange; +use crate::ReadRangeOutput; +use crate::SnapshotReadOptions; +use anyhow::Context; +use async_trait::async_trait; +use chrono::DateTime; +use chrono::Utc; +use deno_core::error::type_error; +use deno_core::error::AnyError; +use deno_core::futures::TryFutureExt; +use deno_core::task::JoinHandle; +use deno_core::OpState; +use prost::Message; +use rand::Rng; +use serde::Deserialize; +use tokio::sync::watch; +use url::Url; +use uuid::Uuid; + +pub trait RemoteDbHandlerPermissions { + fn check_env(&mut self, var: &str) -> Result<(), AnyError>; + fn check_net_url( + &mut self, + url: &Url, + api_name: &str, + ) -> Result<(), AnyError>; +} + +pub struct RemoteDbHandler { + _p: std::marker::PhantomData

, +} + +impl RemoteDbHandler

{ + pub fn new() -> Self { + Self { _p: PhantomData } + } +} + +impl Default for RemoteDbHandler

{ + fn default() -> Self { + Self::new() + } +} + +#[derive(Deserialize)] +struct VersionInfo { + version: u64, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +#[allow(dead_code)] +struct DatabaseMetadata { + version: u64, + database_id: Uuid, + endpoints: Vec, + token: String, + expires_at: DateTime, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct EndpointInfo { + pub url: String, + + // Using `String` instead of an enum, so that parsing doesn't + // break if more consistency levels are added. + pub consistency: String, +} + +#[async_trait(?Send)] +impl DatabaseHandler for RemoteDbHandler

{ + type DB = RemoteDb

; + + async fn open( + &self, + state: Rc>, + path: Option, + ) -> Result { + const ENV_VAR_NAME: &str = "DENO_KV_ACCESS_TOKEN"; + + let Some(url) = path else { + return Err(type_error("Missing database url")); + }; + + let Ok(parsed_url) = Url::parse(&url) else { + return Err(type_error(format!("Invalid database url: {}", url))); + }; + + { + let mut state = state.borrow_mut(); + let permissions = state.borrow_mut::

(); + permissions.check_env(ENV_VAR_NAME)?; + permissions.check_net_url(&parsed_url, "Deno.openKv")?; + } + + let access_token = std::env::var(ENV_VAR_NAME) + .map_err(anyhow::Error::from) + .with_context(|| { + "Missing DENO_KV_ACCESS_TOKEN environment variable. Please set it to your access token from https://dash.deno.com/account." + })?; + + let refresher = MetadataRefresher::new(url, access_token); + + let db = RemoteDb { + client: reqwest::Client::new(), + refresher, + _p: PhantomData, + }; + Ok(db) + } +} + +pub struct RemoteDb { + client: reqwest::Client, + refresher: MetadataRefresher, + _p: std::marker::PhantomData

, +} + +pub struct DummyQueueMessageHandle {} + +#[async_trait(?Send)] +impl QueueMessageHandle for DummyQueueMessageHandle { + async fn take_payload(&mut self) -> Result, AnyError> { + unimplemented!() + } + + async fn finish(&self, _success: bool) -> Result<(), AnyError> { + unimplemented!() + } +} + +#[async_trait(?Send)] +impl Database for RemoteDb

{ + type QMH = DummyQueueMessageHandle; + + async fn snapshot_read( + &self, + state: Rc>, + requests: Vec, + _options: SnapshotReadOptions, + ) -> Result, AnyError> { + let req = pb::SnapshotRead { + ranges: requests + .into_iter() + .map(|r| pb::ReadRange { + start: r.start, + end: r.end, + limit: r.limit.get() as _, + reverse: r.reverse, + }) + .collect(), + }; + + let res: pb::SnapshotReadOutput = call_remote::( + &state, + &self.refresher, + &self.client, + "snapshot_read", + &req, + ) + .await?; + + if res.read_disabled { + return Err(type_error("Reads are disabled for this database.")); + } + + let out = res + .ranges + .into_iter() + .map(|r| { + Ok(ReadRangeOutput { + entries: r + .values + .into_iter() + .map(|e| { + let encoding = e.encoding(); + Ok(KvEntry { + key: e.key, + value: decode_value(e.value, encoding)?, + versionstamp: <[u8; 10]>::try_from(&e.versionstamp[..])?, + }) + }) + .collect::>()?, + }) + }) + .collect::, AnyError>>()?; + Ok(out) + } + + async fn atomic_write( + &self, + state: Rc>, + write: AtomicWrite, + ) -> Result, AnyError> { + if !write.enqueues.is_empty() { + return Err(type_error("Enqueue operations are not supported yet.")); + } + + let req = pb::AtomicWrite { + kv_checks: write + .checks + .into_iter() + .map(|x| { + Ok(pb::KvCheck { + key: x.key, + versionstamp: x.versionstamp.unwrap_or([0u8; 10]).to_vec(), + }) + }) + .collect::>()?, + kv_mutations: write + .mutations + .into_iter() + .map(|x| encode_mutation(x.key, x.kind)) + .collect(), + enqueues: vec![], + }; + + let res: pb::AtomicWriteOutput = call_remote::( + &state, + &self.refresher, + &self.client, + "atomic_write", + &req, + ) + .await?; + match res.status() { + pb::AtomicWriteStatus::AwSuccess => Ok(Some(CommitResult { + versionstamp: if res.versionstamp.is_empty() { + Default::default() + } else { + res.versionstamp[..].try_into()? + }, + })), + pb::AtomicWriteStatus::AwCheckFailure => Ok(None), + pb::AtomicWriteStatus::AwUnsupportedWrite => { + Err(type_error("Unsupported write")) + } + pb::AtomicWriteStatus::AwUsageLimitExceeded => { + Err(type_error("The database usage limit has been exceeded.")) + } + pb::AtomicWriteStatus::AwWriteDisabled => { + // TODO: Auto retry + Err(type_error("Writes are disabled for this database.")) + } + pb::AtomicWriteStatus::AwUnspecified => { + Err(type_error("Unspecified error")) + } + pb::AtomicWriteStatus::AwQueueBacklogLimitExceeded => { + Err(type_error("Queue backlog limit exceeded")) + } + } + } + + async fn dequeue_next_message( + &self, + _state: Rc>, + ) -> Result { + deno_core::futures::future::pending().await + } + + fn close(&self) {} +} + +fn decode_value( + value: Vec, + encoding: pb::KvValueEncoding, +) -> anyhow::Result { + match encoding { + pb::KvValueEncoding::VeV8 => Ok(crate::Value::V8(value)), + pb::KvValueEncoding::VeBytes => Ok(crate::Value::Bytes(value)), + pb::KvValueEncoding::VeLe64 => Ok(crate::Value::U64(u64::from_le_bytes( + <[u8; 8]>::try_from(&value[..])?, + ))), + pb::KvValueEncoding::VeUnspecified => { + Err(anyhow::anyhow!("Unspecified value encoding, cannot decode")) + } + } +} + +fn encode_value(value: crate::Value) -> pb::KvValue { + match value { + crate::Value::V8(data) => pb::KvValue { + data, + encoding: pb::KvValueEncoding::VeV8 as _, + }, + crate::Value::Bytes(data) => pb::KvValue { + data, + encoding: pb::KvValueEncoding::VeBytes as _, + }, + crate::Value::U64(x) => pb::KvValue { + data: x.to_le_bytes().to_vec(), + encoding: pb::KvValueEncoding::VeLe64 as _, + }, + } +} + +fn encode_mutation(key: Vec, mutation: MutationKind) -> pb::KvMutation { + match mutation { + MutationKind::Set(x) => pb::KvMutation { + key, + value: Some(encode_value(x)), + mutation_type: pb::KvMutationType::MSet as _, + }, + MutationKind::Delete => pb::KvMutation { + key, + value: Some(encode_value(crate::Value::Bytes(vec![]))), + mutation_type: pb::KvMutationType::MClear as _, + }, + MutationKind::Max(x) => pb::KvMutation { + key, + value: Some(encode_value(x)), + mutation_type: pb::KvMutationType::MMax as _, + }, + MutationKind::Min(x) => pb::KvMutation { + key, + value: Some(encode_value(x)), + mutation_type: pb::KvMutationType::MMin as _, + }, + MutationKind::Sum(x) => pb::KvMutation { + key, + value: Some(encode_value(x)), + mutation_type: pb::KvMutationType::MSum as _, + }, + } +} + +#[derive(Clone)] +enum MetadataState { + Ready(Arc), + Invalid(String), + Pending, +} + +struct MetadataRefresher { + metadata_rx: watch::Receiver, + handle: JoinHandle<()>, +} + +impl MetadataRefresher { + pub fn new(url: String, access_token: String) -> Self { + let (tx, rx) = watch::channel(MetadataState::Pending); + let handle = + deno_core::task::spawn(metadata_refresh_task(url, access_token, tx)); + Self { + handle, + metadata_rx: rx, + } + } +} + +impl Drop for MetadataRefresher { + fn drop(&mut self) { + self.handle.abort(); + } +} + +async fn metadata_refresh_task( + metadata_url: String, + access_token: String, + tx: watch::Sender, +) { + let client = reqwest::Client::new(); + loop { + let mut attempt = 0u64; + let metadata = loop { + match fetch_metadata(&client, &metadata_url, &access_token).await { + Ok(Ok(x)) => break x, + Ok(Err(e)) => { + if tx.send(MetadataState::Invalid(e)).is_err() { + return; + } + } + Err(e) => { + log::error!("Failed to fetch database metadata: {}", e); + } + } + randomized_exponential_backoff(Duration::from_secs(5), attempt).await; + attempt += 1; + }; + + let ms_until_expire = u64::try_from( + metadata + .expires_at + .timestamp_millis() + .saturating_sub(Utc::now().timestamp_millis()), + ) + .unwrap_or_default(); + + // Refresh 10 minutes before expiry + // In case of buggy clocks, don't refresh more than once per minute + let interval = Duration::from_millis(ms_until_expire) + .saturating_sub(Duration::from_secs(600)) + .max(Duration::from_secs(60)); + + if tx.send(MetadataState::Ready(Arc::new(metadata))).is_err() { + return; + } + + tokio::time::sleep(interval).await; + } +} + +async fn fetch_metadata( + client: &reqwest::Client, + metadata_url: &str, + access_token: &str, +) -> anyhow::Result> { + let res = client + .post(metadata_url) + .header("authorization", format!("Bearer {}", access_token)) + .send() + .await?; + + if !res.status().is_success() { + if res.status().is_client_error() { + return Ok(Err(format!( + "Client error while fetching metadata: {:?} {}", + res.status(), + res.text().await? + ))); + } else { + anyhow::bail!( + "remote returned error: {:?} {}", + res.status(), + res.text().await? + ); + } + } + + let res = res.bytes().await?; + let version_info: VersionInfo = match serde_json::from_slice(&res) { + Ok(x) => x, + Err(e) => return Ok(Err(format!("Failed to decode version info: {}", e))), + }; + if version_info.version > 1 { + return Ok(Err(format!( + "Unsupported metadata version: {}", + version_info.version + ))); + } + + Ok( + serde_json::from_slice(&res) + .map_err(|e| format!("Failed to decode metadata: {}", e)), + ) +} + +async fn randomized_exponential_backoff(base: Duration, attempt: u64) { + let attempt = attempt.min(12); + let delay = base.as_millis() as u64 + (2 << attempt); + let delay = delay + rand::thread_rng().gen_range(0..(delay / 2) + 1); + tokio::time::sleep(std::time::Duration::from_millis(delay)).await; +} + +async fn call_remote< + P: RemoteDbHandlerPermissions + 'static, + T: Message, + R: Message + Default, +>( + state: &RefCell, + refresher: &MetadataRefresher, + client: &reqwest::Client, + method: &str, + req: &T, +) -> anyhow::Result { + let mut attempt = 0u64; + let res = loop { + let mut metadata_rx = refresher.metadata_rx.clone(); + let metadata = loop { + match &*metadata_rx.borrow() { + MetadataState::Pending => {} + MetadataState::Ready(x) => break x.clone(), + MetadataState::Invalid(e) => { + return Err(type_error(format!("Metadata error: {}", e))) + } + } + // `unwrap()` never fails because `tx` is owned by the task held by `refresher`. + metadata_rx.changed().await.unwrap(); + }; + let Some(sc_endpoint) = metadata.endpoints.iter().find(|x| x.consistency == "strong") else { + return Err(type_error("No strong consistency endpoint is available for this database")); + }; + + let full_url = format!("{}/{}", sc_endpoint.url, method); + { + let parsed_url = Url::parse(&full_url)?; + let mut state = state.borrow_mut(); + let permissions = state.borrow_mut::

(); + permissions.check_net_url(&parsed_url, "Deno.Kv")?; + } + + let res = client + .post(&full_url) + .header("x-transaction-domain-id", metadata.database_id.to_string()) + .header("authorization", format!("Bearer {}", metadata.token)) + .body(req.encode_to_vec()) + .send() + .map_err(anyhow::Error::from) + .and_then(|x| async move { + if x.status().is_success() { + Ok(Ok(x.bytes().await?)) + } else if x.status().is_client_error() { + Ok(Err((x.status(), x.text().await?))) + } else { + Err(anyhow::anyhow!( + "server error ({:?}): {}", + x.status(), + x.text().await? + )) + } + }) + .await; + + match res { + Ok(x) => break x, + Err(e) => { + log::error!("retryable error in {}: {}", method, e); + randomized_exponential_backoff(Duration::from_millis(0), attempt).await; + attempt += 1; + } + } + }; + + let res = match res { + Ok(x) => x, + Err((status, message)) => { + return Err(type_error(format!( + "client error in {} (status {:?}): {}", + method, status, message + ))) + } + }; + + match R::decode(&*res) { + Ok(x) => Ok(x), + Err(e) => Err(type_error(format!( + "failed to decode response from {}: {}", + method, e + ))), + } +} diff --git a/ext/kv/sqlite.rs b/ext/kv/sqlite.rs index 5147bba06e..f67154cb4d 100644 --- a/ext/kv/sqlite.rs +++ b/ext/kv/sqlite.rs @@ -724,6 +724,7 @@ impl Database for SqliteDb { async fn snapshot_read( &self, + _state: Rc>, requests: Vec, _options: SnapshotReadOptions, ) -> Result, AnyError> { @@ -769,6 +770,7 @@ impl Database for SqliteDb { async fn atomic_write( &self, + _state: Rc>, write: AtomicWrite, ) -> Result, AnyError> { let write = Arc::new(write); @@ -894,7 +896,10 @@ impl Database for SqliteDb { Ok(commit_result) } - async fn dequeue_next_message(&self) -> Result { + async fn dequeue_next_message( + &self, + _state: Rc>, + ) -> Result { let queue = self .queue .get_or_init(|| async move { SqliteQueue::new(self.conn.clone()) }) diff --git a/runtime/permissions/mod.rs b/runtime/permissions/mod.rs index 93294fc926..a87ca309f3 100644 --- a/runtime/permissions/mod.rs +++ b/runtime/permissions/mod.rs @@ -1483,6 +1483,22 @@ impl deno_kv::sqlite::SqliteDbHandlerPermissions for PermissionsContainer { } } +impl deno_kv::remote::RemoteDbHandlerPermissions for PermissionsContainer { + #[inline(always)] + fn check_env(&mut self, var: &str) -> Result<(), AnyError> { + self.0.lock().env.check(var) + } + + #[inline(always)] + fn check_net_url( + &mut self, + url: &url::Url, + api_name: &str, + ) -> Result<(), AnyError> { + self.0.lock().net.check_url(url, Some(api_name)) + } +} + fn unit_permission_from_flag_bools( allow_flag: bool, deny_flag: bool, diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 0c4e951404..8a88dfa409 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -38,7 +38,7 @@ use deno_core::SourceMapGetter; use deno_fs::FileSystem; use deno_http::DefaultHttpPropertyExtractor; use deno_io::Stdio; -use deno_kv::sqlite::SqliteDbHandler; +use deno_kv::dynamic::MultiBackendDbHandler; use deno_node::SUPPORTED_BUILTIN_NODE_MODULES_WITH_PREFIX; use deno_tls::RootCertStoreProvider; use deno_web::create_entangled_message_port; @@ -439,7 +439,7 @@ impl WebWorker { ), deno_tls::deno_tls::init_ops_and_esm(), deno_kv::deno_kv::init_ops_and_esm( - SqliteDbHandler::::new(None), + MultiBackendDbHandler::remote_or_sqlite::(None), unstable, ), deno_napi::deno_napi::init_ops_and_esm::(), diff --git a/runtime/worker.rs b/runtime/worker.rs index 5eefd5fa89..a31bd2ae1b 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -35,7 +35,7 @@ use deno_core::SourceMapGetter; use deno_fs::FileSystem; use deno_http::DefaultHttpPropertyExtractor; use deno_io::Stdio; -use deno_kv::sqlite::SqliteDbHandler; +use deno_kv::dynamic::MultiBackendDbHandler; use deno_node::SUPPORTED_BUILTIN_NODE_MODULES_WITH_PREFIX; use deno_tls::RootCertStoreProvider; use deno_web::BlobStore; @@ -334,7 +334,7 @@ impl MainWorker { ), deno_tls::deno_tls::init_ops_and_esm(), deno_kv::deno_kv::init_ops_and_esm( - SqliteDbHandler::::new( + MultiBackendDbHandler::remote_or_sqlite::( options.origin_storage_dir.clone(), ), unstable, diff --git a/test_util/Cargo.toml b/test_util/Cargo.toml index 2f35473e8a..6ea9d870dd 100644 --- a/test_util/Cargo.toml +++ b/test_util/Cargo.toml @@ -31,6 +31,7 @@ once_cell.workspace = true os_pipe.workspace = true parking_lot.workspace = true pretty_assertions.workspace = true +prost.workspace = true regex.workspace = true reqwest.workspace = true ring.workspace = true @@ -46,3 +47,6 @@ url.workspace = true [target.'cfg(windows)'.dependencies] winapi = { workspace = true, features = ["consoleapi", "synchapi", "handleapi", "namedpipeapi", "winbase", "winerror"] } + +[build-dependencies] +prost-build.workspace = true diff --git a/test_util/build.rs b/test_util/build.rs new file mode 100644 index 0000000000..420abd0a10 --- /dev/null +++ b/test_util/build.rs @@ -0,0 +1,22 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +use std::env; +use std::io; +use std::path::PathBuf; + +fn main() -> io::Result<()> { + println!("cargo:rerun-if-changed=../ext/kv/proto"); + + let descriptor_path = + PathBuf::from(env::var("OUT_DIR").unwrap()).join("proto_descriptor.bin"); + + prost_build::Config::new() + .file_descriptor_set_path(&descriptor_path) + .compile_well_known_types() + .compile_protos( + &["../ext/kv/proto/datapath.proto"], + &["../ext/kv/proto/"], + )?; + + Ok(()) +} diff --git a/test_util/src/kv_remote.rs b/test_util/src/kv_remote.rs new file mode 100644 index 0000000000..d258a05511 --- /dev/null +++ b/test_util/src/kv_remote.rs @@ -0,0 +1,7 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +// Generated code, disable lints +#[allow(clippy::all, non_snake_case)] +pub mod datapath { + include!(concat!(env!("OUT_DIR"), "/datapath.rs")); +} diff --git a/test_util/src/lib.rs b/test_util/src/lib.rs index 136576890c..41fb96a1b8 100644 --- a/test_util/src/lib.rs +++ b/test_util/src/lib.rs @@ -15,9 +15,16 @@ use hyper::Body; use hyper::Request; use hyper::Response; use hyper::StatusCode; +use kv_remote::datapath::AtomicWrite; +use kv_remote::datapath::AtomicWriteOutput; +use kv_remote::datapath::AtomicWriteStatus; +use kv_remote::datapath::ReadRangeOutput; +use kv_remote::datapath::SnapshotRead; +use kv_remote::datapath::SnapshotReadOutput; use npm::CUSTOM_NPM_PACKAGE_CACHE; use once_cell::sync::Lazy; use pretty_assertions::assert_eq; +use prost::Message; use pty::Pty; use regex::Regex; use rustls::Certificate; @@ -57,6 +64,7 @@ pub mod assertions; mod builders; pub mod factory; mod fs; +mod kv_remote; pub mod lsp; mod npm; pub mod pty; @@ -72,6 +80,9 @@ const PORT: u16 = 4545; const TEST_AUTH_TOKEN: &str = "abcdef123456789"; const TEST_BASIC_AUTH_USERNAME: &str = "testuser123"; const TEST_BASIC_AUTH_PASSWORD: &str = "testpassabc"; +const KV_DATABASE_ID: &str = "11111111-1111-1111-1111-111111111111"; +const KV_ACCESS_TOKEN: &str = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; +const KV_DATABASE_TOKEN: &str = "MOCKMOCKMOCKMOCKMOCKMOCKMOCK"; const REDIRECT_PORT: u16 = 4546; const ANOTHER_REDIRECT_PORT: u16 = 4547; const DOUBLE_REDIRECTS_PORT: u16 = 4548; @@ -1095,6 +1106,199 @@ async fn main_server( let res = Response::new(Body::from(query.unwrap_or_default())); Ok(res) } + (&hyper::Method::POST, "/kv_remote_authorize") => { + if req + .headers() + .get("authorization") + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + != format!("Bearer {}", KV_ACCESS_TOKEN) + { + return Ok( + Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap(), + ); + } + + Ok( + Response::builder() + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "version": 1, + "databaseId": KV_DATABASE_ID, + "endpoints": [ + { + "url": format!("http://localhost:{}/kv_blackhole", PORT), + "consistency": "strong", + } + ], + "token": KV_DATABASE_TOKEN, + "expiresAt": "2099-01-01T00:00:00Z", + }) + .to_string(), + )) + .unwrap(), + ) + } + (&hyper::Method::POST, "/kv_remote_authorize_invalid_format") => { + if req + .headers() + .get("authorization") + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + != format!("Bearer {}", KV_ACCESS_TOKEN) + { + return Ok( + Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap(), + ); + } + + Ok( + Response::builder() + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "version": 1, + "databaseId": KV_DATABASE_ID, + }) + .to_string(), + )) + .unwrap(), + ) + } + (&hyper::Method::POST, "/kv_remote_authorize_invalid_version") => { + if req + .headers() + .get("authorization") + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + != format!("Bearer {}", KV_ACCESS_TOKEN) + { + return Ok( + Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap(), + ); + } + + Ok( + Response::builder() + .header("content-type", "application/json") + .body(Body::from( + serde_json::json!({ + "version": 2, + "databaseId": KV_DATABASE_ID, + "endpoints": [ + { + "url": format!("http://localhost:{}/kv_blackhole", PORT), + "consistency": "strong", + } + ], + "token": KV_DATABASE_TOKEN, + "expiresAt": "2099-01-01T00:00:00Z", + }) + .to_string(), + )) + .unwrap(), + ) + } + (&hyper::Method::POST, "/kv_blackhole/snapshot_read") => { + if req + .headers() + .get("authorization") + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + != format!("Bearer {}", KV_DATABASE_TOKEN) + { + return Ok( + Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap(), + ); + } + + let body = hyper::body::to_bytes(req.into_body()) + .await + .unwrap_or_default(); + let Ok(body): Result = prost::Message::decode(&body[..]) else { + return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .unwrap()); + }; + if body.ranges.is_empty() { + return Ok( + Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .unwrap(), + ); + } + Ok( + Response::builder() + .body(Body::from( + SnapshotReadOutput { + ranges: body + .ranges + .iter() + .map(|_| ReadRangeOutput { values: vec![] }) + .collect(), + read_disabled: false, + regions_if_read_disabled: vec![], + read_is_strongly_consistent: true, + primary_if_not_strongly_consistent: "".into(), + } + .encode_to_vec(), + )) + .unwrap(), + ) + } + (&hyper::Method::POST, "/kv_blackhole/atomic_write") => { + if req + .headers() + .get("authorization") + .and_then(|x| x.to_str().ok()) + .unwrap_or_default() + != format!("Bearer {}", KV_DATABASE_TOKEN) + { + return Ok( + Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap(), + ); + } + + let body = hyper::body::to_bytes(req.into_body()) + .await + .unwrap_or_default(); + let Ok(_body): Result = prost::Message::decode(&body[..]) else { + return Ok(Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::empty()) + .unwrap()); + }; + Ok( + Response::builder() + .body(Body::from( + AtomicWriteOutput { + status: AtomicWriteStatus::AwSuccess.into(), + versionstamp: vec![0u8; 10], + primary_if_write_disabled: "".into(), + } + .encode_to_vec(), + )) + .unwrap(), + ) + } _ => { let mut file_path = testdata_path().to_path_buf(); file_path.push(&req.uri().path()[1..]);