diff --git a/src/database/map/insert.rs b/src/database/map/insert.rs index 39a0c422..b8b08b34 100644 --- a/src/database/map/insert.rs +++ b/src/database/map/insert.rs @@ -203,7 +203,7 @@ where #[tracing::instrument(skip(self, iter), fields(%self), level = "trace")] pub fn insert_batch<'a, I, K, V>(&'a self, iter: I) where - I: Iterator + Send + Debug, + I: Iterator + Send + Debug, K: AsRef<[u8]> + Sized + Debug + 'a, V: AsRef<[u8]> + Sized + 'a, { diff --git a/src/service/rooms/search/mod.rs b/src/service/rooms/search/mod.rs index 1af37d9e..d59d1d11 100644 --- a/src/service/rooms/search/mod.rs +++ b/src/service/rooms/search/mod.rs @@ -73,11 +73,13 @@ pub fn index_pdu(&self, shortroomid: ShortRoomId, pdu_id: &RawPduId, message_bod key.extend_from_slice(word.as_bytes()); key.push(0xFF); key.extend_from_slice(pdu_id.as_ref()); // TODO: currently we save the room id a second time here - (key, Vec::::new()) + key }) .collect::>(); - self.db.tokenids.insert_batch(batch.iter()); + self.db + .tokenids + .insert_batch(batch.iter().map(|k| (k.as_slice(), &[]))); } #[implement(Service)] diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index cd25776a..ca7ca19a 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; +use std::{fmt::Debug, sync::Arc}; use conduit::{ - utils, + at, utils, utils::{stream::TryIgnore, ReadyExt}, Error, Result, }; @@ -69,20 +69,22 @@ impl Data { .await; } - pub(super) fn mark_as_active(&self, events: &[QueueItem]) { - for (key, e) in events { - if key.is_empty() { - continue; - } + pub(super) fn mark_as_active<'a, I>(&self, events: I) + where + I: Iterator, + { + events + .filter(|(key, _)| !key.is_empty()) + .for_each(|(key, val)| { + let val = if let SendingEvent::Edu(val) = &val { + &**val + } else { + &[] + }; - let value = if let SendingEvent::Edu(value) = &e { - &**value - } else { - &[] - }; - self.servercurrentevent_data.insert(key, value); - self.servernameevent_data.remove(key); - } + self.servercurrentevent_data.insert(key, val); + self.servernameevent_data.remove(key); + }); } #[inline] @@ -110,26 +112,40 @@ impl Data { }) } - pub(super) fn queue_requests(&self, requests: &[(&SendingEvent, &Destination)]) -> Vec> { - let mut batch = Vec::new(); - let mut keys = Vec::new(); - for (event, destination) in requests { - let mut key = destination.get_prefix(); - if let SendingEvent::Pdu(value) = event { - key.extend(value.as_ref()); - } else { - key.extend(&self.services.globals.next_count().unwrap().to_be_bytes()); - } - let value = if let SendingEvent::Edu(value) = &event { - &**value - } else { - &[] - }; - batch.push((key.clone(), value.to_owned())); - keys.push(key); - } + pub(super) fn queue_requests<'a, I>(&self, requests: I) -> Vec> + where + I: Iterator + Clone + Debug + Send, + { + let keys: Vec<_> = requests + .clone() + .map(|(event, dest)| { + let mut key = dest.get_prefix(); + if let SendingEvent::Pdu(value) = event { + key.extend(value.as_ref()); + } else { + let count = self.services.globals.next_count().unwrap(); + key.extend(&count.to_be_bytes()); + } + + key + }) + .collect(); + + self.servernameevent_data.insert_batch( + keys.iter() + .map(Vec::as_slice) + .zip(requests.map(at!(0))) + .map(|(key, event)| { + let value = if let SendingEvent::Edu(value) = &event { + &**value + } else { + &[] + }; + + (key, value) + }), + ); - self.servernameevent_data.insert_batch(batch.iter()); keys } diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 77997f69..5a070306 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -4,7 +4,7 @@ mod dest; mod send; mod sender; -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, iter::once, sync::Arc}; use async_trait::async_trait; use conduit::{ @@ -117,7 +117,7 @@ impl Service { let dest = Destination::Push(user.to_owned(), pushkey); let event = SendingEvent::Pdu(*pdu_id); let _cork = self.db.db.cork(); - let keys = self.db.queue_requests(&[(&event, &dest)]); + let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, @@ -130,7 +130,7 @@ impl Service { let dest = Destination::Appservice(appservice_id); let event = SendingEvent::Pdu(pdu_id); let _cork = self.db.db.cork(); - let keys = self.db.queue_requests(&[(&event, &dest)]); + let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, @@ -160,9 +160,7 @@ impl Service { .collect::>() .await; - let keys = self - .db - .queue_requests(&requests.iter().map(|(o, e)| (e, o)).collect::>()); + let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o))); for ((dest, event), queue_id) in requests.into_iter().zip(keys) { self.dispatch(Msg { @@ -180,7 +178,7 @@ impl Service { let dest = Destination::Normal(server.to_owned()); let event = SendingEvent::Edu(serialized); let _cork = self.db.db.cork(); - let keys = self.db.queue_requests(&[(&event, &dest)]); + let keys = self.db.queue_requests(once((&event, &dest))); self.dispatch(Msg { dest, event, @@ -210,9 +208,7 @@ impl Service { .collect::>() .await; - let keys = self - .db - .queue_requests(&requests.iter().map(|(o, e)| (e, o)).collect::>()); + let keys = self.db.queue_requests(requests.iter().map(|(o, e)| (e, o))); for ((dest, event), queue_id) in requests.into_iter().zip(keys) { self.dispatch(Msg { diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index ee818289..0a0aae39 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -118,7 +118,7 @@ impl Service { // Insert any pdus we found if !new_events.is_empty() { - self.db.mark_as_active(&new_events); + self.db.mark_as_active(new_events.iter()); let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect(); futures.push(self.send_events(dest.clone(), new_events_vec).boxed()); @@ -213,7 +213,7 @@ impl Service { // Compose the next transaction let _cork = self.db.db.cork(); if !new_events.is_empty() { - self.db.mark_as_active(&new_events); + self.db.mark_as_active(new_events.iter()); for (_, e) in new_events { events.push(e); }