Add general rules matching for pusher, calc unread msgs

This commit is contained in:
Devin Ragotzy 2021-01-29 10:14:09 -05:00
parent 2d69e81699
commit 73124629b7
5 changed files with 469 additions and 97 deletions

View file

@ -13,16 +13,11 @@ use rocket::futures::stream::{FuturesUnordered, StreamExt};
use ruma::{
api::{appservice, federation, OutgoingRequest},
events::{push_rules, EventType},
ServerName,
uint, ServerName, UInt,
};
use sled::IVec;
use tokio::{select, sync::Semaphore};
use super::{
account_data::AccountData, appservice::Appservice, globals::Globals, pusher::PushData,
rooms::Rooms,
};
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind {
Appservice(Box<ServerName>),
@ -52,11 +47,7 @@ impl Sending {
pub fn start_handler(&self, db: &Database) {
let servernamepduids = self.servernamepduids.clone();
let servercurrentpdus = self.servercurrentpdus.clone();
let rooms = db.rooms.clone();
let globals = db.globals.clone();
let appservice = db.appservice.clone();
let pusher = db.pusher.clone();
let account_data = db.account_data.clone();
let db = db.clone();
tokio::spawn(async move {
let mut futures = FuturesUnordered::new();
@ -79,15 +70,7 @@ impl Sending {
}
for (outgoing_kind, pdus) in current_transactions {
futures.push(Self::handle_event(
outgoing_kind,
pdus,
&rooms,
&globals,
&appservice,
&pusher,
&account_data,
));
futures.push(Self::handle_event(outgoing_kind, pdus, &db));
}
let mut last_failed_try: HashMap<OutgoingKind, (u32, Instant)> = HashMap::new();
@ -151,11 +134,7 @@ impl Sending {
Self::handle_event(
outgoing_kind.clone(),
new_pdus,
&rooms,
&globals,
&appservice,
&pusher,
&account_data
&db,
)
);
} else {
@ -275,11 +254,7 @@ impl Sending {
Self::handle_event(
outgoing_kind,
vec![pdu_id.into()],
&rooms,
&globals,
&appservice,
&pusher,
&account_data
&db,
)
);
}
@ -325,14 +300,11 @@ impl Sending {
Ok(())
}
// TODO this is the whole DB but is it better to clone smaller parts than the whole thing??
async fn handle_event(
kind: OutgoingKind,
pdu_ids: Vec<IVec>,
rooms: &Rooms,
globals: &Globals,
appservice: &Appservice,
pusher: &PushData,
account_data: &AccountData,
db: &Database,
) -> std::result::Result<OutgoingKind, (OutgoingKind, Error)> {
match kind {
OutgoingKind::Appservice(server) => {
@ -340,7 +312,7 @@ impl Sending {
.iter()
.map(|pdu_id| {
Ok::<_, (Box<ServerName>, Error)>(
rooms
db.rooms
.get_pdu_from_id(pdu_id)
.map_err(|e| (server.clone(), e))?
.ok_or_else(|| {
@ -357,8 +329,8 @@ impl Sending {
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
appservice_server::send_request(
&globals,
appservice
&db.globals,
db.appservice
.get_registration(server.as_str())
.unwrap()
.unwrap(), // TODO: handle error
@ -376,7 +348,7 @@ impl Sending {
.iter()
.map(|pdu_id| {
Ok::<_, (Vec<u8>, Error)>(
rooms
db.rooms
.get_pdu_from_id(pdu_id)
.map_err(|e| (id.clone(), e))?
.ok_or_else(|| {
@ -391,36 +363,67 @@ impl Sending {
})
.filter_map(|r| r.ok())
.collect::<Vec<_>>();
dbg!(&pdus);
for pdu in &pdus {
for user in rooms.room_members(&pdu.room_id) {
// Redacted events are not notification targets (we don't send push for them)
if pdu.unsigned.get("redacted_because").is_some() {
continue;
}
for user in db.rooms.room_members(&pdu.room_id) {
dbg!(&user);
let user = user.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
for pusher in pusher
let pushers = db
.pusher
.get_pusher(&user)
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
let rules_for_user = db
.account_data
.get::<push_rules::PushRulesEvent>(None, &user, EventType::PushRules)
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?
.map(|ev| ev.content.global)
.unwrap_or_else(|| crate::push_rules::default_pushrules(&user));
let unread: UInt = if let Some(last_read) = db
.rooms
.edus
.private_read_get(&pdu.room_id, &user)
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?
{
let rules_for_user = account_data
.get::<push_rules::PushRulesEvent>(
None,
&user,
EventType::PushRules,
)
(db.rooms
.pdus_since(&user, &pdu.room_id, last_read)
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?
.map(|ev| ev.content.global)
.unwrap_or_else(|| crate::push_rules::default_pushrules(&user));
dbg!(&pusher);
dbg!(&rules_for_user);
.filter_map(|pdu| pdu.ok()) // Filter out buggy events
.filter(|(_, pdu)| {
matches!(
pdu.kind.clone(),
EventType::RoomMessage | EventType::RoomEncrypted
)
})
.count() as u32)
.into()
} else {
// Just return zero unread messages
uint!(0)
};
crate::database::pusher::send_push_notice(
&user,
&pusher,
rules_for_user,
pdu,
)
.await
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
}
dbg!(&pushers);
// dbg!(&rules_for_user);
crate::database::pusher::send_push_notice(
&user,
unread,
&pushers,
rules_for_user,
pdu,
db,
)
.await
.map_err(|e| (OutgoingKind::Push(id.clone()), e))?;
}
}
@ -434,7 +437,7 @@ impl Sending {
// TODO: check room version and remove event_id if needed
serde_json::from_str(
PduEvent::convert_to_outgoing_federation_event(
rooms
db.rooms
.get_pdu_json_from_id(pdu_id)
.map_err(|e| (OutgoingKind::Normal(server.clone()), e))?
.ok_or_else(|| {
@ -456,10 +459,10 @@ impl Sending {
.collect::<Vec<_>>();
server_server::send_request(
&globals,
&db.globals,
&*server,
send_transaction_message::v1::Request {
origin: globals.server_name(),
origin: db.globals.server_name(),
pdus: &pdu_jsons,
edus: &[],
origin_server_ts: SystemTime::now(),