refactor spaces

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2025-02-07 07:09:45 +00:00 committed by strawberry
parent 6113803038
commit f47677c995
4 changed files with 568 additions and 551 deletions

View file

@ -1,18 +1,25 @@
use std::{collections::VecDeque, str::FromStr};
use std::{
collections::{BTreeSet, VecDeque},
str::FromStr,
};
use axum::extract::State;
use conduwuit::{checked, pdu::ShortRoomId, utils::stream::IterStream};
use futures::{StreamExt, TryFutureExt};
use conduwuit::{
utils::{future::TryExtExt, stream::IterStream},
Err, Result,
};
use futures::{future::OptionFuture, StreamExt, TryFutureExt};
use ruma::{
api::client::{error::ErrorKind, space::get_hierarchy},
OwnedRoomId, OwnedServerName, RoomId, UInt, UserId,
api::client::space::get_hierarchy, OwnedRoomId, OwnedServerName, RoomId, UInt, UserId,
};
use service::{
rooms::spaces::{get_parent_children_via, summary_to_chunk, SummaryAccessibility},
rooms::spaces::{
get_parent_children_via, summary_to_chunk, PaginationToken, SummaryAccessibility,
},
Services,
};
use crate::{service::rooms::spaces::PaginationToken, Error, Result, Ruma};
use crate::Ruma;
/// # `GET /_matrix/client/v1/rooms/{room_id}/hierarchy`
///
@ -40,10 +47,9 @@ pub(crate) async fn get_hierarchy_route(
// Should prevent unexpeded behaviour in (bad) clients
if let Some(ref token) = key {
if token.suggested_only != body.suggested_only || token.max_depth != max_depth {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"suggested_only and max_depth cannot change on paginated requests",
));
return Err!(Request(InvalidParam(
"suggested_only and max_depth cannot change on paginated requests"
)));
}
}
@ -52,58 +58,70 @@ pub(crate) async fn get_hierarchy_route(
body.sender_user(),
&body.room_id,
limit.try_into().unwrap_or(10),
key.map_or(vec![], |token| token.short_room_ids),
max_depth.into(),
max_depth.try_into().unwrap_or(usize::MAX),
body.suggested_only,
key.as_ref()
.into_iter()
.flat_map(|t| t.short_room_ids.iter()),
)
.await
}
async fn get_client_hierarchy(
async fn get_client_hierarchy<'a, ShortRoomIds>(
services: &Services,
sender_user: &UserId,
room_id: &RoomId,
limit: usize,
short_room_ids: Vec<ShortRoomId>,
max_depth: u64,
max_depth: usize,
suggested_only: bool,
) -> Result<get_hierarchy::v1::Response> {
let mut parents = VecDeque::new();
short_room_ids: ShortRoomIds,
) -> Result<get_hierarchy::v1::Response>
where
ShortRoomIds: Iterator<Item = &'a u64> + Clone + Send + Sync + 'a,
{
type Via = Vec<OwnedServerName>;
type Entry = (OwnedRoomId, Via);
type Rooms = VecDeque<Entry>;
// Don't start populating the results if we have to start at a specific room.
let mut populate_results = short_room_ids.is_empty();
let mut queue: Rooms = [(
room_id.to_owned(),
room_id
.server_name()
.map(ToOwned::to_owned)
.into_iter()
.collect(),
)]
.into();
let mut stack = vec![vec![(room_id.to_owned(), match room_id.server_name() {
| Some(server_name) => vec![server_name.into()],
| None => vec![],
})]];
let mut rooms = Vec::with_capacity(limit);
let mut parents = BTreeSet::new();
while let Some((current_room, via)) = queue.pop_front() {
let summary = services
.rooms
.spaces
.get_summary_and_children_client(&current_room, suggested_only, sender_user, &via)
.await?;
let mut results = Vec::with_capacity(limit);
while let Some((current_room, via)) = { next_room_to_traverse(&mut stack, &mut parents) } {
if results.len() >= limit {
break;
}
match (
services
.rooms
.spaces
.get_summary_and_children_client(&current_room, suggested_only, sender_user, &via)
.await?,
current_room == room_id,
) {
match (summary, current_room == room_id) {
| (None | Some(SummaryAccessibility::Inaccessible), false) => {
// Just ignore other unavailable rooms
},
| (None, true) => {
return Err!(Request(Forbidden("The requested room was not found")));
},
| (Some(SummaryAccessibility::Inaccessible), true) => {
return Err!(Request(Forbidden("The requested room is inaccessible")));
},
| (Some(SummaryAccessibility::Accessible(summary)), _) => {
let mut children: Vec<(OwnedRoomId, Vec<OwnedServerName>)> =
get_parent_children_via(&summary, suggested_only)
.into_iter()
.filter(|(room, _)| parents.iter().all(|parent| parent != room))
.rev()
.collect();
let populate = parents.len() >= short_room_ids.clone().count();
if populate_results {
results.push(summary_to_chunk(*summary.clone()));
} else {
let mut children: Vec<Entry> = get_parent_children_via(&summary, suggested_only)
.filter(|(room, _)| !parents.contains(room))
.rev()
.map(|(key, val)| (key, val.collect()))
.collect();
if !populate {
children = children
.iter()
.rev()
@ -113,97 +131,69 @@ async fn get_client_hierarchy(
.rooms
.short
.get_shortroomid(room)
.map_ok(|short| Some(&short) != short_room_ids.get(parents.len()))
.map_ok(|short| {
Some(&short) != short_room_ids.clone().nth(parents.len())
})
.unwrap_or_else(|_| false)
})
.map(Clone::clone)
.collect::<Vec<(OwnedRoomId, Vec<OwnedServerName>)>>()
.collect::<Vec<Entry>>()
.await
.into_iter()
.rev()
.collect();
if children.is_empty() {
return Err(Error::BadRequest(
ErrorKind::InvalidParam,
"Room IDs in token were not found.",
));
}
// We have reached the room after where we last left off
let parents_len = parents.len();
if checked!(parents_len + 1)? == short_room_ids.len() {
populate_results = true;
}
}
let parents_len: u64 = parents.len().try_into()?;
if !children.is_empty() && parents_len < max_depth {
parents.push_back(current_room.clone());
stack.push(children);
if populate {
rooms.push(summary_to_chunk(summary.clone()));
} else if queue.is_empty() && children.is_empty() {
return Err!(Request(InvalidParam("Room IDs in token were not found.")));
}
// Root room in the space hierarchy, we return an error
// if this one fails.
parents.insert(current_room.clone());
if rooms.len() >= limit {
break;
}
if children.is_empty() {
break;
}
if parents.len() >= max_depth {
continue;
}
queue.extend(children);
},
| (Some(SummaryAccessibility::Inaccessible), true) => {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"The requested room is inaccessible",
));
},
| (None, true) => {
return Err(Error::BadRequest(
ErrorKind::forbidden(),
"The requested room was not found",
));
},
// Just ignore other unavailable rooms
| (None | Some(SummaryAccessibility::Inaccessible), false) => (),
}
}
Ok(get_hierarchy::v1::Response {
next_batch: if let Some((room, _)) = next_room_to_traverse(&mut stack, &mut parents) {
parents.pop_front();
parents.push_back(room);
let next_batch: OptionFuture<_> = queue
.pop_front()
.map(|(room, _)| async move {
parents.insert(room);
let next_short_room_ids: Vec<_> = parents
.iter()
.stream()
.filter_map(|room_id| async move {
services.rooms.short.get_shortroomid(room_id).await.ok()
})
.filter_map(|room_id| services.rooms.short.get_shortroomid(room_id).ok())
.collect()
.await;
(next_short_room_ids != short_room_ids && !next_short_room_ids.is_empty()).then(
|| {
PaginationToken {
short_room_ids: next_short_room_ids,
limit: UInt::new(max_depth)
.expect("When sent in request it must have been valid UInt"),
max_depth: UInt::new(max_depth)
.expect("When sent in request it must have been valid UInt"),
suggested_only,
}
.to_string()
},
)
} else {
None
},
rooms: results,
(next_short_room_ids.iter().ne(short_room_ids) && !next_short_room_ids.is_empty())
.then_some(PaginationToken {
short_room_ids: next_short_room_ids,
limit: max_depth.try_into().ok()?,
max_depth: max_depth.try_into().ok()?,
suggested_only,
})
.as_ref()
.map(PaginationToken::to_string)
})
.into();
Ok(get_hierarchy::v1::Response {
next_batch: next_batch.await.flatten(),
rooms,
})
}
fn next_room_to_traverse(
stack: &mut Vec<Vec<(OwnedRoomId, Vec<OwnedServerName>)>>,
parents: &mut VecDeque<OwnedRoomId>,
) -> Option<(OwnedRoomId, Vec<OwnedServerName>)> {
while stack.last().is_some_and(Vec::is_empty) {
stack.pop();
parents.pop_back();
}
stack.last_mut().and_then(Vec::pop)
}

View file

@ -1,10 +1,11 @@
use axum::extract::State;
use conduwuit::{Err, Result};
use ruma::{api::federation::space::get_hierarchy, RoomId, ServerName};
use service::{
rooms::spaces::{get_parent_children_via, Identifier, SummaryAccessibility},
Services,
use conduwuit::{
utils::stream::{BroadbandExt, IterStream},
Err, Result,
};
use futures::{FutureExt, StreamExt};
use ruma::api::federation::space::get_hierarchy;
use service::rooms::spaces::{get_parent_children_via, Identifier, SummaryAccessibility};
use crate::Ruma;
@ -20,54 +21,51 @@ pub(crate) async fn get_hierarchy_route(
return Err!(Request(NotFound("Room does not exist.")));
}
get_hierarchy(&services, &body.room_id, body.origin(), body.suggested_only).await
}
/// Gets the response for the space hierarchy over federation request
///
/// Errors if the room does not exist, so a check if the room exists should
/// be done
async fn get_hierarchy(
services: &Services,
room_id: &RoomId,
server_name: &ServerName,
suggested_only: bool,
) -> Result<get_hierarchy::v1::Response> {
let room_id = &body.room_id;
let suggested_only = body.suggested_only;
let ref identifier = Identifier::ServerName(body.origin());
match services
.rooms
.spaces
.get_summary_and_children_local(&room_id.to_owned(), Identifier::ServerName(server_name))
.get_summary_and_children_local(room_id, identifier)
.await?
{
| Some(SummaryAccessibility::Accessible(room)) => {
let mut children = Vec::new();
let mut inaccessible_children = Vec::new();
| None => Err!(Request(NotFound("The requested room was not found"))),
for (child, _via) in get_parent_children_via(&room, suggested_only) {
match services
.rooms
.spaces
.get_summary_and_children_local(&child, Identifier::ServerName(server_name))
.await?
{
| Some(SummaryAccessibility::Accessible(summary)) => {
children.push((*summary).into());
},
| Some(SummaryAccessibility::Inaccessible) => {
inaccessible_children.push(child);
},
| None => (),
}
}
Ok(get_hierarchy::v1::Response {
room: *room,
children,
inaccessible_children,
})
},
| Some(SummaryAccessibility::Inaccessible) =>
Err!(Request(NotFound("The requested room is inaccessible"))),
| None => Err!(Request(NotFound("The requested room was not found"))),
| Some(SummaryAccessibility::Accessible(room)) => {
let (children, inaccessible_children) =
get_parent_children_via(&room, suggested_only)
.stream()
.broad_filter_map(|(child, _via)| async move {
match services
.rooms
.spaces
.get_summary_and_children_local(&child, identifier)
.await
.ok()?
{
| None => None,
| Some(SummaryAccessibility::Inaccessible) =>
Some((None, Some(child))),
| Some(SummaryAccessibility::Accessible(summary)) =>
Some((Some(summary), None)),
}
})
.unzip()
.map(|(children, inaccessible_children): (Vec<_>, Vec<_>)| {
(
children.into_iter().flatten().map(Into::into).collect(),
inaccessible_children.into_iter().flatten().collect(),
)
})
.await;
Ok(get_hierarchy::v1::Response { room, children, inaccessible_children })
},
}
}