From 2f5df4aac97d22f28b43b64272f2e75ca4272c22 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Wed, 21 Oct 2020 16:08:54 +0200
Subject: [PATCH 1/4] improvement: more reliable federation sending

---
 src/database.rs         |   3 +-
 src/database/rooms.rs   |   2 +-
 src/database/sending.rs | 163 ++++++++++++++++++++++++++++++++--------
 src/server_server.rs    |   5 +-
 4 files changed, 137 insertions(+), 36 deletions(-)

diff --git a/src/database.rs b/src/database.rs
index 883ef853..3b0bd6fa 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -149,7 +149,8 @@ impl Database {
                 userdevicetxnid_response: db.open_tree("userdevicetxnid_response")?,
             },
             sending: sending::Sending {
-                serverpduids: db.open_tree("serverpduids")?,
+                servernamepduids: db.open_tree("servernamepduids")?,
+                servercurrentpdus: db.open_tree("servercurrentpdus")?,
             },
             _db: db,
         })
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index 35c3eac1..1cc20a43 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -367,7 +367,7 @@ impl Rooms {
     }
 
     /// Returns the pdu.
-    pub fn get_pdu_json_from_id(&self, pdu_id: &IVec) -> Result<Option<serde_json::Value>> {
+    pub fn get_pdu_json_from_id(&self, pdu_id: &[u8]) -> Result<Option<serde_json::Value>> {
         self.pduid_pdu.get(pdu_id)?.map_or(Ok(None), |pdu| {
             Ok(Some(
                 serde_json::from_slice(&pdu)
diff --git a/src/database/sending.rs b/src/database/sending.rs
index 24a783b6..33ee5303 100644
--- a/src/database/sending.rs
+++ b/src/database/sending.rs
@@ -1,8 +1,8 @@
-use std::{collections::HashSet, convert::TryFrom, time::SystemTime};
+use std::{collections::HashMap, convert::TryFrom, time::SystemTime};
 
 use crate::{server_server, utils, Error, PduEvent, Result};
 use federation::transactions::send_transaction_message;
-use log::warn;
+use log::debug;
 use rocket::futures::stream::{FuturesUnordered, StreamExt};
 use ruma::{api::federation, ServerName};
 use sled::IVec;
@@ -10,54 +10,145 @@ use tokio::select;
 
 pub struct Sending {
     /// The state for a given state hash.
-    pub(super) serverpduids: sled::Tree, // ServerPduId = ServerName + PduId
+    pub(super) servernamepduids: sled::Tree, // ServernamePduId = ServerName + PduId
+    pub(super) servercurrentpdus: sled::Tree, // ServerCurrentPdus = ServerName + PduId (pduid can be empty for reservation)
 }
 
 impl Sending {
     pub fn start_handler(&self, globals: &super::globals::Globals, rooms: &super::rooms::Rooms) {
-        let serverpduids = self.serverpduids.clone();
+        let servernamepduids = self.servernamepduids.clone();
+        let servercurrentpdus = self.servercurrentpdus.clone();
         let rooms = rooms.clone();
         let globals = globals.clone();
 
         tokio::spawn(async move {
             let mut futures = FuturesUnordered::new();
-            let mut waiting_servers = HashSet::new();
 
-            let mut subscriber = serverpduids.watch_prefix(b"");
+            // Retry requests we could not finish yet
+            let mut current_transactions = HashMap::new();
+
+            for (server, pdu) in servercurrentpdus
+                .iter()
+                .filter_map(|r| r.ok())
+                .map(|(key, _)| {
+                    let mut parts = key.splitn(2, |&b| b == 0xff);
+                    let server = parts.next().expect("splitn always returns one element");
+                    let pdu = parts.next().ok_or_else(|| {
+                        Error::bad_database("Invalid bytes in servercurrentpdus.")
+                    })?;
+
+                    Ok::<_, Error>((
+                        Box::<ServerName>::try_from(utils::string_from_bytes(&server).map_err(
+                            |_| {
+                                Error::bad_database(
+                                    "Invalid server bytes in server_currenttransaction",
+                                )
+                            },
+                        )?)
+                        .map_err(|_| {
+                            Error::bad_database(
+                                "Invalid server string in server_currenttransaction",
+                            )
+                        })?,
+                        IVec::from(pdu),
+                    ))
+                })
+                .filter_map(|r| r.ok())
+            {
+                if !pdu.is_empty() {
+                    current_transactions
+                        .entry(server)
+                        .or_insert_with(Vec::new)
+                        .push(pdu);
+                }
+            }
+
+            for (server, pdus) in current_transactions {
+                futures.push(Self::handle_event(server, pdus, &globals, &rooms));
+            }
+
+            let mut subscriber = servernamepduids.watch_prefix(b"");
             loop {
                 select! {
                     Some(server) = futures.next() => {
-                        warn!("response: {:?}", &server);
-                        warn!("futures left: {}", &futures.len());
+                        debug!("response: {:?}", &server);
                         match server {
                             Ok((server, _response)) => {
-                                waiting_servers.remove(&server)
+                                let mut prefix = server.as_bytes().to_vec();
+                                prefix.push(0xff);
+
+                                for key in servercurrentpdus
+                                    .scan_prefix(&prefix)
+                                    .keys()
+                                    .filter_map(|r| r.ok())
+                                {
+                                    // Don't remove reservation yet
+                                    if prefix.len() != key.len() {
+                                        servercurrentpdus.remove(key).unwrap();
+                                    }
+                                }
+
+                                // Find events that have been added since starting the last request
+                                let new_pdus = servernamepduids
+                                    .scan_prefix(&prefix)
+                                    .keys()
+                                    .filter_map(|r| r.ok())
+                                    .map(|k| {
+                                        k.subslice(prefix.len(), k.len() - prefix.len())
+                                    }).collect::<Vec<_>>();
+
+                                if !new_pdus.is_empty() {
+                                    for pdu_id in &new_pdus {
+                                        let mut current_key = prefix.clone();
+                                        current_key.extend_from_slice(pdu_id);
+                                        servercurrentpdus.insert(&current_key, &[]).unwrap();
+                                        servernamepduids.remove(&current_key).unwrap();
+                                    }
+
+                                    futures.push(Self::handle_event(server, new_pdus, &globals, &rooms));
+                                } else {
+                                    servercurrentpdus.remove(&prefix).unwrap();
+                                }
                             }
-                            Err((server, _e)) => {
-                                waiting_servers.remove(&server)
+                            Err((_server, _e)) => {
+                                // TODO: exponential backoff
                             }
                         };
                     },
                     Some(event) = &mut subscriber => {
                         if let sled::Event::Insert { key, .. } = event {
-                            let serverpduid = key.clone();
-                            let mut parts = serverpduid.splitn(2, |&b| b == 0xff);
+                            let servernamepduid = key.clone();
+                            let mut parts = servernamepduid.splitn(2, |&b| b == 0xff);
 
                             if let Some((server, pdu_id)) = utils::string_from_bytes(
                                     parts
                                         .next()
                                         .expect("splitn will always return 1 or more elements"),
                                 )
-                                .map_err(|_| Error::bad_database("ServerName in serverpduid bytes are invalid."))
+                                .map_err(|_| Error::bad_database("ServerName in servernamepduid bytes are invalid."))
                                 .and_then(|server_str|Box::<ServerName>::try_from(server_str)
-                                    .map_err(|_| Error::bad_database("ServerName in serverpduid is invalid.")))
+                                    .map_err(|_| Error::bad_database("ServerName in servernamepduid is invalid.")))
                                 .ok()
-                                .filter(|server| waiting_servers.insert(server.clone()))
                                 .and_then(|server| parts
-                                .next()
-                                .ok_or_else(|| Error::bad_database("Invalid serverpduid in db.")).ok().map(|pdu_id| (server, pdu_id)))
+                                    .next()
+                                    .ok_or_else(|| Error::bad_database("Invalid servernamepduid in db."))
+                                    .ok()
+                                    .map(|pdu_id| (server, pdu_id))
+                                )
+                                // TODO: exponential backoff
+                                .filter(|(server, _)| {
+                                    let mut prefix = server.to_string().as_bytes().to_vec();
+                                    prefix.push(0xff);
+
+                                    servercurrentpdus
+                                        .compare_and_swap(prefix, Option::<&[u8]>::None, Some(&[])) // Try to reserve
+                                        == Ok(Ok(()))
+                                })
                             {
-                                futures.push(Self::handle_event(server, pdu_id.into(), &globals, &rooms));
+                                servercurrentpdus.insert(&key, &[]).unwrap();
+                                servernamepduids.remove(&key).unwrap();
+
+                                futures.push(Self::handle_event(server, vec![pdu_id.into()], &globals, &rooms));
                             }
                         }
                     }
@@ -70,38 +161,44 @@ impl Sending {
         let mut key = server.as_bytes().to_vec();
         key.push(0xff);
         key.extend_from_slice(pdu_id);
-        self.serverpduids.insert(key, b"")?;
+        self.servernamepduids.insert(key, b"")?;
 
         Ok(())
     }
 
     async fn handle_event(
         server: Box<ServerName>,
-        pdu_id: IVec,
+        pdu_ids: Vec<IVec>,
         globals: &super::globals::Globals,
         rooms: &super::rooms::Rooms,
     ) -> std::result::Result<
         (Box<ServerName>, send_transaction_message::v1::Response),
         (Box<ServerName>, Error),
     > {
-        let pdu_json = PduEvent::convert_to_outgoing_federation_event(
-            rooms
-                .get_pdu_json_from_id(&pdu_id)
-                .map_err(|e| (server.clone(), e))?
-                .ok_or_else(|| {
-                    (
-                        server.clone(),
-                        Error::bad_database("Event in serverpduids not found in db."),
-                    )
-                })?,
-        );
+        let pdu_jsons = pdu_ids
+            .iter()
+            .map(|pdu_id| {
+                Ok::<_, (Box<ServerName>, Error)>(PduEvent::convert_to_outgoing_federation_event(
+                    rooms
+                        .get_pdu_json_from_id(pdu_id)
+                        .map_err(|e| (server.clone(), e))?
+                        .ok_or_else(|| {
+                            (
+                                server.clone(),
+                                Error::bad_database("Event in servernamepduids not found in db."),
+                            )
+                        })?,
+                ))
+            })
+            .filter_map(|r| r.ok())
+            .collect::<Vec<_>>();
 
         server_server::send_request(
             &globals,
             server.clone(),
             send_transaction_message::v1::Request {
                 origin: globals.server_name(),
-                pdus: &[pdu_json],
+                pdus: &pdu_jsons,
                 edus: &[],
                 origin_server_ts: SystemTime::now(),
                 transaction_id: &utils::random_string(16),
diff --git a/src/server_server.rs b/src/server_server.rs
index 184f3339..ccb13994 100644
--- a/src/server_server.rs
+++ b/src/server_server.rs
@@ -186,7 +186,10 @@ where
             let body = reqwest_response
                 .bytes()
                 .await
-                .unwrap()
+                .unwrap_or_else(|e| {
+                    warn!("server error: {}", e);
+                    Vec::new().into()
+                }) // TODO: handle timeout
                 .into_iter()
                 .collect();
 

From 07621969638d3da6b638d161f4feb86a9affb511 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Tue, 27 Oct 2020 20:25:43 +0100
Subject: [PATCH 2/4] fix: don't send new events from left rooms

---
 src/client_server/membership.rs |  5 ++-
 src/client_server/profile.rs    |  2 ++
 src/client_server/room.rs       |  4 ++-
 src/client_server/state.rs      | 12 ++++---
 src/client_server/sync.rs       | 64 +++++++++++++++++++++------------
 src/database/rooms.rs           | 40 +++++++++++----------
 6 files changed, 79 insertions(+), 48 deletions(-)

diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs
index 33806013..d79079db 100644
--- a/src/client_server/membership.rs
+++ b/src/client_server/membership.rs
@@ -103,6 +103,7 @@ pub async fn leave_room_route(
                 ErrorKind::BadState,
                 "Cannot leave a room you are not a member of.",
             ))?
+            .1
             .content,
     )
     .expect("from_value::<Raw<..>> can never fail")
@@ -193,6 +194,7 @@ pub async fn kick_user_route(
                 ErrorKind::BadState,
                 "Cannot kick member that's not in the room.",
             ))?
+            .1
             .content,
     )
     .expect("Raw::from_value always works")
@@ -249,7 +251,7 @@ pub async fn ban_user_route(
                 is_direct: None,
                 third_party_invite: None,
             }),
-            |event| {
+            |(_, event)| {
                 let mut event =
                     serde_json::from_value::<Raw<member::MemberEventContent>>(event.content)
                         .expect("Raw::from_value always works")
@@ -301,6 +303,7 @@ pub async fn unban_user_route(
                 ErrorKind::BadState,
                 "Cannot unban a user who is not banned.",
             ))?
+            .1
             .content,
     )
     .expect("from_value::<Raw<..>> can never fail")
diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs
index d754aceb..3fa1da65 100644
--- a/src/client_server/profile.rs
+++ b/src/client_server/profile.rs
@@ -48,6 +48,7 @@ pub async fn set_displayname_route(
                                     "Tried to send displayname update for user not in the room.",
                                 )
                             })?
+                            .1
                             .content
                             .clone(),
                     )
@@ -142,6 +143,7 @@ pub async fn set_avatar_url_route(
                                     "Tried to send avatar url update for user not in the room.",
                                 )
                             })?
+                            .1
                             .content
                             .clone(),
                     )
diff --git a/src/client_server/room.rs b/src/client_server/room.rs
index d1d051f1..eeab68b2 100644
--- a/src/client_server/room.rs
+++ b/src/client_server/room.rs
@@ -395,6 +395,7 @@ pub async fn upgrade_room_route(
         db.rooms
             .room_state_get(&body.room_id, &EventType::RoomCreate, "")?
             .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
+            .1
             .content,
     )
     .expect("Raw::from_value always works")
@@ -470,7 +471,7 @@ pub async fn upgrade_room_route(
     // Replicate transferable state events to the new room
     for event_type in transferable_state_events {
         let event_content = match db.rooms.room_state_get(&body.room_id, &event_type, "")? {
-            Some(v) => v.content.clone(),
+            Some((_, v)) => v.content.clone(),
             None => continue, // Skipping missing events.
         };
 
@@ -502,6 +503,7 @@ pub async fn upgrade_room_route(
             db.rooms
                 .room_state_get(&body.room_id, &EventType::RoomPowerLevels, "")?
                 .ok_or_else(|| Error::bad_database("Found room without m.room.create event."))?
+                .1
                 .content,
         )
         .expect("database contains invalid PDU")
diff --git a/src/client_server/state.rs b/src/client_server/state.rs
index eae96b5b..dbc7fdd4 100644
--- a/src/client_server/state.rs
+++ b/src/client_server/state.rs
@@ -109,7 +109,7 @@ pub async fn get_state_events_route(
         if !matches!(
             db.rooms
                 .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
-                .map(|event| {
+                .map(|(_, event)| {
                     serde_json::from_value::<HistoryVisibilityEventContent>(event.content)
                         .map_err(|_| {
                             Error::bad_database(
@@ -154,7 +154,7 @@ pub async fn get_state_events_for_key_route(
         if !matches!(
             db.rooms
                 .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
-                .map(|event| {
+                .map(|(_, event)| {
                     serde_json::from_value::<HistoryVisibilityEventContent>(event.content)
                         .map_err(|_| {
                             Error::bad_database(
@@ -178,7 +178,8 @@ pub async fn get_state_events_for_key_route(
         .ok_or(Error::BadRequest(
             ErrorKind::NotFound,
             "State event not found.",
-        ))?;
+        ))?
+        .1;
 
     Ok(get_state_events_for_key::Response {
         content: serde_json::value::to_raw_value(&event.content)
@@ -203,7 +204,7 @@ pub async fn get_state_events_for_empty_key_route(
         if !matches!(
             db.rooms
                 .room_state_get(&body.room_id, &EventType::RoomHistoryVisibility, "")?
-                .map(|event| {
+                .map(|(_, event)| {
                     serde_json::from_value::<HistoryVisibilityEventContent>(event.content)
                         .map_err(|_| {
                             Error::bad_database(
@@ -227,7 +228,8 @@ pub async fn get_state_events_for_empty_key_route(
         .ok_or(Error::BadRequest(
             ErrorKind::NotFound,
             "State event not found.",
-        ))?;
+        ))?
+        .1;
 
     Ok(get_state_events_for_empty_key::Response {
         content: serde_json::value::to_raw_value(&event)
diff --git a/src/client_server/sync.rs b/src/client_server/sync.rs
index caab9ea1..360691ab 100644
--- a/src/client_server/sync.rs
+++ b/src/client_server/sync.rs
@@ -440,23 +440,8 @@ pub async fn sync_events_route(
     let mut left_rooms = BTreeMap::new();
     for room_id in db.rooms.rooms_left(&sender_user) {
         let room_id = room_id?;
-        let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?;
-        let room_events = pdus
-            .filter_map(|pdu| pdu.ok()) // Filter out buggy events
-            .map(|(_, pdu)| pdu.to_sync_room_event())
-            .collect();
 
-        let left_room = sync_events::LeftRoom {
-            account_data: sync_events::AccountData { events: Vec::new() },
-            timeline: sync_events::Timeline {
-                limited: false,
-                prev_batch: Some(next_batch.clone()),
-                events: room_events,
-            },
-            state: sync_events::State { events: Vec::new() },
-        };
-
-        let since_member = db
+        let since_member = if let Some(since_member) = db
             .rooms
             .pdus_after(sender_user, &room_id, since)
             .next()
@@ -475,20 +460,25 @@ pub async fn sync_events_route(
                     .ok_or_else(|| Error::bad_database("State hash in db doesn't have a state."))
                     .ok()
             })
-            .and_then(|pdu| {
+            .and_then(|(pdu_id, pdu)| {
                 serde_json::from_value::<Raw<ruma::events::room::member::MemberEventContent>>(
-                    pdu.content,
+                    pdu.content.clone(),
                 )
                 .expect("Raw::from_value always works")
                 .deserialize()
                 .map_err(|_| Error::bad_database("Invalid PDU in database."))
+                .map(|content| (pdu_id, pdu, content))
                 .ok()
-            });
+            }) {
+            since_member
+        } else {
+            // We couldn't find the since_member event. This is very weird - we better abort
+            continue;
+        };
 
-        let left_since_last_sync =
-            since_member.map_or(false, |member| member.membership == MembershipState::Join);
+        let left_since_last_sync = since_member.2.membership == MembershipState::Join;
 
-        if left_since_last_sync {
+        let left_room = if left_since_last_sync {
             device_list_left.extend(
                 db.rooms
                     .room_members(&room_id)
@@ -503,7 +493,35 @@ pub async fn sync_events_route(
                         !share_encrypted_room(&db, sender_user, user_id, &room_id)
                     }),
             );
-        }
+
+            let pdus = db.rooms.pdus_since(&sender_user, &room_id, since)?;
+            let mut room_events = pdus
+                .filter_map(|pdu| pdu.ok()) // Filter out buggy events
+                .take_while(|(pdu_id, _)| since_member.0 != pdu_id)
+                .map(|(_, pdu)| pdu.to_sync_room_event())
+                .collect::<Vec<_>>();
+            room_events.push(since_member.1.to_sync_room_event());
+
+            sync_events::LeftRoom {
+                account_data: sync_events::AccountData { events: Vec::new() },
+                timeline: sync_events::Timeline {
+                    limited: false,
+                    prev_batch: Some(next_batch.clone()),
+                    events: room_events,
+                },
+                state: sync_events::State { events: Vec::new() },
+            }
+        } else {
+            sync_events::LeftRoom {
+                account_data: sync_events::AccountData { events: Vec::new() },
+                timeline: sync_events::Timeline {
+                    limited: false,
+                    prev_batch: Some(next_batch.clone()),
+                    events: Vec::new(),
+                },
+                state: sync_events::State { events: Vec::new() },
+            }
+        };
 
         if !left_room.is_empty() {
             left_rooms.insert(room_id.clone(), left_room);
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index 1cc20a43..05abe03e 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -169,7 +169,7 @@ impl Rooms {
         state_hash: &StateHashId,
         event_type: &EventType,
         state_key: &str,
-    ) -> Result<Option<PduEvent>> {
+    ) -> Result<Option<(IVec, PduEvent)>> {
         let mut key = state_hash.to_vec();
         key.push(0xff);
         key.extend_from_slice(&event_type.to_string().as_bytes());
@@ -177,14 +177,15 @@ impl Rooms {
         key.extend_from_slice(&state_key.as_bytes());
 
         self.stateid_pduid.get(&key)?.map_or(Ok(None), |pdu_id| {
-            Ok::<_, Error>(Some(
+            Ok::<_, Error>(Some((
+                pdu_id.clone(),
                 serde_json::from_slice::<PduEvent>(
-                    &self.pduid_pdu.get(pdu_id)?.ok_or_else(|| {
+                    &self.pduid_pdu.get(&pdu_id)?.ok_or_else(|| {
                         Error::bad_database("PDU in state not found in database.")
                     })?,
                 )
                 .map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?,
-            ))
+            )))
         })
     }
 
@@ -216,7 +217,7 @@ impl Rooms {
 
         let mut events = StateMap::new();
         for (event_type, state_key) in auth_events {
-            if let Some(pdu) = self.room_state_get(room_id, &event_type, &state_key)? {
+            if let Some((_, pdu)) = self.room_state_get(room_id, &event_type, &state_key)? {
                 events.insert((event_type, state_key), pdu);
             }
         }
@@ -299,7 +300,7 @@ impl Rooms {
         room_id: &RoomId,
         event_type: &EventType,
         state_key: &str,
-    ) -> Result<Option<PduEvent>> {
+    ) -> Result<Option<(IVec, PduEvent)>> {
         if let Some(current_state_hash) = self.current_state_hash(room_id)? {
             self.state_get(&current_state_hash, event_type, state_key)
         } else {
@@ -653,7 +654,7 @@ impl Rooms {
                                 },
                         })
                     },
-                    |power_levels| {
+                    |(_, power_levels)| {
                         Ok(serde_json::from_value::<Raw<PowerLevelsEventContent>>(
                             power_levels.content,
                         )
@@ -664,15 +665,18 @@ impl Rooms {
                 )?;
             let sender_membership = self
                 .room_state_get(&room_id, &EventType::RoomMember, &sender.to_string())?
-                .map_or(Ok::<_, Error>(member::MembershipState::Leave), |pdu| {
-                    Ok(
-                        serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content)
-                            .expect("Raw::from_value always works.")
-                            .deserialize()
-                            .map_err(|_| Error::bad_database("Invalid Member event in db."))?
-                            .membership,
-                    )
-                })?;
+                .map_or(
+                    Ok::<_, Error>(member::MembershipState::Leave),
+                    |(_, pdu)| {
+                        Ok(
+                            serde_json::from_value::<Raw<member::MemberEventContent>>(pdu.content)
+                                .expect("Raw::from_value always works.")
+                                .deserialize()
+                                .map_err(|_| Error::bad_database("Invalid Member event in db."))?
+                                .membership,
+                        )
+                    },
+                )?;
 
             let sender_power = power_levels.users.get(&sender).map_or_else(
                 || {
@@ -759,7 +763,7 @@ impl Rooms {
 
         let mut unsigned = unsigned.unwrap_or_default();
         if let Some(state_key) = &state_key {
-            if let Some(prev_pdu) = self.room_state_get(&room_id, &event_type, &state_key)? {
+            if let Some((_, prev_pdu)) = self.room_state_get(&room_id, &event_type, &state_key)? {
                 unsigned.insert("prev_content".to_owned(), prev_pdu.content);
                 unsigned.insert(
                     "prev_sender".to_owned(),
@@ -1017,7 +1021,7 @@ impl Rooms {
                     // Check if the room has a predecessor
                     if let Some(predecessor) = self
                         .room_state_get(&room_id, &EventType::RoomCreate, "")?
-                        .and_then(|create| {
+                        .and_then(|(_, create)| {
                             serde_json::from_value::<
                                 Raw<ruma::events::room::create::CreateEventContent>,
                             >(create.content)

From 16b22bb432b3424000c76870eaf967873047dfd1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Tue, 3 Nov 2020 21:20:35 +0100
Subject: [PATCH 3/4] fix: don't allow more than 50 PDUs in a transaction

---
 src/database/sending.rs | 18 +++++++++++-------
 1 file changed, 11 insertions(+), 7 deletions(-)

diff --git a/src/database/sending.rs b/src/database/sending.rs
index 33ee5303..597778f2 100644
--- a/src/database/sending.rs
+++ b/src/database/sending.rs
@@ -54,13 +54,14 @@ impl Sending {
                     ))
                 })
                 .filter_map(|r| r.ok())
+                .filter(|pdu| !pdu.is_empty()) // Skip reservation key
+                .take(50)
+            // This should not contain more than 50 anyway
             {
-                if !pdu.is_empty() {
-                    current_transactions
-                        .entry(server)
-                        .or_insert_with(Vec::new)
-                        .push(pdu);
-                }
+                current_transactions
+                    .entry(server)
+                    .or_insert_with(Vec::new)
+                    .push(pdu);
             }
 
             for (server, pdus) in current_transactions {
@@ -95,7 +96,9 @@ impl Sending {
                                     .filter_map(|r| r.ok())
                                     .map(|k| {
                                         k.subslice(prefix.len(), k.len() - prefix.len())
-                                    }).collect::<Vec<_>>();
+                                    })
+                                    .take(50)
+                                    .collect::<Vec<_>>();
 
                                 if !new_pdus.is_empty() {
                                     for pdu_id in &new_pdus {
@@ -108,6 +111,7 @@ impl Sending {
                                     futures.push(Self::handle_event(server, new_pdus, &globals, &rooms));
                                 } else {
                                     servercurrentpdus.remove(&prefix).unwrap();
+                                    // servercurrentpdus with the prefix should be empty now
                                 }
                             }
                             Err((_server, _e)) => {

From 9f8cffcd22dcf4ca27a82c262aeb25c7ec05e257 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Timo=20K=C3=B6sters?= <timo@koesters.xyz>
Date: Mon, 9 Nov 2020 12:21:04 +0100
Subject: [PATCH 4/4] Admin room improvements

---
 src/client_server/account.rs    | 13 ++++++
 src/client_server/membership.rs |  8 +++-
 src/client_server/message.rs    |  1 +
 src/client_server/profile.rs    |  2 +
 src/client_server/redact.rs     |  1 +
 src/client_server/room.rs       | 15 +++++++
 src/client_server/state.rs      |  1 +
 src/database.rs                 | 21 ++++++++--
 src/database/account_data.rs    |  1 +
 src/database/admin.rs           | 74 +++++++++++++++++++++++++++++++++
 src/database/key_backups.rs     |  1 +
 src/database/media.rs           |  1 +
 src/database/rooms.rs           | 34 +++++----------
 src/database/sending.rs         |  3 +-
 src/database/transaction_ids.rs |  1 +
 src/database/uiaa.rs            |  1 +
 src/database/users.rs           |  1 +
 src/server_server.rs            |  2 +-
 18 files changed, 152 insertions(+), 29 deletions(-)
 create mode 100644 src/database/admin.rs

diff --git a/src/client_server/account.rs b/src/client_server/account.rs
index fad59c37..81119ba3 100644
--- a/src/client_server/account.rs
+++ b/src/client_server/account.rs
@@ -241,6 +241,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -264,6 +265,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -300,6 +302,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -319,6 +322,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -340,6 +344,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -359,6 +364,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -380,6 +386,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -398,6 +405,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -422,6 +430,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -447,6 +456,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
         db.rooms.build_and_append_pdu(
@@ -468,6 +478,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -494,6 +505,7 @@ pub async fn register_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
@@ -666,6 +678,7 @@ pub async fn deactivate_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
diff --git a/src/client_server/membership.rs b/src/client_server/membership.rs
index d79079db..25cad85c 100644
--- a/src/client_server/membership.rs
+++ b/src/client_server/membership.rs
@@ -124,6 +124,7 @@ pub async fn leave_room_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -162,6 +163,7 @@ pub async fn invite_user_route(
             &body.room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -216,6 +218,7 @@ pub async fn kick_user_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -274,6 +277,7 @@ pub async fn ban_user_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -324,6 +328,7 @@ pub async fn unban_user_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -673,7 +678,7 @@ async fn join_room_by_id_helper(
                 pdu_id.clone().into(),
                 &db.globals,
                 &db.account_data,
-                &db.sending,
+                &db.admin,
             )?;
 
             if state_events.contains(ev_id) {
@@ -703,6 +708,7 @@ async fn join_room_by_id_helper(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
diff --git a/src/client_server/message.rs b/src/client_server/message.rs
index f9c8ba10..327b9ab2 100644
--- a/src/client_server/message.rs
+++ b/src/client_server/message.rs
@@ -67,6 +67,7 @@ pub async fn send_message_event_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
diff --git a/src/client_server/profile.rs b/src/client_server/profile.rs
index 3fa1da65..22d13cbd 100644
--- a/src/client_server/profile.rs
+++ b/src/client_server/profile.rs
@@ -65,6 +65,7 @@ pub async fn set_displayname_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
@@ -160,6 +161,7 @@ pub async fn set_avatar_url_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
 
diff --git a/src/client_server/redact.rs b/src/client_server/redact.rs
index 486eb6c8..6f7728a3 100644
--- a/src/client_server/redact.rs
+++ b/src/client_server/redact.rs
@@ -33,6 +33,7 @@ pub async fn redact_event_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
diff --git a/src/client_server/room.rs b/src/client_server/room.rs
index eeab68b2..fdc9529a 100644
--- a/src/client_server/room.rs
+++ b/src/client_server/room.rs
@@ -65,6 +65,7 @@ pub async fn create_room_route(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -88,6 +89,7 @@ pub async fn create_room_route(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -131,6 +133,7 @@ pub async fn create_room_route(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -165,6 +168,7 @@ pub async fn create_room_route(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -184,6 +188,7 @@ pub async fn create_room_route(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -211,6 +216,7 @@ pub async fn create_room_route(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -232,6 +238,7 @@ pub async fn create_room_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
@@ -255,6 +262,7 @@ pub async fn create_room_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
@@ -275,6 +283,7 @@ pub async fn create_room_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
@@ -300,6 +309,7 @@ pub async fn create_room_route(
             &room_id,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
@@ -387,6 +397,7 @@ pub async fn upgrade_room_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -429,6 +440,7 @@ pub async fn upgrade_room_route(
         &replacement_room,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -452,6 +464,7 @@ pub async fn upgrade_room_route(
         &replacement_room,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
@@ -487,6 +500,7 @@ pub async fn upgrade_room_route(
             &replacement_room,
             &db.globals,
             &db.sending,
+            &db.admin,
             &db.account_data,
         )?;
     }
@@ -532,6 +546,7 @@ pub async fn upgrade_room_route(
         &body.room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
diff --git a/src/client_server/state.rs b/src/client_server/state.rs
index dbc7fdd4..ca6bdf7e 100644
--- a/src/client_server/state.rs
+++ b/src/client_server/state.rs
@@ -284,6 +284,7 @@ pub async fn send_state_event_for_key_helper(
         &room_id,
         &db.globals,
         &db.sending,
+        &db.admin,
         &db.account_data,
     )?;
 
diff --git a/src/database.rs b/src/database.rs
index 3b0bd6fa..51c3895a 100644
--- a/src/database.rs
+++ b/src/database.rs
@@ -1,4 +1,5 @@
 pub mod account_data;
+pub mod admin;
 pub mod globals;
 pub mod key_backups;
 pub mod media;
@@ -12,10 +13,14 @@ use crate::{Error, Result};
 use directories::ProjectDirs;
 use futures::StreamExt;
 use log::info;
-use rocket::{futures, Config};
+use rocket::{
+    futures::{self, channel::mpsc},
+    Config,
+};
 use ruma::{DeviceId, UserId};
 use std::{convert::TryFrom, fs::remove_dir_all};
 
+#[derive(Clone)]
 pub struct Database {
     pub globals: globals::Globals,
     pub users: users::Users,
@@ -26,6 +31,7 @@ pub struct Database {
     pub key_backups: key_backups::KeyBackups,
     pub transaction_ids: transaction_ids::TransactionIds,
     pub sending: sending::Sending,
+    pub admin: admin::Admin,
     pub _db: sled::Db,
 }
 
@@ -80,7 +86,9 @@ impl Database {
 
         info!("Opened sled database at {}", path);
 
-        Ok(Self {
+        let (admin_sender, admin_receiver) = mpsc::unbounded();
+
+        let db = Self {
             globals: globals::Globals::load(db.open_tree("global")?, config)?,
             users: users::Users {
                 userid_password: db.open_tree("userid_password")?,
@@ -152,8 +160,15 @@ impl Database {
                 servernamepduids: db.open_tree("servernamepduids")?,
                 servercurrentpdus: db.open_tree("servercurrentpdus")?,
             },
+            admin: admin::Admin {
+                sender: admin_sender,
+            },
             _db: db,
-        })
+        };
+
+        db.admin.start_handler(db.clone(), admin_receiver);
+
+        Ok(db)
     }
 
     pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) {
diff --git a/src/database/account_data.rs b/src/database/account_data.rs
index a9171235..9a6a050f 100644
--- a/src/database/account_data.rs
+++ b/src/database/account_data.rs
@@ -8,6 +8,7 @@ use serde::{de::DeserializeOwned, Serialize};
 use sled::IVec;
 use std::{collections::HashMap, convert::TryFrom};
 
+#[derive(Clone)]
 pub struct AccountData {
     pub(super) roomuserdataid_accountdata: sled::Tree, // RoomUserDataId = Room + User + Count + Type
 }
diff --git a/src/database/admin.rs b/src/database/admin.rs
new file mode 100644
index 00000000..f8b23855
--- /dev/null
+++ b/src/database/admin.rs
@@ -0,0 +1,74 @@
+use std::convert::{TryFrom, TryInto};
+
+use crate::{pdu::PduBuilder, Error};
+use rocket::futures::{channel::mpsc, stream::StreamExt};
+use ruma::{events::room::message, events::EventType, UserId};
+use tokio::select;
+
+pub enum AdminCommand {
+    SendTextMessage(message::TextMessageEventContent),
+}
+
+#[derive(Clone)]
+pub struct Admin {
+    pub sender: mpsc::UnboundedSender<AdminCommand>,
+}
+
+impl Admin {
+    pub fn start_handler(
+        &self,
+        db: super::Database,
+        mut receiver: mpsc::UnboundedReceiver<AdminCommand>,
+    ) {
+        tokio::spawn(async move {
+            // TODO: Use futures when we have long admin commands
+            //let mut futures = FuturesUnordered::new();
+
+            let conduit_user = UserId::try_from(format!("@conduit:{}", db.globals.server_name()))
+                .expect("@conduit:server_name is valid");
+
+            let conduit_room = db
+                .rooms
+                .id_from_alias(
+                    &format!("#admins:{}", db.globals.server_name())
+                        .try_into()
+                        .expect("#admins:server_name is a valid room alias"),
+                )
+                .unwrap()
+                .ok_or_else(|| Error::BadConfig("Conduit instance does not have an #admins room."))
+                .unwrap();
+
+            loop {
+                select! {
+                    Some(event) = receiver.next() => {
+                        match event {
+                            AdminCommand::SendTextMessage(message) => {
+                                println!("{:?}", message);
+
+                                db.rooms.build_and_append_pdu(
+                                    PduBuilder {
+                                        event_type: EventType::RoomMessage,
+                                        content: serde_json::to_value(message).expect("event is valid, we just created it"),
+                                        unsigned: None,
+                                        state_key: None,
+                                        redacts: None,
+                                    },
+                                    &conduit_user,
+                                    &conduit_room,
+                                    &db.globals,
+                                    &db.sending,
+                                    &db.admin,
+                                    &db.account_data,
+                                ).unwrap();
+                            }
+                        }
+                    }
+                }
+            }
+        });
+    }
+
+    pub fn send(&self, command: AdminCommand) {
+        self.sender.unbounded_send(command).unwrap()
+    }
+}
diff --git a/src/database/key_backups.rs b/src/database/key_backups.rs
index 1ce75955..a50e45eb 100644
--- a/src/database/key_backups.rs
+++ b/src/database/key_backups.rs
@@ -8,6 +8,7 @@ use ruma::{
 };
 use std::{collections::BTreeMap, convert::TryFrom};
 
+#[derive(Clone)]
 pub struct KeyBackups {
     pub(super) backupid_algorithm: sled::Tree, // BackupId = UserId + Version(Count)
     pub(super) backupid_etag: sled::Tree,      // BackupId = UserId + Version(Count)
diff --git a/src/database/media.rs b/src/database/media.rs
index 3ecf4bd9..8c59aa4d 100644
--- a/src/database/media.rs
+++ b/src/database/media.rs
@@ -9,6 +9,7 @@ pub struct FileMeta {
     pub file: Vec<u8>,
 }
 
+#[derive(Clone)]
 pub struct Media {
     pub(super) mediaid_file: sled::Tree, // MediaId = MXC + WidthHeight + Filename + ContentType
 }
diff --git a/src/database/rooms.rs b/src/database/rooms.rs
index 05abe03e..8ab900fd 100644
--- a/src/database/rooms.rs
+++ b/src/database/rooms.rs
@@ -27,6 +27,8 @@ use std::{
     sync::Arc,
 };
 
+use super::admin::AdminCommand;
+
 /// The unique identifier of each state group.
 ///
 /// This is created when a state group is added to the database by
@@ -443,7 +445,7 @@ impl Rooms {
         pdu_id: IVec,
         globals: &super::globals::Globals,
         account_data: &super::account_data::AccountData,
-        sending: &super::sending::Sending,
+        admin: &super::admin::Admin,
     ) -> Result<()> {
         self.replace_pdu_leaves(&pdu.room_id, &pdu.event_id)?;
 
@@ -514,28 +516,13 @@ impl Rooms {
                         if let Some(command) = parts.next() {
                             let args = parts.collect::<Vec<_>>();
 
-                            self.build_and_append_pdu(
-                                PduBuilder {
-                                    event_type: EventType::RoomMessage,
-                                    content: serde_json::to_value(
-                                        message::TextMessageEventContent {
-                                            body: format!("Command: {}, Args: {:?}", command, args),
-                                            formatted: None,
-                                            relates_to: None,
-                                        },
-                                    )
-                                    .expect("event is valid, we just created it"),
-                                    unsigned: None,
-                                    state_key: None,
-                                    redacts: None,
+                            admin.send(AdminCommand::SendTextMessage(
+                                message::TextMessageEventContent {
+                                    body: format!("Command: {}, Args: {:?}", command, args),
+                                    formatted: None,
+                                    relates_to: None,
                                 },
-                                &UserId::try_from(format!("@conduit:{}", globals.server_name()))
-                                    .expect("@conduit:server_name is valid"),
-                                &pdu.room_id,
-                                &globals,
-                                &sending,
-                                &account_data,
-                            )?;
+                            ));
                         }
                     }
                 }
@@ -612,6 +599,7 @@ impl Rooms {
         room_id: &RoomId,
         globals: &super::globals::Globals,
         sending: &super::sending::Sending,
+        admin: &super::admin::Admin,
         account_data: &super::account_data::AccountData,
     ) -> Result<EventId> {
         let PduBuilder {
@@ -849,7 +837,7 @@ impl Rooms {
             pdu_id.clone().into(),
             globals,
             account_data,
-            sending,
+            admin,
         )?;
 
         for server in self
diff --git a/src/database/sending.rs b/src/database/sending.rs
index 597778f2..e3fca4f0 100644
--- a/src/database/sending.rs
+++ b/src/database/sending.rs
@@ -8,6 +8,7 @@ use ruma::{api::federation, ServerName};
 use sled::IVec;
 use tokio::select;
 
+#[derive(Clone)]
 pub struct Sending {
     /// The state for a given state hash.
     pub(super) servernamepduids: sled::Tree, // ServernamePduId = ServerName + PduId
@@ -54,7 +55,7 @@ impl Sending {
                     ))
                 })
                 .filter_map(|r| r.ok())
-                .filter(|pdu| !pdu.is_empty()) // Skip reservation key
+                .filter(|(_, pdu)| !pdu.is_empty()) // Skip reservation key
                 .take(50)
             // This should not contain more than 50 anyway
             {
diff --git a/src/database/transaction_ids.rs b/src/database/transaction_ids.rs
index 9485b361..7c0eb98b 100644
--- a/src/database/transaction_ids.rs
+++ b/src/database/transaction_ids.rs
@@ -2,6 +2,7 @@ use crate::Result;
 use ruma::{DeviceId, UserId};
 use sled::IVec;
 
+#[derive(Clone)]
 pub struct TransactionIds {
     pub(super) userdevicetxnid_response: sled::Tree, // Response can be empty (/sendToDevice) or the event id (/send)
 }
diff --git a/src/database/uiaa.rs b/src/database/uiaa.rs
index e318f436..381a7016 100644
--- a/src/database/uiaa.rs
+++ b/src/database/uiaa.rs
@@ -7,6 +7,7 @@ use ruma::{
     DeviceId, UserId,
 };
 
+#[derive(Clone)]
 pub struct Uiaa {
     pub(super) userdeviceid_uiaainfo: sled::Tree, // User-interactive authentication
 }
diff --git a/src/database/users.rs b/src/database/users.rs
index 0d35e362..2a039602 100644
--- a/src/database/users.rs
+++ b/src/database/users.rs
@@ -14,6 +14,7 @@ use ruma::{
 };
 use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime};
 
+#[derive(Clone)]
 pub struct Users {
     pub(super) userid_password: sled::Tree,
     pub(super) userid_displayname: sled::Tree,
diff --git a/src/server_server.rs b/src/server_server.rs
index ccb13994..0f24e153 100644
--- a/src/server_server.rs
+++ b/src/server_server.rs
@@ -404,7 +404,7 @@ pub fn send_transaction_message_route<'a>(
                 pdu_id.clone().into(),
                 &db.globals,
                 &db.account_data,
-                &db.sending,
+                &db.admin,
             )?;
         }
     }