add outgoing federation typing and conf items

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-03-29 18:35:02 -07:00 committed by June
parent 4a57592378
commit 18c34434bc
5 changed files with 105 additions and 9 deletions

View file

@ -1,7 +1,12 @@
use std::collections::BTreeMap;
use ruma::{events::SyncEphemeralRoomEvent, OwnedRoomId, OwnedUserId, RoomId, UserId};
use ruma::{
api::federation::transactions::edu::{Edu, TypingContent},
events::SyncEphemeralRoomEvent,
OwnedRoomId, OwnedUserId, RoomId, UserId,
};
use tokio::sync::{broadcast, RwLock};
use tracing::debug;
use crate::{services, utils, Result};
@ -16,6 +21,8 @@ impl Service {
/// Sets a user as typing until the timeout timestamp is reached or
/// roomtyping_remove is called.
pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> {
debug!("typing add {:?} in {:?} timeout:{:?}", user_id, room_id, timeout);
// update clients
self.typing
.write()
.await
@ -27,11 +34,19 @@ impl Service {
.await
.insert(room_id.to_owned(), services().globals.next_count()?);
_ = self.typing_update_sender.send(room_id.to_owned());
// update federation
if user_id.server_name() == services().globals.server_name() {
self.federation_send(room_id, user_id, true)?;
}
Ok(())
}
/// Removes a user from typing before the timeout is reached.
pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
debug!("typing remove {:?} in {:?}", user_id, room_id);
// update clients
self.typing
.write()
.await
@ -43,6 +58,12 @@ impl Service {
.await
.insert(room_id.to_owned(), services().globals.next_count()?);
_ = self.typing_update_sender.send(room_id.to_owned());
// update federation
if user_id.server_name() == services().globals.server_name() {
self.federation_send(room_id, user_id, false)?;
}
Ok(())
}
@ -80,14 +101,23 @@ impl Service {
if !removable.is_empty() {
let typing = &mut self.typing.write().await;
let room = typing.entry(room_id.to_owned()).or_default();
for user in removable {
room.remove(&user);
for user in &removable {
debug!("typing maintain remove {:?} in {:?}", &user, room_id);
room.remove(user);
}
// update clients
self.last_typing_update
.write()
.await
.insert(room_id.to_owned(), services().globals.next_count()?);
_ = self.typing_update_sender.send(room_id.to_owned());
// update federation
for user in removable {
if user.server_name() == services().globals.server_name() {
self.federation_send(room_id, &user, false)?;
}
}
}
Ok(())
@ -121,4 +151,22 @@ impl Service {
},
})
}
fn federation_send(&self, room_id: &RoomId, user_id: &UserId, typing: bool) -> Result<()> {
debug_assert!(
user_id.server_name() == services().globals.server_name(),
"tried to broadcast typing status of remote user",
);
if !services().globals.config.allow_outgoing_typing {
return Ok(());
}
let edu = Edu::Typing(TypingContent::new(room_id.to_owned(), user_id.to_owned(), typing));
services()
.sending
.send_edu_room(room_id, serde_json::to_vec(&edu).expect("Serialized Edu::Typing"))?;
Ok(())
}
}