From 68086717516225af96fe6c7cff743836103188eb Mon Sep 17 00:00:00 2001
From: Jason Volk
Date: Fri, 25 Oct 2024 05:22:50 +0000
Subject: [PATCH] merge search service w/ data

Signed-off-by: Jason Volk
---
 src/service/rooms/search/data.rs | 113 ----------------------------
 src/service/rooms/search/mod.rs  | 123 ++++++++++++++++++++++++++-----
 2 files changed, 103 insertions(+), 133 deletions(-)
 delete mode 100644 src/service/rooms/search/data.rs

diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs
deleted file mode 100644
index de98beee..00000000
--- a/src/service/rooms/search/data.rs
+++ /dev/null
@@ -1,113 +0,0 @@
-use std::sync::Arc;
-
-use conduit::utils::{set, stream::TryIgnore, IterStream, ReadyExt};
-use database::Map;
-use futures::StreamExt;
-use ruma::RoomId;
-
-use crate::{rooms, Dep};
-
-pub(super) struct Data {
-	tokenids: Arc<Map>,
-	services: Services,
-}
-
-struct Services {
-	short: Dep<rooms::short::Service>,
-}
-
-impl Data {
-	pub(super) fn new(args: &crate::Args<'_>) -> Self {
-		let db = &args.db;
-		Self {
-			tokenids: db["tokenids"].clone(),
-			services: Services {
-				short: args.depend::<rooms::short::Service>("rooms::short"),
-			},
-		}
-	}
-
-	pub(super) fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) {
-		let batch = tokenize(message_body)
-			.map(|word| {
-				let mut key = shortroomid.to_be_bytes().to_vec();
-				key.extend_from_slice(word.as_bytes());
-				key.push(0xFF);
-				key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here
-				(key, Vec::<u8>::new())
-			})
-			.collect::<Vec<_>>();
-
-		self.tokenids.insert_batch(batch.iter());
-	}
-
-	pub(super) fn deindex_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) {
-		let batch = tokenize(message_body).map(|word| {
-			let mut key = shortroomid.to_be_bytes().to_vec();
-			key.extend_from_slice(word.as_bytes());
-			key.push(0xFF);
-			key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here
-			key
-		});
-
-		for token in batch {
-			self.tokenids.remove(&token);
-		}
-	}
-
-	pub(super) async fn search_pdus(
-		&self, room_id: &RoomId, search_string: &str,
-	) -> Option<(Vec<Vec<u8>>, Vec<String>)> {
-		let prefix = self
-			.services
-			.short
-			.get_shortroomid(room_id)
-			.await
-			.ok()?
-			.to_be_bytes()
-			.to_vec();
-
-		let words: Vec<_> = tokenize(search_string).collect();
-
-		let bufs: Vec<_> = words
-			.clone()
-			.into_iter()
-			.stream()
-			.then(move |word| {
-				let mut prefix2 = prefix.clone();
-				prefix2.extend_from_slice(word.as_bytes());
-				prefix2.push(0xFF);
-				let prefix3 = prefix2.clone();
-
-				let mut last_possible_id = prefix2.clone();
-				last_possible_id.extend_from_slice(&u64::MAX.to_be_bytes());
-
-				self.tokenids
-					.rev_raw_keys_from(&last_possible_id) // Newest pdus first
-					.ignore_err()
-					.ready_take_while(move |key| key.starts_with(&prefix2))
-					.map(move |key| key[prefix3.len()..].to_vec())
-					.collect::<Vec<_>>()
-			})
-			.collect()
-			.await;
-
-		Some((
-			set::intersection(bufs.iter().map(|buf| buf.iter()))
-				.cloned()
-				.collect(),
-			words,
-		))
-	}
-}
-
-/// Splits a string into tokens used as keys in the search inverted index
-///
-/// This may be used to tokenize both message bodies (for indexing) or search
-/// queries (for querying).
-fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
-	body.split_terminator(|c: char| !c.is_alphanumeric())
-		.filter(|s| !s.is_empty())
-		.filter(|word| word.len() <= 50)
-		.map(str::to_lowercase)
-}
diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs
index 80b58804..032ad55c 100644
--- a/src/service/rooms/search/mod.rs
+++ b/src/service/rooms/search/mod.rs
@@ -1,41 +1,124 @@
-mod data;
-
 use std::sync::Arc;
 
-use conduit::Result;
-use data::Data;
+use conduit::{
+	implement,
+	utils::{set, stream::TryIgnore, IterStream, ReadyExt},
+	Result,
+};
+use database::Map;
+use futures::StreamExt;
 use ruma::RoomId;
 
+use crate::{rooms, Dep};
+
 pub struct Service {
 	db: Data,
+	services: Services,
+}
+
+struct Data {
+	tokenids: Arc<Map>,
+}
+
+struct Services {
+	short: Dep<rooms::short::Service>,
 }
 
 impl crate::Service for Service {
 	fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
 		Ok(Arc::new(Self {
-			db: Data::new(&args),
+			db: Data {
+				tokenids: args.db["tokenids"].clone(),
+			},
+			services: Services {
+				short: args.depend::<rooms::short::Service>("rooms::short"),
+			},
 		}))
 	}
 
 	fn name(&self) -> &str { crate::service::make_name(std::module_path!()) }
 }
 
-impl Service {
-	#[inline]
-	#[tracing::instrument(skip(self), level = "debug")]
-	pub fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) {
-		self.db.index_pdu(shortroomid, pdu_id, message_body);
-	}
+#[implement(Service)]
+pub fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) {
+	let batch = tokenize(message_body)
+		.map(|word| {
+			let mut key = shortroomid.to_be_bytes().to_vec();
+			key.extend_from_slice(word.as_bytes());
+			key.push(0xFF);
+			key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here
+			(key, Vec::<u8>::new())
+		})
+		.collect::<Vec<_>>();
 
-	#[inline]
-	#[tracing::instrument(skip(self), level = "debug")]
-	pub fn deindex_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) {
-		self.db.deindex_pdu(shortroomid, pdu_id, message_body);
-	}
+	self.db.tokenids.insert_batch(batch.iter());
+}
 
-	#[inline]
-	#[tracing::instrument(skip(self), level = "debug")]
-	pub async fn search_pdus(&self, room_id: &RoomId, search_string: &str) -> Option<(Vec<Vec<u8>>, Vec<String>)> {
-		self.db.search_pdus(room_id, search_string).await
+#[implement(Service)]
+pub fn deindex_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) {
+	let batch = tokenize(message_body).map(|word| {
+		let mut key = shortroomid.to_be_bytes().to_vec();
+		key.extend_from_slice(word.as_bytes());
+		key.push(0xFF);
+		key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here
+		key
+	});
+
+	for token in batch {
+		self.db.tokenids.remove(&token);
 	}
 }
+
+#[implement(Service)]
+pub async fn search_pdus(&self, room_id: &RoomId, search_string: &str) -> Option<(Vec<Vec<u8>>, Vec<String>)> {
+	let prefix = self
+		.services
+		.short
+		.get_shortroomid(room_id)
+		.await
+		.ok()?
+		.to_be_bytes()
+		.to_vec();
+
+	let words: Vec<_> = tokenize(search_string).collect();
+
+	let bufs: Vec<_> = words
+		.clone()
+		.into_iter()
+		.stream()
+		.then(move |word| {
+			let mut prefix2 = prefix.clone();
+			prefix2.extend_from_slice(word.as_bytes());
+			prefix2.push(0xFF);
+			let prefix3 = prefix2.clone();
+
+			let mut last_possible_id = prefix2.clone();
+			last_possible_id.extend_from_slice(&u64::MAX.to_be_bytes());
+
+			self.db.tokenids
+				.rev_raw_keys_from(&last_possible_id) // Newest pdus first
+				.ignore_err()
+				.ready_take_while(move |key| key.starts_with(&prefix2))
+				.map(move |key| key[prefix3.len()..].to_vec())
+				.collect::<Vec<_>>()
+		})
+		.collect()
+		.await;
+
+	let bufs = bufs.iter().map(|buf| buf.iter());
+
+	let results = set::intersection(bufs).cloned().collect();
+
+	Some((results, words))
+}
+
+/// Splits a string into tokens used as keys in the search inverted index
+///
+/// This may be used to tokenize both message bodies (for indexing) or search
+/// queries (for querying).
+fn tokenize(body: &str) -> impl Iterator<Item = String> + Send + '_ {
+	body.split_terminator(|c: char| !c.is_alphanumeric())
+		.filter(|s| !s.is_empty())
+		.filter(|word| word.len() <= 50)
+		.map(str::to_lowercase)
+}