modernize state_res w/ stream extensions
Signed-off-by: Jason Volk <jason@zemos.net>

parent 7c9d3f7e07
commit fd33f9aa79
1 changed file with 111 additions and 109 deletions
@@ -15,11 +15,10 @@ use std::{
 	borrow::Borrow,
 	cmp::{Ordering, Reverse},
 	collections::{BinaryHeap, HashMap, HashSet},
-	fmt::Debug,
 	hash::{BuildHasher, Hash},
 };
 
-use futures::{Future, FutureExt, StreamExt, TryFutureExt, TryStreamExt, future, stream};
+use futures::{Future, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt, future};
 use ruma::{
 	EventId, Int, MilliSecondsSinceUnixEpoch, RoomVersionId,
 	events::{
@@ -37,9 +36,13 @@ pub use self::{
 	room_version::RoomVersion,
 };
 use crate::{
-	debug,
+	debug, debug_error,
 	matrix::{event::Event, pdu::StateKey},
-	trace, warn,
+	trace,
+	utils::stream::{
+		BroadbandExt, IterStream, ReadyExt, TryBroadbandExt, TryReadyExt, WidebandExt,
+	},
+	warn,
 };
 
 /// A mapping of event type and state_key to some value `T`, usually an
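The new `utils::stream` imports are the extension traits this commit leans on. Their real definitions live in the crate's `utils::stream` module; as a rough sketch under that assumption, an `IterStream`-style adapter only needs to wrap `futures::stream::iter` so that anything iterable gains a `.stream()` method (the trait name and bounds below are illustrative, not the crate's actual API):

use futures::stream;

// Hypothetical stand-in for the crate's `IterStream`; the real trait may
// carry different bounds or extra methods.
pub trait IterStreamSketch: IntoIterator + Sized {
	/// Turn any iterable into a `Stream`, enabling the `.stream()` call
	/// sites introduced throughout this diff.
	fn stream(self) -> stream::Iter<Self::IntoIter> {
		stream::iter(self)
	}
}

impl<I: IntoIterator> IterStreamSketch for I {}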
@@ -112,19 +115,15 @@ where
 	debug!(count = conflicting.len(), "conflicting events");
 	trace!(map = ?conflicting, "conflicting events");
 
-	let auth_chain_diff =
-		get_auth_chain_diff(auth_chain_sets).chain(conflicting.into_values().flatten());
+	let conflicting_values = conflicting.into_values().flatten().stream();
 
 	// `all_conflicted` contains unique items
 	// synapse says `full_set = {eid for eid in full_conflicted_set if eid in
 	// event_map}`
-	let all_conflicted: HashSet<_> = stream::iter(auth_chain_diff)
-		// Don't honor events we cannot "verify"
-		.map(|id| event_exists(id.clone()).map(move |exists| (id, exists)))
-		.buffer_unordered(parallel_fetches)
-		.filter_map(|(id, exists)| future::ready(exists.then_some(id)))
+	let all_conflicted: HashSet<_> = get_auth_chain_diff(auth_chain_sets)
+		.chain(conflicting_values)
+		.broad_filter_map(async |id| event_exists(id.clone()).await.then_some(id))
 		.collect()
-		.boxed()
 		.await;
 
 	debug!(count = all_conflicted.len(), "full conflicted set");
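This hunk sets the template for the rest of the file: a manual `map` + `buffer_unordered` + `filter_map` pipeline collapses into a single `broad_filter_map`. A plain-`futures` sketch of what such a combinator plausibly does, assuming "broad" means a fixed internal concurrency width (the width constant and function name here are assumptions, not the crate's code):

use futures::{Future, Stream, StreamExt};

// Assumed "broad" concurrency width; the real value is internal to the crate.
const WIDTH: usize = 32;

fn broad_filter_map<S, F, Fut, T>(input: S, f: F) -> impl Stream<Item = T>
where
	S: Stream,
	F: Fn(S::Item) -> Fut,
	Fut: Future<Output = Option<T>>,
{
	// One combinator standing in for the removed three-step chain: run up
	// to WIDTH futures at once and drop the `None` results as they resolve.
	input
		.map(f)
		.buffer_unordered(WIDTH)
		.filter_map(|opt| async move { opt })
}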
@@ -135,12 +134,15 @@ where
 
 	// Get only the control events with a state_key: "" or ban/kick event (sender !=
 	// state_key)
-	let control_events: Vec<_> = stream::iter(all_conflicted.iter())
-		.map(|id| is_power_event_id(id, &event_fetch).map(move |is| (id, is)))
-		.buffer_unordered(parallel_fetches)
-		.filter_map(|(id, is)| future::ready(is.then_some(id.clone())))
+	let control_events: Vec<_> = all_conflicted
+		.iter()
+		.stream()
+		.wide_filter_map(async |id| {
+			is_power_event_id(id, &event_fetch)
+				.await
+				.then_some(id.clone())
+		})
 		.collect()
-		.boxed()
 		.await;
 
 	// Sort the control events based on power_level/clock/event_id and
@@ -160,10 +162,9 @@ where
 	// Sequentially auth check each control event.
 	let resolved_control = iterative_auth_check(
 		&room_version,
-		sorted_control_levels.iter(),
+		sorted_control_levels.iter().stream(),
 		clean.clone(),
 		&event_fetch,
-		parallel_fetches,
 	)
 	.await?;
 
@@ -172,36 +173,35 @@ where
 
 	// At this point the control_events have been resolved we now have to
 	// sort the remaining events using the mainline of the resolved power level.
-	let deduped_power_ev = sorted_control_levels.into_iter().collect::<HashSet<_>>();
+	let deduped_power_ev: HashSet<_> = sorted_control_levels.into_iter().collect();
 
 	// This removes the control events that passed auth and more importantly those
 	// that failed auth
-	let events_to_resolve = all_conflicted
+	let events_to_resolve: Vec<_> = all_conflicted
 		.iter()
 		.filter(|&id| !deduped_power_ev.contains(id.borrow()))
 		.cloned()
-		.collect::<Vec<_>>();
+		.collect();
 
 	debug!(count = events_to_resolve.len(), "events left to resolve");
 	trace!(list = ?events_to_resolve, "events left to resolve");
 
 	// This "epochs" power level event
-	let power_event = resolved_control.get(&(StateEventType::RoomPowerLevels, StateKey::new()));
+	let power_levels_ty_sk = (StateEventType::RoomPowerLevels, StateKey::new());
+	let power_event = resolved_control.get(&power_levels_ty_sk);
 
 	debug!(event_id = ?power_event, "power event");
 
 	let sorted_left_events =
-		mainline_sort(&events_to_resolve, power_event.cloned(), &event_fetch, parallel_fetches)
-			.await?;
+		mainline_sort(&events_to_resolve, power_event.cloned(), &event_fetch).await?;
 
 	trace!(list = ?sorted_left_events, "events left, sorted");
 
 	let mut resolved_state = iterative_auth_check(
 		&room_version,
-		sorted_left_events.iter(),
+		sorted_left_events.iter().stream(),
 		resolved_control, // The control events are added to the final resolved state
 		&event_fetch,
-		parallel_fetches,
 	)
 	.await?;
 
@@ -265,7 +265,7 @@ where
 #[allow(clippy::arithmetic_side_effects)]
 fn get_auth_chain_diff<Id, Hasher>(
 	auth_chain_sets: &[HashSet<Id, Hasher>],
-) -> impl Iterator<Item = Id> + Send + use<Id, Hasher>
+) -> impl Stream<Item = Id> + Send + use<Id, Hasher>
 where
 	Id: Clone + Eq + Hash + Send,
 	Hasher: BuildHasher + Send + Sync,
@@ -279,6 +279,7 @@ where
 	id_counts
 		.into_iter()
 		.filter_map(move |(id, count)| (count < num_sets).then_some(id))
+		.stream()
 }
 
 /// Events are sorted from "earliest" to "latest".
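`get_auth_chain_diff` now returns a `Stream` rather than an `Iterator`, which is what lets the caller chain it with `conflicting_values` and collect concurrently. A runnable stand-in using stock `futures` types (the integer items are placeholders for event ids):

use std::collections::HashSet;

use futures::{StreamExt, executor::block_on, stream};

fn main() {
	let diff = stream::iter([1, 2, 3]); // stand-in for get_auth_chain_diff(..)
	let conflicting = stream::iter([3, 4]); // stand-in for conflicting_values
	// Mirrors the `.chain(..).collect().await` shape used above.
	let all: HashSet<i32> = block_on(diff.chain(conflicting).collect());
	assert_eq!(all.len(), 4);
}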
@@ -310,13 +311,15 @@ where
 	}
 
 	// This is used in the `key_fn` passed to the lexico_topo_sort fn
-	let event_to_pl = stream::iter(graph.keys())
+	let event_to_pl = graph
+		.keys()
+		.stream()
 		.map(|event_id| {
-			get_power_level_for_sender(event_id.clone(), fetch_event, parallel_fetches)
+			get_power_level_for_sender(event_id.clone(), fetch_event)
 				.map(move |res| res.map(|pl| (event_id, pl)))
 		})
 		.buffer_unordered(parallel_fetches)
-		.try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
+		.ready_try_fold(HashMap::new(), |mut event_to_pl, (event_id, pl)| {
 			debug!(
 				event_id = event_id.borrow().as_str(),
 				power_level = i64::from(pl),
@@ -324,7 +327,7 @@ where
 			);
 
 			event_to_pl.insert(event_id.clone(), pl);
-			future::ok(event_to_pl)
+			Ok(event_to_pl)
 		})
 		.boxed()
 		.await?;
@@ -475,7 +478,6 @@ where
 async fn get_power_level_for_sender<E, F, Fut>(
 	event_id: E::Id,
 	fetch_event: &F,
-	parallel_fetches: usize,
 ) -> serde_json::Result<Int>
 where
 	F: Fn(E::Id) -> Fut + Sync,
@@ -485,19 +487,17 @@ where
 {
 	debug!("fetch event ({event_id}) senders power level");
 
-	let event = fetch_event(event_id.clone()).await;
+	let event = fetch_event(event_id).await;
 
-	let auth_events = event.as_ref().map(Event::auth_events).into_iter().flatten();
+	let auth_events = event.as_ref().map(Event::auth_events);
 
-	let pl = stream::iter(auth_events)
-		.map(|aid| fetch_event(aid.clone()))
-		.buffer_unordered(parallel_fetches.min(5))
-		.filter_map(future::ready)
-		.collect::<Vec<_>>()
-		.boxed()
-		.await
+	let pl = auth_events
 		.into_iter()
-		.find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""));
+		.flatten()
+		.stream()
+		.broadn_filter_map(5, |aid| fetch_event(aid.clone()))
+		.ready_find(|aev| is_type_and_key(aev, &TimelineEventType::RoomPowerLevels, ""))
+		.await;
 
 	let content: PowerLevelsContentFields = match pl {
 		| None => return Ok(int!(0)),
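Two more extension methods appear here: `broadn_filter_map(5, ..)`, which presumably caps fetch concurrency at 5 just as `buffer_unordered(parallel_fetches.min(5))` did, and `ready_find`, which takes a synchronous predicate. The find step can be approximated with stock combinators, without materializing the intermediate `Vec` the old code collected (a sketch, not the crate's implementation):

use futures::{StreamExt, executor::block_on, future, stream};

fn main() {
	// `ready_find` is assumed to behave like filter-then-next on a
	// non-async predicate; even integers stand in for power-level events.
	let found = block_on(
		stream::iter([1, 2, 3, 4])
			.filter(|&x| future::ready(x % 2 == 0))
			.next(),
	);
	assert_eq!(found, Some(2));
}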
@@ -525,34 +525,28 @@ where
 /// For each `events_to_check` event we gather the events needed to auth it from
 /// the the `fetch_event` closure and verify each event using the
 /// `event_auth::auth_check` function.
-async fn iterative_auth_check<'a, E, F, Fut, I>(
+async fn iterative_auth_check<'a, E, F, Fut, S>(
 	room_version: &RoomVersion,
-	events_to_check: I,
+	events_to_check: S,
 	unconflicted_state: StateMap<E::Id>,
 	fetch_event: &F,
-	parallel_fetches: usize,
 ) -> Result<StateMap<E::Id>>
 where
 	F: Fn(E::Id) -> Fut + Sync,
 	Fut: Future<Output = Option<E>> + Send,
 	E::Id: Borrow<EventId> + Clone + Eq + Ord + Send + Sync + 'a,
-	I: Iterator<Item = &'a E::Id> + Debug + Send + 'a,
+	S: Stream<Item = &'a E::Id> + Send + 'a,
 	E: Event + Clone + Send + Sync,
 {
 	debug!("starting iterative auth check");
-	trace!(
-		list = ?events_to_check,
-		"events to check"
-	);
 
-	let events_to_check: Vec<_> = stream::iter(events_to_check)
+	let events_to_check: Vec<_> = events_to_check
 		.map(Result::Ok)
-		.map_ok(|event_id| {
-			fetch_event(event_id.clone()).map(move |result| {
-				result.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
-			})
-		})
-		.try_buffer_unordered(parallel_fetches)
+		.broad_and_then(async |event_id| {
+			fetch_event(event_id.clone())
+				.await
+				.ok_or_else(|| Error::NotFound(format!("Failed to find {event_id}")))
+		})
 		.try_collect()
 		.boxed()
 		.await?;
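`iterative_auth_check` now accepts any `Stream` of ids instead of a `Debug + Iterator`, and the fetch loop folds the old `map_ok` + `try_buffer_unordered` pair into `broad_and_then`. In stock `futures` terms the sequential equivalent is `TryStreamExt::and_then`; the `broad_` prefix is assumed to add concurrency on top (everything below is a simplified stand-in):

use futures::{StreamExt, TryStreamExt, stream};

// Sequential sketch of `.map(Result::Ok).broad_and_then(..).try_collect()`;
// u64 ids and a String error stand in for event ids and the crate's Error.
async fn fetch_all(ids: Vec<u64>) -> Result<Vec<u64>, String> {
	stream::iter(ids)
		.map(Result::Ok)
		.and_then(|id| async move {
			// stand-in for `fetch_event(id).await.ok_or_else(..)`
			(id != 0).then_some(id).ok_or_else(|| format!("Failed to find {id}"))
		})
		.try_collect()
		.await
}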
@@ -562,10 +556,10 @@ where
 		.flat_map(|event: &E| event.auth_events().map(Clone::clone))
 		.collect();
 
-	let auth_events: HashMap<E::Id, E> = stream::iter(auth_event_ids.into_iter())
-		.map(fetch_event)
-		.buffer_unordered(parallel_fetches)
-		.filter_map(future::ready)
+	let auth_events: HashMap<E::Id, E> = auth_event_ids
+		.into_iter()
+		.stream()
+		.broad_filter_map(fetch_event)
 		.map(|auth_event| (auth_event.event_id().clone(), auth_event))
 		.collect()
 		.boxed()
@@ -574,7 +568,6 @@ where
 	let auth_events = &auth_events;
 	let mut resolved_state = unconflicted_state;
 	for event in &events_to_check {
-		let event_id = event.event_id();
 		let state_key = event
 			.state_key()
 			.ok_or_else(|| Error::InvalidPdu("State event had no state key".to_owned()))?;
@@ -603,11 +596,10 @@ where
 			}
 		}
 
-		stream::iter(
-			auth_types
-				.iter()
-				.filter_map(|key| Some((key, resolved_state.get(key)?))),
-		)
+		auth_types
+			.iter()
+			.stream()
+			.ready_filter_map(|key| Some((key, resolved_state.get(key)?)))
 			.filter_map(|(key, ev_id)| async move {
 				if let Some(event) = auth_events.get(ev_id.borrow()) {
 					Some((key, event.clone()))
@@ -615,10 +607,9 @@ where
 				Some((key, fetch_event(ev_id.clone()).await?))
 			}
 		})
-		.for_each(|(key, event)| {
+		.ready_for_each(|(key, event)| {
 			//TODO: synapse checks "rejected_reason" is None here
 			auth_state.insert(key.to_owned(), event);
-			future::ready(())
 		})
 		.await;
 
@@ -634,12 +625,25 @@ where
 			future::ready(auth_state.get(&ty.with_state_key(key)))
 		};
 
-		if auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await? {
-			// add event to resolved state map
-			resolved_state.insert(event.event_type().with_state_key(state_key), event_id.clone());
-		} else {
-			// synapse passes here on AuthError. We do not add this event to resolved_state.
-			warn!("event {event_id} failed the authentication check");
+		let auth_result =
+			auth_check(room_version, &event, current_third_party.as_ref(), fetch_state).await;
+
+		match auth_result {
+			| Ok(true) => {
+				// add event to resolved state map
+				resolved_state.insert(
+					event.event_type().with_state_key(state_key),
+					event.event_id().clone(),
+				);
+			},
+			| Ok(false) => {
+				// synapse passes here on AuthError. We do not add this event to resolved_state.
+				warn!("event {} failed the authentication check", event.event_id());
+			},
+			| Err(e) => {
+				debug_error!("event {} failed the authentication check: {e}", event.event_id());
+				return Err(e);
+			},
 		}
 	}
 
@@ -659,7 +663,6 @@ async fn mainline_sort<E, F, Fut>(
 	to_sort: &[E::Id],
 	resolved_power_level: Option<E::Id>,
 	fetch_event: &F,
-	parallel_fetches: usize,
 ) -> Result<Vec<E::Id>>
 where
 	F: Fn(E::Id) -> Fut + Sync,
@@ -682,11 +685,13 @@ where
 		let event = fetch_event(p.clone())
 			.await
 			.ok_or_else(|| Error::NotFound(format!("Failed to find {p}")))?;
+
 		pl = None;
 		for aid in event.auth_events() {
 			let ev = fetch_event(aid.clone())
 				.await
 				.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
+
 			if is_type_and_key(&ev, &TimelineEventType::RoomPowerLevels, "") {
 				pl = Some(aid.to_owned());
 				break;
@@ -694,36 +699,32 @@ where
 		}
 	}
 
-	let mainline_map = mainline
+	let mainline_map: HashMap<_, _> = mainline
 		.iter()
 		.rev()
 		.enumerate()
 		.map(|(idx, eid)| ((*eid).clone(), idx))
-		.collect::<HashMap<_, _>>();
+		.collect();
 
-	let order_map = stream::iter(to_sort.iter())
-		.map(|ev_id| {
-			fetch_event(ev_id.clone()).map(move |event| event.map(|event| (event, ev_id)))
-		})
-		.buffer_unordered(parallel_fetches)
-		.filter_map(future::ready)
-		.map(|(event, ev_id)| {
+	let order_map: HashMap<_, _> = to_sort
+		.iter()
+		.stream()
+		.broad_filter_map(async |ev_id| {
+			fetch_event(ev_id.clone()).await.map(|event| (event, ev_id))
+		})
+		.broad_filter_map(|(event, ev_id)| {
 			get_mainline_depth(Some(event.clone()), &mainline_map, fetch_event)
-				.map_ok(move |depth| (depth, event, ev_id))
+				.map_ok(move |depth| (ev_id, (depth, event.origin_server_ts(), ev_id)))
 				.map(Result::ok)
 		})
-		.buffer_unordered(parallel_fetches)
-		.filter_map(future::ready)
-		.fold(HashMap::new(), |mut order_map, (depth, event, ev_id)| {
-			order_map.insert(ev_id, (depth, event.origin_server_ts(), ev_id));
-			future::ready(order_map)
-		})
+		.collect()
 		.boxed()
 		.await;
 
 	// Sort the event_ids by their depth, timestamp and EventId
 	// unwrap is OK order map and sort_event_ids are from to_sort (the same Vec)
-	let mut sort_event_ids = order_map.keys().map(|&k| k.clone()).collect::<Vec<_>>();
+	let mut sort_event_ids: Vec<_> = order_map.keys().map(|&k| k.clone()).collect();
 
 	sort_event_ids.sort_by_key(|sort_id| &order_map[sort_id]);
 
 	Ok(sort_event_ids)
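With the depth closure now emitting ready-made `(key, value)` pairs, the explicit `fold` into a `HashMap` becomes a plain `collect()`: `StreamExt::collect` builds any `Default + Extend` container. A minimal stand-in:

use std::collections::HashMap;

use futures::{StreamExt, executor::block_on, stream};

fn main() {
	// Pairs shaped like the `(ev_id, (depth, origin_server_ts, ev_id))`
	// tuples produced above.
	let pairs = stream::iter([("a", 1), ("b", 2)]);
	let map: HashMap<&str, i32> = block_on(pairs.collect());
	assert_eq!(map["b"], 2);
}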
@@ -744,6 +745,7 @@ where
 {
 	while let Some(sort_ev) = event {
 		debug!(event_id = sort_ev.event_id().borrow().as_str(), "mainline");
+
 		let id = sort_ev.event_id();
 		if let Some(depth) = mainline_map.get(id.borrow()) {
 			return Ok(*depth);
@@ -754,6 +756,7 @@ where
 			let aev = fetch_event(aid.clone())
 				.await
 				.ok_or_else(|| Error::NotFound(format!("Failed to find {aid}")))?;
+
 			if is_type_and_key(&aev, &TimelineEventType::RoomPowerLevels, "") {
 				event = Some(aev);
 				break;
@@ -884,7 +887,7 @@ mod tests {
 			zara,
 		},
 	};
-	use crate::debug;
+	use crate::{debug, utils::stream::IterStream};
 
 	async fn test_event_sort() {
 		use futures::future::ready;
@@ -915,10 +918,9 @@ mod tests {
 
 		let resolved_power = super::iterative_auth_check(
 			&RoomVersion::V6,
-			sorted_power_events.iter(),
+			sorted_power_events.iter().stream(),
 			HashMap::new(), // unconflicted events
 			&fetcher,
-			1,
 		)
 		.await
 		.expect("iterative auth check failed on resolved events");
|
@ -932,7 +934,7 @@ mod tests {
|
||||||
.get(&(StateEventType::RoomPowerLevels, "".into()))
|
.get(&(StateEventType::RoomPowerLevels, "".into()))
|
||||||
.cloned();
|
.cloned();
|
||||||
|
|
||||||
let sorted_event_ids = super::mainline_sort(&events_to_sort, power_level, &fetcher, 1)
|
let sorted_event_ids = super::mainline_sort(&events_to_sort, power_level, &fetcher)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|