parent
4dd809fdc4
commit
9040ad054e
4 changed files with 42 additions and 40 deletions
|
@ -722,7 +722,7 @@ async fn load_joined_room(
|
||||||
.rooms
|
.rooms
|
||||||
.typing
|
.typing
|
||||||
.typings_all(room_id, sender_user)
|
.typings_all(room_id, sender_user)
|
||||||
.await;
|
.await?;
|
||||||
|
|
||||||
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
|
Ok(vec![serde_json::from_str(&serde_json::to_string(&typings)?)?])
|
||||||
})
|
})
|
||||||
|
|
|
@ -59,7 +59,7 @@ impl Service {
|
||||||
.expect("room flush failed");
|
.expect("room flush failed");
|
||||||
// update appservices
|
// update appservices
|
||||||
let edu = EphemeralData::Receipt(event);
|
let edu = EphemeralData::Receipt(event);
|
||||||
_ = self
|
let _ = self
|
||||||
.services
|
.services
|
||||||
.sending
|
.sending
|
||||||
.send_edu_appservice_room(
|
.send_edu_appservice_room(
|
||||||
|
|
|
@ -146,34 +146,32 @@ impl Service {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if removable.is_empty() {
|
if !removable.is_empty() {
|
||||||
return Ok(());
|
let typing = &mut self.typing.write().await;
|
||||||
}
|
let room = typing.entry(room_id.to_owned()).or_default();
|
||||||
|
for user in &removable {
|
||||||
|
debug_info!("typing timeout {user:?} in {room_id:?}");
|
||||||
|
room.remove(user);
|
||||||
|
}
|
||||||
|
|
||||||
let typing = &mut self.typing.write().await;
|
// update clients
|
||||||
let room = typing.entry(room_id.to_owned()).or_default();
|
self.last_typing_update
|
||||||
for user in &removable {
|
.write()
|
||||||
debug_info!("typing timeout {user:?} in {room_id:?}");
|
.await
|
||||||
room.remove(user);
|
.insert(room_id.to_owned(), self.services.globals.next_count()?);
|
||||||
}
|
|
||||||
|
|
||||||
// update clients
|
if self.typing_update_sender.send(room_id.to_owned()).is_err() {
|
||||||
self.last_typing_update
|
trace!("receiver found what it was looking for and is no longer interested");
|
||||||
.write()
|
}
|
||||||
.await
|
|
||||||
.insert(room_id.to_owned(), self.services.globals.next_count()?);
|
|
||||||
|
|
||||||
if self.typing_update_sender.send(room_id.to_owned()).is_err() {
|
// update appservices
|
||||||
trace!("receiver found what it was looking for and is no longer interested");
|
self.appservice_send(room_id).await?;
|
||||||
}
|
|
||||||
|
|
||||||
// update appservices
|
// update federation
|
||||||
self.appservice_send(room_id).await?;
|
for user in &removable {
|
||||||
|
if self.services.globals.user_is_local(user) {
|
||||||
// update federation
|
self.federation_send(room_id, user, false).await?;
|
||||||
for user in &removable {
|
}
|
||||||
if self.services.globals.user_is_local(user) {
|
|
||||||
self.federation_send(room_id, user, false).await?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,17 +190,17 @@ impl Service {
|
||||||
.unwrap_or(0))
|
.unwrap_or(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a new typing EDU's content.
|
/// Returns a new typing EDU.
|
||||||
pub async fn typings_content(&self, room_id: &RoomId) -> TypingEventContent {
|
pub async fn typings_content(&self, room_id: &RoomId) -> Result<TypingEventContent> {
|
||||||
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
|
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
|
||||||
|
|
||||||
let Some(typing_indicators) = room_typing_indicators else {
|
let Some(typing_indicators) = room_typing_indicators else {
|
||||||
return TypingEventContent { user_ids: Vec::new() };
|
return Ok(TypingEventContent { user_ids: Vec::new() });
|
||||||
};
|
};
|
||||||
|
|
||||||
let user_ids: Vec<_> = typing_indicators.into_keys().collect();
|
let user_ids: Vec<_> = typing_indicators.into_keys().collect();
|
||||||
|
|
||||||
TypingEventContent { user_ids }
|
Ok(TypingEventContent { user_ids })
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns a new typing EDU.
|
/// Returns a new typing EDU.
|
||||||
|
@ -210,13 +208,13 @@ impl Service {
|
||||||
&self,
|
&self,
|
||||||
room_id: &RoomId,
|
room_id: &RoomId,
|
||||||
sender_user: &UserId,
|
sender_user: &UserId,
|
||||||
) -> SyncEphemeralRoomEvent<TypingEventContent> {
|
) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> {
|
||||||
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
|
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
|
||||||
|
|
||||||
let Some(typing_indicators) = room_typing_indicators else {
|
let Some(typing_indicators) = room_typing_indicators else {
|
||||||
return SyncEphemeralRoomEvent {
|
return Ok(SyncEphemeralRoomEvent {
|
||||||
content: TypingEventContent { user_ids: Vec::new() },
|
content: TypingEventContent { user_ids: Vec::new() },
|
||||||
};
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
let user_ids: Vec<_> = typing_indicators
|
let user_ids: Vec<_> = typing_indicators
|
||||||
|
@ -233,7 +231,7 @@ impl Service {
|
||||||
.collect()
|
.collect()
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } }
|
Ok(SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } })
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn federation_send(
|
async fn federation_send(
|
||||||
|
@ -256,12 +254,14 @@ impl Service {
|
||||||
self.services
|
self.services
|
||||||
.sending
|
.sending
|
||||||
.send_edu_room(room_id, serde_json::to_vec(&edu).expect("Serialized Edu::Typing"))
|
.send_edu_room(room_id, serde_json::to_vec(&edu).expect("Serialized Edu::Typing"))
|
||||||
.await
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn appservice_send(&self, room_id: &RoomId) -> Result<()> {
|
async fn appservice_send(&self, room_id: &RoomId) -> Result<()> {
|
||||||
let edu = EphemeralData::Typing(EphemeralRoomEvent {
|
let edu = EphemeralData::Typing(EphemeralRoomEvent {
|
||||||
content: self.typings_content(room_id).await,
|
content: self.typings_content(room_id).await?,
|
||||||
room_id: room_id.into(),
|
room_id: room_id.into(),
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -271,6 +271,8 @@ impl Service {
|
||||||
room_id,
|
room_id,
|
||||||
serde_json::to_vec(&edu).expect("Serialized EphemeralData::Typing"),
|
serde_json::to_vec(&edu).expect("Serialized EphemeralData::Typing"),
|
||||||
)
|
)
|
||||||
.await
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,8 +190,8 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, serialized), level = "debug")]
|
#[tracing::instrument(skip(self, serialized), level = "debug")]
|
||||||
pub fn send_edu_appservice(&self, appservice_id: &str, serialized: Vec<u8>) -> Result {
|
pub fn send_edu_appservice(&self, appservice_id: String, serialized: Vec<u8>) -> Result {
|
||||||
let dest = Destination::Appservice(appservice_id.to_owned());
|
let dest = Destination::Appservice(appservice_id);
|
||||||
let event = SendingEvent::Edu(serialized);
|
let event = SendingEvent::Edu(serialized);
|
||||||
let _cork = self.db.db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let keys = self.db.queue_requests(once((&event, &dest)));
|
let keys = self.db.queue_requests(once((&event, &dest)));
|
||||||
|
@ -224,7 +224,7 @@ impl Service {
|
||||||
.appservice_in_room(room_id, appservice)
|
.appservice_in_room(room_id, appservice)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
self.send_edu_appservice(&appservice.registration.id, serialized.clone())?;
|
self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue