diff --git a/src/database/engine.rs b/src/database/engine.rs index 8be9eecc..76b2889b 100644 --- a/src/database/engine.rs +++ b/src/database/engine.rs @@ -41,12 +41,49 @@ pub struct Engine { pub(crate) type Db = DBWithThreadMode; impl Engine { - pub(crate) fn cf(&self, name: &str) -> Arc> { - self.db - .cf_handle(name) - .expect("column must be described prior to database open") + #[tracing::instrument( + level = "info", + skip_all, + fields( + sequence = ?self.current_sequence(), + ), + )] + pub fn wait_compactions_blocking(&self) -> Result { + let mut opts = WaitForCompactOptions::default(); + opts.set_abort_on_pause(true); + opts.set_flush(false); + opts.set_timeout(0); + + self.db.wait_for_compact(&opts).map_err(map_err) } + #[tracing::instrument( + level = "info", + skip_all, + fields( + sequence = ?self.current_sequence(), + ), + )] + pub fn sort(&self) -> Result { + let flushoptions = rocksdb::FlushOptions::default(); + result(DBCommon::flush_opt(&self.db, &flushoptions)) + } + + #[tracing::instrument( + level = "debug", + skip_all, + fields( + sequence = ?self.current_sequence(), + ), + )] + pub fn update(&self) -> Result { self.db.try_catch_up_with_primary().map_err(map_err) } + + #[tracing::instrument(level = "info", skip_all)] + pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) } + + #[tracing::instrument(level = "debug", skip_all)] + pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) } + #[inline] pub(crate) fn cork(&self) { self.corks.fetch_add(1, Ordering::Relaxed); } @@ -56,28 +93,6 @@ impl Engine { #[inline] pub fn corked(&self) -> bool { self.corks.load(Ordering::Relaxed) > 0 } - #[tracing::instrument(skip(self))] - pub fn sync(&self) -> Result { result(DBCommon::flush_wal(&self.db, true)) } - - #[tracing::instrument(skip(self), level = "debug")] - pub fn flush(&self) -> Result { result(DBCommon::flush_wal(&self.db, false)) } - - #[tracing::instrument(skip(self), level = "info")] - pub fn sort(&self) -> Result { - let flushoptions = rocksdb::FlushOptions::default(); - result(DBCommon::flush_opt(&self.db, &flushoptions)) - } - - #[tracing::instrument(skip(self), level = "info")] - pub fn wait_compactions(&self) -> Result { - let mut opts = WaitForCompactOptions::default(); - opts.set_abort_on_pause(true); - opts.set_flush(false); - opts.set_timeout(0); - - self.db.wait_for_compact(&opts).map_err(map_err) - } - /// Query for database property by null-terminated name which is expected to /// have a result with an integer representation. This is intended for /// low-overhead programmatic use. @@ -96,6 +111,24 @@ impl Engine { .and_then(|val| val.map_or_else(|| Err!("Property {name:?} not found."), Ok)) } + pub(crate) fn cf(&self, name: &str) -> Arc> { + self.db + .cf_handle(name) + .expect("column must be described prior to database open") + } + + #[inline] + #[must_use] + #[tracing::instrument(name = "sequence", level = "debug", skip_all, fields(sequence))] + pub fn current_sequence(&self) -> u64 { + let sequence = self.db.latest_sequence_number(); + + #[cfg(debug_assertions)] + tracing::Span::current().record("sequence", sequence); + + sequence + } + #[inline] #[must_use] pub fn is_read_only(&self) -> bool { self.secondary || self.read_only } @@ -114,7 +147,7 @@ impl Drop for Engine { self.db.cancel_all_background_work(BLOCKING); info!( - sequence = %self.db.latest_sequence_number(), + sequence = %self.current_sequence(), "Closing database..." ); }