diff --git a/src/service/migrations.rs b/src/service/migrations.rs index 126d3c7e..cded9bce 100644 --- a/src/service/migrations.rs +++ b/src/service/migrations.rs @@ -14,7 +14,7 @@ use itertools::Itertools; use ruma::{ events::{push_rules::PushRulesEvent, room::member::MembershipState, GlobalAccountDataEventType}, push::Ruleset, - OwnedUserId, UserId, + OwnedUserId, RoomId, UserId, }; use crate::{media, Services}; @@ -69,6 +69,7 @@ async fn fresh(services: &Services) -> Result<()> { db["global"].insert(b"fix_bad_double_separator_in_state_cache", []); db["global"].insert(b"retroactively_fix_bad_data_from_roomuserid_joined", []); db["global"].insert(b"fix_referencedevents_missing_sep", []); + db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []); // Create the admin room and server user on first run crate::admin::create_admin_room(services).boxed().await?; @@ -130,6 +131,14 @@ async fn migrate(services: &Services) -> Result<()> { fix_referencedevents_missing_sep(services).await?; } + if db["global"] + .get(b"fix_readreceiptid_readreceipt_duplicates") + .await + .is_not_found() + { + fix_readreceiptid_readreceipt_duplicates(services).await?; + } + let version_match = services.globals.db.database_version().await == DATABASE_VERSION || services.globals.db.database_version().await == CONDUIT_DATABASE_VERSION; @@ -493,3 +502,53 @@ async fn fix_referencedevents_missing_sep(services: &Services) -> Result { db["global"].insert(b"fix_referencedevents_missing_sep", []); db.db.cleanup() } + +async fn fix_readreceiptid_readreceipt_duplicates(services: &Services) -> Result { + use ruma::identifiers_validation::MAX_BYTES; + type ArrayId = arrayvec::ArrayString; + type Key<'a> = (&'a RoomId, u64, &'a UserId); + + warn!("Fixing undeleted entries in readreceiptid_readreceipt..."); + + let db = &services.db; + let cork = db.cork_and_sync(); + let readreceiptid_readreceipt = db["readreceiptid_readreceipt"].clone(); + + let mut cur_room: Option = None; + let mut cur_user: Option = None; + let (mut total, mut fixed): (usize, usize) = (0, 0); + readreceiptid_readreceipt + .keys() + .expect_ok() + .ready_for_each(|key: Key<'_>| { + let (room_id, _, user_id) = key; + let last_room = cur_room.replace( + room_id + .as_str() + .try_into() + .expect("invalid room_id in database"), + ); + + let last_user = cur_user.replace( + user_id + .as_str() + .try_into() + .expect("invalid user_id in database"), + ); + + let is_dup = cur_room == last_room && cur_user == last_user; + if is_dup { + readreceiptid_readreceipt.del(key); + } + + fixed = fixed.saturating_add(is_dup.into()); + total = total.saturating_add(1); + }) + .await; + + drop(cork); + info!(?total, ?fixed, "Fixed undeleted entries in readreceiptid_readreceipt."); + + db["global"].insert(b"fix_readreceiptid_readreceipt_duplicates", []); + db.db.cleanup() +}