diff --git a/src/service/presence/mod.rs b/src/service/presence/mod.rs index 3b5c4caf..82a99bd5 100644 --- a/src/service/presence/mod.rs +++ b/src/service/presence/mod.rs @@ -55,14 +55,13 @@ impl crate::Service for Service { async fn worker(self: Arc) -> Result<()> { let mut presence_timers = FuturesUnordered::new(); let receiver = self.timer_receiver.lock().await; - loop { - debug_assert!(!receiver.is_closed(), "channel error"); + while !receiver.is_closed() { tokio::select! { Some(user_id) = presence_timers.next() => { self.process_presence_timer(&user_id).await.log_err().ok(); }, event = receiver.recv_async() => match event { - Err(_e) => return Ok(()), + Err(_) => break, Ok((user_id, timeout)) => { debug!("Adding timer {}: {user_id} timeout:{timeout:?}", presence_timers.len()); presence_timers.push(presence_timer(user_id, timeout)); @@ -70,6 +69,8 @@ impl crate::Service for Service { }, } } + + Ok(()) } fn interrupt(&self) { diff --git a/src/service/sending/sender.rs b/src/service/sending/sender.rs index 3a401995..19205a65 100644 --- a/src/service/sending/sender.rs +++ b/src/service/sending/sender.rs @@ -57,8 +57,7 @@ impl Service { let receiver = self.receiver.lock().await; self.initial_requests(&mut futures, &mut statuses).await; - loop { - debug_assert!(!receiver.is_closed(), "channel error"); + while !receiver.is_closed() { tokio::select! { request = receiver.recv_async() => match request { Ok(request) => self.handle_request(request, &mut futures, &mut statuses).await,