diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4a4df488..3ccbf5d9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -733,7 +733,7 @@ jobs: docker: name: Docker publish runs-on: ubuntu-24.04 - needs: [build, variables] + needs: [build, variables, tests] permissions: packages: write contents: read diff --git a/flake.nix b/flake.nix index d8ad47a8..fb40cae7 100644 --- a/flake.nix +++ b/flake.nix @@ -191,27 +191,59 @@ in { packages = { - default = scopeHost.main; - default-debug = scopeHost.main.override { - profile = "dev"; - # debug build users expect full logs - disable_release_max_log_level = true; - }; - default-test = scopeHost.main.override { - profile = "test"; - disable_release_max_log_level = true; - }; - all-features = scopeHost.main.override { - all_features = true; + default = scopeHost.main.override { disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" # jemalloc profiling/stats features are expensive and shouldn't # be expected on non-debug builds. "jemalloc_prof" "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" + ]; + }; + default-debug = scopeHost.main.override { + profile = "dev"; + # debug build users expect full logs + disable_release_max_log_level = true; + disable_features = [ + # dont include experimental features + "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" + ]; + }; + # just a test profile used for things like CI and complement + default-test = scopeHost.main.override { + profile = "test"; + disable_release_max_log_level = true; + disable_features = [ + # dont include experimental features + "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" + ]; + }; + all-features = scopeHost.main.override { + all_features = true; + disable_features = [ + # dont include experimental features + "experimental" + # jemalloc profiling/stats features are expensive and shouldn't + # be expected on non-debug builds. + "jemalloc_prof" + "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; all-features-debug = scopeHost.main.override { @@ -220,10 +252,12 @@ # debug build users expect full logs disable_release_max_log_level = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; hmalloc = scopeHost.main.override { features = ["hardened_malloc"]; }; @@ -233,14 +267,16 @@ main = scopeHost.main.override { all_features = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" # jemalloc profiling/stats features are expensive and shouldn't # be expected on non-debug builds. 
"jemalloc_prof" "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; }; @@ -251,10 +287,12 @@ # debug build users expect full logs disable_release_max_log_level = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; }; @@ -313,6 +351,14 @@ value = scopeCrossStatic.main.override { profile = "test"; disable_release_max_log_level = true; + disable_features = [ + # dont include experimental features + "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" + ]; }; } @@ -322,14 +368,16 @@ value = scopeCrossStatic.main.override { all_features = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" # jemalloc profiling/stats features are expensive and shouldn't # be expected on non-debug builds. "jemalloc_prof" "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; } @@ -341,14 +389,16 @@ value = scopeCrossStatic.main.override { all_features = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" # jemalloc profiling/stats features are expensive and shouldn't # be expected on non-debug builds. "jemalloc_prof" "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; x86_64_haswell_target_optimised = (if (crossSystem == "x86_64-linux-gnu" || crossSystem == "x86_64-linux-musl") then true else false); }; @@ -363,10 +413,12 @@ # debug build users expect full logs disable_release_max_log_level = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; } @@ -415,14 +467,16 @@ main = scopeCrossStatic.main.override { all_features = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" - # dont include experimental features - "experimental" - # jemalloc profiling/stats features are expensive and shouldn't - # be expected on non-debug builds. - "jemalloc_prof" - "jemalloc_stats" + # dont include experimental features + "experimental" + # jemalloc profiling/stats features are expensive and shouldn't + # be expected on non-debug builds. 
+ "jemalloc_prof" + "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; }; @@ -436,14 +490,16 @@ main = scopeCrossStatic.main.override { all_features = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" - # dont include experimental features - "experimental" - # jemalloc profiling/stats features are expensive and shouldn't - # be expected on non-debug builds. - "jemalloc_prof" - "jemalloc_stats" + # dont include experimental features + "experimental" + # jemalloc profiling/stats features are expensive and shouldn't + # be expected on non-debug builds. + "jemalloc_prof" + "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; x86_64_haswell_target_optimised = (if (crossSystem == "x86_64-linux-gnu" || crossSystem == "x86_64-linux-musl") then true else false); }; @@ -460,10 +516,12 @@ # debug build users expect full logs disable_release_max_log_level = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" - # dont include experimental features - "experimental" + # dont include experimental features + "experimental" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; }; @@ -502,14 +560,16 @@ main = prev.main.override { all_features = true; disable_features = [ - # this is non-functional on nix for some reason - "hardened_malloc" # dont include experimental features "experimental" # jemalloc profiling/stats features are expensive and shouldn't # be expected on non-debug builds. "jemalloc_prof" "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" ]; }; })); diff --git a/nix/pkgs/complement/default.nix b/nix/pkgs/complement/default.nix index 36f12400..e35cbf04 100644 --- a/nix/pkgs/complement/default.nix +++ b/nix/pkgs/complement/default.nix @@ -20,6 +20,8 @@ let disable_features = [ # no reason to use jemalloc for complement, just has compatibility/build issues "jemalloc" + "jemalloc_stats" + "jemalloc_prof" # console/CLI stuff isn't used or relevant for complement "console" "tokio_console" @@ -32,6 +34,14 @@ let "hardened_malloc" # dont include experimental features "experimental" + # compression isn't needed for complement + "brotli_compression" + "gzip_compression" + "zstd_compression" + # complement doesn't need hot reloading + "conduwuit_mods" + # complement doesn't have URL preview media tests + "url_preview" ]; }; diff --git a/nix/pkgs/main/default.nix b/nix/pkgs/main/default.nix index a785e7f2..d7424d11 100644 --- a/nix/pkgs/main/default.nix +++ b/nix/pkgs/main/default.nix @@ -15,7 +15,19 @@ # Options (keep sorted) , all_features ? false , default_features ? true -, disable_features ? [] +# default list of disabled features +, disable_features ? [ + # dont include experimental features + "experimental" + # jemalloc profiling/stats features are expensive and shouldn't + # be expected on non-debug builds. + "jemalloc_prof" + "jemalloc_stats" + # this is non-functional on nix for some reason + "hardened_malloc" + # conduwuit_mods is a development-only hot reload feature + "conduwuit_mods" +] , disable_release_max_log_level ? false , features ? [] , profile ? 
"release" diff --git a/src/api/client/sync/mod.rs b/src/api/client/sync/mod.rs index 79e4b1ca..1967f4a2 100644 --- a/src/api/client/sync/mod.rs +++ b/src/api/client/sync/mod.rs @@ -1,16 +1,31 @@ mod v3; mod v4; +mod v5; use conduwuit::{ - utils::stream::{BroadbandExt, ReadyExt, TryIgnore}, + utils::{ + stream::{BroadbandExt, ReadyExt, TryIgnore}, + IterStream, + }, PduCount, }; use futures::{pin_mut, StreamExt}; -use ruma::{RoomId, UserId}; +use ruma::{ + directory::RoomTypeFilter, + events::TimelineEventType::{ + self, Beacon, CallInvite, PollStart, RoomEncrypted, RoomMessage, Sticker, + }, + RoomId, UserId, +}; -pub(crate) use self::{v3::sync_events_route, v4::sync_events_v4_route}; +pub(crate) use self::{ + v3::sync_events_route, v4::sync_events_v4_route, v5::sync_events_v5_route, +}; use crate::{service::Services, Error, PduEvent, Result}; +pub(crate) const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] = + &[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker]; + async fn load_timeline( services: &Services, sender_user: &UserId, @@ -69,3 +84,33 @@ async fn share_encrypted_room( }) .await } + +pub(crate) async fn filter_rooms<'a>( + services: &Services, + rooms: &[&'a RoomId], + filter: &[RoomTypeFilter], + negate: bool, +) -> Vec<&'a RoomId> { + rooms + .iter() + .stream() + .filter_map(|r| async move { + let room_type = services.rooms.state_accessor.get_room_type(r).await; + + if room_type.as_ref().is_err_and(|e| !e.is_not_found()) { + return None; + } + + let room_type_filter = RoomTypeFilter::from(room_type.ok()); + + let include = if negate { + !filter.contains(&room_type_filter) + } else { + filter.is_empty() || filter.contains(&room_type_filter) + }; + + include.then_some(r) + }) + .collect() + .await +} diff --git a/src/api/client/sync/v4.rs b/src/api/client/sync/v4.rs index 9915752e..8937a12c 100644 --- a/src/api/client/sync/v4.rs +++ b/src/api/client/sync/v4.rs @@ -23,24 +23,23 @@ use ruma::{ DeviceLists, UnreadNotificationsCount, }, }, - directory::RoomTypeFilter, events::{ room::member::{MembershipState, RoomMemberEventContent}, AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, - TimelineEventType::{self, *}, + TimelineEventType::*, }, serde::Raw, - uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, UInt, + uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, }; -use service::{rooms::read_receipt::pack_receipts, Services}; +use service::rooms::read_receipt::pack_receipts; use super::{load_timeline, share_encrypted_room}; -use crate::{client::ignored_filter, Ruma}; +use crate::{ + client::{filter_rooms, ignored_filter, sync::v5::TodoRooms, DEFAULT_BUMP_TYPES}, + Ruma, +}; -const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync"; - -const DEFAULT_BUMP_TYPES: &[TimelineEventType; 6] = - &[CallInvite, PollStart, Beacon, RoomEncrypted, RoomMessage, Sticker]; +pub(crate) const SINGLE_CONNECTION_SYNC: &str = "single_connection_sync"; /// POST `/_matrix/client/unstable/org.matrix.msc3575/sync` /// @@ -113,12 +112,17 @@ pub(crate) async fn sync_events_v4_route( .collect() .await; - let all_rooms = all_joined_rooms + let all_invited_rooms: Vec<&RoomId> = all_invited_rooms.iter().map(AsRef::as_ref).collect(); + + let all_rooms: Vec<&RoomId> = all_joined_rooms .iter() - .chain(all_invited_rooms.iter()) - .map(Clone::clone) + .map(AsRef::as_ref) + .chain(all_invited_rooms.iter().map(AsRef::as_ref)) .collect(); + let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect(); + let 
all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect(); + if body.extensions.to_device.enabled.unwrap_or(false) { services .users @@ -171,6 +175,7 @@ pub(crate) async fn sync_events_v4_route( ); for room_id in &all_joined_rooms { + let room_id: &&RoomId = room_id; let Ok(current_shortstatehash) = services.rooms.state.get_room_shortstatehash(room_id).await else { @@ -323,7 +328,7 @@ pub(crate) async fn sync_events_v4_route( } let mut lists = BTreeMap::new(); - let mut todo_rooms = BTreeMap::new(); // and required state + let mut todo_rooms: TodoRooms = BTreeMap::new(); // and required state for (list_id, list) in &body.lists { let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) { @@ -344,7 +349,7 @@ pub(crate) async fn sync_events_v4_route( | None => active_rooms, }; - let mut new_known_rooms = BTreeSet::new(); + let mut new_known_rooms: BTreeSet = BTreeSet::new(); let ranges = list.ranges.clone(); lists.insert(list_id.clone(), sync_events::v4::SyncList { @@ -366,9 +371,9 @@ pub(crate) async fn sync_events_v4_route( Vec::new() }; - new_known_rooms.extend(room_ids.iter().cloned()); + new_known_rooms.extend(room_ids.clone().into_iter().map(ToOwned::to_owned)); for room_id in &room_ids { - let todo_room = todo_rooms.entry(room_id.clone()).or_insert(( + let todo_room = todo_rooms.entry((*room_id).to_owned()).or_insert(( BTreeSet::new(), 0_usize, u64::MAX, @@ -390,7 +395,7 @@ pub(crate) async fn sync_events_v4_route( todo_room.2 = todo_room.2.min( known_rooms .get(list_id.as_str()) - .and_then(|k| k.get(room_id)) + .and_then(|k| k.get(*room_id)) .copied() .unwrap_or(0), ); @@ -399,7 +404,7 @@ pub(crate) async fn sync_events_v4_route( op: SlidingOp::Sync, range: Some(r), index: None, - room_ids, + room_ids: room_ids.into_iter().map(ToOwned::to_owned).collect(), room_id: None, } }) @@ -409,8 +414,8 @@ pub(crate) async fn sync_events_v4_route( if let Some(conn_id) = &body.conn_id { services.sync.update_sync_known_rooms( - sender_user.clone(), - sender_device.clone(), + sender_user, + &sender_device, conn_id.clone(), list_id.clone(), new_known_rooms, @@ -455,8 +460,8 @@ pub(crate) async fn sync_events_v4_route( if let Some(conn_id) = &body.conn_id { services.sync.update_sync_known_rooms( - sender_user.clone(), - sender_device.clone(), + sender_user, + &sender_device, conn_id.clone(), "subscriptions".to_owned(), known_subscription_rooms, @@ -480,7 +485,8 @@ pub(crate) async fn sync_events_v4_route( let mut timestamp: Option<_> = None; let mut invite_state = None; let (timeline_pdus, limited); - if all_invited_rooms.contains(room_id) { + let new_room_id: &RoomId = (*room_id).as_ref(); + if all_invited_rooms.contains(&new_room_id) { // TODO: figure out a timestamp we can use for remote invites invite_state = services .rooms @@ -510,7 +516,7 @@ pub(crate) async fn sync_events_v4_route( } account_data.rooms.insert( - room_id.clone(), + room_id.to_owned(), services .account_data .changes_since(Some(room_id), sender_user, *roomsince) @@ -740,10 +746,9 @@ pub(crate) async fn sync_events_v4_route( }); } - if rooms - .iter() - .all(|(_, r)| r.timeline.is_empty() && r.required_state.is_empty()) - { + if rooms.iter().all(|(id, r)| { + r.timeline.is_empty() && r.required_state.is_empty() && !receipts.rooms.contains_key(id) + }) { // Hang a few seconds so requests are not spammed // Stop hanging if new info arrives let default = Duration::from_secs(30); @@ -789,33 +794,3 @@ pub(crate) async fn sync_events_v4_route( delta_token: None, }) } - -async fn filter_rooms( - 
services: &Services, - rooms: &[OwnedRoomId], - filter: &[RoomTypeFilter], - negate: bool, -) -> Vec { - rooms - .iter() - .stream() - .filter_map(|r| async move { - let room_type = services.rooms.state_accessor.get_room_type(r).await; - - if room_type.as_ref().is_err_and(|e| !e.is_not_found()) { - return None; - } - - let room_type_filter = RoomTypeFilter::from(room_type.ok()); - - let include = if negate { - !filter.contains(&room_type_filter) - } else { - filter.is_empty() || filter.contains(&room_type_filter) - }; - - include.then_some(r.to_owned()) - }) - .collect() - .await -} diff --git a/src/api/client/sync/v5.rs b/src/api/client/sync/v5.rs new file mode 100644 index 00000000..6cc788ca --- /dev/null +++ b/src/api/client/sync/v5.rs @@ -0,0 +1,871 @@ +use std::{ + cmp::{self, Ordering}, + collections::{BTreeMap, BTreeSet, HashMap, HashSet}, + time::Duration, +}; + +use axum::extract::State; +use conduwuit::{ + debug, error, extract_variant, trace, + utils::{ + math::{ruma_from_usize, usize_from_ruma}, + BoolExt, IterStream, ReadyExt, TryFutureExtExt, + }, + warn, Error, Result, +}; +use futures::{FutureExt, StreamExt, TryFutureExt}; +use ruma::{ + api::client::{ + error::ErrorKind, + sync::sync_events::{self, DeviceLists, UnreadNotificationsCount}, + }, + events::{ + room::member::{MembershipState, RoomMemberEventContent}, + AnyRawAccountDataEvent, AnySyncEphemeralRoomEvent, StateEventType, TimelineEventType, + }, + serde::Raw, + state_res::TypeStateKey, + uint, DeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId, +}; +use service::{rooms::read_receipt::pack_receipts, PduCount}; + +use super::{filter_rooms, share_encrypted_room}; +use crate::{ + client::{ignored_filter, sync::load_timeline, DEFAULT_BUMP_TYPES}, + Ruma, +}; + +type SyncInfo<'a> = (&'a UserId, &'a DeviceId, u64, &'a sync_events::v5::Request); + +pub(crate) async fn sync_events_v5_route( + State(services): State, + body: Ruma, +) -> Result { + debug_assert!(DEFAULT_BUMP_TYPES.is_sorted(), "DEFAULT_BUMP_TYPES is not sorted"); + let sender_user = body.sender_user.as_ref().expect("user is authenticated"); + let sender_device = body.sender_device.as_ref().expect("user is authenticated"); + let mut body = body.body; + + // Setup watchers, so if there's no response, we can wait for them + let watcher = services.sync.watch(sender_user, sender_device); + + let next_batch = services.globals.next_count()?; + + let conn_id = body.conn_id.clone(); + + let globalsince = body + .pos + .as_ref() + .and_then(|string| string.parse().ok()) + .unwrap_or(0); + + if globalsince != 0 + && !services.sync.snake_connection_cached( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + ) { + debug!("Restarting sync stream because it was gone from the database"); + return Err(Error::Request( + ErrorKind::UnknownPos, + "Connection data lost since last time".into(), + http::StatusCode::BAD_REQUEST, + )); + } + + // Client / User requested an initial sync + if globalsince == 0 { + services.sync.forget_snake_sync_connection( + sender_user.clone(), + sender_device.clone(), + conn_id.clone(), + ); + } + + // Get sticky parameters from cache + let known_rooms = services.sync.update_snake_sync_request_with_cache( + sender_user.clone(), + sender_device.clone(), + &mut body, + ); + + let all_joined_rooms: Vec<_> = services + .rooms + .state_cache + .rooms_joined(sender_user) + .map(ToOwned::to_owned) + .collect() + .await; + + let all_invited_rooms: Vec<_> = services + .rooms + .state_cache + .rooms_invited(sender_user) + 
.map(|r| r.0) + .collect() + .await; + + let all_rooms: Vec<&RoomId> = all_joined_rooms + .iter() + .map(AsRef::as_ref) + .chain(all_invited_rooms.iter().map(AsRef::as_ref)) + .collect(); + + let all_joined_rooms = all_joined_rooms.iter().map(AsRef::as_ref).collect(); + let all_invited_rooms = all_invited_rooms.iter().map(AsRef::as_ref).collect(); + + let pos = next_batch.clone().to_string(); + + let mut todo_rooms: TodoRooms = BTreeMap::new(); + + let sync_info: SyncInfo<'_> = (sender_user, sender_device, globalsince, &body); + let mut response = sync_events::v5::Response { + txn_id: body.txn_id.clone(), + pos, + lists: BTreeMap::new(), + rooms: BTreeMap::new(), + extensions: sync_events::v5::response::Extensions { + account_data: collect_account_data(services, sync_info).await, + e2ee: collect_e2ee(services, sync_info, &all_joined_rooms).await?, + to_device: collect_to_device(services, sync_info, next_batch).await, + receipts: collect_receipts(services).await, + typing: sync_events::v5::response::Typing::default(), + }, + }; + + { + let _test2 = handle_lists( + services, + sync_info, + &all_invited_rooms, + &all_joined_rooms, + &all_rooms, + &mut todo_rooms, + &known_rooms, + &mut response, + ) + .await; + } + + { + fetch_subscriptions(services, sync_info, &known_rooms, &mut todo_rooms).await; + }; + + response.rooms = process_rooms( + services, + sender_user, + next_batch, + &all_invited_rooms, + &todo_rooms, + &mut response, + &body, + ) + .await?; + + if response.rooms.iter().all(|(id, r)| { + r.timeline.is_empty() + && r.required_state.is_empty() + && !response.extensions.receipts.rooms.contains_key(id) + }) && response + .extensions + .to_device + .clone() + .is_none_or(|to| to.events.is_empty()) + { + // Hang a few seconds so requests are not spammed + // Stop hanging if new info arrives + let default = Duration::from_secs(30); + let duration = cmp::min(body.timeout.unwrap_or(default), default); + _ = tokio::time::timeout(duration, watcher).await; + } + + trace!( + rooms=?response.rooms.len(), + account_data=?response.extensions.account_data.rooms.len(), + receipts=?response.extensions.receipts.rooms.len(), + "responding to request with" + ); + Ok(response) +} + +type KnownRooms = BTreeMap>; +pub(crate) type TodoRooms = BTreeMap, usize, u64)>; + +async fn fetch_subscriptions( + services: crate::State, + (sender_user, sender_device, globalsince, body): SyncInfo<'_>, + known_rooms: &KnownRooms, + todo_rooms: &mut TodoRooms, +) { + let mut known_subscription_rooms = BTreeSet::new(); + for (room_id, room) in &body.room_subscriptions { + if !services.rooms.metadata.exists(room_id).await { + continue; + } + let todo_room = + todo_rooms + .entry(room_id.clone()) + .or_insert((BTreeSet::new(), 0_usize, u64::MAX)); + + let limit: UInt = room.timeline_limit; + + todo_room.0.extend(room.required_state.iter().cloned()); + todo_room.1 = todo_room.1.max(usize_from_ruma(limit)); + // 0 means unknown because it got out of date + todo_room.2 = todo_room.2.min( + known_rooms + .get("subscriptions") + .and_then(|k| k.get(room_id)) + .copied() + .unwrap_or(0), + ); + known_subscription_rooms.insert(room_id.clone()); + } + // where this went (protomsc says it was removed) + //for r in body.unsubscribe_rooms { + // known_subscription_rooms.remove(&r); + // body.room_subscriptions.remove(&r); + //} + + if let Some(conn_id) = &body.conn_id { + services.sync.update_snake_sync_known_rooms( + sender_user, + sender_device, + conn_id.clone(), + "subscriptions".to_owned(), + known_subscription_rooms, + 
globalsince, + ); + } +} + +#[allow(clippy::too_many_arguments)] +async fn handle_lists<'a>( + services: crate::State, + (sender_user, sender_device, globalsince, body): SyncInfo<'_>, + all_invited_rooms: &Vec<&'a RoomId>, + all_joined_rooms: &Vec<&'a RoomId>, + all_rooms: &Vec<&'a RoomId>, + todo_rooms: &'a mut TodoRooms, + known_rooms: &'a KnownRooms, + response: &'_ mut sync_events::v5::Response, +) -> KnownRooms { + for (list_id, list) in &body.lists { + let active_rooms = match list.filters.clone().and_then(|f| f.is_invite) { + | Some(true) => all_invited_rooms, + | Some(false) => all_joined_rooms, + | None => all_rooms, + }; + + let active_rooms = match list.filters.clone().map(|f| f.not_room_types) { + | Some(filter) if filter.is_empty() => active_rooms, + | Some(value) => &filter_rooms(&services, active_rooms, &value, true).await, + | None => active_rooms, + }; + + let mut new_known_rooms: BTreeSet = BTreeSet::new(); + + let ranges = list.ranges.clone(); + + for mut range in ranges { + range.0 = uint!(0); + range.1 = range + .1 + .clamp(range.0, UInt::try_from(active_rooms.len()).unwrap_or(UInt::MAX)); + + let room_ids = + active_rooms[usize_from_ruma(range.0)..usize_from_ruma(range.1)].to_vec(); + + let new_rooms: BTreeSet = + room_ids.clone().into_iter().map(From::from).collect(); + new_known_rooms.extend(new_rooms); + //new_known_rooms.extend(room_ids..cloned()); + for room_id in room_ids { + let todo_room = todo_rooms.entry(room_id.to_owned()).or_insert(( + BTreeSet::new(), + 0_usize, + u64::MAX, + )); + + let limit: usize = usize_from_ruma(list.room_details.timeline_limit).min(100); + + todo_room + .0 + .extend(list.room_details.required_state.iter().cloned()); + + todo_room.1 = todo_room.1.max(limit); + // 0 means unknown because it got out of date + todo_room.2 = todo_room.2.min( + known_rooms + .get(list_id.as_str()) + .and_then(|k| k.get(room_id)) + .copied() + .unwrap_or(0), + ); + } + } + response + .lists + .insert(list_id.clone(), sync_events::v5::response::List { + count: ruma_from_usize(active_rooms.len()), + }); + + if let Some(conn_id) = &body.conn_id { + services.sync.update_snake_sync_known_rooms( + sender_user, + sender_device, + conn_id.clone(), + list_id.clone(), + new_known_rooms, + globalsince, + ); + } + } + BTreeMap::default() +} + +async fn process_rooms( + services: crate::State, + sender_user: &UserId, + next_batch: u64, + all_invited_rooms: &[&RoomId], + todo_rooms: &TodoRooms, + response: &mut sync_events::v5::Response, + body: &sync_events::v5::Request, +) -> Result> { + let mut rooms = BTreeMap::new(); + for (room_id, (required_state_request, timeline_limit, roomsince)) in todo_rooms { + let roomsincecount = PduCount::Normal(*roomsince); + + let mut timestamp: Option<_> = None; + let mut invite_state = None; + let (timeline_pdus, limited); + let new_room_id: &RoomId = (*room_id).as_ref(); + if all_invited_rooms.contains(&new_room_id) { + // TODO: figure out a timestamp we can use for remote invites + invite_state = services + .rooms + .state_cache + .invite_state(sender_user, room_id) + .await + .ok(); + + (timeline_pdus, limited) = (Vec::new(), true); + } else { + (timeline_pdus, limited) = match load_timeline( + &services, + sender_user, + room_id, + roomsincecount, + Some(PduCount::from(next_batch)), + *timeline_limit, + ) + .await + { + | Ok(value) => value, + | Err(err) => { + warn!("Encountered missing timeline in {}, error {}", room_id, err); + continue; + }, + }; + } + + if body.extensions.to_device.enabled == Some(true) { + 
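// (editor's note, not part of the patch) Room-scoped account-data changes since
// `roomsince` are collected into the account_data extension response here; note
// that the guard directly above checks the to_device extension's `enabled` flag.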
response.extensions.account_data.rooms.insert( + room_id.to_owned(), + services + .account_data + .changes_since(Some(room_id), sender_user, *roomsince) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await, + ); + } + + let last_privateread_update = services + .rooms + .read_receipt + .last_privateread_update(sender_user, room_id) + .await > *roomsince; + + let private_read_event = if last_privateread_update { + services + .rooms + .read_receipt + .private_read_get(room_id, sender_user) + .await + .ok() + } else { + None + }; + + let mut receipts: Vec> = services + .rooms + .read_receipt + .readreceipts_since(room_id, *roomsince) + .filter_map(|(read_user, _ts, v)| async move { + services + .users + .user_is_ignored(read_user, sender_user) + .await + .or_some(v) + }) + .collect() + .await; + + if let Some(private_read_event) = private_read_event { + receipts.push(private_read_event); + } + + let receipt_size = receipts.len(); + + if receipt_size > 0 { + response + .extensions + .receipts + .rooms + .insert(room_id.clone(), pack_receipts(Box::new(receipts.into_iter()))); + } + + if roomsince != &0 + && timeline_pdus.is_empty() + && response + .extensions + .account_data + .rooms + .get(room_id) + .is_none_or(Vec::is_empty) + && receipt_size == 0 + { + continue; + } + + let prev_batch = timeline_pdus + .first() + .map_or(Ok::<_, Error>(None), |(pdu_count, _)| { + Ok(Some(match pdu_count { + | PduCount::Backfilled(_) => { + error!("timeline in backfill state?!"); + "0".to_owned() + }, + | PduCount::Normal(c) => c.to_string(), + })) + })? + .or_else(|| { + if roomsince != &0 { + Some(roomsince.to_string()) + } else { + None + } + }); + + let room_events: Vec<_> = timeline_pdus + .iter() + .stream() + .filter_map(|item| ignored_filter(&services, item.clone(), sender_user)) + .map(|(_, pdu)| pdu.to_sync_room_event()) + .collect() + .await; + + for (_, pdu) in timeline_pdus { + let ts = pdu.origin_server_ts; + if DEFAULT_BUMP_TYPES.binary_search(&pdu.kind).is_ok() + && timestamp.is_none_or(|time| time <= ts) + { + timestamp = Some(ts); + } + } + + let required_state = required_state_request + .iter() + .stream() + .filter_map(|state| async move { + services + .rooms + .state_accessor + .room_state_get(room_id, &state.0, &state.1) + .await + .map(|s| s.to_sync_state_event()) + .ok() + }) + .collect() + .await; + + // Heroes + let heroes: Vec<_> = services + .rooms + .state_cache + .room_members(room_id) + .ready_filter(|member| *member != sender_user) + .filter_map(|user_id| { + services + .rooms + .state_accessor + .get_member(room_id, user_id) + .map_ok(|memberevent| sync_events::v5::response::Hero { + user_id: user_id.into(), + name: memberevent.displayname, + avatar: memberevent.avatar_url, + }) + .ok() + }) + .take(5) + .collect() + .await; + + let name = match heroes.len().cmp(&(1_usize)) { + | Ordering::Greater => { + let firsts = heroes[1..] 
+ .iter() + .map(|h| h.name.clone().unwrap_or_else(|| h.user_id.to_string())) + .collect::>() + .join(", "); + + let last = heroes[0] + .name + .clone() + .unwrap_or_else(|| heroes[0].user_id.to_string()); + + Some(format!("{firsts} and {last}")) + }, + | Ordering::Equal => Some( + heroes[0] + .name + .clone() + .unwrap_or_else(|| heroes[0].user_id.to_string()), + ), + | Ordering::Less => None, + }; + + let heroes_avatar = if heroes.len() == 1 { + heroes[0].avatar.clone() + } else { + None + }; + + rooms.insert(room_id.clone(), sync_events::v5::response::Room { + name: services + .rooms + .state_accessor + .get_name(room_id) + .await + .ok() + .or(name), + avatar: if let Some(heroes_avatar) = heroes_avatar { + ruma::JsOption::Some(heroes_avatar) + } else { + match services.rooms.state_accessor.get_avatar(room_id).await { + | ruma::JsOption::Some(avatar) => ruma::JsOption::from_option(avatar.url), + | ruma::JsOption::Null => ruma::JsOption::Null, + | ruma::JsOption::Undefined => ruma::JsOption::Undefined, + } + }, + initial: Some(roomsince == &0), + is_dm: None, + invite_state, + unread_notifications: UnreadNotificationsCount { + highlight_count: Some( + services + .rooms + .user + .highlight_count(sender_user, room_id) + .await + .try_into() + .expect("notification count can't go that high"), + ), + notification_count: Some( + services + .rooms + .user + .notification_count(sender_user, room_id) + .await + .try_into() + .expect("notification count can't go that high"), + ), + }, + timeline: room_events, + required_state, + prev_batch, + limited, + joined_count: Some( + services + .rooms + .state_cache + .room_joined_count(room_id) + .await + .unwrap_or(0) + .try_into() + .unwrap_or_else(|_| uint!(0)), + ), + invited_count: Some( + services + .rooms + .state_cache + .room_invited_count(room_id) + .await + .unwrap_or(0) + .try_into() + .unwrap_or_else(|_| uint!(0)), + ), + num_live: None, // Count events in timeline greater than global sync counter + bump_stamp: timestamp, + heroes: Some(heroes), + }); + } + Ok(rooms) +} +async fn collect_account_data( + services: crate::State, + (sender_user, _, globalsince, body): (&UserId, &DeviceId, u64, &sync_events::v5::Request), +) -> sync_events::v5::response::AccountData { + let mut account_data = sync_events::v5::response::AccountData { + global: Vec::new(), + rooms: BTreeMap::new(), + }; + + if !body.extensions.account_data.enabled.unwrap_or(false) { + return sync_events::v5::response::AccountData::default(); + } + + account_data.global = services + .account_data + .changes_since(None, sender_user, globalsince) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Global)) + .collect() + .await; + + if let Some(rooms) = &body.extensions.account_data.rooms { + for room in rooms { + account_data.rooms.insert( + room.clone(), + services + .account_data + .changes_since(Some(room), sender_user, globalsince) + .ready_filter_map(|e| extract_variant!(e, AnyRawAccountDataEvent::Room)) + .collect() + .await, + ); + } + } + + account_data +} + +async fn collect_e2ee<'a>( + services: crate::State, + (sender_user, sender_device, globalsince, body): ( + &UserId, + &DeviceId, + u64, + &sync_events::v5::Request, + ), + all_joined_rooms: &'a Vec<&'a RoomId>, +) -> Result { + if !body.extensions.e2ee.enabled.unwrap_or(false) { + return Ok(sync_events::v5::response::E2EE::default()); + } + let mut left_encrypted_users = HashSet::new(); // Users that have left any encrypted rooms the sender was in + let mut device_list_changes = HashSet::new(); + let 
mut device_list_left = HashSet::new(); + // Look for device list updates of this account + device_list_changes.extend( + services + .users + .keys_changed(sender_user, globalsince, None) + .map(ToOwned::to_owned) + .collect::>() + .await, + ); + + for room_id in all_joined_rooms { + let Ok(current_shortstatehash) = + services.rooms.state.get_room_shortstatehash(room_id).await + else { + error!("Room {room_id} has no state"); + continue; + }; + + let since_shortstatehash = services + .rooms + .user + .get_token_shortstatehash(room_id, globalsince) + .await + .ok(); + + let encrypted_room = services + .rooms + .state_accessor + .state_get(current_shortstatehash, &StateEventType::RoomEncryption, "") + .await + .is_ok(); + + if let Some(since_shortstatehash) = since_shortstatehash { + // Skip if there are only timeline changes + if since_shortstatehash == current_shortstatehash { + continue; + } + + let since_encryption = services + .rooms + .state_accessor + .state_get(since_shortstatehash, &StateEventType::RoomEncryption, "") + .await; + + let since_sender_member: Option = services + .rooms + .state_accessor + .state_get_content( + since_shortstatehash, + &StateEventType::RoomMember, + sender_user.as_str(), + ) + .ok() + .await; + + let joined_since_last_sync = since_sender_member + .as_ref() + .is_none_or(|member| member.membership != MembershipState::Join); + + let new_encrypted_room = encrypted_room && since_encryption.is_err(); + + if encrypted_room { + let current_state_ids: HashMap<_, OwnedEventId> = services + .rooms + .state_accessor + .state_full_ids(current_shortstatehash) + .await?; + + let since_state_ids = services + .rooms + .state_accessor + .state_full_ids(since_shortstatehash) + .await?; + + for (key, id) in current_state_ids { + if since_state_ids.get(&key) != Some(&id) { + let Ok(pdu) = services.rooms.timeline.get_pdu(&id).await else { + error!("Pdu in state not found: {id}"); + continue; + }; + if pdu.kind == TimelineEventType::RoomMember { + if let Some(state_key) = &pdu.state_key { + let user_id = + OwnedUserId::parse(state_key.clone()).map_err(|_| { + Error::bad_database("Invalid UserId in member PDU.") + })?; + + if user_id == *sender_user { + continue; + } + + let content: RoomMemberEventContent = pdu.get_content()?; + match content.membership { + | MembershipState::Join => { + // A new user joined an encrypted room + if !share_encrypted_room( + &services, + sender_user, + &user_id, + Some(room_id), + ) + .await + { + device_list_changes.insert(user_id); + } + }, + | MembershipState::Leave => { + // Write down users that have left encrypted rooms we + // are in + left_encrypted_users.insert(user_id); + }, + | _ => {}, + } + } + } + } + } + if joined_since_last_sync || new_encrypted_room { + // If the user is in a new encrypted room, give them all joined users + device_list_changes.extend( + services + .rooms + .state_cache + .room_members(room_id) + // Don't send key updates from the sender to the sender + .ready_filter(|user_id| sender_user != *user_id) + // Only send keys if the sender doesn't share an encrypted room with the target + // already + .filter_map(|user_id| { + share_encrypted_room(&services, sender_user, user_id, Some(room_id)) + .map(|res| res.or_some(user_id.to_owned())) + }) + .collect::>() + .await, + ); + } + } + } + // Look for device list updates in this room + device_list_changes.extend( + services + .users + .room_keys_changed(room_id, globalsince, None) + .map(|(user_id, _)| user_id) + .map(ToOwned::to_owned) + .collect::>() + .await, + ); + 
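// (editor's note, not part of the patch) By this point in the per-room loop,
// device_list_changes has absorbed this room's key changes since `globalsince`,
// plus any members who newly share an encrypted room with the sender.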
} + + for user_id in left_encrypted_users { + let dont_share_encrypted_room = + !share_encrypted_room(&services, sender_user, &user_id, None).await; + + // If the user doesn't share an encrypted room with the target anymore, we need + // to tell them + if dont_share_encrypted_room { + device_list_left.insert(user_id); + } + } + + Ok(sync_events::v5::response::E2EE { + device_lists: DeviceLists { + changed: device_list_changes.into_iter().collect(), + left: device_list_left.into_iter().collect(), + }, + device_one_time_keys_count: services + .users + .count_one_time_keys(sender_user, sender_device) + .await, + device_unused_fallback_key_types: None, + }) +} + +async fn collect_to_device( + services: crate::State, + (sender_user, sender_device, globalsince, body): SyncInfo<'_>, + next_batch: u64, +) -> Option { + if !body.extensions.to_device.enabled.unwrap_or(false) { + return None; + } + + services + .users + .remove_to_device_events(sender_user, sender_device, globalsince) + .await; + + Some(sync_events::v5::response::ToDevice { + next_batch: next_batch.to_string(), + events: services + .users + .get_to_device_events(sender_user, sender_device) + .collect() + .await, + }) +} + +async fn collect_receipts(_services: crate::State) -> sync_events::v5::response::Receipts { + sync_events::v5::response::Receipts { rooms: BTreeMap::new() } + // TODO: get explicitly requested read receipts +} diff --git a/src/api/client/unversioned.rs b/src/api/client/unversioned.rs index b4856d72..904f1d2f 100644 --- a/src/api/client/unversioned.rs +++ b/src/api/client/unversioned.rs @@ -52,6 +52,7 @@ pub(crate) async fn get_supported_versions_route( ("org.matrix.msc4180".to_owned(), true), /* stable flag for 3916 (https://github.com/matrix-org/matrix-spec-proposals/pull/4180) */ ("uk.tcpip.msc4133".to_owned(), true), /* Extending User Profile API with Key:Value Pairs (https://github.com/matrix-org/matrix-spec-proposals/pull/4133) */ ("us.cloke.msc4175".to_owned(), true), /* Profile field for user time zone (https://github.com/matrix-org/matrix-spec-proposals/pull/4175) */ + ("org.matrix.simplified_msc3575".to_owned(), true), /* Simplified Sliding sync (https://github.com/matrix-org/matrix-spec-proposals/pull/4186) */ ]), }; diff --git a/src/api/router.rs b/src/api/router.rs index 1b38670d..c62295d7 100644 --- a/src/api/router.rs +++ b/src/api/router.rs @@ -144,6 +144,7 @@ pub fn build(router: Router, server: &Server) -> Router { ) .ruma_route(&client::sync_events_route) .ruma_route(&client::sync_events_v4_route) + .ruma_route(&client::sync_events_v5_route) .ruma_route(&client::get_context_route) .ruma_route(&client::get_message_events_route) .ruma_route(&client::search_events_route) diff --git a/src/core/Cargo.toml b/src/core/Cargo.toml index 4a9cc462..c716e9c2 100644 --- a/src/core/Cargo.toml +++ b/src/core/Cargo.toml @@ -50,6 +50,9 @@ zstd_compression = [ ] perf_measurements = [] sentry_telemetry = [] +conduwuit_mods = [ + "dep:libloading" +] [dependencies] argon2.workspace = true @@ -75,6 +78,7 @@ ipaddress.workspace = true itertools.workspace = true libc.workspace = true libloading.workspace = true +libloading.optional = true log.workspace = true num-traits.workspace = true rand.workspace = true diff --git a/src/core/mod.rs b/src/core/mod.rs index 87cb58ae..1416ed9e 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -25,7 +25,7 @@ pub use crate as conduwuit_core; rustc_flags_capture! 
{} -#[cfg(not(conduwuit_mods))] +#[cfg(any(not(conduwuit_mods), not(feature = "conduwuit_mods")))] pub mod mods { #[macro_export] macro_rules! mod_ctor { diff --git a/src/core/mods/mod.rs b/src/core/mods/mod.rs index ac0c333b..b8f06f29 100644 --- a/src/core/mods/mod.rs +++ b/src/core/mods/mod.rs @@ -1,4 +1,4 @@ -#![cfg(conduwuit_mods)] +#![cfg(all(conduwuit_mods, feature = "conduwuit_mods"))] pub(crate) use libloading::os::unix::{Library, Symbol}; diff --git a/src/core/server.rs b/src/core/server.rs index 8a4d9f66..948eea36 100644 --- a/src/core/server.rs +++ b/src/core/server.rs @@ -59,7 +59,7 @@ impl Server { } pub fn reload(&self) -> Result<()> { - if cfg!(not(conduwuit_mods)) { + if cfg!(any(not(conduwuit_mods), not(feature = "conduwuit_mods"))) { return Err!("Reloading not enabled"); } diff --git a/src/main/Cargo.toml b/src/main/Cargo.toml index 38eb7188..baf5336f 100644 --- a/src/main/Cargo.toml +++ b/src/main/Cargo.toml @@ -135,6 +135,9 @@ zstd_compression = [ "conduwuit-database/zstd_compression", "conduwuit-router/zstd_compression", ] +conduwuit_mods = [ + "conduwuit-core/conduwuit_mods", +] [dependencies] conduwuit-admin.workspace = true diff --git a/src/main/main.rs b/src/main/main.rs index e7aaf3fc..dacc2a2e 100644 --- a/src/main/main.rs +++ b/src/main/main.rs @@ -37,7 +37,7 @@ fn main() -> Result<(), Error> { /// Operate the server normally in release-mode static builds. This will start, /// run and stop the server within the asynchronous runtime. -#[cfg(not(conduwuit_mods))] +#[cfg(any(not(conduwuit_mods), not(feature = "conduwuit_mods")))] #[tracing::instrument( name = "main", parent = None, @@ -89,7 +89,7 @@ async fn async_main(server: &Arc) -> Result<(), Error> { /// Operate the server in developer-mode dynamic builds. This will start, run, /// and hot-reload portions of the server as-needed before returning for an /// actual shutdown. This is not available in release-mode or static builds. 
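// --- Illustrative sketch, not part of the patch: the dual gate used throughout
// this change. Hot-reload code now requires BOTH the rustc --cfg flag
// (`conduwuit_mods`) and the new Cargo feature of the same name; if either is
// missing, the static fallback compiles instead. The module and function names
// below are hypothetical; only the cfg expressions mirror the patch.
#[cfg(all(conduwuit_mods, feature = "conduwuit_mods"))]
mod hot_reload {
    pub const fn enabled() -> bool { true }
}

#[cfg(any(not(conduwuit_mods), not(feature = "conduwuit_mods")))]
mod hot_reload {
    pub const fn enabled() -> bool { false }
}

fn main() {
    // Built with: RUSTFLAGS="--cfg conduwuit_mods" cargo run --features conduwuit_mods
    println!("hot reload compiled in: {}", hot_reload::enabled());
}
// ---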
-#[cfg(conduwuit_mods)] +#[cfg(all(conduwuit_mods, feature = "conduwuit_mods"))] async fn async_main(server: &Arc) -> Result<(), Error> { let mut starts = true; let mut reloads = true; diff --git a/src/main/mods.rs b/src/main/mods.rs index ca799b90..9ab36e6c 100644 --- a/src/main/mods.rs +++ b/src/main/mods.rs @@ -1,4 +1,4 @@ -#![cfg(conduwuit_mods)] +#![cfg(all(conduwuit_mods, feature = "conduwuit_mods"))] #[unsafe(no_link)] extern crate conduwuit_service; diff --git a/src/main/server.rs b/src/main/server.rs index a81b708d..359a029c 100644 --- a/src/main/server.rs +++ b/src/main/server.rs @@ -23,7 +23,7 @@ pub(crate) struct Server { #[cfg(feature = "sentry_telemetry")] _sentry_guard: Option<::sentry::ClientInitGuard>, - #[cfg(conduwuit_mods)] + #[cfg(all(conduwuit_mods, feature = "conduwuit_mods"))] // Module instances; TODO: move to mods::loaded mgmt vector pub(crate) mods: tokio::sync::RwLock>, } @@ -75,7 +75,7 @@ impl Server { #[cfg(feature = "sentry_telemetry")] _sentry_guard: sentry_guard, - #[cfg(conduwuit_mods)] + #[cfg(all(conduwuit_mods, feature = "conduwuit_mods"))] mods: tokio::sync::RwLock::new(Vec::new()), })) } diff --git a/src/main/signal.rs b/src/main/signal.rs index 0f541099..cecb718b 100644 --- a/src/main/signal.rs +++ b/src/main/signal.rs @@ -12,7 +12,7 @@ pub(super) async fn signal(server: Arc) { use unix::SignalKind; const CONSOLE: bool = cfg!(feature = "console"); - const RELOADING: bool = cfg!(all(conduwuit_mods, not(CONSOLE))); + const RELOADING: bool = cfg!(all(conduwuit_mods, feature = "conduwuit_mods", not(CONSOLE))); let mut quit = unix::signal(SignalKind::quit()).expect("SIGQUIT handler"); let mut term = unix::signal(SignalKind::terminate()).expect("SIGTERM handler"); diff --git a/src/router/Cargo.toml b/src/router/Cargo.toml index 1623590b..51e15aed 100644 --- a/src/router/Cargo.toml +++ b/src/router/Cargo.toml @@ -80,7 +80,7 @@ tower.workspace = true tower-http.workspace = true tracing.workspace = true -[target.'cfg(unix)'.dependencies] +[target.'cfg(all(unix, target_os = "linux"))'.dependencies] sd-notify.workspace = true sd-notify.optional = true diff --git a/src/router/run.rs b/src/router/run.rs index 1b4d7437..95d12559 100644 --- a/src/router/run.rs +++ b/src/router/run.rs @@ -63,7 +63,7 @@ pub(crate) async fn start(server: Arc) -> Result> { let services = Services::build(server).await?.start().await?; - #[cfg(feature = "systemd")] + #[cfg(all(feature = "systemd", target_os = "linux"))] sd_notify::notify(true, &[sd_notify::NotifyState::Ready]) .expect("failed to notify systemd of ready state"); @@ -99,7 +99,7 @@ pub(crate) async fn stop(services: Arc) -> Result<()> { ); } - #[cfg(feature = "systemd")] + #[cfg(all(feature = "systemd", target_os = "linux"))] sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]) .expect("failed to notify systemd of stopping state"); diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs index 9777faeb..2bc21355 100644 --- a/src/service/rooms/read_receipt/mod.rs +++ b/src/service/rooms/read_receipt/mod.rs @@ -155,6 +155,7 @@ where } let content = ReceiptEventContent::from_iter(json); + conduwuit::trace!(?content); Raw::from_json( serde_json::value::to_raw_value(&SyncEphemeralRoomEvent { content }) .expect("received valid json"), diff --git a/src/service/sync/mod.rs b/src/service/sync/mod.rs index 97f4ce9c..61d9d1dd 100644 --- a/src/service/sync/mod.rs +++ b/src/service/sync/mod.rs @@ -11,8 +11,9 @@ use ruma::{ api::client::sync::sync_events::{ self, v4::{ExtensionsConfig, 
SyncRequestList}, + v5, }, - OwnedDeviceId, OwnedRoomId, OwnedUserId, + DeviceId, OwnedDeviceId, OwnedRoomId, OwnedUserId, UserId, }; use crate::{rooms, Dep}; @@ -20,7 +21,8 @@ use crate::{rooms, Dep}; pub struct Service { db: Data, services: Services, - connections: DbConnections, + connections: DbConnections, + snake_connections: DbConnections, } pub struct Data { @@ -52,9 +54,19 @@ struct SlidingSyncCache { extensions: ExtensionsConfig, } -type DbConnections = Mutex>; +#[derive(Default)] +struct SnakeSyncCache { + lists: BTreeMap, + subscriptions: BTreeMap, + known_rooms: BTreeMap>, + extensions: v5::request::Extensions, +} + +type DbConnections = Mutex>; type DbConnectionsKey = (OwnedUserId, OwnedDeviceId, String); type DbConnectionsVal = Arc>; +type SnakeConnectionsKey = (OwnedUserId, OwnedDeviceId, Option); +type SnakeConnectionsVal = Arc>; impl crate::Service for Service { fn build(args: crate::Args<'_>) -> Result> { @@ -79,13 +91,51 @@ impl crate::Service for Service { typing: args.depend::("rooms::typing"), }, connections: StdMutex::new(BTreeMap::new()), + snake_connections: StdMutex::new(BTreeMap::new()), })) } fn name(&self) -> &str { crate::service::make_name(std::module_path!()) } } +/// load params from cache if body doesn't contain it, as long as it's allowed +// in some cases we may need to allow an empty list as an actual value +fn list_or_sticky(target: &mut Vec, cached: &Vec) { + if target.is_empty() { + target.clone_from(cached); + } +} +fn some_or_sticky(target: &mut Option, cached: Option) { + if target.is_none() { + *target = cached; + } +} + impl Service { + pub fn snake_connection_cached( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: Option, + ) -> bool { + self.snake_connections + .lock() + .unwrap() + .contains_key(&(user_id, device_id, conn_id)) + } + + pub fn forget_snake_sync_connection( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: Option, + ) { + self.snake_connections + .lock() + .expect("locked") + .remove(&(user_id, device_id, conn_id)); + } + pub fn remembered( &self, user_id: OwnedUserId, @@ -110,6 +160,112 @@ impl Service { .remove(&(user_id, device_id, conn_id)); } + pub fn update_snake_sync_request_with_cache( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + request: &mut v5::Request, + ) -> BTreeMap> { + let conn_id = request.conn_id.clone(); + let mut cache = self.snake_connections.lock().expect("locked"); + let cached = Arc::clone( + cache + .entry((user_id, device_id, conn_id)) + .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + ); + let cached = &mut cached.lock().expect("locked"); + drop(cache); + + //v5::Request::try_from_http_request(req, path_args); + for (list_id, list) in &mut request.lists { + if let Some(cached_list) = cached.lists.get(list_id) { + list_or_sticky( + &mut list.room_details.required_state, + &cached_list.room_details.required_state, + ); + some_or_sticky(&mut list.include_heroes, cached_list.include_heroes); + + match (&mut list.filters, cached_list.filters.clone()) { + | (Some(filters), Some(cached_filters)) => { + some_or_sticky(&mut filters.is_invite, cached_filters.is_invite); + // TODO (morguldir): Find out how a client can unset this, probably need + // to change into an option inside ruma + list_or_sticky( + &mut filters.not_room_types, + &cached_filters.not_room_types, + ); + }, + | (_, Some(cached_filters)) => list.filters = Some(cached_filters), + | (Some(list_filters), _) => list.filters = Some(list_filters.clone()), + | (..) 
=> {}, + } + } + cached.lists.insert(list_id.clone(), list.clone()); + } + + cached + .subscriptions + .extend(request.room_subscriptions.clone()); + request + .room_subscriptions + .extend(cached.subscriptions.clone()); + + request.extensions.e2ee.enabled = request + .extensions + .e2ee + .enabled + .or(cached.extensions.e2ee.enabled); + + request.extensions.to_device.enabled = request + .extensions + .to_device + .enabled + .or(cached.extensions.to_device.enabled); + + request.extensions.account_data.enabled = request + .extensions + .account_data + .enabled + .or(cached.extensions.account_data.enabled); + request.extensions.account_data.lists = request + .extensions + .account_data + .lists + .clone() + .or_else(|| cached.extensions.account_data.lists.clone()); + request.extensions.account_data.rooms = request + .extensions + .account_data + .rooms + .clone() + .or_else(|| cached.extensions.account_data.rooms.clone()); + + some_or_sticky(&mut request.extensions.typing.enabled, cached.extensions.typing.enabled); + some_or_sticky( + &mut request.extensions.typing.rooms, + cached.extensions.typing.rooms.clone(), + ); + some_or_sticky( + &mut request.extensions.typing.lists, + cached.extensions.typing.lists.clone(), + ); + some_or_sticky( + &mut request.extensions.receipts.enabled, + cached.extensions.receipts.enabled, + ); + some_or_sticky( + &mut request.extensions.receipts.rooms, + cached.extensions.receipts.rooms.clone(), + ); + some_or_sticky( + &mut request.extensions.receipts.lists, + cached.extensions.receipts.lists.clone(), + ); + + cached.extensions = request.extensions.clone(); + cached.known_rooms.clone() + } + pub fn update_sync_request_with_cache( &self, user_id: OwnedUserId, @@ -136,57 +292,37 @@ impl Service { for (list_id, list) in &mut request.lists { if let Some(cached_list) = cached.lists.get(list_id) { - if list.sort.is_empty() { - list.sort.clone_from(&cached_list.sort); - }; - if list.room_details.required_state.is_empty() { - list.room_details - .required_state - .clone_from(&cached_list.room_details.required_state); - }; - list.room_details.timeline_limit = list - .room_details - .timeline_limit - .or(cached_list.room_details.timeline_limit); - list.include_old_rooms = list - .include_old_rooms - .clone() - .or_else(|| cached_list.include_old_rooms.clone()); + list_or_sticky(&mut list.sort, &cached_list.sort); + list_or_sticky( + &mut list.room_details.required_state, + &cached_list.room_details.required_state, + ); + some_or_sticky( + &mut list.room_details.timeline_limit, + cached_list.room_details.timeline_limit, + ); + some_or_sticky( + &mut list.include_old_rooms, + cached_list.include_old_rooms.clone(), + ); match (&mut list.filters, cached_list.filters.clone()) { - | (Some(list_filters), Some(cached_filters)) => { - list_filters.is_dm = list_filters.is_dm.or(cached_filters.is_dm); - if list_filters.spaces.is_empty() { - list_filters.spaces = cached_filters.spaces; - } - list_filters.is_encrypted = - list_filters.is_encrypted.or(cached_filters.is_encrypted); - list_filters.is_invite = - list_filters.is_invite.or(cached_filters.is_invite); - if list_filters.room_types.is_empty() { - list_filters.room_types = cached_filters.room_types; - } - if list_filters.not_room_types.is_empty() { - list_filters.not_room_types = cached_filters.not_room_types; - } - list_filters.room_name_like = list_filters - .room_name_like - .clone() - .or(cached_filters.room_name_like); - if list_filters.tags.is_empty() { - list_filters.tags = cached_filters.tags; - } - if 
list_filters.not_tags.is_empty() { - list_filters.not_tags = cached_filters.not_tags; - } + | (Some(filter), Some(cached_filter)) => { + some_or_sticky(&mut filter.is_dm, cached_filter.is_dm); + list_or_sticky(&mut filter.spaces, &cached_filter.spaces); + some_or_sticky(&mut filter.is_encrypted, cached_filter.is_encrypted); + some_or_sticky(&mut filter.is_invite, cached_filter.is_invite); + list_or_sticky(&mut filter.room_types, &cached_filter.room_types); + // Should be made possible to change + list_or_sticky(&mut filter.not_room_types, &cached_filter.not_room_types); + some_or_sticky(&mut filter.room_name_like, cached_filter.room_name_like); + list_or_sticky(&mut filter.tags, &cached_filter.tags); + list_or_sticky(&mut filter.not_tags, &cached_filter.not_tags); }, | (_, Some(cached_filters)) => list.filters = Some(cached_filters), | (Some(list_filters), _) => list.filters = Some(list_filters.clone()), | (..) => {}, } - if list.bump_event_types.is_empty() { - list.bump_event_types - .clone_from(&cached_list.bump_event_types); - }; + list_or_sticky(&mut list.bump_event_types, &cached_list.bump_event_types); } cached.lists.insert(list_id.clone(), list.clone()); } @@ -259,24 +395,26 @@ impl Service { pub fn update_sync_known_rooms( &self, - user_id: OwnedUserId, - device_id: OwnedDeviceId, + user_id: &UserId, + device_id: &DeviceId, conn_id: String, list_id: String, new_cached_rooms: BTreeSet, globalsince: u64, ) { let mut cache = self.connections.lock().expect("locked"); - let cached = Arc::clone(cache.entry((user_id, device_id, conn_id)).or_insert_with( - || { - Arc::new(Mutex::new(SlidingSyncCache { - lists: BTreeMap::new(), - subscriptions: BTreeMap::new(), - known_rooms: BTreeMap::new(), - extensions: ExtensionsConfig::default(), - })) - }, - )); + let cached = Arc::clone( + cache + .entry((user_id.to_owned(), device_id.to_owned(), conn_id)) + .or_insert_with(|| { + Arc::new(Mutex::new(SlidingSyncCache { + lists: BTreeMap::new(), + subscriptions: BTreeMap::new(), + known_rooms: BTreeMap::new(), + extensions: ExtensionsConfig::default(), + })) + }), + ); let cached = &mut cached.lock().expect("locked"); drop(cache); @@ -295,4 +433,57 @@ impl Service { list.insert(roomid, globalsince); } } + + pub fn update_snake_sync_known_rooms( + &self, + user_id: &UserId, + device_id: &DeviceId, + conn_id: String, + list_id: String, + new_cached_rooms: BTreeSet, + globalsince: u64, + ) { + let mut cache = self.snake_connections.lock().expect("locked"); + let cached = Arc::clone( + cache + .entry((user_id.to_owned(), device_id.to_owned(), Some(conn_id))) + .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + ); + let cached = &mut cached.lock().expect("locked"); + drop(cache); + + for (roomid, lastsince) in cached + .known_rooms + .entry(list_id.clone()) + .or_default() + .iter_mut() + { + if !new_cached_rooms.contains(roomid) { + *lastsince = 0; + } + } + let list = cached.known_rooms.entry(list_id).or_default(); + for roomid in new_cached_rooms { + list.insert(roomid, globalsince); + } + } + + pub fn update_snake_sync_subscriptions( + &self, + user_id: OwnedUserId, + device_id: OwnedDeviceId, + conn_id: Option, + subscriptions: BTreeMap, + ) { + let mut cache = self.snake_connections.lock().expect("locked"); + let cached = Arc::clone( + cache + .entry((user_id, device_id, conn_id)) + .or_insert_with(|| Arc::new(Mutex::new(SnakeSyncCache::default()))), + ); + let cached = &mut cached.lock().expect("locked"); + drop(cache); + + cached.subscriptions = subscriptions; + } }
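// --- Illustrative sketch, not part of the patch: the "sticky parameter" merge
// performed by update_snake_sync_request_with_cache above. Fields the client
// omits on a follow-up request are filled from the values cached for the same
// (user, device, conn_id) connection. The generic parameters are reconstructed
// here (the rendered diff dropped its angle brackets), so treat the exact
// signatures as an assumption.
fn list_or_sticky<T: Clone>(target: &mut Vec<T>, cached: &Vec<T>) {
    // An empty list in the request means "not supplied this time": reuse the cache.
    if target.is_empty() {
        target.clone_from(cached);
    }
}

fn some_or_sticky<T>(target: &mut Option<T>, cached: Option<T>) {
    // None in the request means "not supplied this time": reuse the cache.
    if target.is_none() {
        *target = cached;
    }
}

fn main() {
    // First request set required_state; the follow-up omitted it.
    let cached = vec!["m.room.name".to_owned(), "m.room.topic".to_owned()];
    let mut required_state: Vec<String> = Vec::new();
    list_or_sticky(&mut required_state, &cached);
    assert_eq!(required_state, cached);

    // Same idea for optional scalars such as a timeline limit or an enabled flag.
    let mut enabled: Option<bool> = None;
    some_or_sticky(&mut enabled, Some(true));
    assert_eq!(enabled, Some(true));
}
// ---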