This commit is contained in:
Jade Ellis 2025-04-23 21:28:46 +01:00
parent 0307238bf8
commit 6684449f27
No known key found for this signature in database
GPG key ID: 8705A2A3EBF77BD2

View file

@ -1,6 +1,6 @@
mod data; mod data;
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::{BTreeMap, HashMap}, sync::Arc};
use conduwuit::{ use conduwuit::{
Result, debug, err, Result, debug, err,
@ -9,12 +9,9 @@ use conduwuit::{
}; };
use futures::{Stream, TryFutureExt, try_join}; use futures::{Stream, TryFutureExt, try_join};
use ruma::{ use ruma::{
OwnedEventId, OwnedUserId, RoomId, UserId,
events::{ events::{
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent
receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, }, serde::Raw, OwnedEventId, OwnedUserId, RoomId, UserId
},
serde::Raw,
}; };
use self::data::{Data, ReceiptItem}; use self::data::{Data, ReceiptItem};
@ -47,19 +44,48 @@ impl crate::Service for Service {
} }
impl Service { impl Service {
/// Replaces the previous read receipt. /// Updates the public read receipt (`m.read`) based on the incoming event.
/// If the event referenced by the new public receipt is newer than the current
/// private read marker (`m.read.private`), the private marker is also updated
/// to match the public receipt's position.
pub async fn readreceipt_update( pub async fn readreceipt_update(
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
event: &ReceiptEvent, event: &ReceiptEvent,
) { ) {
self.db.readreceipt_update(user_id, room_id, event).await; debug!(target: "readreceipt", %room_id, %user_id, "Updating read receipt in database.");
self.services
.sending // 2. Find the maximum PDU count for the m.read event(s) referenced in the new receipt
.flush_room(room_id) let mut max_new_public_pdu_count: Option<PduCount> = None;
.await for (event_id, receipts) in event.content.0.iter() {
.expect("room flush failed"); // Check if this event_id has an m.read receipt for the target user
if let Some(user_receipts) = receipts.get(&ReceiptType::Read) {
if user_receipts.contains_key(user_id) {
// Try to get the PDU count (timeline position) for this event_id
match self.services.timeline.get_pdu_count(event_id).await {
Ok(count) => {
// Update the maximum count found so far
let current_max = max_new_public_pdu_count.unwrap_or(PduCount::Normal(0));
max_new_public_pdu_count = Some(current_max.max(count));
debug!(target: "readreceipt", %room_id, %user_id, %event_id, count, "Found PDU count for new public receipt event.");
}
Err(e) => {
warn!(
target: "readreceipt", %room_id, %user_id, %event_id,
"Failed to get PDU count for event ID from new public read receipt: {}",
e
);
}
}
}
}
}
// Flush the sending queue for the room to notify clients
if let Err(e) = self.services.sending.flush_room(room_id).await {
warn!(target: "readreceipt", %room_id, %user_id, "Failed to flush room after read receipt update: {}", e);
}
} }
/// Gets the latest private read receipt from the user in the room /// Gets the latest private read receipt from the user in the room