Update state-res, use the new Event trait

This also bumps ruma to latest and removes js_int infavor of the ruma
re-export
This commit is contained in:
Devin Ragotzy 2020-12-31 08:40:49 -05:00 committed by Timo Kösters
parent 690c066064
commit 9e83d2b2d5
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
11 changed files with 252 additions and 184 deletions

View file

@ -124,7 +124,7 @@ pub async fn get_room_visibility_route(
pub async fn get_public_rooms_filtered_helper(
db: &Database,
server: Option<&ServerName>,
limit: Option<js_int::UInt>,
limit: Option<ruma::UInt>,
since: Option<&str>,
filter: &IncomingFilter,
_network: &IncomingRoomNetwork,

View file

@ -131,7 +131,7 @@ pub async fn get_content_thumbnail_route(
allow_remote: false,
height: body.height,
width: body.width,
method: body.method,
method: body.method.clone(),
server_name: &body.server_name,
media_id: &body.media_id,
},

View file

@ -21,7 +21,7 @@ use ruma::{
serde::{to_canonical_value, CanonicalJsonObject, Raw},
EventId, RoomId, RoomVersionId, ServerName, UserId,
};
use state_res::StateEvent;
use state_res::Event;
use std::{
collections::{BTreeMap, HashMap, HashSet},
convert::TryFrom,
@ -594,19 +594,19 @@ async fn join_room_by_id_helper(
.chain(iter::once(Ok((event_id, join_event)))) // Add join event we just created
.map(|r| {
let (event_id, value) = r?;
state_res::StateEvent::from_id_canon_obj(event_id.clone(), value.clone())
PduEvent::from_id_val(&event_id, value.clone())
.map(|ev| (event_id, Arc::new(ev)))
.map_err(|e| {
warn!("{:?}: {}", value, e);
Error::BadServerResponse("Invalid PDU in send_join response.")
})
})
.collect::<Result<BTreeMap<EventId, Arc<StateEvent>>>>()?;
.collect::<Result<BTreeMap<EventId, Arc<PduEvent>>>>()?;
let control_events = event_map
.values()
.filter(|pdu| pdu.is_power_event())
.map(|pdu| pdu.event_id())
.filter(|pdu| state_res::is_power_event(pdu))
.map(|pdu| pdu.event_id.clone())
.collect::<Vec<_>>();
// These events are not guaranteed to be sorted but they are resolved according to spec
@ -646,7 +646,8 @@ async fn join_room_by_id_helper(
.cloned()
.collect::<Vec<_>>();
let power_level = resolved_control_events.get(&(EventType::RoomPowerLevels, "".into()));
let power_level =
resolved_control_events.get(&(EventType::RoomPowerLevels, Some("".to_string())));
// Sort the remaining non control events
let sorted_event_ids = state_res::StateResolution::mainline_sort(
room_id,
@ -685,8 +686,13 @@ async fn join_room_by_id_helper(
pdu_id.push(0xff);
pdu_id.extend_from_slice(&count.to_be_bytes());
db.rooms.append_pdu(
<<<<<<< HEAD
&PduEvent::from(&**pdu),
utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"),
=======
&pdu,
&utils::to_canonical_object(&**pdu).expect("Pdu is valid canonical object"),
>>>>>>> 6232d1f (Update state-res, use the new Event trait)
count,
pdu_id.clone().into(),
&db.globals,
@ -695,7 +701,9 @@ async fn join_room_by_id_helper(
)?;
if state_events.contains(ev_id) {
state.insert((pdu.kind(), pdu.state_key()), pdu_id);
if let Some(key) = &pdu.state_key {
state.insert((pdu.kind(), key.to_string()), pdu_id);
}
}
}

View file

@ -8,7 +8,10 @@ use ruma::{
events::EventContent,
EventId,
};
use std::convert::{TryFrom, TryInto};
use std::{
collections::BTreeMap,
convert::{TryFrom, TryInto},
};
#[cfg(feature = "conduit_bin")]
use rocket::{get, put};
@ -46,7 +49,7 @@ pub async fn send_message_event_route(
return Ok(send_message_event::Response { event_id }.into());
}
let mut unsigned = serde_json::Map::new();
let mut unsigned = BTreeMap::new();
unsigned.insert("transaction_id".to_owned(), body.txn_id.clone().into());
let event_id = db.rooms.build_and_append_pdu(

View file

@ -20,7 +20,7 @@ use ruma::{
EventId, RoomAliasId, RoomId, RoomVersionId, ServerName, UserId,
};
use sled::IVec;
use state_res::{event_auth, Error as StateError, Requester, StateEvent, StateMap, StateStore};
use state_res::{event_auth, Error as StateError, Event, StateMap, StateStore};
use std::{
collections::{BTreeMap, HashMap},
@ -67,12 +67,8 @@ pub struct Rooms {
pub(super) stateid_pduid: sled::Tree, // StateId = StateHash + Short, PduId = Count (without roomid)
}
impl StateStore for Rooms {
fn get_event(
&self,
room_id: &RoomId,
event_id: &EventId,
) -> state_res::Result<Arc<StateEvent>> {
impl StateStore<PduEvent> for Rooms {
fn get_event(&self, room_id: &RoomId, event_id: &EventId) -> state_res::Result<Arc<PduEvent>> {
let pid = self
.get_pdu_id(event_id)
.map_err(StateError::custom)?
@ -91,7 +87,7 @@ impl StateStore for Rooms {
.ok_or_else(|| StateError::NotFound("PDU via pduid not found in db.".into()))?,
)
.map_err(Into::into)
.and_then(|pdu: StateEvent| {
.and_then(|pdu: PduEvent| {
// conduit's PDU's always contain a room_id but some
// of ruma's do not so this must be an Option
if pdu.room_id() == room_id {
@ -112,7 +108,7 @@ impl Rooms {
&self,
room_id: &RoomId,
state_hash: &StateHashId,
) -> Result<StateMap<PduEvent>> {
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
self.stateid_pduid
.scan_prefix(&state_hash)
.values()
@ -141,7 +137,7 @@ impl Rooms {
pdu,
))
})
.collect::<Result<StateMap<_>>>()
.collect()
}
/// Returns a single PDU from `room_id` with key (`event_type`, `state_key`).
@ -181,7 +177,7 @@ impl Rooms {
)))
})
} else {
return Ok(None);
Ok(None)
}
}
@ -205,7 +201,7 @@ impl Rooms {
content: serde_json::Value,
) -> Result<StateMap<PduEvent>> {
let auth_events = state_res::auth_types_for_event(
kind.clone(),
kind,
sender,
state_key.map(|s| s.to_string()),
content,
@ -213,7 +209,13 @@ impl Rooms {
let mut events = StateMap::new();
for (event_type, state_key) in auth_events {
if let Some((_, pdu)) = self.room_state_get(room_id, &event_type, &state_key)? {
if let Some((_, pdu)) = self.room_state_get(
room_id,
&event_type,
&state_key
.as_deref()
.expect("found a non state event in auth events"),
)? {
events.insert((event_type, state_key), pdu);
}
}
@ -290,7 +292,10 @@ impl Rooms {
}
/// Returns the full room state.
pub fn room_state_full(&self, room_id: &RoomId) -> Result<StateMap<PduEvent>> {
pub fn room_state_full(
&self,
room_id: &RoomId,
) -> Result<BTreeMap<(EventType, String), PduEvent>> {
if let Some(current_state_hash) = self.current_state_hash(room_id)? {
self.state_full(&room_id, &current_state_hash)
} else {
@ -795,23 +800,40 @@ impl Rooms {
ErrorKind::Unknown,
"Membership can't be the first event",
))?)?
.map(|pdu| pdu.convert_for_state_res());
.map(Arc::new);
event_auth::valid_membership_change(
// TODO this is a bit of a hack but not sure how to have a type
// declared in `state_res` crate easily convert to/from conduit::PduEvent
Requester {
prev_event_ids: prev_events.to_owned(),
room_id: &room_id,
content: &content,
state_key: Some(state_key.to_owned()),
sender: &sender,
},
&Arc::new(PduEvent {
event_id: ruma::event_id!("$thiswillbefilledinlater"),
room_id: room_id.clone(),
sender: sender.clone(),
origin_server_ts: utils::millis_since_unix_epoch()
.try_into()
.expect("time is valid"),
kind: event_type,
content,
state_key: Some(state_key.clone()),
prev_events,
depth: (prev_events.len() as u32).into(),
auth_events: auth_events
.into_iter()
.map(|(_, pdu)| pdu.event_id)
.collect(),
redacts,
unsigned: unsigned
.map_or_else(BTreeMap::new, |m| m.into_iter().collect()),
hashes: ruma::events::pdu::EventHash {
sha256: "aaa".to_owned(),
},
signatures: BTreeMap::new(),
}),
prev_event,
None, // TODO: third party invite
&auth_events
.iter()
.map(|((ty, key), pdu)| {
Ok(((ty.clone(), key.clone()), pdu.convert_for_state_res()))
Ok(((ty.clone(), key.clone()), Arc::new(pdu.clone())))
})
.collect::<Result<StateMap<_>>>()?,
)

View file

@ -1,5 +1,4 @@
use crate::{utils, Error, Result};
use js_int::UInt;
use ruma::{
events::{
presence::{PresenceEvent, PresenceEventContent},
@ -7,7 +6,7 @@ use ruma::{
},
presence::PresenceState,
serde::Raw,
RoomId, UserId,
RoomId, UInt, UserId,
};
use std::{
collections::HashMap,

View file

@ -1,5 +1,4 @@
use crate::{utils, Error, Result};
use js_int::UInt;
use ruma::{
api::client::{
error::ErrorKind,
@ -11,7 +10,7 @@ use ruma::{
encryption::DeviceKeys,
events::{AnyToDeviceEvent, EventType},
serde::Raw,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, UserId,
DeviceId, DeviceKeyAlgorithm, DeviceKeyId, UInt, UserId,
};
use std::{collections::BTreeMap, convert::TryFrom, mem, time::SystemTime};

View file

@ -1,12 +1,11 @@
use crate::Error;
use js_int::UInt;
use ruma::{
events::{
pdu::EventHash, room::member::MemberEventContent, AnyEvent, AnyRoomEvent, AnyStateEvent,
AnyStrippedStateEvent, AnySyncRoomEvent, AnySyncStateEvent, EventType, StateEvent,
},
serde::{to_canonical_value, CanonicalJsonObject, CanonicalJsonValue, Raw},
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UInt, UserId,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
@ -33,8 +32,8 @@ pub struct PduEvent {
pub auth_events: Vec<EventId>,
#[serde(skip_serializing_if = "Option::is_none")]
pub redacts: Option<EventId>,
#[serde(default, skip_serializing_if = "serde_json::Map::is_empty")]
pub unsigned: serde_json::Map<String, serde_json::Value>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub unsigned: BTreeMap<String, serde_json::Value>,
pub hashes: EventHash,
pub signatures: BTreeMap<Box<ServerName>, BTreeMap<ServerSigningKeyId, String>>,
}
@ -227,61 +226,66 @@ impl PduEvent {
)
.expect("Raw::from_value always works")
}
}
impl From<&state_res::StateEvent> for PduEvent {
fn from(pdu: &state_res::StateEvent) -> Self {
Self {
event_id: pdu.event_id(),
room_id: pdu.room_id().clone(),
sender: pdu.sender().clone(),
origin_server_ts: (pdu
.origin_server_ts()
.duration_since(UNIX_EPOCH)
.expect("time is valid")
.as_millis() as u64)
.try_into()
.expect("time is valid"),
kind: pdu.kind(),
content: pdu.content().clone(),
state_key: Some(pdu.state_key()),
prev_events: pdu.prev_event_ids(),
depth: *pdu.depth(),
auth_events: pdu.auth_events(),
redacts: pdu.redacts().cloned(),
unsigned: pdu.unsigned().clone().into_iter().collect(),
hashes: pdu.hashes().clone(),
signatures: pdu.signatures(),
}
pub fn from_id_val(
event_id: &EventId,
json: CanonicalJsonObject,
) -> Result<Self, serde_json::Error> {
json.insert(
"event_id".to_string(),
ruma::serde::to_canonical_value(event_id).expect("event_id is a valid Value"),
);
serde_json::from_value(serde_json::to_value(json).expect("valid JSON"))
}
}
impl PduEvent {
pub fn convert_for_state_res(&self) -> Arc<state_res::StateEvent> {
Arc::new(
// For consistency of eventId (just in case) we use the one
// generated by conduit for everything.
state_res::StateEvent::from_id_value(
self.event_id.clone(),
json!({
"event_id": self.event_id,
"room_id": self.room_id,
"sender": self.sender,
"origin_server_ts": self.origin_server_ts,
"type": self.kind,
"content": self.content,
"state_key": self.state_key,
"prev_events": self.prev_events,
"depth": self.depth,
"auth_events": self.auth_events,
"redacts": self.redacts,
"unsigned": self.unsigned,
"hashes": self.hashes,
"signatures": self.signatures,
}),
)
.expect("all conduit PDUs are state events"),
)
impl state_res::Event for PduEvent {
fn event_id(&self) -> &EventId {
&self.event_id
}
fn room_id(&self) -> &RoomId {
&self.room_id
}
fn sender(&self) -> &UserId {
&self.sender
}
fn kind(&self) -> EventType {
self.kind.clone()
}
fn content(&self) -> serde_json::Value {
self.content.clone()
}
fn origin_server_ts(&self) -> std::time::SystemTime {
UNIX_EPOCH + std::time::Duration::from_millis(self.origin_server_ts.into())
}
fn state_key(&self) -> Option<String> {
self.state_key.clone()
}
fn prev_events(&self) -> Vec<EventId> {
self.prev_events.to_vec()
}
fn depth(&self) -> &UInt {
&self.depth
}
fn auth_events(&self) -> Vec<EventId> {
self.auth_events.to_vec()
}
fn redacts(&self) -> Option<&EventId> {
self.redacts.as_ref()
}
fn hashes(&self) -> &EventHash {
&self.hashes
}
fn signatures(&self) -> BTreeMap<Box<ServerName>, BTreeMap<ruma::ServerSigningKeyId, String>> {
self.signatures.clone()
}
fn unsigned(&self) -> &BTreeMap<String, serde_json::Value> {
&self.unsigned
}
}
@ -315,7 +319,7 @@ pub struct PduBuilder {
#[serde(rename = "type")]
pub event_type: EventType,
pub content: serde_json::Value,
pub unsigned: Option<serde_json::Map<String, serde_json::Value>>,
pub unsigned: Option<BTreeMap<String, serde_json::Value>>,
pub state_key: Option<String>,
pub redacts: Option<EventId>,
}

View file

@ -20,12 +20,13 @@ use ruma::{
directory::{IncomingFilter, IncomingRoomNetwork},
EventId, RoomId, RoomVersionId, ServerName, ServerSigningKeyId, UserId,
};
use state_res::StateMap;
use state_res::{Event, StateMap};
use std::{
collections::{BTreeMap, BTreeSet},
convert::TryFrom,
fmt::Debug,
net::{IpAddr, SocketAddr},
sync::Arc,
time::{Duration, SystemTime},
};
@ -610,17 +611,12 @@ pub async fn send_transaction_message_route<'a>(
continue;
}
// TODO: remove the need to convert to state_res
let event = pdu.convert_for_state_res();
let event = Arc::new(pdu.clone());
let previous = pdu
.prev_events
.first()
.map(|id| {
db.rooms
.get_pdu(id)
.expect("todo")
.map(|ev| ev.convert_for_state_res())
})
.map(|id| db.rooms.get_pdu(id).expect("todo").map(Arc::new))
.flatten();
// 4.
@ -637,27 +633,32 @@ pub async fn send_transaction_message_route<'a>(
previous.clone(),
auth_events
.into_iter()
.map(|(k, v)| (k, v.convert_for_state_res()))
.map(|(k, v)| (k, Arc::new(v)))
.collect(),
None,
)
.map_err(|_e| Error::Conflict("Auth check failed"))?
{
resolved_map.insert(
event.event_id(),
pdu.event_id,
Err("Event has failed auth check with auth events".into()),
);
continue;
}
let mut previous_states = vec![];
let mut previous_states: Vec<StateMap<Arc<PduEvent>>> = vec![];
for id in &pdu.prev_events {
if let Some(id) = db.rooms.get_pdu_id(id)? {
let state_hash = db
.rooms
.pdu_state_hash(&id)?
.expect("found pdu with no statehash");
let state = db.rooms.state_full(&pdu.room_id, &state_hash)?;
let state = db
.rooms
.state_full(&pdu.room_id, &state_hash)?
.into_iter()
.map(|((et, sk), ev)| ((et, Some(sk)), Arc::new(ev)))
.collect();
previous_states.push(state);
} else {
// fetch the state
@ -693,7 +694,7 @@ pub async fn send_transaction_message_route<'a>(
.into_iter()
.map(|map| {
map.into_iter()
.map(|(k, v)| (k, v.event_id))
.map(|(k, v)| (k, v.event_id.clone()))
.collect::<StateMap<_>>()
})
.collect::<Vec<_>>(),
@ -702,7 +703,7 @@ pub async fn send_transaction_message_route<'a>(
) {
Ok(res) => res
.into_iter()
.map(|(k, v)| (k, db.rooms.get_pdu(&v).unwrap().unwrap()))
.map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap())))
.collect(),
Err(e) => panic!("{:?}", e),
}
@ -712,17 +713,14 @@ pub async fn send_transaction_message_route<'a>(
&RoomVersionId::Version6,
&event,
previous.clone(),
state_at_event
.into_iter()
.map(|(k, v)| (k, v.convert_for_state_res()))
.collect(),
state_at_event,
None,
)
.map_err(|_e| Error::Conflict("Auth check failed"))?
{
// Event failed auth with state_at
resolved_map.insert(
event.event_id(),
pdu.event_id,
Err("Event has failed auth check with state at the event".into()),
);
continue;
@ -733,14 +731,20 @@ pub async fn send_transaction_message_route<'a>(
// Gather the forward extremities and resolve
let forward_extrems = forward_extremity_ids(&db, &pdu.room_id)?;
let mut fork_states = vec![];
let mut fork_states: Vec<StateMap<Arc<PduEvent>>> = vec![];
for id in &forward_extrems {
if let Some(id) = db.rooms.get_pdu_id(id)? {
let state_hash = db
.rooms
.pdu_state_hash(&id)?
.expect("found pdu with no statehash");
let state = db.rooms.state_full(&pdu.room_id, &state_hash)?;
let state = db
.rooms
.state_full(&pdu.room_id, &state_hash)?
.into_iter()
.map(|(k, v)| ((k.0, Some(k.1)), Arc::new(v)))
.collect();
fork_states.push(state);
} else {
// This is probably an error??
@ -776,7 +780,7 @@ pub async fn send_transaction_message_route<'a>(
.into_iter()
.map(|map| {
map.into_iter()
.map(|(k, v)| (k, v.event_id))
.map(|(k, v)| (k, v.event_id.clone()))
.collect::<StateMap<_>>()
})
.collect::<Vec<_>>(),
@ -785,7 +789,7 @@ pub async fn send_transaction_message_route<'a>(
) {
Ok(res) => res
.into_iter()
.map(|(k, v)| (k, db.rooms.get_pdu(&v).unwrap().unwrap()))
.map(|(k, v)| (k, Arc::new(db.rooms.get_pdu(&v).unwrap().unwrap())))
.collect(),
Err(e) => panic!("{:?}", e),
}
@ -795,20 +799,20 @@ pub async fn send_transaction_message_route<'a>(
&RoomVersionId::Version6,
&event,
previous,
state_at_forks
.into_iter()
.map(|(k, v)| (k, v.convert_for_state_res()))
.collect(),
state_at_forks,
None,
)
.map_err(|_e| Error::Conflict("Auth check failed"))?
{
// Soft fail
resolved_map.insert(event.event_id(), Err("Event has been soft failed".into()));
resolved_map.insert(
event.event_id().clone(),
Err("Event has been soft failed".into()),
);
} else {
append_state(&db, &pdu)?;
// Event has passed all auth/stateres checks
resolved_map.insert(event.event_id(), Ok(()));
resolved_map.insert(event.event_id().clone(), Ok(()));
}
}