diff --git a/src/admin/query/presence.rs b/src/admin/query/presence.rs index 6189270c..0963429e 100644 --- a/src/admin/query/presence.rs +++ b/src/admin/query/presence.rs @@ -42,12 +42,16 @@ pub(super) async fn process(subcommand: PresenceCommand, context: &Command<'_>) since, } => { let timer = tokio::time::Instant::now(); - let results = services.presence.db.presence_since(since); - let presence_since: Vec<(_, _, _)> = results.collect().await; + let results: Vec<(_, _, _)> = services + .presence + .presence_since(since) + .map(|(user_id, count, bytes)| (user_id.to_owned(), count, bytes.to_vec())) + .collect() + .await; let query_time = timer.elapsed(); Ok(RoomMessageEventContent::notice_markdown(format!( - "Query completed in {query_time:?}:\n\n```rs\n{presence_since:#?}\n```" + "Query completed in {query_time:?}:\n\n```rs\n{results:#?}\n```" ))) }, } diff --git a/src/api/client/sync/v3.rs b/src/api/client/sync/v3.rs index f29fe220..2bd318df 100644 --- a/src/api/client/sync/v3.rs +++ b/src/api/client/sync/v3.rs @@ -488,7 +488,7 @@ async fn process_presence_updates( if !services .rooms .state_cache - .user_sees_user(syncing_user, &user_id) + .user_sees_user(syncing_user, user_id) .await { continue; @@ -496,10 +496,10 @@ async fn process_presence_updates( let presence_event = services .presence - .from_json_bytes_to_event(&presence_bytes, &user_id) + .from_json_bytes_to_event(presence_bytes, user_id) .await?; - match presence_updates.entry(user_id) { + match presence_updates.entry(user_id.into()) { Entry::Vacant(slot) => { slot.insert(presence_event); }, @@ -524,7 +524,7 @@ async fn process_presence_updates( .currently_active .or(curr_content.currently_active); }, - } + }; } Ok(()) diff --git a/src/service/presence/data.rs b/src/service/presence/data.rs index 8522746f..68b2c3fe 100644 --- a/src/service/presence/data.rs +++ b/src/service/presence/data.rs @@ -7,7 +7,7 @@ use conduit::{ }; use database::{Deserialized, Json, Map}; use futures::Stream; -use ruma::{events::presence::PresenceEvent, presence::PresenceState, OwnedUserId, UInt, UserId}; +use ruma::{events::presence::PresenceEvent, presence::PresenceState, UInt, UserId}; use super::Presence; use crate::{globals, users, Dep}; @@ -137,13 +137,14 @@ impl Data { self.userid_presenceid.remove(user_id); } - pub fn presence_since(&self, since: u64) -> impl Stream)> + Send + '_ { + #[inline] + pub(super) fn presence_since(&self, since: u64) -> impl Stream + Send + '_ { self.presenceid_presence .raw_stream() .ignore_err() - .ready_filter_map(move |(key, presence_bytes)| { - let (count, user_id) = presenceid_parse(key).expect("invalid presenceid_parse"); - (count > since).then(|| (user_id.to_owned(), count, presence_bytes.to_vec())) + .ready_filter_map(move |(key, presence)| { + let (count, user_id) = presenceid_parse(key).ok()?; + (count > since).then_some((user_id, count, presence)) }) } } diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 82a99bd5..b2106f3f 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -162,8 +162,7 @@ impl Service { /// Returns the most recent presence updates that happened after the event /// with id `since`. - #[inline] - pub fn presence_since(&self, since: u64) -> impl Stream)> + Send + '_ { + pub fn presence_since(&self, since: u64) -> impl Stream + Send + '_ { self.db.presence_since(since) } diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 5c0a324b..a57d4aea 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -7,7 +7,9 @@ use std::{ use base64::{engine::general_purpose, Engine as _}; use conduit::{ - debug, debug_warn, err, trace, + debug, debug_warn, err, + result::LogErr, + trace, utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt}, warn, Error, Result, }; @@ -315,14 +317,14 @@ impl Service { while let Some((user_id, count, presence_bytes)) = presence_since.next().await { *max_edu_count = cmp::max(count, *max_edu_count); - if !self.services.globals.user_is_local(&user_id) { + if !self.services.globals.user_is_local(user_id) { continue; } if !self .services .state_cache - .server_sees_user(server_name, &user_id) + .server_sees_user(server_name, user_id) .await { continue;