simplify multi_get_or_create/related stream implementations

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-29 04:23:12 +00:00 committed by strawberry
parent 9eb99f8070
commit 7c8eeaf4ea
2 changed files with 6 additions and 13 deletions

View file

@ -1,7 +1,7 @@
use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc}; use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc};
pub use conduwuit::pdu::{ShortEventId, ShortId, ShortRoomId}; pub use conduwuit::pdu::{ShortEventId, ShortId, ShortRoomId};
use conduwuit::{err, implement, utils, utils::stream::ReadyExt, Result}; use conduwuit::{err, implement, utils, utils::IterStream, Result};
use database::{Deserialized, Map}; use database::{Deserialized, Map};
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt};
use ruma::{events::StateEventType, EventId, RoomId}; use ruma::{events::StateEventType, EventId, RoomId};
@ -65,16 +65,13 @@ pub fn multi_get_or_create_shorteventid<'a, I>(
event_ids: I, event_ids: I,
) -> impl Stream<Item = ShortEventId> + Send + '_ ) -> impl Stream<Item = ShortEventId> + Send + '_
where where
I: Iterator<Item = &'a EventId> + Clone + Debug + ExactSizeIterator + Send + 'a, I: Iterator<Item = &'a EventId> + Clone + Debug + Send + 'a,
<I as Iterator>::Item: AsRef<[u8]> + Send + Sync + 'a,
{ {
self.db self.db
.eventid_shorteventid .eventid_shorteventid
.get_batch(event_ids.clone()) .get_batch(event_ids.clone())
.ready_scan(event_ids, |event_ids, result| { .zip(event_ids.into_iter().stream())
event_ids.next().map(|event_id| (event_id, result)) .map(|(result, event_id)| match result {
})
.map(|(event_id, result)| match result {
| Ok(ref short) => utils::u64_from_u8(short), | Ok(ref short) => utils::u64_from_u8(short),
| Err(_) => self.create_shorteventid(event_id), | Err(_) => self.create_shorteventid(event_id),
}) })
@ -90,6 +87,7 @@ fn create_shorteventid(&self, event_id: &EventId) -> ShortEventId {
self.db self.db
.eventid_shorteventid .eventid_shorteventid
.raw_aput::<BUFSIZE, _, _>(event_id, short); .raw_aput::<BUFSIZE, _, _>(event_id, short);
self.db self.db
.shorteventid_eventid .shorteventid_eventid
.aput_raw::<BUFSIZE, _, _>(short, event_id); .aput_raw::<BUFSIZE, _, _>(short, event_id);

View file

@ -187,12 +187,7 @@ impl Service {
state: I, state: I,
) -> impl Stream<Item = CompressedStateEvent> + Send + 'a ) -> impl Stream<Item = CompressedStateEvent> + Send + 'a
where where
I: Iterator<Item = (&'a ShortStateKey, &'a EventId)> I: Iterator<Item = (&'a ShortStateKey, &'a EventId)> + Clone + Debug + Send + 'a,
+ Clone
+ Debug
+ ExactSizeIterator
+ Send
+ 'a,
{ {
let event_ids = state.clone().map(at!(1)); let event_ids = state.clone().map(at!(1));