From 7c8eeaf4ea117645de6af9c8e9c1cd5a13703835 Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Sun, 29 Dec 2024 04:23:12 +0000 Subject: [PATCH] simplify multi_get_or_create/related stream implementations Signed-off-by: Jason Volk --- src/service/rooms/short/mod.rs | 12 +++++------- src/service/rooms/state_compressor/mod.rs | 7 +------ 2 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/service/rooms/short/mod.rs b/src/service/rooms/short/mod.rs index f814411b..00c1d16c 100644 --- a/src/service/rooms/short/mod.rs +++ b/src/service/rooms/short/mod.rs @@ -1,7 +1,7 @@ use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc}; pub use conduwuit::pdu::{ShortEventId, ShortId, ShortRoomId}; -use conduwuit::{err, implement, utils, utils::stream::ReadyExt, Result}; +use conduwuit::{err, implement, utils, utils::IterStream, Result}; use database::{Deserialized, Map}; use futures::{Stream, StreamExt}; use ruma::{events::StateEventType, EventId, RoomId}; @@ -65,16 +65,13 @@ pub fn multi_get_or_create_shorteventid<'a, I>( event_ids: I, ) -> impl Stream + Send + '_ where - I: Iterator + Clone + Debug + ExactSizeIterator + Send + 'a, - ::Item: AsRef<[u8]> + Send + Sync + 'a, + I: Iterator + Clone + Debug + Send + 'a, { self.db .eventid_shorteventid .get_batch(event_ids.clone()) - .ready_scan(event_ids, |event_ids, result| { - event_ids.next().map(|event_id| (event_id, result)) - }) - .map(|(event_id, result)| match result { + .zip(event_ids.into_iter().stream()) + .map(|(result, event_id)| match result { | Ok(ref short) => utils::u64_from_u8(short), | Err(_) => self.create_shorteventid(event_id), }) @@ -90,6 +87,7 @@ fn create_shorteventid(&self, event_id: &EventId) -> ShortEventId { self.db .eventid_shorteventid .raw_aput::(event_id, short); + self.db .shorteventid_eventid .aput_raw::(short, event_id); diff --git a/src/service/rooms/state_compressor/mod.rs b/src/service/rooms/state_compressor/mod.rs index dbe0a386..a61a66a1 100644 --- a/src/service/rooms/state_compressor/mod.rs +++ b/src/service/rooms/state_compressor/mod.rs @@ -187,12 +187,7 @@ impl Service { state: I, ) -> impl Stream + Send + 'a where - I: Iterator - + Clone - + Debug - + ExactSizeIterator - + Send - + 'a, + I: Iterator + Clone + Debug + Send + 'a, { let event_ids = state.clone().map(at!(1));