use std::{borrow::Borrow, collections::HashMap, sync::Arc}; use conduit::{ at, err, ref_at, utils::stream::{BroadbandExt, IterStream, ReadyExt}, PduEvent, Result, }; use database::{Deserialized, Map}; use futures::{FutureExt, StreamExt, TryFutureExt}; use ruma::{events::StateEventType, EventId, OwnedEventId, RoomId}; use serde::Deserialize; use crate::{ rooms, rooms::{ short::{ShortEventId, ShortStateHash, ShortStateKey}, state_compressor::parse_compressed_state_event, }, Dep, }; pub(super) struct Data { shorteventid_shortstatehash: Arc, services: Services, } struct Services { short: Dep, state: Dep, state_compressor: Dep, timeline: Dep, } impl Data { pub(super) fn new(args: &crate::Args<'_>) -> Self { let db = &args.db; Self { shorteventid_shortstatehash: db["shorteventid_shortstatehash"].clone(), services: Services { short: args.depend::("rooms::short"), state: args.depend::("rooms::state"), state_compressor: args.depend::("rooms::state_compressor"), timeline: args.depend::("rooms::timeline"), }, } } pub(super) async fn state_full( &self, shortstatehash: ShortStateHash, ) -> Result> { let state = self .state_full_pdus(shortstatehash) .await? .into_iter() .filter_map(|pdu| Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu))) .collect(); Ok(state) } pub(super) async fn state_full_pdus(&self, shortstatehash: ShortStateHash) -> Result> { let short_ids = self.state_full_shortids(shortstatehash).await?; let full_pdus = self .services .short .multi_get_eventid_from_short(short_ids.iter().map(ref_at!(1))) .ready_filter_map(Result::ok) .broad_filter_map( |event_id: OwnedEventId| async move { self.services.timeline.get_pdu(&event_id).await.ok() }, ) .collect() .await; Ok(full_pdus) } pub(super) async fn state_full_ids(&self, shortstatehash: ShortStateHash) -> Result> where Id: for<'de> Deserialize<'de> + Send + Sized + ToOwned, ::Owned: Borrow, { let short_ids = self.state_full_shortids(shortstatehash).await?; let full_ids = self .services .short .multi_get_eventid_from_short(short_ids.iter().map(ref_at!(1))) .zip(short_ids.iter().stream().map(at!(0))) .ready_filter_map(|(event_id, shortstatekey)| Some((shortstatekey, event_id.ok()?))) .collect() .boxed() .await; Ok(full_ids) } pub(super) async fn state_full_shortids( &self, shortstatehash: ShortStateHash, ) -> Result> { let shortids = self .services .state_compressor .load_shortstatehash_info(shortstatehash) .await .map_err(|e| err!(Database("Missing state IDs: {e}")))? .pop() .expect("there is always one layer") .full_state .iter() .copied() .map(parse_compressed_state_event) .collect(); Ok(shortids) } /// Returns a single EventId from `room_id` with key /// (`event_type`,`state_key`). pub(super) async fn state_get_id( &self, shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result where Id: for<'de> Deserialize<'de> + Sized + ToOwned, ::Owned: Borrow, { let shortstatekey = self .services .short .get_shortstatekey(event_type, state_key) .await?; let full_state = self .services .state_compressor .load_shortstatehash_info(shortstatehash) .await .map_err(|e| err!(Database(error!(?event_type, ?state_key, "Missing state: {e:?}"))))? .pop() .expect("there is always one layer") .full_state; let compressed = full_state .iter() .find(|bytes| bytes.starts_with(&shortstatekey.to_be_bytes())) .ok_or(err!(Database("No shortstatekey in compressed state")))?; let (_, shorteventid) = parse_compressed_state_event(*compressed); self.services .short .get_eventid_from_short(shorteventid) .await } /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). pub(super) async fn state_get( &self, shortstatehash: ShortStateHash, event_type: &StateEventType, state_key: &str, ) -> Result { self.state_get_id(shortstatehash, event_type, state_key) .and_then(|event_id: OwnedEventId| async move { self.services.timeline.get_pdu(&event_id).await }) .await } /// Returns the state hash for this pdu. pub(super) async fn pdu_shortstatehash(&self, event_id: &EventId) -> Result { const BUFSIZE: usize = size_of::(); self.services .short .get_shorteventid(event_id) .and_then(|shorteventid| { self.shorteventid_shortstatehash .aqry::(&shorteventid) }) .await .deserialized() } /// Returns the full room state. pub(super) async fn room_state_full( &self, room_id: &RoomId, ) -> Result> { self.services .state .get_room_shortstatehash(room_id) .and_then(|shortstatehash| self.state_full(shortstatehash)) .map_err(|e| err!(Database("Missing state for {room_id:?}: {e:?}"))) .await } /// Returns the full room state's pdus. #[allow(unused_qualifications)] // async traits pub(super) async fn room_state_full_pdus(&self, room_id: &RoomId) -> Result> { self.services .state .get_room_shortstatehash(room_id) .and_then(|shortstatehash| self.state_full_pdus(shortstatehash)) .map_err(|e| err!(Database("Missing state pdus for {room_id:?}: {e:?}"))) .await } /// Returns a single EventId from `room_id` with key /// (`event_type`,`state_key`). pub(super) async fn room_state_get_id( &self, room_id: &RoomId, event_type: &StateEventType, state_key: &str, ) -> Result where Id: for<'de> Deserialize<'de> + Sized + ToOwned, ::Owned: Borrow, { self.services .state .get_room_shortstatehash(room_id) .and_then(|shortstatehash| self.state_get_id(shortstatehash, event_type, state_key)) .await } /// Returns a single PDU from `room_id` with key (`event_type`,`state_key`). pub(super) async fn room_state_get( &self, room_id: &RoomId, event_type: &StateEventType, state_key: &str, ) -> Result { self.services .state .get_room_shortstatehash(room_id) .and_then(|shortstatehash| self.state_get(shortstatehash, event_type, state_key)) .await } }