abstract raw query command iterations

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-03-26 04:40:38 +00:00
parent 9d0ce3965e
commit 07ba00f74e

View file

@ -1,15 +1,16 @@
use std::{borrow::Cow, collections::BTreeMap, ops::Deref}; use std::{borrow::Cow, collections::BTreeMap, ops::Deref, sync::Arc};
use clap::Subcommand; use clap::Subcommand;
use conduwuit::{ use conduwuit::{
Err, Result, apply, at, is_zero, Err, Result, apply, at, is_zero,
utils::{ utils::{
IterStream, stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt},
stream::{ReadyExt, TryIgnore, TryParallelExt},
string::EMPTY, string::EMPTY,
}, },
}; };
use futures::{FutureExt, StreamExt, TryStreamExt}; use conduwuit_database::Map;
use conduwuit_service::Services;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};
use ruma::events::room::message::RoomMessageEventContent; use ruma::events::room::message::RoomMessageEventContent;
use tokio::time::Instant; use tokio::time::Instant;
@ -172,22 +173,18 @@ pub(super) async fn compact(
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
use conduwuit_database::compact::Options; use conduwuit_database::compact::Options;
let default_all_maps = map let default_all_maps: Option<_> = map.is_none().then(|| {
.is_none()
.then(|| {
self.services self.services
.db .db
.keys() .keys()
.map(Deref::deref) .map(Deref::deref)
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
}) });
.into_iter()
.flatten();
let maps: Vec<_> = map let maps: Vec<_> = map
.unwrap_or_default() .unwrap_or_default()
.into_iter() .into_iter()
.chain(default_all_maps) .chain(default_all_maps.into_iter().flatten())
.map(|map| self.services.db.get(&map)) .map(|map| self.services.db.get(&map))
.filter_map(Result::ok) .filter_map(Result::ok)
.cloned() .cloned()
@ -237,25 +234,8 @@ pub(super) async fn raw_count(
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let default_all_maps = map
.is_none()
.then(|| self.services.db.keys().map(Deref::deref))
.into_iter()
.flatten();
let maps: Vec<_> = map
.iter()
.map(String::as_str)
.chain(default_all_maps)
.map(|map| self.services.db.get(map))
.filter_map(Result::ok)
.cloned()
.collect();
let timer = Instant::now(); let timer = Instant::now();
let count = maps let count = with_maps_or(map.as_deref(), self.services)
.iter()
.stream()
.then(|map| map.raw_count_prefix(&prefix)) .then(|map| map.raw_count_prefix(&prefix))
.ready_fold(0_usize, usize::saturating_add) .ready_fold(0_usize, usize::saturating_add)
.await; .await;
@ -300,25 +280,8 @@ pub(super) async fn raw_keys_sizes(
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let default_all_maps = map
.is_none()
.then(|| self.services.db.keys().map(Deref::deref))
.into_iter()
.flatten();
let maps: Vec<_> = map
.iter()
.map(String::as_str)
.chain(default_all_maps)
.map(|map| self.services.db.get(map))
.filter_map(Result::ok)
.cloned()
.collect();
let timer = Instant::now(); let timer = Instant::now();
let result = maps let result = with_maps_or(map.as_deref(), self.services)
.iter()
.stream()
.map(|map| map.raw_keys_prefix(&prefix)) .map(|map| map.raw_keys_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@ -345,25 +308,8 @@ pub(super) async fn raw_keys_total(
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let default_all_maps = map
.is_none()
.then(|| self.services.db.keys().map(Deref::deref))
.into_iter()
.flatten();
let maps: Vec<_> = map
.iter()
.map(String::as_str)
.chain(default_all_maps)
.map(|map| self.services.db.get(map))
.filter_map(Result::ok)
.cloned()
.collect();
let timer = Instant::now(); let timer = Instant::now();
let result = maps let result = with_maps_or(map.as_deref(), self.services)
.iter()
.stream()
.map(|map| map.raw_keys_prefix(&prefix)) .map(|map| map.raw_keys_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@ -387,25 +333,8 @@ pub(super) async fn raw_vals_sizes(
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let default_all_maps = map
.is_none()
.then(|| self.services.db.keys().map(Deref::deref))
.into_iter()
.flatten();
let maps: Vec<_> = map
.iter()
.map(String::as_str)
.chain(default_all_maps)
.map(|map| self.services.db.get(map))
.filter_map(Result::ok)
.cloned()
.collect();
let timer = Instant::now(); let timer = Instant::now();
let result = maps let result = with_maps_or(map.as_deref(), self.services)
.iter()
.stream()
.map(|map| map.raw_stream_prefix(&prefix)) .map(|map| map.raw_stream_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@ -433,25 +362,8 @@ pub(super) async fn raw_vals_total(
) -> Result<RoomMessageEventContent> { ) -> Result<RoomMessageEventContent> {
let prefix = prefix.as_deref().unwrap_or(EMPTY); let prefix = prefix.as_deref().unwrap_or(EMPTY);
let default_all_maps = map
.is_none()
.then(|| self.services.db.keys().map(Deref::deref))
.into_iter()
.flatten();
let maps: Vec<_> = map
.iter()
.map(String::as_str)
.chain(default_all_maps)
.map(|map| self.services.db.get(map))
.filter_map(Result::ok)
.cloned()
.collect();
let timer = Instant::now(); let timer = Instant::now();
let result = maps let result = with_maps_or(map.as_deref(), self.services)
.iter()
.stream()
.map(|map| map.raw_stream_prefix(&prefix)) .map(|map| map.raw_stream_prefix(&prefix))
.flatten() .flatten()
.ignore_err() .ignore_err()
@ -573,3 +485,20 @@ pub(super) async fn raw_maps(&self) -> Result<RoomMessageEventContent> {
Ok(RoomMessageEventContent::notice_markdown(format!("{list:#?}"))) Ok(RoomMessageEventContent::notice_markdown(format!("{list:#?}")))
} }
fn with_maps_or<'a>(
map: Option<&'a str>,
services: &'a Services,
) -> impl Stream<Item = &'a Arc<Map>> + Send + 'a {
let default_all_maps = map
.is_none()
.then(|| services.db.keys().map(Deref::deref))
.into_iter()
.flatten();
map.into_iter()
.chain(default_all_maps)
.map(|map| services.db.get(map))
.filter_map(Result::ok)
.stream()
}