feat: swappable database backend

This commit is contained in:
Timo Kösters 2021-06-08 18:10:00 +02:00
parent 81715bd84d
commit d0ee823254
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
47 changed files with 1434 additions and 981 deletions

View file

@ -12,7 +12,10 @@ use crate::{
use federation::transactions::send_transaction_message;
use log::{error, warn};
use ring::digest;
use rocket::futures::stream::{FuturesUnordered, StreamExt};
use rocket::futures::{
channel::mpsc,
stream::{FuturesUnordered, StreamExt},
};
use ruma::{
api::{
appservice,
@ -27,9 +30,10 @@ use ruma::{
receipt::ReceiptType,
MilliSecondsSinceUnixEpoch, ServerName, UInt, UserId,
};
use sled::IVec;
use tokio::{select, sync::Semaphore};
use super::abstraction::Tree;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind {
Appservice(Box<ServerName>),
@ -70,13 +74,13 @@ pub enum SendingEventType {
Edu(Vec<u8>),
}
#[derive(Clone)]
pub struct Sending {
/// The state for a given state hash.
pub(super) servername_educount: sled::Tree, // EduCount: Count of last EDU sync
pub(super) servernamepduids: sled::Tree, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
pub(super) servercurrentevents: sled::Tree, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent
pub(super) servername_educount: Arc<dyn Tree>, // EduCount: Count of last EDU sync
pub(super) servernamepduids: Arc<dyn Tree>, // ServernamePduId = (+ / $)SenderKey / ServerName / UserId + PduId
pub(super) servercurrentevents: Arc<dyn Tree>, // ServerCurrentEvents = (+ / $)ServerName / UserId + PduId / (*)EduEvent
pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<Vec<u8>>,
}
enum TransactionStatus {
@ -86,28 +90,25 @@ enum TransactionStatus {
}
impl Sending {
pub fn start_handler(&self, db: &Database) {
let servernamepduids = self.servernamepduids.clone();
let servercurrentevents = self.servercurrentevents.clone();
pub fn start_handler(&self, db: Arc<Database>, mut receiver: mpsc::UnboundedReceiver<Vec<u8>>) {
let db = db.clone();
tokio::spawn(async move {
let mut futures = FuturesUnordered::new();
// Retry requests we could not finish yet
let mut subscriber = servernamepduids.watch_prefix(b"");
let mut current_transaction_status = HashMap::<Vec<u8>, TransactionStatus>::new();
// Retry requests we could not finish yet
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in servercurrentevents
.iter()
.filter_map(|r| r.ok())
.filter_map(|(key, _)| {
Self::parse_servercurrentevent(&key)
.ok()
.map(|(k, e)| (key, k, e))
})
for (key, outgoing_kind, event) in
db.sending
.servercurrentevents
.iter()
.filter_map(|(key, _)| {
Self::parse_servercurrentevent(&key)
.ok()
.map(|(k, e)| (key, k, e))
})
{
let entry = initial_transactions
.entry(outgoing_kind.clone())
@ -118,7 +119,7 @@ impl Sending {
"Dropping some current events: {:?} {:?} {:?}",
key, outgoing_kind, event
);
servercurrentevents.remove(key).unwrap();
db.sending.servercurrentevents.remove(&key).unwrap();
continue;
}
@ -137,20 +138,16 @@ impl Sending {
match response {
Ok(outgoing_kind) => {
let prefix = outgoing_kind.get_prefix();
for key in servercurrentevents
.scan_prefix(&prefix)
.keys()
.filter_map(|r| r.ok())
for (key, _) in db.sending.servercurrentevents
.scan_prefix(prefix.clone())
{
servercurrentevents.remove(key).unwrap();
db.sending.servercurrentevents.remove(&key).unwrap();
}
// Find events that have been added since starting the last request
let new_events = servernamepduids
.scan_prefix(&prefix)
.keys()
.filter_map(|r| r.ok())
.map(|k| {
let new_events = db.sending.servernamepduids
.scan_prefix(prefix.clone())
.map(|(k, _)| {
SendingEventType::Pdu(k[prefix.len()..].to_vec())
})
.take(30)
@ -166,8 +163,8 @@ impl Sending {
SendingEventType::Pdu(b) |
SendingEventType::Edu(b) => {
current_key.extend_from_slice(&b);
servercurrentevents.insert(&current_key, &[]).unwrap();
servernamepduids.remove(&current_key).unwrap();
db.sending.servercurrentevents.insert(&current_key, &[]).unwrap();
db.sending.servernamepduids.remove(&current_key).unwrap();
}
}
}
@ -195,18 +192,15 @@ impl Sending {
}
};
},
Some(event) = &mut subscriber => {
// New sled version:
//for (_tree, key, value_opt) in &event {
// if value_opt.is_none() {
// continue;
// }
if let sled::Event::Insert { key, .. } = event {
if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key) {
if let Some(events) = Self::select_events(&outgoing_kind, vec![(event, key)], &mut current_transaction_status, &servercurrentevents, &servernamepduids, &db) {
futures.push(Self::handle_events(outgoing_kind, events, &db));
}
Some(key) = receiver.next() => {
if let Ok((outgoing_kind, event)) = Self::parse_servercurrentevent(&key) {
if let Ok(Some(events)) = Self::select_events(
&outgoing_kind,
vec![(event, key)],
&mut current_transaction_status,
&db
) {
futures.push(Self::handle_events(outgoing_kind, events, &db));
}
}
}
@ -217,12 +211,10 @@ impl Sending {
fn select_events(
outgoing_kind: &OutgoingKind,
new_events: Vec<(SendingEventType, IVec)>, // Events we want to send: event and full key
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
current_transaction_status: &mut HashMap<Vec<u8>, TransactionStatus>,
servercurrentevents: &sled::Tree,
servernamepduids: &sled::Tree,
db: &Database,
) -> Option<Vec<SendingEventType>> {
) -> Result<Option<Vec<SendingEventType>>> {
let mut retry = false;
let mut allow = true;
@ -252,29 +244,25 @@ impl Sending {
.or_insert(TransactionStatus::Running);
if !allow {
return None;
return Ok(None);
}
let mut events = Vec::new();
if retry {
// We retry the previous transaction
for key in servercurrentevents
.scan_prefix(&prefix)
.keys()
.filter_map(|r| r.ok())
{
for (key, _) in db.sending.servercurrentevents.scan_prefix(prefix) {
if let Ok((_, e)) = Self::parse_servercurrentevent(&key) {
events.push(e);
}
}
} else {
for (e, full_key) in new_events {
servercurrentevents.insert(&full_key, &[]).unwrap();
db.sending.servercurrentevents.insert(&full_key, &[])?;
// If it was a PDU we have to unqueue it
// TODO: don't try to unqueue EDUs
servernamepduids.remove(&full_key).unwrap();
db.sending.servernamepduids.remove(&full_key)?;
events.push(e);
}
@ -284,13 +272,12 @@ impl Sending {
events.extend_from_slice(&select_edus);
db.sending
.servername_educount
.insert(server_name.as_bytes(), &last_count.to_be_bytes())
.unwrap();
.insert(server_name.as_bytes(), &last_count.to_be_bytes())?;
}
}
}
Some(events)
Ok(Some(events))
}
pub fn select_edus(db: &Database, server: &ServerName) -> Result<(Vec<SendingEventType>, u64)> {
@ -307,7 +294,7 @@ impl Sending {
let mut max_edu_count = since;
'outer: for room_id in db.rooms.server_rooms(server) {
let room_id = room_id?;
for r in db.rooms.edus.readreceipts_since(&room_id, since)? {
for r in db.rooms.edus.readreceipts_since(&room_id, since) {
let (user_id, count, read_receipt) = r?;
if count > max_edu_count {
@ -372,12 +359,13 @@ impl Sending {
}
#[tracing::instrument(skip(self))]
pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: IVec) -> Result<()> {
pub fn send_push_pdu(&self, pdu_id: &[u8], senderkey: Box<[u8]>) -> Result<()> {
let mut key = b"$".to_vec();
key.extend_from_slice(&senderkey);
key.push(0xff);
key.extend_from_slice(pdu_id);
self.servernamepduids.insert(key, b"")?;
self.servernamepduids.insert(&key, b"")?;
self.sender.unbounded_send(key).unwrap();
Ok(())
}
@ -387,7 +375,8 @@ impl Sending {
let mut key = server.as_bytes().to_vec();
key.push(0xff);
key.extend_from_slice(pdu_id);
self.servernamepduids.insert(key, b"")?;
self.servernamepduids.insert(&key, b"")?;
self.sender.unbounded_send(key).unwrap();
Ok(())
}
@ -398,7 +387,8 @@ impl Sending {
key.extend_from_slice(appservice_id.as_bytes());
key.push(0xff);
key.extend_from_slice(pdu_id);
self.servernamepduids.insert(key, b"")?;
self.servernamepduids.insert(&key, b"")?;
self.sender.unbounded_send(key).unwrap();
Ok(())
}
@ -641,7 +631,7 @@ impl Sending {
}
}
fn parse_servercurrentevent(key: &IVec) -> Result<(OutgoingKind, SendingEventType)> {
fn parse_servercurrentevent(key: &[u8]) -> Result<(OutgoingKind, SendingEventType)> {
// Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") {
let mut parts = key[1..].splitn(2, |&b| b == 0xff);