tweak tracing spans; inlines

db deserializer tracing instrument cover

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-01-02 05:30:51 +00:00
parent 685b127f99
commit 02f19cf951
16 changed files with 146 additions and 34 deletions

View file

@ -19,8 +19,8 @@ type Value<Val> = Arc<tokio::sync::Mutex<Val>>;
impl<Key, Val> MutexMap<Key, Val> impl<Key, Val> MutexMap<Key, Val>
where where
Key: Send + Hash + Eq + Clone, Key: Clone + Eq + Hash + Send,
Val: Send + Default, Val: Default + Send,
{ {
#[must_use] #[must_use]
pub fn new() -> Self { pub fn new() -> Self {
@ -29,10 +29,10 @@ where
} }
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(level = "trace", skip(self))]
pub async fn lock<K>(&self, k: &K) -> Guard<Key, Val> pub async fn lock<K>(&self, k: &K) -> Guard<Key, Val>
where where
K: ?Sized + Send + Sync + Debug, K: Debug + Send + ?Sized + Sync,
Key: for<'a> From<&'a K>, Key: for<'a> From<&'a K>,
{ {
let val = self let val = self
@ -61,13 +61,14 @@ where
impl<Key, Val> Default for MutexMap<Key, Val> impl<Key, Val> Default for MutexMap<Key, Val>
where where
Key: Send + Hash + Eq + Clone, Key: Clone + Eq + Hash + Send,
Val: Send + Default, Val: Default + Send,
{ {
fn default() -> Self { Self::new() } fn default() -> Self { Self::new() }
} }
impl<Key, Val> Drop for Guard<Key, Val> { impl<Key, Val> Drop for Guard<Key, Val> {
#[tracing::instrument(name = "unlock", level = "trace", skip_all)]
fn drop(&mut self) { fn drop(&mut self) {
if Arc::strong_count(Omg::mutex(&self.val)) <= 2 { if Arc::strong_count(Omg::mutex(&self.val)) <= 2 {
self.map.lock().expect("locked").retain(|_, val| { self.map.lock().expect("locked").retain(|_, val| {

View file

@ -9,6 +9,15 @@ use serde::{
use crate::util::unhandled; use crate::util::unhandled;
/// Deserialize into T from buffer. /// Deserialize into T from buffer.
#[cfg_attr(
unabridged,
tracing::instrument(
name = "deserialize",
level = "trace",
skip_all,
fields(len = %buf.len()),
)
)]
pub(crate) fn from_slice<'a, T>(buf: &'a [u8]) -> Result<T> pub(crate) fn from_slice<'a, T>(buf: &'a [u8]) -> Result<T>
where where
T: Deserialize<'a>, T: Deserialize<'a>,
@ -132,6 +141,17 @@ impl<'de> Deserializer<'de> {
/// Increment the position pointer. /// Increment the position pointer.
#[inline] #[inline]
#[cfg_attr(
unabridged,
tracing::instrument(
level = "trace",
skip(self),
fields(
len = self.buf.len(),
rem = self.remaining().unwrap_or_default().saturating_sub(n),
),
)
)]
fn inc_pos(&mut self, n: usize) { fn inc_pos(&mut self, n: usize) {
self.pos = self.pos.saturating_add(n); self.pos = self.pos.saturating_add(n);
debug_assert!(self.pos <= self.buf.len(), "pos out of range"); debug_assert!(self.pos <= self.buf.len(), "pos out of range");
@ -149,6 +169,7 @@ impl<'de> Deserializer<'de> {
impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> { impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
type Error = Error; type Error = Error;
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_seq<V>(self, visitor: V) -> Result<V::Value> fn deserialize_seq<V>(self, visitor: V) -> Result<V::Value>
where where
V: Visitor<'de>, V: Visitor<'de>,
@ -157,6 +178,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
visitor.visit_seq(self) visitor.visit_seq(self)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, visitor)))]
fn deserialize_tuple<V>(self, _len: usize, visitor: V) -> Result<V::Value> fn deserialize_tuple<V>(self, _len: usize, visitor: V) -> Result<V::Value>
where where
V: Visitor<'de>, V: Visitor<'de>,
@ -165,6 +187,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
visitor.visit_seq(self) visitor.visit_seq(self)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, visitor)))]
fn deserialize_tuple_struct<V>( fn deserialize_tuple_struct<V>(
self, self,
_name: &'static str, _name: &'static str,
@ -178,6 +201,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
visitor.visit_seq(self) visitor.visit_seq(self)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_map<V>(self, visitor: V) -> Result<V::Value> fn deserialize_map<V>(self, visitor: V) -> Result<V::Value>
where where
V: Visitor<'de>, V: Visitor<'de>,
@ -187,6 +211,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
d.deserialize_map(visitor).map_err(Into::into) d.deserialize_map(visitor).map_err(Into::into)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, visitor)))]
fn deserialize_struct<V>( fn deserialize_struct<V>(
self, self,
name: &'static str, name: &'static str,
@ -202,6 +227,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
.map_err(Into::into) .map_err(Into::into)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, visitor)))]
fn deserialize_unit_struct<V>(self, name: &'static str, visitor: V) -> Result<V::Value> fn deserialize_unit_struct<V>(self, name: &'static str, visitor: V) -> Result<V::Value>
where where
V: Visitor<'de>, V: Visitor<'de>,
@ -215,6 +241,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
visitor.visit_unit() visitor.visit_unit()
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, visitor)))]
fn deserialize_newtype_struct<V>(self, name: &'static str, visitor: V) -> Result<V::Value> fn deserialize_newtype_struct<V>(self, name: &'static str, visitor: V) -> Result<V::Value>
where where
V: Visitor<'de>, V: Visitor<'de>,
@ -225,6 +252,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
} }
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, _visitor)))]
fn deserialize_enum<V>( fn deserialize_enum<V>(
self, self,
_name: &'static str, _name: &'static str,
@ -237,26 +265,32 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
unhandled!("deserialize Enum not implemented") unhandled!("deserialize Enum not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_option<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_option<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize Option not implemented") unhandled!("deserialize Option not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_bool<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_bool<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize bool not implemented") unhandled!("deserialize bool not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_i8<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_i8<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize i8 not implemented") unhandled!("deserialize i8 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_i16<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_i16<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize i16 not implemented") unhandled!("deserialize i16 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_i32<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_i32<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize i32 not implemented") unhandled!("deserialize i32 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_i64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_i64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
const BYTES: usize = size_of::<i64>(); const BYTES: usize = size_of::<i64>();
@ -268,6 +302,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
visitor.visit_i64(i64::from_be_bytes(bytes)) visitor.visit_i64(i64::from_be_bytes(bytes))
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_u8<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_u8<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!( unhandled!(
"deserialize u8 not implemented; try dereferencing the Handle for [u8] access \ "deserialize u8 not implemented; try dereferencing the Handle for [u8] access \
@ -275,14 +310,17 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
) )
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_u16<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_u16<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize u16 not implemented") unhandled!("deserialize u16 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_u32<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_u32<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize u32 not implemented") unhandled!("deserialize u32 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_u64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_u64<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
const BYTES: usize = size_of::<u64>(); const BYTES: usize = size_of::<u64>();
@ -294,53 +332,67 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
visitor.visit_u64(u64::from_be_bytes(bytes)) visitor.visit_u64(u64::from_be_bytes(bytes))
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_f32<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_f32<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize f32 not implemented") unhandled!("deserialize f32 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_f64<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_f64<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize f64 not implemented") unhandled!("deserialize f64 not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_char<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_char<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize char not implemented") unhandled!("deserialize char not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_str<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_str<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let input = self.record_next(); let input = self.record_next();
let out = deserialize_str(input)?; let out = deserialize_str(input)?;
visitor.visit_borrowed_str(out) visitor.visit_borrowed_str(out)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_string<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_string<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let input = self.record_next(); let input = self.record_next();
let out = string::string_from_bytes(input)?; let out = string::string_from_bytes(input)?;
visitor.visit_string(out) visitor.visit_string(out)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_bytes<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_bytes<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let input = self.record_trail(); let input = self.record_trail();
visitor.visit_borrowed_bytes(input) visitor.visit_borrowed_bytes(input)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_byte_buf<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_byte_buf<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize Byte Buf not implemented") unhandled!("deserialize Byte Buf not implemented")
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_unit<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_unit<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize Unit not implemented") unhandled!("deserialize Unit not implemented")
} }
// this only used for $serde_json::private::RawValue at this time; see MapAccess // this only used for $serde_json::private::RawValue at this time; see MapAccess
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_identifier<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_identifier<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
let input = "$serde_json::private::RawValue"; let input = "$serde_json::private::RawValue";
visitor.visit_borrowed_str(input) visitor.visit_borrowed_str(input)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
fn deserialize_ignored_any<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> { fn deserialize_ignored_any<V: Visitor<'de>>(self, _visitor: V) -> Result<V::Value> {
unhandled!("deserialize Ignored Any not implemented") unhandled!("deserialize Ignored Any not implemented")
} }
#[cfg_attr(
unabridged,
tracing::instrument(level = "trace", skip_all, fields(?self.buf))
)]
fn deserialize_any<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> { fn deserialize_any<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value> {
debug_assert_eq!( debug_assert_eq!(
conduwuit::debug::type_name::<V>(), conduwuit::debug::type_name::<V>(),
@ -363,6 +415,7 @@ impl<'a, 'de: 'a> de::Deserializer<'de> for &'a mut Deserializer<'de> {
impl<'a, 'de: 'a> de::SeqAccess<'de> for &'a mut Deserializer<'de> { impl<'a, 'de: 'a> de::SeqAccess<'de> for &'a mut Deserializer<'de> {
type Error = Error; type Error = Error;
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, seed)))]
fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>> fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>>
where where
T: DeserializeSeed<'de>, T: DeserializeSeed<'de>,
@ -381,6 +434,7 @@ impl<'a, 'de: 'a> de::SeqAccess<'de> for &'a mut Deserializer<'de> {
impl<'a, 'de: 'a> de::MapAccess<'de> for &'a mut Deserializer<'de> { impl<'a, 'de: 'a> de::MapAccess<'de> for &'a mut Deserializer<'de> {
type Error = Error; type Error = Error;
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, seed)))]
fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>> fn next_key_seed<K>(&mut self, seed: K) -> Result<Option<K::Value>>
where where
K: DeserializeSeed<'de>, K: DeserializeSeed<'de>,
@ -388,6 +442,7 @@ impl<'a, 'de: 'a> de::MapAccess<'de> for &'a mut Deserializer<'de> {
seed.deserialize(&mut **self).map(Some) seed.deserialize(&mut **self).map(Some)
} }
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip(self, seed)))]
fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value> fn next_value_seed<V>(&mut self, seed: V) -> Result<V::Value>
where where
V: DeserializeSeed<'de>, V: DeserializeSeed<'de>,

View file

@ -157,11 +157,13 @@ impl Engine {
#[inline] #[inline]
pub fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 } pub fn corked(&self) -> bool { self.corks.load(std::sync::atomic::Ordering::Relaxed) > 0 }
#[inline]
pub(crate) fn cork(&self) { pub(crate) fn cork(&self) {
self.corks self.corks
.fetch_add(1, std::sync::atomic::Ordering::Relaxed); .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
} }
#[inline]
pub(crate) fn uncork(&self) { pub(crate) fn uncork(&self) {
self.corks self.corks
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed); .fetch_sub(1, std::sync::atomic::Ordering::Relaxed);

View file

@ -207,8 +207,6 @@ pub(crate) async fn execute_get(self: &Arc<Self>, mut cmd: Get) -> Result<BatchR
.map_err(|e| err!(error!("recv failed {e:?}"))) .map_err(|e| err!(error!("recv failed {e:?}")))
}) })
.await .await
.map(Into::into)
.map_err(Into::into)
} }
#[implement(Pool)] #[implement(Pool)]

View file

@ -22,7 +22,9 @@ where
Ok(buf) Ok(buf)
} }
/// Serialize T into Writer W
#[inline] #[inline]
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
pub fn serialize<'a, W, T>(out: &'a mut W, val: T) -> Result<&'a [u8]> pub fn serialize<'a, W, T>(out: &'a mut W, val: T) -> Result<&'a [u8]>
where where
W: Write + AsRef<[u8]> + 'a, W: Write + AsRef<[u8]> + 'a,

View file

@ -29,12 +29,14 @@ pub(crate) trait Cursor<'a, T> {
fn seek(&mut self); fn seek(&mut self);
#[inline]
fn get(&self) -> Option<Result<T>> { fn get(&self) -> Option<Result<T>> {
self.fetch() self.fetch()
.map(Ok) .map(Ok)
.or_else(|| self.state().status().map(map_err).map(Err)) .or_else(|| self.state().status().map(map_err).map(Err))
} }
#[inline]
fn seek_and_get(&mut self) -> Option<Result<T>> { fn seek_and_get(&mut self) -> Option<Result<T>> {
self.seek(); self.seek();
self.get() self.get()
@ -45,6 +47,7 @@ type Inner<'a> = DBRawIteratorWithThreadMode<'a, Db>;
type From<'a> = Option<Key<'a>>; type From<'a> = Option<Key<'a>>;
impl<'a> State<'a> { impl<'a> State<'a> {
#[inline]
pub(super) fn new(map: &'a Arc<Map>, opts: ReadOptions) -> Self { pub(super) fn new(map: &'a Arc<Map>, opts: ReadOptions) -> Self {
Self { Self {
inner: map.db().db.raw_iterator_cf_opt(&map.cf(), opts), inner: map.db().db.raw_iterator_cf_opt(&map.cf(), opts),
@ -53,6 +56,8 @@ impl<'a> State<'a> {
} }
} }
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
pub(super) fn init_fwd(mut self, from: From<'_>) -> Self { pub(super) fn init_fwd(mut self, from: From<'_>) -> Self {
debug_assert!(self.init, "init must be set to make this call"); debug_assert!(self.init, "init must be set to make this call");
debug_assert!(!self.seek, "seek must not be set to make this call"); debug_assert!(!self.seek, "seek must not be set to make this call");
@ -67,6 +72,8 @@ impl<'a> State<'a> {
self self
} }
#[inline]
#[tracing::instrument(level = "trace", skip_all)]
pub(super) fn init_rev(mut self, from: From<'_>) -> Self { pub(super) fn init_rev(mut self, from: From<'_>) -> Self {
debug_assert!(self.init, "init must be set to make this call"); debug_assert!(self.init, "init must be set to make this call");
debug_assert!(!self.seek, "seek must not be set to make this call"); debug_assert!(!self.seek, "seek must not be set to make this call");
@ -82,6 +89,7 @@ impl<'a> State<'a> {
} }
#[inline] #[inline]
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
pub(super) fn seek_fwd(&mut self) { pub(super) fn seek_fwd(&mut self) {
if !exchange(&mut self.init, false) { if !exchange(&mut self.init, false) {
self.inner.next(); self.inner.next();
@ -91,6 +99,7 @@ impl<'a> State<'a> {
} }
#[inline] #[inline]
#[cfg_attr(unabridged, tracing::instrument(level = "trace", skip_all))]
pub(super) fn seek_rev(&mut self) { pub(super) fn seek_rev(&mut self) {
if !exchange(&mut self.init, false) { if !exchange(&mut self.init, false) {
self.inner.prev(); self.inner.prev();
@ -103,12 +112,16 @@ impl<'a> State<'a> {
matches!(self.status(), Some(e) if is_incomplete(&e)) matches!(self.status(), Some(e) if is_incomplete(&e))
} }
#[inline]
fn fetch_key(&self) -> Option<Key<'_>> { self.inner.key().map(Key::from) } fn fetch_key(&self) -> Option<Key<'_>> { self.inner.key().map(Key::from) }
#[inline]
fn _fetch_val(&self) -> Option<Val<'_>> { self.inner.value().map(Val::from) } fn _fetch_val(&self) -> Option<Val<'_>> { self.inner.value().map(Val::from) }
#[inline]
fn fetch(&self) -> Option<KeyVal<'_>> { self.inner.item().map(KeyVal::from) } fn fetch(&self) -> Option<KeyVal<'_>> { self.inner.item().map(KeyVal::from) }
#[inline]
pub(super) fn status(&self) -> Option<rocksdb::Error> { self.inner.status().err() } pub(super) fn status(&self) -> Option<rocksdb::Error> { self.inner.status().err() }
#[inline] #[inline]

View file

@ -15,12 +15,15 @@ pub(crate) struct Items<'a> {
} }
impl<'a> From<State<'a>> for Items<'a> { impl<'a> From<State<'a>> for Items<'a> {
#[inline]
fn from(state: State<'a>) -> Self { Self { state } } fn from(state: State<'a>) -> Self { Self { state } }
} }
impl<'a> Cursor<'a, KeyVal<'a>> for Items<'a> { impl<'a> Cursor<'a, KeyVal<'a>> for Items<'a> {
#[inline]
fn state(&self) -> &State<'a> { &self.state } fn state(&self) -> &State<'a> { &self.state }
#[inline]
fn fetch(&self) -> Option<KeyVal<'a>> { self.state.fetch().map(keyval_longevity) } fn fetch(&self) -> Option<KeyVal<'a>> { self.state.fetch().map(keyval_longevity) }
#[inline] #[inline]

View file

@ -15,12 +15,15 @@ pub(crate) struct ItemsRev<'a> {
} }
impl<'a> From<State<'a>> for ItemsRev<'a> { impl<'a> From<State<'a>> for ItemsRev<'a> {
#[inline]
fn from(state: State<'a>) -> Self { Self { state } } fn from(state: State<'a>) -> Self { Self { state } }
} }
impl<'a> Cursor<'a, KeyVal<'a>> for ItemsRev<'a> { impl<'a> Cursor<'a, KeyVal<'a>> for ItemsRev<'a> {
#[inline]
fn state(&self) -> &State<'a> { &self.state } fn state(&self) -> &State<'a> { &self.state }
#[inline]
fn fetch(&self) -> Option<KeyVal<'a>> { self.state.fetch().map(keyval_longevity) } fn fetch(&self) -> Option<KeyVal<'a>> { self.state.fetch().map(keyval_longevity) }
#[inline] #[inline]

View file

@ -15,10 +15,12 @@ pub(crate) struct Keys<'a> {
} }
impl<'a> From<State<'a>> for Keys<'a> { impl<'a> From<State<'a>> for Keys<'a> {
#[inline]
fn from(state: State<'a>) -> Self { Self { state } } fn from(state: State<'a>) -> Self { Self { state } }
} }
impl<'a> Cursor<'a, Key<'a>> for Keys<'a> { impl<'a> Cursor<'a, Key<'a>> for Keys<'a> {
#[inline]
fn state(&self) -> &State<'a> { &self.state } fn state(&self) -> &State<'a> { &self.state }
#[inline] #[inline]

View file

@ -15,10 +15,12 @@ pub(crate) struct KeysRev<'a> {
} }
impl<'a> From<State<'a>> for KeysRev<'a> { impl<'a> From<State<'a>> for KeysRev<'a> {
#[inline]
fn from(state: State<'a>) -> Self { Self { state } } fn from(state: State<'a>) -> Self { Self { state } }
} }
impl<'a> Cursor<'a, Key<'a>> for KeysRev<'a> { impl<'a> Cursor<'a, Key<'a>> for KeysRev<'a> {
#[inline]
fn state(&self) -> &State<'a> { &self.state } fn state(&self) -> &State<'a> { &self.state }
#[inline] #[inline]

View file

@ -15,8 +15,12 @@ use ruma::{
use super::check_room_id; use super::check_room_id;
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(
level = "warn",
skip_all,
fields(%origin),
)]
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
#[tracing::instrument(skip_all)]
pub(super) async fn fetch_prev( pub(super) async fn fetch_prev(
&self, &self,
origin: &ServerName, origin: &ServerName,

View file

@ -1,6 +1,6 @@
use std::collections::{hash_map, HashMap}; use std::collections::{hash_map, HashMap};
use conduwuit::{debug, implement, warn, Err, Error, PduEvent, Result}; use conduwuit::{debug, debug_warn, implement, Err, Error, PduEvent, Result};
use futures::FutureExt; use futures::FutureExt;
use ruma::{ use ruma::{
api::federation::event::get_room_state_ids, events::StateEventType, EventId, OwnedEventId, api::federation::event::get_room_state_ids, events::StateEventType, EventId, OwnedEventId,
@ -13,7 +13,11 @@ use crate::rooms::short::ShortStateKey;
/// server's response to some extend (sic), but we still do a lot of checks /// server's response to some extend (sic), but we still do a lot of checks
/// on the events /// on the events
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip(self, create_event, room_version_id))] #[tracing::instrument(
level = "warn",
skip_all,
fields(%origin),
)]
pub(super) async fn fetch_state( pub(super) async fn fetch_state(
&self, &self,
origin: &ServerName, origin: &ServerName,
@ -22,7 +26,6 @@ pub(super) async fn fetch_state(
room_version_id: &RoomVersionId, room_version_id: &RoomVersionId,
event_id: &EventId, event_id: &EventId,
) -> Result<Option<HashMap<u64, OwnedEventId>>> { ) -> Result<Option<HashMap<u64, OwnedEventId>>> {
debug!("Fetching state ids");
let res = self let res = self
.services .services
.sending .sending
@ -31,7 +34,7 @@ pub(super) async fn fetch_state(
event_id: event_id.to_owned(), event_id: event_id.to_owned(),
}) })
.await .await
.inspect_err(|e| warn!("Fetching state for event failed: {e}"))?; .inspect_err(|e| debug_warn!("Fetching state for event failed: {e}"))?;
debug!("Fetching state events"); debug!("Fetching state events");
let state_vec = self let state_vec = self

View file

@ -39,7 +39,12 @@ use crate::rooms::timeline::RawPduId;
/// 14. Check if the event passes auth based on the "current state" of the room, /// 14. Check if the event passes auth based on the "current state" of the room,
/// if not soft fail it /// if not soft fail it
#[implement(super::Service)] #[implement(super::Service)]
#[tracing::instrument(skip(self, origin, value, is_timeline_event), name = "pdu")] #[tracing::instrument(
name = "pdu",
level = "warn",
skip_all,
fields(%room_id, %event_id),
)]
pub async fn handle_incoming_pdu<'a>( pub async fn handle_incoming_pdu<'a>(
&self, &self,
origin: &'a ServerName, origin: &'a ServerName,

View file

@ -13,8 +13,10 @@ use ruma::{CanonicalJsonValue, EventId, OwnedEventId, RoomId, ServerName};
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
#[tracing::instrument( #[tracing::instrument(
skip(self, origin, event_id, room_id, eventid_info, create_event, first_pdu_in_room), name = "prev",
name = "prev" level = "warn",
skip_all,
fields(%prev_id),
)] )]
pub(super) async fn handle_prev_pdu<'a>( pub(super) async fn handle_prev_pdu<'a>(
&self, &self,

View file

@ -95,7 +95,16 @@ impl crate::Service for Service {
impl Service { impl Service {
/// Update current membership data. /// Update current membership data.
#[tracing::instrument(skip(self, last_state))] #[tracing::instrument(
level = "debug",
skip_all,
fields(
%room_id,
%user_id,
%sender,
?membership_event,
),
)]
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]
pub async fn update_membership( pub async fn update_membership(
&self, &self,
@ -265,7 +274,7 @@ impl Service {
Ok(()) Ok(())
} }
#[tracing::instrument(skip(self, room_id, appservice), level = "debug")] #[tracing::instrument(level = "trace", skip_all)]
pub async fn appservice_in_room( pub async fn appservice_in_room(
&self, &self,
room_id: &RoomId, room_id: &RoomId,
@ -383,7 +392,7 @@ impl Service {
.map(|(_, server): (Ignore, &ServerName)| server) .map(|(_, server): (Ignore, &ServerName)| server)
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn server_in_room<'a>( pub async fn server_in_room<'a>(
&'a self, &'a self,
server: &'a ServerName, server: &'a ServerName,
@ -409,7 +418,7 @@ impl Service {
} }
/// Returns true if server can see user by sharing at least one room. /// Returns true if server can see user by sharing at least one room.
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn server_sees_user(&self, server: &ServerName, user_id: &UserId) -> bool { pub async fn server_sees_user(&self, server: &ServerName, user_id: &UserId) -> bool {
self.server_rooms(server) self.server_rooms(server)
.any(|room_id| self.is_joined(user_id, room_id)) .any(|room_id| self.is_joined(user_id, room_id))
@ -417,7 +426,7 @@ impl Service {
} }
/// Returns true if user_a and user_b share at least one room. /// Returns true if user_a and user_b share at least one room.
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn user_sees_user(&self, user_a: &UserId, user_b: &UserId) -> bool { pub async fn user_sees_user(&self, user_a: &UserId, user_b: &UserId) -> bool {
let get_shared_rooms = self.get_shared_rooms(user_a, user_b); let get_shared_rooms = self.get_shared_rooms(user_a, user_b);
@ -426,6 +435,7 @@ impl Service {
} }
/// List the rooms common between two users /// List the rooms common between two users
#[tracing::instrument(skip(self), level = "debug")]
pub fn get_shared_rooms<'a>( pub fn get_shared_rooms<'a>(
&'a self, &'a self,
user_a: &'a UserId, user_a: &'a UserId,
@ -453,7 +463,7 @@ impl Service {
} }
/// Returns the number of users which are currently in a room /// Returns the number of users which are currently in a room
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn room_joined_count(&self, room_id: &RoomId) -> Result<u64> { pub async fn room_joined_count(&self, room_id: &RoomId) -> Result<u64> {
self.db.roomid_joinedcount.get(room_id).await.deserialized() self.db.roomid_joinedcount.get(room_id).await.deserialized()
} }
@ -469,9 +479,9 @@ impl Service {
.ready_filter(|user| self.services.globals.user_is_local(user)) .ready_filter(|user| self.services.globals.user_is_local(user))
} }
#[tracing::instrument(skip(self), level = "debug")]
/// Returns an iterator of all our local joined users in a room who are /// Returns an iterator of all our local joined users in a room who are
/// active (not deactivated, not guest) /// active (not deactivated, not guest)
#[tracing::instrument(skip(self), level = "trace")]
pub fn active_local_users_in_room<'a>( pub fn active_local_users_in_room<'a>(
&'a self, &'a self,
room_id: &'a RoomId, room_id: &'a RoomId,
@ -481,7 +491,7 @@ impl Service {
} }
/// Returns the number of users which are currently invited to a room /// Returns the number of users which are currently invited to a room
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn room_invited_count(&self, room_id: &RoomId) -> Result<u64> { pub async fn room_invited_count(&self, room_id: &RoomId) -> Result<u64> {
self.db self.db
.roomid_invitedcount .roomid_invitedcount
@ -518,7 +528,7 @@ impl Service {
.map(|(_, user_id): (Ignore, &UserId)| user_id) .map(|(_, user_id): (Ignore, &UserId)| user_id)
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> { pub async fn get_invite_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
let key = (room_id, user_id); let key = (room_id, user_id);
self.db self.db
@ -528,7 +538,7 @@ impl Service {
.deserialized() .deserialized()
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> { pub async fn get_left_count(&self, room_id: &RoomId, user_id: &UserId) -> Result<u64> {
let key = (room_id, user_id); let key = (room_id, user_id);
self.db.roomuserid_leftcount.qry(&key).await.deserialized() self.db.roomuserid_leftcount.qry(&key).await.deserialized()
@ -566,7 +576,7 @@ impl Service {
.ignore_err() .ignore_err()
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn invite_state( pub async fn invite_state(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -583,7 +593,7 @@ impl Service {
}) })
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn left_state( pub async fn left_state(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -625,24 +635,25 @@ impl Service {
self.db.roomuseroncejoinedids.qry(&key).await.is_ok() self.db.roomuseroncejoinedids.qry(&key).await.is_ok()
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn is_joined<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool { pub async fn is_joined<'a>(&'a self, user_id: &'a UserId, room_id: &'a RoomId) -> bool {
let key = (user_id, room_id); let key = (user_id, room_id);
self.db.userroomid_joined.qry(&key).await.is_ok() self.db.userroomid_joined.qry(&key).await.is_ok()
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> bool { pub async fn is_invited(&self, user_id: &UserId, room_id: &RoomId) -> bool {
let key = (user_id, room_id); let key = (user_id, room_id);
self.db.userroomid_invitestate.qry(&key).await.is_ok() self.db.userroomid_invitestate.qry(&key).await.is_ok()
} }
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool { pub async fn is_left(&self, user_id: &UserId, room_id: &RoomId) -> bool {
let key = (user_id, room_id); let key = (user_id, room_id);
self.db.userroomid_leftstate.qry(&key).await.is_ok() self.db.userroomid_leftstate.qry(&key).await.is_ok()
} }
#[tracing::instrument(skip(self), level = "trace")]
pub async fn user_membership( pub async fn user_membership(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -683,7 +694,7 @@ impl Service {
/// distant future. /// distant future.
/// ///
/// See <https://spec.matrix.org/latest/appendices/#routing> /// See <https://spec.matrix.org/latest/appendices/#routing>
#[tracing::instrument(skip(self), level = "debug")] #[tracing::instrument(skip(self), level = "trace")]
pub async fn servers_route_via(&self, room_id: &RoomId) -> Result<Vec<OwnedServerName>> { pub async fn servers_route_via(&self, room_id: &RoomId) -> Result<Vec<OwnedServerName>> {
let most_powerful_user_server = self let most_powerful_user_server = self
.services .services
@ -724,6 +735,7 @@ impl Service {
(cache.len(), cache.capacity()) (cache.len(), cache.capacity())
} }
#[tracing::instrument(level = "debug", skip_all)]
pub fn clear_appservice_in_room_cache(&self) { pub fn clear_appservice_in_room_cache(&self) {
self.appservice_in_room_cache self.appservice_in_room_cache
.write() .write()
@ -731,6 +743,7 @@ impl Service {
.clear(); .clear();
} }
#[tracing::instrument(level = "debug", skip(self))]
pub async fn update_joined_count(&self, room_id: &RoomId) { pub async fn update_joined_count(&self, room_id: &RoomId) {
let mut joinedcount = 0_u64; let mut joinedcount = 0_u64;
let mut invitedcount = 0_u64; let mut invitedcount = 0_u64;
@ -784,11 +797,13 @@ impl Service {
.remove(room_id); .remove(room_id);
} }
#[tracing::instrument(level = "debug", skip(self))]
fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) { fn mark_as_once_joined(&self, user_id: &UserId, room_id: &RoomId) {
let key = (user_id, room_id); let key = (user_id, room_id);
self.db.roomuseroncejoinedids.put_raw(key, []); self.db.roomuseroncejoinedids.put_raw(key, []);
} }
#[tracing::instrument(level = "debug", skip(self, last_state, invite_via))]
pub async fn mark_as_invited( pub async fn mark_as_invited(
&self, &self,
user_id: &UserId, user_id: &UserId,
@ -821,7 +836,7 @@ impl Service {
} }
} }
#[tracing::instrument(skip(self, servers), level = "debug")] #[tracing::instrument(level = "debug", skip(self, servers))]
pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec<OwnedServerName>) { pub async fn add_servers_invite_via(&self, room_id: &RoomId, servers: Vec<OwnedServerName>) {
let mut servers: Vec<_> = self let mut servers: Vec<_> = self
.servers_invite_via(room_id) .servers_invite_via(room_id)

View file

@ -80,7 +80,9 @@ impl Service {
self.work_loop(id, &mut futures, &mut statuses).await; self.work_loop(id, &mut futures, &mut statuses).await;
if !futures.is_empty() {
self.finish_responses(&mut futures).boxed().await; self.finish_responses(&mut futures).boxed().await;
}
Ok(()) Ok(())
} }