// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.

use std::collections::LinkedList;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use deno_core::futures::task::AtomicWaker;
use deno_core::futures::Future;
use deno_core::parking_lot::Mutex;

/// Simplifies the use of an atomic boolean as a flag.
#[derive(Debug, Default)]
pub struct AtomicFlag(AtomicBool);

impl AtomicFlag {
  /// Raises the flag returning if the raise was successful.
  pub fn raise(&self) -> bool {
    !self.0.swap(true, Ordering::SeqCst)
  }

  /// Gets if the flag is raised.
  pub fn is_raised(&self) -> bool {
    self.0.load(Ordering::SeqCst)
  }
}

#[derive(Debug, Default)]
struct TaskQueueTaskItem {
  is_ready: AtomicFlag,
  is_future_dropped: AtomicFlag,
  waker: AtomicWaker,
}

#[derive(Debug, Default)]
struct TaskQueueTasks {
  is_running: bool,
  items: LinkedList<Arc<TaskQueueTaskItem>>,
}

/// A queue that executes tasks sequentially one after the other
/// ensuring order and that no task runs at the same time as another.
///
/// Note that this differs from tokio's semaphore in that the order
/// is acquired synchronously.
#[derive(Debug, Default)]
pub struct TaskQueue {
  tasks: Mutex<TaskQueueTasks>,
}

impl TaskQueue {
  /// Acquires a permit where the tasks are executed one at a time
  /// and in the order that they were acquired.
  pub fn acquire(&self) -> TaskQueuePermitAcquireFuture {
    TaskQueuePermitAcquireFuture::new(self)
  }

  /// Alternate API that acquires a permit internally
  /// for the duration of the future.
  #[allow(unused)]
  pub fn run<'a, R>(
    &'a self,
    future: impl Future<Output = R> + 'a,
  ) -> impl Future<Output = R> + 'a {
    let acquire_future = self.acquire();
    async move {
      let permit = acquire_future.await;
      let result = future.await;
      drop(permit); // explicit for clarity
      result
    }
  }

  fn raise_next(&self) {
    let front_item = {
      let mut tasks = self.tasks.lock();

      // clear out any wakers for futures that were dropped
      while let Some(front_waker) = tasks.items.front() {
        if front_waker.is_future_dropped.is_raised() {
          tasks.items.pop_front();
        } else {
          break;
        }
      }
      let front_item = tasks.items.pop_front();
      tasks.is_running = front_item.is_some();
      front_item
    };

    // wake up the next waker
    if let Some(front_item) = front_item {
      front_item.is_ready.raise();
      front_item.waker.wake();
    }
  }
}

/// A permit that when dropped will allow another task to proceed.
pub struct TaskQueuePermit<'a>(&'a TaskQueue);

impl<'a> Drop for TaskQueuePermit<'a> {
  fn drop(&mut self) {
    self.0.raise_next();
  }
}

pub struct TaskQueuePermitAcquireFuture<'a> {
  task_queue: Option<&'a TaskQueue>,
  item: Arc<TaskQueueTaskItem>,
}

impl<'a> TaskQueuePermitAcquireFuture<'a> {
  pub fn new(task_queue: &'a TaskQueue) -> Self {
    // acquire the waker position synchronously
    let mut tasks = task_queue.tasks.lock();
    let item = if !tasks.is_running {
      tasks.is_running = true;
      let item = Arc::new(TaskQueueTaskItem::default());
      item.is_ready.raise();
      item
    } else {
      let item = Arc::new(TaskQueueTaskItem::default());
      tasks.items.push_back(item.clone());
      item
    };
    drop(tasks);
    Self {
      task_queue: Some(task_queue),
      item,
    }
  }
}

impl<'a> Drop for TaskQueuePermitAcquireFuture<'a> {
  fn drop(&mut self) {
    if let Some(task_queue) = self.task_queue.take() {
      if self.item.is_ready.is_raised() {
        task_queue.raise_next();
      } else {
        self.item.is_future_dropped.raise();
      }
    }
  }
}

impl<'a> Future for TaskQueuePermitAcquireFuture<'a> {
  type Output = TaskQueuePermit<'a>;

  fn poll(
    mut self: std::pin::Pin<&mut Self>,
    cx: &mut std::task::Context<'_>,
  ) -> std::task::Poll<Self::Output> {
    if self.item.is_ready.is_raised() {
      std::task::Poll::Ready(TaskQueuePermit(self.task_queue.take().unwrap()))
    } else {
      self.item.waker.register(cx.waker());
      std::task::Poll::Pending
    }
  }
}

#[cfg(test)]
mod test {
  use deno_core::futures;
  use deno_core::parking_lot::Mutex;
  use std::sync::Arc;

  use super::*;

  #[test]
  fn atomic_flag_raises() {
    let flag = AtomicFlag::default();
    assert!(!flag.is_raised()); // false by default
    assert!(flag.raise());
    assert!(flag.is_raised());
    assert!(!flag.raise());
    assert!(flag.is_raised());
  }

  #[tokio::test]
  async fn task_queue_runs_one_after_other() {
    let task_queue = TaskQueue::default();
    let mut tasks = Vec::new();
    let data = Arc::new(Mutex::new(0));
    for i in 0..100 {
      let data = data.clone();
      tasks.push(task_queue.run(async move {
        deno_core::unsync::spawn_blocking(move || {
          let mut data = data.lock();
          assert_eq!(*data, i);
          *data = i + 1;
        })
        .await
        .unwrap();
      }));
    }
    futures::future::join_all(tasks).await;
  }

  #[tokio::test]
  async fn task_queue_run_in_sequence() {
    let task_queue = TaskQueue::default();
    let data = Arc::new(Mutex::new(0));

    let first = task_queue.run(async {
      *data.lock() = 1;
    });
    let second = task_queue.run(async {
      assert_eq!(*data.lock(), 1);
      *data.lock() = 2;
    });
    let _ = tokio::join!(first, second);

    assert_eq!(*data.lock(), 2);
  }

  #[tokio::test]
  async fn task_queue_future_dropped_before_poll() {
    let task_queue = Arc::new(TaskQueue::default());

    // acquire a future, but do not await it
    let future = task_queue.acquire();

    // this task tries to acquire another permit, but will be blocked by the first permit.
    let enter_flag = Arc::new(AtomicFlag::default());
    let delayed_task = deno_core::unsync::spawn({
      let enter_flag = enter_flag.clone();
      let task_queue = task_queue.clone();
      async move {
        enter_flag.raise();
        task_queue.acquire().await;
        true
      }
    });

    // ensure the task gets a chance to be scheduled and blocked
    tokio::task::yield_now().await;
    assert!(enter_flag.is_raised());

    // now, drop the first future
    drop(future);

    assert!(delayed_task.await.unwrap());
  }

  #[tokio::test]
  async fn task_queue_many_future_dropped_before_poll() {
    let task_queue = Arc::new(TaskQueue::default());

    // acquire a future, but do not await it
    let mut futures = Vec::new();
    for _ in 0..=10_000 {
      futures.push(task_queue.acquire());
    }

    // this task tries to acquire another permit, but will be blocked by the first permit.
    let enter_flag = Arc::new(AtomicFlag::default());
    let delayed_task = deno_core::unsync::spawn({
      let task_queue = task_queue.clone();
      let enter_flag = enter_flag.clone();
      async move {
        enter_flag.raise();
        task_queue.acquire().await;
        true
      }
    });

    // ensure the task gets a chance to be scheduled and blocked
    tokio::task::yield_now().await;
    assert!(enter_flag.is_raised());

    // now, drop the futures
    drop(futures);

    assert!(delayed_task.await.unwrap());
  }

  #[tokio::test]
  async fn task_queue_middle_future_dropped_while_permit_acquired() {
    let task_queue = TaskQueue::default();

    let fut1 = task_queue.acquire();
    let fut2 = task_queue.acquire();
    let fut3 = task_queue.acquire();

    // should not hang
    drop(fut2);
    drop(fut1.await);
    drop(fut3.await);
  }
}