From 07ba00f74e2dfea314d0e5236f0415b2de6d543c Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Wed, 26 Mar 2025 04:40:38 +0000 Subject: [PATCH] abstract raw query command iterations Signed-off-by: Jason Volk --- src/admin/query/raw.rs | 141 ++++++++++------------------------------- 1 file changed, 35 insertions(+), 106 deletions(-) diff --git a/src/admin/query/raw.rs b/src/admin/query/raw.rs index 23f11cc8..c503eee5 100644 --- a/src/admin/query/raw.rs +++ b/src/admin/query/raw.rs @@ -1,15 +1,16 @@ -use std::{borrow::Cow, collections::BTreeMap, ops::Deref}; +use std::{borrow::Cow, collections::BTreeMap, ops::Deref, sync::Arc}; use clap::Subcommand; use conduwuit::{ Err, Result, apply, at, is_zero, utils::{ - IterStream, - stream::{ReadyExt, TryIgnore, TryParallelExt}, + stream::{IterStream, ReadyExt, TryIgnore, TryParallelExt}, string::EMPTY, }, }; -use futures::{FutureExt, StreamExt, TryStreamExt}; +use conduwuit_database::Map; +use conduwuit_service::Services; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use ruma::events::room::message::RoomMessageEventContent; use tokio::time::Instant; @@ -172,22 +173,18 @@ pub(super) async fn compact( ) -> Result { use conduwuit_database::compact::Options; - let default_all_maps = map - .is_none() - .then(|| { - self.services - .db - .keys() - .map(Deref::deref) - .map(ToOwned::to_owned) - }) - .into_iter() - .flatten(); + let default_all_maps: Option<_> = map.is_none().then(|| { + self.services + .db + .keys() + .map(Deref::deref) + .map(ToOwned::to_owned) + }); let maps: Vec<_> = map .unwrap_or_default() .into_iter() - .chain(default_all_maps) + .chain(default_all_maps.into_iter().flatten()) .map(|map| self.services.db.get(&map)) .filter_map(Result::ok) .cloned() @@ -237,25 +234,8 @@ pub(super) async fn raw_count( ) -> Result { let prefix = prefix.as_deref().unwrap_or(EMPTY); - let default_all_maps = map - .is_none() - .then(|| self.services.db.keys().map(Deref::deref)) - .into_iter() - .flatten(); - - let maps: Vec<_> = map - .iter() - .map(String::as_str) - .chain(default_all_maps) - .map(|map| self.services.db.get(map)) - .filter_map(Result::ok) - .cloned() - .collect(); - let timer = Instant::now(); - let count = maps - .iter() - .stream() + let count = with_maps_or(map.as_deref(), self.services) .then(|map| map.raw_count_prefix(&prefix)) .ready_fold(0_usize, usize::saturating_add) .await; @@ -300,25 +280,8 @@ pub(super) async fn raw_keys_sizes( ) -> Result { let prefix = prefix.as_deref().unwrap_or(EMPTY); - let default_all_maps = map - .is_none() - .then(|| self.services.db.keys().map(Deref::deref)) - .into_iter() - .flatten(); - - let maps: Vec<_> = map - .iter() - .map(String::as_str) - .chain(default_all_maps) - .map(|map| self.services.db.get(map)) - .filter_map(Result::ok) - .cloned() - .collect(); - let timer = Instant::now(); - let result = maps - .iter() - .stream() + let result = with_maps_or(map.as_deref(), self.services) .map(|map| map.raw_keys_prefix(&prefix)) .flatten() .ignore_err() @@ -345,25 +308,8 @@ pub(super) async fn raw_keys_total( ) -> Result { let prefix = prefix.as_deref().unwrap_or(EMPTY); - let default_all_maps = map - .is_none() - .then(|| self.services.db.keys().map(Deref::deref)) - .into_iter() - .flatten(); - - let maps: Vec<_> = map - .iter() - .map(String::as_str) - .chain(default_all_maps) - .map(|map| self.services.db.get(map)) - .filter_map(Result::ok) - .cloned() - .collect(); - let timer = Instant::now(); - let result = maps - .iter() - .stream() + let result = with_maps_or(map.as_deref(), self.services) .map(|map| map.raw_keys_prefix(&prefix)) .flatten() .ignore_err() @@ -387,25 +333,8 @@ pub(super) async fn raw_vals_sizes( ) -> Result { let prefix = prefix.as_deref().unwrap_or(EMPTY); - let default_all_maps = map - .is_none() - .then(|| self.services.db.keys().map(Deref::deref)) - .into_iter() - .flatten(); - - let maps: Vec<_> = map - .iter() - .map(String::as_str) - .chain(default_all_maps) - .map(|map| self.services.db.get(map)) - .filter_map(Result::ok) - .cloned() - .collect(); - let timer = Instant::now(); - let result = maps - .iter() - .stream() + let result = with_maps_or(map.as_deref(), self.services) .map(|map| map.raw_stream_prefix(&prefix)) .flatten() .ignore_err() @@ -433,25 +362,8 @@ pub(super) async fn raw_vals_total( ) -> Result { let prefix = prefix.as_deref().unwrap_or(EMPTY); - let default_all_maps = map - .is_none() - .then(|| self.services.db.keys().map(Deref::deref)) - .into_iter() - .flatten(); - - let maps: Vec<_> = map - .iter() - .map(String::as_str) - .chain(default_all_maps) - .map(|map| self.services.db.get(map)) - .filter_map(Result::ok) - .cloned() - .collect(); - let timer = Instant::now(); - let result = maps - .iter() - .stream() + let result = with_maps_or(map.as_deref(), self.services) .map(|map| map.raw_stream_prefix(&prefix)) .flatten() .ignore_err() @@ -573,3 +485,20 @@ pub(super) async fn raw_maps(&self) -> Result { Ok(RoomMessageEventContent::notice_markdown(format!("{list:#?}"))) } + +fn with_maps_or<'a>( + map: Option<&'a str>, + services: &'a Services, +) -> impl Stream> + Send + 'a { + let default_all_maps = map + .is_none() + .then(|| services.db.keys().map(Deref::deref)) + .into_iter() + .flatten(); + + map.into_iter() + .chain(default_all_maps) + .map(|map| services.db.get(map)) + .filter_map(Result::ok) + .stream() +}