optimize with SmallString; consolidate related re-exports

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-02-08 00:16:37 +00:00 committed by strawberry
parent ecc9099127
commit b872f8e593
39 changed files with 113 additions and 96 deletions

16
Cargo.lock generated
View file

@ -822,6 +822,8 @@ dependencies = [
"serde_json",
"serde_regex",
"serde_yaml",
"smallstr",
"smallvec",
"thiserror 2.0.11",
"tikv-jemalloc-ctl",
"tikv-jemalloc-sys",
@ -839,7 +841,6 @@ dependencies = [
name = "conduwuit_database"
version = "0.5.0"
dependencies = [
"arrayvec",
"async-channel",
"conduwuit_core",
"const-str",
@ -850,7 +851,6 @@ dependencies = [
"rust-rocksdb-uwu",
"serde",
"serde_json",
"smallvec",
"tokio",
"tracing",
]
@ -902,7 +902,6 @@ dependencies = [
name = "conduwuit_service"
version = "0.5.0"
dependencies = [
"arrayvec",
"async-trait",
"base64 0.22.1",
"blurhash",
@ -929,7 +928,6 @@ dependencies = [
"serde_json",
"serde_yaml",
"sha2",
"smallvec",
"termimad",
"tokio",
"tracing",
@ -4275,6 +4273,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "smallstr"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63b1aefdf380735ff8ded0b15f31aab05daf1f70216c01c02a12926badd1df9d"
dependencies = [
"serde",
"smallvec",
]
[[package]]
name = "smallvec"
version = "1.13.2"

View file

@ -40,6 +40,10 @@ features = [
"write",
]
[workspace.dependencies.smallstr]
version = "0.3"
features = ["ffi", "std", "union"]
[workspace.dependencies.const-str]
version = "0.5.7"

View file

@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use axum::extract::State;
use conduwuit::{
debug_info, debug_warn, err, error, info, pdu::PduBuilder, warn, Err, Error, Result,
debug_info, debug_warn, err, error, info, pdu::PduBuilder, warn, Err, Error, Result, StateKey,
};
use futures::FutureExt;
use ruma::{
@ -198,7 +198,7 @@ pub(crate) async fn create_room_route(
event_type: TimelineEventType::RoomCreate,
content: to_raw_value(&create_content)
.expect("create event content serialization"),
state_key: Some(String::new()),
state_key: Some(StateKey::new()),
..Default::default()
},
sender_user,
@ -267,7 +267,7 @@ pub(crate) async fn create_room_route(
event_type: TimelineEventType::RoomPowerLevels,
content: to_raw_value(&power_levels_content)
.expect("serialized power_levels event content"),
state_key: Some(String::new()),
state_key: Some(StateKey::new()),
..Default::default()
},
sender_user,
@ -371,7 +371,7 @@ pub(crate) async fn create_room_route(
}
// Implicit state key defaults to ""
pdu_builder.state_key.get_or_insert_with(String::new);
pdu_builder.state_key.get_or_insert_with(StateKey::new);
// Silently skip encryption events if they are not allowed
if pdu_builder.event_type == TimelineEventType::RoomEncryption

View file

@ -1,7 +1,7 @@
use std::cmp::max;
use axum::extract::State;
use conduwuit::{err, info, pdu::PduBuilder, Error, Result};
use conduwuit::{err, info, pdu::PduBuilder, Error, Result, StateKey};
use futures::StreamExt;
use ruma::{
api::client::{error::ErrorKind, room::upgrade_room},
@ -77,7 +77,7 @@ pub(crate) async fn upgrade_room_route(
.rooms
.timeline
.build_and_append_pdu(
PduBuilder::state(String::new(), &RoomTombstoneEventContent {
PduBuilder::state(StateKey::new(), &RoomTombstoneEventContent {
body: "This room has been replaced".to_owned(),
replacement_room: replacement_room.clone(),
}),
@ -159,7 +159,7 @@ pub(crate) async fn upgrade_room_route(
content: to_raw_value(&create_event_content)
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(String::new()),
state_key: Some(StateKey::new()),
redacts: None,
timestamp: None,
},
@ -188,7 +188,7 @@ pub(crate) async fn upgrade_room_route(
})
.expect("event is valid, we just created it"),
unsigned: None,
state_key: Some(sender_user.to_string()),
state_key: Some(sender_user.as_str().into()),
redacts: None,
timestamp: None,
},
@ -217,7 +217,7 @@ pub(crate) async fn upgrade_room_route(
PduBuilder {
event_type: event_type.to_string().into(),
content: event_content,
state_key: Some(String::new()),
state_key: Some(StateKey::new()),
..Default::default()
},
sender_user,
@ -272,7 +272,7 @@ pub(crate) async fn upgrade_room_route(
.rooms
.timeline
.build_and_append_pdu(
PduBuilder::state(String::new(), &RoomPowerLevelsEventContent {
PduBuilder::state(StateKey::new(), &RoomPowerLevelsEventContent {
events_default: new_level,
invite: new_level,
..power_levels_event_content

View file

@ -172,7 +172,7 @@ async fn send_state_event_for_key_helper(
PduBuilder {
event_type: event_type.to_string().into(),
content: serde_json::from_str(json.json().get())?,
state_key: Some(String::from(state_key)),
state_key: Some(state_key.into()),
timestamp,
..Default::default()
},

View file

@ -441,7 +441,7 @@ async fn handle_left_room(
kind: RoomMember,
content: serde_json::from_str(r#"{"membership":"leave"}"#)
.expect("this is valid JSON"),
state_key: Some(sender_user.to_string()),
state_key: Some(sender_user.as_str().into()),
unsigned: None,
// The following keys are dropped on conversion
room_id: room_id.clone(),

View file

@ -29,7 +29,7 @@ use ruma::{
TimelineEventType::*,
},
serde::Raw,
uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt,
uint, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
};
use service::rooms::read_receipt::pack_receipts;
@ -258,12 +258,9 @@ pub(crate) async fn sync_events_v4_route(
continue;
};
if pdu.kind == RoomMember {
if let Some(state_key) = &pdu.state_key {
let user_id =
OwnedUserId::parse(state_key.clone()).map_err(|_| {
Error::bad_database("Invalid UserId in member PDU.")
})?;
if let Some(Ok(user_id)) =
pdu.state_key.as_deref().map(UserId::parse)
{
if user_id == *sender_user {
continue;
}
@ -275,18 +272,18 @@ pub(crate) async fn sync_events_v4_route(
if !share_encrypted_room(
&services,
sender_user,
&user_id,
user_id,
Some(room_id),
)
.await
{
device_list_changes.insert(user_id);
device_list_changes.insert(user_id.to_owned());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we
// are in
left_encrypted_users.insert(user_id);
left_encrypted_users.insert(user_id.to_owned());
},
| _ => {},
}

View file

@ -25,7 +25,7 @@ use ruma::{
},
serde::Raw,
state_res::TypeStateKey,
uint, DeviceId, OwnedEventId, OwnedRoomId, OwnedUserId, RoomId, UInt, UserId,
uint, DeviceId, OwnedEventId, OwnedRoomId, RoomId, UInt, UserId,
};
use service::{rooms::read_receipt::pack_receipts, PduCount};
@ -765,13 +765,9 @@ async fn collect_e2ee<'a>(
continue;
};
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key {
let user_id =
OwnedUserId::parse(state_key.clone()).map_err(|_| {
Error::bad_database("Invalid UserId in member PDU.")
})?;
if user_id == *sender_user {
if let Some(Ok(user_id)) = pdu.state_key.as_deref().map(UserId::parse)
{
if user_id == sender_user {
continue;
}
@ -782,18 +778,18 @@ async fn collect_e2ee<'a>(
if !share_encrypted_room(
&services,
sender_user,
&user_id,
user_id,
Some(room_id),
)
.await
{
device_list_changes.insert(user_id);
device_list_changes.insert(user_id.to_owned());
}
},
| MembershipState::Leave => {
// Write down users that have left encrypted rooms we
// are in
left_encrypted_users.insert(user_id);
left_encrypted_users.insert(user_id.to_owned());
},
| _ => {},
}

View file

@ -92,6 +92,8 @@ serde_json.workspace = true
serde_regex.workspace = true
serde_yaml.workspace = true
serde.workspace = true
smallvec.workspace = true
smallstr.workspace = true
thiserror.workspace = true
tikv-jemallocator.optional = true
tikv-jemallocator.workspace = true

View file

@ -10,14 +10,17 @@ pub mod pdu;
pub mod server;
pub mod utils;
pub use ::arrayvec;
pub use ::http;
pub use ::ruma;
pub use ::smallstr;
pub use ::smallvec;
pub use ::toml;
pub use ::tracing;
pub use config::Config;
pub use error::Error;
pub use info::{rustc_flags_capture, version, version::version};
pub use pdu::{Event, PduBuilder, PduCount, PduEvent, PduId, RawPduId};
pub use pdu::{Event, PduBuilder, PduCount, PduEvent, PduId, RawPduId, StateKey};
pub use server::Server;
pub use utils::{ctor, dtor, implement, result, result::Result};

View file

@ -7,6 +7,8 @@ use ruma::{
use serde::Deserialize;
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
use super::StateKey;
/// Build the start of a PDU in order to add it to the Database.
#[derive(Debug, Deserialize)]
pub struct Builder {
@ -17,7 +19,7 @@ pub struct Builder {
pub unsigned: Option<Unsigned>,
pub state_key: Option<String>,
pub state_key: Option<StateKey>,
pub redacts: Option<OwnedEventId>,
@ -29,15 +31,16 @@ pub struct Builder {
type Unsigned = BTreeMap<String, serde_json::Value>;
impl Builder {
pub fn state<T>(state_key: String, content: &T) -> Self
pub fn state<S, T>(state_key: S, content: &T) -> Self
where
T: EventContent<EventType = StateEventType>,
S: Into<StateKey>,
{
Self {
event_type: content.event_type().into(),
content: to_raw_value(content)
.expect("Builder failed to serialize state event content to RawValue"),
state_key: Some(state_key),
state_key: Some(state_key.into()),
..Self::default()
}
}

View file

@ -8,6 +8,7 @@ mod id;
mod raw_id;
mod redact;
mod relation;
mod state_key;
mod strip;
#[cfg(test)]
mod tests;
@ -17,7 +18,7 @@ use std::cmp::Ordering;
use ruma::{
events::TimelineEventType, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId,
OwnedRoomId, OwnedUserId, UInt,
OwnedRoomId, OwnedServerName, OwnedUserId, UInt,
};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue as RawJsonValue;
@ -29,6 +30,7 @@ pub use self::{
event_id::*,
id::*,
raw_id::*,
state_key::{ShortStateKey, StateKey},
Count as PduCount, Id as PduId, Pdu as PduEvent, RawId as RawPduId,
};
use crate::Result;
@ -40,13 +42,13 @@ pub struct Pdu {
pub room_id: OwnedRoomId,
pub sender: OwnedUserId,
#[serde(skip_serializing_if = "Option::is_none")]
pub origin: Option<String>,
pub origin: Option<OwnedServerName>,
pub origin_server_ts: UInt,
#[serde(rename = "type")]
pub kind: TimelineEventType,
pub content: Box<RawJsonValue>,
#[serde(skip_serializing_if = "Option::is_none")]
pub state_key: Option<String>,
pub state_key: Option<StateKey>,
pub prev_events: Vec<OwnedEventId>,
pub depth: UInt,
pub auth_events: Vec<OwnedEventId>,

View file

@ -0,0 +1,8 @@
use smallstr::SmallString;
use super::ShortId;
pub type StateKey = SmallString<[u8; INLINE_SIZE]>;
pub type ShortStateKey = ShortId;
const INLINE_SIZE: usize = 48;

View file

@ -34,7 +34,6 @@ zstd_compression = [
]
[dependencies]
arrayvec.workspace = true
async-channel.workspace = true
conduwuit-core.workspace = true
const-str.workspace = true
@ -45,7 +44,6 @@ minicbor-serde.workspace = true
rust-rocksdb.workspace = true
serde.workspace = true
serde_json.workspace = true
smallvec.workspace = true
tokio.workspace = true
tracing.workspace = true

View file

@ -1,5 +1,6 @@
use arrayvec::ArrayVec;
use conduwuit::{checked, debug::DebugInspect, err, utils::string, Error, Result};
use conduwuit::{
arrayvec::ArrayVec, checked, debug::DebugInspect, err, utils::string, Error, Result,
};
use serde::{
de,
de::{DeserializeSeed, Visitor},

View file

@ -1,6 +1,5 @@
use conduwuit::Result;
use conduwuit::{smallvec::SmallVec, Result};
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use crate::{de, ser};

View file

@ -1,7 +1,7 @@
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduwuit::{
arrayvec::ArrayVec,
err, implement,
utils::{future::TryExtExt, result::FlatOk},
Result,

View file

@ -5,8 +5,7 @@
use std::{convert::AsRef, fmt::Debug, io::Write};
use arrayvec::ArrayVec;
use conduwuit::implement;
use conduwuit::{arrayvec::ArrayVec, implement};
use rocksdb::WriteBatchWithTransaction;
use serde::Serialize;

View file

@ -1,7 +1,6 @@
use std::{convert::AsRef, fmt::Debug, io::Write, sync::Arc};
use arrayvec::ArrayVec;
use conduwuit::{implement, Result};
use conduwuit::{arrayvec::ArrayVec, implement, Result};
use futures::Future;
use serde::Serialize;

View file

@ -1,7 +1,6 @@
use std::{convert::AsRef, fmt::Debug, io::Write};
use arrayvec::ArrayVec;
use conduwuit::implement;
use conduwuit::{arrayvec::ArrayVec, implement};
use serde::Serialize;
use crate::{keyval::KeyBuf, ser, util::or_else};

View file

@ -14,6 +14,7 @@ use async_channel::{QueueStrategy, Receiver, RecvError, Sender};
use conduwuit::{
debug, debug_warn, err, error, implement,
result::DebugInspect,
smallvec::SmallVec,
trace,
utils::sys::compute::{get_affinity, nth_core_available, set_affinity},
Error, Result, Server,
@ -21,7 +22,6 @@ use conduwuit::{
use futures::{channel::oneshot, TryFutureExt};
use oneshot::Sender as ResultSender;
use rocksdb::Direction;
use smallvec::SmallVec;
use self::configure::configure;
use crate::{keyval::KeyBuf, stream, Handle, Map};

View file

@ -2,8 +2,10 @@
use std::fmt::Debug;
use arrayvec::ArrayVec;
use conduwuit::ruma::{serde::Raw, EventId, RoomId, UserId};
use conduwuit::{
arrayvec::ArrayVec,
ruma::{serde::Raw, EventId, RoomId, UserId},
};
use serde::Serialize;
use crate::{

View file

@ -47,7 +47,6 @@ zstd_compression = [
blurhashing = ["dep:image","dep:blurhash"]
[dependencies]
arrayvec.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
@ -75,7 +74,6 @@ serde_json.workspace = true
serde.workspace = true
serde_yaml.workspace = true
sha2.workspace = true
smallvec.workspace = true
termimad.workspace = true
termimad.optional = true
tokio.workspace = true

View file

@ -507,8 +507,10 @@ async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
}
async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result {
use conduwuit::arrayvec::ArrayString;
use ruma::identifiers_validation::MAX_BYTES;
type ArrayId = arrayvec::ArrayString<MAX_BYTES>;
type ArrayId = ArrayString<MAX_BYTES>;
type Key<'a> = (&'a RoomId, u64, &'a UserId);
warn!("Fixing undeleted entries in readreceiptid_readreceipt...");

View file

@ -1,7 +1,7 @@
use std::{net::IpAddr, sync::Arc, time::SystemTime};
use arrayvec::ArrayVec;
use conduwuit::{
arrayvec::ArrayVec,
at, err, implement,
utils::{math::Expected, rand, stream::TryIgnore},
Result,

View file

@ -4,8 +4,7 @@ use std::{
net::{IpAddr, SocketAddr},
};
use arrayvec::ArrayString;
use conduwuit::utils::math::Expected;
use conduwuit::{arrayvec::ArrayString, utils::math::Expected};
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]

View file

@ -6,8 +6,7 @@ mod tests;
use std::sync::Arc;
use arrayvec::ArrayString;
use conduwuit::{utils::MutexMap, Result, Server};
use conduwuit::{arrayvec::ArrayString, utils::MutexMap, Result, Server};
use self::{cache::Cache, dns::Resolver};
use crate::{client, Dep};

View file

@ -6,10 +6,8 @@ use std::{
use conduwuit::{debug, debug_info, err, implement, trace, warn, Err, Error, PduEvent, Result};
use futures::{future::ready, TryFutureExt};
use ruma::{
api::client::error::ErrorKind,
events::StateEventType,
state_res::{self, EventTypeExt},
CanonicalJsonObject, CanonicalJsonValue, EventId, RoomId, ServerName,
api::client::error::ErrorKind, events::StateEventType, state_res, CanonicalJsonObject,
CanonicalJsonValue, EventId, RoomId, ServerName,
};
use super::{check_room_id, get_room_version_id, to_room_version};
@ -123,7 +121,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
// The original create event must be in the auth events
if !matches!(
auth_events
.get(&(StateEventType::RoomCreate, String::new()))
.get(&(StateEventType::RoomCreate, String::new().into()))
.map(AsRef::as_ref),
Some(_) | None
) {
@ -134,7 +132,7 @@ pub(super) async fn handle_outlier_pdu<'a>(
}
let state_fetch = |ty: &'static StateEventType, sk: &str| {
let key = ty.with_state_key(sk);
let key = (ty.to_owned(), sk.into());
ready(auth_events.get(&key))
};

View file

@ -64,6 +64,7 @@ pub async fn resolve_state(
.multi_get_statekey_from_short(shortstatekeys)
.zip(event_ids)
.ready_filter_map(|(ty_sk, id)| Some((ty_sk.ok()?, id)))
.map(|((ty, sk), id)| ((ty, sk.as_str().to_owned()), id))
.collect()
})
.map(Ok::<_, Error>)

View file

@ -172,6 +172,7 @@ async fn state_at_incoming_fork(
.short
.get_statekey_from_short(*k)
.map_ok(|(ty, sk)| ((ty, sk), id.clone()))
.map_ok(|((ty, sk), id)| ((ty, sk.as_str().to_owned()), id))
})
.ready_filter_map(Result::ok)
.collect()

View file

@ -1,7 +1,7 @@
use std::{mem::size_of, sync::Arc};
use arrayvec::ArrayVec;
use conduwuit::{
arrayvec::ArrayVec,
result::LogErr,
utils::{
stream::{TryIgnore, WidebandExt},

View file

@ -1,7 +1,7 @@
use std::sync::Arc;
use arrayvec::ArrayVec;
use conduwuit::{
arrayvec::ArrayVec,
implement,
utils::{
set,

View file

@ -1,7 +1,7 @@
use std::{borrow::Borrow, fmt::Debug, mem::size_of_val, sync::Arc};
pub use conduwuit::pdu::{ShortEventId, ShortId, ShortRoomId};
use conduwuit::{err, implement, utils, utils::IterStream, Result};
pub use conduwuit::pdu::{ShortEventId, ShortId, ShortRoomId, ShortStateKey};
use conduwuit::{err, implement, utils, utils::IterStream, Result, StateKey};
use database::{Deserialized, Get, Map, Qry};
use futures::{Stream, StreamExt};
use ruma::{events::StateEventType, EventId, RoomId};
@ -28,7 +28,6 @@ struct Services {
}
pub type ShortStateHash = ShortId;
pub type ShortStateKey = ShortId;
impl crate::Service for Service {
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
@ -181,7 +180,7 @@ where
pub async fn get_statekey_from_short(
&self,
shortstatekey: ShortStateKey,
) -> Result<(StateEventType, String)> {
) -> Result<(StateEventType, StateKey)> {
const BUFSIZE: usize = size_of::<ShortStateKey>();
self.db
@ -200,7 +199,7 @@ pub async fn get_statekey_from_short(
pub fn multi_get_statekey_from_short<'a, S>(
&'a self,
shortstatekey: S,
) -> impl Stream<Item = Result<(StateEventType, String)>> + Send + 'a
) -> impl Stream<Item = Result<(StateEventType, StateKey)>> + Send + 'a
where
S: Stream<Item = ShortStateKey> + Send + 'a,
{

View file

@ -1,6 +1,6 @@
use std::borrow::Borrow;
use conduwuit::{err, implement, PduEvent, Result};
use conduwuit::{err, implement, PduEvent, Result, StateKey};
use futures::{Stream, StreamExt, TryFutureExt};
use ruma::{events::StateEventType, EventId, RoomId};
use serde::Deserialize;
@ -27,7 +27,7 @@ where
pub fn room_state_full<'a>(
&'a self,
room_id: &'a RoomId,
) -> impl Stream<Item = Result<((StateEventType, String), PduEvent)>> + Send + 'a {
) -> impl Stream<Item = Result<((StateEventType, StateKey), PduEvent)>> + Send + 'a {
self.services
.state
.get_room_shortstatehash(room_id)

View file

@ -6,7 +6,7 @@ use conduwuit::{
result::FlatOk,
stream::{BroadbandExt, IterStream, ReadyExt, TryExpect},
},
PduEvent, Result,
PduEvent, Result, StateKey,
};
use database::Deserialized;
use futures::{future::try_join, pin_mut, FutureExt, Stream, StreamExt, TryFutureExt};
@ -192,7 +192,7 @@ pub fn state_keys_with_ids<'a, Id>(
&'a self,
shortstatehash: ShortStateHash,
event_type: &'a StateEventType,
) -> impl Stream<Item = (String, Id)> + Send + 'a
) -> impl Stream<Item = (StateKey, Id)> + Send + 'a
where
Id: for<'de> Deserialize<'de> + Send + Sized + ToOwned + 'a,
<Id as ToOwned>::Owned: Borrow<EventId>,
@ -200,7 +200,7 @@ where
let state_keys_with_short_ids = self
.state_keys_with_shortids(shortstatehash, event_type)
.unzip()
.map(|(ssks, sids): (Vec<String>, Vec<u64>)| (ssks, sids))
.map(|(ssks, sids): (Vec<StateKey>, Vec<u64>)| (ssks, sids))
.shared();
let state_keys = state_keys_with_short_ids
@ -230,7 +230,7 @@ pub fn state_keys_with_shortids<'a>(
&'a self,
shortstatehash: ShortStateHash,
event_type: &'a StateEventType,
) -> impl Stream<Item = (String, ShortEventId)> + Send + 'a {
) -> impl Stream<Item = (StateKey, ShortEventId)> + Send + 'a {
let short_ids = self
.state_full_shortids(shortstatehash)
.expect_ok()
@ -267,7 +267,7 @@ pub fn state_keys<'a>(
&'a self,
shortstatehash: ShortStateHash,
event_type: &'a StateEventType,
) -> impl Stream<Item = String> + Send + 'a {
) -> impl Stream<Item = StateKey> + Send + 'a {
let short_ids = self
.state_full_shortids(shortstatehash)
.expect_ok()
@ -314,7 +314,7 @@ pub fn state_added(
pub fn state_full(
&self,
shortstatehash: ShortStateHash,
) -> impl Stream<Item = ((StateEventType, String), PduEvent)> + Send + '_ {
) -> impl Stream<Item = ((StateEventType, StateKey), PduEvent)> + Send + '_ {
self.state_full_pdus(shortstatehash)
.ready_filter_map(|pdu| {
Some(((pdu.kind.to_string().into(), pdu.state_key.clone()?), pdu))

View file

@ -175,7 +175,7 @@ pub async fn user_can_invite(
.timeline
.create_hash_and_sign_event(
PduBuilder::state(
target_user.into(),
target_user.as_str(),
&RoomMemberEventContent::new(MembershipState::Invite),
),
sender,

View file

@ -5,8 +5,8 @@ use std::{
sync::{Arc, Mutex},
};
use arrayvec::ArrayVec;
use conduwuit::{
arrayvec::ArrayVec,
at, checked, err, expected, utils,
utils::{bytes, math::usize_from_f64, stream::IterStream},
Result,

View file

@ -38,7 +38,7 @@ use ruma::{
push::{Action, Ruleset, Tweak},
state_res::{self, Event, RoomVersion},
uint, CanonicalJsonObject, CanonicalJsonValue, EventId, OwnedEventId, OwnedRoomId,
OwnedServerName, OwnedUserId, RoomId, RoomVersionId, ServerName, UserId,
OwnedServerName, RoomId, RoomVersionId, ServerName, UserId,
};
use serde::Deserialize;
use serde_json::value::{to_raw_value, RawValue as RawJsonValue};
@ -387,10 +387,10 @@ impl Service {
if pdu.kind == TimelineEventType::RoomMember {
if let Some(state_key) = &pdu.state_key {
let target_user_id = OwnedUserId::parse(state_key)?;
let target_user_id = UserId::parse(state_key)?;
if self.services.users.is_active_local(&target_user_id).await {
push_target.insert(target_user_id);
if self.services.users.is_active_local(target_user_id).await {
push_target.insert(target_user_id.to_owned());
}
}
}

View file

@ -13,6 +13,7 @@ use std::{
use async_trait::async_trait;
use conduwuit::{
debug, debug_warn, err, error,
smallvec::SmallVec,
utils::{available_parallelism, math::usize_from_u64_truncated, ReadyExt, TryReadyExt},
warn, Result, Server,
};
@ -21,7 +22,6 @@ use ruma::{
api::{appservice::Registration, OutgoingRequest},
RoomId, ServerName, UserId,
};
use smallvec::SmallVec;
use tokio::{task, task::JoinSet};
use self::data::Data;