From 01f4455ceb5395c78a5302503d1ce589569e4079 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 12 Jun 2024 01:08:07 +0000 Subject: [PATCH] convert rocksdb errors locally; remove from Error. Signed-off-by: Jason Volk --- Cargo.lock | 1 - src/core/Cargo.toml | 13 +------------ src/core/error.rs | 11 +++----- src/database/rocksdb/kvtree.rs | 49 +++++++++++++++++++++++----------- src/database/rocksdb/mod.rs | 44 +++++++++++++++--------------- src/main/Cargo.toml | 2 -- 6 files changed, 60 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6333c49e..3294de97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -675,7 +675,6 @@ dependencies = [ "reqwest", "ring", "ruma", - "rust-rocksdb-uwu", "sanitize-filename", "serde", "serde_json", diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index dc3fc1d4..46ca00c7 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -24,14 +24,10 @@ release_max_log_level = [ "log/max_level_trace", "log/release_max_level_info", ] -rocksdb = [ - "dep:rust-rocksdb", -] jemalloc = [ "dep:tikv-jemalloc-sys", "dep:tikv-jemalloc-ctl", "dep:tikv-jemallocator", - "rust-rocksdb/jemalloc", ] jemalloc_prof = [ "tikv-jemalloc-sys/profiling", @@ -39,18 +35,13 @@ jemalloc_prof = [ hardened_malloc = [ "dep:hardened_malloc-rs" ] -io_uring = [ - "rust-rocksdb/io-uring", -] -zstd_compression = [ - "rust-rocksdb/zstd", -] gzip_compression = [ "reqwest/gzip", ] brotli_compression = [ "reqwest/brotli", ] +zstd_compression = [] perf_measurements = [] sentry_telemetry = [] @@ -72,8 +63,6 @@ regex.workspace = true reqwest.workspace = true ring.workspace = true ruma.workspace = true -rust-rocksdb.optional = true -rust-rocksdb.workspace = true
sanitize-filename.workspace = true serde_json.workspace = true serde_regex.workspace = true diff --git a/src/core/error.rs b/src/core/error.rs index 4fafd1e3..ac01cc96 100644 --- a/src/core/error.rs +++ b/src/core/error.rs @@ -25,12 +25,8 @@ pub type Result<T, E = Error> = std::result::Result<T, E>; #[derive(Error)] pub enum Error { - #[cfg(feature = "rocksdb")] - #[error("There was a problem with the connection to the rocksdb database: {source}")] - RocksDb { - #[from] - source: rust_rocksdb::Error, - }, + #[error("{0}")] + Database(String), #[error("Could not generate an image: {source}")] Image { #[from] @@ -114,8 +110,7 @@ impl Error { let db_error = String::from("Database or I/O error occurred."); match self { - #[cfg(feature = "rocksdb")] - Self::RocksDb { + Self::Database { .. } => db_error, Self::Io { diff --git a/src/database/rocksdb/kvtree.rs b/src/database/rocksdb/kvtree.rs index 253b10c6..949db84e 100644 --- a/src/database/rocksdb/kvtree.rs +++ b/src/database/rocksdb/kvtree.rs @@ -2,7 +2,10 @@ use std::{future::Future, pin::Pin, sync::Arc}; use conduit::{utils, Result}; -use super::{rust_rocksdb::WriteBatchWithTransaction, watchers::Watchers, Engine, KeyValueDatabaseEngine, KvTree}; +use super::{ + or_else, result, rust_rocksdb::WriteBatchWithTransaction, watchers::Watchers, Engine, KeyValueDatabaseEngine, + KvTree, +}; pub(crate) struct RocksDbEngineTree<'a> { pub(crate) db: Arc<Engine>, @@ -19,7 +22,7 @@ impl KvTree for RocksDbEngineTree<'_> { let mut readoptions = rust_rocksdb::ReadOptions::default(); readoptions.set_total_order_seek(true); - Ok(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?) + result(self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)) } fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> { @@ -36,9 +39,10 @@ .rocks .batched_multi_get_cf_opt(&self.cf(), keys, SORTED, &readoptions) { - match res?
{ - Some(res) => ret.push(Some((*res).to_vec())), - None => ret.push(None), + match res { + Ok(Some(res)) => ret.push(Some((*res).to_vec())), + Ok(None) => ret.push(None), + Err(e) => return or_else(e), } } @@ -50,7 +54,8 @@ self.db .rocks - .put_cf_opt(&self.cf(), key, value, &writeoptions)?; + .put_cf_opt(&self.cf(), key, value, &writeoptions) + .or_else(or_else)?; if !self.db.corked() { self.db.flush()?; } @@ -70,25 +75,25 @@ batch.put_cf(&self.cf(), key, value); } - let result = self.db.rocks.write_opt(batch, &writeoptions); + let res = self.db.rocks.write_opt(batch, &writeoptions); if !self.db.corked() { self.db.flush()?; } - Ok(result?) + result(res) } fn remove(&self, key: &[u8]) -> Result<()> { let writeoptions = rust_rocksdb::WriteOptions::default(); - let result = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions); + let res = self.db.rocks.delete_cf_opt(&self.cf(), key, &writeoptions); if !self.db.corked() { self.db.flush()?; } - Ok(result?) + result(res) } fn remove_batch(&self, iter: &mut dyn Iterator<Item = Vec<u8>>) -> Result<()> { @@ -100,13 +105,13 @@ batch.delete_cf(&self.cf(), key); } - let result = self.db.rocks.write_opt(batch, &writeoptions); + let res = self.db.rocks.write_opt(batch, &writeoptions); if !self.db.corked() { self.db.flush()?; } - Ok(result?)
+ result(res) } fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a> { @@ -151,11 +156,16 @@ readoptions.set_total_order_seek(true); let writeoptions = rust_rocksdb::WriteOptions::default(); - let old = self.db.rocks.get_cf_opt(&self.cf(), key, &readoptions)?; + let old = self + .db + .rocks + .get_cf_opt(&self.cf(), key, &readoptions) + .or_else(or_else)?; let new = utils::increment(old.as_deref()); self.db .rocks - .put_cf_opt(&self.cf(), key, new, &writeoptions)?; + .put_cf_opt(&self.cf(), key, new, &writeoptions) + .or_else(or_else)?; if !self.db.corked() { self.db.flush()?; @@ -172,12 +182,19 @@ let mut batch = WriteBatchWithTransaction::<false>::default(); for key in iter { - let old = self.db.rocks.get_cf_opt(&self.cf(), &key, &readoptions)?; + let old = self + .db + .rocks + .get_cf_opt(&self.cf(), &key, &readoptions) + .or_else(or_else)?; let new = utils::increment(old.as_deref()); batch.put_cf(&self.cf(), key, new); } - self.db.rocks.write_opt(batch, &writeoptions)?; + self.db + .rocks + .write_opt(batch, &writeoptions) + .or_else(or_else)?; if !self.db.corked() { self.db.flush()?; diff --git a/src/database/rocksdb/mod.rs b/src/database/rocksdb/mod.rs index 52fd303d..943b6745 100644 --- a/src/database/rocksdb/mod.rs +++ b/src/database/rocksdb/mod.rs @@ -51,7 +51,7 @@ impl KeyValueDatabaseEngine for Arc<Engine> { let mut col_cache = HashMap::new(); col_cache.insert("primary".to_owned(), Cache::new_lru_cache(col_cache_capacity_bytes)); - let mut db_env = Env::new()?; + let mut db_env = Env::new().or_else(or_else)?; let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes); let db_opts = db_options(config, &mut db_env, &row_cache, col_cache.get("primary").expect("cache")); @@ -73,12 +73,13 @@ .collect::<Vec<_>>(); debug!("Opening database..."); - let db = if config.rocksdb_read_only { Db::<MultiThreaded>::open_cf_for_read_only(&db_opts, &config.database_path, cfs.clone(), false)?
+ let res = if config.rocksdb_read_only { Db::<MultiThreaded>::open_cf_for_read_only(&db_opts, &config.database_path, cfs.clone(), false) } else { - Db::<MultiThreaded>::open_cf_descriptors(&db_opts, &config.database_path, cfds)? + Db::<MultiThreaded>::open_cf_descriptors(&db_opts, &config.database_path, cfds) }; + let db = res.or_else(or_else)?; info!( "Opened database at sequence number {} in {:?}", db.latest_sequence_number(), @@ -115,17 +116,9 @@ })) } - fn flush(&self) -> Result<()> { - DBCommon::flush_wal(&self.rocks, false)?; + fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.rocks, false)) } - Ok(()) - } - - fn sync(&self) -> Result<()> { - DBCommon::flush_wal(&self.rocks, true)?; - - Ok(()) - } + fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.rocks, true)) } fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 } @@ -146,7 +139,7 @@ #[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)] fn memory_usage(&self) -> Result<String> { let mut res = String::new(); - let stats = get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache]))?; + let stats = get_memory_usage_stats(Some(&[&self.rocks]), Some(&[&self.row_cache])).or_else(or_else)?; writeln!( res, "Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow cache: {:.2} MiB", @@ -168,10 +161,7 @@ fn cleanup(&self) -> Result<()> { debug!("Running flush_opt"); let flushoptions = rust_rocksdb::FlushOptions::default(); - DBCommon::flush_opt(&self.rocks, &flushoptions)?; - - Ok(()) + result(DBCommon::flush_opt(&self.rocks, &flushoptions)) } fn backup(&self) -> Result<(), Box<dyn std::error::Error>> { @@ -214,8 +204,8 @@ } let mut res = String::new(); - let options = BackupEngineOptions::new(path.unwrap())?; - let engine = BackupEngine::open(&options, &self.env)?; + let options =
BackupEngineOptions::new(path.expect("valid path")).or_else(or_else)?; + let engine = BackupEngine::open(&options, &self.env).or_else(or_else)?; for info in engine.get_backup_info() { writeln!( res, @@ -275,3 +265,15 @@ impl Drop for Engine { self.env.join_all_threads(); } } + +#[inline] +fn result<T>(r: std::result::Result<T, rust_rocksdb::Error>) -> Result<T> { r.map_or_else(or_else, and_then) } + +#[inline(always)] +fn and_then<T>(t: T) -> Result<T> { Ok(t) } + +fn or_else<T>(e: rust_rocksdb::Error) -> Result<T> { Err(map_err(e)) } + +fn map_err(e: rust_rocksdb::Error) -> conduit::Error { conduit::Error::Database(e.into_string()) } diff --git a/src/main/Cargo.toml b/src/main/Cargo.toml index 9ef51545..cb7cb9a9 100644 --- a/src/main/Cargo.toml +++ b/src/main/Cargo.toml @@ -83,7 +83,6 @@ hardened_malloc = [ ] io_uring = [ "conduit-admin/io_uring", - "conduit-core/io_uring", "conduit-database/io_uring", ] jemalloc = [ @@ -119,7 +118,6 @@ release_max_log_level = [ ] rocksdb = [ "conduit-admin/rocksdb", - "conduit-database/rocksdb", ] sentry_telemetry = [