syncv3: reset the connection after restarts
This commit is contained in:
parent
13334a88ca
commit
85400d15bc
2 changed files with 23 additions and 2 deletions
|
@ -7,12 +7,13 @@ use std::{
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use conduit::{
|
use conduit::{
|
||||||
error,
|
debug, error,
|
||||||
utils::math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
|
utils::math::{ruma_from_u64, ruma_from_usize, usize_from_ruma, usize_from_u64_truncated},
|
||||||
warn, Err, PduCount,
|
warn, Err, PduCount,
|
||||||
};
|
};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::client::{
|
api::client::{
|
||||||
|
error::ErrorKind,
|
||||||
filter::{FilterDefinition, LazyLoadOptions},
|
filter::{FilterDefinition, LazyLoadOptions},
|
||||||
sync::sync_events::{
|
sync::sync_events::{
|
||||||
self,
|
self,
|
||||||
|
@ -1081,7 +1082,7 @@ fn share_encrypted_room(
|
||||||
/// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
|
/// Sliding Sync endpoint (future endpoint: `/_matrix/client/v4/sync`)
|
||||||
pub(crate) async fn sync_events_v4_route(
|
pub(crate) async fn sync_events_v4_route(
|
||||||
State(services): State<crate::State>, body: Ruma<sync_events::v4::Request>,
|
State(services): State<crate::State>, body: Ruma<sync_events::v4::Request>,
|
||||||
) -> Result<sync_events::v4::Response, RumaResponse<UiaaResponse>> {
|
) -> Result<sync_events::v4::Response> {
|
||||||
let sender_user = body.sender_user.expect("user is authenticated");
|
let sender_user = body.sender_user.expect("user is authenticated");
|
||||||
let sender_device = body.sender_device.expect("user is authenticated");
|
let sender_device = body.sender_device.expect("user is authenticated");
|
||||||
let mut body = body.body;
|
let mut body = body.body;
|
||||||
|
@ -1101,6 +1102,19 @@ pub(crate) async fn sync_events_v4_route(
|
||||||
.and_then(|string| string.parse().ok())
|
.and_then(|string| string.parse().ok())
|
||||||
.unwrap_or(0);
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
if globalsince != 0
|
||||||
|
&& !services
|
||||||
|
.users
|
||||||
|
.remembered(sender_user.clone(), sender_device.clone(), conn_id.clone())
|
||||||
|
{
|
||||||
|
debug!("Restarting sync stream because it was gone from the database");
|
||||||
|
return Err(Error::Request(
|
||||||
|
ErrorKind::UnknownPos,
|
||||||
|
"Connection data lost since last time".into(),
|
||||||
|
http::StatusCode::BAD_REQUEST,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
if globalsince == 0 {
|
if globalsince == 0 {
|
||||||
services
|
services
|
||||||
.users
|
.users
|
||||||
|
|
|
@ -68,6 +68,13 @@ impl Service {
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn exists(&self, user_id: &UserId) -> Result<bool> { self.db.exists(user_id) }
|
pub fn exists(&self, user_id: &UserId) -> Result<bool> { self.db.exists(user_id) }
|
||||||
|
|
||||||
|
pub fn remembered(&self, user_id: OwnedUserId, device_id: OwnedDeviceId, conn_id: String) -> bool {
|
||||||
|
self.connections
|
||||||
|
.lock()
|
||||||
|
.unwrap()
|
||||||
|
.contains_key(&(user_id, device_id, conn_id))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn forget_sync_request_connection(&self, user_id: OwnedUserId, device_id: OwnedDeviceId, conn_id: String) {
|
pub fn forget_sync_request_connection(&self, user_id: OwnedUserId, device_id: OwnedDeviceId, conn_id: String) {
|
||||||
self.connections
|
self.connections
|
||||||
.lock()
|
.lock()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue