add some compaction related interfaces

Signed-off-by: Jason Volk <jason@zemos.net>

commit dda27ffcb1
parent 8ab825b12c

7 changed files with 188 additions and 8 deletions
@@ -875,6 +875,7 @@ enum_glob_use = { level = "allow", priority = 1 }
 if_not_else = { level = "allow", priority = 1 }
 if_then_some_else_none = { level = "allow", priority = 1 }
 inline_always = { level = "allow", priority = 1 }
+match_bool = { level = "allow", priority = 1 }
 missing_docs_in_private_items = { level = "allow", priority = 1 }
 missing_errors_doc = { level = "allow", priority = 1 }
 missing_panics_doc = { level = "allow", priority = 1 }
@@ -1,6 +1,7 @@
 #![recursion_limit = "192"]
 #![allow(clippy::wildcard_imports)]
 #![allow(clippy::enum_glob_use)]
+#![allow(clippy::too_many_arguments)]
 
 pub(crate) mod admin;
 pub(crate) mod command;
@@ -2,13 +2,13 @@ use std::{borrow::Cow, collections::BTreeMap, ops::Deref};
 
 use clap::Subcommand;
 use conduwuit::{
-	apply, at,
+	apply, at, is_zero,
 	utils::{
-		stream::{ReadyExt, TryIgnore},
+		stream::{ReadyExt, TryIgnore, TryParallelExt},
 		string::EMPTY,
 		IterStream,
 	},
-	Result,
+	Err, Result,
 };
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use ruma::events::room::message::RoomMessageEventContent;
@@ -121,6 +121,104 @@ pub(crate) enum RawCommand {
 		/// Key prefix
 		prefix: Option<String>,
 	},
+
+	/// - Compact database
+	Compact {
+		#[arg(short, long, alias("column"))]
+		map: Option<Vec<String>>,
+
+		#[arg(long)]
+		start: Option<String>,
+
+		#[arg(long)]
+		stop: Option<String>,
+
+		#[arg(long)]
+		from: Option<usize>,
+
+		#[arg(long)]
+		into: Option<usize>,
+
+		/// There is one compaction job per column; then this controls how many
+		/// columns are compacted in parallel. If zero, one compaction job is
+		/// still run at a time here, but in exclusive-mode blocking any other
+		/// automatic compaction jobs until complete.
+		#[arg(long)]
+		parallelism: Option<usize>,
+
+		#[arg(long, default_value("false"))]
+		exhaustive: bool,
+	},
+}
+
+#[admin_command]
+pub(super) async fn compact(
+	&self,
+	map: Option<Vec<String>>,
+	start: Option<String>,
+	stop: Option<String>,
+	from: Option<usize>,
+	into: Option<usize>,
+	parallelism: Option<usize>,
+	exhaustive: bool,
+) -> Result<RoomMessageEventContent> {
+	use conduwuit_database::compact::Options;
+
+	let default_all_maps = map
+		.is_none()
+		.then(|| {
+			self.services
+				.db
+				.keys()
+				.map(Deref::deref)
+				.map(ToOwned::to_owned)
+		})
+		.into_iter()
+		.flatten();
+
+	let maps: Vec<_> = map
+		.unwrap_or_default()
+		.into_iter()
+		.chain(default_all_maps)
+		.map(|map| self.services.db.get(&map))
+		.filter_map(Result::ok)
+		.cloned()
+		.collect();
+
+	if maps.is_empty() {
+		return Err!("--map argument invalid. not found in database");
+	}
+
+	let range = (
+		start.as_ref().map(String::as_bytes).map(Into::into),
+		stop.as_ref().map(String::as_bytes).map(Into::into),
+	);
+
+	let options = Options {
+		range,
+		level: (from, into),
+		exclusive: parallelism.is_some_and(is_zero!()),
+		exhaustive,
+	};
+
+	let runtime = self.services.server.runtime().clone();
+	let parallelism = parallelism.unwrap_or(1);
+	let results = maps
+		.into_iter()
+		.try_stream()
+		.paralleln_and_then(runtime, parallelism, move |map| {
+			map.compact_blocking(options.clone())?;
+			Ok(map.name().to_owned())
+		})
+		.collect::<Vec<_>>();
+
+	let timer = Instant::now();
+	let results = results.await;
+	let query_time = timer.elapsed();
+	self.write_str(&format!("Jobs completed in {query_time:?}:\n\n```rs\n{results:#?}\n```"))
+		.await?;
+
+	Ok(RoomMessageEventContent::text_plain(""))
 }
 
 #[admin_command]
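
The `--parallelism` flag above doubles as an exclusivity switch. As a rough standalone sketch of that mapping (the `plan` helper is illustrative only, not code from this commit):

```rust
// Illustrative mirror of how the admin command derives its compaction plan
// from --parallelism; `plan` is a made-up helper for this sketch.
fn plan(parallelism: Option<usize>) -> (bool, usize) {
	// Zero requests an exclusive manual compaction: still one job at a time,
	// but blocking automatic compactions until it completes.
	let exclusive = matches!(parallelism, Some(0));
	// Otherwise the value bounds how many column jobs run concurrently,
	// defaulting to a single job when the flag is omitted.
	let jobs = parallelism.unwrap_or(1);
	(exclusive, jobs)
}

fn main() {
	assert_eq!(plan(None), (false, 1));    // default: one non-exclusive job
	assert_eq!(plan(Some(0)), (true, 0));  // exclusive mode; one job still runs
	assert_eq!(plan(Some(4)), (false, 4)); // up to four columns in parallel
}
```
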
@@ -18,9 +18,16 @@ use std::{
 };
 
 use conduwuit::{debug, info, warn, Err, Result};
-use rocksdb::{AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded};
+use rocksdb::{
+	AsColumnFamilyRef, BoundColumnFamily, DBCommon, DBWithThreadMode, MultiThreaded,
+	WaitForCompactOptions,
+};
 
-use crate::{pool::Pool, result, Context};
+use crate::{
+	pool::Pool,
+	util::{map_err, result},
+	Context,
+};
 
 pub struct Engine {
 	pub(super) read_only: bool,

@@ -55,12 +62,22 @@ impl Engine {
 	#[tracing::instrument(skip(self), level = "debug")]
 	pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) }
 
-	#[tracing::instrument(skip(self), level = "debug")]
+	#[tracing::instrument(skip(self), level = "info")]
 	pub fn sort(&self) -> Result {
 		let flushoptions = rocksdb::FlushOptions::default();
 		result(DBCommon::flush_opt(&self.db, &flushoptions))
 	}
 
+	#[tracing::instrument(skip(self), level = "info")]
+	pub fn wait_compactions(&self) -> Result {
+		let mut opts = WaitForCompactOptions::default();
+		opts.set_abort_on_pause(true);
+		opts.set_flush(false);
+		opts.set_timeout(0);
+
+		self.db.wait_for_compact(&opts).map_err(map_err)
+	}
+
 	/// Query for database property by null-terminated name which is expected to
 	/// have a result with an integer representation. This is intended for
 	/// low-overhead programmatic use.
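
A hypothetical call site inside the database crate pairing the two new interfaces; `compact_then_settle` and its handles are illustrative only, not code from this commit, and assume the usual RocksDB semantics where a zero timeout means no deadline:

```rust
// Kick off an exclusive, exhaustive compaction of one column, then block on
// the engine until RocksDB reports no outstanding compaction work.
fn compact_then_settle(engine: &crate::Engine, map: &crate::Map) -> conduwuit::Result {
	map.compact_blocking(crate::compact::Options {
		exclusive: true,
		exhaustive: true,
		..Default::default()
	})?;

	// wait_compactions() above uses set_timeout(0) (no deadline) and aborts
	// the wait if compactions are administratively paused.
	engine.wait_compactions()
}
```
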
@@ -1,3 +1,4 @@
+pub mod compact;
 mod contains;
 mod count;
 mod get;
src/database/map/compact.rs (new file, 62 lines)
@@ -0,0 +1,62 @@
+use conduwuit::{implement, Err, Result};
+use rocksdb::{BottommostLevelCompaction, CompactOptions};
+
+use crate::keyval::KeyBuf;
+
+#[derive(Clone, Debug, Default)]
+pub struct Options {
+	/// Key range to start and stop compaction.
+	pub range: (Option<KeyBuf>, Option<KeyBuf>),
+
+	/// (None, None) - all levels to all necessary levels
+	/// (None, Some(1)) - compact all levels into level 1
+	/// (Some(1), None) - compact level 1 into level 1
+	/// (Some(_), Some(_)) - currently unsupported
+	pub level: (Option<usize>, Option<usize>),
+
+	/// Run compaction until complete. If false, only one pass is made, and the
+	/// results of that pass are not further recompacted.
+	pub exhaustive: bool,
+
+	/// Waits for other compactions to complete, then runs this compaction
+	/// exclusively before allowing automatic compactions to resume.
+	pub exclusive: bool,
+}
+
+#[implement(super::Map)]
+#[tracing::instrument(
+	name = "compact",
+	level = "info",
+	skip(self),
+	fields(%self),
+)]
+pub fn compact_blocking(&self, opts: Options) -> Result {
+	let mut co = CompactOptions::default();
+	co.set_exclusive_manual_compaction(opts.exclusive);
+	co.set_bottommost_level_compaction(match opts.exhaustive {
+		| true => BottommostLevelCompaction::Force,
+		| false => BottommostLevelCompaction::ForceOptimized,
+	});
+
+	match opts.level {
+		| (None, None) => {
+			co.set_change_level(true);
+			co.set_target_level(-1);
+		},
+		| (None, Some(level)) => {
+			co.set_change_level(true);
+			co.set_target_level(level.try_into()?);
+		},
+		| (Some(level), None) => {
+			co.set_change_level(false);
+			co.set_target_level(level.try_into()?);
+		},
+		| (Some(_), Some(_)) => return Err!("compacting between specific levels not supported"),
+	};
+
+	self.db
+		.db
+		.compact_range_cf_opt(&self.cf(), opts.range.0, opts.range.1, &co);
+
+	Ok(())
+}
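
To make the `(from, into)` level semantics concrete, here is a small self-contained sketch mirroring the match in `compact_blocking` above; `interpret` is an illustrative stand-in, not part of the patch, returning the `(change_level, target_level)` pair handed to `CompactOptions`:

```rust
// Mirrors the level handling in compact_blocking(): which target level is
// selected and whether data is allowed to change levels.
fn interpret(level: (Option<i32>, Option<i32>)) -> Result<(bool, i32), &'static str> {
	match level {
		// Compact everything; RocksDB moves data to whichever levels it needs.
		(None, None) => Ok((true, -1)),
		// Compact all levels down into the given target level.
		(None, Some(into)) => Ok((true, into)),
		// Recompact the given level in place, without relocating it.
		(Some(from), None) => Ok((false, from)),
		// Explicit from -> into compaction is rejected by the patch.
		(Some(_), Some(_)) => Err("compacting between specific levels not supported"),
	}
}

fn main() {
	assert_eq!(interpret((None, None)), Ok((true, -1)));
	assert_eq!(interpret((None, Some(1))), Ok((true, 1)));
	assert_eq!(interpret((Some(1), None)), Ok((false, 1)));
	assert!(interpret((Some(2), Some(3))).is_err());
}
```
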
@@ -30,12 +30,12 @@ pub use self::{
 	deserialized::Deserialized,
 	handle::Handle,
 	keyval::{serialize_key, serialize_val, KeyVal, Slice},
-	map::Map,
+	map::{compact, Map},
 	ser::{serialize, serialize_to, serialize_to_vec, Interfix, Json, Separator, SEP},
 };
 pub(crate) use self::{
 	engine::{context::Context, Engine},
-	util::{or_else, result},
+	util::or_else,
 };
 use crate::maps::{Maps, MapsKey, MapsVal};