add database migration for missing referencedevents separator
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
f191b4bad4
commit
52f09fdb51
1 changed files with 57 additions and 2 deletions
|
@ -1,7 +1,12 @@
|
||||||
|
use std::cmp;
|
||||||
|
|
||||||
use conduit::{
|
use conduit::{
|
||||||
debug_info, debug_warn, error, info,
|
debug, debug_info, debug_warn, error, info,
|
||||||
result::NotFound,
|
result::NotFound,
|
||||||
utils::{stream::TryIgnore, IterStream, ReadyExt},
|
utils::{
|
||||||
|
stream::{TryExpect, TryIgnore},
|
||||||
|
IterStream, ReadyExt,
|
||||||
|
},
|
||||||
warn, Err, Result,
|
warn, Err, Result,
|
||||||
};
|
};
|
||||||
use futures::{FutureExt, StreamExt};
|
use futures::{FutureExt, StreamExt};
|
||||||
|
@ -120,6 +125,14 @@ async fn migrate(services: &Services) -> Result<()> {
|
||||||
retroactively_fix_bad_data_from_roomuserid_joined(services).await?;
|
retroactively_fix_bad_data_from_roomuserid_joined(services).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if db["global"]
|
||||||
|
.get(b"fix_referencedevents_missing_sep")
|
||||||
|
.await
|
||||||
|
.is_not_found()
|
||||||
|
{
|
||||||
|
fix_referencedevents_missing_sep(services).await?;
|
||||||
|
}
|
||||||
|
|
||||||
let version_match = services.globals.db.database_version().await == DATABASE_VERSION
|
let version_match = services.globals.db.database_version().await == DATABASE_VERSION
|
||||||
|| services.globals.db.database_version().await == CONDUIT_DATABASE_VERSION;
|
|| services.globals.db.database_version().await == CONDUIT_DATABASE_VERSION;
|
||||||
|
|
||||||
|
@ -444,3 +457,45 @@ async fn retroactively_fix_bad_data_from_roomuserid_joined(services: &Services)
|
||||||
info!("Finished fixing");
|
info!("Finished fixing");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn fix_referencedevents_missing_sep(services: &Services) -> Result {
|
||||||
|
warn!("Fixing missing record separator between room_id and event_id in referencedevents");
|
||||||
|
|
||||||
|
let db = &services.db;
|
||||||
|
let cork = db.cork_and_sync();
|
||||||
|
|
||||||
|
let referencedevents = db["referencedevents"].clone();
|
||||||
|
|
||||||
|
let totals: (usize, usize) = (0, 0);
|
||||||
|
let (total, fixed) = referencedevents
|
||||||
|
.raw_stream()
|
||||||
|
.expect_ok()
|
||||||
|
.enumerate()
|
||||||
|
.ready_fold(totals, |mut a, (i, (key, val))| {
|
||||||
|
debug_assert!(val.is_empty(), "expected no value");
|
||||||
|
|
||||||
|
let has_sep = key.contains(&database::SEP);
|
||||||
|
|
||||||
|
if !has_sep {
|
||||||
|
let key_str = std::str::from_utf8(key).expect("key not utf-8");
|
||||||
|
let room_id_len = key_str.find('$').expect("missing '$' in key");
|
||||||
|
let (room_id, event_id) = key_str.split_at(room_id_len);
|
||||||
|
debug!(?a, "fixing {room_id}, {event_id}");
|
||||||
|
|
||||||
|
let new_key = (room_id, event_id);
|
||||||
|
referencedevents.put_raw(new_key, val);
|
||||||
|
referencedevents.remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
a.0 = cmp::max(i, a.0);
|
||||||
|
a.1 = a.1.saturating_add((!has_sep).into());
|
||||||
|
a
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
drop(cork);
|
||||||
|
info!(?total, ?fixed, "Fixed missing record separators in 'referencedevents'.");
|
||||||
|
|
||||||
|
db["global"].insert(b"fix_referencedevents_missing_sep", []);
|
||||||
|
db.db.cleanup()
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue