implement several broadband loops

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-12-04 00:00:40 +00:00
parent 59d5e3ebf1
commit 1d02851028
7 changed files with 47 additions and 22 deletions

View file

@ -1,10 +1,10 @@
use axum::extract::State; use axum::extract::State;
use conduit::{ use conduit::{
at, at,
utils::{result::FlatOk, IterStream, ReadyExt}, utils::{result::FlatOk, stream::WidebandExt, IterStream, ReadyExt},
PduCount, Result, PduCount, Result,
}; };
use futures::{FutureExt, StreamExt}; use futures::StreamExt;
use ruma::{ use ruma::{
api::{ api::{
client::relations::{ client::relations::{
@ -138,11 +138,10 @@ async fn paginate_relations_with_filter(
.is_none_or(|rel_type| pdu.relation_type_equal(rel_type)) .is_none_or(|rel_type| pdu.relation_type_equal(rel_type))
}) })
.stream() .stream()
.filter_map(|item| visibility_filter(services, sender_user, item))
.ready_take_while(|(count, _)| Some(*count) != to) .ready_take_while(|(count, _)| Some(*count) != to)
.wide_filter_map(|item| visibility_filter(services, sender_user, item))
.take(limit) .take(limit)
.collect() .collect()
.boxed()
.await; .await;
let next_batch = match dir { let next_batch = match dir {

View file

@ -1,7 +1,10 @@
mod v3; mod v3;
mod v4; mod v4;
use conduit::{utils::ReadyExt, PduCount}; use conduit::{
utils::stream::{BroadbandExt, ReadyExt},
PduCount,
};
use futures::StreamExt; use futures::StreamExt;
use ruma::{RoomId, UserId}; use ruma::{RoomId, UserId};
@ -55,7 +58,7 @@ async fn share_encrypted_room(
.state_cache .state_cache
.get_shared_rooms(sender_user, user_id) .get_shared_rooms(sender_user, user_id)
.ready_filter(|&room_id| Some(room_id) != ignore_room) .ready_filter(|&room_id| Some(room_id) != ignore_room)
.any(|other_room_id| { .broad_any(|other_room_id| {
services services
.rooms .rooms
.state_accessor .state_accessor

View file

@ -3,7 +3,12 @@
use std::{borrow::Borrow, collections::HashMap}; use std::{borrow::Borrow, collections::HashMap};
use axum::extract::State; use axum::extract::State;
use conduit::{err, pdu::gen_event_id_canonical_json, utils::IterStream, warn, Error, Result}; use conduit::{
err,
pdu::gen_event_id_canonical_json,
utils::stream::{IterStream, TryBroadbandExt},
warn, Error, Result,
};
use futures::{FutureExt, StreamExt, TryStreamExt}; use futures::{FutureExt, StreamExt, TryStreamExt};
use ruma::{ use ruma::{
api::{client::error::ErrorKind, federation::membership::create_join_event}, api::{client::error::ErrorKind, federation::membership::create_join_event},
@ -160,6 +165,7 @@ async fn create_join_event(
.rooms .rooms
.event_handler .event_handler
.handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true) .handle_incoming_pdu(&origin, room_id, &event_id, value.clone(), true)
.boxed()
.await? .await?
.ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?; .ok_or_else(|| err!(Request(InvalidParam("Could not accept as timeline event."))))?;
@ -172,16 +178,17 @@ async fn create_join_event(
.await?; .await?;
let state = state_ids let state = state_ids
.iter() .values()
.try_stream() .try_stream()
.and_then(|(_, event_id)| services.rooms.timeline.get_pdu_json(event_id)) .broad_and_then(|event_id| services.rooms.timeline.get_pdu_json(event_id))
.and_then(|pdu| { .broad_and_then(|pdu| {
services services
.sending .sending
.convert_to_outgoing_federation_event(pdu) .convert_to_outgoing_federation_event(pdu)
.map(Ok) .map(Ok)
}) })
.try_collect() .try_collect()
.boxed()
.await?; .await?;
let starting_events = state_ids.values().map(Borrow::borrow); let starting_events = state_ids.values().map(Borrow::borrow);
@ -191,14 +198,15 @@ async fn create_join_event(
.event_ids_iter(room_id, starting_events) .event_ids_iter(room_id, starting_events)
.await? .await?
.map(Ok) .map(Ok)
.and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await }) .broad_and_then(|event_id| async move { services.rooms.timeline.get_pdu_json(&event_id).await })
.and_then(|pdu| { .broad_and_then(|pdu| {
services services
.sending .sending
.convert_to_outgoing_federation_event(pdu) .convert_to_outgoing_federation_event(pdu)
.map(Ok) .map(Ok)
}) })
.try_collect() .try_collect()
.boxed()
.await?; .await?;
services.sending.send_pdu_room(room_id, &pdu_id).await?; services.sending.send_pdu_room(room_id, &pdu_id).await?;

View file

@ -4,7 +4,11 @@ use std::{
sync::Arc, sync::Arc,
}; };
use conduit::{debug, err, implement, utils::IterStream, Result}; use conduit::{
debug, err, implement,
utils::stream::{IterStream, WidebandExt},
Result,
};
use futures::{FutureExt, StreamExt, TryFutureExt}; use futures::{FutureExt, StreamExt, TryFutureExt};
use ruma::{ use ruma::{
state_res::{self, StateMap}, state_res::{self, StateMap},
@ -52,11 +56,11 @@ pub async fn resolve_state(
let fork_states: Vec<StateMap<Arc<EventId>>> = fork_states let fork_states: Vec<StateMap<Arc<EventId>>> = fork_states
.into_iter() .into_iter()
.stream() .stream()
.then(|fork_state| { .wide_then(|fork_state| {
fork_state fork_state
.into_iter() .into_iter()
.stream() .stream()
.filter_map(|(k, id)| { .wide_filter_map(|(k, id)| {
self.services self.services
.short .short
.get_statekey_from_short(k) .get_statekey_from_short(k)
@ -83,7 +87,7 @@ pub async fn resolve_state(
let state_events: Vec<_> = state let state_events: Vec<_> = state
.iter() .iter()
.stream() .stream()
.then(|((event_type, state_key), event_id)| { .wide_then(|((event_type, state_key), event_id)| {
self.services self.services
.short .short
.get_or_create_shortstatekey(event_type, state_key) .get_or_create_shortstatekey(event_type, state_key)

View file

@ -4,7 +4,12 @@ use std::{
sync::Arc, sync::Arc,
}; };
use conduit::{debug, err, implement, result::LogErr, utils::IterStream, PduEvent, Result}; use conduit::{
debug, err, implement,
result::LogErr,
utils::stream::{BroadbandExt, IterStream},
PduEvent, Result,
};
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt};
use ruma::{ use ruma::{
state_res::{self, StateMap}, state_res::{self, StateMap},
@ -166,7 +171,7 @@ pub(super) async fn state_at_incoming_resolved(
new_state new_state
.iter() .iter()
.stream() .stream()
.then(|((event_type, state_key), event_id)| { .broad_then(|((event_type, state_key), event_id)| {
self.services self.services
.short .short
.get_or_create_shortstatekey(event_type, state_key) .get_or_create_shortstatekey(event_type, state_key)

View file

@ -3,7 +3,10 @@ use std::{mem::size_of, sync::Arc};
use arrayvec::ArrayVec; use arrayvec::ArrayVec;
use conduit::{ use conduit::{
result::LogErr, result::LogErr,
utils::{stream::TryIgnore, u64_from_u8, ReadyExt}, utils::{
stream::{TryIgnore, WidebandExt},
u64_from_u8, ReadyExt,
},
PduCount, PduEvent, PduCount, PduEvent,
}; };
use database::Map; use database::Map;
@ -67,7 +70,7 @@ impl Data {
.ready_take_while(move |key| key.starts_with(&target.to_be_bytes())) .ready_take_while(move |key| key.starts_with(&target.to_be_bytes()))
.map(|to_from| u64_from_u8(&to_from[8..16])) .map(|to_from| u64_from_u8(&to_from[8..16]))
.map(PduCount::from_unsigned) .map(PduCount::from_unsigned)
.filter_map(move |shorteventid| async move { .wide_filter_map(move |shorteventid| async move {
let pdu_id: RawPduId = PduId { let pdu_id: RawPduId = PduId {
shortroomid, shortroomid,
shorteventid, shorteventid,

View file

@ -2,7 +2,10 @@ use std::{collections::BTreeMap, sync::Arc};
use conduit::{ use conduit::{
err, err,
utils::{stream::TryIgnore, ReadyExt}, utils::{
stream::{TryIgnore, WidebandExt},
ReadyExt,
},
PduCount, PduEvent, PduId, RawPduId, Result, PduCount, PduEvent, PduId, RawPduId, Result,
}; };
use database::{Deserialized, Map}; use database::{Deserialized, Map};
@ -143,7 +146,7 @@ impl Service {
.ignore_err() .ignore_err()
.map(RawPduId::from) .map(RawPduId::from)
.ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes()) .ready_take_while(move |pdu_id| pdu_id.shortroomid() == shortroomid.to_be_bytes())
.filter_map(move |pdu_id| async move { .wide_filter_map(move |pdu_id| async move {
let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?; let mut pdu = self.services.timeline.get_pdu_from_id(&pdu_id).await.ok()?;
let pdu_id: PduId = pdu_id.into(); let pdu_id: PduId = pdu_id.into();