Merge branch 'state-res-closure' into 'master'

Remove auth_cache using a closure to fetch events in state-res

See merge request famedly/conduit!108
This commit is contained in:
Timo Kösters 2021-07-02 10:56:21 +00:00
commit 5711467ad9
16 changed files with 587 additions and 487 deletions

View file

@ -1,8 +1,11 @@
use crate::{database::Config, utils, Error, Result};
use crate::{database::Config, utils, ConduitResult, Error, Result};
use log::{error, info};
use ruma::{
api::federation::discovery::{ServerSigningKeys, VerifyKey},
EventId, MilliSecondsSinceUnixEpoch, ServerName, ServerSigningKeyId,
api::{
client::r0::sync::sync_events,
federation::discovery::{ServerSigningKeys, VerifyKey},
},
DeviceId, EventId, MilliSecondsSinceUnixEpoch, ServerName, ServerSigningKeyId, UserId,
};
use rustls::{ServerCertVerifier, WebPKIVerifier};
use std::{
@ -35,6 +38,15 @@ pub struct Globals {
pub bad_event_ratelimiter: Arc<RwLock<BTreeMap<EventId, RateLimitState>>>,
pub bad_signature_ratelimiter: Arc<RwLock<BTreeMap<Vec<String>, RateLimitState>>>,
pub servername_ratelimiter: Arc<RwLock<BTreeMap<Box<ServerName>, Arc<Semaphore>>>>,
pub sync_receivers: RwLock<
BTreeMap<
(UserId, Box<DeviceId>),
(
Option<String>,
tokio::sync::watch::Receiver<Option<ConduitResult<sync_events::Response>>>,
), // since, rx
>,
>,
}
struct MatrixServerVerifier {
@ -155,6 +167,7 @@ impl Globals {
bad_event_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
bad_signature_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
servername_ratelimiter: Arc::new(RwLock::new(BTreeMap::new())),
sync_receivers: RwLock::new(BTreeMap::new()),
};
fs::create_dir_all(s.get_media_folder())?;

View file

@ -203,7 +203,7 @@ pub fn get_actions<'a>(
.rooms
.room_state_get(&pdu.room_id, &EventType::RoomPowerLevels, "")?
.map(|ev| {
serde_json::from_value(ev.content)
serde_json::from_value(ev.content.clone())
.map_err(|_| Error::bad_database("invalid m.room.power_levels event"))
})
.transpose()?

View file

@ -5,6 +5,7 @@ use member::MembershipState;
use crate::{pdu::PduBuilder, utils, Database, Error, PduEvent, Result};
use log::{debug, error, warn};
use lru_cache::LruCache;
use regex::Regex;
use ring::digest;
use ruma::{
@ -23,7 +24,7 @@ use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::{TryFrom, TryInto},
mem,
sync::Arc,
sync::{Arc, RwLock},
};
use super::{abstraction::Tree, admin::AdminCommand, pusher};
@ -81,6 +82,8 @@ pub struct Rooms {
/// RoomId + EventId -> Parent PDU EventId.
pub(super) prevevent_parent: Arc<dyn Tree>,
pub(super) pdu_cache: RwLock<LruCache<EventId, Arc<PduEvent>>>,
}
impl Rooms {
@ -105,8 +108,8 @@ impl Rooms {
pub fn state_full(
&self,
shortstatehash: u64,
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
Ok(self
) -> Result<BTreeMap<(EventType, String), Arc<PduEvent>>> {
let state = self
.stateid_shorteventid
.scan_prefix(shortstatehash.to_be_bytes().to_vec())
.map(|(_, bytes)| self.shorteventid_eventid.get(&bytes).ok().flatten())
@ -133,7 +136,9 @@ impl Rooms {
))
})
.filter_map(|r| r.ok())
.collect())
.collect();
Ok(state)
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
@ -179,7 +184,7 @@ impl Rooms {
shortstatehash: u64,
event_type: &EventType,
state_key: &str,
) -> Result<Option<PduEvent>> {
) -> Result<Option<Arc<PduEvent>>> {
self.state_get_id(shortstatehash, event_type, state_key)?
.map_or(Ok(None), |event_id| self.get_pdu(&event_id))
}
@ -234,7 +239,7 @@ impl Rooms {
let mut events = StateMap::new();
for (event_type, state_key) in auth_events {
if let Some(pdu) = self.room_state_get(room_id, &event_type, &state_key)? {
events.insert((event_type, state_key), Arc::new(pdu));
events.insert((event_type, state_key), pdu);
} else {
// This is okay because when creating a new room some events were not created yet
debug!(
@ -396,7 +401,7 @@ impl Rooms {
pub fn room_state_full(
&self,
room_id: &RoomId,
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
) -> Result<BTreeMap<(EventType, String), Arc<PduEvent>>> {
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
self.state_full(current_shortstatehash)
} else {
@ -426,7 +431,7 @@ impl Rooms {
room_id: &RoomId,
event_type: &EventType,
state_key: &str,
) -> Result<Option<PduEvent>> {
) -> Result<Option<Arc<PduEvent>>> {
if let Some(current_shortstatehash) = self.current_shortstatehash(room_id)? {
self.state_get(current_shortstatehash, event_type, state_key)
} else {
@ -514,21 +519,42 @@ impl Rooms {
/// Returns the pdu.
///
/// Checks the `eventid_outlierpdu` Tree if not found in the timeline.
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<PduEvent>> {
self.eventid_pduid
pub fn get_pdu(&self, event_id: &EventId) -> Result<Option<Arc<PduEvent>>> {
if let Some(p) = self.pdu_cache.write().unwrap().get_mut(&event_id) {
return Ok(Some(Arc::clone(p)));
}
if let Some(pdu) = self
.eventid_pduid
.get(event_id.as_bytes())?
.map_or_else::<Result<_>, _, _>(
|| self.eventid_outlierpdu.get(event_id.as_bytes()),
|| {
let r = self.eventid_outlierpdu.get(event_id.as_bytes());
r
},
|pduid| {
Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
let r = Ok(Some(self.pduid_pdu.get(&pduid)?.ok_or_else(|| {
Error::bad_database("Invalid pduid in eventid_pduid.")
})?))
})?));
r
},
)?
.map(|pdu| {
serde_json::from_slice(&pdu).map_err(|_| Error::bad_database("Invalid PDU in db."))
let r = serde_json::from_slice(&pdu)
.map_err(|_| Error::bad_database("Invalid PDU in db."))
.map(Arc::new);
r
})
.transpose()
.transpose()?
{
self.pdu_cache
.write()
.unwrap()
.insert(event_id.clone(), Arc::clone(&pdu));
Ok(Some(pdu))
} else {
Ok(None)
}
}
/// Returns the pdu.
@ -663,7 +689,7 @@ impl Rooms {
unsigned.insert(
"prev_content".to_owned(),
CanonicalJsonValue::Object(
utils::to_canonical_object(prev_state.content)
utils::to_canonical_object(prev_state.content.clone())
.expect("event is valid, we just created it"),
),
);
@ -1204,7 +1230,7 @@ impl Rooms {
let create_prev_event = if prev_events.len() == 1
&& Some(&prev_events[0]) == create_event.as_ref().map(|c| &c.event_id)
{
create_event.map(Arc::new)
create_event
} else {
None
};
@ -1235,10 +1261,10 @@ impl Rooms {
let mut unsigned = unsigned.unwrap_or_default();
if let Some(state_key) = &state_key {
if let Some(prev_pdu) = self.room_state_get(&room_id, &event_type, &state_key)? {
unsigned.insert("prev_content".to_owned(), prev_pdu.content);
unsigned.insert("prev_content".to_owned(), prev_pdu.content.clone());
unsigned.insert(
"prev_sender".to_owned(),
serde_json::to_value(prev_pdu.sender).expect("UserId::to_value always works"),
serde_json::to_value(&prev_pdu.sender).expect("UserId::to_value always works"),
);
}
}
@ -1583,7 +1609,7 @@ impl Rooms {
.and_then(|create| {
serde_json::from_value::<
Raw<ruma::events::room::create::CreateEventContent>,
>(create.content)
>(create.content.clone())
.expect("Raw::from_value always works")
.deserialize()
.ok()
@ -1764,7 +1790,8 @@ impl Rooms {
ErrorKind::BadState,
"Cannot leave a room you are not a member of.",
))?
.content,
.content
.clone(),
)
.expect("from_value::<Raw<..>> can never fail")
.deserialize()