diff --git a/src/service/migrations.rs b/src/service/migrations.rs index c953e7b1..45323fa2 100644 --- a/src/service/migrations.rs +++ b/src/service/migrations.rs @@ -1,7 +1,12 @@ +use std::cmp; + use conduit::{ - debug_info, debug_warn, error, info, + debug, debug_info, debug_warn, error, info, result::NotFound, - utils::{stream::TryIgnore, IterStream, ReadyExt}, + utils::{ + stream::{TryExpect, TryIgnore}, + IterStream, ReadyExt, + }, warn, Err, Result, }; use futures::{FutureExt, StreamExt}; @@ -120,6 +125,14 @@ async fn migrate(services: &Services) -> Result<()> { retroactively_fix_bad_data_from_roomuserid_joined(services).await?; } + if db["global"] + .get(b"fix_referencedevents_missing_sep") + .await + .is_not_found() + { + fix_referencedevents_missing_sep(services).await?; + } + let version_match = services.globals.db.database_version().await == DATABASE_VERSION || services.globals.db.database_version().await == CONDUIT_DATABASE_VERSION; @@ -444,3 +457,45 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services) info!("Finished fixing"); Ok(()) } + +async fn fix_referencedevents_missing_sep(services: &Services) -> Result { + warn!("Fixing missing record separator between room_id and event_id in referencedevents"); + + let db = &services.db; + let cork = db.cork_and_sync(); + + let referencedevents = db["referencedevents"].clone(); + + let totals: (usize, usize) = (0, 0); + let (total, fixed) = referencedevents + .raw_stream() + .expect_ok() + .enumerate() + .ready_fold(totals, |mut a, (i, (key, val))| { + debug_assert!(val.is_empty(), "expected no value"); + + let has_sep = key.contains(&database::SEP); + + if !has_sep { + let key_str = std::str::from_utf8(key).expect("key not utf-8"); + let room_id_len = key_str.find('$').expect("missing '$' in key"); + let (room_id, event_id) = key_str.split_at(room_id_len); + debug!(?a, "fixing {room_id}, {event_id}"); + + let new_key = (room_id, event_id); + referencedevents.put_raw(new_key, val); + referencedevents.remove(key); + } + + a.0 = cmp::max(i, a.0); + a.1 = a.1.saturating_add((!has_sep).into()); + a + }) + .await; + + drop(cork); + info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'."); + + db["global"].insert(b"fix_referencedevents_missing_sep", []); + db.db.cleanup() +}