send EDUs to appservices if in events

to_device is not supported yet

Signed-off-by: strawberry <strawberry@puppygock.gay>
This commit is contained in:
strawberry 2024-09-28 19:38:35 -04:00
parent 8311952629
commit fafe320899

View file

@ -13,10 +13,15 @@ use conduit::{
}; };
use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt}; use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
use ruma::{ use ruma::{
api::federation::transactions::{ api::{
edu::{DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap}, appservice::event::push_events::v1::Edu as RumaEdu,
federation::transactions::{
edu::{
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap,
},
send_transaction_message, send_transaction_message,
}, },
},
device_id, device_id,
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId, push, uint, CanonicalJsonObject, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, RoomVersionId,
@ -441,7 +446,18 @@ impl Service {
return Err((dest.clone(), err!(Database(warn!(?id, "Missing appservice registration"))))); return Err((dest.clone(), err!(Database(warn!(?id, "Missing appservice registration")))));
}; };
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(),
);
let mut edu_jsons: Vec<RumaEdu> = Vec::with_capacity(
events
.iter()
.filter(|event| matches!(event, SendingEvent::Edu(_)))
.count(),
);
for event in &events { for event in &events {
match event { match event {
SendingEvent::Pdu(pdu_id) => { SendingEvent::Pdu(pdu_id) => {
@ -449,10 +465,12 @@ impl Service {
pdu_jsons.push(pdu.to_room_event()); pdu_jsons.push(pdu.to_room_event());
} }
}, },
SendingEvent::Edu(_) | SendingEvent::Flush => { SendingEvent::Edu(edu) => {
// Appservices don't need EDUs (?) and flush only; if let Ok(edu) = serde_json::from_slice(edu) {
// no new content edu_jsons.push(edu);
}
}, },
SendingEvent::Flush => {}, // flush only; no new content
} }
} }
@ -466,7 +484,8 @@ impl Service {
.collect::<Vec<_>>(), .collect::<Vec<_>>(),
)); ));
//debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction"); //debug_assert!(pdu_jsons.len() + edu_jsons.len() > 0, "sending empty
// transaction");
let client = &self.services.client.appservice; let client = &self.services.client.appservice;
match appservice::send_request( match appservice::send_request(
client, client,
@ -474,8 +493,8 @@ impl Service {
ruma::api::appservice::event::push_events::v1::Request { ruma::api::appservice::event::push_events::v1::Request {
events: pdu_jsons, events: pdu_jsons,
txn_id: txn_id.into(), txn_id: txn_id.into(),
ephemeral: Vec::new(), ephemeral: edu_jsons,
to_device: Vec::new(), to_device: Vec::new(), // TODO
}, },
) )
.await .await