From 3675c941f8fe4b92b5426f24a667e0b03cabd9da Mon Sep 17 00:00:00 2001
From: Jade Ellis <jade@ellis.link>
Date: Wed, 18 Dec 2024 03:04:39 +0000
Subject: [PATCH] Send read reciept and typing indicator EDUs to appservices
 with receive_ephemeral

---
 Cargo.lock                            | 26 ++++++-------
 Cargo.toml                            |  4 +-
 src/api/client/read_marker.rs         | 14 +++----
 src/api/server/send.rs                |  2 +-
 src/service/rooms/read_receipt/mod.rs | 18 +++++++--
 src/service/rooms/typing/mod.rs       | 54 +++++++++++++++++++++++----
 src/service/sending/mod.rs            | 48 +++++++++++++++++++++++-
 src/service/sending/sender.rs         | 15 +++-----
 8 files changed, 135 insertions(+), 46 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c86904d7..2c0ae75c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3162,7 +3162,7 @@ dependencies = [
 [[package]]
 name = "ruma"
 version = "0.10.1"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "assign",
  "js_int",
@@ -3184,7 +3184,7 @@ dependencies = [
 [[package]]
 name = "ruma-appservice-api"
 version = "0.10.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "js_int",
  "ruma-common",
@@ -3196,7 +3196,7 @@ dependencies = [
 [[package]]
 name = "ruma-client-api"
 version = "0.18.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "as_variant",
  "assign",
@@ -3219,7 +3219,7 @@ dependencies = [
 [[package]]
 name = "ruma-common"
 version = "0.13.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "as_variant",
  "base64 0.22.1",
@@ -3249,7 +3249,7 @@ dependencies = [
 [[package]]
 name = "ruma-events"
 version = "0.28.1"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "as_variant",
  "indexmap 2.7.0",
@@ -3273,7 +3273,7 @@ dependencies = [
 [[package]]
 name = "ruma-federation-api"
 version = "0.9.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "bytes",
  "http",
@@ -3291,7 +3291,7 @@ dependencies = [
 [[package]]
 name = "ruma-identifiers-validation"
 version = "0.9.5"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "js_int",
  "thiserror 2.0.7",
@@ -3300,7 +3300,7 @@ dependencies = [
 [[package]]
 name = "ruma-identity-service-api"
 version = "0.9.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "js_int",
  "ruma-common",
@@ -3310,7 +3310,7 @@ dependencies = [
 [[package]]
 name = "ruma-macros"
 version = "0.13.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "cfg-if",
  "proc-macro-crate",
@@ -3325,7 +3325,7 @@ dependencies = [
 [[package]]
 name = "ruma-push-gateway-api"
 version = "0.9.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "js_int",
  "ruma-common",
@@ -3337,7 +3337,7 @@ dependencies = [
 [[package]]
 name = "ruma-server-util"
 version = "0.3.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "headers",
  "http",
@@ -3350,7 +3350,7 @@ dependencies = [
 [[package]]
 name = "ruma-signatures"
 version = "0.15.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "base64 0.22.1",
  "ed25519-dalek",
@@ -3366,7 +3366,7 @@ dependencies = [
 [[package]]
 name = "ruma-state-res"
 version = "0.11.0"
-source = "git+https://github.com/girlbossceo/ruwuma?rev=a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2#a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+source = "git+https://github.com/girlbossceo/ruwuma?rev=112ccc24cb14de26757715d611285d0806d5d91f#112ccc24cb14de26757715d611285d0806d5d91f"
 dependencies = [
  "futures-util",
  "js_int",
diff --git a/Cargo.toml b/Cargo.toml
index cb2ab916..38d6d729 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -334,7 +334,7 @@ version = "0.1.2"
 [workspace.dependencies.ruma]
 git = "https://github.com/girlbossceo/ruwuma"
 #branch = "conduwuit-changes"
-rev = "a204cb56dbc20f72a1cbd0e9d6c827bbfd4082f2"
+rev = "112ccc24cb14de26757715d611285d0806d5d91f"
 features = [
     "compat",
     "rand",
@@ -350,7 +350,6 @@ features = [
     "compat-upload-signatures",
     "identifiers-validation",
     "unstable-unspecified",
-    "unstable-msc2409",
     "unstable-msc2448",
     "unstable-msc2666",
     "unstable-msc2867",
@@ -366,6 +365,7 @@ features = [
     "unstable-msc4121",
     "unstable-msc4125",
     "unstable-msc4186",
+    "unstable-msc4203", # sending to-device events to appservices 
     "unstable-msc4210", # remove legacy mentions
     "unstable-extensible-events",
 ]
diff --git a/src/api/client/read_marker.rs b/src/api/client/read_marker.rs
index 89fe003a..ab7cc6ad 100644
--- a/src/api/client/read_marker.rs
+++ b/src/api/client/read_marker.rs
@@ -72,14 +72,10 @@ pub(crate) async fn set_read_marker_route(
 		services
 			.rooms
 			.read_receipt
-			.readreceipt_update(
-				sender_user,
-				&body.room_id,
-				&ruma::events::receipt::ReceiptEvent {
-					content: ruma::events::receipt::ReceiptEventContent(receipt_content),
-					room_id: body.room_id.clone(),
-				},
-			)
+			.readreceipt_update(sender_user, &body.room_id, ruma::events::receipt::ReceiptEvent {
+				content: ruma::events::receipt::ReceiptEventContent(receipt_content),
+				room_id: body.room_id.clone(),
+			})
 			.await;
 	}
 
@@ -171,7 +167,7 @@ pub(crate) async fn create_receipt_route(
 				.readreceipt_update(
 					sender_user,
 					&body.room_id,
-					&ruma::events::receipt::ReceiptEvent {
+					ruma::events::receipt::ReceiptEvent {
 						content: ruma::events::receipt::ReceiptEventContent(receipt_content),
 						room_id: body.room_id.clone(),
 					},
diff --git a/src/api/server/send.rs b/src/api/server/send.rs
index c5fc7118..db6fd748 100644
--- a/src/api/server/send.rs
+++ b/src/api/server/send.rs
@@ -275,7 +275,7 @@ async fn handle_edu_receipt(
 					services
 						.rooms
 						.read_receipt
-						.readreceipt_update(&user_id, &room_id, &event)
+						.readreceipt_update(&user_id, &room_id, event)
 						.await;
 				}
 			} else {
diff --git a/src/service/rooms/read_receipt/mod.rs b/src/service/rooms/read_receipt/mod.rs
index 53e64957..4075c447 100644
--- a/src/service/rooms/read_receipt/mod.rs
+++ b/src/service/rooms/read_receipt/mod.rs
@@ -2,9 +2,10 @@ mod data;
 
 use std::{collections::BTreeMap, sync::Arc};
 
-use conduwuit::{debug, err, warn, PduCount, PduId, RawPduId, Result};
+use conduwuit::{debug, err, result::LogErr, warn, PduCount, PduId, RawPduId, Result};
 use futures::{try_join, Stream, TryFutureExt};
 use ruma::{
+	api::appservice::event::push_events::v1::EphemeralData,
 	events::{
 		receipt::{ReceiptEvent, ReceiptEventContent, Receipts},
 		AnySyncEphemeralRoomEvent, SyncEphemeralRoomEvent,
@@ -48,14 +49,25 @@ impl Service {
 		&self,
 		user_id: &UserId,
 		room_id: &RoomId,
-		event: &ReceiptEvent,
+		event: ReceiptEvent,
 	) {
-		self.db.readreceipt_update(user_id, room_id, event).await;
+		self.db.readreceipt_update(user_id, room_id, &event).await;
 		self.services
 			.sending
 			.flush_room(room_id)
 			.await
 			.expect("room flush failed");
+		// update appservices
+		let edu = EphemeralData::Receipt(event);
+		let _ = self
+			.services
+			.sending
+			.send_edu_appservice_room(
+				room_id,
+				serde_json::to_vec(&edu).expect("Serialized EphemeralData::Receipt"),
+			)
+			.await
+			.log_err();
 	}
 
 	/// Gets the latest private read receipt from the user in the room
diff --git a/src/service/rooms/typing/mod.rs b/src/service/rooms/typing/mod.rs
index a6123322..31ea40ae 100644
--- a/src/service/rooms/typing/mod.rs
+++ b/src/service/rooms/typing/mod.rs
@@ -7,8 +7,11 @@ use conduwuit::{
 };
 use futures::StreamExt;
 use ruma::{
-	api::federation::transactions::edu::{Edu, TypingContent},
-	events::SyncEphemeralRoomEvent,
+	api::{
+		appservice::event::push_events::v1::EphemeralData,
+		federation::transactions::edu::{Edu, TypingContent},
+	},
+	events::{typing::TypingEventContent, EphemeralRoomEvent, SyncEphemeralRoomEvent},
 	OwnedRoomId, OwnedUserId, RoomId, UserId,
 };
 use tokio::sync::{broadcast, RwLock};
@@ -76,6 +79,9 @@ impl Service {
 			trace!("receiver found what it was looking for and is no longer interested");
 		}
 
+		// update appservices
+		self.appservice_send(room_id).await?;
+
 		// update federation
 		if self.services.globals.user_is_local(user_id) {
 			self.federation_send(room_id, user_id, true).await?;
@@ -103,7 +109,8 @@ impl Service {
 		if self.typing_update_sender.send(room_id.to_owned()).is_err() {
 			trace!("receiver found what it was looking for and is no longer interested");
 		}
-
+		// update appservices
+		self.appservice_send(room_id).await?;
 		// update federation
 		if self.services.globals.user_is_local(user_id) {
 			self.federation_send(room_id, user_id, false).await?;
@@ -157,6 +164,9 @@ impl Service {
 				trace!("receiver found what it was looking for and is no longer interested");
 			}
 
+			// update appservices
+			self.appservice_send(room_id).await?;
+
 			// update federation
 			for user in &removable {
 				if self.services.globals.user_is_local(user) {
@@ -180,17 +190,30 @@ impl Service {
 			.unwrap_or(0))
 	}
 
+	/// Returns a new typing EDU.
+	pub async fn typings_content(&self, room_id: &RoomId) -> Result<TypingEventContent> {
+		let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
+
+		let Some(typing_indicators) = room_typing_indicators else {
+			return Ok(TypingEventContent { user_ids: Vec::new() });
+		};
+
+		let user_ids: Vec<_> = typing_indicators.into_keys().collect();
+
+		Ok(TypingEventContent { user_ids })
+	}
+
 	/// Returns a new typing EDU.
 	pub async fn typings_all(
 		&self,
 		room_id: &RoomId,
 		sender_user: &UserId,
-	) -> Result<SyncEphemeralRoomEvent<ruma::events::typing::TypingEventContent>> {
+	) -> Result<SyncEphemeralRoomEvent<TypingEventContent>> {
 		let room_typing_indicators = self.typing.read().await.get(room_id).cloned();
 
 		let Some(typing_indicators) = room_typing_indicators else {
 			return Ok(SyncEphemeralRoomEvent {
-				content: ruma::events::typing::TypingEventContent { user_ids: Vec::new() },
+				content: TypingEventContent { user_ids: Vec::new() },
 			});
 		};
 
@@ -208,9 +231,7 @@ impl Service {
 			.collect()
 			.await;
 
-		Ok(SyncEphemeralRoomEvent {
-			content: ruma::events::typing::TypingEventContent { user_ids },
-		})
+		Ok(SyncEphemeralRoomEvent { content: TypingEventContent { user_ids } })
 	}
 
 	async fn federation_send(
@@ -237,4 +258,21 @@ impl Service {
 
 		Ok(())
 	}
+
+	async fn appservice_send(&self, room_id: &RoomId) -> Result<()> {
+		let edu = EphemeralData::Typing(EphemeralRoomEvent {
+			content: self.typings_content(room_id).await?,
+			room_id: room_id.into(),
+		});
+
+		self.services
+			.sending
+			.send_edu_appservice_room(
+				room_id,
+				serde_json::to_vec(&edu).expect("Serialized EphemeralData::Typing"),
+			)
+			.await?;
+
+		Ok(())
+	}
 }
diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs
index 2038f4eb..2b571034 100644
--- a/src/service/sending/mod.rs
+++ b/src/service/sending/mod.rs
@@ -25,7 +25,10 @@ pub use self::{
 	sender::{EDU_LIMIT, PDU_LIMIT},
 };
 use crate::{
-	account_data, client, globals, presence, pusher, resolver, rooms, rooms::timeline::RawPduId,
+	account_data,
+	appservice::NamespaceRegex,
+	client, globals, presence, pusher, resolver,
+	rooms::{self, timeline::RawPduId},
 	server_keys, users, Dep,
 };
 
@@ -38,6 +41,7 @@ pub struct Service {
 }
 
 struct Services {
+	alias: Dep<rooms::alias::Service>,
 	client: Dep<client::Service>,
 	globals: Dep<globals::Service>,
 	resolver: Dep<resolver::Service>,
@@ -76,6 +80,7 @@ impl crate::Service for Service {
 		Ok(Arc::new(Self {
 			server: args.server.clone(),
 			services: Services {
+				alias: args.depend::<rooms::alias::Service>("rooms::alias"),
 				client: args.depend::<client::Service>("client"),
 				globals: args.depend::<globals::Service>("globals"),
 				resolver: args.depend::<resolver::Service>("resolver"),
@@ -184,6 +189,47 @@ impl Service {
 		})
 	}
 
+	#[tracing::instrument(skip(self, serialized), level = "debug")]
+	pub fn send_edu_appservice(&self, appservice_id: String, serialized: Vec<u8>) -> Result {
+		let dest = Destination::Appservice(appservice_id);
+		let event = SendingEvent::Edu(serialized);
+		let _cork = self.db.db.cork();
+		let keys = self.db.queue_requests(once((&event, &dest)));
+		self.dispatch(Msg {
+			dest,
+			event,
+			queue_id: keys.into_iter().next().expect("request queue key"),
+		})
+	}
+
+	#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
+	pub async fn send_edu_appservice_room(
+		&self,
+		room_id: &RoomId,
+		serialized: Vec<u8>,
+	) -> Result<()> {
+		for appservice in self.services.appservice.read().await.values() {
+			let matching_aliases = |aliases: NamespaceRegex| {
+				self.services
+					.alias
+					.local_aliases_for_room(room_id)
+					.ready_any(move |room_alias| aliases.is_match(room_alias.as_str()))
+			};
+
+			if appservice.rooms.is_match(room_id.as_str())
+				|| matching_aliases(appservice.aliases.clone()).await
+				|| self
+					.services
+					.state_cache
+					.appservice_in_room(room_id, appservice)
+					.await
+			{
+				self.send_edu_appservice(appservice.registration.id.clone(), serialized.clone())?;
+			}
+		}
+		Ok(())
+	}
+
 	#[tracing::instrument(skip(self, room_id, serialized), level = "debug")]
 	pub async fn send_edu_room(&self, room_id: &RoomId, serialized: Vec<u8>) -> Result<()> {
 		let servers = self
diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs
index 1f462f39..1589101b 100644
--- a/src/service/sending/sender.rs
+++ b/src/service/sending/sender.rs
@@ -1,3 +1,4 @@
+use core::str;
 use std::{
 	collections::{BTreeMap, HashMap, HashSet},
 	fmt::Debug,
@@ -21,7 +22,7 @@ use futures::{
 };
 use ruma::{
 	api::{
-		appservice::event::push_events::v1::Edu as RumaEdu,
+		appservice::event::push_events::v1::EphemeralData,
 		federation::transactions::{
 			edu::{
 				DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent,
@@ -587,7 +588,7 @@ impl Service {
 				.filter(|event| matches!(event, SendingEvent::Pdu(_)))
 				.count(),
 		);
-		let mut edu_jsons: Vec<RumaEdu> = Vec::with_capacity(
+		let mut edu_jsons: Vec<EphemeralData> = Vec::with_capacity(
 			events
 				.iter()
 				.filter(|event| matches!(event, SendingEvent::Edu(_)))
@@ -600,16 +601,12 @@ impl Service {
 						pdu_jsons.push(pdu.to_room_event());
 					}
 				},
-				| SendingEvent::Edu(edu) => {
-					if appservice
-						.receive_ephemeral
-						.is_some_and(|receive_edus| receive_edus)
-					{
+				| SendingEvent::Edu(edu) =>
+					if appservice.receive_ephemeral {
 						if let Ok(edu) = serde_json::from_slice(edu) {
 							edu_jsons.push(edu);
 						}
-					}
-				},
+					},
 				| SendingEvent::Flush => {}, // flush only; no new content
 			}
 		}