From 6684449f275d09fc3d571d39e3ebd8b39fa965fb Mon Sep 17 00:00:00 2001 From: Jade Ellis Date: Wed, 23 Apr 2025 21:28:46 +0100 Subject: [PATCH] WIP --- src/service/rooms/read_receipt/mod.rs | 52 ++++++++++++++++++++------- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 69e859c4..01321571 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -1,6 +1,6 @@ mod data; -use std::{collections::BTreeMap, sync::Arc}; +use std::{collections::{BTreeMap, HashMap}, sync::Arc}; use conduwuit::{ Result, debug, err, @@ -9,12 +9,9 @@ use conduwuit::{ }; use futures::{Stream, TryFutureExt, try_join}; use ruma::{ - OwnedEventId, OwnedUserId, RoomId, UserId, events::{ - AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, - receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, - }, - serde::Raw, + receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent + }, serde::Raw, OwnedEventId, OwnedUserId, RoomId, UserId }; use self::data::{Data, ReceiptItem}; @@ -47,19 +44,48 @@ impl crate::Service for Service { } impl Service { - /// Replaces the previous read receipt. + /// Updates the public read receipt (`m.read`) based on the incoming event. + /// If the event referenced by the new public receipt is newer than the current + /// private read marker (`m.read.private`), the private marker is also updated + /// to match the public receipt's position. pub async fn readreceipt_update( &self, user_id: &UserId, room_id: &RoomId, event: &ReceiptEvent, ) { - self.db.readreceipt_update(user_id, room_id, event).await; - self.services - .sending - .flush_room(room_id) - .await - .expect("room flush failed"); + debug!(target: "readreceipt", %room_id, %user_id, "Updating read receipt in database."); + + // 2. Find the maximum PDU count for the m.read event(s) referenced in the new receipt + let mut max_new_public_pdu_count: Option = None; + for (event_id, receipts) in event.content.0.iter() { + // Check if this event_id has an m.read receipt for the target user + if let Some(user_receipts) = receipts.get(&ReceiptType::Read) { + if user_receipts.contains_key(user_id) { + // Try to get the PDU count (timeline position) for this event_id + match self.services.timeline.get_pdu_count(event_id).await { + Ok(count) => { + // Update the maximum count found so far + let current_max = max_new_public_pdu_count.unwrap_or(PduCount::Normal(0)); + max_new_public_pdu_count = Some(current_max.max(count)); + debug!(target: "readreceipt", %room_id, %user_id, %event_id, count, "Found PDU count for new public receipt event."); + } + Err(e) => { + warn!( + target: "readreceipt", %room_id, %user_id, %event_id, + "Failed to get PDU count for event ID from new public read receipt: {}", + e + ); + } + } + } + } + } + + // Flush the sending queue for the room to notify clients + if let Err(e) = self.services.sending.flush_room(room_id).await { + warn!(target: "readreceipt", %room_id, %user_id, "Failed to flush room after read receipt update: {}", e); + } } /// Gets the latest private read receipt from the user in the room