refactor sending interface stack
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
f956e8c3b5
commit
4becbed2a7
5 changed files with 95 additions and 68 deletions
|
@ -1348,16 +1348,7 @@ pub(crate) async fn invite_helper(
|
||||||
"Could not accept incoming PDU as timeline event.",
|
"Could not accept incoming PDU as timeline event.",
|
||||||
))?;
|
))?;
|
||||||
|
|
||||||
// Bind to variable because of lifetimes
|
services().sending.send_pdu_room(room_id, &pdu_id)?;
|
||||||
let servers = services()
|
|
||||||
.rooms
|
|
||||||
.state_cache
|
|
||||||
.room_servers(room_id)
|
|
||||||
.filter_map(Result::ok)
|
|
||||||
.filter(|server| &**server != services().globals.server_name());
|
|
||||||
|
|
||||||
services().sending.send_pdu(servers, &pdu_id)?;
|
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -37,7 +37,7 @@ pub async fn send_event_to_device_route(
|
||||||
messages.insert(target_user_id.clone(), map);
|
messages.insert(target_user_id.clone(), map);
|
||||||
let count = services().globals.next_count()?;
|
let count = services().globals.next_count()?;
|
||||||
|
|
||||||
services().sending.send_reliable_edu(
|
services().sending.send_edu_server(
|
||||||
target_user_id.server_name(),
|
target_user_id.server_name(),
|
||||||
serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice(DirectDeviceContent {
|
serde_json::to_vec(&federation::transactions::edu::Edu::DirectToDevice(DirectDeviceContent {
|
||||||
sender: sender_user.clone(),
|
sender: sender_user.clone(),
|
||||||
|
@ -46,7 +46,6 @@ pub async fn send_event_to_device_route(
|
||||||
messages,
|
messages,
|
||||||
}))
|
}))
|
||||||
.expect("DirectToDevice EDU can be serialized"),
|
.expect("DirectToDevice EDU can be serialized"),
|
||||||
count,
|
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -1672,14 +1672,7 @@ async fn create_join_event(
|
||||||
.get_auth_chain(room_id, state_ids.values().cloned().collect())
|
.get_auth_chain(room_id, state_ids.values().cloned().collect())
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let servers = services()
|
services().sending.send_pdu_room(room_id, &pdu_id)?;
|
||||||
.rooms
|
|
||||||
.state_cache
|
|
||||||
.room_servers(room_id)
|
|
||||||
.filter_map(Result::ok)
|
|
||||||
.filter(|server| &**server != services().globals.server_name());
|
|
||||||
|
|
||||||
services().sending.send_pdu(servers, &pdu_id)?;
|
|
||||||
|
|
||||||
Ok(create_join_event::v1::RoomState {
|
Ok(create_join_event::v1::RoomState {
|
||||||
auth_chain: auth_chain_ids
|
auth_chain: auth_chain_ids
|
||||||
|
|
|
@ -411,7 +411,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
for push_key in services().pusher.get_pushkeys(user) {
|
for push_key in services().pusher.get_pushkeys(user) {
|
||||||
services().sending.send_push_pdu(&pdu_id, user, push_key?)?;
|
services().sending.send_pdu_push(&pdu_id, user, push_key?)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -958,7 +958,9 @@ impl Service {
|
||||||
// room_servers() and/or the if statement above
|
// room_servers() and/or the if statement above
|
||||||
servers.remove(services().globals.server_name());
|
servers.remove(services().globals.server_name());
|
||||||
|
|
||||||
services().sending.send_pdu(servers.into_iter(), &pdu_id)?;
|
services()
|
||||||
|
.sending
|
||||||
|
.send_pdu_servers(servers.into_iter(), &pdu_id)?;
|
||||||
|
|
||||||
Ok(pdu.event_id)
|
Ok(pdu.event_id)
|
||||||
}
|
}
|
||||||
|
|
|
@ -408,7 +408,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
|
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
|
||||||
pub fn send_push_pdu(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
||||||
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
|
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
|
||||||
let event = SendingEventType::Pdu(pdu_id.to_owned());
|
let event = SendingEventType::Pdu(pdu_id.to_owned());
|
||||||
let _cork = services().globals.db.cork()?;
|
let _cork = services().globals.db.cork()?;
|
||||||
|
@ -420,41 +420,6 @@ impl Service {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, servers, pdu_id))]
|
|
||||||
pub fn send_pdu<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
|
|
||||||
let requests = servers
|
|
||||||
.into_iter()
|
|
||||||
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
let _cork = services().globals.db.cork()?;
|
|
||||||
let keys = self.db.queue_requests(
|
|
||||||
&requests
|
|
||||||
.iter()
|
|
||||||
.map(|(o, e)| (o, e.clone()))
|
|
||||||
.collect::<Vec<_>>(),
|
|
||||||
)?;
|
|
||||||
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) {
|
|
||||||
self.sender
|
|
||||||
.send((outgoing_kind.clone(), event, key))
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, server, serialized))]
|
|
||||||
pub fn send_reliable_edu(&self, server: &ServerName, serialized: Vec<u8>, id: u64) -> Result<()> {
|
|
||||||
let outgoing_kind = OutgoingKind::Normal(server.to_owned());
|
|
||||||
let event = SendingEventType::Edu(serialized);
|
|
||||||
let _cork = services().globals.db.cork()?;
|
|
||||||
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
|
||||||
self.sender
|
|
||||||
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
||||||
let outgoing_kind = OutgoingKind::Appservice(appservice_id);
|
let outgoing_kind = OutgoingKind::Appservice(appservice_id);
|
||||||
|
@ -468,25 +433,102 @@ impl Service {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, room_id))]
|
#[tracing::instrument(skip(self, room_id, pdu_id))]
|
||||||
pub fn flush_room(&self, room_id: &RoomId) -> Result<()> {
|
pub fn send_pdu_room(&self, room_id: &RoomId, pdu_id: &[u8]) -> Result<()> {
|
||||||
let servers: HashSet<OwnedServerName> = services()
|
let servers = services()
|
||||||
.rooms
|
.rooms
|
||||||
.state_cache
|
.state_cache
|
||||||
.room_servers(room_id)
|
.room_servers(room_id)
|
||||||
.filter_map(Result::ok)
|
.filter_map(Result::ok)
|
||||||
.collect();
|
.filter(|server| &**server != services().globals.server_name());
|
||||||
|
|
||||||
self.flush_servers(servers.into_iter())
|
self.send_pdu_servers(servers, pdu_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, servers, pdu_id))]
|
||||||
|
pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
|
||||||
|
let requests = servers
|
||||||
|
.into_iter()
|
||||||
|
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let _cork = services().globals.db.cork()?;
|
||||||
|
let keys = self.db.queue_requests(
|
||||||
|
&requests
|
||||||
|
.iter()
|
||||||
|
.map(|(o, e)| (o, e.clone()))
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
)?;
|
||||||
|
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) {
|
||||||
|
self.sender
|
||||||
|
.send((outgoing_kind.clone(), event, key))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, server, serialized))]
|
||||||
|
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
||||||
|
let outgoing_kind = OutgoingKind::Normal(server.to_owned());
|
||||||
|
let event = SendingEventType::Edu(serialized);
|
||||||
|
let _cork = services().globals.db.cork()?;
|
||||||
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
||||||
|
self.sender
|
||||||
|
.send((outgoing_kind, event, keys.into_iter().next().unwrap()))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, room_id, serialized))]
|
||||||
|
pub fn send_edu_room(&self, room_id: &RoomId, serialized: Vec<u8>) -> Result<()> {
|
||||||
|
let servers = services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.room_servers(room_id)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.filter(|server| &**server != services().globals.server_name());
|
||||||
|
|
||||||
|
self.send_edu_servers(servers, serialized)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, servers, serialized))]
|
||||||
|
pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> {
|
||||||
|
let requests = servers
|
||||||
|
.into_iter()
|
||||||
|
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Edu(serialized.clone())))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
let _cork = services().globals.db.cork()?;
|
||||||
|
let keys = self.db.queue_requests(
|
||||||
|
&requests
|
||||||
|
.iter()
|
||||||
|
.map(|(o, e)| (o, e.clone()))
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
)?;
|
||||||
|
for ((outgoing_kind, event), key) in requests.into_iter().zip(keys) {
|
||||||
|
self.sender
|
||||||
|
.send((outgoing_kind.clone(), event, key))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tracing::instrument(skip(self, room_id))]
|
||||||
|
pub fn flush_room(&self, room_id: &RoomId) -> Result<()> {
|
||||||
|
let servers = services()
|
||||||
|
.rooms
|
||||||
|
.state_cache
|
||||||
|
.room_servers(room_id)
|
||||||
|
.filter_map(Result::ok)
|
||||||
|
.filter(|server| &**server != services().globals.server_name());
|
||||||
|
|
||||||
|
self.flush_servers(servers)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, servers))]
|
#[tracing::instrument(skip(self, servers))]
|
||||||
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
|
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
|
||||||
let requests = servers
|
let requests = servers.into_iter().map(OutgoingKind::Normal);
|
||||||
.into_iter()
|
|
||||||
.filter(|server| server != services().globals.server_name())
|
|
||||||
.map(OutgoingKind::Normal)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
for outgoing_kind in requests {
|
for outgoing_kind in requests {
|
||||||
self.sender
|
self.sender
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue