diff --git a/src/admin/debug/commands.rs b/src/admin/debug/commands.rs
index 5bc65d9b..07daaf0a 100644
--- a/src/admin/debug/commands.rs
+++ b/src/admin/debug/commands.rs
@@ -911,8 +911,8 @@ pub(super) async fn database_stats(
 	let map_name = map.as_ref().map_or(EMPTY, String::as_str);
 
 	let mut out = String::new();
-	for (name, map) in self.services.db.iter() {
-		if !map_name.is_empty() && *map_name != *name {
+	for (&name, map) in self.services.db.iter() {
+		if !map_name.is_empty() && map_name != name {
 			continue;
 		}
 
diff --git a/src/database/database.rs b/src/database/database.rs
deleted file mode 100644
index 83d2c201..00000000
--- a/src/database/database.rs
+++ /dev/null
@@ -1,55 +0,0 @@
-use std::{ops::Index, sync::Arc};
-
-use conduwuit::{err, Result, Server};
-
-use crate::{
-	maps,
-	maps::{Maps, MapsKey, MapsVal},
-	Engine, Map,
-};
-
-pub struct Database {
-	pub db: Arc<Engine>,
-	maps: Maps,
-}
-
-impl Database {
-	/// Load an existing database or create a new one.
-	pub async fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
-		let db = Engine::open(server).await?;
-		Ok(Arc::new(Self { db: db.clone(), maps: maps::open(&db)? }))
-	}
-
-	#[inline]
-	pub fn get(&self, name: &str) -> Result<&Arc<Map>> {
-		self.maps
-			.get(name)
-			.ok_or_else(|| err!(Request(NotFound("column not found"))))
-	}
-
-	#[inline]
-	pub fn iter(&self) -> impl Iterator<Item = (&MapsKey, &MapsVal)> + Send + '_ {
-		self.maps.iter()
-	}
-
-	#[inline]
-	pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
-
-	#[inline]
-	#[must_use]
-	pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
-
-	#[inline]
-	#[must_use]
-	pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
-}
-
-impl Index<&str> for Database {
-	type Output = Arc<Map>;
-
-	fn index(&self, name: &str) -> &Self::Output {
-		self.maps
-			.get(name)
-			.expect("column in database does not exist")
-	}
-}
diff --git a/src/database/engine.rs b/src/database/engine.rs
index 3d554eac..2958f73f 100644
--- a/src/database/engine.rs
+++ b/src/database/engine.rs
@@ -1,288 +1,64 @@
+mod backup;
+mod cf_opts;
+pub(crate) mod context;
+mod db_opts;
+pub(crate) mod descriptor;
+mod files;
+mod logger;
+mod memory_usage;
+mod open;
+mod repair;
+
 use std::{
-	collections::{BTreeSet, HashMap},
 	ffi::CStr,
-	fmt::Write,
-	path::PathBuf,
-	sync::{atomic::AtomicU32, Arc, Mutex, RwLock},
+	sync::{
+		atomic::{AtomicU32, Ordering},
+		Arc,
+	},
 };
 
-use conduwuit::{
-	debug, error, info, utils::time::rfc2822_from_seconds, warn, Err, Result, Server,
-};
-use rocksdb::{
-	backup::{BackupEngine, BackupEngineOptions},
-	perf::get_memory_usage_stats,
-	AsColumnFamilyRef, BoundColumnFamily, Cache, ColumnFamilyDescriptor, DBCommon,
-	DBWithThreadMode, Env, LogLevel, MultiThreaded, Options,
-};
+use conduwuit::{debug, info, warn, Err, Result};
+use rocksdb::{AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded};
 
-use crate::{
-	opts::{cf_options, db_options},
-	or_else,
-	pool::Pool,
-	result,
-	util::map_err,
-};
+use crate::{pool::Pool, result, Context};
 
 pub struct Engine {
-	pub(crate) server: Arc<Server>,
-	row_cache: Cache,
-	col_cache: RwLock<HashMap<String, Cache>>,
-	opts: Options,
-	env: Env,
-	cfs: Mutex<BTreeSet<String>>,
-	pub(crate) pool: Arc<Pool>,
-	pub(crate) db: Db,
-	corks: AtomicU32,
 	pub(super) read_only: bool,
 	pub(super) secondary: bool,
+	corks: AtomicU32,
+	pub(crate) db: Db,
+	pub(crate) pool: Arc<Pool>,
+	pub(crate) ctx: Arc<Context>,
 }
 
 pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
 
 impl Engine {
-	#[tracing::instrument(skip_all)]
-	pub(crate) async fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
-		let config = &server.config;
-		let path = &config.database_path;
-
-		let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
-
-		#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-		let row_cache_capacity_bytes = (cache_capacity_bytes * 0.50) as usize;
-
-		#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-		let col_cache_capacity_bytes = (cache_capacity_bytes * 0.50) as usize;
-
-		let mut col_cache = HashMap::new();
-		col_cache.insert("primary".to_owned(), Cache::new_lru_cache(col_cache_capacity_bytes));
-
-		let mut db_env = Env::new().or_else(or_else)?;
-		let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
-		let db_opts = db_options(
-			config,
-			&mut db_env,
-			&row_cache,
-			col_cache.get("primary").expect("primary cache exists"),
-		)?;
-
-		debug!("Listing column families in database");
-		let cfs = Db::list_cf(&db_opts, &config.database_path)
-			.unwrap_or_default()
-			.into_iter()
-			.collect::<BTreeSet<String>>();
-
-		debug!("Configuring {} column families found in database", cfs.len());
-		let cfopts = cfs
-			.iter()
-			.map(|name| cf_options(config, name, db_opts.clone(), &mut col_cache))
-			.collect::<Result<Vec<_>>>()?;
-
-		debug!("Opening {} column family descriptors in database", cfs.len());
-		let cfds = cfs
-			.iter()
-			.zip(cfopts.into_iter())
-			.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
-			.collect::<Vec<_>>();
-
-		debug!("Starting frontend request pool");
-		let pool = Pool::new(server)?;
-
-		let load_time = std::time::Instant::now();
-		if config.rocksdb_repair {
-			repair(&db_opts, &config.database_path)?;
-		}
-
-		debug!("Opening database...");
-		let res = if config.rocksdb_read_only {
-			Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
-		} else if config.rocksdb_secondary {
-			Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
-		} else {
-			Db::open_cf_descriptors(&db_opts, path, cfds)
-		};
-
-		let db = res.or_else(or_else)?;
-		info!(
-			columns = cfs.len(),
-			sequence = %db.latest_sequence_number(),
-			time = ?load_time.elapsed(),
-			"Opened database."
-		);
-
-		Ok(Arc::new(Self {
-			server: server.clone(),
-			row_cache,
-			col_cache: RwLock::new(col_cache),
-			opts: db_opts,
-			env: db_env,
-			cfs: Mutex::new(cfs),
-			corks: AtomicU32::new(0),
-			read_only: config.rocksdb_read_only,
-			secondary: config.rocksdb_secondary,
-			pool,
-			db,
-		}))
-	}
-
-	#[tracing::instrument(skip(self), level = "trace")]
-	pub(crate) fn open_cf(&self, name: &str) -> Result<Arc<BoundColumnFamily<'_>>> {
-		let mut cfs = self.cfs.lock().expect("locked");
-		if !cfs.contains(name) {
-			debug!("Creating new column family in database: {name}");
-
-			let mut col_cache = self.col_cache.write().expect("locked");
-			let opts = cf_options(&self.server.config, name, self.opts.clone(), &mut col_cache)?;
-			if let Err(e) = self.db.create_cf(name, &opts) {
-				error!(?name, "Failed to create new column family: {e}");
-				return or_else(e);
-			}
-
-			cfs.insert(name.to_owned());
-		}
-
-		Ok(self.cf(name))
-	}
-
 	pub(crate) fn cf(&self, name: &str) -> Arc<BoundColumnFamily<'_>> {
 		self.db
 			.cf_handle(name)
-			.expect("column was created and exists")
-	}
-
-	pub fn flush(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, false)) }
-
-	pub fn sync(&self) -> Result<()> { result(DBCommon::flush_wal(&self.db, true)) }
-
-	#[inline]
-	pub fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
-
-	#[inline]
-	pub(crate) fn cork(&self) {
-		self.corks
-			.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+			.expect("column must be described prior to database open")
 	}
 
 	#[inline]
-	pub(crate) fn uncork(&self) {
-		self.corks
-			.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
-	}
+	pub(crate) fn cork(&self) { self.corks.fetch_add(1, Ordering::Relaxed); }
 
-	pub fn memory_usage(&self) -> Result<String> {
-		let mut res = String::new();
-		let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&self.row_cache]))
-			.or_else(or_else)?;
-		let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
-		writeln!(
-			res,
-			"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \
-			 cache: {:.2} MiB",
-			mibs(stats.mem_table_total),
-			mibs(stats.mem_table_unflushed),
-			mibs(stats.mem_table_readers_total),
-			mibs(u64::try_from(self.row_cache.get_usage())?),
-		)?;
+	#[inline]
+	pub(crate) fn uncork(&self) { self.corks.fetch_sub(1, Ordering::Relaxed); }
 
-		for (name, cache) in &*self.col_cache.read().expect("locked") {
-			writeln!(res, "{name} cache: {:.2} MiB", mibs(u64::try_from(cache.get_usage())?))?;
-		}
-
-		Ok(res)
-	}
-
-	#[tracing::instrument(skip(self), level = "debug")]
-	pub fn cleanup(&self) -> Result<()> {
-		debug!("Running flush_opt");
-		let flushoptions = rocksdb::FlushOptions::default();
-		result(DBCommon::flush_opt(&self.db, &flushoptions))
-	}
+	#[inline]
+	pub fn corked(&self) -> bool { self.corks.load(Ordering::Relaxed) > 0 }
 
 	#[tracing::instrument(skip(self))]
-	pub fn backup(&self) -> Result {
-		let config = &self.server.config;
-		let path = config.database_backup_path.as_ref();
-		if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
-			return Ok(());
-		}
+	pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) }
 
-		let options = BackupEngineOptions::new(path.expect("valid database backup path"))
-			.map_err(map_err)?;
-		let mut engine = BackupEngine::open(&options, &self.env).map_err(map_err)?;
-		if config.database_backups_to_keep > 0 {
-			let flush = !self.is_read_only();
-			engine
-				.create_new_backup_flush(&self.db, flush)
-				.map_err(map_err)?;
+	#[tracing::instrument(skip(self), level = "debug")]
+	pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
 
-			let engine_info = engine.get_backup_info();
-			let info = &engine_info.last().expect("backup engine info is not empty");
-			info!(
-				"Created database backup #{} using {} bytes in {} files",
-				info.backup_id, info.size, info.num_files,
-			);
-		}
-
-		if config.database_backups_to_keep >= 0 {
-			let keep = u32::try_from(config.database_backups_to_keep)?;
-			if let Err(e) = engine.purge_old_backups(keep.try_into()?) {
-				error!("Failed to purge old backup: {e:?}");
-			}
-		}
-
-		Ok(())
-	}
-
-	pub fn backup_list(&self) -> Result<String> {
-		let config = &self.server.config;
-		let path = config.database_backup_path.as_ref();
-		if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
-			return Ok("Configure database_backup_path to enable backups, or the path \
-			           specified is not valid"
-				.to_owned());
-		}
-
-		let mut res = String::new();
-		let options = BackupEngineOptions::new(path.expect("valid database backup path"))
-			.or_else(or_else)?;
-		let engine = BackupEngine::open(&options, &self.env).or_else(or_else)?;
-		for info in engine.get_backup_info() {
-			writeln!(
-				res,
-				"#{} {}: {} bytes, {} files",
-				info.backup_id,
-				rfc2822_from_seconds(info.timestamp),
-				info.size,
-				info.num_files,
-			)?;
-		}
-
-		Ok(res)
-	}
-
-	pub fn file_list(&self) -> Result<String> {
-		match self.db.live_files() {
-			| Err(e) => Ok(String::from(e)),
-			| Ok(files) => {
-				let mut res = String::new();
-				writeln!(res, "| lev | sst | keys | dels | size | column |")?;
-				writeln!(res, "| ---: | :--- | ---: | ---: | ---: | :--- |")?;
-				for file in files {
-					writeln!(
-						res,
-						"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
-						file.level,
-						file.name,
-						file.num_entries,
-						file.num_deletions,
-						file.size,
-						file.column_family_name,
-					)?;
-				}
-
-				Ok(res)
-			},
-		}
+	#[tracing::instrument(skip(self), level = "debug")]
+	pub fn sort(&self) -> Result {
+		let flushoptions = rocksdb::FlushOptions::default();
+		result(DBCommon::flush_opt(&self.db, &flushoptions))
 	}
 
 	/// Query for database property by null-terminated name which is expected to
@@ -312,56 +88,14 @@ impl Engine {
 	pub fn is_secondary(&self) -> bool { self.secondary }
 }
 
-pub(crate) fn repair(db_opts: &Options, path: &PathBuf) -> Result<()> {
-	warn!("Starting database repair. This may take a long time...");
-	match Db::repair(db_opts, path) {
-		| Ok(()) => info!("Database repair successful."),
-		| Err(e) => return Err!("Repair failed: {e:?}"),
-	}
-
-	Ok(())
-}
-
-#[tracing::instrument(
-	parent = None,
-	name = "rocksdb",
-	level = "trace",
-	skip(msg),
-)]
-pub(crate) fn handle_log(level: LogLevel, msg: &str) {
-	let msg = msg.trim();
-	if msg.starts_with("Options") {
-		return;
-	}
-
-	match level {
-		| LogLevel::Header | LogLevel::Debug => debug!("{msg}"),
-		| LogLevel::Error | LogLevel::Fatal => error!("{msg}"),
-		| LogLevel::Info => debug!("{msg}"),
-		| LogLevel::Warn => warn!("{msg}"),
-	};
-}
-
 impl Drop for Engine {
 	#[cold]
 	fn drop(&mut self) {
 		const BLOCKING: bool = true;
 
-		debug!("Closing frontend pool");
-		self.pool.close();
-
 		debug!("Waiting for background tasks to finish...");
 		self.db.cancel_all_background_work(BLOCKING);
 
-		debug!("Shutting down background threads");
-		self.env.set_high_priority_background_threads(0);
-		self.env.set_low_priority_background_threads(0);
-		self.env.set_bottom_priority_background_threads(0);
-		self.env.set_background_threads(0);
-
-		debug!("Joining background threads...");
-		self.env.join_all_threads();
-
 		info!(
 			sequence = %self.db.latest_sequence_number(),
 			"Closing database..."
		);
	}
}
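The cork counter above lets writers batch work while WAL flushing is deferred. A minimal sketch of how `cork()` and `uncork()` pair up behind a RAII guard — the crate's real implementation lives in `src/database/cork.rs` (see `mod cork;` in `src/database/mod.rs`); the guard below is illustrative only:

```rust
// Illustrative guard only; the actual type lives in src/database/cork.rs.
// Assumes the Engine::{cork, uncork, corked} methods shown above.
pub struct CorkGuard<'a> {
    engine: &'a Engine,
}

impl<'a> CorkGuard<'a> {
    pub fn new(engine: &'a Engine) -> Self {
        engine.cork(); // bump the counter; corked() now reports true
        Self { engine }
    }
}

impl Drop for CorkGuard<'_> {
    fn drop(&mut self) {
        self.engine.uncork(); // counter returns to zero once all guards drop
    }
}
```
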
diff --git a/src/database/engine/backup.rs b/src/database/engine/backup.rs
new file mode 100644
index 00000000..db718c2c
--- /dev/null
+++ b/src/database/engine/backup.rs
@@ -0,0 +1,73 @@
+use std::fmt::Write;
+
+use conduwuit::{error, implement, info, utils::time::rfc2822_from_seconds, warn, Result};
+use rocksdb::backup::{BackupEngine, BackupEngineOptions};
+
+use super::Engine;
+use crate::{or_else, util::map_err};
+
+#[implement(Engine)]
+#[tracing::instrument(skip(self))]
+pub fn backup(&self) -> Result {
+	let server = &self.ctx.server;
+	let config = &server.config;
+	let path = config.database_backup_path.as_ref();
+	if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
+		return Ok(());
+	}
+
+	let options =
+		BackupEngineOptions::new(path.expect("valid database backup path")).map_err(map_err)?;
+	let mut engine = BackupEngine::open(&options, &*self.ctx.env.lock()?).map_err(map_err)?;
+	if config.database_backups_to_keep > 0 {
+		let flush = !self.is_read_only();
+		engine
+			.create_new_backup_flush(&self.db, flush)
+			.map_err(map_err)?;
+
+		let engine_info = engine.get_backup_info();
+		let info = &engine_info.last().expect("backup engine info is not empty");
+		info!(
+			"Created database backup #{} using {} bytes in {} files",
+			info.backup_id, info.size, info.num_files,
+		);
+	}
+
+	if config.database_backups_to_keep >= 0 {
+		let keep = u32::try_from(config.database_backups_to_keep)?;
+		if let Err(e) = engine.purge_old_backups(keep.try_into()?) {
+			error!("Failed to purge old backup: {e:?}");
+		}
+	}
+
+	Ok(())
+}
+
+#[implement(Engine)]
+pub fn backup_list(&self) -> Result<String> {
+	let server = &self.ctx.server;
+	let config = &server.config;
+	let path = config.database_backup_path.as_ref();
+	if path.is_none() || path.is_some_and(|path| path.as_os_str().is_empty()) {
+		return Ok("Configure database_backup_path to enable backups, or the path specified is \
+		           not valid"
+			.to_owned());
+	}
+
+	let mut res = String::new();
+	let options =
+		BackupEngineOptions::new(path.expect("valid database backup path")).or_else(or_else)?;
+	let engine = BackupEngine::open(&options, &*self.ctx.env.lock()?).or_else(or_else)?;
+	for info in engine.get_backup_info() {
+		writeln!(
+			res,
+			"#{} {}: {} bytes, {} files",
+			info.backup_id,
+			rfc2822_from_seconds(info.timestamp),
+			info.size,
+			info.num_files,
+		)?;
+	}
+
+	Ok(res)
+}
diff --git a/src/database/engine/cf_opts.rs b/src/database/engine/cf_opts.rs
new file mode 100644
index 00000000..98d74044
--- /dev/null
+++ b/src/database/engine/cf_opts.rs
@@ -0,0 +1,214 @@
+use conduwuit::{
+	err,
+	utils::{math::Expected, BoolExt},
+	Config, Result,
+};
+use rocksdb::{
+	BlockBasedIndexType, BlockBasedOptions, BlockBasedPinningTier, Cache,
+	DBCompressionType as CompressionType, DataBlockIndexType, LruCacheOptions, Options,
+	UniversalCompactOptions, UniversalCompactionStopStyle,
+};
+
+use super::descriptor::{CacheDisp, Descriptor};
+use crate::Context;
+
+/// Adjust options for the specific column by name. Provide the result of
+/// db_options() as the argument to this function and use the return value in
+/// the arguments to open the specific column.
+pub(crate) fn cf_options(ctx: &Context, opts: Options, desc: &Descriptor) -> Result<Options> {
+	let cache = get_cache(ctx, desc);
+	let config = &ctx.server.config;
+	descriptor_cf_options(opts, desc.clone(), config, cache.as_ref())
+}
+
+fn descriptor_cf_options(
+	mut opts: Options,
+	mut desc: Descriptor,
+	config: &Config,
+	cache: Option<&Cache>,
+) -> Result<Options> {
+	set_compression(&mut desc, config);
+	set_table_options(&mut opts, &desc, cache);
+
+	opts.set_min_write_buffer_number(1);
+	opts.set_max_write_buffer_number(2);
+	if let Some(write_size) = desc.write_size {
+		opts.set_write_buffer_size(write_size);
+	}
+
+	opts.set_target_file_size_base(desc.file_size);
+	opts.set_target_file_size_multiplier(desc.file_shape[0]);
+
+	opts.set_level_zero_file_num_compaction_trigger(desc.level0_width);
+	opts.set_level_compaction_dynamic_level_bytes(false);
+	opts.set_ttl(desc.ttl);
+
+	opts.set_max_bytes_for_level_base(desc.level_size);
+	opts.set_max_bytes_for_level_multiplier(1.0);
+	opts.set_max_bytes_for_level_multiplier_additional(&desc.level_shape);
+
+	opts.set_compaction_style(desc.compaction);
+	opts.set_compaction_pri(desc.compaction_pri);
+	opts.set_universal_compaction_options(&uc_options(&desc));
+
+	opts.set_compression_type(desc.compression);
+	opts.set_compression_options(-14, desc.compression_level, 0, 0); // -14 w_bits used by zlib.
+	if let Some(&bottommost_level) = desc.bottommost_level.as_ref() {
+		opts.set_bottommost_compression_type(desc.compression);
+		opts.set_bottommost_zstd_max_train_bytes(0, true);
+		opts.set_bottommost_compression_options(
+			-14, // -14 w_bits is only read by zlib.
+			bottommost_level,
+			0,
+			0,
+			true,
+		);
+	}
+
+	Ok(opts)
+}
+
+fn set_table_options(opts: &mut Options, desc: &Descriptor, cache: Option<&Cache>) {
+	let mut table = table_options(desc);
+	if let Some(cache) = cache {
+		table.set_block_cache(cache);
+	} else {
+		table.disable_cache();
+	}
+
+	opts.set_block_based_table_factory(&table);
+}
+
+fn set_compression(desc: &mut Descriptor, config: &Config) {
+	desc.compression = match config.rocksdb_compression_algo.as_ref() {
+		| "snappy" => CompressionType::Snappy,
+		| "zlib" => CompressionType::Zlib,
+		| "bz2" => CompressionType::Bz2,
+		| "lz4" => CompressionType::Lz4,
+		| "lz4hc" => CompressionType::Lz4hc,
+		| "none" => CompressionType::None,
+		| _ => CompressionType::Zstd,
+	};
+
+	desc.compression_level = config.rocksdb_compression_level;
+	desc.bottommost_level = config
+		.rocksdb_bottommost_compression
+		.then_some(config.rocksdb_bottommost_compression_level);
+}
+
+fn uc_options(desc: &Descriptor) -> UniversalCompactOptions {
+	let mut opts = UniversalCompactOptions::default();
+	opts.set_stop_style(UniversalCompactionStopStyle::Total);
+	opts.set_min_merge_width(desc.merge_width.0);
+	opts.set_max_merge_width(desc.merge_width.1);
+	opts.set_max_size_amplification_percent(10000);
+	opts.set_compression_size_percent(-1);
+	opts.set_size_ratio(1);
+
+	opts
+}
+
+fn table_options(desc: &Descriptor) -> BlockBasedOptions {
+	let mut opts = BlockBasedOptions::default();
+
+	opts.set_block_size(desc.block_size);
+	opts.set_metadata_block_size(desc.index_size);
+
+	opts.set_cache_index_and_filter_blocks(true);
+	opts.set_pin_top_level_index_and_filter(false);
+	opts.set_pin_l0_filter_and_index_blocks_in_cache(false);
+	opts.set_partition_pinning_tier(BlockBasedPinningTier::None);
+	opts.set_unpartitioned_pinning_tier(BlockBasedPinningTier::None);
+	opts.set_top_level_index_pinning_tier(BlockBasedPinningTier::None);
+
+	opts.set_use_delta_encoding(false);
+	opts.set_index_type(BlockBasedIndexType::TwoLevelIndexSearch);
+	opts.set_data_block_index_type(
+		desc.block_index_hashing
+			.map_or(DataBlockIndexType::BinarySearch, || DataBlockIndexType::BinaryAndHash),
+	);
+
+	opts
+}
+
+fn get_cache(ctx: &Context, desc: &Descriptor) -> Option<Cache> {
+	let config = &ctx.server.config;
+
+	// Some cache capacities are overridden by server config in a strange but
+	// legacy-compat way
+	let cap = match desc.name {
+		| "eventid_pduid" => Some(config.eventid_pdu_cache_capacity),
+		| "eventid_shorteventid" => Some(config.eventidshort_cache_capacity),
+		| "shorteventid_eventid" => Some(config.shorteventid_cache_capacity),
+		| "shorteventid_authchain" => Some(config.auth_chain_cache_capacity),
+		| "shortstatekey_statekey" => Some(config.shortstatekey_cache_capacity),
+		| "statekey_shortstatekey" => Some(config.statekeyshort_cache_capacity),
+		| "servernameevent_data" => Some(config.servernameevent_data_cache_capacity),
+		| "pduid_pdu" | "eventid_outlierpdu" => Some(config.pdu_cache_capacity),
+		| _ => None,
+	}
+	.map(TryInto::try_into)
+	.transpose()
+	.expect("u32 to usize");
+
+	let ent_size: usize = desc
+		.key_size_hint
+		.unwrap_or_default()
+		.expected_add(desc.val_size_hint.unwrap_or_default());
+
+	let size = match cap {
+		| Some(cap) => cache_size(config, cap, ent_size),
+		| _ => desc.cache_size,
+	};
+
+	let shard_bits: i32 = desc
+		.cache_shards
+		.ilog2()
+		.try_into()
+		.expect("u32 to i32 conversion");
+
+	debug_assert!(shard_bits <= 6, "cache shards limited to 64");
+	let mut cache_opts = LruCacheOptions::default();
+	cache_opts.set_num_shard_bits(shard_bits);
+	cache_opts.set_capacity(size);
+
+	let mut caches = ctx.col_cache.lock().expect("locked");
+	match desc.cache_disp {
+		| CacheDisp::Unique if desc.cache_size == 0 => None,
+		| CacheDisp::Unique => {
+			let cache = Cache::new_lru_cache_opts(&cache_opts);
+			caches.insert(desc.name.into(), cache.clone());
+			Some(cache)
+		},
+
+		| CacheDisp::SharedWith(other) if !caches.contains_key(other) => {
+			let cache = Cache::new_lru_cache_opts(&cache_opts);
+			caches.insert(desc.name.into(), cache.clone());
+			Some(cache)
+		},
+
+		| CacheDisp::SharedWith(other) => Some(
+			caches
+				.get(other)
+				.cloned()
+				.expect("caches.contains_key(other) must be true"),
+		),
+
+		| CacheDisp::Shared => Some(
+			caches
+				.get("Shared")
+				.cloned()
+				.expect("shared cache must already exist"),
+		),
+	}
+}
+
+#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
+pub(crate) fn cache_size(config: &Config, base_size: u32, entity_size: usize) -> usize {
+	let ents = f64::from(base_size) * config.cache_capacity_modifier;
+
+	(ents as usize)
+		.checked_mul(entity_size)
+		.ok_or_else(|| err!(Config("cache_capacity_modifier", "Cache size is too large.")))
+		.expect("invalid cache size")
+}
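`cache_size()` above scales a configured base entry count by `cache_capacity_modifier` and multiplies by the per-entry size hint. A worked example with made-up numbers (none of these values come from the diff):

```rust
// Hypothetical numbers, mirroring the cache_size() arithmetic above:
// 10_000 base entries scaled by a 1.5 modifier, at 64 bytes per entry
// (key_size_hint + val_size_hint), yields 960_000 bytes of LRU capacity.
fn cache_size_example() -> usize {
    let base_size: u32 = 10_000;
    let modifier: f64 = 1.5; // stands in for config.cache_capacity_modifier
    let entity_size: usize = 64;

    #[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
    let ents = (f64::from(base_size) * modifier) as usize;
    ents.checked_mul(entity_size).expect("cache size fits in usize")
}
```
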
diff --git a/src/database/engine/context.rs b/src/database/engine/context.rs
new file mode 100644
index 00000000..76238f7d
--- /dev/null
+++ b/src/database/engine/context.rs
@@ -0,0 +1,73 @@
+use std::{
+	collections::BTreeMap,
+	sync::{Arc, Mutex},
+};
+
+use conduwuit::{debug, utils::math::usize_from_f64, Result, Server};
+use rocksdb::{Cache, Env};
+
+use crate::{or_else, pool::Pool};
+
+/// Some components are constructed prior to opening the database and must
+/// outlive the database. These can also be shared between database instances,
+/// though at the time of this comment we only open one database per process.
+/// These assets are housed in the shared Context.
+pub(crate) struct Context {
+	pub(crate) pool: Arc<Pool>,
+	pub(crate) col_cache: Mutex<BTreeMap<String, Cache>>,
+	pub(crate) row_cache: Mutex<Cache>,
+	pub(crate) env: Mutex<Env>,
+	pub(crate) server: Arc<Server>,
+}
+
+impl Context {
+	pub(crate) fn new(server: &Arc<Server>) -> Result<Arc<Self>> {
+		let config = &server.config;
+		let cache_capacity_bytes = config.db_cache_capacity_mb * 1024.0 * 1024.0;
+
+		let row_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
+		let row_cache = Cache::new_lru_cache(row_cache_capacity_bytes);
+
+		let col_cache_capacity_bytes = usize_from_f64(cache_capacity_bytes * 0.50)?;
+		let col_cache = Cache::new_lru_cache(col_cache_capacity_bytes);
+
+		let col_cache: BTreeMap<_, _> = [("Shared".to_owned(), col_cache)].into();
+
+		let mut env = Env::new().or_else(or_else)?;
+
+		if config.rocksdb_compaction_prio_idle {
+			env.lower_thread_pool_cpu_priority();
+		}
+
+		if config.rocksdb_compaction_ioprio_idle {
+			env.lower_thread_pool_io_priority();
+		}
+
+		Ok(Arc::new(Self {
+			pool: Pool::new(server)?,
+			col_cache: col_cache.into(),
+			row_cache: row_cache.into(),
+			env: env.into(),
+			server: server.clone(),
+		}))
+	}
+}
+
+impl Drop for Context {
+	#[cold]
+	fn drop(&mut self) {
+		debug!("Closing frontend pool");
+		self.pool.close();
+
+		let mut env = self.env.lock().expect("locked");
+
+		debug!("Shutting down background threads");
+		env.set_high_priority_background_threads(0);
+		env.set_low_priority_background_threads(0);
+		env.set_bottom_priority_background_threads(0);
+		env.set_background_threads(0);
+
+		debug!("Joining background threads...");
+		env.join_all_threads();
+	}
+}
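Drop order is the point of this type: the `Env` and request pool must outlive the `Engine` that uses them, which is why `Database` (in `src/database/mod.rs` below) keeps the `Context` in a `_ctx` field. Condensed from `Database::open()` further down in this diff, not new API:

```rust
// Condensed from Database::open() below: the Context is built first and
// retained last, so dropping Database tears down the Engine before the
// Env and pool inside the Context.
pub async fn open(server: &Arc<Server>) -> Result<Arc<Database>> {
    let ctx = Context::new(server)?;                       // pool, caches, Env
    let db = Engine::open(ctx.clone(), maps::MAPS).await?; // shares the Context
    Ok(Arc::new(Database { maps: maps::open(&db)?, db: db.clone(), _ctx: ctx }))
}
```
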
diff --git a/src/database/engine/db_opts.rs b/src/database/engine/db_opts.rs
new file mode 100644
index 00000000..211265de
--- /dev/null
+++ b/src/database/engine/db_opts.rs
@@ -0,0 +1,133 @@
+use std::{cmp, convert::TryFrom};
+
+use conduwuit::{utils, Config, Result};
+use rocksdb::{statistics::StatsLevel, Cache, DBRecoveryMode, Env, LogLevel, Options};
+
+use super::{cf_opts::cache_size, logger::handle as handle_log};
+
+/// Create database-wide options suitable for opening the database. This also
+/// sets our default column options in case of opening a column with the same
+/// resulting value. Note that we require special per-column options on some
+/// columns, therefore columns should only be opened after passing this result
+/// through cf_options().
+pub(crate) fn db_options(config: &Config, env: &Env, row_cache: &Cache) -> Result<Options> {
+	const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) {
+		StatsLevel::ExceptDetailedTimers
+	} else {
+		StatsLevel::DisableAll
+	};
+
+	let mut opts = Options::default();
+
+	// Logging
+	set_logging_defaults(&mut opts, config);
+
+	// Processing
+	opts.set_max_background_jobs(num_threads::<i32>(config)?);
+	opts.set_max_subcompactions(num_threads::<u32>(config)?);
+	opts.set_avoid_unnecessary_blocking_io(true);
+	opts.set_max_file_opening_threads(0);
+
+	// IO
+	opts.set_atomic_flush(true);
+	opts.set_manual_wal_flush(true);
+	opts.set_enable_pipelined_write(false);
+	if config.rocksdb_direct_io {
+		opts.set_use_direct_reads(true);
+		opts.set_use_direct_io_for_flush_and_compaction(true);
+	}
+	if config.rocksdb_optimize_for_spinning_disks {
+		// speeds up opening DB on hard drives
+		opts.set_skip_checking_sst_file_sizes_on_db_open(true);
+		opts.set_skip_stats_update_on_db_open(true);
+		//opts.set_max_file_opening_threads(threads.try_into().unwrap());
+	}
+
+	// Blocks
+	opts.set_row_cache(row_cache);
+
+	// Files
+	opts.set_table_cache_num_shard_bits(7);
+	opts.set_wal_size_limit_mb(1024 * 1024 * 1024);
+	opts.set_max_total_wal_size(1024 * 1024 * 512);
+	opts.set_db_write_buffer_size(cache_size(config, 1024 * 1024 * 32, 1));
+
+	// Misc
+	opts.set_disable_auto_compactions(!config.rocksdb_compaction);
+	opts.create_missing_column_families(true);
+	opts.create_if_missing(true);
+
+	opts.set_statistics_level(match config.rocksdb_stats_level {
+		| 0 => StatsLevel::DisableAll,
+		| 1 => DEFAULT_STATS_LEVEL,
+		| 2 => StatsLevel::ExceptHistogramOrTimers,
+		| 3 => StatsLevel::ExceptTimers,
+		| 4 => StatsLevel::ExceptDetailedTimers,
+		| 5 => StatsLevel::ExceptTimeForMutex,
+		| 6_u8..=u8::MAX => StatsLevel::All,
+	});
+
+	opts.set_report_bg_io_stats(match config.rocksdb_stats_level {
+		| 0..=1 => false,
+		| 2_u8..=u8::MAX => true,
+	});
+
+	// Default: https://github.com/facebook/rocksdb/wiki/WAL-Recovery-Modes#ktoleratecorruptedtailrecords
+	//
+	// Unclean shutdowns of a Matrix homeserver are likely to be fine when
+	// recovered in this manner as it's likely any lost information will be
+	// restored via federation.
+	opts.set_wal_recovery_mode(match config.rocksdb_recovery_mode {
+		| 0 => DBRecoveryMode::AbsoluteConsistency,
+		| 1 => DBRecoveryMode::TolerateCorruptedTailRecords,
+		| 2 => DBRecoveryMode::PointInTime,
+		| 3 => DBRecoveryMode::SkipAnyCorruptedRecord,
+		| 4_u8..=u8::MAX => unimplemented!(),
+	});
+
+	//
+	// "We recommend to set track_and_verify_wals_in_manifest to true for
+	// production, it has been enabled in production for the entire database cluster
+	// serving the social graph for all Meta apps."
+	opts.set_track_and_verify_wals_in_manifest(true);
+
+	opts.set_paranoid_checks(config.rocksdb_paranoid_file_checks);
+
+	opts.set_env(env);
+
+	Ok(opts)
+}
+
+fn set_logging_defaults(opts: &mut Options, config: &Config) {
+	let rocksdb_log_level = match config.rocksdb_log_level.as_ref() {
+		| "debug" => LogLevel::Debug,
+		| "info" => LogLevel::Info,
+		| "warn" => LogLevel::Warn,
+		| "fatal" => LogLevel::Fatal,
+		| _ => LogLevel::Error,
+	};
+
+	opts.set_log_level(rocksdb_log_level);
+	opts.set_max_log_file_size(config.rocksdb_max_log_file_size);
+	opts.set_log_file_time_to_roll(config.rocksdb_log_time_to_roll);
+	opts.set_keep_log_file_num(config.rocksdb_max_log_files);
+	opts.set_stats_dump_period_sec(0);
+
+	if config.rocksdb_log_stderr {
+		opts.set_stderr_logger(rocksdb_log_level, "rocksdb");
+	} else {
+		opts.set_callback_logger(rocksdb_log_level, &handle_log);
+	}
+}
+
+fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
+	const MIN_PARALLELISM: usize = 2;
+
+	let requested = if config.rocksdb_parallelism_threads != 0 {
+		config.rocksdb_parallelism_threads
+	} else {
+		utils::available_parallelism()
+	};
+
+	utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
+}
diff --git a/src/database/engine/descriptor.rs b/src/database/engine/descriptor.rs
new file mode 100644
index 00000000..f0fd83f1
--- /dev/null
+++ b/src/database/engine/descriptor.rs
@@ -0,0 +1,89 @@
+use conduwuit::utils::string::EMPTY;
+use rocksdb::{
+	DBCompactionPri as CompactionPri, DBCompactionStyle as CompactionStyle,
+	DBCompressionType as CompressionType,
+};
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) enum CacheDisp {
+	Unique,
+	Shared,
+	SharedWith(&'static str),
+}
+
+#[derive(Debug, Clone)]
+pub(crate) struct Descriptor {
+	pub(crate) name: &'static str,
+	pub(crate) cache_disp: CacheDisp,
+	pub(crate) key_size_hint: Option<usize>,
+	pub(crate) val_size_hint: Option<usize>,
+	pub(crate) block_size: usize,
+	pub(crate) index_size: usize,
+	pub(crate) write_size: Option<usize>,
+	pub(crate) cache_size: usize,
+	pub(crate) level_size: u64,
+	pub(crate) level_shape: [i32; 7],
+	pub(crate) file_size: u64,
+	pub(crate) file_shape: [i32; 1],
+	pub(crate) level0_width: i32,
+	pub(crate) merge_width: (i32, i32),
+	pub(crate) ttl: u64,
+	pub(crate) compaction: CompactionStyle,
+	pub(crate) compaction_pri: CompactionPri,
+	pub(crate) compression: CompressionType,
+	pub(crate) compression_level: i32,
+	pub(crate) bottommost_level: Option<i32>,
+	pub(crate) block_index_hashing: bool,
+	pub(crate) cache_shards: u32,
+}
+
+pub(crate) static BASE: Descriptor = Descriptor {
+	name: EMPTY,
+	cache_disp: CacheDisp::Shared,
+	key_size_hint: None,
+	val_size_hint: None,
+	block_size: 1024 * 4,
+	index_size: 1024 * 4,
+	write_size: None,
+	cache_size: 1024 * 1024 * 4,
+	level_size: 1024 * 1024 * 8,
+	level_shape: [1, 1, 1, 3, 7, 15, 31],
+	file_size: 1024 * 1024,
+	file_shape: [2],
+	level0_width: 2,
+	merge_width: (2, 16),
+	ttl: 60 * 60 * 24 * 21,
+	compaction: CompactionStyle::Level,
+	compaction_pri: CompactionPri::MinOverlappingRatio,
+	compression: CompressionType::Zstd,
+	compression_level: 32767,
+	bottommost_level: Some(32767),
+	block_index_hashing: false,
+	cache_shards: 64,
+};
+
+pub(crate) static RANDOM: Descriptor = Descriptor {
+	compaction_pri: CompactionPri::OldestSmallestSeqFirst,
+	..BASE
+};
+
+pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
+	compaction_pri: CompactionPri::OldestLargestSeqFirst,
+	level_size: 1024 * 1024 * 32,
+	file_size: 1024 * 1024 * 2,
+	..BASE
+};
+
+pub(crate) static RANDOM_SMALL: Descriptor = Descriptor {
+	compaction: CompactionStyle::Universal,
+	level_size: 1024 * 512,
+	file_size: 1024 * 128,
+	..RANDOM
+};
+
+pub(crate) static SEQUENTIAL_SMALL: Descriptor = Descriptor {
+	compaction: CompactionStyle::Universal,
+	level_size: 1024 * 1024,
+	file_size: 1024 * 512,
+	..SEQUENTIAL
+};
diff --git a/src/database/engine/files.rs b/src/database/engine/files.rs
new file mode 100644
index 00000000..f603c57b
--- /dev/null
+++ b/src/database/engine/files.rs
@@ -0,0 +1,32 @@
+use std::fmt::Write;
+
+use conduwuit::{implement, Result};
+
+use super::Engine;
+
+#[implement(Engine)]
+pub fn file_list(&self) -> Result<String> {
+	match self.db.live_files() {
+		| Err(e) => Ok(String::from(e)),
+		| Ok(mut files) => {
+			files.sort_by_key(|f| f.name.clone());
+			let mut res = String::new();
+			writeln!(res, "| lev | sst | keys | dels | size | column |")?;
+			writeln!(res, "| ---: | :--- | ---: | ---: | ---: | :--- |")?;
+			for file in files {
+				writeln!(
+					res,
+					"| {} | {:<13} | {:7}+ | {:4}- | {:9} | {} |",
+					file.level,
+					file.name,
+					file.num_entries,
+					file.num_deletions,
+					file.size,
+					file.column_family_name,
+				)?;
+			}
+
+			Ok(res)
+		},
+	}
+}
diff --git a/src/database/engine/logger.rs b/src/database/engine/logger.rs
new file mode 100644
index 00000000..a1898e30
--- /dev/null
+++ b/src/database/engine/logger.rs
@@ -0,0 +1,22 @@
+use conduwuit::{debug, error, warn};
+use rocksdb::LogLevel;
+
+#[tracing::instrument(
+	parent = None,
+	name = "rocksdb",
+	level = "trace",
+	skip(msg),
+)]
+pub(crate) fn handle(level: LogLevel, msg: &str) {
+	let msg = msg.trim();
+	if msg.starts_with("Options") {
+		return;
+	}
+
+	match level {
+		| LogLevel::Header | LogLevel::Debug => debug!("{msg}"),
+		| LogLevel::Error | LogLevel::Fatal => error!("{msg}"),
+		| LogLevel::Info => debug!("{msg}"),
+		| LogLevel::Warn => warn!("{msg}"),
+	};
+}
diff --git a/src/database/engine/memory_usage.rs b/src/database/engine/memory_usage.rs
new file mode 100644
index 00000000..01859815
--- /dev/null
+++ b/src/database/engine/memory_usage.rs
@@ -0,0 +1,30 @@
+use std::fmt::Write;
+
+use conduwuit::{implement, Result};
+use rocksdb::perf::get_memory_usage_stats;
+
+use super::Engine;
+use crate::or_else;
+
+#[implement(Engine)]
+pub fn memory_usage(&self) -> Result<String> {
+	let mut res = String::new();
+	let stats = get_memory_usage_stats(Some(&[&self.db]), Some(&[&*self.ctx.row_cache.lock()?]))
+		.or_else(or_else)?;
+	let mibs = |input| f64::from(u32::try_from(input / 1024).unwrap_or(0)) / 1024.0;
+	writeln!(
+		res,
+		"Memory buffers: {:.2} MiB\nPending write: {:.2} MiB\nTable readers: {:.2} MiB\nRow \
+		 cache: {:.2} MiB",
+		mibs(stats.mem_table_total),
+		mibs(stats.mem_table_unflushed),
+		mibs(stats.mem_table_readers_total),
+		mibs(u64::try_from(self.ctx.row_cache.lock()?.get_usage())?),
+	)?;
+
+	for (name, cache) in &*self.ctx.col_cache.lock()? {
+		writeln!(res, "{name} cache: {:.2} MiB", mibs(u64::try_from(cache.get_usage())?))?;
+	}
+
+	Ok(res)
+}
diff --git a/src/database/engine/open.rs b/src/database/engine/open.rs
new file mode 100644
index 00000000..9999296b
--- /dev/null
+++ b/src/database/engine/open.rs
@@ -0,0 +1,121 @@
+use std::{
+	collections::BTreeSet,
+	path::Path,
+	sync::{atomic::AtomicU32, Arc},
+};
+
+use conduwuit::{debug, debug_warn, implement, info, warn, Result};
+use rocksdb::{ColumnFamilyDescriptor, Options};
+
+use super::{
+	cf_opts::cf_options, db_opts::db_options, descriptor::Descriptor, repair::repair, Db, Engine,
+};
+use crate::{or_else, Context};
+
+#[implement(Engine)]
+#[tracing::instrument(skip_all)]
+pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<Self>> {
+	let server = &ctx.server;
+	let config = &server.config;
+	let path = &config.database_path;
+
+	let db_opts = db_options(
+		config,
+		&ctx.env.lock().expect("environment locked"),
+		&ctx.row_cache.lock().expect("row cache locked"),
+	)?;
+
+	let cfds = Self::configure_cfds(&ctx, &db_opts, desc)?;
+	let num_cfds = cfds.len();
+	debug!("Configured {num_cfds} column descriptors...");
+
+	let load_time = std::time::Instant::now();
+	if config.rocksdb_repair {
+		repair(&db_opts, &config.database_path)?;
+	}
+
+	debug!("Opening database...");
+	let db = if config.rocksdb_read_only {
+		Db::open_cf_descriptors_read_only(&db_opts, path, cfds, false)
+	} else if config.rocksdb_secondary {
+		Db::open_cf_descriptors_as_secondary(&db_opts, path, path, cfds)
+	} else {
+		Db::open_cf_descriptors(&db_opts, path, cfds)
+	}
+	.or_else(or_else)?;
+
+	info!(
+		columns = num_cfds,
+		sequence = %db.latest_sequence_number(),
+		time = ?load_time.elapsed(),
+		"Opened database."
+	);
+
+	Ok(Arc::new(Self {
+		read_only: config.rocksdb_read_only,
+		secondary: config.rocksdb_secondary,
+		corks: AtomicU32::new(0),
+		pool: ctx.pool.clone(),
+		db,
+		ctx,
+	}))
+}
+
+#[implement(Engine)]
+#[tracing::instrument(name = "configure", skip_all)]
+fn configure_cfds(
+	ctx: &Arc<Context>,
+	db_opts: &Options,
+	desc: &[Descriptor],
+) -> Result<Vec<ColumnFamilyDescriptor>> {
+	let server = &ctx.server;
+	let config = &server.config;
+	let path = &config.database_path;
+	let existing = Self::discover_cfs(path, db_opts);
+	debug!(
+		"Found {} existing columns; have {} described columns",
+		existing.len(),
+		desc.len()
+	);
+
+	existing
+		.iter()
+		.filter(|&name| name != "default")
+		.filter(|&name| !desc.iter().any(|desc| desc.name == name))
+		.for_each(|name| {
+			debug_warn!("Found unknown column {name:?} in database which will not be opened.");
+		});
+
+	desc.iter()
+		.filter(|desc| !existing.contains(desc.name))
+		.for_each(|desc| {
+			debug!(
+				"Creating new column {:?} which was not found in the existing database.",
+				desc.name,
+			);
+		});
+
+	let cfopts: Vec<_> = desc
+		.iter()
+		.map(|desc| cf_options(ctx, db_opts.clone(), desc))
+		.collect::<Result<_>>()?;
+
+	let cfds: Vec<_> = desc
+		.iter()
+		.map(|desc| desc.name)
+		.map(ToOwned::to_owned)
+		.zip(cfopts.into_iter())
+		.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts))
+		.collect();
+
+	Ok(cfds)
+}
+
+#[implement(Engine)]
+#[tracing::instrument(name = "discover", skip_all)]
fn discover_cfs(path: &Path, opts: &Options) -> BTreeSet<String> {
	Db::list_cf(opts, path)
		.unwrap_or_default()
		.into_iter()
		.collect::<BTreeSet<_>>()
}
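With `open.rs` in place, columns can no longer be created ad hoc by name: every column must be described before the database opens, and `Engine::cf()` panics for names that were never described. A hypothetical new column would be declared by adding an entry to `maps::MAPS` (the name below is invented for illustration):

```rust
// Hypothetical entry for the maps::MAPS slice; "example_data" is not a
// real column in this diff. The RANDOM_SMALL template supplies the
// compaction/compression defaults; only divergent fields are spelled out.
Descriptor {
    name: "example_data",
    key_size_hint: Some(48),
    ..descriptor::RANDOM_SMALL
},
```
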
diff --git a/src/database/engine/repair.rs b/src/database/engine/repair.rs
new file mode 100644
index 00000000..61283904
--- /dev/null
+++ b/src/database/engine/repair.rs
@@ -0,0 +1,16 @@
+use std::path::PathBuf;
+
+use conduwuit::{info, warn, Err, Result};
+use rocksdb::Options;
+
+use super::Db;
+
+pub(crate) fn repair(db_opts: &Options, path: &PathBuf) -> Result {
+	warn!("Starting database repair. This may take a long time...");
+	match Db::repair(db_opts, path) {
+		| Ok(()) => info!("Database repair successful."),
+		| Err(e) => return Err!("Repair failed: {e:?}"),
+	}
+
+	Ok(())
+}
diff --git a/src/database/map.rs b/src/database/map.rs
index de37b8f9..60d66585 100644
--- a/src/database/map.rs
+++ b/src/database/map.rs
@@ -6,6 +6,8 @@ mod insert;
 mod keys;
 mod keys_from;
 mod keys_prefix;
+mod open;
+mod options;
 mod remove;
 mod rev_keys;
 mod rev_keys_from;
@@ -28,12 +30,15 @@ use std::{
 };
 
 use conduwuit::Result;
-use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, ReadTier, WriteOptions};
+use rocksdb::{AsColumnFamilyRef, ColumnFamily, ReadOptions, WriteOptions};
 
+pub(crate) use self::options::{
+	cache_read_options_default, iter_options_default, read_options_default, write_options_default,
+};
 use crate::{watchers::Watchers, Engine};
 
 pub struct Map {
-	name: String,
+	name: &'static str,
 	db: Arc<Engine>,
 	cf: Arc<ColumnFamily>,
 	watchers: Watchers,
@@ -43,11 +48,11 @@ }
 
 impl Map {
-	pub(crate) fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<Self>> {
+	pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
 		Ok(Arc::new(Self {
-			name: name.to_owned(),
+			name,
 			db: db.clone(),
-			cf: open(db, name)?,
+			cf: open::open(db, name),
 			watchers: Watchers::default(),
 			write_options: write_options_default(),
 			read_options: read_options_default(),
@@ -75,7 +80,7 @@ impl Map {
 	pub fn property(&self, name: &str) -> Result<String> { self.db.property(&self.cf(), name) }
 
 	#[inline]
-	pub fn name(&self) -> &str { &self.name }
+	pub fn name(&self) -> &str { self.name }
 
 	#[inline]
 	pub(crate) fn db(&self) -> &Arc<Engine> { &self.db }
@@ -93,60 +98,3 @@ impl Debug for Map {
 
 impl Display for Map {
 	fn fmt(&self, out: &mut fmt::Formatter<'_>) -> fmt::Result { write!(out, "{0}", self.name) }
 }
-
-fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<ColumnFamily>> {
-	let bounded_arc = db.open_cf(name)?;
-	let bounded_ptr = Arc::into_raw(bounded_arc);
-	let cf_ptr = bounded_ptr.cast::<ColumnFamily>();
-
-	// SAFETY: Column family handles out of RocksDB are basic pointers and can
-	// be invalidated: 1. when the database closes. 2. when the column is dropped or
-	// closed. rust_rocksdb wraps this for us by storing handles in their own
-	// `RwLock` map and returning an `Arc<BoundColumnFamily<'_>>` to
-	// provide expected safety. Similarly in "single-threaded mode" we would
-	// receive `&'_ ColumnFamily`.
-	//
-	// PROBLEM: We need to hold these handles in a field, otherwise we have to take
-	// a lock and get them by name from this map for every query, which is what
-	// conduit was doing, but we're not going to make a query for every query so we
-	// need to be holding it right. The lifetime parameter on these references makes
-	// that complicated. If this can be done without polluting the userspace
-	// with lifetimes on every instance of `Map` then this `unsafe` might not be
-	// necessary.
-	//
-	// SOLUTION: After investigating the underlying types it appears valid to
-	// Arc-swap `BoundColumnFamily<'_>` for `ColumnFamily`. They have the
-	// same inner data, the same Drop behavior, Deref, etc. We're just losing the
-	// lifetime parameter. We should not hold this handle, even in its Arc, after
-	// closing the database (dropping `Engine`). Since `Arc<Engine>` is a sibling
-	// member along with this handle in `Map`, that is prevented.
-	Ok(unsafe {
-		Arc::increment_strong_count(cf_ptr);
-		Arc::from_raw(cf_ptr)
-	})
-}
-
-#[inline]
-pub(crate) fn iter_options_default() -> ReadOptions {
-	let mut read_options = read_options_default();
-	read_options.set_background_purge_on_iterator_cleanup(true);
-	//read_options.set_pin_data(true);
-	read_options
-}
-
-#[inline]
-pub(crate) fn cache_read_options_default() -> ReadOptions {
-	let mut read_options = read_options_default();
-	read_options.set_read_tier(ReadTier::BlockCache);
-	read_options
-}
-
-#[inline]
-pub(crate) fn read_options_default() -> ReadOptions {
-	let mut read_options = ReadOptions::default();
-	read_options.set_total_order_seek(true);
-	read_options
-}
-
-#[inline]
-pub(crate) fn write_options_default() -> WriteOptions { WriteOptions::default() }
diff --git a/src/database/map/open.rs b/src/database/map/open.rs
new file mode 100644
index 00000000..6ecec044
--- /dev/null
+++ b/src/database/map/open.rs
@@ -0,0 +1,37 @@
+use std::sync::Arc;
+
+use rocksdb::ColumnFamily;
+
+use crate::Engine;
+
+pub(super) fn open(db: &Arc<Engine>, name: &str) -> Arc<ColumnFamily> {
+	let bounded_arc = db.cf(name);
+	let bounded_ptr = Arc::into_raw(bounded_arc);
+	let cf_ptr = bounded_ptr.cast::<ColumnFamily>();
+
+	// SAFETY: Column family handles out of RocksDB are basic pointers and can
+	// be invalidated: 1. when the database closes. 2. when the column is dropped or
+	// closed. rust_rocksdb wraps this for us by storing handles in their own
+	// `RwLock` map and returning an `Arc<BoundColumnFamily<'_>>` to
+	// provide expected safety. Similarly in "single-threaded mode" we would
+	// receive `&'_ ColumnFamily`.
+	//
+	// PROBLEM: We need to hold these handles in a field, otherwise we have to take
+	// a lock and get them by name from this map for every query, which is what
+	// conduit was doing, but we're not going to make a query for every query so we
+	// need to be holding it right. The lifetime parameter on these references makes
+	// that complicated. If this can be done without polluting the userspace
+	// with lifetimes on every instance of `Map` then this `unsafe` might not be
+	// necessary.
+	//
+	// SOLUTION: After investigating the underlying types it appears valid to
+	// Arc-swap `BoundColumnFamily<'_>` for `ColumnFamily`. They have the
+	// same inner data, the same Drop behavior, Deref, etc. We're just losing the
+	// lifetime parameter. We should not hold this handle, even in its Arc, after
+	// closing the database (dropping `Engine`). Since `Arc<Engine>` is a sibling
+	// member along with this handle in `Map`, that is prevented.
+	unsafe {
+		Arc::increment_strong_count(cf_ptr);
+		Arc::from_raw(cf_ptr)
+	}
+}
diff --git a/src/database/map/options.rs b/src/database/map/options.rs
new file mode 100644
index 00000000..90dc0261
--- /dev/null
+++ b/src/database/map/options.rs
@@ -0,0 +1,26 @@
+use rocksdb::{ReadOptions, ReadTier, WriteOptions};
+
+#[inline]
+pub(crate) fn iter_options_default() -> ReadOptions {
+	let mut read_options = read_options_default();
+	read_options.set_background_purge_on_iterator_cleanup(true);
+	//read_options.set_pin_data(true);
+	read_options
+}
+
+#[inline]
+pub(crate) fn cache_read_options_default() -> ReadOptions {
+	let mut read_options = read_options_default();
+	read_options.set_read_tier(ReadTier::BlockCache);
+	read_options
+}
+
+#[inline]
+pub(crate) fn read_options_default() -> ReadOptions {
+	let mut read_options = ReadOptions::default();
+	read_options.set_total_order_seek(true);
+	read_options
+}
+
+#[inline]
+pub(crate) fn write_options_default() -> WriteOptions { WriteOptions::default() }
diff --git a/src/database/maps.rs b/src/database/maps.rs
index d69cc7fd..e9b26818 100644
--- a/src/database/maps.rs
+++ b/src/database/maps.rs
@@ -2,103 +2,383 @@ use std::{collections::BTreeMap, sync::Arc};
 
 use conduwuit::Result;
 
-use crate::{Engine, Map};
+use crate::{
+	engine::descriptor::{self, CacheDisp, Descriptor},
+	Engine, Map,
+};
 
-pub type Maps = BTreeMap<MapsKey, MapsVal>;
-pub(crate) type MapsVal = Arc<Map>;
-pub(crate) type MapsKey = String;
+pub(super) type Maps = BTreeMap<MapsKey, MapsVal>;
+pub(super) type MapsKey = &'static str;
+pub(super) type MapsVal = Arc<Map>;
 
-pub(crate) fn open(db: &Arc<Engine>) -> Result<Maps> { open_list(db, MAPS) }
+pub(super) fn open(db: &Arc<Engine>) -> Result<Maps> { open_list(db, MAPS) }
 
 #[tracing::instrument(name = "maps", level = "debug", skip_all)]
-pub(crate) fn open_list(db: &Arc<Engine>, maps: &[&str]) -> Result<Maps> {
-	Ok(maps
-		.iter()
-		.map(|&name| (name.to_owned(), Map::open(db, name).expect("valid column opened")))
-		.collect::<Maps>())
+pub(super) fn open_list(db: &Arc<Engine>, maps: &[Descriptor]) -> Result<Maps> {
+	maps.iter()
+		.map(|desc| Ok((desc.name, Map::open(db, desc.name)?)))
+		.collect()
 }
 
-pub const MAPS: &[&str] = &[
-	"alias_roomid",
-	"alias_userid",
-	"aliasid_alias",
-	"backupid_algorithm",
-	"backupid_etag",
-	"backupkeyid_backup",
-	"bannedroomids",
-	"disabledroomids",
-	"eventid_outlierpdu",
-	"eventid_pduid",
-	"eventid_shorteventid",
-	"global",
-	"id_appserviceregistrations",
-	"keychangeid_userid",
-	"keyid_key",
-	"lazyloadedids",
-	"mediaid_file",
-	"mediaid_user",
-	"onetimekeyid_onetimekeys",
-	"pduid_pdu",
-	"presenceid_presence",
-	"publicroomids",
-	"readreceiptid_readreceipt",
-	"referencedevents",
-	"roomid_invitedcount",
-	"roomid_inviteviaservers",
-	"roomid_joinedcount",
-	"roomid_pduleaves",
-	"roomid_shortroomid",
-	"roomid_shortstatehash",
-	"roomserverids",
-	"roomsynctoken_shortstatehash",
-	"roomuserdataid_accountdata",
-	"roomuserid_invitecount",
-	"roomuserid_joined",
-	"roomuserid_lastprivatereadupdate",
-	"roomuserid_leftcount",
-	"roomuserid_privateread",
-	"roomuseroncejoinedids",
-	"roomusertype_roomuserdataid",
-	"senderkey_pusher",
-	"server_signingkeys",
-	"servercurrentevent_data",
-	"servername_educount",
-	"servernameevent_data",
-	"serverroomids",
-	"shorteventid_authchain",
-	"shorteventid_eventid",
-	"shorteventid_shortstatehash",
-	"shortstatehash_statediff",
-	"shortstatekey_statekey",
-	"softfailedeventids",
-	"statehash_shortstatehash",
-	"statekey_shortstatekey",
-	"threadid_userids",
-	"todeviceid_events",
-	"tofrom_relation",
-	"token_userdeviceid",
-	"tokenids",
-	"url_previews",
-	"userdeviceid_metadata",
-	"userdeviceid_token",
-	"userdevicesessionid_uiaainfo",
-	"userdevicetxnid_response",
-	"userfilterid_filter",
-	"userid_avatarurl",
-	"userid_blurhash",
-	"userid_devicelistversion",
-	"userid_displayname",
-	"userid_lastonetimekeyupdate",
-	"userid_masterkeyid",
-	"userid_password",
-	"userid_presenceid",
-	"userid_selfsigningkeyid",
-	"userid_usersigningkeyid",
-	"useridprofilekey_value",
-	"openidtoken_expiresatuserid",
-	"userroomid_highlightcount",
-	"userroomid_invitestate",
-	"userroomid_joined",
-	"userroomid_leftstate",
-	"userroomid_notificationcount",
+pub(super) static MAPS: &[Descriptor] = &[
+	Descriptor {
+		name: "alias_roomid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "alias_userid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "aliasid_alias",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "backupid_algorithm",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "backupid_etag",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "backupkeyid_backup",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "bannedroomids",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "disabledroomids",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "eventid_outlierpdu",
+		cache_disp: CacheDisp::SharedWith("pduid_pdu"),
+		key_size_hint: Some(48),
+		val_size_hint: Some(1488),
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "eventid_pduid",
+		cache_disp: CacheDisp::Unique,
+		key_size_hint: Some(48),
+		val_size_hint: Some(16),
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "eventid_shorteventid",
+		cache_disp: CacheDisp::Unique,
+		key_size_hint: Some(48),
+		val_size_hint: Some(8),
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "global",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "id_appserviceregistrations",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "keychangeid_userid",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "keyid_key",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "lazyloadedids",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "mediaid_file",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "mediaid_user",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "onetimekeyid_onetimekeys",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "pduid_pdu",
+		cache_disp: CacheDisp::SharedWith("eventid_outlierpdu"),
+		key_size_hint: Some(16),
+		val_size_hint: Some(1520),
+		..descriptor::SEQUENTIAL
+	},
+	Descriptor {
+		name: "presenceid_presence",
+		..descriptor::SEQUENTIAL_SMALL
+	},
+	Descriptor {
+		name: "publicroomids",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "readreceiptid_readreceipt",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "referencedevents",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "roomid_invitedcount",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomid_inviteviaservers",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomid_joinedcount",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomid_pduleaves",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomid_shortroomid",
+		val_size_hint: Some(8),
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomid_shortstatehash",
+		val_size_hint: Some(8),
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomserverids",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomsynctoken_shortstatehash",
+		val_size_hint: Some(8),
+		..descriptor::SEQUENTIAL
+	},
+	Descriptor {
+		name: "roomuserdataid_accountdata",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomuserid_invitecount",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomuserid_joined",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomuserid_lastprivatereadupdate",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomuserid_leftcount",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "roomuserid_privateread",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "roomuseroncejoinedids",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "roomusertype_roomuserdataid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "senderkey_pusher",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "server_signingkeys",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "servercurrentevent_data",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "servername_educount",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "servernameevent_data",
+		cache_disp: CacheDisp::Unique,
+		val_size_hint: Some(128),
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "serverroomids",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "shorteventid_authchain",
+		cache_disp: CacheDisp::Unique,
+		key_size_hint: Some(8),
+		..descriptor::SEQUENTIAL
+	},
+	Descriptor {
+		name: "shorteventid_eventid",
+		cache_disp: CacheDisp::Unique,
+		key_size_hint: Some(8),
+		val_size_hint: Some(48),
+		..descriptor::SEQUENTIAL_SMALL
+	},
+	Descriptor {
+		name: "shorteventid_shortstatehash",
+		key_size_hint: Some(8),
+		val_size_hint: Some(8),
+		..descriptor::SEQUENTIAL
+	},
+	Descriptor {
+		name: "shortstatehash_statediff",
+		key_size_hint: Some(8),
+		..descriptor::SEQUENTIAL_SMALL
+	},
+	Descriptor {
+		name: "shortstatekey_statekey",
+		cache_disp: CacheDisp::Unique,
+		key_size_hint: Some(8),
+		val_size_hint: Some(1016),
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "softfailedeventids",
+		key_size_hint: Some(48),
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "statehash_shortstatehash",
+		val_size_hint: Some(8),
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "statekey_shortstatekey",
+		cache_disp: CacheDisp::Unique,
+		key_size_hint: Some(1016),
+		val_size_hint: Some(8),
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "threadid_userids",
+		..descriptor::SEQUENTIAL_SMALL
+	},
+	Descriptor {
+		name: "todeviceid_events",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "tofrom_relation",
+		key_size_hint: Some(8),
+		val_size_hint: Some(8),
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "token_userdeviceid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor { name: "tokenids", ..descriptor::RANDOM },
+	Descriptor {
+		name: "url_previews",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "userdeviceid_metadata",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userdeviceid_token",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userdevicesessionid_uiaainfo",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userdevicetxnid_response",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userfilterid_filter",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_avatarurl",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_blurhash",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_devicelistversion",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_displayname",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_lastonetimekeyupdate",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_masterkeyid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_password",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "userid_presenceid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_selfsigningkeyid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userid_usersigningkeyid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "useridprofilekey_value",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "openidtoken_expiresatuserid",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userroomid_highlightcount",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "userroomid_invitestate",
+		..descriptor::RANDOM_SMALL
+	},
+	Descriptor {
+		name: "userroomid_joined",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "userroomid_leftstate",
+		..descriptor::RANDOM
+	},
+	Descriptor {
+		name: "userroomid_notificationcount",
+		..descriptor::RANDOM
+	},
 ];
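The `Database` wrapper that replaces the deleted `database.rs` follows next; callers keep the same two lookup paths. A short usage sketch against that API (column names taken from `MAPS` above):

```rust
// Usage sketch for the Database API defined below. get() returns
// Err(NotFound) for unknown column names; the Index impl panics instead.
fn column_access(db: &Database) -> Result<()> {
    let _by_index = &db["pduid_pdu"];       // infallible path, panics if absent
    let _by_get = db.get("eventid_pduid")?; // fallible path
    Ok(())
}
```
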
+	pub async fn open(server: &Arc<Server>) -> Result<Arc<Self>> {
+		let ctx = Context::new(server)?;
+		let db = Engine::open(ctx.clone(), maps::MAPS).await?;
+		Ok(Arc::new(Self {
+			maps: maps::open(&db)?,
+			db: db.clone(),
+			_ctx: ctx,
+		}))
+	}
+
+	#[inline]
+	pub fn get(&self, name: &str) -> Result<&Arc<Map>> {
+		self.maps
+			.get(name)
+			.ok_or_else(|| err!(Request(NotFound("column not found"))))
+	}
+
+	#[inline]
+	pub fn iter(&self) -> impl Iterator<Item = (&MapsKey, &MapsVal)> + Send + '_ {
+		self.maps.iter()
+	}
+
+	#[inline]
+	pub fn keys(&self) -> impl Iterator<Item = &MapsKey> + Send + '_ { self.maps.keys() }
+
+	#[inline]
+	#[must_use]
+	pub fn is_read_only(&self) -> bool { self.db.is_read_only() }
+
+	#[inline]
+	#[must_use]
+	pub fn is_secondary(&self) -> bool { self.db.is_secondary() }
+}
+
+impl Index<&str> for Database {
+	type Output = Arc<Map>;
+
+	fn index(&self, name: &str) -> &Self::Output {
+		self.maps
+			.get(name)
+			.expect("column in database does not exist")
+	}
+}
diff --git a/src/database/opts.rs b/src/database/opts.rs
deleted file mode 100644
index 24128f14..00000000
--- a/src/database/opts.rs
+++ /dev/null
@@ -1,433 +0,0 @@
-use std::{cmp, collections::HashMap, convert::TryFrom};
-
-use conduwuit::{err, utils, Config, Result};
-use rocksdb::{
-	statistics::StatsLevel, BlockBasedOptions, Cache, DBCompactionStyle, DBCompressionType,
-	DBRecoveryMode, Env, LogLevel, LruCacheOptions, Options, UniversalCompactOptions,
-	UniversalCompactionStopStyle,
-};
-
-/// Create database-wide options suitable for opening the database. This also
-/// sets our default column options in case of opening a column with the same
-/// resulting value. Note that we require special per-column options on some
-/// columns, therefor columns should only be opened after passing this result
-/// through cf_options().
-pub(crate) fn db_options(
-	config: &Config,
-	env: &mut Env,
-	row_cache: &Cache,
-	col_cache: &Cache,
-) -> Result<Options> {
-	const DEFAULT_STATS_LEVEL: StatsLevel = if cfg!(debug_assertions) {
-		StatsLevel::ExceptDetailedTimers
-	} else {
-		StatsLevel::DisableAll
-	};
-
-	let mut opts = Options::default();
-
-	// Logging
-	set_logging_defaults(&mut opts, config);
-
-	// Processing
-	opts.set_max_background_jobs(num_threads::<i32>(config)?);
-	opts.set_max_subcompactions(num_threads::<u32>(config)?);
-	opts.set_avoid_unnecessary_blocking_io(true);
-	opts.set_max_file_opening_threads(0);
-	if config.rocksdb_compaction_prio_idle {
-		env.lower_thread_pool_cpu_priority();
-	}
-
-	// IO
-	opts.set_atomic_flush(true);
-	opts.set_manual_wal_flush(true);
-	opts.set_enable_pipelined_write(false);
-	if config.rocksdb_direct_io {
-		opts.set_use_direct_reads(true);
-		opts.set_use_direct_io_for_flush_and_compaction(true);
-	}
-	if config.rocksdb_optimize_for_spinning_disks {
-		// speeds up opening DB on hard drives
-		opts.set_skip_checking_sst_file_sizes_on_db_open(true);
-		opts.set_skip_stats_update_on_db_open(true);
-		//opts.set_max_file_opening_threads(threads.try_into().unwrap());
-	}
-	if config.rocksdb_compaction_ioprio_idle {
-		env.lower_thread_pool_io_priority();
-	}
-
-	// Blocks
-	let mut table_opts = table_options(config);
-	table_opts.set_block_cache(col_cache);
-	opts.set_block_based_table_factory(&table_opts);
-	opts.set_row_cache(row_cache);
-
-	// Buffers
-	opts.set_write_buffer_size(2 * 1024 * 1024);
-	opts.set_max_write_buffer_number(2);
-	opts.set_min_write_buffer_number(1);
-
-	// Files
-	opts.set_table_cache_num_shard_bits(7);
-	opts.set_max_total_wal_size(96 * 1024 * 1024);
-	set_level_defaults(&mut opts, config);
-
-	// Compression
-	set_compression_defaults(&mut opts, config);
-
-	// Misc
-	opts.create_if_missing(true);
-	opts.set_disable_auto_compactions(!config.rocksdb_compaction);
-
-	opts.set_statistics_level(match config.rocksdb_stats_level {
-		| 0 => StatsLevel::DisableAll,
-		| 1 => DEFAULT_STATS_LEVEL,
-		| 2 => StatsLevel::ExceptHistogramOrTimers,
-		| 3 => StatsLevel::ExceptTimers,
-		| 4 => StatsLevel::ExceptDetailedTimers,
-		| 5 => StatsLevel::ExceptTimeForMutex,
-		| 6_u8..=u8::MAX => StatsLevel::All,
-	});
-
-	// Default: https://github.com/facebook/rocksdb/wiki/WAL-Recovery-Modes#ktoleratecorruptedtailrecords
-	//
-	// Unclean shutdowns of a Matrix homeserver are likely to be fine when
-	// recovered in this manner as it's likely any lost information will be
-	// restored via federation.
-	opts.set_wal_recovery_mode(match config.rocksdb_recovery_mode {
-		| 0 => DBRecoveryMode::AbsoluteConsistency,
-		| 1 => DBRecoveryMode::TolerateCorruptedTailRecords,
-		| 2 => DBRecoveryMode::PointInTime,
-		| 3 => DBRecoveryMode::SkipAnyCorruptedRecord,
-		| 4_u8..=u8::MAX => unimplemented!(),
-	});
-
-	// <https://github.com/facebook/rocksdb/wiki/Track-WAL-in-MANIFEST>
-	// "We recommend to set track_and_verify_wals_in_manifest to true for
-	// production, it has been enabled in production for the entire database cluster
-	// serving the social graph for all Meta apps."
-	opts.set_track_and_verify_wals_in_manifest(true);
-
-	opts.set_paranoid_checks(config.rocksdb_paranoid_file_checks);
-
-	opts.set_env(env);
-	Ok(opts)
-}
-
-/// Adjust options for the specific column by name. Provide the result of
-/// db_options() as the argument to this function and use the return value in
-/// the arguments to open the specific column.
-pub(crate) fn cf_options(
-	cfg: &Config,
-	name: &str,
-	mut opts: Options,
-	cache: &mut HashMap<String, Cache>,
-) -> Result<Options> {
-	// Columns with non-default compaction options
-	match name {
-		| "backupid_algorithm"
-		| "backupid_etag"
-		| "backupkeyid_backup"
-		| "roomid_shortroomid"
-		| "shorteventid_shortstatehash"
-		| "shorteventid_eventid"
-		| "shortstatekey_statekey"
-		| "shortstatehash_statediff"
-		| "userdevicetxnid_response"
-		| "userfilterid_filter" => set_for_sequential_small_uc(&mut opts, cfg),
-		| &_ => {},
-	}
-
-	// Columns with non-default table/cache configs
-	match name {
-		| "shorteventid_eventid" => set_table_with_new_cache(
-			&mut opts,
-			cfg,
-			cache,
-			name,
-			cache_size(cfg, cfg.shorteventid_cache_capacity, 64)?,
-		),
-
-		| "eventid_shorteventid" => set_table_with_new_cache(
-			&mut opts,
-			cfg,
-			cache,
-			name,
-			cache_size(cfg, cfg.eventidshort_cache_capacity, 64)?,
-		),
-
-		| "eventid_pduid" => set_table_with_new_cache(
-			&mut opts,
-			cfg,
-			cache,
-			name,
-			cache_size(cfg, cfg.eventid_pdu_cache_capacity, 64)?,
-		),
-
-		| "shorteventid_authchain" => {
-			set_table_with_new_cache(
-				&mut opts,
-				cfg,
-				cache,
-				name,
-				cache_size(cfg, cfg.auth_chain_cache_capacity, 192)?,
-			);
-		},
-
-		| "shortstatekey_statekey" => set_table_with_new_cache(
-			&mut opts,
-			cfg,
-			cache,
-			name,
-			cache_size(cfg, cfg.shortstatekey_cache_capacity, 1024)?,
-		),
-
-		| "statekey_shortstatekey" => set_table_with_new_cache(
-			&mut opts,
-			cfg,
-			cache,
-			name,
-			cache_size(cfg, cfg.statekeyshort_cache_capacity, 1024)?,
-		),
-
-		| "servernameevent_data" => set_table_with_new_cache(
-			&mut opts,
-			cfg,
-			cache,
-			name,
-			cache_size(cfg, cfg.servernameevent_data_cache_capacity, 128)?, /* Raw average
-			                                                                 * value size =
-			                                                                 * 102, key
-			                                                                 * size = 34 */
-		),
-
-		| "eventid_outlierpdu" => {
-			set_table_with_new_cache(
-				&mut opts,
-				cfg,
-				cache,
-				name,
-				cache_size(cfg, cfg.pdu_cache_capacity, 1536)?,
-			);
-		},
-
-		| "pduid_pdu" => {
-			set_table_with_shared_cache(&mut opts, cfg, cache, name, "eventid_outlierpdu");
-		},
-
-		| &_ => {},
-	}
-
-	Ok(opts)
-}
-
-fn set_logging_defaults(opts: &mut Options, config: &Config) {
-	let rocksdb_log_level = match config.rocksdb_log_level.as_ref() {
-		| "debug" => LogLevel::Debug,
-		| "info" => LogLevel::Info,
-		| "warn" => LogLevel::Warn,
-		| "fatal" => LogLevel::Fatal,
-		| _ => LogLevel::Error,
-	};
-
-	opts.set_log_level(rocksdb_log_level);
-	opts.set_max_log_file_size(config.rocksdb_max_log_file_size);
-	opts.set_log_file_time_to_roll(config.rocksdb_log_time_to_roll);
-	opts.set_keep_log_file_num(config.rocksdb_max_log_files);
-	opts.set_stats_dump_period_sec(0);
-
-	if config.rocksdb_log_stderr {
-		opts.set_stderr_logger(rocksdb_log_level, "rocksdb");
-	} else {
-		opts.set_callback_logger(rocksdb_log_level, &super::engine::handle_log);
-	}
-}
-
-fn set_compression_defaults(opts: &mut Options, config: &Config) {
-	let rocksdb_compression_algo = match config.rocksdb_compression_algo.as_ref() {
-		| "snappy" => DBCompressionType::Snappy,
-		| "zlib" => DBCompressionType::Zlib,
-		| "bz2" => DBCompressionType::Bz2,
-		| "lz4" => DBCompressionType::Lz4,
-		| "lz4hc" => DBCompressionType::Lz4hc,
-		| "none" => DBCompressionType::None,
-		| _ => DBCompressionType::Zstd,
-	};
-
-	if config.rocksdb_bottommost_compression {
-		opts.set_bottommost_compression_type(rocksdb_compression_algo);
-		opts.set_bottommost_zstd_max_train_bytes(0, true);
-
-		// -14 w_bits is only read by zlib.
-		opts.set_bottommost_compression_options(
-			-14,
-			config.rocksdb_bottommost_compression_level,
-			0,
-			0,
-			true,
-		);
-	}
-
-	// -14 w_bits is only read by zlib.
-	opts.set_compression_options(-14, config.rocksdb_compression_level, 0, 0);
-	opts.set_compression_type(rocksdb_compression_algo);
-}
-
-#[allow(dead_code)]
-fn set_for_random_small_uc(opts: &mut Options, config: &Config) {
-	let uco = uc_options(config);
-	set_for_random_small(opts, config);
-	opts.set_universal_compaction_options(&uco);
-	opts.set_compaction_style(DBCompactionStyle::Universal);
-}
-
-fn set_for_sequential_small_uc(opts: &mut Options, config: &Config) {
-	let uco = uc_options(config);
-	set_for_sequential_small(opts, config);
-	opts.set_universal_compaction_options(&uco);
-	opts.set_compaction_style(DBCompactionStyle::Universal);
-}
-
-#[allow(dead_code)]
-fn set_for_random_small(opts: &mut Options, config: &Config) {
-	set_for_random(opts, config);
-
-	opts.set_write_buffer_size(1024 * 128);
-	opts.set_target_file_size_base(1024 * 128);
-	opts.set_target_file_size_multiplier(2);
-	opts.set_max_bytes_for_level_base(1024 * 512);
-	opts.set_max_bytes_for_level_multiplier(2.0);
-}
-
-fn set_for_sequential_small(opts: &mut Options, config: &Config) {
-	set_for_sequential(opts, config);
-
-	opts.set_write_buffer_size(1024 * 512);
-	opts.set_target_file_size_base(1024 * 512);
-	opts.set_target_file_size_multiplier(2);
-	opts.set_max_bytes_for_level_base(1024 * 1024);
-	opts.set_max_bytes_for_level_multiplier(2.0);
-}
-
-fn set_for_random(opts: &mut Options, config: &Config) {
-	set_level_defaults(opts, config);
-
-	let pri = "compaction_pri=kOldestSmallestSeqFirst";
-	opts.set_options_from_string(pri)
-		.expect("set compaction priority string");
-
-	opts.set_max_bytes_for_level_base(8 * 1024 * 1024);
-	opts.set_max_bytes_for_level_multiplier(1.0);
-	opts.set_max_bytes_for_level_multiplier_additional(&[0, 1, 1, 3, 7, 15, 31]);
-}
-
-fn set_for_sequential(opts: &mut Options, config: &Config) {
-	set_level_defaults(opts, config);
-
-	let pri = "compaction_pri=kOldestLargestSeqFirst";
-	opts.set_options_from_string(pri)
-		.expect("set compaction priority string");
-
-	opts.set_target_file_size_base(2 * 1024 * 1024);
-	opts.set_target_file_size_multiplier(2);
-
-	opts.set_max_bytes_for_level_base(32 * 1024 * 1024);
-	opts.set_max_bytes_for_level_multiplier(1.0);
-	opts.set_max_bytes_for_level_multiplier_additional(&[0, 1, 1, 3, 7, 15, 31]);
-}
-
-fn set_level_defaults(opts: &mut Options, _config: &Config) {
-	opts.set_level_zero_file_num_compaction_trigger(2);
-
-	opts.set_target_file_size_base(1024 * 1024);
-	opts.set_target_file_size_multiplier(2);
-
-	opts.set_level_compaction_dynamic_level_bytes(false);
-	opts.set_max_bytes_for_level_base(16 * 1024 * 1024);
-	opts.set_max_bytes_for_level_multiplier(2.0);
-
-	opts.set_ttl(21 * 24 * 60 * 60);
-}
-
-fn uc_options(_config: &Config) -> UniversalCompactOptions {
-	let mut opts = UniversalCompactOptions::default();
-
-	opts.set_stop_style(UniversalCompactionStopStyle::Total);
-	opts.set_max_size_amplification_percent(10000);
-	opts.set_compression_size_percent(-1);
-	opts.set_size_ratio(1);
-
-	opts.set_min_merge_width(2);
-	opts.set_max_merge_width(16);
-
-	opts
-}
-
-fn set_table_with_new_cache(
-	opts: &mut Options,
-	config: &Config,
-	caches: &mut HashMap<String, Cache>,
-	name: &str,
-	size: usize,
-) {
-	let mut cache_opts = LruCacheOptions::default();
-	cache_opts.set_capacity(size);
-	cache_opts.set_num_shard_bits(7);
-
-	let cache = Cache::new_lru_cache_opts(&cache_opts);
-	caches.insert(name.into(), cache);
-
-	set_table_with_shared_cache(opts, config, caches, name, name);
-}
-
-fn set_table_with_shared_cache(
-	opts: &mut Options,
-	config: &Config,
-	cache: &HashMap<String, Cache>,
-	_name: &str,
-	cache_name: &str,
-) {
-	let mut table = table_options(config);
-	table.set_block_cache(
-		cache
-			.get(cache_name)
-			.expect("existing cache to share with this column"),
-	);
-
-	opts.set_block_based_table_factory(&table);
-}
-
-fn cache_size(config: &Config, base_size: u32, entity_size: usize) -> Result<usize> {
-	let ents = f64::from(base_size) * config.cache_capacity_modifier;
-
-	#[allow(clippy::as_conversions, clippy::cast_sign_loss, clippy::cast_possible_truncation)]
-	(ents as usize)
-		.checked_mul(entity_size)
-		.ok_or_else(|| err!(Config("cache_capacity_modifier", "Cache size is too large.")))
-}
-
-fn table_options(_config: &Config) -> BlockBasedOptions {
-	let mut opts = BlockBasedOptions::default();
-
-	opts.set_block_size(4 * 1024);
-	opts.set_metadata_block_size(4 * 1024);
-
-	opts.set_use_delta_encoding(false);
-	opts.set_optimize_filters_for_memory(true);
-	opts.set_cache_index_and_filter_blocks(true);
-	opts.set_pin_top_level_index_and_filter(true);
-
-	opts
-}
-
-fn num_threads<T: TryFrom<usize>>(config: &Config) -> Result<T> {
-	const MIN_PARALLELISM: usize = 2;
-
-	let requested = if config.rocksdb_parallelism_threads != 0 {
-		config.rocksdb_parallelism_threads
-	} else {
-		utils::available_parallelism()
-	};
-
-	utils::math::try_into::<T, usize>(cmp::max(MIN_PARALLELISM, requested))
-}
diff --git a/src/service/migrations.rs b/src/service/migrations.rs
index adf75c0b..c42c0324 100644
--- a/src/service/migrations.rs
+++ b/src/service/migrations.rs
@@ -379,7 +379,7 @@ async fn fix_bad_double_separator_in_state_cache(services: &Services) -> Result<
 		})
 		.await;
 
-	db.db.cleanup()?;
+	db.db.sort()?;
 	db["global"].insert(b"fix_bad_double_separator_in_state_cache", []);
 
 	info!("Finished fixing");
@@ -465,7 +465,7 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
 			.await;
 	}
 
-	db.db.cleanup()?;
+	db.db.sort()?;
 	db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []);
 
 	info!("Finished fixing");
@@ -511,7 +511,7 @@ async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
 	info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'.");
 	db["global"].insert(b"fix_referencedevents_missing_sep", []);
 
-	db.db.cleanup()
+	db.db.sort()
 }
 
 async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result {
@@ -561,5 +561,5 @@ async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result
 	info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt.");
 	db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []);
 
-	db.db.cleanup()
+	db.db.sort()
 }
diff --git a/src/service/services.rs b/src/service/services.rs
index 9f9d10f5..c955834e 100644
--- a/src/service/services.rs
+++ b/src/service/services.rs
@@ -124,7 +124,7 @@ impl Services {
 			.await?;
 
 		// set the server user as online
-		if self.server.config.allow_local_presence {
+		if self.server.config.allow_local_presence && !self.db.is_read_only() {
 			_ = self
 				.presence
 				.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Online)
@@ -139,7 +139,7 @@ impl Services {
 		info!("Shutting down services...");
 
 		// set the server user as offline
-		if self.server.config.allow_local_presence {
+		if self.server.config.allow_local_presence && !self.db.is_read_only() {
 			_ = self
 				.presence
 				.ping_presence(&self.globals.server_user, &ruma::presence::PresenceState::Offline)
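
Note (illustrative, not part of the patch): the relocated Database in src/database/mod.rs keeps its previous surface; only construction changed to thread an engine Context. A minimal usage sketch under that assumption, with `server` and the surrounding async/error context taken as given from the caller:

	// Open the database, then fetch a column two ways: get() returns a
	// Result, while indexing panics if the column does not exist.
	let db = Database::open(&server).await?;
	let _global = &db["global"];
	assert!(db.get("global").is_ok());

	// Writers should respect the read-only/secondary flags, as the
	// services.rs hunks above now do before pinging presence.
	if !db.is_read_only() {
		db["global"].insert(b"example_key", []);
	}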