feat: implement /claim, handle to-device events

This commit is contained in:
Timo Kösters 2021-05-28 13:44:40 +02:00
parent 953f2b005f
commit 5b5cc0574e
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
8 changed files with 409 additions and 64 deletions

View file

@ -12,7 +12,7 @@ use ruma::{
},
},
encryption::UnsignedDeviceInfo,
DeviceId, UserId,
DeviceId, DeviceKeyAlgorithm, UserId,
};
use std::collections::{BTreeMap, HashSet};
@ -98,29 +98,11 @@ pub async fn claim_keys_route(
db: State<'_, Database>,
body: Ruma<claim_keys::Request>,
) -> ConduitResult<claim_keys::Response> {
let mut one_time_keys = BTreeMap::new();
for (user_id, map) in &body.one_time_keys {
let mut container = BTreeMap::new();
for (device_id, key_algorithm) in map {
if let Some(one_time_keys) =
db.users
.take_one_time_key(user_id, device_id, key_algorithm, &db.globals)?
{
let mut c = BTreeMap::new();
c.insert(one_time_keys.0, one_time_keys.1);
container.insert(device_id.clone(), c);
}
}
one_time_keys.insert(user_id.clone(), container);
}
let response = claim_keys_helper(&body.one_time_keys, &db)?;
db.flush().await?;
Ok(claim_keys::Response {
failures: BTreeMap::new(),
one_time_keys,
}
.into())
Ok(response.into())
}
#[cfg_attr(
@ -375,3 +357,29 @@ pub fn get_keys_helper<F: Fn(&UserId) -> bool>(
failures: BTreeMap::new(),
})
}
pub fn claim_keys_helper(
one_time_keys_input: &BTreeMap<UserId, BTreeMap<Box<DeviceId>, DeviceKeyAlgorithm>>,
db: &Database,
) -> Result<claim_keys::Response> {
let mut one_time_keys = BTreeMap::new();
for (user_id, map) in one_time_keys_input {
let mut container = BTreeMap::new();
for (device_id, key_algorithm) in map {
if let Some(one_time_keys) =
db.users
.take_one_time_key(user_id, device_id, key_algorithm, &db.globals)?
{
let mut c = BTreeMap::new();
c.insert(one_time_keys.0, one_time_keys.1);
container.insert(device_id.clone(), c);
}
}
one_time_keys.insert(user_id.clone(), container);
}
Ok(claim_keys::Response {
failures: BTreeMap::new(),
one_time_keys,
})
}

View file

@ -1,5 +1,6 @@
use super::State;
use crate::{ConduitResult, Database, Error, Ruma};
use log::error;
use ruma::{
api::client::r0::sync::sync_events,
events::{room::member::MembershipState, AnySyncEphemeralRoomEvent, EventType},
@ -71,7 +72,12 @@ pub async fn sync_events_route(
let mut non_timeline_pdus = db
.rooms
.pdus_since(&sender_user, &room_id, since)?
.filter_map(|r| r.ok()); // Filter out buggy events
.filter_map(|r| {
if r.is_err() {
error!("Bad pdu in pdus_since: {:?}", r);
}
r.ok()
}); // Filter out buggy events
// Take the last 10 events for the timeline
let timeline_pdus = non_timeline_pdus

View file

@ -94,7 +94,7 @@ impl Globals {
.map(|key| (version, key))
})
.and_then(|(version, key)| {
ruma::signatures::Ed25519KeyPair::new(&key, version)
ruma::signatures::Ed25519KeyPair::from_der(&key, version)
.map_err(|_| Error::bad_database("Private or public keys are invalid."))
});

View file

@ -1494,7 +1494,12 @@ impl Rooms {
Ok(self
.pduid_pdu
.range(first_pdu_id..last_pdu_id)
.filter_map(|r| r.ok())
.filter_map(|r| {
if r.is_err() {
error!("Bad pdu in pduid_pdu: {:?}", r);
}
r.ok()
})
.map(move |(pdu_id, v)| {
let mut pdu = serde_json::from_slice::<PduEvent>(&v)
.map_err(|_| Error::bad_database("PDU in db is invalid."))?;

View file

@ -159,6 +159,7 @@ fn setup_rocket(config: Figment, data: Database) -> rocket::Rocket<rocket::Build
server_server::get_room_information_route,
server_server::get_profile_information_route,
server_server::get_keys_route,
server_server::claim_keys_route,
],
)
.register(

View file

@ -1,5 +1,5 @@
use crate::{
client_server::{self, get_keys_helper},
client_server::{self, claim_keys_helper, get_keys_helper},
utils, ConduitResult, Database, Error, PduEvent, Result, Ruma,
};
use get_profile_information::v1::ProfileField;
@ -9,7 +9,10 @@ use regex::Regex;
use rocket::{response::content::Json, State};
use ruma::{
api::{
client::error::{Error as RumaError, ErrorKind},
client::{
error::{Error as RumaError, ErrorKind},
r0::to_device,
},
federation::{
device::get_devices::{self, v1::UserDevice},
directory::{get_public_rooms, get_public_rooms_filtered},
@ -18,14 +21,17 @@ use ruma::{
VerifyKey,
},
event::{get_event, get_missing_events, get_room_state_ids},
keys::get_keys,
keys::{claim_keys, get_keys},
membership::{
create_invite,
create_join_event::{self, RoomState},
create_join_event_template,
},
query::{get_profile_information, get_room_information},
transactions::{edu::Edu, send_transaction_message},
transactions::{
edu::{DirectDeviceContent, Edu},
send_transaction_message,
},
},
EndpointError, IncomingResponse, OutgoingRequest, OutgoingResponse, SendAccessToken,
},
@ -720,8 +726,68 @@ pub async fn send_transaction_message_route<'a>(
.typing_remove(&typing.user_id, &typing.room_id, &db.globals)?;
}
}
Edu::DeviceListUpdate(_) => {}
Edu::DirectToDevice(_) => {}
Edu::DeviceListUpdate(_) => {
// TODO: Instead of worrying about stream ids we can just fetch all devices again
}
Edu::DirectToDevice(DirectDeviceContent {
sender,
ev_type,
message_id,
messages,
}) => {
// Check if this is a new transaction id
if db
.transaction_ids
.existing_txnid(&sender, None, &message_id)?
.is_some()
{
continue;
}
for (target_user_id, map) in &messages {
for (target_device_id_maybe, event) in map {
match target_device_id_maybe {
to_device::DeviceIdOrAllDevices::DeviceId(target_device_id) => {
db.users.add_to_device_event(
&sender,
&target_user_id,
&target_device_id,
&ev_type,
serde_json::from_str(event.get()).map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
})?,
&db.globals,
)?
}
to_device::DeviceIdOrAllDevices::AllDevices => {
for target_device_id in db.users.all_device_ids(&target_user_id) {
db.users.add_to_device_event(
&sender,
&target_user_id,
&target_device_id?,
&ev_type,
serde_json::from_str(event.get()).map_err(|_| {
Error::BadRequest(
ErrorKind::InvalidParam,
"Event is invalid",
)
})?,
&db.globals,
)?;
}
}
}
}
}
// Save transaction id with empty data
db.transaction_ids
.add_txnid(&sender, None, &message_id, &[])?;
}
Edu::_Custom(_) => {}
}
}
@ -2335,6 +2401,29 @@ pub fn get_keys_route<'a>(
.into())
}
#[cfg_attr(
feature = "conduit_bin",
post("/_matrix/federation/v1/user/keys/claim", data = "<body>")
)]
#[tracing::instrument(skip(db, body))]
pub async fn claim_keys_route<'a>(
db: State<'a, Database>,
body: Ruma<claim_keys::v1::Request>,
) -> ConduitResult<claim_keys::v1::Response> {
if !db.globals.allow_federation() {
return Err(Error::bad_config("Federation is disabled."));
}
let result = claim_keys_helper(&body.one_time_keys, &db)?;
db.flush().await?;
Ok(claim_keys::v1::Response {
one_time_keys: result.one_time_keys,
}
.into())
}
pub async fn fetch_required_signing_keys(
event: &BTreeMap<String, CanonicalJsonValue>,
pub_key_map: &RwLock<BTreeMap<String, BTreeMap<String, String>>>,