make pdu stuff async, remove unnecessary db version check

Signed-off-by: strawberry <strawberry@pupbrain.dev>
This commit is contained in:
strawberry 2023-11-25 18:29:38 -05:00
parent 4d7b5eb759
commit 6958c720d0
14 changed files with 805 additions and 645 deletions

View file

@ -208,7 +208,10 @@ pub async fn kick_user_route(
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"), content: to_raw_value(&event).expect("event is valid, we just created it"),
@ -219,7 +222,8 @@ pub async fn kick_user_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
drop(state_lock); drop(state_lock);
@ -272,7 +276,10 @@ pub async fn ban_user_route(body: Ruma<ban_user::v3::Request>) -> Result<ban_use
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"), content: to_raw_value(&event).expect("event is valid, we just created it"),
@ -283,7 +290,8 @@ pub async fn ban_user_route(body: Ruma<ban_user::v3::Request>) -> Result<ban_use
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
drop(state_lock); drop(state_lock);
@ -330,7 +338,10 @@ pub async fn unban_user_route(
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"), content: to_raw_value(&event).expect("event is valid, we just created it"),
@ -341,7 +352,8 @@ pub async fn unban_user_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
drop(state_lock); drop(state_lock);
@ -770,12 +782,16 @@ async fn join_room_by_id_helper(
let statehash_after_join = services().rooms.state.append_to_state(&parsed_join_pdu)?; let statehash_after_join = services().rooms.state.append_to_state(&parsed_join_pdu)?;
info!("Appending new room join event"); info!("Appending new room join event");
services().rooms.timeline.append_pdu( services()
.rooms
.timeline
.append_pdu(
&parsed_join_pdu, &parsed_join_pdu,
join_event, join_event,
vec![(*parsed_join_pdu.event_id).to_owned()], vec![(*parsed_join_pdu.event_id).to_owned()],
&state_lock, &state_lock,
)?; )
.await?;
info!("Setting final room state for new room"); info!("Setting final room state for new room");
// We set the room state after inserting the pdu, so that we never have a moment in time // We set the room state after inserting the pdu, so that we never have a moment in time
@ -888,7 +904,10 @@ async fn join_room_by_id_helper(
}; };
// Try normal join first // Try normal join first
let error = match services().rooms.timeline.build_and_append_pdu( let error = match services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"), content: to_raw_value(&event).expect("event is valid, we just created it"),
@ -899,7 +918,9 @@ async fn join_room_by_id_helper(
sender_user, sender_user,
room_id, room_id,
&state_lock, &state_lock,
) { )
.await
{
Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())), Ok(_event_id) => return Ok(join_room_by_id::v3::Response::new(room_id.to_owned())),
Err(e) => e, Err(e) => e,
}; };
@ -1315,7 +1336,10 @@ pub(crate) async fn invite_helper<'a>(
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent { content: to_raw_value(&RoomMemberEventContent {
@ -1336,7 +1360,8 @@ pub(crate) async fn invite_helper<'a>(
sender_user, sender_user,
room_id, room_id,
&state_lock, &state_lock,
)?; )
.await?;
drop(state_lock); drop(state_lock);
@ -1390,14 +1415,18 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<Strin
)?; )?;
// We always drop the invite, we can't rely on other servers // We always drop the invite, we can't rely on other servers
services().rooms.state_cache.update_membership( services()
.rooms
.state_cache
.update_membership(
room_id, room_id,
user_id, user_id,
MembershipState::Leave, MembershipState::Leave,
user_id, user_id,
last_state, last_state,
true, true,
)?; )
.await?;
} else { } else {
let mutex_state = Arc::clone( let mutex_state = Arc::clone(
services() services()
@ -1421,14 +1450,18 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<Strin
None => { None => {
error!("Trying to leave a room you are not a member of."); error!("Trying to leave a room you are not a member of.");
services().rooms.state_cache.update_membership( services()
.rooms
.state_cache
.update_membership(
room_id, room_id,
user_id, user_id,
MembershipState::Leave, MembershipState::Leave,
user_id, user_id,
None, None,
true, true,
)?; )
.await?;
return Ok(()); return Ok(());
} }
Some(e) => e, Some(e) => e,
@ -1440,7 +1473,10 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<Strin
event.membership = MembershipState::Leave; event.membership = MembershipState::Leave;
event.reason = reason; event.reason = reason;
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&event).expect("event is valid, we just created it"), content: to_raw_value(&event).expect("event is valid, we just created it"),
@ -1451,7 +1487,8 @@ pub async fn leave_room(user_id: &UserId, room_id: &RoomId, reason: Option<Strin
user_id, user_id,
room_id, room_id,
&state_lock, &state_lock,
)?; )
.await?;
} }
Ok(()) Ok(())

View file

@ -73,7 +73,10 @@ pub async fn send_message_event_route(
let mut unsigned = BTreeMap::new(); let mut unsigned = BTreeMap::new();
unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into()); unsigned.insert("transaction_id".to_owned(), body.txn_id.to_string().into());
let event_id = services().rooms.timeline.build_and_append_pdu( let event_id = services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: body.event_type.to_string().into(), event_type: body.event_type.to_string().into(),
content: serde_json::from_str(body.body.body.json().get()) content: serde_json::from_str(body.body.body.json().get())
@ -85,7 +88,8 @@ pub async fn send_message_event_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
services().transaction_ids.add_txnid( services().transaction_ids.add_txnid(
sender_user, sender_user,

View file

@ -30,7 +30,10 @@ pub async fn redact_event_route(
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
let event_id = services().rooms.timeline.build_and_append_pdu( let event_id = services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomRedaction, event_type: TimelineEventType::RoomRedaction,
content: to_raw_value(&RoomRedactionEventContent { content: to_raw_value(&RoomRedactionEventContent {
@ -45,7 +48,8 @@ pub async fn redact_event_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
drop(state_lock); drop(state_lock);

View file

@ -174,7 +174,10 @@ pub async fn create_room_route(
} }
// 1. The room create event // 1. The room create event
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomCreate, event_type: TimelineEventType::RoomCreate,
content: to_raw_value(&content).expect("event is valid, we just created it"), content: to_raw_value(&content).expect("event is valid, we just created it"),
@ -185,10 +188,14 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 2. Let the room creator join // 2. Let the room creator join
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent { content: to_raw_value(&RoomMemberEventContent {
@ -209,7 +216,8 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 3. Power levels // 3. Power levels
@ -246,7 +254,10 @@ pub async fn create_room_route(
} }
} }
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomPowerLevels, event_type: TimelineEventType::RoomPowerLevels,
content: to_raw_value(&power_levels_content) content: to_raw_value(&power_levels_content)
@ -258,11 +269,15 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 4. Canonical room alias // 4. Canonical room alias
if let Some(room_alias_id) = &alias { if let Some(room_alias_id) = &alias {
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomCanonicalAlias, event_type: TimelineEventType::RoomCanonicalAlias,
content: to_raw_value(&RoomCanonicalAliasEventContent { content: to_raw_value(&RoomCanonicalAliasEventContent {
@ -277,13 +292,17 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
} }
// 5. Events set by preset // 5. Events set by preset
// 5.1 Join Rules // 5.1 Join Rules
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomJoinRules, event_type: TimelineEventType::RoomJoinRules,
content: to_raw_value(&RoomJoinRulesEventContent::new(match preset { content: to_raw_value(&RoomJoinRulesEventContent::new(match preset {
@ -299,10 +318,14 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 5.2 History Visibility // 5.2 History Visibility
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomHistoryVisibility, event_type: TimelineEventType::RoomHistoryVisibility,
content: to_raw_value(&RoomHistoryVisibilityEventContent::new( content: to_raw_value(&RoomHistoryVisibilityEventContent::new(
@ -316,10 +339,14 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 5.3 Guest Access // 5.3 Guest Access
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomGuestAccess, event_type: TimelineEventType::RoomGuestAccess,
content: to_raw_value(&RoomGuestAccessEventContent::new(match preset { content: to_raw_value(&RoomGuestAccessEventContent::new(match preset {
@ -334,7 +361,8 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 6. Events listed in initial_state // 6. Events listed in initial_state
for event in &body.initial_state { for event in &body.initial_state {
@ -353,17 +381,19 @@ pub async fn create_room_route(
continue; continue;
} }
services().rooms.timeline.build_and_append_pdu( services()
pdu_builder, .rooms
sender_user, .timeline
&room_id, .build_and_append_pdu(pdu_builder, sender_user, &room_id, &state_lock)
&state_lock, .await?;
)?;
} }
// 7. Events implied by name and topic // 7. Events implied by name and topic
if let Some(name) = &body.name { if let Some(name) = &body.name {
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomName, event_type: TimelineEventType::RoomName,
content: to_raw_value(&RoomNameEventContent::new(name.clone())) content: to_raw_value(&RoomNameEventContent::new(name.clone()))
@ -375,11 +405,15 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
} }
if let Some(topic) = &body.topic { if let Some(topic) = &body.topic {
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomTopic, event_type: TimelineEventType::RoomTopic,
content: to_raw_value(&RoomTopicEventContent { content: to_raw_value(&RoomTopicEventContent {
@ -393,7 +427,8 @@ pub async fn create_room_route(
sender_user, sender_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
} }
// 8. Events implied by invite (and TODO: invite_3pid) // 8. Events implied by invite (and TODO: invite_3pid)
@ -531,7 +566,10 @@ pub async fn upgrade_room_route(
// Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further // Send a m.room.tombstone event to the old room to indicate that it is not intended to be used any further
// Fail if the sender does not have the required permissions // Fail if the sender does not have the required permissions
let tombstone_event_id = services().rooms.timeline.build_and_append_pdu( let tombstone_event_id = services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomTombstone, event_type: TimelineEventType::RoomTombstone,
content: to_raw_value(&RoomTombstoneEventContent { content: to_raw_value(&RoomTombstoneEventContent {
@ -546,7 +584,8 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
// Change lock to replacement room // Change lock to replacement room
drop(state_lock); drop(state_lock);
@ -613,7 +652,10 @@ pub async fn upgrade_room_route(
)); ));
} }
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomCreate, event_type: TimelineEventType::RoomCreate,
content: to_raw_value(&create_event_content) content: to_raw_value(&create_event_content)
@ -625,10 +667,14 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&replacement_room, &replacement_room,
&state_lock, &state_lock,
)?; )
.await?;
// Join the new room // Join the new room
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent { content: to_raw_value(&RoomMemberEventContent {
@ -649,7 +695,8 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&replacement_room, &replacement_room,
&state_lock, &state_lock,
)?; )
.await?;
// Recommended transferable state events list from the specs // Recommended transferable state events list from the specs
let transferable_state_events = vec![ let transferable_state_events = vec![
@ -676,7 +723,10 @@ pub async fn upgrade_room_route(
None => continue, // Skipping missing events. None => continue, // Skipping missing events.
}; };
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: event_type.to_string().into(), event_type: event_type.to_string().into(),
content: event_content, content: event_content,
@ -687,7 +737,8 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&replacement_room, &replacement_room,
&state_lock, &state_lock,
)?; )
.await?;
} }
// Moves any local aliases to the new room // Moves any local aliases to the new room
@ -721,7 +772,10 @@ pub async fn upgrade_room_route(
power_levels_event_content.invite = new_level; power_levels_event_content.invite = new_level;
// Modify the power levels in the old room to prevent sending of events and inviting new users // Modify the power levels in the old room to prevent sending of events and inviting new users
let _ = services().rooms.timeline.build_and_append_pdu( let _ = services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomPowerLevels, event_type: TimelineEventType::RoomPowerLevels,
content: to_raw_value(&power_levels_event_content) content: to_raw_value(&power_levels_event_content)
@ -733,7 +787,8 @@ pub async fn upgrade_room_route(
sender_user, sender_user,
&body.room_id, &body.room_id,
&state_lock, &state_lock,
)?; )
.await?;
drop(state_lock); drop(state_lock);

View file

@ -233,7 +233,10 @@ async fn send_state_event_for_key_helper(
); );
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
let event_id = services().rooms.timeline.build_and_append_pdu( let event_id = services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: event_type.to_string().into(), event_type: event_type.to_string().into(),
content: serde_json::from_str(json.json().get()).expect("content is valid json"), content: serde_json::from_str(json.json().get()).expect("content is valid json"),
@ -244,7 +247,8 @@ async fn send_state_event_for_key_helper(
sender_user, sender_user,
room_id, room_id,
&state_lock, &state_lock,
)?; )
.await?;
Ok(event_id) Ok(event_id)
} }

View file

@ -282,7 +282,7 @@ async fn sync_helper(
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().await;
drop(insert_lock); drop(insert_lock);
} }
@ -414,7 +414,7 @@ async fn sync_helper(
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().await;
drop(insert_lock); drop(insert_lock);
} }
@ -599,7 +599,7 @@ async fn load_joined_room(
.entry(room_id.to_owned()) .entry(room_id.to_owned())
.or_default(), .or_default(),
); );
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().await;
drop(insert_lock); drop(insert_lock);
} }

View file

@ -1870,14 +1870,18 @@ pub async fn create_invite_route(
.state_cache .state_cache
.server_in_room(services().globals.server_name(), &body.room_id)? .server_in_room(services().globals.server_name(), &body.room_id)?
{ {
services().rooms.state_cache.update_membership( services()
.rooms
.state_cache
.update_membership(
&body.room_id, &body.room_id,
&invited_user, &invited_user,
MembershipState::Invite, MembershipState::Invite,
&sender, &sender,
Some(invite_state), Some(invite_state),
true, true,
)?; )
.await?;
} }
Ok(create_invite::v2::Response { Ok(create_invite::v2::Response {

View file

@ -226,26 +226,6 @@ impl Service {
.expect("Database data for admin room alias must be valid") .expect("Database data for admin room alias must be valid")
.expect("Admin room must exist"); .expect("Admin room must exist");
let send_message = |message: RoomMessageEventContent, mutex_lock: &MutexGuard<'_, ()>| {
services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder {
event_type: TimelineEventType::RoomMessage,
content: to_raw_value(&message)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: None,
redacts: None,
},
&conduit_user,
&conduit_room,
mutex_lock,
)
.unwrap();
};
loop { loop {
tokio::select! { tokio::select! {
Some(event) = receiver.recv() => { Some(event) = receiver.recv() => {
@ -265,7 +245,21 @@ impl Service {
let state_lock = mutex_state.lock().await; let state_lock = mutex_state.lock().await;
send_message(message_content, &state_lock); services().rooms.timeline.build_and_append_pdu(
PduBuilder {
event_type: TimelineEventType::RoomMessage,
content: to_raw_value(&message_content)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: None,
redacts: None,
},
&conduit_user,
&conduit_room,
&state_lock)
.await
.unwrap();
drop(state_lock); drop(state_lock);
} }
@ -938,7 +932,10 @@ impl Service {
content.room_version = services().globals.default_room_version(); content.room_version = services().globals.default_room_version();
// 1. The room create event // 1. The room create event
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomCreate, event_type: TimelineEventType::RoomCreate,
content: to_raw_value(&content).expect("event is valid, we just created it"), content: to_raw_value(&content).expect("event is valid, we just created it"),
@ -949,10 +946,14 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 2. Make conduit bot join // 2. Make conduit bot join
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent { content: to_raw_value(&RoomMemberEventContent {
@ -973,13 +974,17 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 3. Power levels // 3. Power levels
let mut users = BTreeMap::new(); let mut users = BTreeMap::new();
users.insert(conduit_user.clone(), 100.into()); users.insert(conduit_user.clone(), 100.into());
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomPowerLevels, event_type: TimelineEventType::RoomPowerLevels,
content: to_raw_value(&RoomPowerLevelsEventContent { content: to_raw_value(&RoomPowerLevelsEventContent {
@ -994,10 +999,14 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 4.1 Join Rules // 4.1 Join Rules
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomJoinRules, event_type: TimelineEventType::RoomJoinRules,
content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite)) content: to_raw_value(&RoomJoinRulesEventContent::new(JoinRule::Invite))
@ -1009,10 +1018,14 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 4.2 History Visibility // 4.2 History Visibility
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomHistoryVisibility, event_type: TimelineEventType::RoomHistoryVisibility,
content: to_raw_value(&RoomHistoryVisibilityEventContent::new( content: to_raw_value(&RoomHistoryVisibilityEventContent::new(
@ -1026,13 +1039,19 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 4.3 Guest Access // 4.3 Guest Access
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomGuestAccess, event_type: TimelineEventType::RoomGuestAccess,
content: to_raw_value(&RoomGuestAccessEventContent::new(GuestAccess::Forbidden)) content: to_raw_value(&RoomGuestAccessEventContent::new(
GuestAccess::Forbidden,
))
.expect("event is valid, we just created it"), .expect("event is valid, we just created it"),
unsigned: None, unsigned: None,
state_key: Some("".to_owned()), state_key: Some("".to_owned()),
@ -1041,11 +1060,15 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 5. Events implied by name and topic // 5. Events implied by name and topic
let room_name = format!("{} Admin Room", services().globals.server_name()); let room_name = format!("{} Admin Room", services().globals.server_name());
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomName, event_type: TimelineEventType::RoomName,
content: to_raw_value(&RoomNameEventContent::new(room_name)) content: to_raw_value(&RoomNameEventContent::new(room_name))
@ -1057,9 +1080,13 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomTopic, event_type: TimelineEventType::RoomTopic,
content: to_raw_value(&RoomTopicEventContent { content: to_raw_value(&RoomTopicEventContent {
@ -1073,14 +1100,18 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// 6. Room alias // 6. Room alias
let alias: OwnedRoomAliasId = format!("#admins:{}", services().globals.server_name()) let alias: OwnedRoomAliasId = format!("#admins:{}", services().globals.server_name())
.try_into() .try_into()
.expect("#admins:server_name is a valid alias name"); .expect("#admins:server_name is a valid alias name");
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomCanonicalAlias, event_type: TimelineEventType::RoomCanonicalAlias,
content: to_raw_value(&RoomCanonicalAliasEventContent { content: to_raw_value(&RoomCanonicalAliasEventContent {
@ -1095,7 +1126,8 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
services().rooms.alias.set_alias(&alias, &room_id)?; services().rooms.alias.set_alias(&alias, &room_id)?;
@ -1137,7 +1169,10 @@ impl Service {
.expect("@conduit:server_name is valid"); .expect("@conduit:server_name is valid");
// Invite and join the real user // Invite and join the real user
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent { content: to_raw_value(&RoomMemberEventContent {
@ -1158,8 +1193,12 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
services().rooms.timeline.build_and_append_pdu( .await?;
services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomMember, event_type: TimelineEventType::RoomMember,
content: to_raw_value(&RoomMemberEventContent { content: to_raw_value(&RoomMemberEventContent {
@ -1180,14 +1219,18 @@ impl Service {
user_id, user_id,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// Set power level // Set power level
let mut users = BTreeMap::new(); let mut users = BTreeMap::new();
users.insert(conduit_user.to_owned(), 100.into()); users.insert(conduit_user.to_owned(), 100.into());
users.insert(user_id.to_owned(), 100.into()); users.insert(user_id.to_owned(), 100.into());
services().rooms.timeline.build_and_append_pdu( services()
.rooms
.timeline
.build_and_append_pdu(
PduBuilder { PduBuilder {
event_type: TimelineEventType::RoomPowerLevels, event_type: TimelineEventType::RoomPowerLevels,
content: to_raw_value(&RoomPowerLevelsEventContent { content: to_raw_value(&RoomPowerLevelsEventContent {
@ -1202,7 +1245,8 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; )
.await?;
// Send welcome message // Send welcome message
services().rooms.timeline.build_and_append_pdu( services().rooms.timeline.build_and_append_pdu(
@ -1220,7 +1264,7 @@ impl Service {
&conduit_user, &conduit_user,
&room_id, &room_id,
&state_lock, &state_lock,
)?; ).await?;
Ok(()) Ok(())
} }

View file

@ -69,7 +69,7 @@ pub struct Service {
pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>, pub bad_query_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, RateLimitState>>>,
pub servername_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, Arc<Semaphore>>>>, pub servername_ratelimiter: Arc<RwLock<HashMap<OwnedServerName, Arc<Semaphore>>>>,
pub sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>, pub sync_receivers: RwLock<HashMap<(OwnedUserId, OwnedDeviceId), SyncHandle>>,
pub roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<Mutex<()>>>>, pub roomid_mutex_insert: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>,
pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>, pub roomid_mutex_state: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>,
pub roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>, // this lock will be held longer pub roomid_mutex_federation: RwLock<HashMap<OwnedRoomId, Arc<TokioMutex<()>>>>, // this lock will be held longer
pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>, pub roomid_federationhandletime: RwLock<HashMap<OwnedRoomId, (OwnedEventId, Instant)>>,
@ -452,9 +452,6 @@ impl Service {
/// new SHA256 file name media function, requires "sha256_media" feature flag enabled and database migrated /// new SHA256 file name media function, requires "sha256_media" feature flag enabled and database migrated
/// uses SHA256 hash of the base64 key as the file name /// uses SHA256 hash of the base64 key as the file name
pub fn get_media_file_new(&self, key: &[u8]) -> PathBuf { pub fn get_media_file_new(&self, key: &[u8]) -> PathBuf {
if services().globals.database_version().unwrap() < 14 {
error!("Using SHA256 key file names requires database to be migrated.")
}
let mut r = PathBuf::new(); let mut r = PathBuf::new();
r.push(self.config.database_path.clone()); r.push(self.config.database_path.clone());
r.push("media"); r.push("media");

View file

@ -860,14 +860,18 @@ impl Service {
debug!("Starting soft fail auth check"); debug!("Starting soft fail auth check");
if soft_fail { if soft_fail {
services().rooms.timeline.append_incoming_pdu( services()
.rooms
.timeline
.append_incoming_pdu(
&incoming_pdu, &incoming_pdu,
val, val,
extremities.iter().map(|e| (**e).to_owned()).collect(), extremities.iter().map(|e| (**e).to_owned()).collect(),
state_ids_compressed, state_ids_compressed,
soft_fail, soft_fail,
&state_lock, &state_lock,
)?; )
.await?;
// Soft fail, we keep the event as an outlier but don't add it to the timeline // Soft fail, we keep the event as an outlier but don't add it to the timeline
warn!("Event was soft failed: {:?}", incoming_pdu); warn!("Event was soft failed: {:?}", incoming_pdu);
@ -888,14 +892,18 @@ impl Service {
// We use the `state_at_event` instead of `state_after` so we accurately // We use the `state_at_event` instead of `state_after` so we accurately
// represent the state for this event. // represent the state for this event.
let pdu_id = services().rooms.timeline.append_incoming_pdu( let pdu_id = services()
.rooms
.timeline
.append_incoming_pdu(
&incoming_pdu, &incoming_pdu,
val, val,
extremities.iter().map(|e| (**e).to_owned()).collect(), extremities.iter().map(|e| (**e).to_owned()).collect(),
state_ids_compressed, state_ids_compressed,
soft_fail, soft_fail,
&state_lock, &state_lock,
)?; )
.await?;
debug!("Appended incoming pdu"); debug!("Appended incoming pdu");

View file

@ -205,7 +205,7 @@ impl Service {
) )
.await .await
{ {
warn!("Got response from {server} for /hierarchy\n{response:?}"); debug!("Got response from {server} for /hierarchy\n{response:?}");
let chunk = SpaceHierarchyRoomsChunk { let chunk = SpaceHierarchyRoomsChunk {
canonical_alias: response.room.canonical_alias, canonical_alias: response.room.canonical_alias,
name: response.room.name, name: response.room.name,

View file

@ -80,14 +80,11 @@ impl Service {
Err(_) => continue, Err(_) => continue,
}; };
services().rooms.state_cache.update_membership( services()
room_id, .rooms
&user_id, .state_cache
membership, .update_membership(room_id, &user_id, membership, &pdu.sender, None, false)
&pdu.sender, .await?;
None,
false,
)?;
} }
TimelineEventType::SpaceChild => { TimelineEventType::SpaceChild => {
services() services()

View file

@ -25,7 +25,7 @@ pub struct Service {
impl Service { impl Service {
/// Update current membership data. /// Update current membership data.
#[tracing::instrument(skip(self, last_state))] #[tracing::instrument(skip(self, last_state))]
pub fn update_membership( pub async fn update_membership(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
user_id: &UserId, user_id: &UserId,

View file

@ -213,7 +213,7 @@ impl Service {
/// ///
/// Returns pdu id /// Returns pdu id
#[tracing::instrument(skip(self, pdu, pdu_json, leaves))] #[tracing::instrument(skip(self, pdu, pdu_json, leaves))]
pub fn append_pdu<'a>( pub async fn append_pdu<'a>(
&self, &self,
pdu: &PduEvent, pdu: &PduEvent,
mut pdu_json: CanonicalJsonObject, mut pdu_json: CanonicalJsonObject,
@ -279,7 +279,7 @@ impl Service {
.entry(pdu.room_id.clone()) .entry(pdu.room_id.clone())
.or_default(), .or_default(),
); );
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().await;
let count1 = services().globals.next_count()?; let count1 = services().globals.next_count()?;
// Mark as read first so the sending client doesn't get a notification even if appending // Mark as read first so the sending client doesn't get a notification even if appending
@ -422,14 +422,18 @@ impl Service {
// Update our membership info, we do this here incase a user is invited // Update our membership info, we do this here incase a user is invited
// and immediately leaves we need the DB to record the invite event for auth // and immediately leaves we need the DB to record the invite event for auth
services().rooms.state_cache.update_membership( services()
.rooms
.state_cache
.update_membership(
&pdu.room_id, &pdu.room_id,
&target_user_id, &target_user_id,
content.membership, content.membership,
&pdu.sender, &pdu.sender,
invite_state, invite_state,
true, true,
)?; )
.await?;
} }
} }
TimelineEventType::RoomMessage => { TimelineEventType::RoomMessage => {
@ -655,7 +659,7 @@ impl Service {
.as_ref() .as_ref()
.map(|create_event| { .map(|create_event| {
serde_json::from_str(create_event.content.get()).map_err(|e| { serde_json::from_str(create_event.content.get()).map_err(|e| {
warn!("Invalid create event: {}", e); warn!("Invalid database create event: {}", e);
Error::bad_database("Invalid create event in db.") Error::bad_database("Invalid create event in db.")
}) })
}) })
@ -809,7 +813,7 @@ impl Service {
/// Creates a new persisted data unit and adds it to a room. This function takes a /// Creates a new persisted data unit and adds it to a room. This function takes a
/// roomid_mutex_state, meaning that only this function is able to mutate the room state. /// roomid_mutex_state, meaning that only this function is able to mutate the room state.
#[tracing::instrument(skip(self, state_lock))] #[tracing::instrument(skip(self, state_lock))]
pub fn build_and_append_pdu( pub async fn build_and_append_pdu(
&self, &self,
pdu_builder: PduBuilder, pdu_builder: PduBuilder,
sender: &UserId, sender: &UserId,
@ -909,14 +913,16 @@ impl Service {
// pdu without it's state. This is okay because append_pdu can't fail. // pdu without it's state. This is okay because append_pdu can't fail.
let statehashid = services().rooms.state.append_to_state(&pdu)?; let statehashid = services().rooms.state.append_to_state(&pdu)?;
let pdu_id = self.append_pdu( let pdu_id = self
.append_pdu(
&pdu, &pdu,
pdu_json, pdu_json,
// Since this PDU references all pdu_leaves we can update the leaves // Since this PDU references all pdu_leaves we can update the leaves
// of the room // of the room
vec![(*pdu.event_id).to_owned()], vec![(*pdu.event_id).to_owned()],
state_lock, state_lock,
)?; )
.await?;
// We set the room state after inserting the pdu, so that we never have a moment in time // We set the room state after inserting the pdu, so that we never have a moment in time
// where events in the current room state do not exist // where events in the current room state do not exist
@ -954,7 +960,7 @@ impl Service {
/// Append the incoming event setting the state snapshot to the state from the /// Append the incoming event setting the state snapshot to the state from the
/// server that sent the event. /// server that sent the event.
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
pub fn append_incoming_pdu<'a>( pub async fn append_incoming_pdu<'a>(
&self, &self,
pdu: &PduEvent, pdu: &PduEvent,
pdu_json: CanonicalJsonObject, pdu_json: CanonicalJsonObject,
@ -984,11 +990,11 @@ impl Service {
return Ok(None); return Ok(None);
} }
let pdu_id = let pdu_id = services()
services()
.rooms .rooms
.timeline .timeline
.append_pdu(pdu, pdu_json, new_room_leaves, state_lock)?; .append_pdu(pdu, pdu_json, new_room_leaves, state_lock)
.await?;
Ok(Some(pdu_id)) Ok(Some(pdu_id))
} }
@ -1169,7 +1175,7 @@ impl Service {
.entry(room_id.clone()) .entry(room_id.clone())
.or_default(), .or_default(),
); );
let insert_lock = mutex_insert.lock().unwrap(); let insert_lock = mutex_insert.lock().await;
let count = services().globals.next_count()?; let count = services().globals.next_count()?;
let mut pdu_id = shortroomid.to_be_bytes().to_vec(); let mut pdu_id = shortroomid.to_be_bytes().to_vec();