Hot-Reloading Refactor

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-05-09 15:59:08 -07:00 committed by June 🍓🦴
parent ae1a4fd283
commit 6c1434c165
212 changed files with 5679 additions and 4206 deletions

View file

@ -2,7 +2,7 @@ mod data;
use std::{sync::Arc, time::Duration};
pub(crate) use data::Data;
pub use data::Data;
use futures_util::{stream::FuturesUnordered, StreamExt};
use ruma::{
events::presence::{PresenceEvent, PresenceEventContent},
@ -10,19 +10,19 @@ use ruma::{
OwnedUserId, UInt, UserId,
};
use serde::{Deserialize, Serialize};
use tokio::{sync::Mutex, time::sleep};
use tokio::{sync::Mutex, task::JoinHandle, time::sleep};
use tracing::{debug, error};
use crate::{
services,
utils::{self, user_id::user_is_local},
services, user_is_local,
utils::{self},
Config, Error, Result,
};
/// Represents data required to be kept in order to implement the presence
/// specification.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub(crate) struct Presence {
pub struct Presence {
state: PresenceState,
currently_active: bool,
last_active_ts: u64,
@ -30,9 +30,8 @@ pub(crate) struct Presence {
}
impl Presence {
pub(crate) fn new(
state: PresenceState, currently_active: bool, last_active_ts: u64, status_msg: Option<String>,
) -> Self {
#[must_use]
pub fn new(state: PresenceState, currently_active: bool, last_active_ts: u64, status_msg: Option<String>) -> Self {
Self {
state,
currently_active,
@ -41,21 +40,21 @@ impl Presence {
}
}
pub(crate) fn from_json_bytes_to_event(bytes: &[u8], user_id: &UserId) -> Result<PresenceEvent> {
pub fn from_json_bytes_to_event(bytes: &[u8], user_id: &UserId) -> Result<PresenceEvent> {
let presence = Self::from_json_bytes(bytes)?;
presence.to_presence_event(user_id)
}
pub(crate) fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
pub fn from_json_bytes(bytes: &[u8]) -> Result<Self> {
serde_json::from_slice(bytes).map_err(|_| Error::bad_database("Invalid presence data in database"))
}
pub(crate) fn to_json_bytes(&self) -> Result<Vec<u8>> {
pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
serde_json::to_vec(self).map_err(|_| Error::bad_database("Could not serialize Presence to JSON"))
}
/// Creates a PresenceEvent from available data.
pub(crate) fn to_presence_event(&self, user_id: &UserId) -> Result<PresenceEvent> {
pub fn to_presence_event(&self, user_id: &UserId) -> Result<PresenceEvent> {
let now = utils::millis_since_unix_epoch();
let last_active_ago = if self.currently_active {
None
@ -77,37 +76,55 @@ impl Presence {
}
}
pub(crate) struct Service {
pub(crate) db: &'static dyn Data,
pub(crate) timer_sender: loole::Sender<(OwnedUserId, Duration)>,
pub struct Service {
pub db: Arc<dyn Data>,
pub timer_sender: loole::Sender<(OwnedUserId, Duration)>,
timer_receiver: Mutex<loole::Receiver<(OwnedUserId, Duration)>>,
handler_join: Mutex<Option<JoinHandle<()>>>,
timeout_remote_users: bool,
}
impl Service {
pub(crate) fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
pub fn build(db: Arc<dyn Data>, config: &Config) -> Arc<Self> {
let (timer_sender, timer_receiver) = loole::unbounded();
Arc::new(Self {
db,
timer_sender,
timer_receiver: Mutex::new(timer_receiver),
handler_join: Mutex::new(None),
timeout_remote_users: config.presence_timeout_remote_users,
})
}
pub(crate) fn start_handler(self: &Arc<Self>) {
pub async fn start_handler(self: &Arc<Self>) {
let self_ = Arc::clone(self);
tokio::spawn(async move {
let handle = services().server.runtime().spawn(async move {
self_
.handler()
.await
.expect("Failed to start presence handler");
});
_ = self.handler_join.lock().await.insert(handle);
}
pub async fn close(&self) {
self.interrupt();
if let Some(handler_join) = self.handler_join.lock().await.take() {
if let Err(e) = handler_join.await {
error!("Failed to shutdown: {e:?}");
}
}
}
pub fn interrupt(&self) {
if !self.timer_sender.is_closed() {
self.timer_sender.close();
}
}
/// Returns the latest presence event for the given user.
pub(crate) fn get_presence(&self, user_id: &UserId) -> Result<Option<PresenceEvent>> {
pub fn get_presence(&self, user_id: &UserId) -> Result<Option<PresenceEvent>> {
if let Some((_, presence)) = self.db.get_presence(user_id)? {
Ok(Some(presence))
} else {
@ -117,7 +134,7 @@ impl Service {
/// Pings the presence of the given user in the given room, setting the
/// specified state.
pub(crate) fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
pub fn ping_presence(&self, user_id: &UserId, new_state: &PresenceState) -> Result<()> {
const REFRESH_TIMEOUT: u64 = 60 * 25 * 1000;
let last_presence = self.db.get_presence(user_id)?;
@ -146,7 +163,7 @@ impl Service {
}
/// Adds a presence event which will be saved until a new event replaces it.
pub(crate) fn set_presence(
pub fn set_presence(
&self, user_id: &UserId, state: &PresenceState, currently_active: Option<bool>, last_active_ago: Option<UInt>,
status_msg: Option<String>,
) -> Result<()> {
@ -179,11 +196,11 @@ impl Service {
///
/// TODO: Why is this not used?
#[allow(dead_code)]
pub(crate) fn remove_presence(&self, user_id: &UserId) -> Result<()> { self.db.remove_presence(user_id) }
pub fn remove_presence(&self, user_id: &UserId) -> Result<()> { self.db.remove_presence(user_id) }
/// Returns the most recent presence updates that happened after the event
/// with id `since`.
pub(crate) fn presence_since(&self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, Vec<u8>)>> {
pub fn presence_since(&self, since: u64) -> Box<dyn Iterator<Item = (OwnedUserId, u64, Vec<u8>)> + '_> {
self.db.presence_since(since)
}
@ -191,24 +208,16 @@ impl Service {
let mut presence_timers = FuturesUnordered::new();
let receiver = self.timer_receiver.lock().await;
loop {
debug_assert!(!receiver.is_closed(), "channel error");
tokio::select! {
event = receiver.recv_async() => {
match event {
Ok((user_id, timeout)) => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout));
}
Err(e) => {
// generally shouldn't happen
error!("Failed to receive presence timer through channel: {e}");
}
}
}
Some(user_id) = presence_timers.next() => {
process_presence_timer(&user_id)?;
}
Some(user_id) = presence_timers.next() => process_presence_timer(&user_id)?,
event = receiver.recv_async() => match event {
Err(_e) => return Ok(()),
Ok((user_id, timeout)) => {
debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len());
presence_timers.push(presence_timer(user_id, timeout));
},
},
}
}
}