use chain_width 60

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-03-25 17:05:11 -04:00 committed by June
parent 9d6b070f35
commit 868976a149
98 changed files with 4836 additions and 1767 deletions

View file

@ -53,7 +53,11 @@ impl Service {
}
let chunk_key: Vec<u64> = chunk.iter().map(|(short, _)| short).copied().collect();
if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&chunk_key)? {
if let Some(cached) = services()
.rooms
.auth_chain
.get_cached_eventid_authchain(&chunk_key)?
{
hits += 1;
full_auth_chain.extend(cached.iter().copied());
continue;
@ -65,13 +69,20 @@ impl Service {
let mut misses2 = 0;
let mut i = 0;
for (sevent_id, event_id) in chunk {
if let Some(cached) = services().rooms.auth_chain.get_cached_eventid_authchain(&[sevent_id])? {
if let Some(cached) = services()
.rooms
.auth_chain
.get_cached_eventid_authchain(&[sevent_id])?
{
hits2 += 1;
chunk_cache.extend(cached.iter().copied());
} else {
misses2 += 1;
let auth_chain = Arc::new(self.get_auth_chain_inner(room_id, &event_id)?);
services().rooms.auth_chain.cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?;
services()
.rooms
.auth_chain
.cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?;
debug!(
event_id = ?event_id,
chain_length = ?auth_chain.len(),
@ -92,7 +103,10 @@ impl Service {
"Chunk missed",
);
let chunk_cache = Arc::new(chunk_cache);
services().rooms.auth_chain.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?;
services()
.rooms
.auth_chain
.cache_auth_chain(chunk_key, Arc::clone(&chunk_cache))?;
full_auth_chain.extend(chunk_cache.iter());
}
@ -103,7 +117,9 @@ impl Service {
"Auth chain stats",
);
Ok(full_auth_chain.into_iter().filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
Ok(full_auth_chain
.into_iter()
.filter_map(move |sid| services().rooms.short.get_eventid_from_short(sid).ok()))
}
#[tracing::instrument(skip(self, event_id))]
@ -118,7 +134,10 @@ impl Service {
return Err(Error::BadRequest(ErrorKind::Forbidden, "Evil event in db"));
}
for auth_event in &pdu.auth_events {
let sauthevent = services().rooms.short.get_or_create_shorteventid(auth_event)?;
let sauthevent = services()
.rooms
.short
.get_or_create_shorteventid(auth_event)?;
if !found.contains(&sauthevent) {
found.insert(sauthevent);

View file

@ -91,7 +91,8 @@ impl Service {
&self, room_id: &RoomId, user_id: &UserId, presence_state: PresenceState, currently_active: Option<bool>,
last_active_ago: Option<UInt>, status_msg: Option<String>,
) -> Result<()> {
self.db.set_presence(room_id, user_id, presence_state, currently_active, last_active_ago, status_msg)
self.db
.set_presence(room_id, user_id, presence_state, currently_active, last_active_ago, status_msg)
}
/// Removes the presence record for the given user from the database.
@ -142,7 +143,11 @@ fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> {
let mut status_msg = None;
for room_id in services().rooms.state_cache.rooms_joined(user_id) {
let presence_event = services().rooms.edus.presence.get_presence(&room_id?, user_id)?;
let presence_event = services()
.rooms
.edus
.presence
.get_presence(&room_id?, user_id)?;
if let Some(presence_event) = presence_event {
presence_state = presence_event.content.presence;

View file

@ -16,16 +16,32 @@ impl Service {
/// Sets a user as typing until the timeout timestamp is reached or
/// roomtyping_remove is called.
pub async fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()> {
self.typing.write().await.entry(room_id.to_owned()).or_default().insert(user_id.to_owned(), timeout);
self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?);
self.typing
.write()
.await
.entry(room_id.to_owned())
.or_default()
.insert(user_id.to_owned(), timeout);
self.last_typing_update
.write()
.await
.insert(room_id.to_owned(), services().globals.next_count()?);
_ = self.typing_update_sender.send(room_id.to_owned());
Ok(())
}
/// Removes a user from typing before the timeout is reached.
pub async fn typing_remove(&self, user_id: &UserId, room_id: &RoomId) -> Result<()> {
self.typing.write().await.entry(room_id.to_owned()).or_default().remove(user_id);
self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?);
self.typing
.write()
.await
.entry(room_id.to_owned())
.or_default()
.remove(user_id);
self.last_typing_update
.write()
.await
.insert(room_id.to_owned(), services().globals.next_count()?);
_ = self.typing_update_sender.send(room_id.to_owned());
Ok(())
}
@ -67,7 +83,10 @@ impl Service {
for user in removable {
room.remove(&user);
}
self.last_typing_update.write().await.insert(room_id.to_owned(), services().globals.next_count()?);
self.last_typing_update
.write()
.await
.insert(room_id.to_owned(), services().globals.next_count()?);
_ = self.typing_update_sender.send(room_id.to_owned());
}
@ -77,7 +96,13 @@ impl Service {
/// Returns the count of the last typing update in this room.
pub async fn last_typing_update(&self, room_id: &RoomId) -> Result<u64> {
self.typings_maintain(room_id).await?;
Ok(self.last_typing_update.read().await.get(room_id).copied().unwrap_or(0))
Ok(self
.last_typing_update
.read()
.await
.get(room_id)
.copied()
.unwrap_or(0))
}
/// Returns a new typing EDU.

View file

@ -125,8 +125,9 @@ impl Service {
.first_pdu_in_room(room_id)?
.ok_or_else(|| Error::bad_database("Failed to find first pdu in db."))?;
let (incoming_pdu, val) =
self.handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false, pub_key_map).await?;
let (incoming_pdu, val) = self
.handle_outlier_pdu(origin, &create_event, event_id, room_id, value, false, pub_key_map)
.await?;
self.check_room_id(room_id, &incoming_pdu)?;
// 8. if not timeline event: stop
@ -167,7 +168,13 @@ impl Service {
));
}
if let Some((time, tries)) = services().globals.bad_event_ratelimiter.read().await.get(&*prev_id) {
if let Some((time, tries)) = services()
.globals
.bad_event_ratelimiter
.read()
.await
.get(&*prev_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
@ -182,7 +189,13 @@ impl Service {
if errors >= 5 {
// Timeout other events
match services().globals.bad_event_ratelimiter.write().await.entry((*prev_id).to_owned()) {
match services()
.globals
.bad_event_ratelimiter
.write()
.await
.entry((*prev_id).to_owned())
{
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
@ -207,12 +220,19 @@ impl Service {
.await
.insert(room_id.to_owned(), ((*prev_id).to_owned(), start_time));
if let Err(e) =
self.upgrade_outlier_to_timeline_pdu(pdu, json, &create_event, origin, room_id, pub_key_map).await
if let Err(e) = self
.upgrade_outlier_to_timeline_pdu(pdu, json, &create_event, origin, room_id, pub_key_map)
.await
{
errors += 1;
warn!("Prev event {} failed: {}", prev_id, e);
match services().globals.bad_event_ratelimiter.write().await.entry((*prev_id).to_owned()) {
match services()
.globals
.bad_event_ratelimiter
.write()
.await
.entry((*prev_id).to_owned())
{
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
@ -222,7 +242,12 @@ impl Service {
}
}
let elapsed = start_time.elapsed();
services().globals.roomid_federationhandletime.write().await.remove(&room_id.to_owned());
services()
.globals
.roomid_federationhandletime
.write()
.await
.remove(&room_id.to_owned());
debug!(
"Handling prev event {} took {}m{}s",
prev_id,
@ -246,7 +271,12 @@ impl Service {
.event_handler
.upgrade_outlier_to_timeline_pdu(incoming_pdu, val, &create_event, origin, room_id, pub_key_map)
.await;
services().globals.roomid_federationhandletime.write().await.remove(&room_id.to_owned());
services()
.globals
.roomid_federationhandletime
.write()
.await
.remove(&room_id.to_owned());
r
}
@ -323,7 +353,11 @@ impl Service {
debug!(event_id = ?incoming_pdu.event_id, "Fetching auth events");
self.fetch_and_handle_outliers(
origin,
&incoming_pdu.auth_events.iter().map(|x| Arc::from(&**x)).collect::<Vec<_>>(),
&incoming_pdu
.auth_events
.iter()
.map(|x| Arc::from(&**x))
.collect::<Vec<_>>(),
create_event,
room_id,
room_version_id,
@ -351,7 +385,10 @@ impl Service {
match auth_events.entry((
auth_event.kind.to_string().into(),
auth_event.state_key.clone().expect("all auth events have state keys"),
auth_event
.state_key
.clone()
.expect("all auth events have state keys"),
)) {
hash_map::Entry::Vacant(v) => {
v.insert(auth_event);
@ -367,7 +404,9 @@ impl Service {
// The original create event must be in the auth events
if !matches!(
auth_events.get(&(StateEventType::RoomCreate, String::new())).map(AsRef::as_ref),
auth_events
.get(&(StateEventType::RoomCreate, String::new()))
.map(AsRef::as_ref),
Some(_) | None
) {
return Err(Error::BadRequest(
@ -390,7 +429,10 @@ impl Service {
debug!("Validation successful.");
// 7. Persist the event as an outlier.
services().rooms.outlier.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
services()
.rooms
.outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
debug!("Added pdu as outlier.");
@ -407,7 +449,11 @@ impl Service {
return Ok(Some(pduid));
}
if services().rooms.pdu_metadata.is_event_soft_failed(&incoming_pdu.event_id)? {
if services()
.rooms
.pdu_metadata
.is_event_soft_failed(&incoming_pdu.event_id)?
{
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed"));
}
@ -434,10 +480,19 @@ impl Service {
if incoming_pdu.prev_events.len() == 1 {
let prev_event = &*incoming_pdu.prev_events[0];
let prev_event_sstatehash = services().rooms.state_accessor.pdu_shortstatehash(prev_event)?;
let prev_event_sstatehash = services()
.rooms
.state_accessor
.pdu_shortstatehash(prev_event)?;
let state = if let Some(shortstatehash) = prev_event_sstatehash {
Some(services().rooms.state_accessor.state_full_ids(shortstatehash).await)
Some(
services()
.rooms
.state_accessor
.state_full_ids(shortstatehash)
.await,
)
} else {
None
};
@ -477,7 +532,11 @@ impl Service {
break;
};
let sstatehash = if let Ok(Some(s)) = services().rooms.state_accessor.pdu_shortstatehash(prev_eventid) {
let sstatehash = if let Ok(Some(s)) = services()
.rooms
.state_accessor
.pdu_shortstatehash(prev_eventid)
{
s
} else {
okay = false;
@ -492,8 +551,11 @@ impl Service {
let mut auth_chain_sets = Vec::with_capacity(extremity_sstatehashes.len());
for (sstatehash, prev_event) in extremity_sstatehashes {
let mut leaf_state: HashMap<_, _> =
services().rooms.state_accessor.state_full_ids(sstatehash).await?;
let mut leaf_state: HashMap<_, _> = services()
.rooms
.state_accessor
.state_full_ids(sstatehash)
.await?;
if let Some(state_key) = &prev_event.state_key {
let shortstatekey = services()
@ -518,8 +580,14 @@ impl Service {
starting_events.push(id);
}
auth_chain_sets
.push(services().rooms.auth_chain.get_auth_chain(room_id, starting_events).await?.collect());
auth_chain_sets.push(
services()
.rooms
.auth_chain
.get_auth_chain(room_id, starting_events)
.await?
.collect(),
);
fork_states.push(state);
}
@ -579,7 +647,11 @@ impl Service {
Ok(res) => {
debug!("Fetching state events at event.");
let collect = res.pdu_ids.iter().map(|x| Arc::from(&**x)).collect::<Vec<_>>();
let collect = res
.pdu_ids
.iter()
.map(|x| Arc::from(&**x))
.collect::<Vec<_>>();
let state_vec = self
.fetch_and_handle_outliers(
@ -679,8 +751,15 @@ impl Service {
// 13. Use state resolution to find new room state
// We start looking at current room state now, so lets lock the room
let mutex_state =
Arc::clone(services().globals.roomid_mutex_state.write().await.entry(room_id.to_owned()).or_default());
let mutex_state = Arc::clone(
services()
.globals
.roomid_mutex_state
.write()
.await
.entry(room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
// Now we calculate the set of extremities this room has after the incoming
@ -698,13 +777,26 @@ impl Service {
}
// Only keep those extremities were not referenced yet
extremities.retain(|id| !matches!(services().rooms.pdu_metadata.is_event_referenced(room_id, id), Ok(true)));
extremities.retain(|id| {
!matches!(
services()
.rooms
.pdu_metadata
.is_event_referenced(room_id, id),
Ok(true)
)
});
debug!("Compressing state at event");
let state_ids_compressed = Arc::new(
state_at_incoming_event
.iter()
.map(|(shortstatekey, id)| services().rooms.state_compressor.compress_state_event(*shortstatekey, id))
.map(|(shortstatekey, id)| {
services()
.rooms
.state_compressor
.compress_state_event(*shortstatekey, id)
})
.collect::<Result<_>>()?,
);
@ -722,14 +814,23 @@ impl Service {
state_after.insert(shortstatekey, Arc::from(&*incoming_pdu.event_id));
}
let new_room_state = self.resolve_state(room_id, room_version_id, state_after).await?;
let new_room_state = self
.resolve_state(room_id, room_version_id, state_after)
.await?;
// Set the new room state to the resolved state
debug!("Forcing new room state");
let (sstatehash, new, removed) = services().rooms.state_compressor.save_state(room_id, new_room_state)?;
let (sstatehash, new, removed) = services()
.rooms
.state_compressor
.save_state(room_id, new_room_state)?;
services().rooms.state.force_state(room_id, sstatehash, new, removed, &state_lock).await?;
services()
.rooms
.state
.force_state(room_id, sstatehash, new, removed, &state_lock)
.await?;
}
// 14. Check if the event passes auth based on the "current state" of the room,
@ -752,7 +853,10 @@ impl Service {
// Soft fail, we keep the event as an outlier but don't add it to the timeline
warn!("Event was soft failed: {:?}", incoming_pdu);
services().rooms.pdu_metadata.mark_event_soft_failed(&incoming_pdu.event_id)?;
services()
.rooms
.pdu_metadata
.mark_event_soft_failed(&incoming_pdu.event_id)?;
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed"));
}
@ -787,10 +891,17 @@ impl Service {
&self, room_id: &RoomId, room_version_id: &RoomVersionId, incoming_state: HashMap<u64, Arc<EventId>>,
) -> Result<Arc<HashSet<CompressedStateEvent>>> {
debug!("Loading current room state ids");
let current_sstatehash =
services().rooms.state.get_room_shortstatehash(room_id)?.expect("every room has state");
let current_sstatehash = services()
.rooms
.state
.get_room_shortstatehash(room_id)?
.expect("every room has state");
let current_state_ids = services().rooms.state_accessor.state_full_ids(current_sstatehash).await?;
let current_state_ids = services()
.rooms
.state_accessor
.state_full_ids(current_sstatehash)
.await?;
let fork_states = [current_state_ids, incoming_state];
@ -852,9 +963,14 @@ impl Service {
let new_room_state = state
.into_iter()
.map(|((event_type, state_key), event_id)| {
let shortstatekey =
services().rooms.short.get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?;
services().rooms.state_compressor.compress_state_event(shortstatekey, &event_id)
let shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&event_type.to_string().into(), &state_key)?;
services()
.rooms
.state_compressor
.compress_state_event(shortstatekey, &event_id)
})
.collect::<Result<_>>()?;
@ -877,7 +993,13 @@ impl Service {
) -> AsyncRecursiveCanonicalJsonVec<'a> {
Box::pin(async move {
let back_off = |id| async {
match services().globals.bad_event_ratelimiter.write().await.entry(id) {
match services()
.globals
.bad_event_ratelimiter
.write()
.await
.entry(id)
{
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
@ -904,7 +1026,13 @@ impl Service {
let mut events_all = HashSet::new();
let mut i = 0;
while let Some(next_id) = todo_auth_events.pop() {
if let Some((time, tries)) = services().globals.bad_event_ratelimiter.read().await.get(&*next_id) {
if let Some((time, tries)) = services()
.globals
.bad_event_ratelimiter
.read()
.await
.get(&*next_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
@ -1010,7 +1138,13 @@ impl Service {
pdus.push((local_pdu, None));
}
for (next_id, value) in events_in_reverse_order.iter().rev() {
if let Some((time, tries)) = services().globals.bad_event_ratelimiter.read().await.get(&**next_id) {
if let Some((time, tries)) = services()
.globals
.bad_event_ratelimiter
.read()
.await
.get(&**next_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
@ -1087,9 +1221,14 @@ impl Service {
continue;
}
if let Some(json) =
json_opt.or_else(|| services().rooms.outlier.get_outlier_pdu_json(&prev_event_id).ok().flatten())
{
if let Some(json) = json_opt.or_else(|| {
services()
.rooms
.outlier
.get_outlier_pdu_json(&prev_event_id)
.ok()
.flatten()
}) {
if pdu.origin_server_ts > first_pdu_in_room.origin_server_ts {
amount += 1;
for prev_prev in &pdu.prev_events {
@ -1122,7 +1261,9 @@ impl Service {
Ok((
int!(0),
MilliSecondsSinceUnixEpoch(
eventid_info.get(event_id).map_or_else(|| uint!(0), |info| info.0.origin_server_ts),
eventid_info
.get(event_id)
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts),
),
))
})
@ -1172,7 +1313,11 @@ impl Service {
info!(
"Fetch keys for {}",
server_key_ids.keys().cloned().collect::<Vec<_>>().join(", ")
server_key_ids
.keys()
.cloned()
.collect::<Vec<_>>()
.join(", ")
);
let mut server_keys: FuturesUnordered<_> = server_key_ids
@ -1204,7 +1349,10 @@ impl Service {
while let Some(fetch_res) = server_keys.next().await {
match fetch_res {
Ok((signature_server, keys)) => {
pub_key_map.write().await.insert(signature_server.clone(), keys);
pub_key_map
.write()
.await
.insert(signature_server.clone(), keys);
},
Err((signature_server, e)) => {
warn!("Failed to fetch keys for {}: {:?}", signature_server, e);
@ -1235,7 +1383,13 @@ impl Service {
);
let event_id = <&EventId>::try_from(event_id.as_str()).expect("ruma's reference hashes are valid event ids");
if let Some((time, tries)) = services().globals.bad_event_ratelimiter.read().await.get(event_id) {
if let Some((time, tries)) = services()
.globals
.bad_event_ratelimiter
.read()
.await
.get(event_id)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
@ -1275,8 +1429,12 @@ impl Service {
debug!("Loading signing keys for {}", origin);
let result: BTreeMap<_, _> =
services().globals.signing_keys_for(origin)?.into_iter().map(|(k, v)| (k.to_string(), v.key)).collect();
let result: BTreeMap<_, _> = services()
.globals
.signing_keys_for(origin)?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect();
if !contains_all_ids(&result) {
debug!("Signing key not loaded for {}", origin);
@ -1353,7 +1511,10 @@ impl Service {
.into_keys()
.map(|server| async move {
(
services().sending.send_federation_request(&server, get_server_keys::v2::Request::new()).await,
services()
.sending
.send_federation_request(&server, get_server_keys::v2::Request::new())
.await,
server,
)
})
@ -1391,10 +1552,14 @@ impl Service {
// Try to fetch keys, failure is okay
// Servers we couldn't find in the cache will be added to `servers`
for pdu in &event.room_state.state {
_ = self.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm).await;
_ = self
.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm)
.await;
}
for pdu in &event.room_state.auth_chain {
_ = self.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm).await;
_ = self
.get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm)
.await;
}
drop(pkm);
@ -1411,7 +1576,8 @@ impl Service {
homeserver signing keys."
);
self.batch_request_signing_keys(servers.clone(), pub_key_map).await?;
self.batch_request_signing_keys(servers.clone(), pub_key_map)
.await?;
if servers.is_empty() {
info!("Trusted server supplied all signing keys, no more keys to fetch");
@ -1420,11 +1586,13 @@ impl Service {
info!("Remaining servers left that the notary/trusted servers did not provide: {servers:?}");
self.request_signing_keys(servers.clone(), pub_key_map).await?;
self.request_signing_keys(servers.clone(), pub_key_map)
.await?;
} else {
info!("query_trusted_key_servers_first is set to false, querying individual homeservers first");
self.request_signing_keys(servers.clone(), pub_key_map).await?;
self.request_signing_keys(servers.clone(), pub_key_map)
.await?;
if servers.is_empty() {
info!("Individual homeservers supplied all signing keys, no more keys to fetch");
@ -1433,7 +1601,8 @@ impl Service {
info!("Remaining servers left the individual homeservers did not provide: {servers:?}");
self.batch_request_signing_keys(servers.clone(), pub_key_map).await?;
self.batch_request_signing_keys(servers.clone(), pub_key_map)
.await?;
}
info!("Search for signing keys done");
@ -1448,7 +1617,11 @@ impl Service {
/// Returns Ok if the acl allows the server
pub fn acl_check(&self, server_name: &ServerName, room_id: &RoomId) -> Result<()> {
let acl_event =
match services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomServerAcl, "")? {
match services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomServerAcl, "")?
{
Some(acl) => {
debug!("ACL event found: {acl:?}");
acl
@ -1493,21 +1666,36 @@ impl Service {
) -> Result<BTreeMap<String, Base64>> {
let contains_all_ids = |keys: &BTreeMap<String, Base64>| signature_ids.iter().all(|id| keys.contains_key(id));
let permit =
services().globals.servername_ratelimiter.read().await.get(origin).map(|s| Arc::clone(s).acquire_owned());
let permit = services()
.globals
.servername_ratelimiter
.read()
.await
.get(origin)
.map(|s| Arc::clone(s).acquire_owned());
let permit = if let Some(p) = permit {
p
} else {
let mut write = services().globals.servername_ratelimiter.write().await;
let s = Arc::clone(write.entry(origin.to_owned()).or_insert_with(|| Arc::new(Semaphore::new(1))));
let s = Arc::clone(
write
.entry(origin.to_owned())
.or_insert_with(|| Arc::new(Semaphore::new(1))),
);
s.acquire_owned()
}
.await;
let back_off = |id| async {
match services().globals.bad_signature_ratelimiter.write().await.entry(id) {
match services()
.globals
.bad_signature_ratelimiter
.write()
.await
.entry(id)
{
hash_map::Entry::Vacant(e) => {
e.insert((Instant::now(), 1));
},
@ -1515,7 +1703,13 @@ impl Service {
}
};
if let Some((time, tries)) = services().globals.bad_signature_ratelimiter.read().await.get(&signature_ids) {
if let Some((time, tries)) = services()
.globals
.bad_signature_ratelimiter
.read()
.await
.get(&signature_ids)
{
// Exponential backoff
let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
@ -1530,8 +1724,12 @@ impl Service {
debug!("Loading signing keys for {origin} from our database if available");
let mut result: BTreeMap<_, _> =
services().globals.signing_keys_for(origin)?.into_iter().map(|(k, v)| (k.to_string(), v.key)).collect();
let mut result: BTreeMap<_, _> = services()
.globals
.signing_keys_for(origin)?
.into_iter()
.map(|(k, v)| (k.to_string(), v.key))
.collect();
if contains_all_ids(&result) {
debug!("We have all homeserver signing keys locally for {origin}, not fetching any remotely");
@ -1554,20 +1752,34 @@ impl Service {
get_remote_server_keys::v2::Request::new(
origin.to_owned(),
MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now().checked_add(Duration::from_secs(3600)).expect("SystemTime too large"),
SystemTime::now()
.checked_add(Duration::from_secs(3600))
.expect("SystemTime too large"),
)
.expect("time is valid"),
),
)
.await
.ok()
.map(|resp| resp.server_keys.into_iter().filter_map(|e| e.deserialize().ok()).collect::<Vec<_>>())
{
.map(|resp| {
resp.server_keys
.into_iter()
.filter_map(|e| e.deserialize().ok())
.collect::<Vec<_>>()
}) {
debug!("Got signing keys: {:?}", server_keys);
for k in server_keys {
services().globals.add_signing_key(origin, k.clone())?;
result.extend(k.verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(k.old_verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(
k.verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
result.extend(
k.old_verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
}
if contains_all_ids(&result) {
@ -1584,10 +1796,22 @@ impl Service {
.ok()
.and_then(|resp| resp.server_key.deserialize().ok())
{
services().globals.add_signing_key(origin, server_key.clone())?;
services()
.globals
.add_signing_key(origin, server_key.clone())?;
result.extend(server_key.verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(server_key.old_verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(
server_key
.verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
result.extend(
server_key
.old_verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
if contains_all_ids(&result) {
return Ok(result);
@ -1604,10 +1828,22 @@ impl Service {
.ok()
.and_then(|resp| resp.server_key.deserialize().ok())
{
services().globals.add_signing_key(origin, server_key.clone())?;
services()
.globals
.add_signing_key(origin, server_key.clone())?;
result.extend(server_key.verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(server_key.old_verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(
server_key
.verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
result.extend(
server_key
.old_verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
if contains_all_ids(&result) {
return Ok(result);
@ -1623,20 +1859,34 @@ impl Service {
get_remote_server_keys::v2::Request::new(
origin.to_owned(),
MilliSecondsSinceUnixEpoch::from_system_time(
SystemTime::now().checked_add(Duration::from_secs(3600)).expect("SystemTime too large"),
SystemTime::now()
.checked_add(Duration::from_secs(3600))
.expect("SystemTime too large"),
)
.expect("time is valid"),
),
)
.await
.ok()
.map(|resp| resp.server_keys.into_iter().filter_map(|e| e.deserialize().ok()).collect::<Vec<_>>())
{
.map(|resp| {
resp.server_keys
.into_iter()
.filter_map(|e| e.deserialize().ok())
.collect::<Vec<_>>()
}) {
debug!("Got signing keys: {:?}", server_keys);
for k in server_keys {
services().globals.add_signing_key(origin, k.clone())?;
result.extend(k.verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(k.old_verify_keys.into_iter().map(|(k, v)| (k.to_string(), v.key)));
result.extend(
k.verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
result.extend(
k.old_verify_keys
.into_iter()
.map(|(k, v)| (k.to_string(), v.key)),
);
}
if contains_all_ids(&result) {

View file

@ -20,7 +20,8 @@ impl Service {
pub fn lazy_load_was_sent_before(
&self, user_id: &UserId, device_id: &DeviceId, room_id: &RoomId, ll_user: &UserId,
) -> Result<bool> {
self.db.lazy_load_was_sent_before(user_id, device_id, room_id, ll_user)
self.db
.lazy_load_was_sent_before(user_id, device_id, room_id, ll_user)
}
#[tracing::instrument(skip(self))]
@ -44,7 +45,8 @@ impl Service {
room_id.to_owned(),
since,
)) {
self.db.lazy_load_confirm_delivery(user_id, device_id, room_id, &mut user_ids.iter().map(|u| &**u))?;
self.db
.lazy_load_confirm_delivery(user_id, device_id, room_id, &mut user_ids.iter().map(|u| &**u))?;
} else {
// Ignore
}

View file

@ -126,8 +126,10 @@ impl Service {
next_token = events_before.last().map(|(count, _)| count).copied();
let events_before: Vec<_> =
events_before.into_iter().map(|(_, pdu)| pdu.to_message_like_event()).collect();
let events_before: Vec<_> = events_before
.into_iter()
.map(|(_, pdu)| pdu.to_message_like_event())
.collect();
Ok(get_relating_events::v1::Response {
chunk: events_before,

View file

@ -148,7 +148,13 @@ impl Arena {
)];
while let Some(parent) = self.parent(parents.last().expect("Has at least one value, as above").0) {
parents.push((parent, self.get(parent).expect("It is some, as above").room_id.clone()));
parents.push((
parent,
self.get(parent)
.expect("It is some, as above")
.room_id
.clone(),
));
}
// If at max_depth, don't add new rooms
@ -178,7 +184,10 @@ impl Arena {
}
if self.first_untraversed.is_none()
|| parent >= self.first_untraversed.expect("Should have already continued if none")
|| parent
>= self
.first_untraversed
.expect("Should have already continued if none")
{
self.first_untraversed = next_id;
}
@ -187,7 +196,9 @@ impl Arena {
// This is done as if we use an if-let above, we cannot reference self.nodes
// above as then we would have multiple mutable references
let node = self.get_mut(parent).expect("Must be some, as inside this block");
let node = self
.get_mut(parent)
.expect("Must be some, as inside this block");
node.first_child = next_id;
}
@ -324,7 +335,10 @@ impl Service {
pub async fn get_federation_hierarchy(
&self, room_id: &RoomId, server_name: &ServerName, suggested_only: bool,
) -> Result<federation::space::get_hierarchy::v1::Response> {
match self.get_summary_and_children(&room_id.to_owned(), suggested_only, Identifier::None).await? {
match self
.get_summary_and_children(&room_id.to_owned(), suggested_only, Identifier::None)
.await?
{
Some(SummaryAccessibility::Accessible(room)) => {
let mut children = Vec::new();
let mut inaccessible_children = Vec::new();
@ -360,7 +374,13 @@ impl Service {
async fn get_summary_and_children(
&self, current_room: &OwnedRoomId, suggested_only: bool, identifier: Identifier<'_>,
) -> Result<Option<SummaryAccessibility>> {
if let Some(cached) = self.roomid_spacehierarchy_cache.lock().await.get_mut(&current_room.to_owned()).as_ref() {
if let Some(cached) = self
.roomid_spacehierarchy_cache
.lock()
.await
.get_mut(&current_room.to_owned())
.as_ref()
{
return Ok(if let Some(cached) = cached {
if is_accessable_child(
current_room,
@ -395,7 +415,9 @@ impl Service {
// Federation requests should not request information from other
// servers
} else if let Identifier::UserId(_) = identifier {
let server = current_room.server_name().expect("Room IDs should always have a server name");
let server = current_room
.server_name()
.expect("Room IDs should always have a server name");
if server == services().globals.server_name() {
return Ok(None);
}
@ -473,7 +495,10 @@ impl Service {
Some(SummaryAccessibility::Inaccessible)
}
} else {
self.roomid_spacehierarchy_cache.lock().await.insert(current_room.clone(), None);
self.roomid_spacehierarchy_cache
.lock()
.await
.insert(current_room.clone(), None);
None
}
@ -494,10 +519,12 @@ impl Service {
.state_accessor
.room_state_get(room_id, &StateEventType::RoomJoinRules, "")?
.map(|s| {
serde_json::from_str(s.content.get()).map(|c: RoomJoinRulesEventContent| c.join_rule).map_err(|e| {
error!("Invalid room join rule event in database: {}", e);
Error::BadDatabase("Invalid room join rule event in database.")
})
serde_json::from_str(s.content.get())
.map(|c: RoomJoinRulesEventContent| c.join_rule)
.map_err(|e| {
error!("Invalid room join rule event in database: {}", e);
Error::BadDatabase("Invalid room join rule event in database.")
})
})
.transpose()?
.unwrap_or(JoinRule::Invite);
@ -534,15 +561,18 @@ impl Service {
.try_into()
.expect("user count should not be that big"),
room_id: room_id.to_owned(),
topic: services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomTopic, "")?.map_or(
Ok(None),
|s| {
serde_json::from_str(s.content.get()).map(|c: RoomTopicEventContent| Some(c.topic)).map_err(|_| {
error!("Invalid room topic event in database for room {}", room_id);
Error::bad_database("Invalid room topic event in database.")
})
},
)?,
topic: services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomTopic, "")?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomTopicEventContent| Some(c.topic))
.map_err(|_| {
error!("Invalid room topic event in database for room {}", room_id);
Error::bad_database("Invalid room topic event in database.")
})
})?,
world_readable: world_readable(room_id)?,
guest_can_join: guest_can_join(room_id)?,
avatar_url: services()
@ -588,7 +618,9 @@ impl Service {
let mut arena = Arena::new(summary.room_id.clone(), max_depth);
let mut results = Vec::new();
let root = arena.first_untraversed().expect("The node just added is not traversed");
let root = arena
.first_untraversed()
.expect("The node just added is not traversed");
arena.push(root, get_parent_children(&summary.clone(), suggested_only));
results.push(summary_to_chunk(*summary.clone()));
@ -597,7 +629,10 @@ impl Service {
if limit > results.len() {
if let Some(SummaryAccessibility::Accessible(summary)) = self
.get_summary_and_children(
&arena.get(current_room).expect("We added this node, it must exist").room_id,
&arena
.get(current_room)
.expect("We added this node, it must exist")
.room_id,
suggested_only,
Identifier::UserId(sender_user),
)
@ -651,7 +686,11 @@ async fn get_stripped_space_child_events(
room_id: &RoomId,
) -> Result<Option<Vec<Raw<HierarchySpaceChildEvent>>>, Error> {
if let Some(current_shortstatehash) = services().rooms.state.get_room_shortstatehash(room_id)? {
let state = services().rooms.state_accessor.state_full_ids(current_shortstatehash).await?;
let state = services()
.rooms
.state_accessor
.state_full_ids(current_shortstatehash)
.await?;
let mut children_pdus = Vec::new();
for (key, id) in state {
let (event_type, state_key) = services().rooms.short.get_statekey_from_short(key)?;
@ -702,13 +741,24 @@ fn is_accessable_child_recurse(
let room_id: &RoomId = current_room;
// Checks if ACLs allow for the server to participate
if services().rooms.event_handler.acl_check(server_name, room_id).is_err() {
if services()
.rooms
.event_handler
.acl_check(server_name, room_id)
.is_err()
{
return Ok(false);
}
},
Identifier::UserId(user_id) => {
if services().rooms.state_cache.is_joined(user_id, current_room)?
|| services().rooms.state_cache.is_invited(user_id, current_room)?
if services()
.rooms
.state_cache
.is_joined(user_id, current_room)?
|| services()
.rooms
.state_cache
.is_invited(user_id, current_room)?
{
return Ok(true);
}
@ -746,26 +796,28 @@ fn is_accessable_child_recurse(
/// Checks if guests are able to join a given room
fn guest_can_join(room_id: &RoomId) -> Result<bool, Error> {
services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomGuestAccess, "")?.map_or(
Ok(false),
|s| {
services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomGuestAccess, "")?
.map_or(Ok(false), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomGuestAccessEventContent| c.guest_access == GuestAccess::CanJoin)
.map_err(|_| Error::bad_database("Invalid room guest access event in database."))
},
)
})
}
/// Checks if guests are able to view room content without joining
fn world_readable(room_id: &RoomId) -> Result<bool, Error> {
services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")?.map_or(
Ok(false),
|s| {
services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")?
.map_or(Ok(false), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility == HistoryVisibility::WorldReadable)
.map_err(|_| Error::bad_database("Invalid room history visibility event in database."))
},
)
})
}
/// Returns the join rule for a given room

View file

@ -36,7 +36,12 @@ impl Service {
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
for event_id in statediffnew.iter().filter_map(|new| {
services().rooms.state_compressor.parse_compressed_state_event(new).ok().map(|(_, id)| id)
services()
.rooms
.state_compressor
.parse_compressed_state_event(new)
.ok()
.map(|(_, id)| id)
}) {
let pdu = match services().rooms.timeline.get_pdu_json(&event_id)? {
Some(pdu) => pdu,
@ -74,7 +79,13 @@ impl Service {
.await?;
},
TimelineEventType::SpaceChild => {
services().rooms.spaces.roomid_spacehierarchy_cache.lock().await.remove(&pdu.room_id);
services()
.rooms
.spaces
.roomid_spacehierarchy_cache
.lock()
.await
.remove(&pdu.room_id);
},
_ => continue,
}
@ -82,7 +93,8 @@ impl Service {
services().rooms.state_cache.update_joined_count(room_id)?;
self.db.set_room_state(room_id, shortstatehash, state_lock)?;
self.db
.set_room_state(room_id, shortstatehash, state_lock)?;
Ok(())
}
@ -95,25 +107,47 @@ impl Service {
pub fn set_event_state(
&self, event_id: &EventId, room_id: &RoomId, state_ids_compressed: Arc<HashSet<CompressedStateEvent>>,
) -> Result<u64> {
let shorteventid = services().rooms.short.get_or_create_shorteventid(event_id)?;
let shorteventid = services()
.rooms
.short
.get_or_create_shorteventid(event_id)?;
let previous_shortstatehash = self.db.get_room_shortstatehash(room_id)?;
let state_hash = calculate_hash(&state_ids_compressed.iter().map(|s| &s[..]).collect::<Vec<_>>());
let state_hash = calculate_hash(
&state_ids_compressed
.iter()
.map(|s| &s[..])
.collect::<Vec<_>>(),
);
let (shortstatehash, already_existed) = services().rooms.short.get_or_create_shortstatehash(&state_hash)?;
let (shortstatehash, already_existed) = services()
.rooms
.short
.get_or_create_shortstatehash(&state_hash)?;
if !already_existed {
let states_parents = previous_shortstatehash.map_or_else(
|| Ok(Vec::new()),
|p| services().rooms.state_compressor.load_shortstatehash_info(p),
|p| {
services()
.rooms
.state_compressor
.load_shortstatehash_info(p)
},
)?;
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: HashSet<_> = state_ids_compressed.difference(&parent_stateinfo.1).copied().collect();
let statediffnew: HashSet<_> = state_ids_compressed
.difference(&parent_stateinfo.1)
.copied()
.collect();
let statediffremoved: HashSet<_> =
parent_stateinfo.1.difference(&state_ids_compressed).copied().collect();
let statediffremoved: HashSet<_> = parent_stateinfo
.1
.difference(&state_ids_compressed)
.copied()
.collect();
(Arc::new(statediffnew), Arc::new(statediffremoved))
} else {
@ -139,7 +173,10 @@ impl Service {
/// to `stateid_pduid` and adds the incoming event to `eventid_statehash`.
#[tracing::instrument(skip(self, new_pdu))]
pub fn append_to_state(&self, new_pdu: &PduEvent) -> Result<u64> {
let shorteventid = services().rooms.short.get_or_create_shorteventid(&new_pdu.event_id)?;
let shorteventid = services()
.rooms
.short
.get_or_create_shorteventid(&new_pdu.event_id)?;
let previous_shortstatehash = self.get_room_shortstatehash(&new_pdu.room_id)?;
@ -150,17 +187,31 @@ impl Service {
if let Some(state_key) = &new_pdu.state_key {
let states_parents = previous_shortstatehash.map_or_else(
|| Ok(Vec::new()),
|p| services().rooms.state_compressor.load_shortstatehash_info(p),
|p| {
services()
.rooms
.state_compressor
.load_shortstatehash_info(p)
},
)?;
let shortstatekey =
services().rooms.short.get_or_create_shortstatekey(&new_pdu.kind.to_string().into(), state_key)?;
let shortstatekey = services()
.rooms
.short
.get_or_create_shortstatekey(&new_pdu.kind.to_string().into(), state_key)?;
let new = services().rooms.state_compressor.compress_state_event(shortstatekey, &new_pdu.event_id)?;
let new = services()
.rooms
.state_compressor
.compress_state_event(shortstatekey, &new_pdu.event_id)?;
let replaces = states_parents
.last()
.map(|info| info.1.iter().find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())))
.map(|info| {
info.1
.iter()
.find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes()))
})
.unwrap_or_default();
if Some(&new) == replaces {
@ -197,12 +248,18 @@ impl Service {
let mut state = Vec::new();
// Add recommended events
if let Some(e) =
services().rooms.state_accessor.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "")?
services()
.rooms
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomCreate, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) =
services().rooms.state_accessor.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")?
services()
.rooms
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomJoinRules, "")?
{
state.push(e.to_stripped_state_event());
}
@ -214,12 +271,18 @@ impl Service {
state.push(e.to_stripped_state_event());
}
if let Some(e) =
services().rooms.state_accessor.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")?
services()
.rooms
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomAvatar, "")?
{
state.push(e.to_stripped_state_event());
}
if let Some(e) =
services().rooms.state_accessor.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")?
services()
.rooms
.state_accessor
.room_state_get(&invite_event.room_id, &StateEventType::RoomName, "")?
{
state.push(e.to_stripped_state_event());
}
@ -249,7 +312,10 @@ impl Service {
/// Returns the room's version.
#[tracing::instrument(skip(self))]
pub fn get_room_version(&self, room_id: &RoomId) -> Result<RoomVersionId> {
let create_event = services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
let create_event = services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
let create_event_content: RoomCreateEventContent = create_event
.as_ref()
@ -279,7 +345,8 @@ impl Service {
event_ids: Vec<OwnedEventId>,
state_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()> {
self.db.set_forward_extremities(room_id, event_ids, state_lock)
self.db
.set_forward_extremities(room_id, event_ids, state_lock)
}
/// This fetches auth events from the current state.
@ -321,9 +388,23 @@ impl Service {
Ok(full_state
.iter()
.filter_map(|compressed| services().rooms.state_compressor.parse_compressed_state_event(compressed).ok())
.filter_map(|compressed| {
services()
.rooms
.state_compressor
.parse_compressed_state_event(compressed)
.ok()
})
.filter_map(|(shortstatekey, event_id)| sauthevents.remove(&shortstatekey).map(|k| (k, event_id)))
.filter_map(|(k, event_id)| services().rooms.timeline.get_pdu(&event_id).ok().flatten().map(|pdu| (k, pdu)))
.filter_map(|(k, event_id)| {
services()
.rooms
.timeline
.get_pdu(&event_id)
.ok()
.flatten()
.map(|pdu| (k, pdu))
})
.collect())
}
}

View file

@ -59,19 +59,18 @@ impl Service {
/// Get membership for given user in state
fn user_membership(&self, shortstatehash: u64, user_id: &UserId) -> Result<MembershipState> {
self.state_get(shortstatehash, &StateEventType::RoomMember, user_id.as_str())?.map_or(
Ok(MembershipState::Leave),
|s| {
self.state_get(shortstatehash, &StateEventType::RoomMember, user_id.as_str())?
.map_or(Ok(MembershipState::Leave), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomMemberEventContent| c.membership)
.map_err(|_| Error::bad_database("Invalid room membership event in database."))
},
)
})
}
/// The user was a joined member at this state (potentially in the past)
fn user_was_joined(&self, shortstatehash: u64, user_id: &UserId) -> bool {
self.user_membership(shortstatehash, user_id).is_ok_and(|s| s == MembershipState::Join)
self.user_membership(shortstatehash, user_id)
.is_ok_and(|s| s == MembershipState::Join)
// Return sensible default, i.e.
// false
}
@ -92,20 +91,22 @@ impl Service {
return Ok(true);
};
if let Some(visibility) =
self.server_visibility_cache.lock().unwrap().get_mut(&(origin.to_owned(), shortstatehash))
if let Some(visibility) = self
.server_visibility_cache
.lock()
.unwrap()
.get_mut(&(origin.to_owned(), shortstatehash))
{
return Ok(*visibility);
}
let history_visibility = self.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?.map_or(
Ok(HistoryVisibility::Shared),
|s| {
let history_visibility = self
.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?
.map_or(Ok(HistoryVisibility::Shared), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
.map_err(|_| Error::bad_database("Invalid history visibility event in database."))
},
)?;
})?;
let mut current_server_members = services()
.rooms
@ -130,7 +131,10 @@ impl Service {
},
};
self.server_visibility_cache.lock().unwrap().insert((origin.to_owned(), shortstatehash), visibility);
self.server_visibility_cache
.lock()
.unwrap()
.insert((origin.to_owned(), shortstatehash), visibility);
Ok(visibility)
}
@ -144,22 +148,24 @@ impl Service {
None => return Ok(true),
};
if let Some(visibility) =
self.user_visibility_cache.lock().unwrap().get_mut(&(user_id.to_owned(), shortstatehash))
if let Some(visibility) = self
.user_visibility_cache
.lock()
.unwrap()
.get_mut(&(user_id.to_owned(), shortstatehash))
{
return Ok(*visibility);
}
let currently_member = services().rooms.state_cache.is_joined(user_id, room_id)?;
let history_visibility = self.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?.map_or(
Ok(HistoryVisibility::Shared),
|s| {
let history_visibility = self
.state_get(shortstatehash, &StateEventType::RoomHistoryVisibility, "")?
.map_or(Ok(HistoryVisibility::Shared), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
.map_err(|_| Error::bad_database("Invalid history visibility event in database."))
},
)?;
})?;
let visibility = match history_visibility {
HistoryVisibility::WorldReadable => true,
@ -178,7 +184,10 @@ impl Service {
},
};
self.user_visibility_cache.lock().unwrap().insert((user_id.to_owned(), shortstatehash), visibility);
self.user_visibility_cache
.lock()
.unwrap()
.insert((user_id.to_owned(), shortstatehash), visibility);
Ok(visibility)
}
@ -189,17 +198,16 @@ impl Service {
pub fn user_can_see_state_events(&self, user_id: &UserId, room_id: &RoomId) -> Result<bool> {
let currently_member = services().rooms.state_cache.is_joined(user_id, room_id)?;
let history_visibility = self.room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")?.map_or(
Ok(HistoryVisibility::Shared),
|s| {
let history_visibility = self
.room_state_get(room_id, &StateEventType::RoomHistoryVisibility, "")?
.map_or(Ok(HistoryVisibility::Shared), |s| {
serde_json::from_str(s.content.get())
.map(|c: RoomHistoryVisibilityEventContent| c.history_visibility)
.map_err(|e| {
error!("Invalid history visibility event in database for room {}: {e}", &room_id);
Error::bad_database("Invalid history visibility event in database.")
})
},
)?;
})?;
Ok(currently_member || history_visibility == HistoryVisibility::WorldReadable)
}
@ -232,28 +240,34 @@ impl Service {
}
pub fn get_name(&self, room_id: &RoomId) -> Result<Option<String>> {
services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomName, "")?.map_or(Ok(None), |s| {
Ok(serde_json::from_str(s.content.get()).map_or_else(|_| None, |c: RoomNameEventContent| Some(c.name)))
})
services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomName, "")?
.map_or(Ok(None), |s| {
Ok(serde_json::from_str(s.content.get()).map_or_else(|_| None, |c: RoomNameEventContent| Some(c.name)))
})
}
pub fn get_avatar(&self, room_id: &RoomId) -> Result<ruma::JsOption<RoomAvatarEventContent>> {
services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomAvatar, "")?.map_or(
Ok(ruma::JsOption::Undefined),
|s| {
services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomAvatar, "")?
.map_or(Ok(ruma::JsOption::Undefined), |s| {
serde_json::from_str(s.content.get())
.map_err(|_| Error::bad_database("Invalid room avatar event in database."))
},
)
})
}
pub fn get_member(&self, room_id: &RoomId, user_id: &UserId) -> Result<Option<RoomMemberEventContent>> {
services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())?.map_or(
Ok(None),
|s| {
services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomMember, user_id.as_str())?
.map_or(Ok(None), |s| {
serde_json::from_str(s.content.get())
.map_err(|_| Error::bad_database("Invalid room member event in database."))
},
)
})
}
}

View file

@ -165,7 +165,9 @@ impl Service {
.get(
None, // Ignored users are in global account data
user_id, // Receiver
GlobalAccountDataEventType::IgnoredUserList.to_string().into(),
GlobalAccountDataEventType::IgnoredUserList
.to_string()
.into(),
)?
.map(|event| {
serde_json::from_str::<IgnoredUserListEvent>(event.get()).map_err(|e| {
@ -175,7 +177,11 @@ impl Service {
})
.transpose()?
.map_or(false, |ignored| {
ignored.content.ignored_users.iter().any(|(user, _details)| user == sender)
ignored
.content
.ignored_users
.iter()
.any(|(user, _details)| user == sender)
});
if is_ignored {

View file

@ -55,7 +55,12 @@ impl Service {
/// removed diff for the selected shortstatehash and each parent layer.
#[tracing::instrument(skip(self))]
pub fn load_shortstatehash_info(&self, shortstatehash: u64) -> ShortStateInfoResult {
if let Some(r) = self.stateinfo_cache.lock().unwrap().get_mut(&shortstatehash) {
if let Some(r) = self
.stateinfo_cache
.lock()
.unwrap()
.get_mut(&shortstatehash)
{
return Ok(r.clone());
}
@ -76,19 +81,31 @@ impl Service {
response.push((shortstatehash, Arc::new(state), added, Arc::new(removed)));
self.stateinfo_cache.lock().unwrap().insert(shortstatehash, response.clone());
self.stateinfo_cache
.lock()
.unwrap()
.insert(shortstatehash, response.clone());
Ok(response)
} else {
let response = vec![(shortstatehash, added.clone(), added, removed)];
self.stateinfo_cache.lock().unwrap().insert(shortstatehash, response.clone());
self.stateinfo_cache
.lock()
.unwrap()
.insert(shortstatehash, response.clone());
Ok(response)
}
}
pub fn compress_state_event(&self, shortstatekey: u64, event_id: &EventId) -> Result<CompressedStateEvent> {
let mut v = shortstatekey.to_be_bytes().to_vec();
v.extend_from_slice(&services().rooms.short.get_or_create_shorteventid(event_id)?.to_be_bytes());
v.extend_from_slice(
&services()
.rooms
.short
.get_or_create_shorteventid(event_id)?
.to_be_bytes(),
);
Ok(v.try_into().expect("we checked the size above"))
}
@ -238,10 +255,17 @@ impl Service {
) -> HashSetCompressStateEvent {
let previous_shortstatehash = services().rooms.state.get_room_shortstatehash(room_id)?;
let state_hash =
utils::calculate_hash(&new_state_ids_compressed.iter().map(|bytes| &bytes[..]).collect::<Vec<_>>());
let state_hash = utils::calculate_hash(
&new_state_ids_compressed
.iter()
.map(|bytes| &bytes[..])
.collect::<Vec<_>>(),
);
let (new_shortstatehash, already_existed) = services().rooms.short.get_or_create_shortstatehash(&state_hash)?;
let (new_shortstatehash, already_existed) = services()
.rooms
.short
.get_or_create_shortstatehash(&state_hash)?;
if Some(new_shortstatehash) == previous_shortstatehash {
return Ok((new_shortstatehash, Arc::new(HashSet::new()), Arc::new(HashSet::new())));
@ -251,10 +275,16 @@ impl Service {
previous_shortstatehash.map_or_else(|| Ok(Vec::new()), |p| self.load_shortstatehash_info(p))?;
let (statediffnew, statediffremoved) = if let Some(parent_stateinfo) = states_parents.last() {
let statediffnew: HashSet<_> = new_state_ids_compressed.difference(&parent_stateinfo.1).copied().collect();
let statediffnew: HashSet<_> = new_state_ids_compressed
.difference(&parent_stateinfo.1)
.copied()
.collect();
let statediffremoved: HashSet<_> =
parent_stateinfo.1.difference(&new_state_ids_compressed).copied().collect();
let statediffremoved: HashSet<_> = parent_stateinfo
.1
.difference(&new_state_ids_compressed)
.copied()
.collect();
(Arc::new(statediffnew), Arc::new(statediffremoved))
} else {

View file

@ -60,7 +60,9 @@ impl Service {
unsigned.insert(
"m.relations".to_owned(),
json!({ "m.thread": content }).try_into().expect("thread is valid json"),
json!({ "m.thread": content })
.try_into()
.expect("thread is valid json"),
);
} else {
// New thread
@ -74,11 +76,16 @@ impl Service {
unsigned.insert(
"m.relations".to_owned(),
json!({ "m.thread": content }).try_into().expect("thread is valid json"),
json!({ "m.thread": content })
.try_into()
.expect("thread is valid json"),
);
}
services().rooms.timeline.replace_pdu(root_id, &root_pdu_json, &root_pdu)?;
services()
.rooms
.timeline
.replace_pdu(root_id, &root_pdu_json, &root_pdu)?;
}
let mut users = Vec::new();

View file

@ -152,7 +152,10 @@ impl Service {
/// Returns the version of a room, if known
pub fn get_room_version(&self, room_id: &RoomId) -> Result<Option<RoomVersionId>> {
let create_event = services().rooms.state_accessor.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
let create_event = services()
.rooms
.state_accessor
.room_state_get(room_id, &StateEventType::RoomCreate, "")?;
let create_event_content: Option<RoomCreateEventContent> = create_event
.as_ref()
@ -225,16 +228,25 @@ impl Service {
// Coalesce database writes for the remainder of this scope.
let _cork = services().globals.db.cork_and_flush()?;
let shortroomid = services().rooms.short.get_shortroomid(&pdu.room_id)?.expect("room exists");
let shortroomid = services()
.rooms
.short
.get_shortroomid(&pdu.room_id)?
.expect("room exists");
// Make unsigned fields correct. This is not properly documented in the spec,
// but state events need to have previous content in the unsigned field, so
// clients can easily interpret things like membership changes
if let Some(state_key) = &pdu.state_key {
if let CanonicalJsonValue::Object(unsigned) =
pdu_json.entry("unsigned".to_owned()).or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
if let CanonicalJsonValue::Object(unsigned) = pdu_json
.entry("unsigned".to_owned())
.or_insert_with(|| CanonicalJsonValue::Object(BTreeMap::default()))
{
if let Some(shortstatehash) = services().rooms.state_accessor.pdu_shortstatehash(&pdu.event_id).unwrap()
if let Some(shortstatehash) = services()
.rooms
.state_accessor
.pdu_shortstatehash(&pdu.event_id)
.unwrap()
{
if let Some(prev_state) = services()
.rooms
@ -259,18 +271,38 @@ impl Service {
}
// We must keep track of all events that have been referenced.
services().rooms.pdu_metadata.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
services().rooms.state.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
services()
.rooms
.pdu_metadata
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
services()
.rooms
.state
.set_forward_extremities(&pdu.room_id, leaves, state_lock)?;
let mutex_insert =
Arc::clone(services().globals.roomid_mutex_insert.write().await.entry(pdu.room_id.clone()).or_default());
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(pdu.room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if
// appending fails
services().rooms.edus.read_receipt.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
services().rooms.user.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
services()
.rooms
.edus
.read_receipt
.private_read_set(&pdu.room_id, &pdu.sender, count1)?;
services()
.rooms
.user
.reset_notification_counts(&pdu.sender, &pdu.room_id)?;
let count2 = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
@ -318,7 +350,10 @@ impl Service {
let mut notifies = Vec::new();
let mut highlights = Vec::new();
let mut push_target = services().rooms.state_cache.get_our_real_users(&pdu.room_id)?;
let mut push_target = services()
.rooms
.state_cache
.get_our_real_users(&pdu.room_id)?;
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key {
@ -354,7 +389,9 @@ impl Service {
let mut notify = false;
for action in
services().pusher.get_actions(user, &rules_for_user, &power_levels, &sync_pdu, &pdu.room_id)?
services()
.pusher
.get_actions(user, &rules_for_user, &power_levels, &sync_pdu, &pdu.room_id)?
{
match action {
Action::Notify => notify = true,
@ -378,7 +415,8 @@ impl Service {
}
}
self.db.increment_notification_counts(&pdu.room_id, notifies, highlights)?;
self.db
.increment_notification_counts(&pdu.room_id, notifies, highlights)?;
match pdu.kind {
TimelineEventType::RoomRedaction => {
@ -422,7 +460,13 @@ impl Service {
},
TimelineEventType::SpaceChild => {
if let Some(_state_key) = &pdu.state_key {
services().rooms.spaces.roomid_spacehierarchy_cache.lock().await.remove(&pdu.room_id);
services()
.rooms
.spaces
.roomid_spacehierarchy_cache
.lock()
.await
.remove(&pdu.room_id);
}
},
TimelineEventType::RoomMember => {
@ -463,7 +507,10 @@ impl Service {
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if let Some(body) = content.body {
services().rooms.search.index_pdu(shortroomid, &pdu_id, &body)?;
services()
.rooms
.search
.index_pdu(shortroomid, &pdu_id, &body)?;
let server_user = format!("@conduit:{}", services().globals.server_name());
@ -488,8 +535,15 @@ impl Service {
}
if let Ok(content) = serde_json::from_str::<ExtractRelatesToEventId>(pdu.content.get()) {
if let Some(related_pducount) = services().rooms.timeline.get_pdu_count(&content.relates_to.event_id)? {
services().rooms.pdu_metadata.add_relation(PduCount::Normal(count2), related_pducount)?;
if let Some(related_pducount) = services()
.rooms
.timeline
.get_pdu_count(&content.relates_to.event_id)?
{
services()
.rooms
.pdu_metadata
.add_relation(PduCount::Normal(count2), related_pducount)?;
}
}
@ -500,32 +554,52 @@ impl Service {
} => {
// We need to do it again here, because replies don't have
// event_id as a top level field
if let Some(related_pducount) = services().rooms.timeline.get_pdu_count(&in_reply_to.event_id)? {
services().rooms.pdu_metadata.add_relation(PduCount::Normal(count2), related_pducount)?;
if let Some(related_pducount) = services()
.rooms
.timeline
.get_pdu_count(&in_reply_to.event_id)?
{
services()
.rooms
.pdu_metadata
.add_relation(PduCount::Normal(count2), related_pducount)?;
}
},
Relation::Thread(thread) => {
services().rooms.threads.add_to_thread(&thread.event_id, pdu)?;
services()
.rooms
.threads
.add_to_thread(&thread.event_id, pdu)?;
},
_ => {}, // TODO: Aggregate other types
}
}
for appservice in services().appservice.read().await.values() {
if services().rooms.state_cache.appservice_in_room(&pdu.room_id, appservice)? {
services().sending.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
if services()
.rooms
.state_cache
.appservice_in_room(&pdu.room_id, appservice)?
{
services()
.sending
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
continue;
}
// If the RoomMember event has a non-empty state_key, it is targeted at someone.
// If it is our appservice user, we send this PDU to it.
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key_uid) =
&pdu.state_key.as_ref().and_then(|state_key| UserId::parse(state_key.as_str()).ok())
if let Some(state_key_uid) = &pdu
.state_key
.as_ref()
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
{
let appservice_uid = appservice.registration.sender_localpart.as_str();
if state_key_uid == appservice_uid {
services().sending.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
services()
.sending
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
continue;
}
}
@ -534,7 +608,10 @@ impl Service {
let matching_users = |users: &NamespaceRegex| {
appservice.users.is_match(pdu.sender.as_str())
|| pdu.kind == TimelineEventType::RoomMember
&& pdu.state_key.as_ref().map_or(false, |state_key| users.is_match(state_key))
&& pdu
.state_key
.as_ref()
.map_or(false, |state_key| users.is_match(state_key))
};
let matching_aliases = |aliases: &NamespaceRegex| {
services()
@ -549,7 +626,9 @@ impl Service {
|| appservice.rooms.is_match(pdu.room_id.as_str())
|| matching_users(&appservice.users)
{
services().sending.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
services()
.sending
.send_pdu_appservice(appservice.registration.id.clone(), pdu_id.clone())?;
}
}
@ -571,31 +650,43 @@ impl Service {
redacts,
} = pdu_builder;
let prev_events: Vec<_> =
services().rooms.state.get_forward_extremities(room_id)?.into_iter().take(20).collect();
let prev_events: Vec<_> = services()
.rooms
.state
.get_forward_extremities(room_id)?
.into_iter()
.take(20)
.collect();
// If there was no create event yet, assume we are creating a room
let room_version_id = services().rooms.state.get_room_version(room_id).or_else(|_| {
if event_type == TimelineEventType::RoomCreate {
#[derive(Deserialize)]
struct RoomCreate {
room_version: RoomVersionId,
let room_version_id = services()
.rooms
.state
.get_room_version(room_id)
.or_else(|_| {
if event_type == TimelineEventType::RoomCreate {
#[derive(Deserialize)]
struct RoomCreate {
room_version: RoomVersionId,
}
let content =
serde_json::from_str::<RoomCreate>(content.get()).expect("Invalid content in RoomCreate pdu.");
Ok(content.room_version)
} else {
Err(Error::InconsistentRoomState(
"non-create event for room of unknown version",
room_id.to_owned(),
))
}
let content =
serde_json::from_str::<RoomCreate>(content.get()).expect("Invalid content in RoomCreate pdu.");
Ok(content.room_version)
} else {
Err(Error::InconsistentRoomState(
"non-create event for room of unknown version",
room_id.to_owned(),
))
}
})?;
})?;
let room_version = RoomVersion::new(&room_version_id).expect("room version is supported");
let auth_events =
services().rooms.state.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
services()
.rooms
.state
.get_auth_events(room_id, &event_type, sender, state_key.as_deref(), &content)?;
// Our depth is the maximum depth of prev_events + 1
let depth = prev_events
@ -609,7 +700,10 @@ impl Service {
if let Some(state_key) = &state_key {
if let Some(prev_pdu) =
services().rooms.state_accessor.room_state_get(room_id, &event_type.to_string().into(), state_key)?
services()
.rooms
.state_accessor
.room_state_get(room_id, &event_type.to_string().into(), state_key)?
{
unsigned.insert(
"prev_content".to_owned(),
@ -626,13 +720,18 @@ impl Service {
event_id: ruma::event_id!("$thiswillbefilledinlater").into(),
room_id: room_id.to_owned(),
sender: sender.to_owned(),
origin_server_ts: utils::millis_since_unix_epoch().try_into().expect("time is valid"),
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
kind: event_type,
content,
state_key,
prev_events,
depth,
auth_events: auth_events.values().map(|pdu| pdu.event_id.clone()).collect(),
auth_events: auth_events
.values()
.map(|pdu| pdu.event_id.clone())
.collect(),
redacts,
unsigned: if unsigned.is_empty() {
None
@ -710,7 +809,10 @@ impl Service {
);
// Generate short event id
let _shorteventid = services().rooms.short.get_or_create_shorteventid(&pdu.event_id)?;
let _shorteventid = services()
.rooms
.short
.get_or_create_shorteventid(&pdu.event_id)?;
Ok((pdu, pdu_json))
}
@ -744,7 +846,10 @@ impl Service {
membership: MembershipState,
}
let target = pdu.state_key().filter(|v| v.starts_with('@')).unwrap_or(sender.as_str());
let target = pdu
.state_key()
.filter(|v| v.starts_with('@'))
.unwrap_or(sender.as_str());
let server_name = services().globals.server_name();
let server_user = format!("@conduit:{server_name}");
let content = serde_json::from_str::<ExtractMembership>(pdu.content.get())
@ -825,16 +930,25 @@ impl Service {
// We set the room state after inserting the pdu, so that we never have a moment
// in time where events in the current room state do not exist
services().rooms.state.set_room_state(room_id, statehashid, state_lock)?;
services()
.rooms
.state
.set_room_state(room_id, statehashid, state_lock)?;
let mut servers: HashSet<OwnedServerName> =
services().rooms.state_cache.room_servers(room_id).filter_map(Result::ok).collect();
let mut servers: HashSet<OwnedServerName> = services()
.rooms
.state_cache
.room_servers(room_id)
.filter_map(Result::ok)
.collect();
// In case we are kicking or banning a user, we need to inform their server of
// the change
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key_uid) =
&pdu.state_key.as_ref().and_then(|state_key| UserId::parse(state_key.as_str()).ok())
if let Some(state_key_uid) = &pdu
.state_key
.as_ref()
.and_then(|state_key| UserId::parse(state_key.as_str()).ok())
{
servers.insert(state_key_uid.server_name().to_owned());
}
@ -864,15 +978,28 @@ impl Service {
// We append to state before appending the pdu, so we don't have a moment in
// time with the pdu without it's state. This is okay because append_pdu can't
// fail.
services().rooms.state.set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)?;
services()
.rooms
.state
.set_event_state(&pdu.event_id, &pdu.room_id, state_ids_compressed)?;
if soft_fail {
services().rooms.pdu_metadata.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
services().rooms.state.set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock)?;
services()
.rooms
.pdu_metadata
.mark_as_referenced(&pdu.room_id, &pdu.prev_events)?;
services()
.rooms
.state
.set_forward_extremities(&pdu.room_id, new_room_leaves, state_lock)?;
return Ok(None);
}
let pdu_id = services().rooms.timeline.append_pdu(pdu, pdu_json, new_room_leaves, state_lock).await?;
let pdu_id = services()
.rooms
.timeline
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)
.await?;
Ok(Some(pdu_id))
}
@ -908,8 +1035,9 @@ impl Service {
pub fn redact_pdu(&self, event_id: &EventId, reason: &PduEvent) -> Result<()> {
// TODO: Don't reserialize, keep original json
if let Some(pdu_id) = self.get_pdu_id(event_id)? {
let mut pdu =
self.get_pdu_from_id(&pdu_id)?.ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?;
let mut pdu = self
.get_pdu_from_id(&pdu_id)?
.ok_or_else(|| Error::bad_database("PDU ID points to invalid PDU."))?;
let room_version_id = services().rooms.state.get_room_version(&pdu.room_id)?;
pdu.redact(room_version_id, reason)?;
self.replace_pdu(
@ -927,8 +1055,10 @@ impl Service {
#[tracing::instrument(skip(self, room_id))]
pub async fn backfill_if_required(&self, room_id: &RoomId, from: PduCount) -> Result<()> {
let first_pdu =
self.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)?.next().expect("Room is not empty")?;
let first_pdu = self
.all_pdus(user_id!("@doesntmatter:conduit.rs"), room_id)?
.next()
.expect("Room is not empty")?;
if first_pdu.0 < from {
// No backfill required, there are still events between them
@ -938,7 +1068,11 @@ impl Service {
let mut servers: Vec<&ServerName> = vec![];
// add server names from room aliases on the room ID
let room_aliases = services().rooms.alias.local_aliases_for_room(room_id).collect::<Result<Vec<_>, _>>();
let room_aliases = services()
.rooms
.alias
.local_aliases_for_room(room_id)
.collect::<Result<Vec<_>, _>>();
if let Ok(aliases) = &room_aliases {
for alias in aliases {
if alias.server_name() != services().globals.server_name() {
@ -979,8 +1113,10 @@ impl Service {
}
// don't backfill from ourselves (might be noop if we checked it above already)
if let Some(server_index) =
servers.clone().into_iter().position(|server| server == services().globals.server_name())
if let Some(server_index) = servers
.clone()
.into_iter()
.position(|server| server == services().globals.server_name())
{
servers.remove(server_index);
}
@ -1030,8 +1166,15 @@ impl Service {
let (event_id, value, room_id) = server_server::parse_incoming_pdu(&pdu)?;
// Lock so we cannot backfill the same pdu twice at the same time
let mutex =
Arc::clone(services().globals.roomid_mutex_federation.write().await.entry(room_id.clone()).or_default());
let mutex = Arc::clone(
services()
.globals
.roomid_mutex_federation
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let mutex_lock = mutex.lock().await;
// Skip the PDU if we already have it as a timeline event
@ -1040,7 +1183,11 @@ impl Service {
return Ok(());
}
services().rooms.event_handler.fetch_required_signing_keys([&value], pub_key_map).await?;
services()
.rooms
.event_handler
.fetch_required_signing_keys([&value], pub_key_map)
.await?;
services()
.rooms
@ -1051,10 +1198,21 @@ impl Service {
let value = self.get_pdu_json(&event_id)?.expect("We just created it");
let pdu = self.get_pdu(&event_id)?.expect("We just created it");
let shortroomid = services().rooms.short.get_shortroomid(&room_id)?.expect("room exists");
let shortroomid = services()
.rooms
.short
.get_shortroomid(&room_id)?
.expect("room exists");
let mutex_insert =
Arc::clone(services().globals.roomid_mutex_insert.write().await.entry(room_id.clone()).or_default());
let mutex_insert = Arc::clone(
services()
.globals
.roomid_mutex_insert
.write()
.await
.entry(room_id.clone())
.or_default(),
);
let insert_lock = mutex_insert.lock().await;
let count = services().globals.next_count()?;
@ -1077,7 +1235,10 @@ impl Service {
.map_err(|_| Error::bad_database("Invalid content in pdu."))?;
if let Some(body) = content.body {
services().rooms.search.index_pdu(shortroomid, &pdu_id, &body)?;
services()
.rooms
.search
.index_pdu(shortroomid, &pdu_id, &body)?;
}
}
drop(mutex_lock);

View file

@ -27,7 +27,8 @@ impl Service {
}
pub fn associate_token_shortstatehash(&self, room_id: &RoomId, token: u64, shortstatehash: u64) -> Result<()> {
self.db.associate_token_shortstatehash(room_id, token, shortstatehash)
self.db
.associate_token_shortstatehash(room_id, token, shortstatehash)
}
pub fn get_token_shortstatehash(&self, room_id: &RoomId, token: u64) -> Result<Option<u64>> {