fix arithmetic side-effects
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
52a561ff9e
commit
7397064edd
25 changed files with 139 additions and 114 deletions
|
@ -769,7 +769,7 @@ perf = "warn"
|
||||||
###################
|
###################
|
||||||
#restriction = "warn"
|
#restriction = "warn"
|
||||||
|
|
||||||
#arithmetic_side_effects = "warn" # TODO
|
arithmetic_side_effects = "warn"
|
||||||
#as_conversions = "warn" # TODO
|
#as_conversions = "warn" # TODO
|
||||||
assertions_on_result_states = "warn"
|
assertions_on_result_states = "warn"
|
||||||
dbg_macro = "warn"
|
dbg_macro = "warn"
|
||||||
|
|
|
@ -58,7 +58,7 @@ pub(super) async fn parse_pdu(body: Vec<&str>) -> Result<RoomMessageEventContent
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let string = body[1..body.len() - 1].join("\n");
|
let string = body[1..body.len().saturating_sub(1)].join("\n");
|
||||||
match serde_json::from_str(&string) {
|
match serde_json::from_str(&string) {
|
||||||
Ok(value) => match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) {
|
Ok(value) => match ruma::signatures::reference_hash(&value, &RoomVersionId::V6) {
|
||||||
Ok(hash) => {
|
Ok(hash) => {
|
||||||
|
|
|
@ -191,7 +191,10 @@ async fn ban_list_of_rooms(body: Vec<&str>, force: bool, disable_federation: boo
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let rooms_s = body.clone().drain(1..body.len() - 1).collect::<Vec<_>>();
|
let rooms_s = body
|
||||||
|
.clone()
|
||||||
|
.drain(1..body.len().saturating_sub(1))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let admin_room_alias = &services().globals.admin_alias;
|
let admin_room_alias = &services().globals.admin_alias;
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,7 @@ pub(super) async fn list(_body: Vec<&str>) -> Result<RoomMessageEventContent> {
|
||||||
match services().users.list_local_users() {
|
match services().users.list_local_users() {
|
||||||
Ok(users) => {
|
Ok(users) => {
|
||||||
let mut plain_msg = format!("Found {} local user account(s):\n```\n", users.len());
|
let mut plain_msg = format!("Found {} local user account(s):\n```\n", users.len());
|
||||||
plain_msg += &users.join("\n");
|
plain_msg += users.join("\n").as_str();
|
||||||
plain_msg += "\n```";
|
plain_msg += "\n```";
|
||||||
|
|
||||||
Ok(RoomMessageEventContent::notice_markdown(plain_msg))
|
Ok(RoomMessageEventContent::notice_markdown(plain_msg))
|
||||||
|
@ -195,7 +195,10 @@ pub(super) async fn deactivate_all(
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
let usernames = body.clone().drain(1..body.len() - 1).collect::<Vec<_>>();
|
let usernames = body
|
||||||
|
.clone()
|
||||||
|
.drain(1..body.len().saturating_sub(1))
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let mut user_ids: Vec<OwnedUserId> = Vec::with_capacity(usernames.len());
|
let mut user_ids: Vec<OwnedUserId> = Vec::with_capacity(usernames.len());
|
||||||
let mut admins = Vec::new();
|
let mut admins = Vec::new();
|
||||||
|
|
|
@ -26,7 +26,7 @@ impl fmt::Display for Escape<'_> {
|
||||||
fmt.write_str(s)?;
|
fmt.write_str(s)?;
|
||||||
// NOTE: we only expect single byte characters here - which is fine as long as
|
// NOTE: we only expect single byte characters here - which is fine as long as
|
||||||
// we only match single byte characters
|
// we only match single byte characters
|
||||||
last = i + 1;
|
last = i.saturating_add(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
if last < s.len() {
|
if last < s.len() {
|
||||||
|
|
|
@ -16,7 +16,15 @@ macro_rules! checked {
|
||||||
#[cfg(not(debug_assertions))]
|
#[cfg(not(debug_assertions))]
|
||||||
#[macro_export]
|
#[macro_export]
|
||||||
macro_rules! validated {
|
macro_rules! validated {
|
||||||
($($input:tt)*) => { Ok($($input)*) }
|
($($input:tt)*) => {
|
||||||
|
//#[allow(clippy::arithmetic_side_effects)] {
|
||||||
|
//Some($($input)*)
|
||||||
|
// .ok_or_else(|| $crate::Error::Arithmetic("this error should never been seen"))
|
||||||
|
//}
|
||||||
|
|
||||||
|
//NOTE: remove me when stmt_expr_attributes is stable
|
||||||
|
$crate::checked!($($input)*)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
|
|
|
@ -15,7 +15,11 @@ pub fn string(length: usize) -> String {
|
||||||
|
|
||||||
#[inline]
|
#[inline]
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn timepoint_secs(range: Range<u64>) -> SystemTime { SystemTime::now() + secs(range) }
|
pub fn timepoint_secs(range: Range<u64>) -> SystemTime {
|
||||||
|
SystemTime::now()
|
||||||
|
.checked_add(secs(range))
|
||||||
|
.expect("range does not overflow SystemTime")
|
||||||
|
}
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
pub fn secs(range: Range<u64>) -> Duration {
|
pub fn secs(range: Range<u64>) -> Duration {
|
||||||
|
|
|
@ -65,7 +65,7 @@ fn common_prefix_none() {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn checked_add() {
|
fn checked_add() {
|
||||||
use utils::math::checked;
|
use crate::checked;
|
||||||
|
|
||||||
let a = 1234;
|
let a = 1234;
|
||||||
let res = checked!(a + 1).unwrap();
|
let res = checked!(a + 1).unwrap();
|
||||||
|
@ -75,9 +75,9 @@ fn checked_add() {
|
||||||
#[test]
|
#[test]
|
||||||
#[should_panic(expected = "overflow")]
|
#[should_panic(expected = "overflow")]
|
||||||
fn checked_add_overflow() {
|
fn checked_add_overflow() {
|
||||||
use utils::math::checked;
|
use crate::checked;
|
||||||
|
|
||||||
let a: u64 = u64::MAX;
|
let a = u64::MAX;
|
||||||
let res = checked!(a + 1).expect("overflow");
|
let res = checked!(a + 1).expect("overflow");
|
||||||
assert_eq!(res, 0);
|
assert_eq!(res, 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,7 +65,10 @@ impl Data {
|
||||||
"counter mismatch"
|
"counter mismatch"
|
||||||
);
|
);
|
||||||
|
|
||||||
*counter = counter.wrapping_add(1);
|
*counter = counter
|
||||||
|
.checked_add(1)
|
||||||
|
.expect("counter must not overflow u64");
|
||||||
|
|
||||||
self.global.insert(COUNTER, &counter.to_be_bytes())?;
|
self.global.insert(COUNTER, &counter.to_be_bytes())?;
|
||||||
|
|
||||||
Ok(*counter)
|
Ok(*counter)
|
||||||
|
|
|
@ -836,11 +836,14 @@ async fn fix_bad_double_separator_in_state_cache(db: &Arc<Database>, _config: &C
|
||||||
for (mut key, value) in roomuserid_joined.iter() {
|
for (mut key, value) in roomuserid_joined.iter() {
|
||||||
iter_count = iter_count.saturating_add(1);
|
iter_count = iter_count.saturating_add(1);
|
||||||
debug_info!(%iter_count);
|
debug_info!(%iter_count);
|
||||||
let first_sep_index = key.iter().position(|&i| i == 0xFF).unwrap();
|
let first_sep_index = key
|
||||||
|
.iter()
|
||||||
|
.position(|&i| i == 0xFF)
|
||||||
|
.expect("found 0xFF delim");
|
||||||
|
|
||||||
if key
|
if key
|
||||||
.iter()
|
.iter()
|
||||||
.get(first_sep_index..=first_sep_index + 1)
|
.get(first_sep_index..=first_sep_index.saturating_add(1))
|
||||||
.copied()
|
.copied()
|
||||||
.collect_vec()
|
.collect_vec()
|
||||||
== vec![0xFF, 0xFF]
|
== vec![0xFF, 0xFF]
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
mod data;
|
mod data;
|
||||||
mod tests;
|
mod tests;
|
||||||
|
|
||||||
use std::{collections::HashMap, io::Cursor, path::PathBuf, sync::Arc, time::SystemTime};
|
use std::{collections::HashMap, io::Cursor, num::Saturating as Sat, path::PathBuf, sync::Arc, time::SystemTime};
|
||||||
|
|
||||||
use base64::{engine::general_purpose, Engine as _};
|
use base64::{engine::general_purpose, Engine as _};
|
||||||
use conduit::{debug, debug_error, error, utils, Error, Result, Server};
|
use conduit::{checked, debug, debug_error, error, utils, Error, Result, Server};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use image::imageops::FilterType;
|
use image::imageops::FilterType;
|
||||||
use ruma::{OwnedMxcUri, OwnedUserId};
|
use ruma::{OwnedMxcUri, OwnedUserId};
|
||||||
|
@ -305,36 +305,20 @@ impl Service {
|
||||||
image.resize_to_fill(width, height, FilterType::CatmullRom)
|
image.resize_to_fill(width, height, FilterType::CatmullRom)
|
||||||
} else {
|
} else {
|
||||||
let (exact_width, exact_height) = {
|
let (exact_width, exact_height) = {
|
||||||
// Copied from image::dynimage::resize_dimensions
|
let ratio = Sat(original_width) * Sat(height);
|
||||||
//
|
let nratio = Sat(width) * Sat(original_height);
|
||||||
// https://github.com/image-rs/image/blob/6edf8ae492c4bb1dacb41da88681ea74dab1bab3/src/math/utils.rs#L5-L11
|
|
||||||
// Calculates the width and height an image should be
|
|
||||||
// resized to. This preserves aspect ratio, and based
|
|
||||||
// on the `fill` parameter will either fill the
|
|
||||||
// dimensions to fit inside the smaller constraint
|
|
||||||
// (will overflow the specified bounds on one axis to
|
|
||||||
// preserve aspect ratio), or will shrink so that both
|
|
||||||
// dimensions are completely contained within the given
|
|
||||||
// `width` and `height`, with empty space on one axis.
|
|
||||||
let ratio = u64::from(original_width) * u64::from(height);
|
|
||||||
let nratio = u64::from(width) * u64::from(original_height);
|
|
||||||
|
|
||||||
let use_width = nratio <= ratio;
|
let use_width = nratio <= ratio;
|
||||||
let intermediate = if use_width {
|
let intermediate = if use_width {
|
||||||
u64::from(original_height) * u64::from(width) / u64::from(original_width)
|
Sat(original_height) * Sat(checked!(width / original_width)?)
|
||||||
} else {
|
} else {
|
||||||
u64::from(original_width) * u64::from(height) / u64::from(original_height)
|
Sat(original_width) * Sat(checked!(height / original_height)?)
|
||||||
};
|
};
|
||||||
|
|
||||||
if use_width {
|
if use_width {
|
||||||
if u32::try_from(intermediate).is_ok() {
|
(width, intermediate.0)
|
||||||
(width, intermediate as u32)
|
|
||||||
} else {
|
|
||||||
((u64::from(width) * u64::from(u32::MAX) / intermediate) as u32, u32::MAX)
|
|
||||||
}
|
|
||||||
} else if u32::try_from(intermediate).is_ok() {
|
|
||||||
(intermediate as u32, height)
|
|
||||||
} else {
|
} else {
|
||||||
(u32::MAX, (u64::from(height) * u64::from(u32::MAX) / intermediate) as u32)
|
(intermediate.0, height)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ mod data;
|
||||||
use std::{sync::Arc, time::Duration};
|
use std::{sync::Arc, time::Duration};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use conduit::{debug, error, utils, Error, Result};
|
use conduit::{checked, debug, error, utils, Error, Result};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use futures_util::{stream::FuturesUnordered, StreamExt};
|
use futures_util::{stream::FuturesUnordered, StreamExt};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
|
@ -79,12 +79,16 @@ pub struct Service {
|
||||||
timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
|
timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
|
||||||
handler_join: Mutex<Option<JoinHandle<()>>>,
|
handler_join: Mutex<Option<JoinHandle<()>>>,
|
||||||
timeout_remote_users: bool,
|
timeout_remote_users: bool,
|
||||||
|
idle_timeout: u64,
|
||||||
|
offline_timeout: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl crate::Service for Service {
|
impl crate::Service for Service {
|
||||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||||
let config = &args.server.config;
|
let config = &args.server.config;
|
||||||
|
let idle_timeout_s = config.presence_idle_timeout_s;
|
||||||
|
let offline_timeout_s = config.presence_offline_timeout_s;
|
||||||
let (timer_sender, timer_receiver) = loole::unbounded();
|
let (timer_sender, timer_receiver) = loole::unbounded();
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
db: Data::new(args.db),
|
db: Data::new(args.db),
|
||||||
|
@ -92,6 +96,8 @@ impl crate::Service for Service {
|
||||||
timer_receiver: Mutex::new(timer_receiver),
|
timer_receiver: Mutex::new(timer_receiver),
|
||||||
handler_join: Mutex::new(None),
|
handler_join: Mutex::new(None),
|
||||||
timeout_remote_users: config.presence_timeout_remote_users,
|
timeout_remote_users: config.presence_timeout_remote_users,
|
||||||
|
idle_timeout: checked!(idle_timeout_s * 1_000)?,
|
||||||
|
offline_timeout: checked!(offline_timeout_s * 1_000)?,
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -219,7 +225,7 @@ impl Service {
|
||||||
loop {
|
loop {
|
||||||
debug_assert!(!receiver.is_closed(), "channel error");
|
debug_assert!(!receiver.is_closed(), "channel error");
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(user_id) = presence_timers.next() => process_presence_timer(&user_id)?,
|
Some(user_id) = presence_timers.next() => self.process_presence_timer(&user_id)?,
|
||||||
event = receiver.recv_async() => match event {
|
event = receiver.recv_async() => match event {
|
||||||
Err(_e) => return Ok(()),
|
Err(_e) => return Ok(()),
|
||||||
Ok((user_id, timeout)) => {
|
Ok((user_id, timeout)) => {
|
||||||
|
@ -230,6 +236,36 @@ impl Service {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn process_presence_timer(&self, user_id: &OwnedUserId) -> Result<()> {
|
||||||
|
let mut presence_state = PresenceState::Offline;
|
||||||
|
let mut last_active_ago = None;
|
||||||
|
let mut status_msg = None;
|
||||||
|
|
||||||
|
let presence_event = self.get_presence(user_id)?;
|
||||||
|
|
||||||
|
if let Some(presence_event) = presence_event {
|
||||||
|
presence_state = presence_event.content.presence;
|
||||||
|
last_active_ago = presence_event.content.last_active_ago;
|
||||||
|
status_msg = presence_event.content.status_msg;
|
||||||
|
}
|
||||||
|
|
||||||
|
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
|
||||||
|
(PresenceState::Online, Some(ago)) if ago >= self.idle_timeout => Some(PresenceState::Unavailable),
|
||||||
|
(PresenceState::Unavailable, Some(ago)) if ago >= self.offline_timeout => Some(PresenceState::Offline),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
debug!(
|
||||||
|
"Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
if let Some(new_state) = new_state {
|
||||||
|
self.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
|
async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId {
|
||||||
|
@ -237,36 +273,3 @@ async fn presence_timer(user_id: OwnedUserId, timeout: Duration) -> OwnedUserId
|
||||||
|
|
||||||
user_id
|
user_id
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_presence_timer(user_id: &OwnedUserId) -> Result<()> {
|
|
||||||
let idle_timeout = services().globals.config.presence_idle_timeout_s * 1_000;
|
|
||||||
let offline_timeout = services().globals.config.presence_offline_timeout_s * 1_000;
|
|
||||||
|
|
||||||
let mut presence_state = PresenceState::Offline;
|
|
||||||
let mut last_active_ago = None;
|
|
||||||
let mut status_msg = None;
|
|
||||||
|
|
||||||
let presence_event = services().presence.get_presence(user_id)?;
|
|
||||||
|
|
||||||
if let Some(presence_event) = presence_event {
|
|
||||||
presence_state = presence_event.content.presence;
|
|
||||||
last_active_ago = presence_event.content.last_active_ago;
|
|
||||||
status_msg = presence_event.content.status_msg;
|
|
||||||
}
|
|
||||||
|
|
||||||
let new_state = match (&presence_state, last_active_ago.map(u64::from)) {
|
|
||||||
(PresenceState::Online, Some(ago)) if ago >= idle_timeout => Some(PresenceState::Unavailable),
|
|
||||||
(PresenceState::Unavailable, Some(ago)) if ago >= offline_timeout => Some(PresenceState::Offline),
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("Processed presence timer for user '{user_id}': Old state = {presence_state}, New state = {new_state:?}");
|
|
||||||
|
|
||||||
if let Some(new_state) = new_state {
|
|
||||||
services()
|
|
||||||
.presence
|
|
||||||
.set_presence(user_id, &new_state, Some(false), last_active_ago, status_msg)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
|
@ -5,7 +5,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{debug, error, trace, warn, Error, Result};
|
use conduit::{debug, error, trace, validated, warn, Error, Result};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use ruma::{api::client::error::ErrorKind, EventId, RoomId};
|
use ruma::{api::client::error::ErrorKind, EventId, RoomId};
|
||||||
|
|
||||||
|
@ -43,20 +43,20 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip_all, name = "auth_chain")]
|
#[tracing::instrument(skip_all, name = "auth_chain")]
|
||||||
pub async fn get_auth_chain(&self, room_id: &RoomId, starting_events: &[&EventId]) -> Result<Vec<u64>> {
|
pub async fn get_auth_chain(&self, room_id: &RoomId, starting_events: &[&EventId]) -> Result<Vec<u64>> {
|
||||||
const NUM_BUCKETS: usize = 50; //TODO: change possible w/o disrupting db?
|
const NUM_BUCKETS: u64 = 50; //TODO: change possible w/o disrupting db?
|
||||||
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
|
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
|
||||||
|
|
||||||
let started = std::time::Instant::now();
|
let started = std::time::Instant::now();
|
||||||
let mut buckets = [BUCKET; NUM_BUCKETS];
|
let mut buckets = [BUCKET; NUM_BUCKETS as usize];
|
||||||
for (i, short) in services()
|
for (i, &short) in services()
|
||||||
.rooms
|
.rooms
|
||||||
.short
|
.short
|
||||||
.multi_get_or_create_shorteventid(starting_events)?
|
.multi_get_or_create_shorteventid(starting_events)?
|
||||||
.iter()
|
.iter()
|
||||||
.enumerate()
|
.enumerate()
|
||||||
{
|
{
|
||||||
let bucket = short % NUM_BUCKETS as u64;
|
let bucket = validated!(short % NUM_BUCKETS)?;
|
||||||
buckets[bucket as usize].insert((*short, starting_events[i]));
|
buckets[bucket as usize].insert((short, starting_events[i]));
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
|
|
|
@ -191,7 +191,7 @@ impl Service {
|
||||||
e.insert((Instant::now(), 1));
|
e.insert((Instant::now(), 1));
|
||||||
},
|
},
|
||||||
hash_map::Entry::Occupied(mut e) => {
|
hash_map::Entry::Occupied(mut e) => {
|
||||||
*e.get_mut() = (Instant::now(), e.get().1 + 1);
|
*e.get_mut() = (Instant::now(), e.get().1.saturating_add(1));
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
|
@ -1072,7 +1072,7 @@ impl Service {
|
||||||
let mut todo_auth_events = vec![Arc::clone(id)];
|
let mut todo_auth_events = vec![Arc::clone(id)];
|
||||||
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
let mut events_in_reverse_order = Vec::with_capacity(todo_auth_events.len());
|
||||||
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
let mut events_all = HashSet::with_capacity(todo_auth_events.len());
|
||||||
let mut i = 0;
|
let mut i: u64 = 0;
|
||||||
while let Some(next_id) = todo_auth_events.pop() {
|
while let Some(next_id) = todo_auth_events.pop() {
|
||||||
if let Some((time, tries)) = services()
|
if let Some((time, tries)) = services()
|
||||||
.globals
|
.globals
|
||||||
|
@ -1094,7 +1094,7 @@ impl Service {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
i += 1;
|
i = i.saturating_add(1);
|
||||||
if i % 100 == 0 {
|
if i % 100 == 0 {
|
||||||
tokio::task::yield_now().await;
|
tokio::task::yield_now().await;
|
||||||
}
|
}
|
||||||
|
|
|
@ -205,7 +205,7 @@ impl Service {
|
||||||
if let Ok(relations) = self.db.relations_until(user_id, room_id, target, until) {
|
if let Ok(relations) = self.db.relations_until(user_id, room_id, target, until) {
|
||||||
for relation in relations.flatten() {
|
for relation in relations.flatten() {
|
||||||
if stack_pdu.1 < max_depth {
|
if stack_pdu.1 < max_depth {
|
||||||
stack.push((relation.clone(), stack_pdu.1 + 1));
|
stack.push((relation.clone(), stack_pdu.1.saturating_add(1)));
|
||||||
}
|
}
|
||||||
|
|
||||||
pdus.push(relation);
|
pdus.push(relation);
|
||||||
|
|
|
@ -76,10 +76,12 @@ impl Data {
|
||||||
.iter_from(&first_possible_edu, false)
|
.iter_from(&first_possible_edu, false)
|
||||||
.take_while(move |(k, _)| k.starts_with(&prefix2))
|
.take_while(move |(k, _)| k.starts_with(&prefix2))
|
||||||
.map(move |(k, v)| {
|
.map(move |(k, v)| {
|
||||||
let count = utils::u64_from_bytes(&k[prefix.len()..prefix.len() + size_of::<u64>()])
|
let count_offset = prefix.len().saturating_add(size_of::<u64>());
|
||||||
|
let count = utils::u64_from_bytes(&k[prefix.len()..count_offset])
|
||||||
.map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
|
.map_err(|_| Error::bad_database("Invalid readreceiptid count in db."))?;
|
||||||
|
let user_id_offset = count_offset.saturating_add(1);
|
||||||
let user_id = UserId::parse(
|
let user_id = UserId::parse(
|
||||||
utils::string_from_bytes(&k[prefix.len() + size_of::<u64>() + 1..])
|
utils::string_from_bytes(&k[user_id_offset..])
|
||||||
.map_err(|_| Error::bad_database("Invalid readreceiptid userid bytes in db."))?,
|
.map_err(|_| Error::bad_database("Invalid readreceiptid userid bytes in db."))?,
|
||||||
)
|
)
|
||||||
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
|
.map_err(|_| Error::bad_database("Invalid readreceiptid userid in db."))?;
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::debug_info;
|
use conduit::{checked, debug_info};
|
||||||
use lru_cache::LruCache;
|
use lru_cache::LruCache;
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{
|
api::{
|
||||||
|
@ -508,7 +508,8 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// We have reached the room after where we last left off
|
// We have reached the room after where we last left off
|
||||||
if parents.len() + 1 == short_room_ids.len() {
|
let parents_len = parents.len();
|
||||||
|
if checked!(parents_len + 1)? == short_room_ids.len() {
|
||||||
populate_results = true;
|
populate_results = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{collections::HashSet, mem::size_of, sync::Arc};
|
use std::{collections::HashSet, mem::size_of, sync::Arc};
|
||||||
|
|
||||||
use conduit::{utils, Error, Result};
|
use conduit::{checked, utils, Error, Result};
|
||||||
use database::{Database, Map};
|
use database::{Database, Map};
|
||||||
|
|
||||||
use super::CompressedStateEvent;
|
use super::CompressedStateEvent;
|
||||||
|
@ -38,11 +38,12 @@ impl Data {
|
||||||
let mut added = HashSet::new();
|
let mut added = HashSet::new();
|
||||||
let mut removed = HashSet::new();
|
let mut removed = HashSet::new();
|
||||||
|
|
||||||
let mut i = size_of::<u64>();
|
let stride = size_of::<u64>();
|
||||||
while let Some(v) = value.get(i..i + 2 * size_of::<u64>()) {
|
let mut i = stride;
|
||||||
|
while let Some(v) = value.get(i..checked!(i + 2 * stride)?) {
|
||||||
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
if add_mode && v.starts_with(&0_u64.to_be_bytes()) {
|
||||||
add_mode = false;
|
add_mode = false;
|
||||||
i += size_of::<u64>();
|
i = checked!(i + stride)?;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if add_mode {
|
if add_mode {
|
||||||
|
@ -50,7 +51,7 @@ impl Data {
|
||||||
} else {
|
} else {
|
||||||
removed.insert(v.try_into().expect("we checked the size above"));
|
removed.insert(v.try_into().expect("we checked the size above"));
|
||||||
}
|
}
|
||||||
i += 2 * size_of::<u64>();
|
i = checked!(i + 2 * stride)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(StateDiff {
|
Ok(StateDiff {
|
||||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
||||||
sync::{Arc, Mutex as StdMutex, Mutex},
|
sync::{Arc, Mutex as StdMutex, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{utils, Result};
|
use conduit::{checked, utils, Result};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use lru_cache::LruCache;
|
use lru_cache::LruCache;
|
||||||
use ruma::{EventId, RoomId};
|
use ruma::{EventId, RoomId};
|
||||||
|
@ -169,12 +169,14 @@ impl Service {
|
||||||
statediffremoved: Arc<HashSet<CompressedStateEvent>>, diff_to_sibling: usize,
|
statediffremoved: Arc<HashSet<CompressedStateEvent>>, diff_to_sibling: usize,
|
||||||
mut parent_states: ParentStatesVec,
|
mut parent_states: ParentStatesVec,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let diffsum = statediffnew.len() + statediffremoved.len();
|
let statediffnew_len = statediffnew.len();
|
||||||
|
let statediffremoved_len = statediffremoved.len();
|
||||||
|
let diffsum = checked!(statediffnew_len + statediffremoved_len)?;
|
||||||
|
|
||||||
if parent_states.len() > 3 {
|
if parent_states.len() > 3 {
|
||||||
// Number of layers
|
// Number of layers
|
||||||
// To many layers, we have to go deeper
|
// To many layers, we have to go deeper
|
||||||
let parent = parent_states.pop().unwrap();
|
let parent = parent_states.pop().expect("parent must have a state");
|
||||||
|
|
||||||
let mut parent_new = (*parent.2).clone();
|
let mut parent_new = (*parent.2).clone();
|
||||||
let mut parent_removed = (*parent.3).clone();
|
let mut parent_removed = (*parent.3).clone();
|
||||||
|
@ -226,10 +228,12 @@ impl Service {
|
||||||
// 1. We add the current diff on top of the parent layer.
|
// 1. We add the current diff on top of the parent layer.
|
||||||
// 2. We replace a layer above
|
// 2. We replace a layer above
|
||||||
|
|
||||||
let parent = parent_states.pop().unwrap();
|
let parent = parent_states.pop().expect("parent must have a state");
|
||||||
let parent_diff = parent.2.len() + parent.3.len();
|
let parent_2_len = parent.2.len();
|
||||||
|
let parent_3_len = parent.3.len();
|
||||||
|
let parent_diff = checked!(parent_2_len + parent_3_len)?;
|
||||||
|
|
||||||
if diffsum * diffsum >= 2 * diff_to_sibling * parent_diff {
|
if checked!(diffsum * diffsum)? >= checked!(2 * diff_to_sibling * parent_diff)? {
|
||||||
// Diff too big, we replace above layer(s)
|
// Diff too big, we replace above layer(s)
|
||||||
let mut parent_new = (*parent.2).clone();
|
let mut parent_new = (*parent.2).clone();
|
||||||
let mut parent_removed = (*parent.3).clone();
|
let mut parent_removed = (*parent.3).clone();
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{mem::size_of, sync::Arc};
|
use std::{mem::size_of, sync::Arc};
|
||||||
|
|
||||||
use conduit::{utils, Error, Result};
|
use conduit::{checked, utils, Error, Result};
|
||||||
use database::{Database, Map};
|
use database::{Database, Map};
|
||||||
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
|
use ruma::{api::client::threads::get_threads::v1::IncludeThreads, OwnedUserId, RoomId, UserId};
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ impl Data {
|
||||||
.to_vec();
|
.to_vec();
|
||||||
|
|
||||||
let mut current = prefix.clone();
|
let mut current = prefix.clone();
|
||||||
current.extend_from_slice(&(until - 1).to_be_bytes());
|
current.extend_from_slice(&(checked!(until - 1)?).to_be_bytes());
|
||||||
|
|
||||||
Ok(Box::new(
|
Ok(Box::new(
|
||||||
self.threadid_userids
|
self.threadid_userids
|
||||||
|
|
|
@ -64,7 +64,7 @@ impl Service {
|
||||||
.and_then(|relations| serde_json::from_value::<BundledThread>(relations.clone().into()).ok())
|
.and_then(|relations| serde_json::from_value::<BundledThread>(relations.clone().into()).ok())
|
||||||
{
|
{
|
||||||
// Thread already existed
|
// Thread already existed
|
||||||
relations.count += uint!(1);
|
relations.count = relations.count.saturating_add(uint!(1));
|
||||||
relations.latest_event = pdu.to_message_like_event();
|
relations.latest_event = pdu.to_message_like_event();
|
||||||
|
|
||||||
let content = serde_json::to_value(relations).expect("to_value always works");
|
let content = serde_json::to_value(relations).expect("to_value always works");
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::{
|
||||||
sync::{Arc, Mutex},
|
sync::{Arc, Mutex},
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{error, utils, Error, Result};
|
use conduit::{checked, error, utils, Error, Result};
|
||||||
use database::{Database, Map};
|
use database::{Database, Map};
|
||||||
use ruma::{api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId};
|
use ruma::{api::client::error::ErrorKind, CanonicalJsonObject, EventId, OwnedRoomId, OwnedUserId, RoomId, UserId};
|
||||||
|
|
||||||
|
@ -281,10 +281,12 @@ impl Data {
|
||||||
|
|
||||||
/// Returns the `count` of this pdu's id.
|
/// Returns the `count` of this pdu's id.
|
||||||
pub(super) fn pdu_count(pdu_id: &[u8]) -> Result<PduCount> {
|
pub(super) fn pdu_count(pdu_id: &[u8]) -> Result<PduCount> {
|
||||||
let last_u64 = utils::u64_from_bytes(&pdu_id[pdu_id.len() - size_of::<u64>()..])
|
let stride = size_of::<u64>();
|
||||||
|
let pdu_id_len = pdu_id.len();
|
||||||
|
let last_u64 = utils::u64_from_bytes(&pdu_id[checked!(pdu_id_len - stride)?..])
|
||||||
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?;
|
.map_err(|_| Error::bad_database("PDU has invalid count bytes."))?;
|
||||||
let second_last_u64 =
|
let second_last_u64 =
|
||||||
utils::u64_from_bytes(&pdu_id[pdu_id.len() - 2 * size_of::<u64>()..pdu_id.len() - size_of::<u64>()]);
|
utils::u64_from_bytes(&pdu_id[checked!(pdu_id_len - 2 * stride)?..checked!(pdu_id_len - stride)?]);
|
||||||
|
|
||||||
if matches!(second_last_u64, Ok(0)) {
|
if matches!(second_last_u64, Ok(0)) {
|
||||||
Ok(PduCount::Backfilled(u64::MAX.saturating_sub(last_u64)))
|
Ok(PduCount::Backfilled(u64::MAX.saturating_sub(last_u64)))
|
||||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{debug, error, info, utils, utils::mutex_map, warn, Error, Result};
|
use conduit::{debug, error, info, utils, utils::mutex_map, validated, warn, Error, Result};
|
||||||
use data::Data;
|
use data::Data;
|
||||||
use itertools::Itertools;
|
use itertools::Itertools;
|
||||||
use rand::prelude::SliceRandom;
|
use rand::prelude::SliceRandom;
|
||||||
|
@ -670,7 +670,7 @@ impl Service {
|
||||||
.filter_map(|event_id| Some(self.get_pdu(event_id).ok()??.depth))
|
.filter_map(|event_id| Some(self.get_pdu(event_id).ok()??.depth))
|
||||||
.max()
|
.max()
|
||||||
.unwrap_or_else(|| uint!(0))
|
.unwrap_or_else(|| uint!(0))
|
||||||
+ uint!(1);
|
.saturating_add(uint!(1));
|
||||||
|
|
||||||
let mut unsigned = unsigned.unwrap_or_default();
|
let mut unsigned = unsigned.unwrap_or_default();
|
||||||
|
|
||||||
|
@ -1240,10 +1240,11 @@ impl Service {
|
||||||
|
|
||||||
let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await;
|
let insert_lock = services().globals.roomid_mutex_insert.lock(&room_id).await;
|
||||||
|
|
||||||
|
let max = u64::MAX;
|
||||||
let count = services().globals.next_count()?;
|
let count = services().globals.next_count()?;
|
||||||
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
let mut pdu_id = shortroomid.to_be_bytes().to_vec();
|
||||||
pdu_id.extend_from_slice(&0_u64.to_be_bytes());
|
pdu_id.extend_from_slice(&0_u64.to_be_bytes());
|
||||||
pdu_id.extend_from_slice(&(u64::MAX - count).to_be_bytes());
|
pdu_id.extend_from_slice(&(validated!(max - count)?).to_be_bytes());
|
||||||
|
|
||||||
// Insert pdu
|
// Insert pdu
|
||||||
self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?;
|
self.db.prepend_backfill_pdu(&pdu_id, &event_id, &value)?;
|
||||||
|
|
|
@ -93,7 +93,7 @@ impl Service {
|
||||||
statuses.entry(dest).and_modify(|e| {
|
statuses.entry(dest).and_modify(|e| {
|
||||||
*e = match e {
|
*e = match e {
|
||||||
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
|
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
|
||||||
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n + 1, Instant::now()),
|
TransactionStatus::Retrying(ref n) => TransactionStatus::Failed(n.saturating_add(1), Instant::now()),
|
||||||
TransactionStatus::Failed(..) => panic!("Request that was not even running failed?!"),
|
TransactionStatus::Failed(..) => panic!("Request that was not even running failed?!"),
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -463,7 +463,8 @@ impl Data {
|
||||||
.algorithm(),
|
.algorithm(),
|
||||||
)
|
)
|
||||||
}) {
|
}) {
|
||||||
*counts.entry(algorithm?).or_default() += uint!(1);
|
let count: &mut UInt = counts.entry(algorithm?).or_default();
|
||||||
|
*count = count.saturating_add(uint!(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(counts)
|
Ok(counts)
|
||||||
|
@ -814,7 +815,7 @@ impl Data {
|
||||||
.map(|(key, _)| {
|
.map(|(key, _)| {
|
||||||
Ok::<_, Error>((
|
Ok::<_, Error>((
|
||||||
key.clone(),
|
key.clone(),
|
||||||
utils::u64_from_bytes(&key[key.len() - size_of::<u64>()..key.len()])
|
utils::u64_from_bytes(&key[key.len().saturating_sub(size_of::<u64>())..key.len()])
|
||||||
.map_err(|_| Error::bad_database("ToDeviceId has invalid count bytes."))?,
|
.map_err(|_| Error::bad_database("ToDeviceId has invalid count bytes."))?,
|
||||||
))
|
))
|
||||||
})
|
})
|
||||||
|
@ -928,10 +929,12 @@ impl Data {
|
||||||
/// Creates an OpenID token, which can be used to prove that a user has
|
/// Creates an OpenID token, which can be used to prove that a user has
|
||||||
/// access to an account (primarily for integrations)
|
/// access to an account (primarily for integrations)
|
||||||
pub(super) fn create_openid_token(&self, user_id: &UserId, token: &str) -> Result<u64> {
|
pub(super) fn create_openid_token(&self, user_id: &UserId, token: &str) -> Result<u64> {
|
||||||
let expires_in = services().globals.config.openid_token_ttl;
|
use std::num::Saturating as Sat;
|
||||||
let expires_at = utils::millis_since_unix_epoch().saturating_add(expires_in * 1000);
|
|
||||||
|
|
||||||
let mut value = expires_at.to_be_bytes().to_vec();
|
let expires_in = services().globals.config.openid_token_ttl;
|
||||||
|
let expires_at = Sat(utils::millis_since_unix_epoch()) + Sat(expires_in) * Sat(1000);
|
||||||
|
|
||||||
|
let mut value = expires_at.0.to_be_bytes().to_vec();
|
||||||
value.extend_from_slice(user_id.as_bytes());
|
value.extend_from_slice(user_id.as_bytes());
|
||||||
|
|
||||||
self.openidtoken_expiresatuserid
|
self.openidtoken_expiresatuserid
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue