Clean up (mostly automated with cargo clippy --fix)
parent 979ec6b4fa
commit d68c93b5fa
36 changed files with 364 additions and 393 deletions
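Most of the diff below is the same mechanical change: arguments that are already references were being re-borrowed as `&x`, which clippy's `needless_borrow` lint removes. A minimal, self-contained sketch of that pattern, assuming a hypothetical `room_exists` helper (illustrative only, not Conduit's actual API):

// Illustrative stand-in for methods like `db.rooms.exists(room_id)`,
// which take a reference such as `&RoomId` or `&str`.
fn room_exists(room_id: &str) -> bool {
    !room_id.is_empty()
}

fn main() {
    let room_id: &str = "!abc:example.org";

    // Before: `room_id` is already a `&str`, so `&room_id` creates a `&&str`
    // that the compiler immediately auto-derefs again (clippy: needless_borrow).
    let _ = room_exists(&room_id);

    // After `cargo clippy --fix`: pass the reference through directly.
    let _ = room_exists(room_id);
}

The few changes that do not fit this pattern (hoisting a duplicated `eventid_info.insert` out of an if/else and deferring `Arc::new` to the return value in `handle_outlier_pdu`) were presumably the manual part of the cleanup.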
@@ -150,7 +150,7 @@ where
     } else {
         write_destination_to_cache = true;

-        let result = find_actual_destination(globals, &destination).await;
+        let result = find_actual_destination(globals, destination).await;

         (result.0, result.1.into_uri_string())
     };
@@ -359,7 +359,7 @@ async fn find_actual_destination(
         let (host, port) = destination_str.split_at(pos);
         FedDest::Named(host.to_string(), port.to_string())
     } else {
-        match request_well_known(globals, &destination.as_str()).await {
+        match request_well_known(globals, destination.as_str()).await {
             // 3: A .well-known file is available
             Some(delegated_hostname) => {
                 hostname = add_port_to_hostname(&delegated_hostname).into_uri_string();
@@ -806,7 +806,7 @@ pub async fn send_transaction_message_route(
         .event_ids
         .iter()
         .filter_map(|id| {
-            db.rooms.get_pdu_count(&id).ok().flatten().map(|r| (id, r))
+            db.rooms.get_pdu_count(id).ok().flatten().map(|r| (id, r))
         })
         .max_by_key(|(_, count)| *count)
     {
@@ -875,8 +875,8 @@ pub async fn send_transaction_message_route(
     DeviceIdOrAllDevices::DeviceId(target_device_id) => {
         db.users.add_to_device_event(
             &sender,
-            &target_user_id,
-            &target_device_id,
+            target_user_id,
+            target_device_id,
             &ev_type.to_string(),
             event.deserialize_as().map_err(|_| {
                 Error::BadRequest(
@@ -889,10 +889,10 @@ pub async fn send_transaction_message_route(
     }

     DeviceIdOrAllDevices::AllDevices => {
-        for target_device_id in db.users.all_device_ids(&target_user_id) {
+        for target_device_id in db.users.all_device_ids(target_user_id) {
             db.users.add_to_device_event(
                 &sender,
-                &target_user_id,
+                target_user_id,
                 &target_device_id?,
                 &ev_type.to_string(),
                 event.deserialize_as().map_err(|_| {
@@ -959,7 +959,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
     db: &'a Database,
     pub_key_map: &'a RwLock<BTreeMap<String, BTreeMap<String, String>>>,
 ) -> StdResult<Option<Vec<u8>>, String> {
-    match db.rooms.exists(&room_id) {
+    match db.rooms.exists(room_id) {
         Ok(true) => {}
         _ => {
             return Err("Room is unknown to this server.".to_string());
@@ -967,19 +967,19 @@ pub(crate) async fn handle_incoming_pdu<'a>(
     }

     // 1. Skip the PDU if we already have it as a timeline event
-    if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(&event_id) {
+    if let Ok(Some(pdu_id)) = db.rooms.get_pdu_id(event_id) {
         return Ok(Some(pdu_id.to_vec()));
     }

     let create_event = db
         .rooms
-        .room_state_get(&room_id, &EventType::RoomCreate, "")
+        .room_state_get(room_id, &EventType::RoomCreate, "")
         .map_err(|_| "Failed to ask database for event.".to_owned())?
         .ok_or_else(|| "Failed to find create event in db.".to_owned())?;

     let first_pdu_in_room = db
         .rooms
-        .first_pdu_in_room(&room_id)
+        .first_pdu_in_room(room_id)
         .map_err(|_| "Error loading first room event.".to_owned())?
         .expect("Room exists");

@@ -1021,7 +1021,7 @@ pub(crate) async fn handle_incoming_pdu<'a>(
         origin,
         &[prev_event_id.clone()],
         &create_event,
-        &room_id,
+        room_id,
         pub_key_map,
     )
     .await
@@ -1049,12 +1049,12 @@ pub(crate) async fn handle_incoming_pdu<'a>(
                 (*prev_event_id).clone(),
                 pdu.prev_events.iter().cloned().collect(),
             );
-            eventid_info.insert(prev_event_id.clone(), (pdu, json));
         } else {
             // Time based check failed
             graph.insert((*prev_event_id).clone(), HashSet::new());
-            eventid_info.insert(prev_event_id.clone(), (pdu, json));
         }
+
+        eventid_info.insert(prev_event_id.clone(), (pdu, json));
     } else {
         // Get json failed
         graph.insert((*prev_event_id).clone(), HashSet::new());
@@ -1146,7 +1146,7 @@ fn handle_outlier_pdu<'a>(

     // We go through all the signatures we see on the value and fetch the corresponding signing
     // keys
-    fetch_required_signing_keys(&value, &pub_key_map, db)
+    fetch_required_signing_keys(&value, pub_key_map, db)
         .await
         .map_err(|e| e.to_string())?;

@@ -1210,8 +1210,8 @@ fn handle_outlier_pdu<'a>(
             .cloned()
             .map(Arc::new)
             .collect::<Vec<_>>(),
-        &create_event,
-        &room_id,
+        create_event,
+        room_id,
         pub_key_map,
     )
     .await;
@@ -1256,7 +1256,7 @@ fn handle_outlier_pdu<'a>(
     if auth_events
         .get(&(EventType::RoomCreate, "".to_owned()))
         .map(|a| a.as_ref())
-        != Some(&create_event)
+        != Some(create_event)
     {
         return Err("Incoming event refers to wrong create event.".to_owned());
     }
@@ -1273,8 +1273,6 @@ fn handle_outlier_pdu<'a>(
         None
     };

-    let incoming_pdu = Arc::new(incoming_pdu.clone());
-
     if !state_res::event_auth::auth_check(
         &room_version,
         &incoming_pdu,
@@ -1295,7 +1293,7 @@ fn handle_outlier_pdu<'a>(
         .map_err(|_| "Failed to add pdu as outlier.".to_owned())?;
     debug!("Added pdu as outlier.");

-    Ok((incoming_pdu, val))
+    Ok((Arc::new(incoming_pdu), val))
 })
 }

@@ -1427,7 +1425,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     }

     auth_chain_sets.push(
-        get_auth_chain(&room_id, starting_events, db)
+        get_auth_chain(room_id, starting_events, db)
             .map_err(|_| "Failed to load auth chain.".to_owned())?
             .map(|event_id| (*event_id).clone())
             .collect(),
@@ -1478,7 +1476,7 @@ async fn upgrade_outlier_to_timeline_pdu(
         &db.globals,
         origin,
         get_room_state_ids::v1::Request {
-            room_id: &room_id,
+            room_id,
             event_id: &incoming_pdu.event_id,
         },
     )
@@ -1487,15 +1485,15 @@ async fn upgrade_outlier_to_timeline_pdu(
     Ok(res) => {
         warn!("Fetching state events at event.");
         let state_vec = fetch_and_handle_outliers(
-            &db,
+            db,
             origin,
             &res.pdu_ids
                 .iter()
                 .cloned()
                 .map(Arc::new)
                 .collect::<Vec<_>>(),
-            &create_event,
-            &room_id,
+            create_event,
+            room_id,
             pub_key_map,
         )
         .await;
@@ -1568,11 +1566,11 @@ async fn upgrade_outlier_to_timeline_pdu(
     None::<PduEvent>, // TODO: third party invite
     |k, s| {
         db.rooms
-            .get_shortstatekey(&k, &s)
+            .get_shortstatekey(k, s)
             .ok()
             .flatten()
             .and_then(|shortstatekey| state_at_incoming_event.get(&shortstatekey))
-            .and_then(|event_id| db.rooms.get_pdu(&event_id).ok().flatten())
+            .and_then(|event_id| db.rooms.get_pdu(event_id).ok().flatten())
     },
 )
 .map_err(|_e| "Auth check failed.".to_owned())?;
@@ -1598,7 +1596,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     // applied. We start with the previous extremities (aka leaves)
     let mut extremities = db
         .rooms
-        .get_pdu_leaves(&room_id)
+        .get_pdu_leaves(room_id)
         .map_err(|_| "Failed to load room leaves".to_owned())?;

     // Remove any forward extremities that are referenced by this incoming event's prev_events
@@ -1609,11 +1607,11 @@ async fn upgrade_outlier_to_timeline_pdu(
     }

     // Only keep those extremities were not referenced yet
-    extremities.retain(|id| !matches!(db.rooms.is_event_referenced(&room_id, id), Ok(true)));
+    extremities.retain(|id| !matches!(db.rooms.is_event_referenced(room_id, id), Ok(true)));

     let current_sstatehash = db
         .rooms
-        .current_shortstatehash(&room_id)
+        .current_shortstatehash(room_id)
         .map_err(|_| "Failed to load current state hash.".to_owned())?
         .expect("every room has state");

@@ -1625,7 +1623,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     let auth_events = db
         .rooms
         .get_auth_events(
-            &room_id,
+            room_id,
             &incoming_pdu.kind,
             &incoming_pdu.sender,
             incoming_pdu.state_key.as_deref(),
@@ -1637,7 +1635,7 @@ async fn upgrade_outlier_to_timeline_pdu(
         .iter()
         .map(|(shortstatekey, id)| {
             db.rooms
-                .compress_state_event(*shortstatekey, &id, &db.globals)
+                .compress_state_event(*shortstatekey, id, &db.globals)
                 .map_err(|_| "Failed to compress_state_event".to_owned())
         })
         .collect::<StdResult<_, String>>()?;
@@ -1656,7 +1654,7 @@ async fn upgrade_outlier_to_timeline_pdu(

     if soft_fail {
         append_incoming_pdu(
-            &db,
+            db,
             &incoming_pdu,
             val,
             extremities,
@@ -1680,7 +1678,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     for id in dbg!(&extremities) {
         match db
             .rooms
-            .get_pdu(&id)
+            .get_pdu(id)
             .map_err(|_| "Failed to ask db for pdu.".to_owned())?
         {
             Some(leaf_pdu) => {
@@ -1757,7 +1755,7 @@ async fn upgrade_outlier_to_timeline_pdu(
         .iter()
         .map(|(k, id)| {
             db.rooms
-                .compress_state_event(*k, &id, &db.globals)
+                .compress_state_event(*k, id, &db.globals)
                 .map_err(|_| "Failed to compress_state_event.".to_owned())
         })
         .collect::<StdResult<_, String>>()?
@@ -1769,7 +1767,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     for state in &fork_states {
         auth_chain_sets.push(
             get_auth_chain(
-                &room_id,
+                room_id,
                 state.iter().map(|(_, id)| id.clone()).collect(),
                 db,
             )
@@ -1828,7 +1826,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     // Set the new room state to the resolved state
     if update_state {
         db.rooms
-            .force_state(&room_id, new_room_state, &db)
+            .force_state(room_id, new_room_state, db)
             .map_err(|_| "Failed to set new room state.".to_owned())?;
     }
     debug!("Updated resolved state");
@@ -1841,7 +1839,7 @@ async fn upgrade_outlier_to_timeline_pdu(
     // represent the state for this event.

     let pdu_id = append_incoming_pdu(
-        &db,
+        db,
         &incoming_pdu,
         val,
         extremities,
@@ -1886,7 +1884,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(

     let mut pdus = vec![];
     for id in events {
-        if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(&id) {
+        if let Some((time, tries)) = db.globals.bad_event_ratelimiter.read().unwrap().get(id) {
             // Exponential backoff
             let mut min_elapsed_duration = Duration::from_secs(5 * 60) * (*tries) * (*tries);
             if min_elapsed_duration > Duration::from_secs(60 * 60 * 24) {
@@ -1902,7 +1900,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
         // a. Look in the main timeline (pduid_pdu tree)
         // b. Look at outlier pdu tree
         // (get_pdu_json checks both)
-        let local_pdu = db.rooms.get_pdu(&id);
+        let local_pdu = db.rooms.get_pdu(id);
         let pdu = match local_pdu {
             Ok(Some(pdu)) => {
                 trace!("Found {} in db", id);
@@ -1916,7 +1914,7 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
         .send_federation_request(
             &db.globals,
             origin,
-            get_event::v1::Request { event_id: &id },
+            get_event::v1::Request { event_id: id },
         )
         .await
     {
@@ -1940,8 +1938,8 @@ pub(crate) fn fetch_and_handle_outliers<'a>(
     match handle_outlier_pdu(
         origin,
         create_event,
-        &id,
-        &room_id,
+        id,
+        room_id,
         value.clone(),
         db,
         pub_key_map,
@@ -2089,7 +2087,7 @@ pub(crate) async fn fetch_signing_keys(
     .sending
     .send_federation_request(
         &db.globals,
-        &server,
+        server,
         get_remote_server_keys::v2::Request::new(
             origin,
             MilliSecondsSinceUnixEpoch::from_system_time(
@@ -2168,7 +2166,7 @@ fn append_incoming_pdu(
         pdu,
         pdu_json,
         &new_room_leaves.into_iter().collect::<Vec<_>>(),
-        &db,
+        db,
     )?;

     for appservice in db.appservice.all()? {
@@ -2206,7 +2204,7 @@ fn append_incoming_pdu(
         && pdu
             .state_key
             .as_ref()
-            .map_or(false, |state_key| users.is_match(&state_key))
+            .map_or(false, |state_key| users.is_match(state_key))
     };
     let matching_aliases = |aliases: &Regex| {
         db.rooms
@@ -2273,7 +2271,7 @@ pub(crate) fn get_auth_chain<'a>(
         chunk_cache.extend(cached.iter().cloned());
     } else {
         misses2 += 1;
-        let auth_chain = Arc::new(get_auth_chain_inner(&room_id, &event_id, db)?);
+        let auth_chain = Arc::new(get_auth_chain_inner(room_id, &event_id, db)?);
         db.rooms
             .cache_auth_chain(vec![sevent_id], Arc::clone(&auth_chain))?;
         println!(
@@ -2821,7 +2819,7 @@ async fn create_join_event(
     // We need to return the state prior to joining, let's keep a reference to that here
     let shortstatehash = db
         .rooms
-        .current_shortstatehash(&room_id)?
+        .current_shortstatehash(room_id)?
         .ok_or(Error::BadRequest(
             ErrorKind::NotFound,
             "Pdu state not found.",
@@ -2831,7 +2829,7 @@ async fn create_join_event(
     // let mut auth_cache = EventMap::new();

     // We do not add the event_id field to the pdu here because of signature and hashes checks
-    let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(&pdu) {
+    let (event_id, value) = match crate::pdu::gen_event_id_canonical_json(pdu) {
         Ok(t) => t,
         Err(_) => {
             // Event could not be converted to canonical json
@@ -2860,7 +2858,7 @@ async fn create_join_event(
             .or_default(),
     );
     let mutex_lock = mutex.lock().await;
-    let pdu_id = handle_incoming_pdu(&origin, &event_id, &room_id, value, true, &db, &pub_key_map)
+    let pdu_id = handle_incoming_pdu(&origin, &event_id, room_id, value, true, db, &pub_key_map)
         .await
         .map_err(|e| {
             warn!("Error while handling incoming send join PDU: {}", e);
@@ -2877,14 +2875,14 @@ async fn create_join_event(

     let state_ids = db.rooms.state_full_ids(shortstatehash)?;
     let auth_chain_ids = get_auth_chain(
-        &room_id,
+        room_id,
         state_ids.iter().map(|(_, id)| id.clone()).collect(),
-        &db,
+        db,
     )?;

     for server in db
         .rooms
-        .room_servers(&room_id)
+        .room_servers(room_id)
         .filter_map(|r| r.ok())
         .filter(|server| &**server != db.globals.server_name())
     {
@@ -2900,7 +2898,7 @@ async fn create_join_event(
             .collect(),
         state: state_ids
             .iter()
-            .filter_map(|(_, id)| db.rooms.get_pdu_json(&id).ok().flatten())
+            .filter_map(|(_, id)| db.rooms.get_pdu_json(id).ok().flatten())
             .map(PduEvent::convert_to_outgoing_federation_event)
             .collect(),
     })
@@ -3296,7 +3294,7 @@ fn get_server_keys_from_cache(

     let event_id = EventId::try_from(&*format!(
         "${}",
-        ruma::signatures::reference_hash(&value, &room_version)
+        ruma::signatures::reference_hash(&value, room_version)
             .expect("ruma can calculate reference hashes")
     ))
     .expect("ruma's reference hashes are valid event ids");
@@ -3388,10 +3386,10 @@ pub(crate) async fn fetch_join_signing_keys(
     // Try to fetch keys, failure is okay
     // Servers we couldn't find in the cache will be added to `servers`
     for pdu in &event.room_state.state {
-        let _ = get_server_keys_from_cache(pdu, &mut servers, &room_version, &mut pkm, &db);
+        let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm, db);
     }
     for pdu in &event.room_state.auth_chain {
-        let _ = get_server_keys_from_cache(pdu, &mut servers, &room_version, &mut pkm, &db);
+        let _ = get_server_keys_from_cache(pdu, &mut servers, room_version, &mut pkm, db);
     }

     drop(pkm);