syncv3: support per room account data

This commit is contained in:
morguldir 2024-08-30 10:31:08 +02:00
parent 36b8de1339
commit 77c0c13a83
No known key found for this signature in database
GPG key ID: 5A6025D4F6E7A8A3
5 changed files with 91 additions and 67 deletions

View file

@ -25,9 +25,16 @@ use ruma::{
DeviceLists, UnreadNotificationsCount,
},
uiaa::UiaaResponse,
}, events::{
presence::PresenceEvent, room::member::{MembershipState, RoomMemberEventContent}, StateEventType, TimelineEventType
}, room::RoomType, serde::Raw, state_res::Event, uint, DeviceId, EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId
},
events::{
presence::PresenceEvent,
room::member::{MembershipState, RoomMemberEventContent},
AnyRawAccountDataEvent, StateEventType, TimelineEventType,
},
room::RoomType,
serde::Raw,
state_res::Event,
uint, DeviceId, EventId, MilliSecondsSinceUnixEpoch, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
};
use tracing::{Instrument as _, Span};
@ -46,6 +53,15 @@ const DEFAULT_BUMP_TYPES: &[TimelineEventType] = &[
TimelineEventType::Beacon,
];
macro_rules! extract_variant {
($e:expr, $variant:path) => {
match $e {
$variant(value) => Some(value),
_ => None,
}
};
}
/// # `GET /_matrix/client/r0/sync`
///
/// Synchronize the client's state with the latest state on the server.
@ -284,11 +300,7 @@ pub(crate) async fn sync_events_route(
.account_data
.changes_since(None, &sender_user, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect(),
},
device_lists: DeviceLists {
@ -981,11 +993,7 @@ async fn load_joined_room(
.account_data
.changes_since(Some(room_id), sender_user, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect(),
},
summary: RoomSummary {
@ -1160,6 +1168,33 @@ pub(crate) async fn sync_events_v4_route(
let mut device_list_changes = HashSet::new();
let mut device_list_left = HashSet::new();
let mut account_data = sync_events::v4::AccountData {
global: Vec::new(),
rooms: BTreeMap::new(),
};
if body.extensions.account_data.enabled.unwrap_or(false) {
account_data.global = services
.account_data
.changes_since(None, &sender_user, globalsince)?
.into_iter()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global))
.collect();
if let Some(rooms) = body.extensions.account_data.rooms {
for room in rooms {
account_data.rooms.insert(
room.clone(),
services
.account_data
.changes_since(Some(&room), &sender_user, globalsince)?
.into_iter()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect(),
);
}
}
}
if body.extensions.e2ee.enabled.unwrap_or(false) {
// Look for device list updates of this account
device_list_changes.extend(
@ -1464,7 +1499,17 @@ pub(crate) async fn sync_events_v4_route(
let (timeline_pdus, limited) =
load_timeline(&services, &sender_user, room_id, roomsincecount, *timeline_limit)?;
if roomsince != &0 && timeline_pdus.is_empty() {
account_data.rooms.insert(
room_id.clone(),
services
.account_data
.changes_since(Some(room_id), &sender_user, *roomsince)?
.into_iter()
.filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room))
.collect(),
);
if roomsince != &0 && timeline_pdus.is_empty() && account_data.rooms.get(room_id).is_some_and(Vec::is_empty) {
continue;
}
@ -1504,7 +1549,7 @@ pub(crate) async fn sync_events_v4_route(
let mut timestamp: Option<_> = None;
for (_, pdu) in timeline_pdus {
timestamp = Some(MilliSecondsSinceUnixEpoch(pdu.origin_server_ts));
timestamp = Some(MilliSecondsSinceUnixEpoch(pdu.origin_server_ts));
if DEFAULT_BUMP_TYPES.contains(pdu.event_type()) {
break;
}
@ -1677,23 +1722,7 @@ pub(crate) async fn sync_events_v4_route(
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data: sync_events::v4::AccountData {
global: if body.extensions.account_data.enabled.unwrap_or(false) {
services
.account_data
.changes_since(None, &sender_user, globalsince)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| Error::bad_database("Invalid account event in database."))
.ok()
})
.collect()
} else {
Vec::new()
},
rooms: BTreeMap::new(),
},
account_data,
receipts: sync_events::v4::Receipts {
rooms: BTreeMap::new(),
},