diff --git a/src/api/client/keys.rs b/src/api/client/keys.rs index 254d92cc..44d9164c 100644 --- a/src/api/client/keys.rs +++ b/src/api/client/keys.rs @@ -232,7 +232,7 @@ pub(crate) async fn get_key_changes_route( device_list_updates.extend( services .users - .keys_changed(sender_user.as_str(), from, Some(to)) + .keys_changed(sender_user, from, Some(to)) .map(ToOwned::to_owned) .collect::>() .await, @@ -244,7 +244,8 @@ pub(crate) async fn get_key_changes_route( device_list_updates.extend( services .users - .keys_changed(room_id.as_str(), from, Some(to)) + .room_keys_changed(room_id, from, Some(to)) + .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>() .await, diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index 2bd318df..ccca1f85 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -138,7 +138,7 @@ pub(crate) async fn sync_events_route( device_list_updates.extend( services .users - .keys_changed(sender_user.as_ref(), since, None) + .keys_changed(&sender_user, since, None) .map(ToOwned::to_owned) .collect::>() .await, @@ -917,7 +917,8 @@ async fn load_joined_room( device_list_updates.extend( services .users - .keys_changed(room_id.as_ref(), since, None) + .room_keys_changed(room_id, since, None) + .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>() .await, diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index 2adb3b71..4f8323e6 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -162,7 +162,7 @@ pub(crate) async fn sync_events_v4_route( device_list_changes.extend( services .users - .keys_changed(sender_user.as_ref(), globalsince, None) + .keys_changed(sender_user, globalsince, None) .map(ToOwned::to_owned) .collect::>() .await, @@ -285,7 +285,8 @@ pub(crate) async fn sync_events_v4_route( device_list_changes.extend( services .users - .keys_changed(room_id.as_ref(), globalsince, None) + .room_keys_changed(room_id, globalsince, None) + .map(|(user_id, _)| user_id) .map(ToOwned::to_owned) .collect::>() .await, diff --git a/src/api/mod.rs b/src/api/mod.rs index 96837470..ed8aacf2 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -6,7 +6,6 @@ extern crate conduit_core as conduit; extern crate conduit_service as service; pub(crate) use conduit::{debug_info, pdu::PduEvent, utils, Error, Result}; -pub(crate) use service::services; pub(crate) use self::router::{Ruma, RumaResponse, State}; diff --git a/src/service/users/mod.rs b/src/service/users/mod.rs index 589aee8a..b9183e12 100644 --- a/src/service/users/mod.rs +++ b/src/service/users/mod.rs @@ -13,7 +13,7 @@ use ruma::{ events::{ignored_user_list::IgnoredUserListEvent, AnyToDeviceEvent, GlobalAccountDataEventType}, serde::Raw, DeviceId, DeviceKeyAlgorithm, DeviceKeyId, MilliSecondsSinceUnixEpoch, OwnedDeviceId, OwnedDeviceKeyId, - OwnedMxcUri, OwnedUserId, UInt, UserId, + OwnedMxcUri, OwnedUserId, RoomId, UInt, UserId, }; use serde_json::json; @@ -585,9 +585,24 @@ impl Service { Ok(()) } + #[inline] pub fn keys_changed<'a>( - &'a self, user_or_room_id: &'a str, from: u64, to: Option, + &'a self, user_id: &'a UserId, from: u64, to: Option, ) -> impl Stream + Send + 'a { + self.keys_changed_user_or_room(user_id.as_str(), from, to) + .map(|(user_id, ..)| user_id) + } + + #[inline] + pub fn room_keys_changed<'a>( + &'a self, room_id: &'a RoomId, from: u64, to: Option, + ) -> impl Stream + Send + 'a { + self.keys_changed_user_or_room(room_id.as_str(), from, to) + } + + fn keys_changed_user_or_room<'a>( + &'a self, user_or_room_id: &'a str, from: u64, to: Option, + ) -> impl Stream + Send + 'a { type KeyVal<'a> = ((&'a str, u64), &'a UserId); let to = to.unwrap_or(u64::MAX); @@ -597,7 +612,7 @@ impl Service { .stream_from(&start) .ignore_err() .ready_take_while(move |((prefix, count), _): &KeyVal<'_>| *prefix == user_or_room_id && *count <= to) - .map(|((..), user_id): KeyVal<'_>| user_id) + .map(|((_, count), user_id): KeyVal<'_>| (user_id, count)) } pub async fn mark_device_key_update(&self, user_id: &UserId) {