improvement: more efficient /sync with gaps

This commit is contained in:
Timo Kösters 2020-09-17 14:44:47 +02:00
parent 506c2a3146
commit ea3aaa6b5c
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
10 changed files with 251 additions and 299 deletions

View file

@ -31,7 +31,7 @@ use std::{
///
/// This is created when a state group is added to the database by
/// hashing the entire state.
pub type StateHashId = Vec<u8>;
pub type StateHashId = IVec;
#[derive(Clone)]
pub struct Rooms {
@ -100,7 +100,7 @@ impl StateStore for Rooms {
impl Rooms {
/// Builds a StateMap by iterating over all keys that start
/// with state_hash, this gives the full state for the given state_hash.
pub fn state_full(&self, state_hash: StateHashId) -> Result<StateMap<EventId>> {
pub fn state_full(&self, state_hash: &StateHashId) -> Result<StateMap<PduEvent>> {
self.stateid_pduid
.scan_prefix(&state_hash)
.values()
@ -115,61 +115,87 @@ impl Rooms {
})
.map(|pdu| {
let pdu = pdu?;
Ok(((pdu.kind, pdu.state_key), pdu.event_id))
Ok((
(
pdu.kind.clone(),
pdu.state_key
.as_ref()
.ok_or_else(|| Error::bad_database("State event has no state key."))?
.clone(),
),
pdu,
))
})
.collect::<Result<StateMap<_>>>()
}
// TODO make this return Result
/// Fetches the previous StateHash ID to `current`.
pub fn prev_state_hash(&self, current: StateHashId) -> Option<StateHashId> {
let mut found = false;
for pair in self.pduid_statehash.iter().rev() {
let prev = pair.ok()?.1;
if current == prev.as_ref() {
found = true;
}
if current != prev.as_ref() && found {
return Some(prev.to_vec());
}
/// Returns all state entries for this type.
pub fn state_type(
&self,
state_hash: &StateHashId,
event_type: &EventType,
) -> Result<HashMap<String, PduEvent>> {
let mut prefix = state_hash.to_vec();
prefix.push(0xff);
prefix.extend_from_slice(&event_type.to_string().as_bytes());
prefix.push(0xff);
let mut hashmap = HashMap::new();
for pdu in self
.stateid_pduid
.scan_prefix(&prefix)
.values()
.map(|pdu_id| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(&self.pduid_pdu.get(pdu_id?)?.ok_or_else(
|| Error::bad_database("PDU in state not found in database."),
)?)
.map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?,
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert(state_key, pdu);
}
None
Ok(hashmap)
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
pub fn state_get(
&self,
state_hash: &StateHashId,
event_type: &EventType,
state_key: &str,
) -> Result<Option<PduEvent>> {
let mut key = state_hash.to_vec();
key.push(0xff);
key.extend_from_slice(&event_type.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
self.stateid_pduid.get(&key)?.map_or(Ok(None), |pdu_id| {
Ok::<_, Error>(Some(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(pdu_id)?.ok_or_else(|| {
Error::bad_database("PDU in state not found in database.")
})?,
)
.map_err(|_| Error::bad_database("Invalid PDU bytes in room state."))?,
))
})
}
/// Returns the last state hash key added to the db.
pub fn pdu_state_hash(&self, pdu_id: &[u8]) -> Result<Option<StateHashId>> {
Ok(self.pduid_statehash.get(pdu_id)?)
}
/// Returns the last state hash key added to the db.
pub fn current_state_hash(&self, room_id: &RoomId) -> Result<Option<StateHashId>> {
Ok(self
.roomid_statehash
.get(room_id.as_bytes())?
.map(|bytes| bytes.to_vec()))
}
/// This fetches auth event_ids from the current state using the
/// full `roomstateid_pdu` tree.
pub fn get_auth_event_ids(
&self,
room_id: &RoomId,
kind: &EventType,
sender: &UserId,
state_key: Option<&str>,
content: serde_json::Value,
) -> Result<Vec<EventId>> {
let auth_events = state_res::auth_types_for_event(
kind.clone(),
sender,
state_key.map(|s| s.to_string()),
content,
);
let mut events = vec![];
for (event_type, state_key) in auth_events {
if let Some(state_key) = state_key.as_ref() {
if let Some(id) = self.room_state_get(room_id, &event_type, state_key)? {
events.push(id.event_id);
}
}
}
Ok(events)
Ok(self.roomid_statehash.get(room_id.as_bytes())?)
}
/// This fetches auth events from the current state.
@ -190,10 +216,8 @@ impl Rooms {
let mut events = StateMap::new();
for (event_type, state_key) in auth_events {
if let Some(s_key) = state_key.as_ref() {
if let Some(pdu) = self.room_state_get(room_id, &event_type, s_key)? {
events.insert((event_type, state_key), pdu);
}
if let Some(pdu) = self.room_state_get(room_id, &event_type, &state_key)? {
events.insert((event_type, state_key), pdu);
}
}
Ok(events)
@ -206,7 +230,7 @@ impl Rooms {
// We only hash the pdu's event ids, not the whole pdu
let bytes = pdu_id_bytes.join(&0xff);
let hash = digest::digest(&digest::SHA256, &bytes);
Ok(hash.as_ref().to_vec())
Ok(hash.as_ref().into())
}
/// Checks if a room exists.
@ -230,7 +254,7 @@ impl Rooms {
) -> Result<()> {
let state_hash =
self.calculate_hash(&state.values().map(|pdu_id| &**pdu_id).collect::<Vec<_>>())?;
let mut prefix = state_hash.clone();
let mut prefix = state_hash.to_vec();
prefix.push(0xff);
for ((event_type, state_key), pdu_id) in state {
@ -248,41 +272,11 @@ impl Rooms {
}
/// Returns the full room state.
pub fn room_state_full(
&self,
room_id: &RoomId,
) -> Result<HashMap<(EventType, String), PduEvent>> {
pub fn room_state_full(&self, room_id: &RoomId) -> Result<StateMap<PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
let mut prefix = current_state_hash;
prefix.push(0xff);
let mut hashmap = HashMap::new();
for pdu in self
.stateid_pduid
.scan_prefix(prefix)
.values()
.map(|pdu_id| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(pdu_id?)?.ok_or_else(|| {
Error::bad_database("PDU in state not found in database.")
})?,
)
.map_err(|_| {
Error::bad_database("Invalid PDU bytes in current room state.")
})?,
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert((pdu.kind.clone(), state_key), pdu);
}
Ok(hashmap)
self.state_full(&current_state_hash)
} else {
Ok(HashMap::new())
Ok(BTreeMap::new())
}
}
@ -293,36 +287,7 @@ impl Rooms {
event_type: &EventType,
) -> Result<HashMap<String, PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
let mut prefix = current_state_hash;
prefix.push(0xff);
prefix.extend_from_slice(&event_type.to_string().as_bytes());
prefix.push(0xff);
let mut hashmap = HashMap::new();
for pdu in self
.stateid_pduid
.scan_prefix(&prefix)
.values()
.map(|pdu_id| {
Ok::<_, Error>(
serde_json::from_slice::<PduEvent>(
&self.pduid_pdu.get(pdu_id?)?.ok_or_else(|| {
Error::bad_database("PDU in state not found in database.")
})?,
)
.map_err(|_| {
Error::bad_database("Invalid PDU bytes in current room state.")
})?,
)
})
{
let pdu = pdu?;
let state_key = pdu.state_key.clone().ok_or_else(|| {
Error::bad_database("Room state contains event without state_key.")
})?;
hashmap.insert(state_key, pdu);
}
Ok(hashmap)
self.state_type(&current_state_hash, event_type)
} else {
Ok(HashMap::new())
}
@ -336,20 +301,7 @@ impl Rooms {
state_key: &str,
) -> Result<Option<PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
let mut key = current_state_hash;
key.push(0xff);
key.extend_from_slice(&event_type.to_string().as_bytes());
key.push(0xff);
key.extend_from_slice(&state_key.as_bytes());
self.stateid_pduid.get(&key)?.map_or(Ok(None), |pdu_id| {
Ok::<_, Error>(Some(
serde_json::from_slice::<PduEvent>(&self.pduid_pdu.get(pdu_id)?.ok_or_else(
|| Error::bad_database("PDU in state not found in database."),
)?)
.map_err(|_| Error::bad_database("Invalid PDU bytes in current room state."))?,
))
})
self.state_get(&current_state_hash, event_type, state_key)
} else {
Ok(None)
}
@ -562,14 +514,15 @@ impl Rooms {
/// This adds all current state events (not including the incoming event)
/// to `stateid_pduid` and adds the incoming event to `pduid_statehash`.
/// The incoming event is the `pdu_id` passed to this method.
fn append_to_state(&self, new_pdu_id: &[u8], new_pdu: &PduEvent) -> Result<StateHashId> {
pub fn append_to_state(&self, new_pdu_id: &[u8], new_pdu: &PduEvent) -> Result<StateHashId> {
let old_state =
if let Some(old_state_hash) = self.roomid_statehash.get(new_pdu.room_id.as_bytes())? {
// Store state for event. The state does not include the event itself.
// Instead it's the state before the pdu, so the room's old state.
self.pduid_statehash.insert(new_pdu_id, &old_state_hash)?;
self.pduid_statehash
.insert(dbg!(new_pdu_id), &old_state_hash)?;
if new_pdu.state_key.is_none() {
return Ok(old_state_hash.to_vec());
return Ok(old_state_hash);
}
let mut prefix = old_state_hash.to_vec();
@ -841,9 +794,7 @@ impl Rooms {
let pdu_id = self.append_pdu(&pdu, &pdu_json, globals, account_data)?;
if pdu.state_key.is_some() {
self.append_to_state(&pdu_id, &pdu)?;
}
self.append_to_state(&pdu_id, &pdu)?;
for server in self
.room_servers(room_id)
@ -905,7 +856,7 @@ impl Rooms {
user_id: &UserId,
room_id: &RoomId,
until: u64,
) -> impl Iterator<Item = Result<(u64, PduEvent)>> {
) -> impl Iterator<Item = Result<(IVec, PduEvent)>> {
// Create the first part of the full pdu id
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
@ -916,23 +867,18 @@ impl Rooms {
let current: &[u8] = &current;
let user_id = user_id.clone();
let prefixlen = prefix.len();
self.pduid_pdu
.range(..current)
.rev()
.filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(k, v)| {
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id");
}
Ok((
utils::u64_from_bytes(&k[prefixlen..])
.map_err(|_| Error::bad_database("Invalid pdu id in db."))?,
pdu,
))
Ok((pdu_id, pdu))
})
}
@ -943,7 +889,7 @@ impl Rooms {
user_id: &UserId,
room_id: &RoomId,
from: u64,
) -> impl Iterator<Item = Result<(u64, PduEvent)>> {
) -> impl Iterator<Item = Result<(IVec, PduEvent)>> {
// Create the first part of the full pdu id
let mut prefix = room_id.to_string().as_bytes().to_vec();
prefix.push(0xff);
@ -954,22 +900,17 @@ impl Rooms {
let current: &[u8] = &current;
let user_id = user_id.clone();
let prefixlen = prefix.len();
self.pduid_pdu
.range(current..)
.filter_map(|r| r.ok())
.take_while(move |(k, _)| k.starts_with(&prefix))
.map(move |(k, v)| {
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;
if pdu.sender != user_id {
pdu.unsigned.remove("transaction_id");
}
Ok((
utils::u64_from_bytes(&k[prefixlen..])
.map_err(|_| Error::bad_database("Invalid pdu id in db."))?,
pdu,
))
Ok((pdu_id, pdu))
})
}