wvwwvwwv / scalable-concurrent-containers

High performance containers and utilities for concurrent and asynchronous programming
Apache License 2.0

Update documents for HashIndex, TreeIndex, Queue, Bag about the condition when contained entries are dropped / enhance the drop timing #150

Closed: beckend closed this issue 3 months ago

beckend commented 3 months ago
use std::{sync::{atomic::{AtomicUsize, Ordering}, Arc}, time::Duration};
use color_eyre::eyre::{eyre, Result};
use once_cell::sync::Lazy;
use scc::HashIndex;

static CELL: Lazy<AtomicUsize> = Lazy::new(AtomicUsize::default);

struct Test{}

impl Drop for Test {
  fn drop(&mut self) {
    CELL.fetch_add(1, Ordering::SeqCst);
  }
}

#[tokio::main]
async fn main() -> Result<()> {
  color_eyre::install()?;

  drop(Test{});
  assert_eq!(CELL.load(Ordering::Acquire), 1);

  let map_index = HashIndex::new();
  let key = 0_usize;

  map_index
    .insert(key, Arc::new(Test{}))
    .map_err(|_| eyre!("Failed to insert"))?;
  map_index.remove_async(&key).await;
  assert!(!map_index.contains(&key));

  tokio::time::sleep(Duration::from_secs(5)).await;
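  // This assertion fails: the entry removed above has not been dropped yet,
  // because HashIndex defers destruction of removed entries until the EBR
  // epoch has advanced past all potential readers.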
  assert_eq!(CELL.load(Ordering::Acquire), 2);

  Ok(())
}
   Compiling p-0190b042-81e4-7091-bb84-3f97fcf2c7a0 v0.1.0 (/tmp/rust-playgrounds/p-0190b042-81e4-7091-bb84-3f97fcf2c7a0)
    Finished `dev` profile [unoptimized + debuginfo] target(s) in 0.32s
     Running `target/debug/p-0190b042-81e4-7091-bb84-3f97fcf2c7a0`
The application panicked (crashed).
Message:  assertion `left == right` failed
  left: 1
 right: 2
Location: src/main.rs:33
# Cargo.toml
[package]
name = "p-0190b042-81e4-7091-bb84-3f97fcf2c7a0"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
once_cell = "*"
scc = "*"

[dependencies.color-eyre]
version = "*"

[dependencies.tokio]
version = "*"
features = ["full"]

This also applies to TreeIndex, Queue, Bag.

wvwwvwwv commented 3 months ago

Hi @beckend! They will 'eventually' be dropped. If you put

for _ in 0..256 {
    drop(scc::ebr::Guard::new());
}

right after the HashIndex is cleared, the test will pass.
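
Applied to the reproduction above, the relevant part of main would look roughly like this (a sketch; as the comments below note, the iteration count is a heuristic rather than a guarantee):

  map_index.remove_async(&key).await;
  assert!(!map_index.contains(&key));

  // Cycling Guards lets the EBR epoch advance so the deferred destruction
  // of the removed entry can actually run.
  for _ in 0..256 {
    drop(scc::ebr::Guard::new());
  }

  assert_eq!(CELL.load(Ordering::Acquire), 2);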

beckend commented 3 months ago

OK, but why don't HashMap and HashCache have this problem?

beckend commented 3 months ago

This also means that every time somebody uses HashIndex, TreeIndex, Queue, or Bag, they have to manually implement Drop and use

for _ in 0..256 {
    drop(scc::ebr::Guard::new());
}
wvwwvwwv commented 3 months ago

It's a cost to pay for being lock-free, unfortunately. HashMap and HashCache drop the instance immediately because they don't have to care about potential readers when removing entries.

The basic assumption is that Guards will be in constant use throughout the program; that's usually true for the database systems these data structures were originally designed for.
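
For illustration, this is the usage pattern that assumption refers to: readers pin the current epoch with a Guard (for example via HashIndex::peek), and every Guard that is created and dropped gives the collector a chance to advance the epoch and reclaim removed entries. A minimal sketch, assuming an arbitrary u64-to-String index:

use scc::{ebr::Guard, HashIndex};

fn main() {
  let index: HashIndex<u64, String> = HashIndex::default();
  let key: u64 = 1;
  let _ = index.insert(key, "one".to_string());

  // A reader pins the current epoch with a Guard; the reference returned by
  // peek stays valid for as long as the Guard is alive, even if the entry is
  // removed concurrently.
  let guard = Guard::new();
  if let Some(v) = index.peek(&key, &guard) {
    println!("read: {v}");
  }
  drop(guard);

  // remove only unlinks the entry; its destructor runs later, once no Guard
  // from an older epoch can still observe it.
  index.remove(&key);
}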

beckend commented 3 months ago

This needs to be documented I guess.

wvwwvwwv commented 3 months ago

I agree! I'll update the doc and publish a new version of this crate soon.

beckend commented 3 months ago

I just tried it a bit more: 0..256 is sometimes insufficient, and I had to use 1024 to make it work on other occasions.

beckend commented 3 months ago

It's actually very random; sometimes 1024 is not enough.

wvwwvwwv commented 3 months ago

This also depends on what other threads are doing. It's deterministic only when the program is single-threaded.

beckend commented 3 months ago

I ended up doing this. I need to make sure the database instances contained within the HashIndex run their cleanup and release filesystem resources.

use once_cell::sync::Lazy;
pub use scc;
use scc::{ebr::Guard, HashMap};
use std::{
  ops::{Deref, DerefMut},
  thread::yield_now,
};
use std::time::{Duration, SystemTime, SystemTimeError, UNIX_EPOCH};

/// Allows to execute a callback with the duration since the UNIX epoch.
pub fn with_duration_since_epoch<R>(
  callback: impl FnOnce(Duration) -> Result<R, SystemTimeError>,
) -> Result<R, SystemTimeError> {
  callback(SystemTime::now().duration_since(UNIX_EPOCH)?)
}

/// Map to keep track of what IDs have been reclaimed.
static MAP_RECLAIM: Lazy<HashMap<u128, bool>> = Lazy::new(Default::default);

#[derive(thiserror::Error, Debug, Clone)]
/// All errors.
pub enum Errors {
  #[error("{0}")]
  /// Represents an error when a value should be infallible but might not be.
  Generic(String),
  /// Transparent from `std::time::SystemTimeError`
  #[error(transparent)]
  SystemTime(#[from] std::time::SystemTimeError),
}

#[allow(clippy::needless_pass_by_value)]
/// Map to data `Generic` error.
pub fn map_err_generic<T: ToString>(x: T) -> Errors {
  Errors::Generic(x.to_string())
}

/// [#issue 150](https://github.com/wvwwvwwv/scalable-concurrent-containers/issues/150)
/// Reclaim the EBR guard to trigger drop.
pub fn trigger_reclaim() {
  for _ in 0..256 {
    drop(Guard::new());
  }
}

/// Reclaim the EBR guard to trigger drop. Contains the key.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct Reclaimer(u128);

impl Drop for Reclaimer {
  fn drop(&mut self) {
    while MAP_RECLAIM.get(&self.0).is_some() {
      trigger_reclaim();
      yield_now();
    }
  }
}

/// Reclaim the EBR guard to trigger drop.
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct ContainerReclaim<T> {
  /// Inner value.
  inner: T,
  /// ID to reclaim.
  id: u128,
}

impl<T> Deref for ContainerReclaim<T> {
  type Target = T;

  fn deref(&self) -> &Self::Target {
    &self.inner
  }
}

impl<T> DerefMut for ContainerReclaim<T> {
  fn deref_mut(&mut self) -> &mut Self::Target {
    &mut self.inner
  }
}

impl<T> Drop for ContainerReclaim<T> {
  fn drop(&mut self) {
    MAP_RECLAIM.remove(&self.id).expect("MAP_RECLAIM.remove()");
  }
}

impl<T> ContainerReclaim<T> {
  /// Create a new instance.
  /// The fields of a struct are dropped in declaration order.
  /// The fields of the active enum variant are dropped in declaration order.
  /// The fields of a tuple are dropped in order.
  /// The elements of an array or owned slice are dropped from the first element to the last.
  pub fn new(inner: T) -> Result<(Reclaimer, Self), Errors> {
    let mut id;
    {
      loop {
        id = with_duration_since_epoch(|x| Ok(x.as_nanos()))?;

        if MAP_RECLAIM.contains(&id) {
          yield_now();
          continue;
        }

        MAP_RECLAIM
          .insert(id, true)
          .map_err(|_| map_err_generic("MAP_RECLAIM.insert()"))?;
        break;
      }
    }

    // Return order is significant: the caller's destructured bindings are dropped
    // in reverse declaration order, so the ContainerReclaim is dropped before the Reclaimer.
    Ok((Reclaimer(id), Self { inner, id }))
  }
}
#[cfg(test)]
mod test {
  use super::*;
  use scc::HashIndex;

  #[test]
  fn test_new_reclaimed() -> Result<(), Errors> {
    let map = HashIndex::new();
    let (reclaimer, container) = ContainerReclaim::new(0_usize)?;
    let id = container.id;

    assert!(MAP_RECLAIM.get(&id).is_some());

    map
      .insert(0_usize, container)
      .map_err(|_| map_err_generic("map.insert"))?;

    drop(map);
    assert!(MAP_RECLAIM.get(&id).is_some());

    drop(reclaimer);
    assert!(MAP_RECLAIM.get(&id).is_none());

    Ok(())
  }
}
wvwwvwwv commented 3 months ago

Minor: if you know that a specific key has been removed but its entry has yet to be dropped, repeatedly calling HashIndex::get(&key) until the key-value pair is dropped is now more efficient, as of SCC 2.1.4.
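
In terms of the reproduction at the top of this issue, that could look roughly like this (a sketch; CELL is the drop counter from that program, and the loop simply retries until the destructor has observably run):

  // Poll until the removed entry's destructor has actually run.
  while CELL.load(Ordering::Acquire) < 2 {
    // Looking up the already-removed key gives the HashIndex a chance to
    // clean up the deferred entry (more efficient since SCC 2.1.4).
    let _ = map_index.get(&key);
    std::thread::yield_now();
  }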

wvwwvwwv commented 3 months ago

Forgot to mention that the epoch may not advance if there is no garbage to collect! I'll also provide a forced epoch-advance method through the SDD crate; ETA: two days.

beckend commented 3 months ago

Thank you.