From dda27ffcb1a6d9f1ff6dafebb6203cb9cb8c2f22 Mon Sep 17 00:00:00 2001
From: Jason Volk
Date: Sat, 18 Jan 2025 12:05:07 +0000
Subject: [PATCH] add some compaction related interfaces

Signed-off-by: Jason Volk
---
 Cargo.toml                  |   1 +
 src/admin/mod.rs            |   1 +
 src/admin/query/raw.rs      | 104 ++++++++++++++++++++++++++++++++++--
 src/database/engine.rs      |  23 ++++++--
 src/database/map.rs         |   1 +
 src/database/map/compact.rs |  62 +++++++++++++++++++++
 src/database/mod.rs         |   4 +-
 7 files changed, 188 insertions(+), 8 deletions(-)
 create mode 100644 src/database/map/compact.rs

diff --git a/Cargo.toml b/Cargo.toml
index 4d738a11..f9e3b6db 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -875,6 +875,7 @@ enum_glob_use = { level = "allow", priority = 1 }
 if_not_else = { level = "allow", priority = 1 }
 if_then_some_else_none = { level = "allow", priority = 1 }
 inline_always = { level = "allow", priority = 1 }
+match_bool = { level = "allow", priority = 1 }
 missing_docs_in_private_items = { level = "allow", priority = 1 }
 missing_errors_doc = { level = "allow", priority = 1 }
 missing_panics_doc = { level = "allow", priority = 1 }
diff --git a/src/admin/mod.rs b/src/admin/mod.rs
index ac51104a..695155e8 100644
--- a/src/admin/mod.rs
+++ b/src/admin/mod.rs
@@ -1,6 +1,7 @@
 #![recursion_limit = "192"]
 #![allow(clippy::wildcard_imports)]
 #![allow(clippy::enum_glob_use)]
+#![allow(clippy::too_many_arguments)]
 
 pub(crate) mod admin;
 pub(crate) mod command;
diff --git a/src/admin/query/raw.rs b/src/admin/query/raw.rs
index 678d21c9..ac5e8976 100644
--- a/src/admin/query/raw.rs
+++ b/src/admin/query/raw.rs
@@ -2,13 +2,13 @@ use std::{borrow::Cow, collections::BTreeMap, ops::Deref};
 
 use clap::Subcommand;
 use conduwuit::{
-	apply, at,
+	apply, at, is_zero,
 	utils::{
-		stream::{ReadyExt, TryIgnore},
+		stream::{ReadyExt, TryIgnore, TryParallelExt},
 		string::EMPTY,
 		IterStream,
 	},
-	Result,
+	Err, Result,
 };
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use ruma::events::room::message::RoomMessageEventContent;
@@ -121,6 +121,104 @@ pub(crate) enum RawCommand {
 		/// Key prefix
 		prefix: Option<String>,
 	},
+
+	/// - Compact database
+	Compact {
+		#[arg(short, long, alias("column"))]
+		map: Option<Vec<String>>,
+
+		#[arg(long)]
+		start: Option<String>,
+
+		#[arg(long)]
+		stop: Option<String>,
+
+		#[arg(long)]
+		from: Option<usize>,
+
+		#[arg(long)]
+		into: Option<usize>,
+
+		/// There is one compaction job per column; then this controls how many
+		/// columns are compacted in parallel. If zero, one compaction job is
+		/// still run at a time here, but in exclusive-mode blocking any other
+		/// automatic compaction jobs until complete.
+		#[arg(long)]
+		parallelism: Option<usize>,
+
+		#[arg(long, default_value("false"))]
+		exhaustive: bool,
+	},
+}
+
+#[admin_command]
+pub(super) async fn compact(
+	&self,
+	map: Option<Vec<String>>,
+	start: Option<String>,
+	stop: Option<String>,
+	from: Option<usize>,
+	into: Option<usize>,
+	parallelism: Option<usize>,
+	exhaustive: bool,
+) -> Result<RoomMessageEventContent> {
+	use conduwuit_database::compact::Options;
+
+	let default_all_maps = map
+		.is_none()
+		.then(|| {
+			self.services
+				.db
+				.keys()
+				.map(Deref::deref)
+				.map(ToOwned::to_owned)
+		})
+		.into_iter()
+		.flatten();
+
+	let maps: Vec<_> = map
+		.unwrap_or_default()
+		.into_iter()
+		.chain(default_all_maps)
+		.map(|map| self.services.db.get(&map))
+		.filter_map(Result::ok)
+		.cloned()
+		.collect();
+
+	if maps.is_empty() {
+		return Err!("--map argument invalid. not found in database");
+	}
+
+	let range = (
+		start.as_ref().map(String::as_bytes).map(Into::into),
+		stop.as_ref().map(String::as_bytes).map(Into::into),
+	);
+
+	let options = Options {
+		range,
+		level: (from, into),
+		exclusive: parallelism.is_some_and(is_zero!()),
+		exhaustive,
+	};
+
+	let runtime = self.services.server.runtime().clone();
+	let parallelism = parallelism.unwrap_or(1);
+	let results = maps
+		.into_iter()
+		.try_stream()
+		.paralleln_and_then(runtime, parallelism, move |map| {
+			map.compact_blocking(options.clone())?;
+			Ok(map.name().to_owned())
+		})
+		.collect::<Vec<_>>();
+
+	let timer = Instant::now();
+	let results = results.await;
+	let query_time = timer.elapsed();
+	self.write_str(&format!("Jobs completed in {query_time:?}:\n\n```rs\n{results:#?}\n```"))
+		.await?;
+
+	Ok(RoomMessageEventContent::text_plain(""))
 }
 
 #[admin_command]
diff --git a/src/database/engine.rs b/src/database/engine.rs
index 2958f73f..8be9eecc 100644
--- a/src/database/engine.rs
+++ b/src/database/engine.rs
@@ -18,9 +18,16 @@ use std::{
 	},
 };
 use conduwuit::{debug, info, warn, Err, Result};
-use rocksdb::{AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded};
+use rocksdb::{
+	AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
+	WaitForCompactOptions,
+};
 
-use crate::{pool::Pool, result, Context};
+use crate::{
+	pool::Pool,
+	util::{map_err, result},
+	Context,
+};
 
 pub struct Engine {
 	pub(super) read_only: bool,
@@ -55,12 +62,22 @@ impl Engine {
 	#[tracing::instrument(skip(self), level = "debug")]
 	pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
 
-	#[tracing::instrument(skip(self), level = "debug")]
+	#[tracing::instrument(skip(self), level = "info")]
 	pub fn sort(&self) -> Result {
 		let flushoptions = rocksdb::FlushOptions::default();
 		result(DBCommon::flush_opt(&self.db, &flushoptions))
 	}
 
+	#[tracing::instrument(skip(self), level = "info")]
+	pub fn wait_compactions(&self) -> Result {
+		let mut opts = WaitForCompactOptions::default();
+		opts.set_abort_on_pause(true);
+		opts.set_flush(false);
+		opts.set_timeout(0);
+
+		self.db.wait_for_compact(&opts).map_err(map_err)
+	}
+
 	/// Query for database property by null-terminated name which is expected to
 	/// have a result with an integer representation. This is intended for
 	/// low-overhead programmatic use.
diff --git a/src/database/map.rs b/src/database/map.rs
index 60d66585..33cae594 100644
--- a/src/database/map.rs
+++ b/src/database/map.rs
@@ -1,3 +1,4 @@
+pub mod compact;
 mod contains;
 mod count;
 mod get;
diff --git a/src/database/map/compact.rs b/src/database/map/compact.rs
new file mode 100644
index 00000000..c0381eb4
--- /dev/null
+++ b/src/database/map/compact.rs
@@ -0,0 +1,62 @@
+use conduwuit::{implement, Err, Result};
+use rocksdb::{BottommostLevelCompaction, CompactOptions};
+
+use crate::keyval::KeyBuf;
+
+#[derive(Clone, Debug, Default)]
+pub struct Options {
+	/// Key range to start and stop compaction.
+	pub range: (Option<KeyBuf>, Option<KeyBuf>),
+
+	/// (None, None) - all levels to all necessary levels
+	/// (None, Some(1)) - compact all levels into level 1
+	/// (Some(1), None) - compact level 1 into level 1
+	/// (Some(_), Some(_)) - currently unsupported
+	pub level: (Option<usize>, Option<usize>),
+
+	/// run compaction until complete. if false only one pass is made, and the
+	/// results of that pass are not further recompacted.
+	pub exhaustive: bool,
+
+	/// waits for other compactions to complete, then runs this compaction
+	/// exclusively before allowing automatic compactions to resume.
+	pub exclusive: bool,
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(
+	name = "compact",
+	level = "info",
+	skip(self),
+	fields(%self),
+)]
+pub fn compact_blocking(&self, opts: Options) -> Result {
+	let mut co = CompactOptions::default();
+	co.set_exclusive_manual_compaction(opts.exclusive);
+	co.set_bottommost_level_compaction(match opts.exhaustive {
+		| true => BottommostLevelCompaction::Force,
+		| false => BottommostLevelCompaction::ForceOptimized,
+	});
+
+	match opts.level {
+		| (None, None) => {
+			co.set_change_level(true);
+			co.set_target_level(-1);
+		},
+		| (None, Some(level)) => {
+			co.set_change_level(true);
+			co.set_target_level(level.try_into()?);
+		},
+		| (Some(level), None) => {
+			co.set_change_level(false);
+			co.set_target_level(level.try_into()?);
+		},
+		| (Some(_), Some(_)) => return Err!("compacting between specific levels not supported"),
+	};
+
+	self.db
+		.db
+		.compact_range_cf_opt(&self.cf(), opts.range.0, opts.range.1, &co);
+
+	Ok(())
+}
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 6e3f8c96..8ae8dcf5 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -30,12 +30,12 @@ pub use self::{
 	deserialized::Deserialized,
 	handle::Handle,
 	keyval::{serialize_key, serialize_val, KeyVal, Slice},
-	map::Map,
+	map::{compact, Map},
 	ser::{serialize, serialize_to, serialize_to_vec, Interfix, Json, Separator, SEP},
 };
 pub(crate) use self::{
 	engine::{context::Context, Engine},
-	util::{or_else, result},
+	util::or_else,
 };
 
 use crate::maps::{Maps, MapsKey, MapsVal};
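
As a rough illustration of what the new `compact_blocking` method configures for the default `(None, None)` level case, the standalone rust-rocksdb sketch below performs the same kind of exclusive, full-range manual compaction with the bottommost level forced to recompact (the `exhaustive` path). The database path and column-family name are assumptions made for the example, not values from this patch.

```rust
// Standalone sketch of the manual compaction that compact_blocking sets up for
// Options { level: (None, None), exhaustive: true, exclusive: true, .. }.
// "./compact-demo-db" and "mymap" are illustrative names, not from the patch.
use rocksdb::{BottommostLevelCompaction, ColumnFamilyDescriptor, CompactOptions, Options, DB};

fn main() -> Result<(), rocksdb::Error> {
	// Open (or create) a database with a single column family.
	let mut db_opts = Options::default();
	db_opts.create_if_missing(true);
	db_opts.create_missing_column_families(true);

	let cfs = vec![ColumnFamilyDescriptor::new("mymap", Options::default())];
	let db = DB::open_cf_descriptors(&db_opts, "./compact-demo-db", cfs)?;
	let cf = db.cf_handle("mymap").expect("column family was just created");

	let mut co = CompactOptions::default();
	// Run this manual compaction exclusively of other manual compactions.
	co.set_exclusive_manual_compaction(true);
	// Let RocksDB pick the output level rather than pinning one (-1 = automatic).
	co.set_change_level(true);
	co.set_target_level(-1);
	// Force the bottommost level to be recompacted as well ("exhaustive").
	co.set_bottommost_level_compaction(BottommostLevelCompaction::Force);

	// (None, None) compacts the entire key range of the column family.
	db.compact_range_cf_opt::<&[u8], &[u8]>(cf, None, None, &co);

	Ok(())
}
```

The exclusive setting here corresponds to the `--parallelism 0` case of the admin command, where `exclusive: parallelism.is_some_and(is_zero!())` makes the single job block other compactions until it completes.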