use axum::extract::State; use conduit::{at, PduCount, PduEvent}; use futures::StreamExt; use ruma::{api::client::threads::get_threads, uint}; use crate::{Result, Ruma}; /// # `GET /_matrix/client/r0/rooms/{roomId}/threads` pub(crate) async fn get_threads_route( State(services): State, ref body: Ruma, ) -> Result { // Use limit or else 10, with maximum 100 let limit = body .limit .unwrap_or_else(|| uint!(10)) .try_into() .unwrap_or(10) .min(100); let from: PduCount = body .from .as_deref() .map(str::parse) .transpose()? .unwrap_or_else(PduCount::max); let threads: Vec<(PduCount, PduEvent)> = services .rooms .threads .threads_until(body.sender_user(), &body.room_id, from, &body.include) .await? .take(limit) .filter_map(|(count, pdu)| async move { services .rooms .state_accessor .user_can_see_event(body.sender_user(), &body.room_id, &pdu.event_id) .await .then_some((count, pdu)) }) .collect() .await; Ok(get_threads::v1::Response { next_batch: threads .last() .filter(|_| threads.len() >= limit) .map(at!(0)) .as_ref() .map(ToString::to_string), chunk: threads .into_iter() .map(at!(1)) .map(|pdu| pdu.to_room_event()) .collect(), }) }