Revert "Send read reciept and typing indicator EDUs to appservices with receive_ephemeral"

This reverts commit 3675c941f8.
This commit is contained in:
strawberry 2024-12-18 11:26:18 -05:00
parent 9040ad054e
commit f54a62dda0
8 changed files with 46 additions and 135 deletions

26
Cargo.lock generated
View file

@ -3162,7 +3162,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma" name = "ruma"
version = "0.10.1" version = "0.10.1"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"assign", "assign",
"js_int", "js_int",
@ -3184,7 +3184,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-appservice-api" name = "ruma-appservice-api"
version = "0.10.0" version = "0.10.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@ -3196,7 +3196,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-client-api" name = "ruma-client-api"
version = "0.18.0" version = "0.18.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"assign", "assign",
@ -3219,7 +3219,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-common" name = "ruma-common"
version = "0.13.0" version = "0.13.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"base64 0.22.1", "base64 0.22.1",
@ -3249,7 +3249,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-events" name = "ruma-events"
version = "0.28.1" version = "0.28.1"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"as_variant", "as_variant",
"indexmap 2.7.0", "indexmap 2.7.0",
@ -3273,7 +3273,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-federation-api" name = "ruma-federation-api"
version = "0.9.0" version = "0.9.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"bytes", "bytes",
"http", "http",
@ -3291,7 +3291,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identifiers-validation" name = "ruma-identifiers-validation"
version = "0.9.5" version = "0.9.5"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"js_int", "js_int",
"thiserror 2.0.7", "thiserror 2.0.7",
@ -3300,7 +3300,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-identity-service-api" name = "ruma-identity-service-api"
version = "0.9.0" version = "0.9.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@ -3310,7 +3310,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-macros" name = "ruma-macros"
version = "0.13.0" version = "0.13.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"proc-macro-crate", "proc-macro-crate",
@ -3325,7 +3325,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-push-gateway-api" name = "ruma-push-gateway-api"
version = "0.9.0" version = "0.9.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"js_int", "js_int",
"ruma-common", "ruma-common",
@ -3337,7 +3337,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-server-util" name = "ruma-server-util"
version = "0.3.0" version = "0.3.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"headers", "headers",
"http", "http",
@ -3350,7 +3350,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-signatures" name = "ruma-signatures"
version = "0.15.0" version = "0.15.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"base64 0.22.1", "base64 0.22.1",
"ed25519-dalek", "ed25519-dalek",
@ -3366,7 +3366,7 @@ dependencies = [
[[package]] [[package]]
name = "ruma-state-res" name = "ruma-state-res"
version = "0.11.0" version = "0.11.0"
source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f" source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
dependencies = [ dependencies = [
"futures-util", "futures-util",
"js_int", "js_int",

View file

@ -334,7 +334,7 @@ version = "0.1.2"
[workspace.dependencies.ruma] [workspace.dependencies.ruma]
git = "https://github.com/girlbossceo/ruwuma" git = "https://github.com/girlbossceo/ruwuma"
#branch = "conduwuit-changes" #branch = "conduwuit-changes"
rev = "112ccc24cb14de26757715d611285d0806d5d91f" rev = "a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
features = [ features = [
"compat", "compat",
"rand", "rand",
@ -350,6 +350,7 @@ features = [
"compat-upload-signatures", "compat-upload-signatures",
"identifiers-validation", "identifiers-validation",
"unstable-unspecified", "unstable-unspecified",
"unstable-msc2409",
"unstable-msc2448", "unstable-msc2448",
"unstable-msc2666", "unstable-msc2666",
"unstable-msc2867", "unstable-msc2867",
@ -365,7 +366,6 @@ features = [
"unstable-msc4121", "unstable-msc4121",
"unstable-msc4125", "unstable-msc4125",
"unstable-msc4186", "unstable-msc4186",
"unstable-msc4203", # sending to-device events to appservices
"unstable-msc4210", # remove legacy mentions "unstable-msc4210", # remove legacy mentions
"unstable-extensible-events", "unstable-extensible-events",
] ]

View file

@ -72,10 +72,14 @@ pub(crate) async fn set_read_marker_route(
services services
.rooms .rooms
.read_receipt .read_receipt
.readreceipt_update(sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent { .readreceipt_update(
sender_user,
&body.room_id,
&ruma::events::receipt::ReceiptEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content), content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(), room_id: body.room_id.clone(),
}) },
)
.await; .await;
} }
@ -167,7 +171,7 @@ pub(crate) async fn create_receipt_route(
.readreceipt_update( .readreceipt_update(
sender_user, sender_user,
&body.room_id, &body.room_id,
ruma::events::receipt::ReceiptEvent { &ruma::events::receipt::ReceiptEvent {
content: ruma::events::receipt::ReceiptEventContent(receipt_content), content: ruma::events::receipt::ReceiptEventContent(receipt_content),
room_id: body.room_id.clone(), room_id: body.room_id.clone(),
}, },

View file

@ -275,7 +275,7 @@ async fn handle_edu_receipt(
services services
.rooms .rooms
.read_receipt .read_receipt
.readreceipt_update(&user_id, &room_id, event) .readreceipt_update(&user_id, &room_id, &event)
.await; .await;
} }
} else { } else {

View file

@ -2,10 +2,9 @@ mod data;
use std::{collections::BTreeMap, sync::Arc}; use std::{collections::BTreeMap, sync::Arc};
use conduwuit::{debug, err, result::LogErr, warn, PduCount, PduId, RawPduId, Result}; use conduwuit::{debug, err, warn, PduCount, PduId, RawPduId, Result};
use futures::{try_join, Stream, TryFutureExt}; use futures::{try_join, Stream, TryFutureExt};
use ruma::{ use ruma::{
api::appservice::event::push_events::v1::EphemeralData,
events::{ events::{
receipt::{ReceiptEvent, ReceiptEventContent, Receipts}, receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent, AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
@ -49,25 +48,14 @@ impl Service {
&self, &self,
user_id: &UserId, user_id: &UserId,
room_id: &RoomId, room_id: &RoomId,
event: ReceiptEvent, event: &ReceiptEvent,
) { ) {
self.db.readreceipt_update(user_id, room_id, &event).await; self.db.readreceipt_update(user_id, room_id, event).await;
self.services self.services
.sending .sending
.flush_room(room_id) .flush_room(room_id)
.await .await
.expect("room flush failed"); .expect("room flush failed");
// update appservices
let edu = EphemeralData::Receipt(event);
let _ = self
.services
.sending
.send_edu_appservice_room(
room_id,
serde_json::to_vec(&edu).expect("Serialized EphemeralData::Receipt"),
)
.await
.log_err();
} }
/// Gets the latest private read receipt from the user in the room /// Gets the latest private read receipt from the user in the room

View file

@ -7,11 +7,8 @@ use conduwuit::{
}; };
use futures::StreamExt; use futures::StreamExt;
use ruma::{ use ruma::{
api::{ api::federation::transactions::edu::{Edu, TypingContent},
appservice::event::push_events::v1::EphemeralData, events::SyncEphemeralRoomEvent,
federation::transactions::edu::{Edu, TypingContent},
},
events::{typing::TypingEventContent, EphemeralRoomEvent, SyncEphemeralRoomEvent},
OwnedRoomId, OwnedUserId, RoomId, UserId, OwnedRoomId, OwnedUserId, RoomId, UserId,
}; };
use tokio::sync::{broadcast, RwLock}; use tokio::sync::{broadcast, RwLock};
@ -79,9 +76,6 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested"); trace!("receiver found what it was looking for and is no longer interested");
} }
// update appservices
self.appservice_send(room_id).await?;
// update federation // update federation
if self.services.globals.user_is_local(user_id) { if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, true).await?; self.federation_send(room_id, user_id, true).await?;
@ -109,8 +103,7 @@ impl Service {
if self.typing_update_sender.send(room_id.to_owned()).is_err() { if self.typing_update_sender.send(room_id.to_owned()).is_err() {
trace!("receiver found what it was looking for and is no longer interested"); trace!("receiver found what it was looking for and is no longer interested");
} }
// update appservices
self.appservice_send(room_id).await?;
// update federation // update federation
if self.services.globals.user_is_local(user_id) { if self.services.globals.user_is_local(user_id) {
self.federation_send(room_id, user_id, false).await?; self.federation_send(room_id, user_id, false).await?;
@ -164,9 +157,6 @@ impl Service {
trace!("receiver found what it was looking for and is no longer interested"); trace!("receiver found what it was looking for and is no longer interested");
} }
// update appservices
self.appservice_send(room_id).await?;
// update federation // update federation
for user in &removable { for user in &removable {
if self.services.globals.user_is_local(user) { if self.services.globals.user_is_local(user) {
@ -190,30 +180,17 @@ impl Service {
.unwrap_or(0)) .unwrap_or(0))
} }
/// Returns a new typing EDU.
pub async fn typings_content(&self, room_id: &RoomId) -> Result<TypingEventContent> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else {
return Ok(TypingEventContent { user_ids: Vec::new() });
};
let user_ids: Vec<_> = typing_indicators.into_keys().collect();
Ok(TypingEventContent { user_ids })
}
/// Returns a new typing EDU. /// Returns a new typing EDU.
pub async fn typings_all( pub async fn typings_all(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
sender_user: &UserId, sender_user: &UserId,
) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> { ) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
let room_typing_indicators = self.typing.read().await.get(room_id).cloned(); let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
let Some(typing_indicators) = room_typing_indicators else { let Some(typing_indicators) = room_typing_indicators else {
return Ok(SyncEphemeralRoomEvent { return Ok(SyncEphemeralRoomEvent {
content: TypingEventContent { user_ids: Vec::new() }, content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() },
}); });
}; };
@ -231,7 +208,9 @@ impl Service {
.collect() .collect()
.await; .await;
Ok(SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } }) Ok(SyncEphemeralRoomEvent {
content: ruma::events::typing::TypingEventContent { user_ids },
})
} }
async fn federation_send( async fn federation_send(
@ -258,21 +237,4 @@ impl Service {
Ok(()) Ok(())
} }
async fn appservice_send(&self, room_id: &RoomId) -> Result<()> {
let edu = EphemeralData::Typing(EphemeralRoomEvent {
content: self.typings_content(room_id).await?,
room_id: room_id.into(),
});
self.services
.sending
.send_edu_appservice_room(
room_id,
serde_json::to_vec(&edu).expect("Serialized EphemeralData::Typing"),
)
.await?;
Ok(())
}
} }

View file

@ -25,10 +25,7 @@ pub use self::{
sender::{EDU_LIMIT, PDU_LIMIT}, sender::{EDU_LIMIT, PDU_LIMIT},
}; };
use crate::{ use crate::{
account_data, account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId,
appservice::NamespaceRegex,
client, globals, presence, pusher, resolver,
rooms::{self, timeline::RawPduId},
server_keys, users, Dep, server_keys, users, Dep,
}; };
@ -41,7 +38,6 @@ pub struct Service {
} }
struct Services { struct Services {
alias: Dep<rooms::alias::Service>,
client: Dep<client::Service>, client: Dep<client::Service>,
globals: Dep<globals::Service>, globals: Dep<globals::Service>,
resolver: Dep<resolver::Service>, resolver: Dep<resolver::Service>,
@ -80,7 +76,6 @@ impl crate::Service for Service {
Ok(Arc::new(Self { Ok(Arc::new(Self {
server: args.server.clone(), server: args.server.clone(),
services: Services { services: Services {
alias: args.depend::<rooms::alias::Service>("rooms::alias"),
client: args.depend::<client::Service>("client"), client: args.depend::<client::Service>("client"),
globals: args.depend::<globals::Service>("globals"), globals: args.depend::<globals::Service>("globals"),
resolver: args.depend::<resolver::Service>("resolver"), resolver: args.depend::<resolver::Service>("resolver"),
@ -189,47 +184,6 @@ impl Service {
}) })
} }
#[tracing::instrument(skip(self, serialized), level = "debug")]
pub fn send_edu_appservice(&self, appservice_id: String, serialized: Vec<u8>) -> Result {
let dest = Destination::Appservice(appservice_id);
let event = SendingEvent::Edu(serialized);
let _cork = self.db.db.cork();
let keys = self.db.queue_requests(once((&event, &dest)));
self.dispatch(Msg {
dest,
event,
queue_id: keys.into_iter().next().expect("request queue key"),
})
}
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_appservice_room(
&self,
room_id: &RoomId,
serialized: Vec<u8>,
) -> Result<()> {
for appservice in self.services.appservice.read().await.values() {
let matching_aliases = |aliases: NamespaceRegex| {
self.services
.alias
.local_aliases_for_room(room_id)
.ready_any(move |room_alias| aliases.is_match(room_alias.as_str()))
};
if appservice.rooms.is_match(room_id.as_str())
|| matching_aliases(appservice.aliases.clone()).await
|| self
.services
.state_cache
.appservice_in_room(room_id, appservice)
.await
{
self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?;
}
}
Ok(())
}
#[tracing::instrument(skip(self, room_id, serialized), level = "debug")] #[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec<u8>) -> Result<()> { pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec<u8>) -> Result<()> {
let servers = self let servers = self

View file

@ -1,4 +1,3 @@
use core::str;
use std::{ use std::{
collections::{BTreeMap, HashMap, HashSet}, collections::{BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
@ -22,7 +21,7 @@ use futures::{
}; };
use ruma::{ use ruma::{
api::{ api::{
appservice::event::push_events::v1::EphemeralData, appservice::event::push_events::v1::Edu as RumaEdu,
federation::transactions::{ federation::transactions::{
edu::{ edu::{
DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
@ -588,7 +587,7 @@ impl Service {
.filter(|event| matches!(event, SendingEvent::Pdu(_))) .filter(|event| matches!(event, SendingEvent::Pdu(_)))
.count(), .count(),
); );
let mut edu_jsons: Vec<EphemeralData> = Vec::with_capacity( let mut edu_jsons: Vec<RumaEdu> = Vec::with_capacity(
events events
.iter() .iter()
.filter(|event| matches!(event, SendingEvent::Edu(_))) .filter(|event| matches!(event, SendingEvent::Edu(_)))
@ -601,11 +600,15 @@ impl Service {
pdu_jsons.push(pdu.to_room_event()); pdu_jsons.push(pdu.to_room_event());
} }
}, },
| SendingEvent::Edu(edu) => | SendingEvent::Edu(edu) => {
if appservice.receive_ephemeral { if appservice
.receive_ephemeral
.is_some_and(|receive_edus| receive_edus)
{
if let Ok(edu) = serde_json::from_slice(edu) { if let Ok(edu) = serde_json::from_slice(edu) {
edu_jsons.push(edu); edu_jsons.push(edu);
} }
}
}, },
| SendingEvent::Flush => {}, // flush only; no new content | SendingEvent::Flush => {}, // flush only; no new content
} }