Database Refactor

combine service/users data w/ mod unit

split sliding sync related out of service/users

instrument database entry points
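
The instrumentation in question is tracing's #[instrument] attribute, as already seen on sender() in the diff below. A minimal sketch of such an entry point (the function and names here are illustrative, not the real interface):

    use tracing::instrument;

    // skip_all keeps raw keys and values out of the recorded span.
    #[instrument(skip_all, level = "trace")]
    async fn get(key: &[u8]) -> Option<Vec<u8>> {
        // ... perform the map lookup ...
        let _ = key;
        None
    }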

remove increment crap from database interface

de-wrap all database get() calls

de-wrap all database insert() calls

de-wrap all database remove() calls

refactor database interface for async streaming
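
A sketch of what de-wrapping plus asyncification means at a call site, using toy types (the real map and handle types differ):

    use std::{collections::BTreeMap, io};

    // Toy column standing in for a database map; illustrative only.
    struct Map(BTreeMap<Vec<u8>, Vec<u8>>);

    impl Map {
        // Before: two layers for every reader to unwrap.
        fn get_old(&self, key: &[u8]) -> io::Result<Option<Vec<u8>>> {
            Ok(self.0.get(key).cloned())
        }

        // After: a future resolving to one layer; a missing key becomes
        // an Err that `?` and combinators can route directly.
        async fn get(&self, key: &[u8]) -> io::Result<Vec<u8>> {
            self.0
                .get(key)
                .cloned()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "no such key"))
        }
    }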

add query key serializer for database

implement Debug for result handle

add query deserializer for database

add deserialization trait for option handle
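
The key-serialization idea, reduced to an illustrative trait (not the crate's real one); 0xFF is the field separator conduit's key schema already uses:

    // Tuple fields are written back-to-back, joined by a separator, so
    // call sites can pass (a, b) tuples instead of hand-building byte
    // vectors.
    pub trait KeySer {
        fn write_to(&self, out: &mut Vec<u8>);
    }

    impl KeySer for &str {
        fn write_to(&self, out: &mut Vec<u8>) {
            out.extend_from_slice(self.as_bytes());
        }
    }

    impl KeySer for u64 {
        fn write_to(&self, out: &mut Vec<u8>) {
            out.extend_from_slice(&self.to_be_bytes());
        }
    }

    impl<A: KeySer, B: KeySer> KeySer for (A, B) {
        fn write_to(&self, out: &mut Vec<u8>) {
            self.0.write_to(out);
            out.push(0xFF); // field separator
            self.1.write_to(out);
        }
    }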

start a stream utils suite
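
The diff below consumes these helpers as ReadyExt's ready_filter/ready_for_each; the core idea, reduced to a free function (the crate's actual extension-trait plumbing differs):

    use futures::{future::ready, Stream, StreamExt};

    // Lift a plain synchronous predicate into StreamExt::filter, which
    // wants a Future-returning closure, by wrapping each result in
    // future::ready.
    fn ready_filter<S, F>(stream: S, mut pred: F) -> impl Stream<Item = S::Item>
    where
        S: Stream,
        F: FnMut(&S::Item) -> bool,
    {
        stream.filter(move |item| ready(pred(item)))
    }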

de-wrap/asyncify/type-query count_one_time_keys()

de-wrap/asyncify users count

add admin query users command suite

de-wrap/asyncify users exists

de-wrap/partially asyncify user filter related

asyncify/de-wrap users device/keys related

asyncify/de-wrap user auth/misc related

asyncify/de-wrap users blurhash

asyncify/de-wrap account_data get; merge Data into Service

partial asyncify/de-wrap uiaa; merge Data into Service

partially asyncify/de-wrap transaction_ids get; merge Data into Service

partially asyncify/de-wrap key_backups; merge Data into Service

asyncify/de-wrap pusher service getters; merge Data into Service
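
The "merge Data into Service" items collapse one delegation layer per service; a toy before/after with illustrative names only:

    use std::{collections::BTreeMap, io, sync::Arc};

    type Map = BTreeMap<Vec<u8>, Vec<u8>>;

    // Before: a separate Data layer owned the map and Service delegated
    // every call through it.
    struct Data {
        table: Arc<Map>,
    }
    struct ServiceBefore {
        db: Data,
    }

    // After: Data's fields fold into Service, which exposes the async,
    // de-wrapped getter itself.
    struct Service {
        table: Arc<Map>,
    }

    impl Service {
        async fn get(&self, key: &[u8]) -> io::Result<Vec<u8>> {
            self.table
                .get(key)
                .cloned()
                .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "absent"))
        }
    }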

asyncify/de-wrap rooms alias getters/some iterators

asyncify/de-wrap rooms directory getters/iterator

partially asyncify/de-wrap rooms lazy-loading

partially asyncify/de-wrap rooms metadata

asyncify/de-wrap rooms outlier

asyncify/de-wrap rooms pdu_metadata

de-wrap/partially asyncify rooms read receipt


de-wrap rooms search service

de-wrap/partially asyncify rooms user service

partial de-wrap rooms state_compressor

de-wrap rooms state_cache

de-wrap room state et al

de-wrap rooms timeline service

additional users device/keys related

de-wrap/asyncify sender
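
The sender's new shape, visible in the diff below, pushes boxed futures into a FuturesUnordered and drains completions as a stream; reduced to a self-contained sketch:

    use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};

    // Heterogeneous async sends are boxed into one future type and
    // driven concurrently; completions are consumed as they finish.
    async fn run() {
        let mut futures: FuturesUnordered<BoxFuture<'_, u64>> = FuturesUnordered::new();
        futures.push(async { 1 }.boxed());
        futures.push(async { 2 }.boxed());
        while let Some(done) = futures.next().await {
            let _ = done; // handle_response() in the real sender
        }
    }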

asyncify services

refactor database to TryFuture/TryStream

refactor services for TryFuture/TryStream
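
A sketch of the TryFuture/TryStream style (illustrative function, not code from the commit): fallible items flow through try_* combinators and short-circuit on the first Err, where the old iterator style silently dropped errors via filter_map(Result::ok):

    use futures::{stream, TryStreamExt};

    async fn odd_rows() -> Result<Vec<u64>, std::io::Error> {
        let rows = stream::iter([Ok::<u64, std::io::Error>(1), Ok(2), Ok(3)]);
        rows.try_filter(|n| std::future::ready(n % 2 == 1))
            .try_collect()
            .await
    }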

asyncify api handlers

additional asyncification for admin module

abstract stream related; support reverse streams

additional stream conversions
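
The consumption pattern used throughout the diff below: borrowed streams are pinned to the stack with pin_mut! and drained with while-let, instead of being collected or iterated synchronously:

    use futures::{pin_mut, Stream, StreamExt};

    async fn sum(s: impl Stream<Item = u64>) -> u64 {
        pin_mut!(s);
        let mut total = 0;
        while let Some(n) = s.next().await {
            total += n;
        }
        total
    }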

asyncify state-res related

Signed-off-by: Jason Volk <jason@zemos.net>
Jason Volk 2024-08-08 17:18:30 +00:00 committed by strawberry
parent 6001014078
commit 946ca364e0
203 changed files with 12202 additions and 10709 deletions

src/service/sending/sender.rs

@@ -7,18 +7,15 @@ use std::{
 use base64::{engine::general_purpose, Engine as _};
 use conduit::{
-    debug, debug_warn, error, trace,
-    utils::{calculate_hash, math::continue_exponential_backoff_secs},
+    debug, debug_warn, err, trace,
+    utils::{calculate_hash, math::continue_exponential_backoff_secs, ReadyExt},
     warn, Error, Result,
 };
-use federation::transactions::send_transaction_message;
-use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt};
+use futures::{future::BoxFuture, pin_mut, stream::FuturesUnordered, FutureExt, StreamExt};
 use ruma::{
-    api::federation::{
-        self,
-        transactions::edu::{
-            DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap,
-        },
+    api::federation::transactions::{
+        edu::{DeviceListUpdateContent, Edu, PresenceContent, PresenceUpdate, ReceiptContent, ReceiptData, ReceiptMap},
+        send_transaction_message,
     },
     device_id,
     events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
@@ -28,7 +25,7 @@ use ruma::{
 use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
 use tokio::time::sleep_until;
-use super::{appservice, Destination, Msg, SendingEvent, Service};
+use super::{appservice, data::QueueItem, Destination, Msg, SendingEvent, Service};
 #[derive(Debug)]
 enum TransactionStatus {
@@ -50,20 +47,20 @@ const CLEANUP_TIMEOUT_MS: u64 = 3500;
 impl Service {
     #[tracing::instrument(skip_all, name = "sender")]
     pub(super) async fn sender(&self) -> Result<()> {
-        let receiver = self.receiver.lock().await;
-        let mut futures: SendingFutures<'_> = FuturesUnordered::new();
         let mut statuses: CurTransactionStatus = CurTransactionStatus::new();
+        let mut futures: SendingFutures<'_> = FuturesUnordered::new();
+        let receiver = self.receiver.lock().await;
-        self.initial_requests(&futures, &mut statuses);
+        self.initial_requests(&mut futures, &mut statuses).await;
         loop {
             debug_assert!(!receiver.is_closed(), "channel error");
             tokio::select! {
                 request = receiver.recv_async() => match request {
-                    Ok(request) => self.handle_request(request, &futures, &mut statuses),
+                    Ok(request) => self.handle_request(request, &mut futures, &mut statuses).await,
                     Err(_) => break,
                 },
                 Some(response) = futures.next() => {
-                    self.handle_response(response, &futures, &mut statuses);
+                    self.handle_response(response, &mut futures, &mut statuses).await;
                 },
             }
         }
@@ -72,18 +69,16 @@ impl Service {
         Ok(())
     }
-    fn handle_response<'a>(
-        &'a self, response: SendingResult, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus,
+    async fn handle_response<'a>(
+        &'a self, response: SendingResult, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus,
     ) {
         match response {
-            Ok(dest) => self.handle_response_ok(&dest, futures, statuses),
-            Err((dest, e)) => Self::handle_response_err(dest, futures, statuses, &e),
+            Ok(dest) => self.handle_response_ok(&dest, futures, statuses).await,
+            Err((dest, e)) => Self::handle_response_err(dest, statuses, &e),
         };
     }
-    fn handle_response_err(
-        dest: Destination, _futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus, e: &Error,
-    ) {
+    fn handle_response_err(dest: Destination, statuses: &mut CurTransactionStatus, e: &Error) {
         debug!(dest = ?dest, "{e:?}");
         statuses.entry(dest).and_modify(|e| {
             *e = match e {
@@ -94,39 +89,40 @@ impl Service {
         });
     }
-    fn handle_response_ok<'a>(
-        &'a self, dest: &Destination, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus,
+    #[allow(clippy::needless_pass_by_ref_mut)]
+    async fn handle_response_ok<'a>(
+        &'a self, dest: &Destination, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus,
     ) {
         let _cork = self.db.db.cork();
-        self.db
-            .delete_all_active_requests_for(dest)
-            .expect("all active requests deleted");
+        self.db.delete_all_active_requests_for(dest).await;
         // Find events that have been added since starting the last request
         let new_events = self
             .db
             .queued_requests(dest)
-            .filter_map(Result::ok)
             .take(DEQUEUE_LIMIT)
-            .collect::<Vec<_>>();
+            .collect::<Vec<_>>()
+            .await;
         // Insert any pdus we found
         if !new_events.is_empty() {
-            self.db
-                .mark_as_active(&new_events)
-                .expect("marked as active");
-            let new_events_vec = new_events.into_iter().map(|(event, _)| event).collect();
-            futures.push(Box::pin(self.send_events(dest.clone(), new_events_vec)));
+            self.db.mark_as_active(&new_events);
+            let new_events_vec = new_events.into_iter().map(|(_, event)| event).collect();
+            futures.push(self.send_events(dest.clone(), new_events_vec).boxed());
         } else {
             statuses.remove(dest);
         }
     }
-    fn handle_request<'a>(&'a self, msg: Msg, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
-        let iv = vec![(msg.event, msg.queue_id)];
-        if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses) {
+    #[allow(clippy::needless_pass_by_ref_mut)]
+    async fn handle_request<'a>(
+        &'a self, msg: Msg, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus,
+    ) {
+        let iv = vec![(msg.queue_id, msg.event)];
+        if let Ok(Some(events)) = self.select_events(&msg.dest, iv, statuses).await {
             if !events.is_empty() {
-                futures.push(Box::pin(self.send_events(msg.dest, events)));
+                futures.push(self.send_events(msg.dest, events).boxed());
             } else {
                 statuses.remove(&msg.dest);
             }
@@ -142,7 +138,7 @@ impl Service {
             tokio::select! {
                 () = sleep_until(deadline.into()) => break,
                 response = futures.next() => match response {
-                    Some(response) => self.handle_response(response, futures, statuses),
+                    Some(response) => self.handle_response(response, futures, statuses).await,
                     None => return,
                 }
             }
@@ -151,16 +147,17 @@ impl Service {
         debug_warn!("Leaving with {} unfinished requests...", futures.len());
     }
-    fn initial_requests<'a>(&'a self, futures: &SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
+    #[allow(clippy::needless_pass_by_ref_mut)]
+    async fn initial_requests<'a>(&'a self, futures: &mut SendingFutures<'a>, statuses: &mut CurTransactionStatus) {
         let keep = usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
         let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
-        for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) {
+        let mut active = self.db.active_requests().boxed();
+        while let Some((key, event, dest)) = active.next().await {
             let entry = txns.entry(dest.clone()).or_default();
             if self.server.config.startup_netburst_keep >= 0 && entry.len() >= keep {
-                warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key));
-                self.db
-                    .delete_active_request(&key)
-                    .expect("active request deleted");
+                warn!("Dropping unsent event {dest:?} {:?}", String::from_utf8_lossy(&key));
+                self.db.delete_active_request(&key);
            } else {
                 entry.push(event);
             }
@@ -169,16 +166,16 @@ impl Service {
         for (dest, events) in txns {
             if self.server.config.startup_netburst && !events.is_empty() {
                 statuses.insert(dest.clone(), TransactionStatus::Running);
-                futures.push(Box::pin(self.send_events(dest.clone(), events)));
+                futures.push(self.send_events(dest.clone(), events).boxed());
             }
         }
     }
     #[tracing::instrument(skip_all, level = "debug")]
-    fn select_events(
+    async fn select_events(
         &self,
         dest: &Destination,
-        new_events: Vec<(SendingEvent, Vec<u8>)>, // Events we want to send: event and full key
+        new_events: Vec<QueueItem>, // Events we want to send: event and full key
         statuses: &mut CurTransactionStatus,
     ) -> Result<Option<Vec<SendingEvent>>> {
         let (allow, retry) = self.select_events_current(dest.clone(), statuses)?;
@@ -195,8 +192,8 @@ impl Service {
         if retry {
             self.db
                 .active_requests_for(dest)
-                .filter_map(Result::ok)
-                .for_each(|(_, e)| events.push(e));
+                .ready_for_each(|(_, e)| events.push(e))
+                .await;
             return Ok(Some(events));
         }
@@ -204,17 +201,17 @@ impl Service {
         // Compose the next transaction
         let _cork = self.db.db.cork();
         if !new_events.is_empty() {
-            self.db.mark_as_active(&new_events)?;
-            for (e, _) in new_events {
+            self.db.mark_as_active(&new_events);
+            for (_, e) in new_events {
                 events.push(e);
             }
         }
         // Add EDU's into the transaction
         if let Destination::Normal(server_name) = dest {
-            if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
+            if let Ok((select_edus, last_count)) = self.select_edus(server_name).await {
                 events.extend(select_edus.into_iter().map(SendingEvent::Edu));
-                self.db.set_latest_educount(server_name, last_count)?;
+                self.db.set_latest_educount(server_name, last_count);
             }
         }
@@ -248,26 +245,32 @@ impl Service {
     }
     #[tracing::instrument(skip_all, level = "debug")]
-    fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
+    async fn select_edus(&self, server_name: &ServerName) -> Result<(Vec<Vec<u8>>, u64)> {
         // u64: count of last edu
-        let since = self.db.get_latest_educount(server_name)?;
+        let since = self.db.get_latest_educount(server_name).await;
         let mut events = Vec::new();
         let mut max_edu_count = since;
         let mut device_list_changes = HashSet::new();
-        for room_id in self.services.state_cache.server_rooms(server_name) {
-            let room_id = room_id?;
+        let server_rooms = self.services.state_cache.server_rooms(server_name);
+        pin_mut!(server_rooms);
+        while let Some(room_id) = server_rooms.next().await {
             // Look for device list updates in this room
             device_list_changes.extend(
                 self.services
                     .users
-                    .keys_changed(room_id.as_ref(), since, None)
-                    .filter_map(Result::ok)
-                    .filter(|user_id| self.services.globals.user_is_local(user_id)),
+                    .keys_changed(room_id.as_str(), since, None)
+                    .ready_filter(|user_id| self.services.globals.user_is_local(user_id))
+                    .map(ToOwned::to_owned)
+                    .collect::<Vec<_>>()
+                    .await,
             );
             if self.server.config.allow_outgoing_read_receipts
-                && !self.select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)?
+                && !self
+                    .select_edus_receipts(room_id, since, &mut max_edu_count, &mut events)
+                    .await?
             {
                 break;
             }
@@ -290,19 +293,22 @@ impl Service {
         }
         if self.server.config.allow_outgoing_presence {
-            self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?;
+            self.select_edus_presence(server_name, since, &mut max_edu_count, &mut events)
+                .await?;
         }
         Ok((events, max_edu_count))
     }
     /// Look for presence
-    fn select_edus_presence(
+    async fn select_edus_presence(
         &self, server_name: &ServerName, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
     ) -> Result<bool> {
         // Look for presence updates for this server
+        let presence_since = self.services.presence.presence_since(since);
+        pin_mut!(presence_since);
         let mut presence_updates = Vec::new();
-        for (user_id, count, presence_bytes) in self.services.presence.presence_since(since) {
+        while let Some((user_id, count, presence_bytes)) = presence_since.next().await {
             *max_edu_count = cmp::max(count, *max_edu_count);
             if !self.services.globals.user_is_local(&user_id) {
@@ -312,7 +318,8 @@ impl Service {
             if !self
                 .services
                 .state_cache
-                .server_sees_user(server_name, &user_id)?
+                .server_sees_user(server_name, &user_id)
+                .await
             {
                 continue;
             }
@@ -320,7 +327,9 @@ impl Service {
             let presence_event = self
                 .services
                 .presence
-                .from_json_bytes_to_event(&presence_bytes, &user_id)?;
+                .from_json_bytes_to_event(&presence_bytes, &user_id)
+                .await?;
             presence_updates.push(PresenceUpdate {
                 user_id,
                 presence: presence_event.content.presence,
@@ -346,32 +355,33 @@ impl Service {
     }
     /// Look for read receipts in this room
-    fn select_edus_receipts(
+    async fn select_edus_receipts(
         &self, room_id: &RoomId, since: u64, max_edu_count: &mut u64, events: &mut Vec<Vec<u8>>,
     ) -> Result<bool> {
-        for r in self
+        let receipts = self
             .services
             .read_receipt
-            .readreceipts_since(room_id, since)
-        {
-            let (user_id, count, read_receipt) = r?;
-            *max_edu_count = cmp::max(count, *max_edu_count);
+            .readreceipts_since(room_id, since);
+        pin_mut!(receipts);
+        while let Some((user_id, count, read_receipt)) = receipts.next().await {
+            *max_edu_count = cmp::max(count, *max_edu_count);
             if !self.services.globals.user_is_local(&user_id) {
                 continue;
             }
             let event = serde_json::from_str(read_receipt.json().get())
                 .map_err(|_| Error::bad_database("Invalid edu event in read_receipts."))?;
             let federation_event = if let AnySyncEphemeralRoomEvent::Receipt(r) = event {
                 let mut read = BTreeMap::new();
                 let (event_id, mut receipt) = r
                     .content
                     .0
                     .into_iter()
                     .next()
                     .expect("we only use one event per read receipt");
                 let receipt = receipt
                     .remove(&ReceiptType::Read)
                     .expect("our read receipts always set this")
@@ -427,24 +437,17 @@ impl Service {
     async fn send_events_dest_appservice(
         &self, dest: &Destination, id: &str, events: Vec<SendingEvent>,
     ) -> SendingResult {
-        let mut pdu_jsons = Vec::new();
+        let Some(appservice) = self.services.appservice.get_registration(id).await else {
+            return Err((dest.clone(), err!(Database(warn!(?id, "Missing appservice registration")))));
+        };
+        let mut pdu_jsons = Vec::new();
         for event in &events {
             match event {
                 SendingEvent::Pdu(pdu_id) => {
-                    pdu_jsons.push(
-                        self.services
-                            .timeline
-                            .get_pdu_from_id(pdu_id)
-                            .map_err(|e| (dest.clone(), e))?
-                            .ok_or_else(|| {
-                                (
-                                    dest.clone(),
-                                    Error::bad_database("[Appservice] Event in servernameevent_data not found in db."),
-                                )
-                            })?
-                            .to_room_event(),
-                    );
+                    if let Ok(pdu) = self.services.timeline.get_pdu_from_id(pdu_id).await {
+                        pdu_jsons.push(pdu.to_room_event());
+                    }
                 },
                 SendingEvent::Edu(_) | SendingEvent::Flush => {
                     // Appservices don't need EDUs (?) and flush only;
@@ -453,32 +456,24 @@ impl Service {
             }
         }
+        let txn_id = &*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
+            &events
+                .iter()
+                .map(|e| match e {
+                    SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b,
+                    SendingEvent::Flush => &[],
+                })
+                .collect::<Vec<_>>(),
+        ));
         //debug_assert!(!pdu_jsons.is_empty(), "sending empty transaction");
         let client = &self.services.client.appservice;
         match appservice::send_request(
             client,
-            self.services
-                .appservice
-                .get_registration(id)
-                .await
-                .ok_or_else(|| {
-                    (
-                        dest.clone(),
-                        Error::bad_database("[Appservice] Could not load registration from db."),
-                    )
-                })?,
+            appservice,
             ruma::api::appservice::event::push_events::v1::Request {
                 events: pdu_jsons,
-                txn_id: (&*general_purpose::URL_SAFE_NO_PAD.encode(calculate_hash(
-                    &events
-                        .iter()
-                        .map(|e| match e {
-                            SendingEvent::Edu(b) | SendingEvent::Pdu(b) => &**b,
-                            SendingEvent::Flush => &[],
-                        })
-                        .collect::<Vec<_>>(),
-                )))
-                .into(),
+                txn_id: txn_id.into(),
                 ephemeral: Vec::new(),
                 to_device: Vec::new(),
             },
@@ -494,23 +489,17 @@ impl Service {
     async fn send_events_dest_push(
         &self, dest: &Destination, userid: &OwnedUserId, pushkey: &str, events: Vec<SendingEvent>,
     ) -> SendingResult {
-        let mut pdus = Vec::new();
+        let Ok(pusher) = self.services.pusher.get_pusher(userid, pushkey).await else {
+            return Err((dest.clone(), err!(Database(error!(?userid, ?pushkey, "Missing pusher")))));
+        };
+        let mut pdus = Vec::new();
         for event in &events {
             match event {
                 SendingEvent::Pdu(pdu_id) => {
-                    pdus.push(
-                        self.services
-                            .timeline
-                            .get_pdu_from_id(pdu_id)
-                            .map_err(|e| (dest.clone(), e))?
-                            .ok_or_else(|| {
-                                (
-                                    dest.clone(),
-                                    Error::bad_database("[Push] Event in servernameevent_data not found in db."),
-                                )
-                            })?,
-                    );
+                    if let Ok(pdu) = self.services.timeline.get_pdu_from_id(pdu_id).await {
+                        pdus.push(pdu);
+                    }
                 },
                 SendingEvent::Edu(_) | SendingEvent::Flush => {
                     // Push gateways don't need EDUs (?) and flush only;
@@ -529,28 +518,22 @@ impl Service {
             }
         }
-            let Some(pusher) = self
-                .services
-                .pusher
-                .get_pusher(userid, pushkey)
-                .map_err(|e| (dest.clone(), e))?
-            else {
-                continue;
-            };
             let rules_for_user = self
                 .services
                 .account_data
                 .get(None, userid, GlobalAccountDataEventType::PushRules.to_string().into())
-                .unwrap_or_default()
-                .and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).ok())
-                .map_or_else(|| push::Ruleset::server_default(userid), |ev: PushRulesEvent| ev.content.global);
+                .await
+                .and_then(|event| serde_json::from_str::<PushRulesEvent>(event.get()).map_err(Into::into))
+                .map_or_else(
+                    |_| push::Ruleset::server_default(userid),
+                    |ev: PushRulesEvent| ev.content.global,
+                );
             let unread: UInt = self
                 .services
                 .user
                 .notification_count(userid, &pdu.room_id)
-                .map_err(|e| (dest.clone(), e))?
+                .await
                 .try_into()
                 .expect("notification count can't go that high");
@@ -559,7 +542,6 @@ impl Service {
                 .pusher
                 .send_push_notice(userid, unread, &pusher, rules_for_user, &pdu)
                 .await
-                .map(|_response| dest.clone())
                 .map_err(|e| (dest.clone(), e));
         }
@@ -586,21 +568,11 @@ impl Service {
         for event in &events {
             match event {
                 // TODO: check room version and remove event_id if needed
-                SendingEvent::Pdu(pdu_id) => pdu_jsons.push(
-                    self.convert_to_outgoing_federation_event(
-                        self.services
-                            .timeline
-                            .get_pdu_json_from_id(pdu_id)
-                            .map_err(|e| (dest.clone(), e))?
-                            .ok_or_else(|| {
-                                error!(?dest, ?server, ?pdu_id, "event not found");
-                                (
-                                    dest.clone(),
-                                    Error::bad_database("[Normal] Event in servernameevent_data not found in db."),
-                                )
-                            })?,
-                    ),
-                ),
+                SendingEvent::Pdu(pdu_id) => {
+                    if let Ok(pdu) = self.services.timeline.get_pdu_json_from_id(pdu_id).await {
+                        pdu_jsons.push(self.convert_to_outgoing_federation_event(pdu).await);
+                    }
+                },
                 SendingEvent::Edu(edu) => {
                     if let Ok(raw) = serde_json::from_slice(edu) {
                         edu_jsons.push(raw);
@@ -647,7 +619,7 @@ impl Service {
     }
     /// This does not return a full `Pdu` it is only to satisfy ruma's types.
-    pub fn convert_to_outgoing_federation_event(&self, mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
+    pub async fn convert_to_outgoing_federation_event(&self, mut pdu_json: CanonicalJsonObject) -> Box<RawJsonValue> {
         if let Some(unsigned) = pdu_json
             .get_mut("unsigned")
             .and_then(|val| val.as_object_mut())
@@ -660,7 +632,7 @@ impl Service {
             .get("room_id")
             .and_then(|val| RoomId::parse(val.as_str()?).ok())
         {
-            match self.services.state.get_room_version(&room_id) {
+            match self.services.state.get_room_version(&room_id).await {
                 Ok(room_version_id) => match room_version_id {
                     RoomVersionId::V1 | RoomVersionId::V2 => {},
                     _ => _ = pdu_json.remove("event_id"),