Merge branch 'slidingfixes' into 'next'

Better sliding sync

See merge request famedly/conduit!511
This commit is contained in:
Timo Kösters 2023-07-24 08:48:27 +00:00
commit 90a10c84ef
4 changed files with 609 additions and 29 deletions

View file

@ -23,7 +23,7 @@ use ruma::{
uint, DeviceId, OwnedDeviceId, OwnedUserId, RoomId, UInt, UserId,
};
use std::{
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet},
sync::Arc,
time::Duration,
};
@ -463,7 +463,7 @@ async fn sync_helper(
}
for user_id in left_encrypted_users {
let still_share_encrypted_room = services()
let dont_share_encrypted_room = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
@ -481,7 +481,7 @@ async fn sync_helper(
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if still_share_encrypted_room {
if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
@ -1174,8 +1174,7 @@ pub async fn sync_events_v4_route(
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
let sender_user = body.sender_user.expect("user is authenticated");
let sender_device = body.sender_device.expect("user is authenticated");
let body = dbg!(body.body);
let mut body = dbg!(body.body);
// Setup watchers, so if there's no response, we can wait for them
let watcher = services().globals.watch(&sender_user, &sender_device);
@ -1188,7 +1187,22 @@ pub async fn sync_events_v4_route(
.unwrap_or(0);
let sincecount = PduCount::Normal(since);
let initial = since == 0;
if since == 0 {
if let Some(conn_id) = &body.conn_id {
services().users.forget_sync_request_connection(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
)
}
}
// Get sticky parameters from cache
let known_rooms = services().users.update_sync_request_with_cache(
sender_user.clone(),
sender_device.clone(),
&mut body,
);
let all_joined_rooms = services()
.rooms
@ -1197,6 +1211,195 @@ pub async fn sync_events_v4_route(
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
if body.extensions.to_device.enabled.unwrap_or(false) {
services()
.users
.remove_to_device_events(&sender_user, &sender_device, since)?;
}
let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in
let mut device_list_changes = HashSet::new();
let mut device_list_left = HashSet::new();
if body.extensions.e2ee.enabled.unwrap_or(false) {
// Look for device list updates of this account
device_list_changes.extend(
services()
.users
.keys_changed(sender_user.as_ref(), since, None)
.filter_map(|r| r.ok()),
);
for room_id in &all_joined_rooms {
let current_shortstatehash =
if let Some(s) = services().rooms.state.get_room_shortstatehash(&room_id)? {
s
} else {
error!("Room {} has no state", room_id);
continue;
};
let since_shortstatehash = services()
.rooms
.user
.get_token_shortstatehash(&room_id, since)?;
let since_sender_member: Option<RoomMemberEventContent> = since_shortstatehash
.and_then(|shortstatehash| {
services()
.rooms
.state_accessor
.state_get(
shortstatehash,
&StateEventType::RoomMember,
sender_user.as_str(),
)
.transpose()
})
.transpose()?
.and_then(|pdu| {
serde_json::from_str(pdu.content.get())
.map_err(|_| Error::bad_database("Invalid PDU in database."))
.ok()
});
let encrypted_room = services()
.rooms
.state_accessor
.state_get(current_shortstatehash, &StateEventType::RoomEncryption, "")?
.is_some();
if let Some(since_shortstatehash) = since_shortstatehash {
// Skip if there are only timeline changes
if since_shortstatehash == current_shortstatehash {
continue;
}
let since_encryption = services().rooms.state_accessor.state_get(
since_shortstatehash,
&StateEventType::RoomEncryption,
"",
)?;
let joined_since_last_sync = since_sender_member
.map_or(true, |member| member.membership != MembershipState::Join);
let new_encrypted_room = encrypted_room && since_encryption.is_none();
if encrypted_room {
let current_state_ids = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let since_state_ids = services()
.rooms
.state_accessor
.state_full_ids(since_shortstatehash)
.await?;
for (key, id) in current_state_ids {
if since_state_ids.get(&key) != Some(&id) {
let pdu = match services().rooms.timeline.get_pdu(&id)? {
Some(pdu) => pdu,
None => {
error!("Pdu in state not found: {}", id);
continue;
}
};
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key {
let user_id =
UserId::parse(state_key.clone()).map_err(|_| {
Error::bad_database("Invalid UserId in member PDU.")
})?;
if user_id == sender_user {
continue;
}
let new_membership = serde_json::from_str::<
RoomMemberEventContent,
>(
pdu.content.get()
)
.map_err(|_| Error::bad_database("Invalid PDU in database."))?
.membership;
match new_membership {
MembershipState::Join => {
// A new user joined an encrypted room
if !share_encrypted_room(
&sender_user,
&user_id,
&room_id,
)? {
device_list_changes.insert(user_id);
}
}
MembershipState::Leave => {
// Write down users that have left encrypted rooms we are in
left_encrypted_users.insert(user_id);
}
_ => {}
}
}
}
}
}
if joined_since_last_sync || new_encrypted_room {
// If the user is in a new encrypted room, give them all joined users
device_list_changes.extend(
services()
.rooms
.state_cache
.room_members(&room_id)
.flatten()
.filter(|user_id| {
// Don't send key updates from the sender to the sender
&sender_user != user_id
})
.filter(|user_id| {
// Only send keys if the sender doesn't share an encrypted room with the target already
!share_encrypted_room(&sender_user, user_id, &room_id)
.unwrap_or(false)
}),
);
}
}
}
// Look for device list updates in this room
device_list_changes.extend(
services()
.users
.keys_changed(room_id.as_ref(), since, None)
.filter_map(|r| r.ok()),
);
}
for user_id in left_encrypted_users {
let dont_share_encrypted_room = services()
.rooms
.user
.get_shared_rooms(vec![sender_user.clone(), user_id.clone()])?
.filter_map(|r| r.ok())
.filter_map(|other_room_id| {
Some(
services()
.rooms
.state_accessor
.room_state_get(&other_room_id, &StateEventType::RoomEncryption, "")
.ok()?
.is_some(),
)
})
.all(|encrypted| !encrypted);
// If the user doesn't share an encrypted room with the target anymore, we need to tell
// them
if dont_share_encrypted_room {
device_list_left.insert(user_id);
}
}
}
let mut lists = BTreeMap::new();
let mut todo_rooms = BTreeMap::new(); // and required state
@ -1205,8 +1408,10 @@ pub async fn sync_events_v4_route(
continue;
}
let mut new_known_rooms = BTreeMap::new();
lists.insert(
list_id,
list_id.clone(),
sync_events::v4::SyncList {
ops: list
.ranges
@ -1219,14 +1424,27 @@ pub async fn sync_events_v4_route(
let room_ids = all_joined_rooms
[(u64::from(r.0) as usize)..=(u64::from(r.1) as usize)]
.to_vec();
todo_rooms.extend(room_ids.iter().cloned().map(|r| {
new_known_rooms.extend(room_ids.iter().cloned().map(|r| (r, true)));
for room_id in &room_ids {
let todo_room = todo_rooms.entry(room_id.clone()).or_insert((
BTreeSet::new(),
0,
true,
));
let limit = list
.room_details
.timeline_limit
.map_or(10, u64::from)
.min(100);
(r, (list.room_details.required_state.clone(), limit))
}));
todo_room
.0
.extend(list.room_details.required_state.iter().cloned());
todo_room.1 = todo_room.1.max(limit);
if known_rooms.get(&list_id).and_then(|k| k.get(room_id)) != Some(&true)
{
todo_room.2 = false;
}
}
sync_events::v4::SyncOp {
op: SlidingOp::Sync,
range: Some(r.clone()),
@ -1239,12 +1457,69 @@ pub async fn sync_events_v4_route(
count: UInt::from(all_joined_rooms.len() as u32),
},
);
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
list_id,
new_known_rooms,
);
}
}
let mut known_subscription_rooms = BTreeMap::new();
for (room_id, room) in dbg!(&body.room_subscriptions) {
let todo_room = todo_rooms
.entry(room_id.clone())
.or_insert((BTreeSet::new(), 0, true));
let limit = room.timeline_limit.map_or(10, u64::from).min(100);
todo_room.0.extend(room.required_state.iter().cloned());
todo_room.1 = todo_room.1.max(limit);
if known_rooms
.get("subscriptions")
.and_then(|k| k.get(room_id))
!= Some(&true)
{
todo_room.2 = false;
}
known_subscription_rooms.insert(room_id.clone(), true);
}
for r in body.unsubscribe_rooms {
known_subscription_rooms.remove(&r);
body.room_subscriptions.remove(&r);
}
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_known_rooms(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
"subscriptions".to_owned(),
known_subscription_rooms,
);
}
if let Some(conn_id) = &body.conn_id {
services().users.update_sync_subscriptions(
sender_user.clone(),
sender_device.clone(),
conn_id.clone(),
body.room_subscriptions,
);
}
let mut rooms = BTreeMap::new();
for (room_id, (required_state_request, timeline_limit)) in todo_rooms {
for (room_id, (required_state_request, timeline_limit, known)) in &todo_rooms {
// TODO: per-room sync tokens
let (timeline_pdus, limited) =
load_timeline(&sender_user, &room_id, sincecount, timeline_limit)?;
load_timeline(&sender_user, &room_id, sincecount, *timeline_limit)?;
if *known && timeline_pdus.is_empty() {
continue;
}
let prev_batch = timeline_pdus
.first()
@ -1256,7 +1531,14 @@ pub async fn sync_events_v4_route(
}
PduCount::Normal(c) => c.to_string(),
}))
})?;
})?
.or_else(|| {
if since != 0 {
Some(since.to_string())
} else {
None
}
});
let room_events: Vec<_> = timeline_pdus
.iter()
@ -1279,13 +1561,60 @@ pub async fn sync_events_v4_route(
rooms.insert(
room_id.clone(),
sync_events::v4::SlidingSyncRoom {
name: services().rooms.state_accessor.get_name(&room_id)?,
initial: Some(initial),
name: services()
.rooms
.state_accessor
.get_name(&room_id)?
.or_else(|| {
// Heroes
let mut names = services()
.rooms
.state_cache
.room_members(&room_id)
.filter_map(|r| r.ok())
.filter(|member| member != &sender_user)
.map(|member| {
Ok::<_, Error>(
services()
.rooms
.state_accessor
.get_member(&room_id, &member)?
.and_then(|memberevent| memberevent.displayname)
.unwrap_or(member.to_string()),
)
})
.filter_map(|r| r.ok())
.take(5)
.collect::<Vec<_>>();
if names.len() > 1 {
let last = names.pop().unwrap();
Some(names.join(", ") + " and " + &last)
} else if names.len() == 1 {
Some(names.pop().unwrap())
} else {
None
}
}),
initial: Some(!known),
is_dm: None,
invite_state: None,
unread_notifications: UnreadNotificationsCount {
highlight_count: None,
notification_count: None,
highlight_count: Some(
services()
.rooms
.user
.highlight_count(&sender_user, &room_id)?
.try_into()
.expect("notification count can't go that high"),
),
notification_count: Some(
services()
.rooms
.user
.notification_count(&sender_user, &room_id)?
.try_into()
.expect("notification count can't go that high"),
),
},
timeline: room_events,
required_state,
@ -1307,7 +1636,7 @@ pub async fn sync_events_v4_route(
.unwrap_or(0) as u32)
.into(),
),
num_live: None,
num_live: None, // Count events in timeline greater than global sync counter
},
);
}
@ -1326,23 +1655,50 @@ pub async fn sync_events_v4_route(
}
Ok(dbg!(sync_events::v4::Response {
initial: initial,
initial: since == 0,
txn_id: body.txn_id.clone(),
pos: next_batch.to_string(),
lists,
rooms,
extensions: sync_events::v4::Extensions {
to_device: None,
to_device: if body.extensions.to_device.enabled.unwrap_or(false) {
Some(sync_events::v4::ToDevice {
events: services()
.users
.get_to_device_events(&sender_user, &sender_device)?,
next_batch: next_batch.to_string(),
})
} else {
None
},
e2ee: sync_events::v4::E2EE {
device_lists: DeviceLists {
changed: Vec::new(),
left: Vec::new(),
changed: device_list_changes.into_iter().collect(),
left: device_list_left.into_iter().collect(),
},
device_one_time_keys_count: BTreeMap::new(),
device_one_time_keys_count: services()
.users
.count_one_time_keys(&sender_user, &sender_device)?,
// Fallback keys are not yet supported
device_unused_fallback_key_types: None,
},
account_data: sync_events::v4::AccountData {
global: Vec::new(),
global: if body.extensions.account_data.enabled.unwrap_or(false) {
services()
.account_data
.changes_since(None, &sender_user, since)?
.into_iter()
.filter_map(|(_, v)| {
serde_json::from_str(v.json().get())
.map_err(|_| {
Error::bad_database("Invalid account event in database.")
})
.ok()
})
.collect()
} else {
Vec::new()
},
rooms: BTreeMap::new(),
},
receipts: sync_events::v4::Receipts {