// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

pub mod config;
pub mod dynamic;
mod interface;
pub mod remote;
pub mod sqlite;

use std::borrow::Cow;
use std::cell::RefCell;
use std::num::NonZeroU32;
use std::rc::Rc;
use std::time::Duration;

use base64::prelude::BASE64_URL_SAFE;
use base64::Engine;
use chrono::DateTime;
use chrono::Utc;
use deno_core::error::get_custom_error_class;
use deno_core::futures::StreamExt;
use deno_core::op2;
use deno_core::serde_v8::AnyValue;
use deno_core::serde_v8::BigInt;
use deno_core::AsyncRefCell;
use deno_core::ByteString;
use deno_core::CancelFuture;
use deno_core::CancelHandle;
use deno_core::JsBuffer;
use deno_core::OpState;
use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_core::ToJsBuffer;
use denokv_proto::decode_key;
use denokv_proto::encode_key;
use denokv_proto::AtomicWrite;
use denokv_proto::Check;
use denokv_proto::Consistency;
use denokv_proto::Database;
use denokv_proto::Enqueue;
use denokv_proto::Key;
use denokv_proto::KeyPart;
use denokv_proto::KvEntry;
use denokv_proto::KvValue;
use denokv_proto::Mutation;
use denokv_proto::MutationKind;
use denokv_proto::QueueMessageHandle;
use denokv_proto::ReadRange;
use denokv_proto::SnapshotReadOptions;
use denokv_proto::WatchKeyOutput;
use denokv_proto::WatchStream;
use log::debug;
use serde::Deserialize;
use serde::Serialize;

pub use crate::config::*;
pub use crate::interface::*;

pub const UNSTABLE_FEATURE_NAME: &str = "kv";

deno_core::extension!(deno_kv,
  deps = [ deno_console, deno_web ],
  parameters = [ DBH: DatabaseHandler ],
  ops = [
    op_kv_database_open<DBH>,
    op_kv_snapshot_read<DBH>,
    op_kv_atomic_write<DBH>,
    op_kv_encode_cursor,
    op_kv_dequeue_next_message<DBH>,
    op_kv_finish_dequeued_message<DBH>,
    op_kv_watch<DBH>,
    op_kv_watch_next,
  ],
  esm = [ "01_db.ts" ],
  options = {
    handler: DBH,
    config: KvConfig,
  },
  state = |state, options| {
    state.put(Rc::new(options.config));
    state.put(Rc::new(options.handler));
  }
);

struct DatabaseResource<DB: Database + 'static> {
  db: DB,
  cancel_handle: Rc<CancelHandle>,
}

impl<DB: Database + 'static> Resource for DatabaseResource<DB> {
  fn name(&self) -> Cow<str> {
    "database".into()
  }

  fn close(self: Rc<Self>) {
    self.db.close();
    self.cancel_handle.cancel();
  }
}

struct DatabaseWatcherResource {
  stream: AsyncRefCell<WatchStream>,
  db_cancel_handle: Rc<CancelHandle>,
  cancel_handle: Rc<CancelHandle>,
}

impl Resource for DatabaseWatcherResource {
  fn name(&self) -> Cow<str> {
    "databaseWatcher".into()
  }

  fn close(self: Rc<Self>) {
    self.cancel_handle.cancel()
  }
}

#[derive(Debug, thiserror::Error)]
pub enum KvError {
  #[error(transparent)]
  DatabaseHandler(deno_core::error::AnyError),
  #[error(transparent)]
  Resource(deno_core::error::AnyError),
  #[error("Too many ranges (max {0})")]
  TooManyRanges(usize),
  #[error("Too many entries (max {0})")]
  TooManyEntries(usize),
  #[error("Too many checks (max {0})")]
  TooManyChecks(usize),
  #[error("Too many mutations (max {0})")]
  TooManyMutations(usize),
  #[error("Too many keys (max {0})")]
  TooManyKeys(usize),
  #[error("limit must be greater than 0")]
  InvalidLimit,
  #[error("Invalid boundary key")]
  InvalidBoundaryKey,
  #[error("Key too large for read (max {0} bytes)")]
  KeyTooLargeToRead(usize),
  #[error("Key too large for write (max {0} bytes)")]
  KeyTooLargeToWrite(usize),
  #[error("Total mutation size too large (max {0} bytes)")]
  TotalMutationTooLarge(usize),
  #[error("Total key size too large (max {0} bytes)")]
  TotalKeyTooLarge(usize),
  #[error(transparent)]
  Kv(deno_core::error::AnyError),
  #[error(transparent)]
  Io(#[from] std::io::Error),
  #[error("Queue message not found")]
  QueueMessageNotFound,
  #[error("Start key is not in the keyspace defined by prefix")]
  StartKeyNotInKeyspace,
  #[error("End key is not in the keyspace defined by prefix")]
  EndKeyNotInKeyspace,
  #[error("Start key is greater than end key")]
  StartKeyGreaterThanEndKey,
  #[error("Invalid check")]
  InvalidCheck(#[source] KvCheckError),
  #[error("Invalid mutation")]
  InvalidMutation(#[source] KvMutationError),
  #[error("Invalid enqueue")]
  InvalidEnqueue(#[source] std::io::Error),
  #[error("key cannot be empty")]
  EmptyKey, // TypeError
  #[error("Value too large (max {0} bytes)")]
  ValueTooLarge(usize), // TypeError
  #[error("enqueue payload too large (max {0} bytes)")]
  EnqueuePayloadTooLarge(usize), // TypeError
  #[error("invalid cursor")]
  InvalidCursor,
  #[error("cursor out of bounds")]
  CursorOutOfBounds,
  #[error("Invalid range")]
  InvalidRange,
}

#[op2(async)]
#[smi]
async fn op_kv_database_open<DBH>(
  state: Rc<RefCell<OpState>>,
  #[string] path: Option<String>,
) -> Result<ResourceId, KvError>
where
  DBH: DatabaseHandler + 'static,
{
  let handler = {
    let state = state.borrow();
    state
      .feature_checker
      .check_or_exit(UNSTABLE_FEATURE_NAME, "Deno.openKv");
    state.borrow::<Rc<DBH>>().clone()
  };
  let db = handler
    .open(state.clone(), path)
    .await
    .map_err(KvError::DatabaseHandler)?;
  let rid = state.borrow_mut().resource_table.add(DatabaseResource {
    db,
    cancel_handle: CancelHandle::new_rc(),
  });
  Ok(rid)
}

type KvKey = Vec<AnyValue>;

fn key_part_from_v8(value: AnyValue) -> KeyPart {
  match value {
    AnyValue::Bool(false) => KeyPart::False,
    AnyValue::Bool(true) => KeyPart::True,
    AnyValue::Number(n) => KeyPart::Float(n),
    AnyValue::BigInt(n) => KeyPart::Int(n),
    AnyValue::String(s) => KeyPart::String(s),
    AnyValue::V8Buffer(buf) => KeyPart::Bytes(buf.to_vec()),
    AnyValue::RustBuffer(_) => unreachable!(),
  }
}

fn key_part_to_v8(value: KeyPart) -> AnyValue {
  match value {
    KeyPart::False => AnyValue::Bool(false),
    KeyPart::True => AnyValue::Bool(true),
    KeyPart::Float(n) => AnyValue::Number(n),
    KeyPart::Int(n) => AnyValue::BigInt(n),
    KeyPart::String(s) => AnyValue::String(s),
    KeyPart::Bytes(buf) => AnyValue::RustBuffer(buf.into()),
  }
}

#[derive(Debug, Deserialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
enum FromV8Value {
  V8(JsBuffer),
  Bytes(JsBuffer),
  U64(BigInt),
}

#[derive(Debug, Serialize)]
#[serde(tag = "kind", content = "value", rename_all = "snake_case")]
enum ToV8Value {
  V8(ToJsBuffer),
  Bytes(ToJsBuffer),
  U64(BigInt),
}

impl TryFrom<FromV8Value> for KvValue {
  type Error = num_bigint::TryFromBigIntError<num_bigint::BigInt>;
  fn try_from(value: FromV8Value) -> Result<Self, Self::Error> {
    Ok(match value {
      FromV8Value::V8(buf) => KvValue::V8(buf.to_vec()),
      FromV8Value::Bytes(buf) => KvValue::Bytes(buf.to_vec()),
      FromV8Value::U64(n) => {
        KvValue::U64(num_bigint::BigInt::from(n).try_into()?)
      }
    })
  }
}

impl From<KvValue> for ToV8Value {
  fn from(value: KvValue) -> Self {
    match value {
      KvValue::V8(buf) => ToV8Value::V8(buf.into()),
      KvValue::Bytes(buf) => ToV8Value::Bytes(buf.into()),
      KvValue::U64(n) => ToV8Value::U64(num_bigint::BigInt::from(n).into()),
    }
  }
}

#[derive(Serialize)]
struct ToV8KvEntry {
  key: KvKey,
  value: ToV8Value,
  versionstamp: ByteString,
}

impl TryFrom<KvEntry> for ToV8KvEntry {
  type Error = std::io::Error;
  fn try_from(entry: KvEntry) -> Result<Self, Self::Error> {
    Ok(ToV8KvEntry {
      key: decode_key(&entry.key)?
        .0
        .into_iter()
        .map(key_part_to_v8)
        .collect(),
      value: entry.value.into(),
      versionstamp: faster_hex::hex_string(&entry.versionstamp).into(),
    })
  }
}

#[derive(Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
enum V8Consistency {
  Strong,
  Eventual,
}

impl From<V8Consistency> for Consistency {
  fn from(value: V8Consistency) -> Self {
    match value {
      V8Consistency::Strong => Consistency::Strong,
      V8Consistency::Eventual => Consistency::Eventual,
    }
  }
}

// (prefix, start, end, limit, reverse, cursor)
type SnapshotReadRange = (
  Option<KvKey>,
  Option<KvKey>,
  Option<KvKey>,
  u32,
  bool,
  Option<ByteString>,
);

#[op2(async)]
#[serde]
async fn op_kv_snapshot_read<DBH>(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[serde] ranges: Vec<SnapshotReadRange>,
  #[serde] consistency: V8Consistency,
) -> Result<Vec<Vec<ToV8KvEntry>>, KvError>
where
  DBH: DatabaseHandler + 'static,
{
  let db = {
    let state = state.borrow();
    let resource = state
      .resource_table
      .get::<DatabaseResource<DBH::DB>>(rid)
      .map_err(KvError::Resource)?;
    resource.db.clone()
  };

  let config = {
    let state = state.borrow();
    state.borrow::<Rc<KvConfig>>().clone()
  };

  if ranges.len() > config.max_read_ranges {
    return Err(KvError::TooManyRanges(config.max_read_ranges));
  }

  let mut total_entries = 0usize;

  let read_ranges = ranges
    .into_iter()
    .map(|(prefix, start, end, limit, reverse, cursor)| {
      let selector = RawSelector::from_tuple(prefix, start, end)?;

      let (start, end) =
        decode_selector_and_cursor(&selector, reverse, cursor.as_ref())?;
      check_read_key_size(&start, &config)?;
      check_read_key_size(&end, &config)?;

      total_entries += limit as usize;
      Ok(ReadRange {
        start,
        end,
        limit: NonZeroU32::new(limit).ok_or(KvError::InvalidLimit)?,
        reverse,
      })
    })
    .collect::<Result<Vec<_>, KvError>>()?;

  if total_entries > config.max_read_entries {
    return Err(KvError::TooManyEntries(config.max_read_entries));
  }

  let opts = SnapshotReadOptions {
    consistency: consistency.into(),
  };
  let output_ranges = db
    .snapshot_read(read_ranges, opts)
    .await
    .map_err(KvError::Kv)?;
  let output_ranges = output_ranges
    .into_iter()
    .map(|x| {
      x.entries
        .into_iter()
        .map(TryInto::try_into)
        .collect::<Result<Vec<_>, std::io::Error>>()
    })
    .collect::<Result<Vec<_>, std::io::Error>>()?;
  Ok(output_ranges)
}

struct QueueMessageResource<QPH: QueueMessageHandle + 'static> {
  handle: QPH,
}

impl<QMH: QueueMessageHandle + 'static> Resource for QueueMessageResource<QMH> {
  fn name(&self) -> Cow<str> {
    "queueMessage".into()
  }
}

#[op2(async)]
#[serde]
async fn op_kv_dequeue_next_message<DBH>(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<Option<(ToJsBuffer, ResourceId)>, KvError>
where
  DBH: DatabaseHandler + 'static,
{
  let db = {
    let state = state.borrow();
    let resource =
      match state.resource_table.get::<DatabaseResource<DBH::DB>>(rid) {
        Ok(resource) => resource,
        Err(err) => {
          if get_custom_error_class(&err) == Some("BadResource") {
            return Ok(None);
          } else {
            return Err(KvError::Resource(err));
          }
        }
      };
    resource.db.clone()
  };

  let Some(mut handle) =
    db.dequeue_next_message().await.map_err(KvError::Kv)?
  else {
    return Ok(None);
  };
  let payload = handle.take_payload().await.map_err(KvError::Kv)?.into();
  let handle_rid = {
    let mut state = state.borrow_mut();
    state.resource_table.add(QueueMessageResource { handle })
  };
  Ok(Some((payload, handle_rid)))
}

#[op2]
#[smi]
fn op_kv_watch<DBH>(
  state: &mut OpState,
  #[smi] rid: ResourceId,
  #[serde] keys: Vec<KvKey>,
) -> Result<ResourceId, KvError>
where
  DBH: DatabaseHandler + 'static,
{
  let resource = state
    .resource_table
    .get::<DatabaseResource<DBH::DB>>(rid)
    .map_err(KvError::Resource)?;
  let config = state.borrow::<Rc<KvConfig>>().clone();

  if keys.len() > config.max_watched_keys {
    return Err(KvError::TooManyKeys(config.max_watched_keys));
  }

  let keys: Vec<Vec<u8>> = keys
    .into_iter()
    .map(encode_v8_key)
    .collect::<std::io::Result<_>>()?;

  for k in &keys {
    check_read_key_size(k, &config)?;
  }

  let stream = resource.db.watch(keys);

  let rid = state.resource_table.add(DatabaseWatcherResource {
    stream: AsyncRefCell::new(stream),
    db_cancel_handle: resource.cancel_handle.clone(),
    cancel_handle: CancelHandle::new_rc(),
  });

  Ok(rid)
}

#[derive(Serialize)]
#[serde(rename_all = "camelCase", untagged)]
enum WatchEntry {
  Changed(Option<ToV8KvEntry>),
  Unchanged,
}

#[op2(async)]
#[serde]
async fn op_kv_watch_next(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
) -> Result<Option<Vec<WatchEntry>>, KvError> {
  let resource = {
    let state = state.borrow();
    let resource = state
      .resource_table
      .get::<DatabaseWatcherResource>(rid)
      .map_err(KvError::Resource)?;
    resource.clone()
  };

  let db_cancel_handle = resource.db_cancel_handle.clone();
  let cancel_handle = resource.cancel_handle.clone();
  let stream = RcRef::map(resource, |r| &r.stream)
    .borrow_mut()
    .or_cancel(db_cancel_handle.clone())
    .or_cancel(cancel_handle.clone())
    .await;
  let Ok(Ok(mut stream)) = stream else {
    return Ok(None);
  };

  // We hold a strong reference to `resource`, so we can't rely on the stream
  // being dropped when the db connection is closed
  let Ok(Ok(Some(res))) = stream
    .next()
    .or_cancel(db_cancel_handle)
    .or_cancel(cancel_handle)
    .await
  else {
    return Ok(None);
  };

  let entries = res.map_err(KvError::Kv)?;
  let entries = entries
    .into_iter()
    .map(|entry| {
      Ok(match entry {
        WatchKeyOutput::Changed { entry } => {
          WatchEntry::Changed(entry.map(TryInto::try_into).transpose()?)
        }
        WatchKeyOutput::Unchanged => WatchEntry::Unchanged,
      })
    })
    .collect::<Result<_, KvError>>()?;

  Ok(Some(entries))
}

#[op2(async)]
async fn op_kv_finish_dequeued_message<DBH>(
  state: Rc<RefCell<OpState>>,
  #[smi] handle_rid: ResourceId,
  success: bool,
) -> Result<(), KvError>
where
  DBH: DatabaseHandler + 'static,
{
  let handle = {
    let mut state = state.borrow_mut();
    let handle = state
      .resource_table
      .take::<QueueMessageResource<<<DBH>::DB as Database>::QMH>>(handle_rid)
      .map_err(|_| KvError::QueueMessageNotFound)?;
    Rc::try_unwrap(handle)
      .map_err(|_| KvError::QueueMessageNotFound)?
      .handle
  };
  // if we fail to finish the message, there is not much we can do and the
  // message will be retried anyway, so we just ignore the error
  if let Err(err) = handle.finish(success).await {
    debug!("Failed to finish dequeued message: {}", err);
  };
  Ok(())
}

#[derive(Debug, thiserror::Error)]
pub enum KvCheckError {
  #[error("invalid versionstamp")]
  InvalidVersionstamp,
  #[error(transparent)]
  Io(std::io::Error),
}

type V8KvCheck = (KvKey, Option<ByteString>);

fn check_from_v8(value: V8KvCheck) -> Result<Check, KvCheckError> {
  let versionstamp = match value.1 {
    Some(data) => {
      let mut out = [0u8; 10];
      if data.len() != out.len() * 2 {
        return Err(KvCheckError::InvalidVersionstamp);
      }
      faster_hex::hex_decode(&data, &mut out)
        .map_err(|_| KvCheckError::InvalidVersionstamp)?;
      Some(out)
    }
    None => None,
  };
  Ok(Check {
    key: encode_v8_key(value.0).map_err(KvCheckError::Io)?,
    versionstamp,
  })
}

#[derive(Debug, thiserror::Error)]
pub enum KvMutationError {
  #[error(transparent)]
  BigInt(#[from] num_bigint::TryFromBigIntError<num_bigint::BigInt>),
  #[error(transparent)]
  Io(#[from] std::io::Error),
  #[error("Invalid mutation '{0}' with value")]
  InvalidMutationWithValue(String),
  #[error("Invalid mutation '{0}' without value")]
  InvalidMutationWithoutValue(String),
}

type V8KvMutation = (KvKey, String, Option<FromV8Value>, Option<u64>);

fn mutation_from_v8(
  (value, current_timstamp): (V8KvMutation, DateTime<Utc>),
) -> Result<Mutation, KvMutationError> {
  let key = encode_v8_key(value.0)?;
  let kind = match (value.1.as_str(), value.2) {
    ("set", Some(value)) => MutationKind::Set(value.try_into()?),
    ("delete", None) => MutationKind::Delete,
    ("sum", Some(value)) => MutationKind::Sum {
      value: value.try_into()?,
      min_v8: vec![],
      max_v8: vec![],
      clamp: false,
    },
    ("min", Some(value)) => MutationKind::Min(value.try_into()?),
    ("max", Some(value)) => MutationKind::Max(value.try_into()?),
    ("setSuffixVersionstampedKey", Some(value)) => {
      MutationKind::SetSuffixVersionstampedKey(value.try_into()?)
    }
    (op, Some(_)) => {
      return Err(KvMutationError::InvalidMutationWithValue(op.to_string()))
    }
    (op, None) => {
      return Err(KvMutationError::InvalidMutationWithoutValue(op.to_string()))
    }
  };
  Ok(Mutation {
    key,
    kind,
    expire_at: value
      .3
      .map(|expire_in| current_timstamp + Duration::from_millis(expire_in)),
  })
}

type V8Enqueue = (JsBuffer, u64, Vec<KvKey>, Option<Vec<u32>>);

fn enqueue_from_v8(
  value: V8Enqueue,
  current_timestamp: DateTime<Utc>,
) -> Result<Enqueue, std::io::Error> {
  Ok(Enqueue {
    payload: value.0.to_vec(),
    deadline: current_timestamp
      + chrono::Duration::milliseconds(value.1 as i64),
    keys_if_undelivered: value
      .2
      .into_iter()
      .map(encode_v8_key)
      .collect::<std::io::Result<_>>()?,
    backoff_schedule: value.3,
  })
}

fn encode_v8_key(key: KvKey) -> Result<Vec<u8>, std::io::Error> {
  encode_key(&Key(key.into_iter().map(key_part_from_v8).collect()))
}

enum RawSelector {
  Prefixed {
    prefix: Vec<u8>,
    start: Option<Vec<u8>>,
    end: Option<Vec<u8>>,
  },
  Range {
    start: Vec<u8>,
    end: Vec<u8>,
  },
}

impl RawSelector {
  fn from_tuple(
    prefix: Option<KvKey>,
    start: Option<KvKey>,
    end: Option<KvKey>,
  ) -> Result<Self, KvError> {
    let prefix = prefix.map(encode_v8_key).transpose()?;
    let start = start.map(encode_v8_key).transpose()?;
    let end = end.map(encode_v8_key).transpose()?;

    match (prefix, start, end) {
      (Some(prefix), None, None) => Ok(Self::Prefixed {
        prefix,
        start: None,
        end: None,
      }),
      (Some(prefix), Some(start), None) => {
        if !start.starts_with(&prefix) || start.len() == prefix.len() {
          return Err(KvError::StartKeyNotInKeyspace);
        }
        Ok(Self::Prefixed {
          prefix,
          start: Some(start),
          end: None,
        })
      }
      (Some(prefix), None, Some(end)) => {
        if !end.starts_with(&prefix) || end.len() == prefix.len() {
          return Err(KvError::EndKeyNotInKeyspace);
        }
        Ok(Self::Prefixed {
          prefix,
          start: None,
          end: Some(end),
        })
      }
      (None, Some(start), Some(end)) => {
        if start > end {
          return Err(KvError::StartKeyGreaterThanEndKey);
        }
        Ok(Self::Range { start, end })
      }
      (None, Some(start), None) => {
        let end = start.iter().copied().chain(Some(0)).collect();
        Ok(Self::Range { start, end })
      }
      _ => Err(KvError::InvalidRange),
    }
  }

  fn start(&self) -> Option<&[u8]> {
    match self {
      Self::Prefixed { start, .. } => start.as_deref(),
      Self::Range { start, .. } => Some(start),
    }
  }

  fn end(&self) -> Option<&[u8]> {
    match self {
      Self::Prefixed { end, .. } => end.as_deref(),
      Self::Range { end, .. } => Some(end),
    }
  }

  fn common_prefix(&self) -> &[u8] {
    match self {
      Self::Prefixed { prefix, .. } => prefix,
      Self::Range { start, end } => common_prefix_for_bytes(start, end),
    }
  }

  fn range_start_key(&self) -> Vec<u8> {
    match self {
      Self::Prefixed {
        start: Some(start), ..
      } => start.clone(),
      Self::Range { start, .. } => start.clone(),
      Self::Prefixed { prefix, .. } => {
        prefix.iter().copied().chain(Some(0)).collect()
      }
    }
  }

  fn range_end_key(&self) -> Vec<u8> {
    match self {
      Self::Prefixed { end: Some(end), .. } => end.clone(),
      Self::Range { end, .. } => end.clone(),
      Self::Prefixed { prefix, .. } => {
        prefix.iter().copied().chain(Some(0xff)).collect()
      }
    }
  }
}

fn common_prefix_for_bytes<'a>(a: &'a [u8], b: &'a [u8]) -> &'a [u8] {
  let mut i = 0;
  while i < a.len() && i < b.len() && a[i] == b[i] {
    i += 1;
  }
  &a[..i]
}

fn encode_cursor(
  selector: &RawSelector,
  boundary_key: &[u8],
) -> Result<String, KvError> {
  let common_prefix = selector.common_prefix();
  if !boundary_key.starts_with(common_prefix) {
    return Err(KvError::InvalidBoundaryKey);
  }
  Ok(BASE64_URL_SAFE.encode(&boundary_key[common_prefix.len()..]))
}

fn decode_selector_and_cursor(
  selector: &RawSelector,
  reverse: bool,
  cursor: Option<&ByteString>,
) -> Result<(Vec<u8>, Vec<u8>), KvError> {
  let Some(cursor) = cursor else {
    return Ok((selector.range_start_key(), selector.range_end_key()));
  };

  let common_prefix = selector.common_prefix();
  let cursor = BASE64_URL_SAFE
    .decode(cursor)
    .map_err(|_| KvError::InvalidCursor)?;

  let first_key: Vec<u8>;
  let last_key: Vec<u8>;

  if reverse {
    first_key = selector.range_start_key();
    last_key = common_prefix
      .iter()
      .copied()
      .chain(cursor.iter().copied())
      .collect();
  } else {
    first_key = common_prefix
      .iter()
      .copied()
      .chain(cursor.iter().copied())
      .chain(Some(0))
      .collect();
    last_key = selector.range_end_key();
  }

  // Defend against out-of-bounds reading
  if let Some(start) = selector.start() {
    if &first_key[..] < start {
      return Err(KvError::CursorOutOfBounds);
    }
  }

  if let Some(end) = selector.end() {
    if &last_key[..] > end {
      return Err(KvError::CursorOutOfBounds);
    }
  }

  Ok((first_key, last_key))
}

#[op2(async)]
#[string]
async fn op_kv_atomic_write<DBH>(
  state: Rc<RefCell<OpState>>,
  #[smi] rid: ResourceId,
  #[serde] checks: Vec<V8KvCheck>,
  #[serde] mutations: Vec<V8KvMutation>,
  #[serde] enqueues: Vec<V8Enqueue>,
) -> Result<Option<String>, KvError>
where
  DBH: DatabaseHandler + 'static,
{
  let current_timestamp = chrono::Utc::now();
  let db = {
    let state = state.borrow();
    let resource = state
      .resource_table
      .get::<DatabaseResource<DBH::DB>>(rid)
      .map_err(KvError::Resource)?;
    resource.db.clone()
  };

  let config = {
    let state = state.borrow();
    state.borrow::<Rc<KvConfig>>().clone()
  };

  if checks.len() > config.max_checks {
    return Err(KvError::TooManyChecks(config.max_checks));
  }

  if mutations.len() + enqueues.len() > config.max_mutations {
    return Err(KvError::TooManyMutations(config.max_mutations));
  }

  let checks = checks
    .into_iter()
    .map(check_from_v8)
    .collect::<Result<Vec<Check>, KvCheckError>>()
    .map_err(KvError::InvalidCheck)?;
  let mutations = mutations
    .into_iter()
    .map(|mutation| mutation_from_v8((mutation, current_timestamp)))
    .collect::<Result<Vec<Mutation>, KvMutationError>>()
    .map_err(KvError::InvalidMutation)?;
  let enqueues = enqueues
    .into_iter()
    .map(|e| enqueue_from_v8(e, current_timestamp))
    .collect::<Result<Vec<Enqueue>, std::io::Error>>()
    .map_err(KvError::InvalidEnqueue)?;

  let mut total_payload_size = 0usize;
  let mut total_key_size = 0usize;

  for key in checks
    .iter()
    .map(|c| &c.key)
    .chain(mutations.iter().map(|m| &m.key))
  {
    if key.is_empty() {
      return Err(KvError::EmptyKey);
    }

    total_payload_size += check_write_key_size(key, &config)?;
  }

  for (key, value) in mutations
    .iter()
    .flat_map(|m| m.kind.value().map(|x| (&m.key, x)))
  {
    let key_size = check_write_key_size(key, &config)?;
    total_payload_size += check_value_size(value, &config)? + key_size;
    total_key_size += key_size;
  }

  for enqueue in &enqueues {
    total_payload_size +=
      check_enqueue_payload_size(&enqueue.payload, &config)?;
    if let Some(schedule) = enqueue.backoff_schedule.as_ref() {
      total_payload_size += 4 * schedule.len();
    }
  }

  if total_payload_size > config.max_total_mutation_size_bytes {
    return Err(KvError::TotalMutationTooLarge(
      config.max_total_mutation_size_bytes,
    ));
  }

  if total_key_size > config.max_total_key_size_bytes {
    return Err(KvError::TotalKeyTooLarge(config.max_total_key_size_bytes));
  }

  let atomic_write = AtomicWrite {
    checks,
    mutations,
    enqueues,
  };

  let result = db.atomic_write(atomic_write).await.map_err(KvError::Kv)?;

  Ok(result.map(|res| faster_hex::hex_string(&res.versionstamp)))
}

// (prefix, start, end)
type EncodeCursorRangeSelector = (Option<KvKey>, Option<KvKey>, Option<KvKey>);

#[op2]
#[string]
fn op_kv_encode_cursor(
  #[serde] (prefix, start, end): EncodeCursorRangeSelector,
  #[serde] boundary_key: KvKey,
) -> Result<String, KvError> {
  let selector = RawSelector::from_tuple(prefix, start, end)?;
  let boundary_key = encode_v8_key(boundary_key)?;
  let cursor = encode_cursor(&selector, &boundary_key)?;
  Ok(cursor)
}

fn check_read_key_size(key: &[u8], config: &KvConfig) -> Result<(), KvError> {
  if key.len() > config.max_read_key_size_bytes {
    Err(KvError::KeyTooLargeToRead(config.max_read_key_size_bytes))
  } else {
    Ok(())
  }
}

fn check_write_key_size(
  key: &[u8],
  config: &KvConfig,
) -> Result<usize, KvError> {
  if key.len() > config.max_write_key_size_bytes {
    Err(KvError::KeyTooLargeToWrite(config.max_write_key_size_bytes))
  } else {
    Ok(key.len())
  }
}

fn check_value_size(
  value: &KvValue,
  config: &KvConfig,
) -> Result<usize, KvError> {
  let payload = match value {
    KvValue::Bytes(x) => x,
    KvValue::V8(x) => x,
    KvValue::U64(_) => return Ok(8),
  };

  if payload.len() > config.max_value_size_bytes {
    Err(KvError::ValueTooLarge(config.max_value_size_bytes))
  } else {
    Ok(payload.len())
  }
}

fn check_enqueue_payload_size(
  payload: &[u8],
  config: &KvConfig,
) -> Result<usize, KvError> {
  if payload.len() > config.max_value_size_bytes {
    Err(KvError::EnqueuePayloadTooLarge(config.max_value_size_bytes))
  } else {
    Ok(payload.len())
  }
}