messing around with arcs

Timo Kösters 2022-10-05 15:33:57 +02:00 committed by Nyaaori
parent face766e0f
commit cff52d7ebb
77 changed files with 598 additions and 434 deletions

@ -1,26 +1,25 @@
use std::collections::HashMap;
use ruma::{UserId, RoomId, events::{RoomAccountDataEventType, AnyEphemeralRoomEvent}, serde::Raw};
use serde::{Serialize, de::DeserializeOwned};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
/// Places one event in the account data of the user and removes the previous entry.
fn update<T: Serialize>(
fn update(
&self,
room_id: Option<&RoomId>,
user_id: &UserId,
event_type: RoomAccountDataEventType,
data: &T,
data: &serde_json::Value,
) -> Result<()>;
/// Searches the account data for a specific kind.
fn get<T: DeserializeOwned>(
fn get(
&self,
room_id: Option<&RoomId>,
user_id: &UserId,
kind: RoomAccountDataEventType,
) -> Result<Option<T>>;
) -> Result<Option<Box<serde_json::value::RawValue>>>;
/// Returns all changes to the account data that happened after `since`.
fn changes_since(

@ -24,24 +24,24 @@ pub struct Service {
impl Service {
/// Places one event in the account data of the user and removes the previous entry.
#[tracing::instrument(skip(self, room_id, user_id, event_type, data))]
pub fn update<T: Serialize>(
pub fn update(
&self,
room_id: Option<&RoomId>,
user_id: &UserId,
event_type: RoomAccountDataEventType,
data: &T,
data: &serde_json::Value,
) -> Result<()> {
self.db.update(room_id, user_id, event_type, data)
}
/// Searches the account data for a specific kind.
#[tracing::instrument(skip(self, room_id, user_id, event_type))]
pub fn get<T: DeserializeOwned>(
pub fn get(
&self,
room_id: Option<&RoomId>,
user_id: &UserId,
event_type: RoomAccountDataEventType,
) -> Result<Option<T>> {
) -> Result<Option<Box<serde_json::value::RawValue>>> {
self.db.get(room_id, user_id, event_type)
}
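
With the generics gone, serialization moves to the call site: callers hand `update` a `serde_json::Value` and get raw JSON back from `get` to deserialize themselves. A minimal sketch of a call site under the new signatures (the `tag_roundtrip` helper and the tag payload are only illustrative):

    use ruma::{events::RoomAccountDataEventType, RoomId, UserId};
    use serde::Deserialize;
    use crate::{services, Error, Result};

    fn tag_roundtrip(room_id: &RoomId, user_id: &UserId) -> Result<()> {
        // Writing: serialize into a serde_json::Value before calling update.
        let tag_event = serde_json::json!({ "content": { "tags": {} } });
        services().account_data.update(
            Some(room_id),
            user_id,
            RoomAccountDataEventType::Tag,
            &tag_event,
        )?;

        // Reading: get() now returns the raw JSON; deserialize at the call site.
        #[derive(Deserialize)]
        struct TagEvent {
            content: serde_json::Value,
        }

        if let Some(raw) = services().account_data.get(
            Some(room_id),
            user_id,
            RoomAccountDataEventType::Tag,
        )? {
            let _event: TagEvent = serde_json::from_str(raw.get())
                .map_err(|_| Error::bad_database("Invalid account data in db."))?;
        }
        Ok(())
    }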

@ -28,7 +28,7 @@ use ruma::{
use serde_json::value::to_raw_value;
use tokio::sync::{mpsc, MutexGuard, RwLock, RwLockReadGuard};
use crate::{Result, services, Error, api::{server_server, client_server::AUTO_GEN_PASSWORD_LENGTH}, PduEvent, utils::{HtmlEscape, self}};
use crate::{Result, services, Error, api::{server_server, client_server::{AUTO_GEN_PASSWORD_LENGTH, leave_all_rooms}}, PduEvent, utils::{HtmlEscape, self}};
use super::pdu::PduBuilder;
@ -179,7 +179,8 @@ impl Service {
let conduit_room = services()
.rooms
.id_from_alias(
.alias
.resolve_local_alias(
format!("#admins:{}", services().globals.server_name())
.as_str()
.try_into()
@ -221,7 +222,7 @@ impl Service {
.roomid_mutex_state
.write()
.unwrap()
.entry(conduit_room.clone())
.entry(conduit_room.to_owned())
.or_default(),
);
@ -599,11 +600,11 @@ impl Service {
ruma::events::GlobalAccountDataEventType::PushRules
.to_string()
.into(),
&ruma::events::push_rules::PushRulesEvent {
&serde_json::to_value(ruma::events::push_rules::PushRulesEvent {
content: ruma::events::push_rules::PushRulesEventContent {
global: ruma::push::Ruleset::server_default(&user_id),
},
},
}).expect("to json value always works"),
)?;
// we dont add a device since we're not the user, just the creator
@ -614,12 +615,14 @@ impl Service {
))
}
AdminCommand::DisableRoom { room_id } => {
services().rooms.disabledroomids.insert(room_id.as_bytes(), &[])?;
RoomMessageEventContent::text_plain("Room disabled.")
todo!();
//services().rooms.disabledroomids.insert(room_id.as_bytes(), &[])?;
//RoomMessageEventContent::text_plain("Room disabled.")
}
AdminCommand::EnableRoom { room_id } => {
services().rooms.disabledroomids.remove(room_id.as_bytes())?;
RoomMessageEventContent::text_plain("Room enabled.")
todo!();
//services().rooms.disabledroomids.remove(room_id.as_bytes())?;
//RoomMessageEventContent::text_plain("Room enabled.")
}
AdminCommand::DeactivateUser {
leave_rooms,
@ -635,7 +638,7 @@ impl Service {
services().users.deactivate_account(&user_id)?;
if leave_rooms {
services().rooms.leave_all_rooms(&user_id).await?;
leave_all_rooms(&user_id).await?;
}
RoomMessageEventContent::text_plain(format!(
@ -694,7 +697,7 @@ impl Service {
if leave_rooms {
for &user_id in &user_ids {
let _ = services().rooms.leave_all_rooms(user_id).await;
let _ = leave_all_rooms(user_id).await;
}
}
@ -804,7 +807,7 @@ impl Service {
pub(crate) async fn create_admin_room(&self) -> Result<()> {
let room_id = RoomId::new(services().globals.server_name());
services().rooms.get_or_create_shortroomid(&room_id)?;
services().rooms.short.get_or_create_shortroomid(&room_id)?;
let mutex_state = Arc::clone(
services().globals

@ -1,6 +1,6 @@
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
/// Registers an appservice and returns the ID to the caller
fn register_appservice(&self, yaml: serde_yaml::Value) -> Result<String>;

@ -1,10 +1,12 @@
mod data;
use std::sync::Arc;
pub use data::Data;
use crate::Result;
pub struct Service {
db: Box<dyn Data>,
db: Arc<dyn Data>,
}
impl Service {
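
Swapping `Box<dyn Data>` for `Arc<dyn Data>` is what drives the `Send + Sync` bounds this commit adds to every `Data` trait: the same database handle is shared between services and across async tasks, and a trait object can only be sent between threads if the trait guarantees those bounds. A condensed, standalone sketch of the coercion (the `MemoryDb` backend is a placeholder):

    use std::sync::Arc;

    pub trait Data: Send + Sync {
        fn register_appservice(&self, yaml: serde_yaml::Value) -> crate::Result<String>;
    }

    struct MemoryDb; // stand-in for the real database backend

    impl Data for MemoryDb {
        fn register_appservice(&self, _yaml: serde_yaml::Value) -> crate::Result<String> {
            Ok("dummy".to_owned())
        }
    }

    fn shared_handle() -> Arc<dyn Data> {
        // Unsized coercion from Arc<MemoryDb> to Arc<dyn Data>; this only works
        // because MemoryDb (and the trait) are Send + Sync + 'static, which is
        // what lets the handle be cloned into every Service and used from any
        // tokio worker thread.
        Arc::new(MemoryDb)
    }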

@ -1,8 +1,30 @@
use ruma::signatures::Ed25519KeyPair;
use std::collections::BTreeMap;
use async_trait::async_trait;
use ruma::{signatures::Ed25519KeyPair, DeviceId, UserId, ServerName, api::federation::discovery::{ServerSigningKeys, VerifyKey}, ServerSigningKeyId};
use crate::Result;
pub trait Data {
#[async_trait]
pub trait Data: Send + Sync {
fn next_count(&self) -> Result<u64>;
fn current_count(&self) -> Result<u64>;
async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()>;
fn cleanup(&self) -> Result<()>;
fn memory_usage(&self) -> Result<String>;
fn load_keypair(&self) -> Result<Ed25519KeyPair>;
fn remove_keypair(&self) -> Result<()>;
fn add_signing_key(
&self,
origin: &ServerName,
new_keys: ServerSigningKeys,
) -> Result<BTreeMap<Box<ServerSigningKeyId>, VerifyKey>>;
/// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found for the server.
fn signing_keys_for(
&self,
origin: &ServerName,
) -> Result<BTreeMap<Box<ServerSigningKeyId>, VerifyKey>>;
fn database_version(&self) -> Result<u64>;
fn bump_database_version(&self, new_version: u64) -> Result<()>;
}
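
`watch` is the one async method on this trait, so both the trait and any backend implementation carry the `#[async_trait]` attribute (plain trait methods cannot be `async` on stable Rust). A rough, self-contained sketch of what an implementation looks like, with a cut-down two-method version of the trait and a placeholder `ExampleDb` backend:

    use std::sync::Arc;
    use async_trait::async_trait;
    use ruma::{DeviceId, UserId};

    type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

    // Condensed stand-in for the real trait: one sync and one async method.
    #[async_trait]
    pub trait Data: Send + Sync {
        fn next_count(&self) -> Result<u64>;
        async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()>;
    }

    struct ExampleDb; // placeholder backend

    #[async_trait]
    impl Data for ExampleDb {
        fn next_count(&self) -> Result<u64> {
            Ok(1)
        }

        // #[async_trait] rewrites this into a method returning a boxed future,
        // which keeps the trait object-safe so it can sit behind Arc<dyn Data>.
        async fn watch(&self, _user_id: &UserId, _device_id: &DeviceId) -> Result<()> {
            Ok(())
        }
    }

    async fn wait_for_changes(db: Arc<dyn Data>, user: &UserId, device: &DeviceId) -> Result<()> {
        db.watch(user, device).await
    }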

@ -26,8 +26,6 @@ use tokio::sync::{broadcast, watch::Receiver, Mutex as TokioMutex, Semaphore};
use tracing::error;
use trust_dns_resolver::TokioAsyncResolver;
pub const COUNTER: &[u8] = b"c";
type WellKnownMap = HashMap<Box<ServerName>, (FedDest, String)>;
type TlsNameMap = HashMap<String, (Vec<IpAddr>, u16)>;
type RateLimitState = (Instant, u32); // Time if last failed try, number of failed tries
@ -198,16 +196,24 @@ impl Service {
#[tracing::instrument(skip(self))]
pub fn next_count(&self) -> Result<u64> {
utils::u64_from_bytes(&self.globals.increment(COUNTER)?)
.map_err(|_| Error::bad_database("Count has invalid bytes."))
self.db.next_count()
}
#[tracing::instrument(skip(self))]
pub fn current_count(&self) -> Result<u64> {
self.globals.get(COUNTER)?.map_or(Ok(0_u64), |bytes| {
utils::u64_from_bytes(&bytes)
.map_err(|_| Error::bad_database("Count has invalid bytes."))
})
self.db.current_count()
}
pub async fn watch(&self, user_id: &UserId, device_id: &DeviceId) -> Result<()> {
self.db.watch(user_id, device_id).await
}
pub fn cleanup(&self) -> Result<()> {
self.db.cleanup()
}
pub fn memory_usage(&self) -> Result<String> {
self.db.memory_usage()
}
pub fn server_name(&self) -> &ServerName {
@ -296,38 +302,7 @@ impl Service {
origin: &ServerName,
new_keys: ServerSigningKeys,
) -> Result<BTreeMap<Box<ServerSigningKeyId>, VerifyKey>> {
// Not atomic, but this is not critical
let signingkeys = self.server_signingkeys.get(origin.as_bytes())?;
let mut keys = signingkeys
.and_then(|keys| serde_json::from_slice(&keys).ok())
.unwrap_or_else(|| {
// Just insert "now", it doesn't matter
ServerSigningKeys::new(origin.to_owned(), MilliSecondsSinceUnixEpoch::now())
});
let ServerSigningKeys {
verify_keys,
old_verify_keys,
..
} = new_keys;
keys.verify_keys.extend(verify_keys.into_iter());
keys.old_verify_keys.extend(old_verify_keys.into_iter());
self.server_signingkeys.insert(
origin.as_bytes(),
&serde_json::to_vec(&keys).expect("serversigningkeys can be serialized"),
)?;
let mut tree = keys.verify_keys;
tree.extend(
keys.old_verify_keys
.into_iter()
.map(|old| (old.0, VerifyKey::new(old.1.key))),
);
Ok(tree)
self.db.add_signing_key(origin, new_keys)
}
/// This returns an empty `Ok(BTreeMap<..>)` when there are no keys found for the server.
@ -335,35 +310,15 @@ impl Service {
&self,
origin: &ServerName,
) -> Result<BTreeMap<Box<ServerSigningKeyId>, VerifyKey>> {
let signingkeys = self
.server_signingkeys
.get(origin.as_bytes())?
.and_then(|bytes| serde_json::from_slice(&bytes).ok())
.map(|keys: ServerSigningKeys| {
let mut tree = keys.verify_keys;
tree.extend(
keys.old_verify_keys
.into_iter()
.map(|old| (old.0, VerifyKey::new(old.1.key))),
);
tree
})
.unwrap_or_else(BTreeMap::new);
Ok(signingkeys)
self.db.signing_keys_for(origin)
}
pub fn database_version(&self) -> Result<u64> {
self.globals.get(b"version")?.map_or(Ok(0), |version| {
utils::u64_from_bytes(&version)
.map_err(|_| Error::bad_database("Database version id is invalid."))
})
self.db.database_version()
}
pub fn bump_database_version(&self, new_version: u64) -> Result<()> {
self.globals
.insert(b"version", &new_version.to_be_bytes())?;
Ok(())
self.db.bump_database_version(new_version)
}
pub fn get_media_folder(&self) -> PathBuf {

@ -3,7 +3,7 @@ use std::collections::BTreeMap;
use ruma::{api::client::backup::{BackupAlgorithm, RoomKeyBackup, KeyBackupData}, serde::Raw, UserId, RoomId};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn create_backup(
&self,
user_id: &UserId,

@ -1,7 +1,7 @@
use crate::Result;
pub trait Data {
fn create_file_metadata(&self, mxc: String, width: u32, height: u32, content_disposition: &Option<&str>, content_type: &Option<&str>) -> Result<Vec<u8>>;
pub trait Data: Send + Sync {
fn create_file_metadata(&self, mxc: String, width: u32, height: u32, content_disposition: Option<&str>, content_type: Option<&str>) -> Result<Vec<u8>>;
/// Returns content_disposition, content_type and the metadata key.
fn search_file_metadata(&self, mxc: String, width: u32, height: u32) -> Result<(Option<String>, Option<String>, Vec<u8>)>;

@ -24,8 +24,8 @@ impl Service {
pub async fn create(
&self,
mxc: String,
content_disposition: &Option<&str>,
content_type: &Option<&str>,
content_disposition: Option<&str>,
content_type: Option<&str>,
file: &[u8],
) -> Result<()> {
// Width, Height = 0 if it's not a thumbnail
@ -42,8 +42,8 @@ impl Service {
pub async fn upload_thumbnail(
&self,
mxc: String,
content_disposition: &Option<&str>,
content_type: &Option<&str>,
content_disposition: Option<&str>,
content_type: Option<&str>,
width: u32,
height: u32,
file: &[u8],
@ -108,7 +108,7 @@ impl Service {
.thumbnail_properties(width, height)
.unwrap_or((0, 0, false)); // 0, 0 because that's the original file
if let Ok((content_disposition, content_type, key)) = self.db.search_file_metadata(mxc, width, height) {
if let Ok((content_disposition, content_type, key)) = self.db.search_file_metadata(mxc.clone(), width, height) {
// Using saved thumbnail
let path = services().globals.get_media_file(&key);
let mut file = Vec::new();
@ -119,7 +119,7 @@ impl Service {
content_type,
file: file.to_vec(),
}))
} else if let Ok((content_disposition, content_type, key)) = self.db.search_file_metadata(mxc, 0, 0) {
} else if let Ok((content_disposition, content_type, key)) = self.db.search_file_metadata(mxc.clone(), 0, 0) {
// Generate a thumbnail
let path = services().globals.get_media_file(&key);
let mut file = Vec::new();
@ -180,7 +180,7 @@ impl Service {
thumbnail.write_to(&mut thumbnail_bytes, image::ImageOutputFormat::Png)?;
// Save thumbnail in database so we don't have to generate it again next time
let thumbnail_key = self.db.create_file_metadata(mxc, width, height, content_disposition, content_type)?;
let thumbnail_key = self.db.create_file_metadata(mxc, width, height, content_disposition.as_deref(), content_type.as_deref())?;
let path = services().globals.get_media_file(&thumbnail_key);
let mut f = File::create(path).await?;
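
The metadata coming back from `search_file_metadata` is `Option<String>`, while `create_file_metadata` now wants a plain `Option<&str>` instead of `&Option<&str>`; `as_deref()` bridges the two without cloning. A tiny standalone illustration:

    fn main() {
        let content_type: Option<String> = Some("image/png".to_owned());

        // Option<String> -> Option<&str>, borrowing instead of cloning.
        let borrowed: Option<&str> = content_type.as_deref();
        assert_eq!(borrowed, Some("image/png"));

        // None passes through unchanged.
        let missing: Option<String> = None;
        assert_eq!(missing.as_deref(), None::<&str>);
    }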

@ -30,20 +30,20 @@ pub struct Services {
}
impl Services {
pub fn build<D: appservice::Data + pusher::Data + rooms::Data + transaction_ids::Data + uiaa::Data + users::Data + account_data::Data + globals::Data + key_backups::Data + media::Data>(db: Arc<D>) {
pub fn build<D: appservice::Data + pusher::Data + rooms::Data + transaction_ids::Data + uiaa::Data + users::Data + account_data::Data + globals::Data + key_backups::Data + media::Data>(db: Arc<D>) -> Self {
Self {
appservice: appservice::Service { db: Arc::clone(&db) },
pusher: appservice::Service { db: Arc::clone(&db) },
rooms: appservice::Service { db: Arc::clone(&db) },
transaction_ids: appservice::Service { db: Arc::clone(&db) },
uiaa: appservice::Service { db: Arc::clone(&db) },
users: appservice::Service { db: Arc::clone(&db) },
account_data: appservice::Service { db: Arc::clone(&db) },
admin: appservice::Service { db: Arc::clone(&db) },
globals: appservice::Service { db: Arc::clone(&db) },
key_backups: appservice::Service { db: Arc::clone(&db) },
media: appservice::Service { db: Arc::clone(&db) },
sending: appservice::Service { db: Arc::clone(&db) },
appservice: appservice::Service { db: db.clone() },
pusher: pusher::Service { db: db.clone() },
rooms: rooms::Service { db: Arc::clone(&db) },
transaction_ids: transaction_ids::Service { db: Arc::clone(&db) },
uiaa: uiaa::Service { db: Arc::clone(&db) },
users: users::Service { db: Arc::clone(&db) },
account_data: account_data::Service { db: Arc::clone(&db) },
admin: admin::Service { db: Arc::clone(&db) },
globals: globals::Service { db: Arc::clone(&db) },
key_backups: key_backups::Service { db: Arc::clone(&db) },
media: media::Service { db: Arc::clone(&db) },
sending: sending::Service { db: Arc::clone(&db) },
}
}
}
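
The corrected `build` shows the pattern the whole commit is moving toward: one concrete database type implements every per-service `Data` trait, and a single `Arc` to it is cloned into each service, with each clone coercing to that service's own `Arc<dyn Data>`. A stripped-down sketch of the idea with two hypothetical services:

    use std::sync::Arc;

    // Two independent storage interfaces, like the per-service Data traits.
    trait AlphaData: Send + Sync {
        fn alpha(&self) -> &'static str;
    }
    trait BetaData: Send + Sync {
        fn beta(&self) -> &'static str;
    }

    struct AlphaService {
        db: Arc<dyn AlphaData>,
    }
    struct BetaService {
        db: Arc<dyn BetaData>,
    }

    struct Services {
        alpha: AlphaService,
        beta: BetaService,
    }

    impl Services {
        // One concrete D implements every trait; each service gets its own
        // Arc clone, coerced to the trait object it actually needs.
        fn build<D: AlphaData + BetaData + 'static>(db: Arc<D>) -> Self {
            Self {
                alpha: AlphaService { db: db.clone() },
                beta: BetaService { db }, // the last clone can just move
            }
        }
    }

    // A single in-memory backend standing in for the real database.
    struct Db;
    impl AlphaData for Db {
        fn alpha(&self) -> &'static str {
            "alpha"
        }
    }
    impl BetaData for Db {
        fn beta(&self) -> &'static str {
            "beta"
        }
    }

    fn main() {
        let services = Services::build(Arc::new(Db));
        assert_eq!(services.alpha.db.alpha(), "alpha");
        assert_eq!(services.beta.db.beta(), "beta");
    }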

@ -343,7 +343,7 @@ pub(crate) fn gen_event_id_canonical_json(
.and_then(|id| RoomId::parse(id.as_str()?).ok())
.ok_or_else(|| Error::bad_database("PDU in db has invalid room_id."))?;
let room_version_id = services().rooms.get_room_version(&room_id);
let room_version_id = services().rooms.state.get_room_version(&room_id);
let event_id = format!(
"${}",

@ -1,7 +1,7 @@
use ruma::{UserId, api::client::push::{set_pusher, get_pushers}};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn set_pusher(&self, sender: &UserId, pusher: set_pusher::v3::Pusher) -> Result<()>;
fn get_pusher(&self, senderkey: &[u8]) -> Result<Option<get_pushers::v3::Pusher>>;

@ -3,6 +3,7 @@ pub use data::Data;
use crate::{services, Error, PduEvent, Result};
use bytes::BytesMut;
use ruma::api::IncomingResponse;
use ruma::{
api::{
client::push::{get_pushers, set_pusher, PusherKind},
@ -20,11 +21,12 @@ use ruma::{
serde::Raw,
uint, RoomId, UInt, UserId,
};
use std::sync::Arc;
use std::{fmt::Debug, mem};
use tracing::{error, info, warn};
pub struct Service {
db: Box<dyn Data>,
db: Arc<dyn Data>,
}
impl Service {
@ -47,8 +49,9 @@ impl Service {
self.db.get_pusher_senderkeys(sender)
}
#[tracing::instrument(skip(destination, request))]
#[tracing::instrument(skip(self, destination, request))]
pub async fn send_request<T: OutgoingRequest>(
&self,
destination: &str,
request: T,
) -> Result<T::IncomingResponse>
@ -124,7 +127,7 @@ impl Service {
}
}
#[tracing::instrument(skip(user, unread, pusher, ruleset, pdu))]
#[tracing::instrument(skip(self, user, unread, pusher, ruleset, pdu))]
pub async fn send_push_notice(
&self,
user: &UserId,
@ -181,7 +184,7 @@ impl Service {
Ok(())
}
#[tracing::instrument(skip(user, ruleset, pdu))]
#[tracing::instrument(skip(self, user, ruleset, pdu))]
pub fn get_actions<'a>(
&self,
user: &UserId,
@ -204,7 +207,7 @@ impl Service {
Ok(ruleset.get_actions(pdu, &ctx))
}
#[tracing::instrument(skip(unread, pusher, tweaks, event))]
#[tracing::instrument(skip(self, unread, pusher, tweaks, event))]
async fn send_notice(
&self,
unread: UInt,

@ -1,7 +1,7 @@
use ruma::{RoomId, RoomAliasId};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
/// Creates or updates the alias to the given room id.
fn set_alias(
&self,

@ -1,7 +1,7 @@
use std::collections::HashSet;
use std::{collections::HashSet, sync::Arc};
use crate::Result;
pub trait Data {
fn get_cached_eventid_authchain(&self, shorteventid: u64) -> Result<Option<HashSet<u64>>>;
fn cache_eventid_authchain(&self, shorteventid: u64, auth_chain: &HashSet<u64>) -> Result<()>;
pub trait Data: Send + Sync {
fn get_cached_eventid_authchain(&self, shorteventid: &[u64]) -> Result<Option<Arc<HashSet<u64>>>>;
fn cache_auth_chain(&self, shorteventid: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()>;
}

@ -15,41 +15,11 @@ impl Service {
&'a self,
key: &[u64],
) -> Result<Option<Arc<HashSet<u64>>>> {
// Check RAM cache
if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key.to_be_bytes()) {
return Ok(Some(Arc::clone(result)));
}
// We only save auth chains for single events in the db
if key.len() == 1 {
// Check DB cache
if let Some(chain) = self.db.get_cached_eventid_authchain(key[0])
{
let chain = Arc::new(chain);
// Cache in RAM
self.auth_chain_cache
.lock()
.unwrap()
.insert(vec![key[0]], Arc::clone(&chain));
return Ok(Some(chain));
}
}
Ok(None)
self.db.get_cached_eventid_authchain(key)
}
#[tracing::instrument(skip(self))]
pub fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> {
// Only persist single events in db
if key.len() == 1 {
self.db.cache_auth_chain(key[0], auth_chain)?;
}
// Cache in RAM
self.auth_chain_cache.lock().unwrap().insert(key, auth_chain);
Ok(())
self.db.cache_auth_chain(key, auth_chain)
}
}
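
The RAM-cache / single-event split hasn't gone away, it has just moved behind the `Data` trait, so a backend is now expected to do roughly what the deleted service code above did. A hedged sketch of such an implementation, assuming an `LruCache` like the one the removed code used (the `EventIdDb` type and its `db_get`/`db_put` helpers are placeholders):

    use std::{
        collections::HashSet,
        sync::{Arc, Mutex},
    };

    use lru_cache::LruCache;

    type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;

    struct EventIdDb {
        // RAM cache keyed by the whole event-id vector; only single-event
        // chains are persisted to the database.
        auth_chain_cache: Mutex<LruCache<Vec<u64>, Arc<HashSet<u64>>>>,
    }

    impl EventIdDb {
        fn db_get(&self, _shorteventid: u64) -> Result<Option<HashSet<u64>>> {
            Ok(None) // placeholder for the real key-value lookup
        }
        fn db_put(&self, _shorteventid: u64, _chain: &HashSet<u64>) -> Result<()> {
            Ok(()) // placeholder for the real key-value insert
        }

        fn get_cached_eventid_authchain(&self, key: &[u64]) -> Result<Option<Arc<HashSet<u64>>>> {
            // 1. Check the RAM cache for the full key.
            if let Some(result) = self.auth_chain_cache.lock().unwrap().get_mut(key) {
                return Ok(Some(Arc::clone(result)));
            }
            // 2. Only single-event chains are stored in the database.
            if let [shorteventid] = key {
                if let Some(chain) = self.db_get(*shorteventid)? {
                    let chain = Arc::new(chain);
                    self.auth_chain_cache
                        .lock()
                        .unwrap()
                        .insert(vec![*shorteventid], Arc::clone(&chain));
                    return Ok(Some(chain));
                }
            }
            Ok(None)
        }

        fn cache_auth_chain(&self, key: Vec<u64>, auth_chain: Arc<HashSet<u64>>) -> Result<()> {
            // Persist only single events in the db, but always cache in RAM.
            if let [shorteventid] = key[..] {
                self.db_put(shorteventid, &auth_chain)?;
            }
            self.auth_chain_cache.lock().unwrap().insert(key, auth_chain);
            Ok(())
        }
    }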

@ -1,7 +1,7 @@
use ruma::RoomId;
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
/// Adds the room to the public room directory
fn set_public(&self, room_id: &RoomId) -> Result<()>;

@ -3,7 +3,7 @@ use std::collections::HashMap;
use ruma::{UserId, RoomId, events::presence::PresenceEvent};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
/// Adds a presence event which will be saved until a new event replaces it.
///
/// Note: This method takes a RoomId because presence updates are always bound to rooms to

@ -1,7 +1,7 @@
use ruma::{RoomId, events::receipt::ReceiptEvent, UserId, serde::Raw};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
/// Replaces the previous read receipt.
fn readreceipt_update(
&self,

@ -2,7 +2,7 @@ use std::collections::HashSet;
use crate::Result;
use ruma::{UserId, RoomId};
pub trait Data {
pub trait Data: Send + Sync {
/// Sets a user as typing until the timeout timestamp is reached or roomtyping_remove is
/// called.
fn typing_add(&self, user_id: &UserId, room_id: &RoomId, timeout: u64) -> Result<()>;

@ -117,7 +117,7 @@ impl Service {
room_id,
pub_key_map,
incoming_pdu.prev_events.clone(),
).await;
).await?;
let mut errors = 0;
for prev_id in dbg!(sorted_prev_events) {
@ -240,7 +240,7 @@ impl Service {
r
}
#[tracing::instrument(skip(create_event, value, pub_key_map))]
#[tracing::instrument(skip(self, create_event, value, pub_key_map))]
fn handle_outlier_pdu<'a>(
&self,
origin: &'a ServerName,
@ -272,7 +272,7 @@ impl Service {
RoomVersion::new(room_version_id).expect("room version is supported");
let mut val = match ruma::signatures::verify_event(
&*pub_key_map.read().map_err(|_| "RwLock is poisoned.")?,
&*pub_key_map.read().expect("RwLock is poisoned."),
&value,
room_version_id,
) {
@ -301,7 +301,7 @@ impl Service {
let incoming_pdu = serde_json::from_value::<PduEvent>(
serde_json::to_value(&val).expect("CanonicalJsonObj is a valid JsonValue"),
)
.map_err(|_| "Event is not a valid PDU.".to_owned())?;
.map_err(|_| Error::bad_database("Event is not a valid PDU."))?;
// 4. fetch any missing auth events doing all checks listed here starting at 1. These are not timeline events
// 5. Reject "due to auth events" if can't get all the auth events or some of the auth events are also rejected "due to auth events"
@ -329,7 +329,7 @@ impl Service {
// Build map of auth events
let mut auth_events = HashMap::new();
for id in &incoming_pdu.auth_events {
let auth_event = match services().rooms.get_pdu(id)? {
let auth_event = match services().rooms.timeline.get_pdu(id)? {
Some(e) => e,
None => {
warn!("Could not find auth event {}", id);
@ -373,7 +373,8 @@ impl Service {
&incoming_pdu,
None::<PduEvent>, // TODO: third party invite
|k, s| auth_events.get(&(k.to_string().into(), s.to_owned())),
)? {
).map_err(|_e| Error::BadRequest(ErrorKind::InvalidParam, "Auth check failed"))?
{
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Auth check failed",
@ -385,6 +386,7 @@ impl Service {
// 7. Persist the event as an outlier.
services()
.rooms
.outlier
.add_pdu_outlier(&incoming_pdu.event_id, &val)?;
info!("Added pdu as outlier.");
@ -393,7 +395,7 @@ impl Service {
})
}
#[tracing::instrument(skip(incoming_pdu, val, create_event, pub_key_map))]
#[tracing::instrument(skip(self, incoming_pdu, val, create_event, pub_key_map))]
pub async fn upgrade_outlier_to_timeline_pdu(
&self,
incoming_pdu: Arc<PduEvent>,
@ -412,7 +414,7 @@ impl Service {
.rooms
.pdu_metadata.is_event_soft_failed(&incoming_pdu.event_id)?
{
return Err("Event has been soft failed".into());
return Err(Error::BadRequest(ErrorKind::InvalidParam, "Event has been soft failed"));
}
info!("Upgrading {} to timeline pdu", incoming_pdu.event_id);
@ -1130,7 +1132,8 @@ impl Service {
room_id: &RoomId,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, Base64>>>,
initial_set: Vec<Arc<EventId>>,
) -> Vec<(Arc<EventId>, HashMap<Arc<EventId>, (Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>)> {
) -> Result<(Vec<Arc<EventId>>, HashMap<Arc<EventId>,
(Arc<PduEvent>, BTreeMap<String, CanonicalJsonValue>)>)> {
let mut graph: HashMap<Arc<EventId>, _> = HashMap::new();
let mut eventid_info = HashMap::new();
let mut todo_outlier_stack: Vec<Arc<EventId>> = initial_set;
@ -1164,6 +1167,7 @@ impl Service {
if let Some(json) = json_opt.or_else(|| {
services()
.rooms
.outlier
.get_outlier_pdu_json(&prev_event_id)
.ok()
.flatten()
@ -1209,9 +1213,9 @@ impl Service {
.map_or_else(|| uint!(0), |info| info.0.origin_server_ts),
),
))
})?;
}).map_err(|_| Error::bad_database("Error sorting prev events"))?;
(sorted, eventid_info)
Ok((sorted, eventid_info))
}
#[tracing::instrument(skip_all)]

@ -1,7 +1,7 @@
use ruma::{RoomId, DeviceId, UserId};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn lazy_load_was_sent_before(
&self,
user_id: &UserId,

@ -1,6 +1,6 @@
use ruma::RoomId;
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn exists(&self, room_id: &RoomId) -> Result<bool>;
}

@ -2,7 +2,7 @@ use ruma::{signatures::CanonicalJsonObject, EventId};
use crate::{PduEvent, Result};
pub trait Data {
pub trait Data: Send + Sync {
fn get_outlier_pdu_json(&self, event_id: &EventId) -> Result<Option<CanonicalJsonObject>>;
fn get_outlier_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>>;
fn add_pdu_outlier(&self, event_id: &EventId, pdu: &CanonicalJsonObject) -> Result<()>;

@ -3,7 +3,7 @@ use std::sync::Arc;
use ruma::{EventId, RoomId};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn mark_as_referenced(&self, room_id: &RoomId, event_ids: &[Arc<EventId>]) -> Result<()>;
fn is_event_referenced(&self, room_id: &RoomId, event_id: &EventId) -> Result<bool>;
fn mark_event_soft_failed(&self, event_id: &EventId) -> Result<()>;

@ -1,7 +1,7 @@
use ruma::RoomId;
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn index_pdu<'a>(&self, shortroomid: u64, pdu_id: &[u8], message_body: String) -> Result<()>;
fn search_pdus<'a>(

@ -1,2 +1,2 @@
pub trait Data {
pub trait Data: Send + Sync {
}

@ -1,9 +1,10 @@
use std::sync::Arc;
use std::{sync::MutexGuard, collections::HashSet};
use std::collections::HashSet;
use crate::Result;
use ruma::{EventId, RoomId};
use tokio::sync::MutexGuard;
pub trait Data {
pub trait Data: Send + Sync {
/// Returns the last state hash key added to the db for the given room.
fn get_room_shortstatehash(&self, room_id: &RoomId) -> Result<Option<u64>>;
@ -21,7 +22,7 @@ pub trait Data {
/// Replace the forward extremities of the room.
fn set_forward_extremities<'a>(&self,
room_id: &RoomId,
event_ids: &dyn Iterator<Item = &'a EventId>,
event_ids: &mut dyn Iterator<Item = &'a EventId>,
_mutex_lock: &MutexGuard<'_, ()>, // Take mutex guard to make sure users get the room state mutex
) -> Result<()>;
}
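
Two details in the new signature are easy to miss: the iterator must be `&mut dyn Iterator` because `next` takes `&mut self`, so a `&dyn Iterator` could never be advanced through the trait object, and the guard is now `tokio::sync::MutexGuard`, matching the async per-room state mutex. A sketch of a call site under those assumptions (the surrounding function is hypothetical):

    use std::sync::Arc;

    use ruma::{EventId, RoomId};
    use tokio::sync::Mutex;

    use super::Data; // assuming this sits next to the trait definition
    use crate::Result;

    async fn replace_extremities(
        db: &dyn Data,
        room_id: &RoomId,
        state_mutex: &Mutex<()>,
        extremities: &[Arc<EventId>],
    ) -> Result<()> {
        // Hold the per-room state mutex for the duration of the write.
        let state_lock = state_mutex.lock().await;

        // A fresh iterator is passed as `&mut dyn Iterator`, so the callee can
        // drive it to completion without knowing its concrete type.
        db.set_forward_extremities(
            room_id,
            &mut extremities.iter().map(|event_id| &**event_id),
            &state_lock,
        )
    }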

@ -16,7 +16,7 @@ pub struct Service {
impl Service {
/// Set the room to the given statehash and update caches.
pub fn force_state(
pub async fn force_state(
&self,
room_id: &RoomId,
shortstatehash: u64,
@ -28,7 +28,7 @@ impl Service {
.roomid_mutex_state
.write()
.unwrap()
.entry(body.room_id.to_owned())
.entry(room_id.to_owned())
.or_default(),
);
let state_lock = mutex_state.lock().await;
@ -74,10 +74,10 @@ impl Service {
Err(_) => continue,
};
services().room.state_cache.update_membership(room_id, &user_id, membership, &pdu.sender, None, false)?;
services().rooms.state_cache.update_membership(room_id, &user_id, membership, &pdu.sender, None, false)?;
}
services().room.state_cache.update_joined_count(room_id)?;
services().rooms.state_cache.update_joined_count(room_id)?;
self.db.set_room_state(room_id, shortstatehash, &state_lock);

@ -6,7 +6,7 @@ use ruma::{EventId, events::StateEventType, RoomId};
use crate::{Result, PduEvent};
#[async_trait]
pub trait Data {
pub trait Data: Send + Sync {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
async fn state_full_ids(&self, shortstatehash: u64) -> Result<BTreeMap<u64, Arc<EventId>>>;

@ -1,7 +1,7 @@
use ruma::{UserId, RoomId, serde::Raw, events::AnyStrippedStateEvent};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
fn mark_as_joined(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
fn mark_as_invited(&self, user_id: &UserId, room_id: &RoomId, last_state: Option<Vec<Raw<AnyStrippedStateEvent>>>) -> Result<()>;

@ -9,7 +9,7 @@ pub struct StateDiff {
pub removed: HashSet<CompressedStateEvent>,
}
pub trait Data {
pub trait Data: Send + Sync {
fn get_statediff(&self, shortstatehash: u64) -> Result<StateDiff>;
fn save_statediff(&self, shortstatehash: u64, diff: StateDiff) -> Result<()>;
}

@ -4,7 +4,7 @@ use ruma::{signatures::CanonicalJsonObject, EventId, UserId, RoomId};
use crate::{Result, PduEvent};
pub trait Data {
pub trait Data: Send + Sync {
fn last_timeline_count(&self, sender_user: &UserId, room_id: &RoomId) -> Result<u64>;
/// Returns the `count` of this pdu's id.

@ -1,7 +1,7 @@
use ruma::{UserId, RoomId};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn reset_notification_counts(&self, user_id: &UserId, room_id: &RoomId) -> Result<()>;
fn notification_count(&self, user_id: &UserId, room_id: &RoomId) -> Result<u64>;

@ -1,7 +1,7 @@
use ruma::{DeviceId, UserId, TransactionId};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn add_txnid(
&self,
user_id: &UserId,

@ -1,7 +1,7 @@
use ruma::{api::client::uiaa::UiaaInfo, DeviceId, UserId, signatures::CanonicalJsonValue};
use crate::Result;
pub trait Data {
pub trait Data: Send + Sync {
fn set_uiaa_request(
&self,
user_id: &UserId,

@ -223,18 +223,18 @@ impl Service {
self.db.get_device_keys(user_id, device_id)
}
pub fn get_master_key<F: Fn(&UserId) -> bool>(
pub fn get_master_key(
&self,
user_id: &UserId,
allowed_signatures: F,
allowed_signatures: &dyn Fn(&UserId) -> bool,
) -> Result<Option<Raw<CrossSigningKey>>> {
self.db.get_master_key(user_id, allowed_signatures)
}
pub fn get_self_signing_key<F: Fn(&UserId) -> bool>(
pub fn get_self_signing_key(
&self,
user_id: &UserId,
allowed_signatures: F,
allowed_signatures: &dyn Fn(&UserId) -> bool,
) -> Result<Option<Raw<CrossSigningKey>>> {
self.db.get_self_signing_key(user_id, allowed_signatures)
}
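
Replacing the generic `F: Fn(&UserId) -> bool` with `&dyn Fn(&UserId) -> bool` is what keeps the underlying `Data` trait object-safe: a trait with generic methods cannot be used as `dyn Data`. Callers simply pass a reference to a closure; a sketch of a call site (the surrounding function is hypothetical):

    use ruma::{encryption::CrossSigningKey, serde::Raw, UserId};

    use crate::{services, Result};

    /// Fetch a user's master key, only exposing signatures the sender may see.
    fn visible_master_key(
        sender_user: &UserId,
        user_id: &UserId,
    ) -> Result<Option<Raw<CrossSigningKey>>> {
        // The closure reference coerces to &dyn Fn(&UserId) -> bool.
        let allowed_signatures = |u: &UserId| u == sender_user;
        services()
            .users
            .get_master_key(user_id, &allowed_signatures)
    }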