Compare commits
1 commit
alpine-pac
...
jade/read-
Author | SHA1 | Date | |
---|---|---|---|
|
6684449f27 |
1 changed files with 39 additions and 13 deletions
|
@ -1,6 +1,6 @@
|
|||
mod data;
|
||||
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
use std::{collections::{BTreeMap, HashMap}, sync::Arc};
|
||||
|
||||
use conduwuit::{
|
||||
Result, debug, err,
|
||||
|
@ -9,12 +9,9 @@ use conduwuit::{
|
|||
};
|
||||
use futures::{Stream, TryFutureExt, try_join};
|
||||
use ruma::{
|
||||
OwnedEventId, OwnedUserId, RoomId, UserId,
|
||||
events::{
|
||||
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
|
||||
receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
|
||||
},
|
||||
serde::Raw,
|
||||
receipt::{ReceiptEvent, ReceiptEventContent, ReceiptType, Receipts}, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent
|
||||
}, serde::Raw, OwnedEventId, OwnedUserId, RoomId, UserId
|
||||
};
|
||||
|
||||
use self::data::{Data, ReceiptItem};
|
||||
|
@ -47,19 +44,48 @@ impl crate::Service for Service {
|
|||
}
|
||||
|
||||
impl Service {
|
||||
/// Replaces the previous read receipt.
|
||||
/// Updates the public read receipt (`m.read`) based on the incoming event.
|
||||
/// If the event referenced by the new public receipt is newer than the current
|
||||
/// private read marker (`m.read.private`), the private marker is also updated
|
||||
/// to match the public receipt's position.
|
||||
pub async fn readreceipt_update(
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
room_id: &RoomId,
|
||||
event: &ReceiptEvent,
|
||||
) {
|
||||
self.db.readreceipt_update(user_id, room_id, event).await;
|
||||
self.services
|
||||
.sending
|
||||
.flush_room(room_id)
|
||||
.await
|
||||
.expect("room flush failed");
|
||||
debug!(target: "readreceipt", %room_id, %user_id, "Updating read receipt in database.");
|
||||
|
||||
// 2. Find the maximum PDU count for the m.read event(s) referenced in the new receipt
|
||||
let mut max_new_public_pdu_count: Option<PduCount> = None;
|
||||
for (event_id, receipts) in event.content.0.iter() {
|
||||
// Check if this event_id has an m.read receipt for the target user
|
||||
if let Some(user_receipts) = receipts.get(&ReceiptType::Read) {
|
||||
if user_receipts.contains_key(user_id) {
|
||||
// Try to get the PDU count (timeline position) for this event_id
|
||||
match self.services.timeline.get_pdu_count(event_id).await {
|
||||
Ok(count) => {
|
||||
// Update the maximum count found so far
|
||||
let current_max = max_new_public_pdu_count.unwrap_or(PduCount::Normal(0));
|
||||
max_new_public_pdu_count = Some(current_max.max(count));
|
||||
debug!(target: "readreceipt", %room_id, %user_id, %event_id, count, "Found PDU count for new public receipt event.");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
target: "readreceipt", %room_id, %user_id, %event_id,
|
||||
"Failed to get PDU count for event ID from new public read receipt: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush the sending queue for the room to notify clients
|
||||
if let Err(e) = self.services.sending.flush_room(room_id).await {
|
||||
warn!(target: "readreceipt", %room_id, %user_id, "Failed to flush room after read receipt update: {}", e);
|
||||
}
|
||||
}
|
||||
|
||||
/// Gets the latest private read receipt from the user in the room
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue