diff --git a/src/database/abstraction.rs b/src/database/abstraction.rs index 16f7d440..1db99f79 100644 --- a/src/database/abstraction.rs +++ b/src/database/abstraction.rs @@ -30,6 +30,7 @@ pub(crate) trait KeyValueDatabaseEngine: Send + Sync { pub(crate) trait KvTree: Send + Sync { fn get(&self, key: &[u8]) -> Result>>; + #[allow(dead_code)] #[cfg(feature = "rocksdb")] fn multi_get( &self, iter: Vec<(&Arc>, Vec)>, diff --git a/src/database/abstraction/rocksdb.rs b/src/database/abstraction/rocksdb.rs index 25dd727e..29376b15 100644 --- a/src/database/abstraction/rocksdb.rs +++ b/src/database/abstraction/rocksdb.rs @@ -4,10 +4,7 @@ use std::{ sync::{Arc, RwLock}, }; -use rust_rocksdb::{ - FlushOptions, - LogLevel::{Debug, Error, Fatal, Info, Warn}, -}; +use rust_rocksdb::LogLevel::{Debug, Error, Fatal, Info, Warn}; use tracing::{debug, info}; use super::{super::Config, watchers::Watchers, KeyValueDatabaseEngine, KvTree}; @@ -104,6 +101,7 @@ fn db_options(rocksdb_cache: &rust_rocksdb::Cache, config: &Config) -> rust_rock // restored via federation. db_opts.set_wal_recovery_mode(rust_rocksdb::DBRecoveryMode::TolerateCorruptedTailRecords); + // TODO: remove me? https://gitlab.com/famedly/conduit/-/merge_requests/602/diffs#a3a261d6a9014330581b5bdecd586dab5ae00245_62_54 let prefix_extractor = rust_rocksdb::SliceTransform::create_fixed_prefix(1); db_opts.set_prefix_extractor(prefix_extractor); @@ -176,7 +174,9 @@ impl KeyValueDatabaseEngine for Arc { fn cleanup(&self) -> Result<()> { debug!("Running flush_opt"); - rust_rocksdb::DBCommon::flush_opt(&self.rocks, &FlushOptions::default())?; + let flushoptions = rust_rocksdb::FlushOptions::default(); + + rust_rocksdb::DBCommon::flush_opt(&self.rocks, &flushoptions)?; Ok(()) } @@ -191,17 +191,28 @@ impl RocksDbEngineTree<'_> { } impl KvTree for RocksDbEngineTree<'_> { - fn get(&self, key: &[u8]) -> Result>> { Ok(self.db.rocks.get_cf(&self.cf(), key)?) } + fn get(&self, key: &[u8]) -> Result>> { + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + + Ok(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?) + } fn multi_get( &self, iter: Vec<(&Arc>, Vec)>, ) -> Vec>, rust_rocksdb::Error>> { - self.db.rocks.multi_get_cf(iter) + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + + self.db.rocks.multi_get_cf_opt(iter, &readoptions) } fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> { + let writeoptions = rust_rocksdb::WriteOptions::default(); let lock = self.write_lock.read().unwrap(); - self.db.rocks.put_cf(&self.cf(), key, value)?; + + self.db.rocks.put_cf_opt(&self.cf(), key, value, &writeoptions)?; + drop(lock); self.watchers.wake(key); @@ -210,31 +221,44 @@ impl KvTree for RocksDbEngineTree<'_> { } fn insert_batch(&self, iter: &mut dyn Iterator, Vec)>) -> Result<()> { + let writeoptions = rust_rocksdb::WriteOptions::default(); + for (key, value) in iter { - self.db.rocks.put_cf(&self.cf(), key, value)?; + self.db.rocks.put_cf_opt(&self.cf(), key, value, &writeoptions)?; } Ok(()) } - fn remove(&self, key: &[u8]) -> Result<()> { Ok(self.db.rocks.delete_cf(&self.cf(), key)?) } + fn remove(&self, key: &[u8]) -> Result<()> { + let writeoptions = rust_rocksdb::WriteOptions::default(); + + Ok(self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions)?) + } fn iter<'a>(&'a self) -> Box, Vec)> + 'a> { + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + Box::new( self.db .rocks - .iterator_cf(&self.cf(), rust_rocksdb::IteratorMode::Start) + .iterator_cf_opt(&self.cf(), readoptions, rust_rocksdb::IteratorMode::Start) .map(std::result::Result::unwrap) .map(|(k, v)| (Vec::from(k), Vec::from(v))), ) } fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box, Vec)> + 'a> { + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + Box::new( self.db .rocks - .iterator_cf( + .iterator_cf_opt( &self.cf(), + readoptions, rust_rocksdb::IteratorMode::From( from, if backwards { @@ -250,23 +274,31 @@ impl KvTree for RocksDbEngineTree<'_> { } fn increment(&self, key: &[u8]) -> Result> { + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + let writeoptions = rust_rocksdb::WriteOptions::default(); + let lock = self.write_lock.write().unwrap(); - let old = self.db.rocks.get_cf(&self.cf(), key)?; + let old = self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?; let new = utils::increment(old.as_deref()).unwrap(); - self.db.rocks.put_cf(&self.cf(), key, &new)?; + self.db.rocks.put_cf_opt(&self.cf(), key, &new, &writeoptions)?; drop(lock); Ok(new) } fn increment_batch(&self, iter: &mut dyn Iterator>) -> Result<()> { + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + let writeoptions = rust_rocksdb::WriteOptions::default(); + let lock = self.write_lock.write().unwrap(); for key in iter { - let old = self.db.rocks.get_cf(&self.cf(), &key)?; + let old = self.db.rocks.get_cf_opt(&self.cf(), &key, &readoptions)?; let new = utils::increment(old.as_deref()).unwrap(); - self.db.rocks.put_cf(&self.cf(), key, new)?; + self.db.rocks.put_cf_opt(&self.cf(), key, new, &writeoptions)?; } drop(lock); @@ -275,11 +307,15 @@ impl KvTree for RocksDbEngineTree<'_> { } fn scan_prefix<'a>(&'a self, prefix: Vec) -> Box, Vec)> + 'a> { + let mut readoptions = rust_rocksdb::ReadOptions::default(); + readoptions.set_total_order_seek(true); + Box::new( self.db .rocks - .iterator_cf( + .iterator_cf_opt( &self.cf(), + readoptions, rust_rocksdb::IteratorMode::From(&prefix, rust_rocksdb::Direction::Forward), ) .map(std::result::Result::unwrap)