add rustfmt.toml, format entire codebase
Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
parent
9fd521f041
commit
f419c64aca
144 changed files with 25573 additions and 31053 deletions
|
@ -1,293 +1,265 @@
|
|||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{Arc, RwLock},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
|
||||
use rocksdb::LogLevel::{Debug, Error, Fatal, Info, Warn};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||
use crate::{utils, Result};
|
||||
|
||||
use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||
|
||||
pub(crate) struct Engine {
|
||||
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
|
||||
cache: rocksdb::Cache,
|
||||
old_cfs: Vec<String>,
|
||||
config: Config,
|
||||
rocks: rocksdb::DBWithThreadMode<rocksdb::MultiThreaded>,
|
||||
cache: rocksdb::Cache,
|
||||
old_cfs: Vec<String>,
|
||||
config: Config,
|
||||
}
|
||||
|
||||
struct RocksDbEngineTree<'a> {
|
||||
db: Arc<Engine>,
|
||||
name: &'a str,
|
||||
watchers: Watchers,
|
||||
write_lock: RwLock<()>,
|
||||
db: Arc<Engine>,
|
||||
name: &'a str,
|
||||
watchers: Watchers,
|
||||
write_lock: RwLock<()>,
|
||||
}
|
||||
|
||||
fn db_options(rocksdb_cache: &rocksdb::Cache, config: &Config) -> rocksdb::Options {
|
||||
// block-based options: https://docs.rs/rocksdb/latest/rocksdb/struct.BlockBasedOptions.html#
|
||||
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
||||
// block-based options: https://docs.rs/rocksdb/latest/rocksdb/struct.BlockBasedOptions.html#
|
||||
let mut block_based_options = rocksdb::BlockBasedOptions::default();
|
||||
|
||||
block_based_options.set_block_cache(rocksdb_cache);
|
||||
block_based_options.set_block_cache(rocksdb_cache);
|
||||
|
||||
// "Difference of spinning disk"
|
||||
// https://zhangyuchi.gitbooks.io/rocksdbbook/content/RocksDB-Tuning-Guide.html
|
||||
block_based_options.set_block_size(64 * 1024);
|
||||
block_based_options.set_cache_index_and_filter_blocks(true);
|
||||
// "Difference of spinning disk"
|
||||
// https://zhangyuchi.gitbooks.io/rocksdbbook/content/RocksDB-Tuning-Guide.html
|
||||
block_based_options.set_block_size(64 * 1024);
|
||||
block_based_options.set_cache_index_and_filter_blocks(true);
|
||||
|
||||
// database options: https://docs.rs/rocksdb/latest/rocksdb/struct.Options.html#
|
||||
let mut db_opts = rocksdb::Options::default();
|
||||
// database options: https://docs.rs/rocksdb/latest/rocksdb/struct.Options.html#
|
||||
let mut db_opts = rocksdb::Options::default();
|
||||
|
||||
let rocksdb_log_level = match config.rocksdb_log_level.as_ref() {
|
||||
"debug" => Debug,
|
||||
"info" => Info,
|
||||
"error" => Error,
|
||||
"fatal" => Fatal,
|
||||
_ => Warn,
|
||||
};
|
||||
let rocksdb_log_level = match config.rocksdb_log_level.as_ref() {
|
||||
"debug" => Debug,
|
||||
"info" => Info,
|
||||
"error" => Error,
|
||||
"fatal" => Fatal,
|
||||
_ => Warn,
|
||||
};
|
||||
|
||||
let threads = if config.rocksdb_parallelism_threads == 0 {
|
||||
num_cpus::get_physical() // max cores if user specified 0
|
||||
} else {
|
||||
config.rocksdb_parallelism_threads
|
||||
};
|
||||
let threads = if config.rocksdb_parallelism_threads == 0 {
|
||||
num_cpus::get_physical() // max cores if user specified 0
|
||||
} else {
|
||||
config.rocksdb_parallelism_threads
|
||||
};
|
||||
|
||||
db_opts.set_log_level(rocksdb_log_level);
|
||||
db_opts.set_max_log_file_size(config.rocksdb_max_log_file_size);
|
||||
db_opts.set_log_file_time_to_roll(config.rocksdb_log_time_to_roll);
|
||||
db_opts.set_log_level(rocksdb_log_level);
|
||||
db_opts.set_max_log_file_size(config.rocksdb_max_log_file_size);
|
||||
db_opts.set_log_file_time_to_roll(config.rocksdb_log_time_to_roll);
|
||||
|
||||
if config.rocksdb_optimize_for_spinning_disks {
|
||||
db_opts.set_skip_stats_update_on_db_open(true);
|
||||
db_opts.set_compaction_readahead_size(2 * 1024 * 1024); // default compaction_readahead_size is 0 which is good for SSDs
|
||||
db_opts.set_target_file_size_base(256 * 1024 * 1024); // default target_file_size is 64MB which is good for SSDs
|
||||
db_opts.set_optimize_filters_for_hits(true); // doesn't really seem useful for fast storage
|
||||
db_opts.set_keep_log_file_num(3); // keep as few LOG files as possible for spinning hard drives. these are not really important
|
||||
} else {
|
||||
db_opts.set_skip_stats_update_on_db_open(false);
|
||||
db_opts.set_max_bytes_for_level_base(512 * 1024 * 1024);
|
||||
db_opts.set_use_direct_reads(true);
|
||||
db_opts.set_use_direct_io_for_flush_and_compaction(true);
|
||||
db_opts.set_keep_log_file_num(20);
|
||||
}
|
||||
if config.rocksdb_optimize_for_spinning_disks {
|
||||
db_opts.set_skip_stats_update_on_db_open(true);
|
||||
db_opts.set_compaction_readahead_size(2 * 1024 * 1024); // default compaction_readahead_size is 0 which is good for SSDs
|
||||
db_opts.set_target_file_size_base(256 * 1024 * 1024); // default target_file_size is 64MB which is good for SSDs
|
||||
db_opts.set_optimize_filters_for_hits(true); // doesn't really seem useful for fast storage
|
||||
db_opts.set_keep_log_file_num(3); // keep as few LOG files as possible for
|
||||
// spinning hard drives. these are not really
|
||||
// important
|
||||
} else {
|
||||
db_opts.set_skip_stats_update_on_db_open(false);
|
||||
db_opts.set_max_bytes_for_level_base(512 * 1024 * 1024);
|
||||
db_opts.set_use_direct_reads(true);
|
||||
db_opts.set_use_direct_io_for_flush_and_compaction(true);
|
||||
db_opts.set_keep_log_file_num(20);
|
||||
}
|
||||
|
||||
db_opts.set_block_based_table_factory(&block_based_options);
|
||||
db_opts.set_level_compaction_dynamic_level_bytes(true);
|
||||
db_opts.create_if_missing(true);
|
||||
db_opts.increase_parallelism(
|
||||
threads
|
||||
.try_into()
|
||||
.expect("Failed to convert \"rocksdb_parallelism_threads\" usize into i32"),
|
||||
);
|
||||
//db_opts.set_max_open_files(config.rocksdb_max_open_files);
|
||||
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
||||
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||
db_opts.optimize_level_style_compaction(10 * 1024 * 1024);
|
||||
db_opts.set_block_based_table_factory(&block_based_options);
|
||||
db_opts.set_level_compaction_dynamic_level_bytes(true);
|
||||
db_opts.create_if_missing(true);
|
||||
db_opts.increase_parallelism(
|
||||
threads.try_into().expect("Failed to convert \"rocksdb_parallelism_threads\" usize into i32"),
|
||||
);
|
||||
//db_opts.set_max_open_files(config.rocksdb_max_open_files);
|
||||
db_opts.set_compression_type(rocksdb::DBCompressionType::Zstd);
|
||||
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
|
||||
db_opts.optimize_level_style_compaction(10 * 1024 * 1024);
|
||||
|
||||
// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
|
||||
db_opts.set_max_background_jobs(6);
|
||||
db_opts.set_bytes_per_sync(1_048_576);
|
||||
// https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning
|
||||
db_opts.set_max_background_jobs(6);
|
||||
db_opts.set_bytes_per_sync(1_048_576);
|
||||
|
||||
// https://github.com/facebook/rocksdb/wiki/WAL-Recovery-Modes#ktoleratecorruptedtailrecords
|
||||
//
|
||||
// Unclean shutdowns of a Matrix homeserver are likely to be fine when
|
||||
// recovered in this manner as it's likely any lost information will be
|
||||
// restored via federation.
|
||||
db_opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
|
||||
// https://github.com/facebook/rocksdb/wiki/WAL-Recovery-Modes#ktoleratecorruptedtailrecords
|
||||
//
|
||||
// Unclean shutdowns of a Matrix homeserver are likely to be fine when
|
||||
// recovered in this manner as it's likely any lost information will be
|
||||
// restored via federation.
|
||||
db_opts.set_wal_recovery_mode(rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords);
|
||||
|
||||
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||
db_opts.set_prefix_extractor(prefix_extractor);
|
||||
let prefix_extractor = rocksdb::SliceTransform::create_fixed_prefix(1);
|
||||
db_opts.set_prefix_extractor(prefix_extractor);
|
||||
|
||||
db_opts
|
||||
db_opts
|
||||
}
|
||||
|
||||
impl KeyValueDatabaseEngine for Arc<Engine> {
|
||||
fn open(config: &Config) -> Result<Self> {
|
||||
let cache_capacity_bytes = (config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize;
|
||||
let rocksdb_cache = rocksdb::Cache::new_lru_cache(cache_capacity_bytes);
|
||||
fn open(config: &Config) -> Result<Self> {
|
||||
let cache_capacity_bytes = (config.db_cache_capacity_mb * 1024.0 * 1024.0) as usize;
|
||||
let rocksdb_cache = rocksdb::Cache::new_lru_cache(cache_capacity_bytes);
|
||||
|
||||
let db_opts = db_options(&rocksdb_cache, config);
|
||||
let db_opts = db_options(&rocksdb_cache, config);
|
||||
|
||||
debug!("Listing column families in database");
|
||||
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(
|
||||
&db_opts,
|
||||
&config.database_path,
|
||||
)
|
||||
.unwrap_or_default();
|
||||
debug!("Listing column families in database");
|
||||
let cfs = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::list_cf(&db_opts, &config.database_path)
|
||||
.unwrap_or_default();
|
||||
|
||||
debug!("Opening column family descriptors in database");
|
||||
info!("RocksDB database compaction will take place now, a delay in startup is expected");
|
||||
let db = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::open_cf_descriptors(
|
||||
&db_opts,
|
||||
&config.database_path,
|
||||
cfs.iter().map(|name| {
|
||||
rocksdb::ColumnFamilyDescriptor::new(name, db_options(&rocksdb_cache, config))
|
||||
}),
|
||||
)?;
|
||||
debug!("Opening column family descriptors in database");
|
||||
info!("RocksDB database compaction will take place now, a delay in startup is expected");
|
||||
let db = rocksdb::DBWithThreadMode::<rocksdb::MultiThreaded>::open_cf_descriptors(
|
||||
&db_opts,
|
||||
&config.database_path,
|
||||
cfs.iter().map(|name| rocksdb::ColumnFamilyDescriptor::new(name, db_options(&rocksdb_cache, config))),
|
||||
)?;
|
||||
|
||||
Ok(Arc::new(Engine {
|
||||
rocks: db,
|
||||
cache: rocksdb_cache,
|
||||
old_cfs: cfs,
|
||||
config: config.clone(),
|
||||
}))
|
||||
}
|
||||
Ok(Arc::new(Engine {
|
||||
rocks: db,
|
||||
cache: rocksdb_cache,
|
||||
old_cfs: cfs,
|
||||
config: config.clone(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
|
||||
if !self.old_cfs.contains(&name.to_owned()) {
|
||||
// Create if it didn't exist
|
||||
debug!("Creating new column family in database: {}", name);
|
||||
let _ = self
|
||||
.rocks
|
||||
.create_cf(name, &db_options(&self.cache, &self.config));
|
||||
}
|
||||
fn open_tree(&self, name: &'static str) -> Result<Arc<dyn KvTree>> {
|
||||
if !self.old_cfs.contains(&name.to_owned()) {
|
||||
// Create if it didn't exist
|
||||
debug!("Creating new column family in database: {}", name);
|
||||
let _ = self.rocks.create_cf(name, &db_options(&self.cache, &self.config));
|
||||
}
|
||||
|
||||
Ok(Arc::new(RocksDbEngineTree {
|
||||
name,
|
||||
db: Arc::clone(self),
|
||||
watchers: Watchers::default(),
|
||||
write_lock: RwLock::new(()),
|
||||
}))
|
||||
}
|
||||
Ok(Arc::new(RocksDbEngineTree {
|
||||
name,
|
||||
db: Arc::clone(self),
|
||||
watchers: Watchers::default(),
|
||||
write_lock: RwLock::new(()),
|
||||
}))
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<()> {
|
||||
// TODO?
|
||||
Ok(())
|
||||
}
|
||||
fn flush(&self) -> Result<()> {
|
||||
// TODO?
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn memory_usage(&self) -> Result<String> {
|
||||
let stats =
|
||||
rocksdb::perf::get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.cache]))?;
|
||||
Ok(format!(
|
||||
"Approximate memory usage of all the mem-tables: {:.3} MB\n\
|
||||
Approximate memory usage of un-flushed mem-tables: {:.3} MB\n\
|
||||
Approximate memory usage of all the table readers: {:.3} MB\n\
|
||||
Approximate memory usage by cache: {:.3} MB\n\
|
||||
Approximate memory usage by cache pinned: {:.3} MB\n\
|
||||
",
|
||||
stats.mem_table_total as f64 / 1024.0 / 1024.0,
|
||||
stats.mem_table_unflushed as f64 / 1024.0 / 1024.0,
|
||||
stats.mem_table_readers_total as f64 / 1024.0 / 1024.0,
|
||||
stats.cache_total as f64 / 1024.0 / 1024.0,
|
||||
self.cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
|
||||
))
|
||||
}
|
||||
fn memory_usage(&self) -> Result<String> {
|
||||
let stats = rocksdb::perf::get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.cache]))?;
|
||||
Ok(format!(
|
||||
"Approximate memory usage of all the mem-tables: {:.3} MB\nApproximate memory usage of un-flushed \
|
||||
mem-tables: {:.3} MB\nApproximate memory usage of all the table readers: {:.3} MB\nApproximate memory \
|
||||
usage by cache: {:.3} MB\nApproximate memory usage by cache pinned: {:.3} MB\n",
|
||||
stats.mem_table_total as f64 / 1024.0 / 1024.0,
|
||||
stats.mem_table_unflushed as f64 / 1024.0 / 1024.0,
|
||||
stats.mem_table_readers_total as f64 / 1024.0 / 1024.0,
|
||||
stats.cache_total as f64 / 1024.0 / 1024.0,
|
||||
self.cache.get_pinned_usage() as f64 / 1024.0 / 1024.0,
|
||||
))
|
||||
}
|
||||
|
||||
// TODO: figure out if this is needed for rocksdb
|
||||
#[allow(dead_code)]
|
||||
fn clear_caches(&self) {}
|
||||
// TODO: figure out if this is needed for rocksdb
|
||||
#[allow(dead_code)]
|
||||
fn clear_caches(&self) {}
|
||||
}
|
||||
|
||||
impl RocksDbEngineTree<'_> {
|
||||
fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> {
|
||||
self.db.rocks.cf_handle(self.name).unwrap()
|
||||
}
|
||||
fn cf(&self) -> Arc<rocksdb::BoundColumnFamily<'_>> { self.db.rocks.cf_handle(self.name).unwrap() }
|
||||
}
|
||||
|
||||
impl KvTree for RocksDbEngineTree<'_> {
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
Ok(self.db.rocks.get_cf(&self.cf(), key)?)
|
||||
}
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { Ok(self.db.rocks.get_cf(&self.cf(), key)?) }
|
||||
|
||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
let lock = self.write_lock.read().unwrap();
|
||||
self.db.rocks.put_cf(&self.cf(), key, value)?;
|
||||
drop(lock);
|
||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
let lock = self.write_lock.read().unwrap();
|
||||
self.db.rocks.put_cf(&self.cf(), key, value)?;
|
||||
drop(lock);
|
||||
|
||||
self.watchers.wake(key);
|
||||
self.watchers.wake(key);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||
for (key, value) in iter {
|
||||
self.db.rocks.put_cf(&self.cf(), key, value)?;
|
||||
}
|
||||
fn insert_batch(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||
for (key, value) in iter {
|
||||
self.db.rocks.put_cf(&self.cf(), key, value)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||
Ok(self.db.rocks.delete_cf(&self.cf(), key)?)
|
||||
}
|
||||
fn remove(&self, key: &[u8]) -> Result<()> { Ok(self.db.rocks.delete_cf(&self.cf(), key)?) }
|
||||
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.db
|
||||
.rocks
|
||||
.iterator_cf(&self.cf(), rocksdb::IteratorMode::Start)
|
||||
.map(std::result::Result::unwrap)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||
)
|
||||
}
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.db
|
||||
.rocks
|
||||
.iterator_cf(&self.cf(), rocksdb::IteratorMode::Start)
|
||||
.map(std::result::Result::unwrap)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||
)
|
||||
}
|
||||
|
||||
fn iter_from<'a>(
|
||||
&'a self,
|
||||
from: &[u8],
|
||||
backwards: bool,
|
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.db
|
||||
.rocks
|
||||
.iterator_cf(
|
||||
&self.cf(),
|
||||
rocksdb::IteratorMode::From(
|
||||
from,
|
||||
if backwards {
|
||||
rocksdb::Direction::Reverse
|
||||
} else {
|
||||
rocksdb::Direction::Forward
|
||||
},
|
||||
),
|
||||
)
|
||||
.map(std::result::Result::unwrap)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||
)
|
||||
}
|
||||
fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.db
|
||||
.rocks
|
||||
.iterator_cf(
|
||||
&self.cf(),
|
||||
rocksdb::IteratorMode::From(
|
||||
from,
|
||||
if backwards {
|
||||
rocksdb::Direction::Reverse
|
||||
} else {
|
||||
rocksdb::Direction::Forward
|
||||
},
|
||||
),
|
||||
)
|
||||
.map(std::result::Result::unwrap)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v))),
|
||||
)
|
||||
}
|
||||
|
||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let lock = self.write_lock.write().unwrap();
|
||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let lock = self.write_lock.write().unwrap();
|
||||
|
||||
let old = self.db.rocks.get_cf(&self.cf(), key)?;
|
||||
let new = utils::increment(old.as_deref()).unwrap();
|
||||
self.db.rocks.put_cf(&self.cf(), key, &new)?;
|
||||
let old = self.db.rocks.get_cf(&self.cf(), key)?;
|
||||
let new = utils::increment(old.as_deref()).unwrap();
|
||||
self.db.rocks.put_cf(&self.cf(), key, &new)?;
|
||||
|
||||
drop(lock);
|
||||
Ok(new)
|
||||
}
|
||||
drop(lock);
|
||||
Ok(new)
|
||||
}
|
||||
|
||||
fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||
let lock = self.write_lock.write().unwrap();
|
||||
fn increment_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||
let lock = self.write_lock.write().unwrap();
|
||||
|
||||
for key in iter {
|
||||
let old = self.db.rocks.get_cf(&self.cf(), &key)?;
|
||||
let new = utils::increment(old.as_deref()).unwrap();
|
||||
self.db.rocks.put_cf(&self.cf(), key, new)?;
|
||||
}
|
||||
for key in iter {
|
||||
let old = self.db.rocks.get_cf(&self.cf(), &key)?;
|
||||
let new = utils::increment(old.as_deref()).unwrap();
|
||||
self.db.rocks.put_cf(&self.cf(), key, new)?;
|
||||
}
|
||||
|
||||
drop(lock);
|
||||
drop(lock);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn scan_prefix<'a>(
|
||||
&'a self,
|
||||
prefix: Vec<u8>,
|
||||
) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.db
|
||||
.rocks
|
||||
.iterator_cf(
|
||||
&self.cf(),
|
||||
rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward),
|
||||
)
|
||||
.map(std::result::Result::unwrap)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v)))
|
||||
.take_while(move |(k, _)| k.starts_with(&prefix)),
|
||||
)
|
||||
}
|
||||
fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> {
|
||||
Box::new(
|
||||
self.db
|
||||
.rocks
|
||||
.iterator_cf(&self.cf(), rocksdb::IteratorMode::From(&prefix, rocksdb::Direction::Forward))
|
||||
.map(std::result::Result::unwrap)
|
||||
.map(|(k, v)| (Vec::from(k), Vec::from(v)))
|
||||
.take_while(move |(k, _)| k.starts_with(&prefix)),
|
||||
)
|
||||
}
|
||||
|
||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||
self.watchers.watch(prefix)
|
||||
}
|
||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||
self.watchers.watch(prefix)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,340 +1,305 @@
|
|||
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||
use crate::{database::Config, Result};
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use parking_lot::{Mutex, MutexGuard};
|
||||
use rusqlite::{Connection, DatabaseName::Main, OptionalExtension};
|
||||
use std::{
|
||||
cell::RefCell,
|
||||
future::Future,
|
||||
path::{Path, PathBuf},
|
||||
pin::Pin,
|
||||
sync::Arc,
|
||||
};
|
||||
use thread_local::ThreadLocal;
|
||||
use tracing::debug;
|
||||
|
||||
use super::{watchers::Watchers, KeyValueDatabaseEngine, KvTree};
|
||||
use crate::{database::Config, Result};
|
||||
|
||||
thread_local! {
|
||||
static READ_CONNECTION: RefCell<Option<&'static Connection>> = RefCell::new(None);
|
||||
static READ_CONNECTION_ITERATOR: RefCell<Option<&'static Connection>> = RefCell::new(None);
|
||||
static READ_CONNECTION: RefCell<Option<&'static Connection>> = RefCell::new(None);
|
||||
static READ_CONNECTION_ITERATOR: RefCell<Option<&'static Connection>> = RefCell::new(None);
|
||||
}
|
||||
|
||||
struct PreparedStatementIterator<'a> {
|
||||
pub iterator: Box<dyn Iterator<Item = TupleOfBytes> + 'a>,
|
||||
pub _statement_ref: NonAliasingBox<rusqlite::Statement<'a>>,
|
||||
pub iterator: Box<dyn Iterator<Item = TupleOfBytes> + 'a>,
|
||||
pub _statement_ref: NonAliasingBox<rusqlite::Statement<'a>>,
|
||||
}
|
||||
|
||||
impl Iterator for PreparedStatementIterator<'_> {
|
||||
type Item = TupleOfBytes;
|
||||
type Item = TupleOfBytes;
|
||||
|
||||
fn next(&mut self) -> Option<Self::Item> {
|
||||
self.iterator.next()
|
||||
}
|
||||
fn next(&mut self) -> Option<Self::Item> { self.iterator.next() }
|
||||
}
|
||||
|
||||
struct NonAliasingBox<T>(*mut T);
|
||||
impl<T> Drop for NonAliasingBox<T> {
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let _ = Box::from_raw(self.0);
|
||||
};
|
||||
}
|
||||
fn drop(&mut self) {
|
||||
unsafe {
|
||||
let _ = Box::from_raw(self.0);
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Engine {
|
||||
writer: Mutex<Connection>,
|
||||
read_conn_tls: ThreadLocal<Connection>,
|
||||
read_iterator_conn_tls: ThreadLocal<Connection>,
|
||||
writer: Mutex<Connection>,
|
||||
read_conn_tls: ThreadLocal<Connection>,
|
||||
read_iterator_conn_tls: ThreadLocal<Connection>,
|
||||
|
||||
path: PathBuf,
|
||||
cache_size_per_thread: u32,
|
||||
path: PathBuf,
|
||||
cache_size_per_thread: u32,
|
||||
}
|
||||
|
||||
impl Engine {
|
||||
fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
|
||||
let conn = Connection::open(path)?;
|
||||
fn prepare_conn(path: &Path, cache_size_kb: u32) -> Result<Connection> {
|
||||
let conn = Connection::open(path)?;
|
||||
|
||||
conn.pragma_update(Some(Main), "page_size", 2048)?;
|
||||
conn.pragma_update(Some(Main), "journal_mode", "WAL")?;
|
||||
conn.pragma_update(Some(Main), "synchronous", "NORMAL")?;
|
||||
conn.pragma_update(Some(Main), "cache_size", -i64::from(cache_size_kb))?;
|
||||
conn.pragma_update(Some(Main), "wal_autocheckpoint", 0)?;
|
||||
conn.pragma_update(Some(Main), "page_size", 2048)?;
|
||||
conn.pragma_update(Some(Main), "journal_mode", "WAL")?;
|
||||
conn.pragma_update(Some(Main), "synchronous", "NORMAL")?;
|
||||
conn.pragma_update(Some(Main), "cache_size", -i64::from(cache_size_kb))?;
|
||||
conn.pragma_update(Some(Main), "wal_autocheckpoint", 0)?;
|
||||
|
||||
Ok(conn)
|
||||
}
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
fn write_lock(&self) -> MutexGuard<'_, Connection> {
|
||||
self.writer.lock()
|
||||
}
|
||||
fn write_lock(&self) -> MutexGuard<'_, Connection> { self.writer.lock() }
|
||||
|
||||
fn read_lock(&self) -> &Connection {
|
||||
self.read_conn_tls
|
||||
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
|
||||
}
|
||||
fn read_lock(&self) -> &Connection {
|
||||
self.read_conn_tls.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
|
||||
}
|
||||
|
||||
fn read_lock_iterator(&self) -> &Connection {
|
||||
self.read_iterator_conn_tls
|
||||
.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
|
||||
}
|
||||
fn read_lock_iterator(&self) -> &Connection {
|
||||
self.read_iterator_conn_tls.get_or(|| Self::prepare_conn(&self.path, self.cache_size_per_thread).unwrap())
|
||||
}
|
||||
|
||||
pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
|
||||
self.write_lock()
|
||||
.pragma_update(Some(Main), "wal_checkpoint", "RESTART")?;
|
||||
Ok(())
|
||||
}
|
||||
pub fn flush_wal(self: &Arc<Self>) -> Result<()> {
|
||||
self.write_lock().pragma_update(Some(Main), "wal_checkpoint", "RESTART")?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl KeyValueDatabaseEngine for Arc<Engine> {
|
||||
fn open(config: &Config) -> Result<Self> {
|
||||
let path = Path::new(&config.database_path).join("conduit.db");
|
||||
fn open(config: &Config) -> Result<Self> {
|
||||
let path = Path::new(&config.database_path).join("conduit.db");
|
||||
|
||||
// calculates cache-size per permanent connection
|
||||
// 1. convert MB to KiB
|
||||
// 2. divide by permanent connections + permanent iter connections + write connection
|
||||
// 3. round down to nearest integer
|
||||
let cache_size_per_thread: u32 = ((config.db_cache_capacity_mb * 1024.0)
|
||||
/ ((num_cpus::get().max(1) * 2) + 1) as f64)
|
||||
as u32;
|
||||
// calculates cache-size per permanent connection
|
||||
// 1. convert MB to KiB
|
||||
// 2. divide by permanent connections + permanent iter connections + write
|
||||
// connection
|
||||
// 3. round down to nearest integer
|
||||
let cache_size_per_thread: u32 =
|
||||
((config.db_cache_capacity_mb * 1024.0) / ((num_cpus::get().max(1) * 2) + 1) as f64) as u32;
|
||||
|
||||
let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
|
||||
let writer = Mutex::new(Engine::prepare_conn(&path, cache_size_per_thread)?);
|
||||
|
||||
let arc = Arc::new(Engine {
|
||||
writer,
|
||||
read_conn_tls: ThreadLocal::new(),
|
||||
read_iterator_conn_tls: ThreadLocal::new(),
|
||||
path,
|
||||
cache_size_per_thread,
|
||||
});
|
||||
let arc = Arc::new(Engine {
|
||||
writer,
|
||||
read_conn_tls: ThreadLocal::new(),
|
||||
read_iterator_conn_tls: ThreadLocal::new(),
|
||||
path,
|
||||
cache_size_per_thread,
|
||||
});
|
||||
|
||||
Ok(arc)
|
||||
}
|
||||
Ok(arc)
|
||||
}
|
||||
|
||||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
|
||||
self.write_lock().execute(&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"), [])?;
|
||||
fn open_tree(&self, name: &str) -> Result<Arc<dyn KvTree>> {
|
||||
self.write_lock().execute(
|
||||
&format!("CREATE TABLE IF NOT EXISTS {name} ( \"key\" BLOB PRIMARY KEY, \"value\" BLOB NOT NULL )"),
|
||||
[],
|
||||
)?;
|
||||
|
||||
Ok(Arc::new(SqliteTable {
|
||||
engine: Arc::clone(self),
|
||||
name: name.to_owned(),
|
||||
watchers: Watchers::default(),
|
||||
}))
|
||||
}
|
||||
Ok(Arc::new(SqliteTable {
|
||||
engine: Arc::clone(self),
|
||||
name: name.to_owned(),
|
||||
watchers: Watchers::default(),
|
||||
}))
|
||||
}
|
||||
|
||||
fn flush(&self) -> Result<()> {
|
||||
// we enabled PRAGMA synchronous=normal, so this should not be necessary
|
||||
Ok(())
|
||||
}
|
||||
fn flush(&self) -> Result<()> {
|
||||
// we enabled PRAGMA synchronous=normal, so this should not be necessary
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn cleanup(&self) -> Result<()> {
|
||||
self.flush_wal()
|
||||
}
|
||||
fn cleanup(&self) -> Result<()> { self.flush_wal() }
|
||||
}
|
||||
|
||||
pub struct SqliteTable {
|
||||
engine: Arc<Engine>,
|
||||
name: String,
|
||||
watchers: Watchers,
|
||||
engine: Arc<Engine>,
|
||||
name: String,
|
||||
watchers: Watchers,
|
||||
}
|
||||
|
||||
type TupleOfBytes = (Vec<u8>, Vec<u8>);
|
||||
|
||||
impl SqliteTable {
|
||||
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
Ok(guard
|
||||
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
|
||||
.query_row([key], |row| row.get(0))
|
||||
.optional()?)
|
||||
}
|
||||
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
Ok(guard
|
||||
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
|
||||
.query_row([key], |row| row.get(0))
|
||||
.optional()?)
|
||||
}
|
||||
|
||||
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
guard.execute(
|
||||
format!(
|
||||
"INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)",
|
||||
self.name
|
||||
)
|
||||
.as_str(),
|
||||
[key, value],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
guard.execute(
|
||||
format!("INSERT OR REPLACE INTO {} (key, value) VALUES (?, ?)", self.name).as_str(),
|
||||
[key, value],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn iter_with_guard<'a>(
|
||||
&'a self,
|
||||
guard: &'a Connection,
|
||||
) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
let statement = Box::leak(Box::new(
|
||||
guard
|
||||
.prepare(&format!(
|
||||
"SELECT key, value FROM {} ORDER BY key ASC",
|
||||
&self.name
|
||||
))
|
||||
.unwrap(),
|
||||
));
|
||||
pub fn iter_with_guard<'a>(&'a self, guard: &'a Connection) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
let statement = Box::leak(Box::new(
|
||||
guard.prepare(&format!("SELECT key, value FROM {} ORDER BY key ASC", &self.name)).unwrap(),
|
||||
));
|
||||
|
||||
let statement_ref = NonAliasingBox(statement);
|
||||
let statement_ref = NonAliasingBox(statement);
|
||||
|
||||
//let name = self.name.clone();
|
||||
//let name = self.name.clone();
|
||||
|
||||
let iterator = Box::new(
|
||||
statement
|
||||
.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||
.unwrap()
|
||||
.map(move |r| r.unwrap()),
|
||||
);
|
||||
let iterator = Box::new(
|
||||
statement.query_map([], |row| Ok((row.get_unwrap(0), row.get_unwrap(1)))).unwrap().map(move |r| r.unwrap()),
|
||||
);
|
||||
|
||||
Box::new(PreparedStatementIterator {
|
||||
iterator,
|
||||
_statement_ref: statement_ref,
|
||||
})
|
||||
}
|
||||
Box::new(PreparedStatementIterator {
|
||||
iterator,
|
||||
_statement_ref: statement_ref,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl KvTree for SqliteTable {
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
|
||||
self.get_with_guard(self.engine.read_lock(), key)
|
||||
}
|
||||
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> { self.get_with_guard(self.engine.read_lock(), key) }
|
||||
|
||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
self.insert_with_guard(&guard, key, value)?;
|
||||
drop(guard);
|
||||
self.watchers.wake(key);
|
||||
Ok(())
|
||||
}
|
||||
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
self.insert_with_guard(&guard, key, value)?;
|
||||
drop(guard);
|
||||
self.watchers.wake(key);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
fn insert_batch<'a>(&self, iter: &mut dyn Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
guard.execute("BEGIN", [])?;
|
||||
for (key, value) in iter {
|
||||
self.insert_with_guard(&guard, &key, &value)?;
|
||||
}
|
||||
guard.execute("COMMIT", [])?;
|
||||
guard.execute("BEGIN", [])?;
|
||||
for (key, value) in iter {
|
||||
self.insert_with_guard(&guard, &key, &value)?;
|
||||
}
|
||||
guard.execute("COMMIT", [])?;
|
||||
|
||||
drop(guard);
|
||||
drop(guard);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
fn increment_batch<'a>(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
guard.execute("BEGIN", [])?;
|
||||
for key in iter {
|
||||
let old = self.get_with_guard(&guard, &key)?;
|
||||
let new = crate::utils::increment(old.as_deref())
|
||||
.expect("utils::increment always returns Some");
|
||||
self.insert_with_guard(&guard, &key, &new)?;
|
||||
}
|
||||
guard.execute("COMMIT", [])?;
|
||||
guard.execute("BEGIN", [])?;
|
||||
for key in iter {
|
||||
let old = self.get_with_guard(&guard, &key)?;
|
||||
let new = crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");
|
||||
self.insert_with_guard(&guard, &key, &new)?;
|
||||
}
|
||||
guard.execute("COMMIT", [])?;
|
||||
|
||||
drop(guard);
|
||||
drop(guard);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
fn remove(&self, key: &[u8]) -> Result<()> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
guard.execute(
|
||||
format!("DELETE FROM {} WHERE key = ?", self.name).as_str(),
|
||||
[key],
|
||||
)?;
|
||||
guard.execute(format!("DELETE FROM {} WHERE key = ?", self.name).as_str(), [key])?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
let guard = self.engine.read_lock_iterator();
|
||||
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
let guard = self.engine.read_lock_iterator();
|
||||
|
||||
self.iter_with_guard(guard)
|
||||
}
|
||||
self.iter_with_guard(guard)
|
||||
}
|
||||
|
||||
fn iter_from<'a>(
|
||||
&'a self,
|
||||
from: &[u8],
|
||||
backwards: bool,
|
||||
) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
let guard = self.engine.read_lock_iterator();
|
||||
let from = from.to_vec(); // TODO change interface?
|
||||
fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
let guard = self.engine.read_lock_iterator();
|
||||
let from = from.to_vec(); // TODO change interface?
|
||||
|
||||
//let name = self.name.clone();
|
||||
//let name = self.name.clone();
|
||||
|
||||
if backwards {
|
||||
let statement = Box::leak(Box::new(
|
||||
guard
|
||||
.prepare(&format!(
|
||||
"SELECT key, value FROM {} WHERE key <= ? ORDER BY key DESC",
|
||||
&self.name
|
||||
))
|
||||
.unwrap(),
|
||||
));
|
||||
if backwards {
|
||||
let statement = Box::leak(Box::new(
|
||||
guard
|
||||
.prepare(&format!(
|
||||
"SELECT key, value FROM {} WHERE key <= ? ORDER BY key DESC",
|
||||
&self.name
|
||||
))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
let statement_ref = NonAliasingBox(statement);
|
||||
let statement_ref = NonAliasingBox(statement);
|
||||
|
||||
let iterator = Box::new(
|
||||
statement
|
||||
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||
.unwrap()
|
||||
.map(move |r| r.unwrap()),
|
||||
);
|
||||
Box::new(PreparedStatementIterator {
|
||||
iterator,
|
||||
_statement_ref: statement_ref,
|
||||
})
|
||||
} else {
|
||||
let statement = Box::leak(Box::new(
|
||||
guard
|
||||
.prepare(&format!(
|
||||
"SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC",
|
||||
&self.name
|
||||
))
|
||||
.unwrap(),
|
||||
));
|
||||
let iterator = Box::new(
|
||||
statement
|
||||
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||
.unwrap()
|
||||
.map(move |r| r.unwrap()),
|
||||
);
|
||||
Box::new(PreparedStatementIterator {
|
||||
iterator,
|
||||
_statement_ref: statement_ref,
|
||||
})
|
||||
} else {
|
||||
let statement = Box::leak(Box::new(
|
||||
guard
|
||||
.prepare(&format!(
|
||||
"SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC",
|
||||
&self.name
|
||||
))
|
||||
.unwrap(),
|
||||
));
|
||||
|
||||
let statement_ref = NonAliasingBox(statement);
|
||||
let statement_ref = NonAliasingBox(statement);
|
||||
|
||||
let iterator = Box::new(
|
||||
statement
|
||||
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||
.unwrap()
|
||||
.map(move |r| r.unwrap()),
|
||||
);
|
||||
let iterator = Box::new(
|
||||
statement
|
||||
.query_map([from], |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
|
||||
.unwrap()
|
||||
.map(move |r| r.unwrap()),
|
||||
);
|
||||
|
||||
Box::new(PreparedStatementIterator {
|
||||
iterator,
|
||||
_statement_ref: statement_ref,
|
||||
})
|
||||
}
|
||||
}
|
||||
Box::new(PreparedStatementIterator {
|
||||
iterator,
|
||||
_statement_ref: statement_ref,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let guard = self.engine.write_lock();
|
||||
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
|
||||
let guard = self.engine.write_lock();
|
||||
|
||||
let old = self.get_with_guard(&guard, key)?;
|
||||
let old = self.get_with_guard(&guard, key)?;
|
||||
|
||||
let new =
|
||||
crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");
|
||||
let new = crate::utils::increment(old.as_deref()).expect("utils::increment always returns Some");
|
||||
|
||||
self.insert_with_guard(&guard, key, &new)?;
|
||||
self.insert_with_guard(&guard, key, &new)?;
|
||||
|
||||
Ok(new)
|
||||
}
|
||||
Ok(new)
|
||||
}
|
||||
|
||||
fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
Box::new(
|
||||
self.iter_from(&prefix, false)
|
||||
.take_while(move |(key, _)| key.starts_with(&prefix)),
|
||||
)
|
||||
}
|
||||
fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = TupleOfBytes> + 'a> {
|
||||
Box::new(self.iter_from(&prefix, false).take_while(move |(key, _)| key.starts_with(&prefix)))
|
||||
}
|
||||
|
||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||
self.watchers.watch(prefix)
|
||||
}
|
||||
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||
self.watchers.watch(prefix)
|
||||
}
|
||||
|
||||
fn clear(&self) -> Result<()> {
|
||||
debug!("clear: running");
|
||||
self.engine
|
||||
.write_lock()
|
||||
.execute(format!("DELETE FROM {}", self.name).as_str(), [])?;
|
||||
debug!("clear: ran");
|
||||
Ok(())
|
||||
}
|
||||
fn clear(&self) -> Result<()> {
|
||||
debug!("clear: running");
|
||||
self.engine.write_lock().execute(format!("DELETE FROM {}", self.name).as_str(), [])?;
|
||||
debug!("clear: ran");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,56 +1,55 @@
|
|||
use std::{
|
||||
collections::{hash_map, HashMap},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::RwLock,
|
||||
collections::{hash_map, HashMap},
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::RwLock,
|
||||
};
|
||||
|
||||
use tokio::sync::watch;
|
||||
|
||||
type Watcher = RwLock<HashMap<Vec<u8>, (watch::Sender<()>, watch::Receiver<()>)>>;
|
||||
|
||||
#[derive(Default)]
|
||||
pub(super) struct Watchers {
|
||||
watchers: Watcher,
|
||||
watchers: Watcher,
|
||||
}
|
||||
|
||||
impl Watchers {
|
||||
pub(super) fn watch<'a>(
|
||||
&'a self,
|
||||
prefix: &[u8],
|
||||
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||
let mut rx = match self.watchers.write().unwrap().entry(prefix.to_vec()) {
|
||||
hash_map::Entry::Occupied(o) => o.get().1.clone(),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
let (tx, rx) = tokio::sync::watch::channel(());
|
||||
v.insert((tx, rx.clone()));
|
||||
rx
|
||||
}
|
||||
};
|
||||
pub(super) fn watch<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
|
||||
let mut rx = match self.watchers.write().unwrap().entry(prefix.to_vec()) {
|
||||
hash_map::Entry::Occupied(o) => o.get().1.clone(),
|
||||
hash_map::Entry::Vacant(v) => {
|
||||
let (tx, rx) = tokio::sync::watch::channel(());
|
||||
v.insert((tx, rx.clone()));
|
||||
rx
|
||||
},
|
||||
};
|
||||
|
||||
Box::pin(async move {
|
||||
// Tx is never destroyed
|
||||
rx.changed().await.unwrap();
|
||||
})
|
||||
}
|
||||
pub(super) fn wake(&self, key: &[u8]) {
|
||||
let watchers = self.watchers.read().unwrap();
|
||||
let mut triggered = Vec::new();
|
||||
Box::pin(async move {
|
||||
// Tx is never destroyed
|
||||
rx.changed().await.unwrap();
|
||||
})
|
||||
}
|
||||
|
||||
for length in 0..=key.len() {
|
||||
if watchers.contains_key(&key[..length]) {
|
||||
triggered.push(&key[..length]);
|
||||
}
|
||||
}
|
||||
pub(super) fn wake(&self, key: &[u8]) {
|
||||
let watchers = self.watchers.read().unwrap();
|
||||
let mut triggered = Vec::new();
|
||||
|
||||
drop(watchers);
|
||||
for length in 0..=key.len() {
|
||||
if watchers.contains_key(&key[..length]) {
|
||||
triggered.push(&key[..length]);
|
||||
}
|
||||
}
|
||||
|
||||
if !triggered.is_empty() {
|
||||
let mut watchers = self.watchers.write().unwrap();
|
||||
for prefix in triggered {
|
||||
if let Some(tx) = watchers.remove(prefix) {
|
||||
let _ = tx.0.send(());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
drop(watchers);
|
||||
|
||||
if !triggered.is_empty() {
|
||||
let mut watchers = self.watchers.write().unwrap();
|
||||
for prefix in triggered {
|
||||
if let Some(tx) = watchers.remove(prefix) {
|
||||
let _ = tx.0.send(());
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue