diff --git a/src/database/iter.rs b/src/database/iter.rs
index bbbab491..4845e977 100644
--- a/src/database/iter.rs
+++ b/src/database/iter.rs
@@ -3,12 +3,14 @@ use std::{iter::FusedIterator, sync::Arc};
 use conduit::Result;
 use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, Direction, IteratorMode, ReadOptions};
 
-use crate::{engine::Db, map::KeyVal, result, Engine};
+use crate::{
+	engine::Db,
+	result,
+	slice::{OwnedKeyVal, OwnedKeyValPair},
+	Engine,
+};
 
 type Cursor<'cursor> = DBRawIteratorWithThreadMode<'cursor, Db>;
-type Key<'item> = &'item [u8];
-type Val<'item> = &'item [u8];
-type Item<'item> = (Key<'item>, Val<'item>);
 
 struct State<'cursor> {
 	cursor: Cursor<'cursor>,
@@ -48,7 +50,7 @@ impl<'cursor> Iter<'cursor> {
 }
 
 impl Iterator for Iter<'_> {
-	type Item = KeyVal;
+	type Item = OwnedKeyValPair;
 
 	fn next(&mut self) -> Option<Self::Item> {
 		if !self.state.init && self.state.valid {
@@ -57,10 +59,15 @@
 			self.state.init = false;
 		}
 
-		self.state.cursor.item().map(into_keyval).or_else(|| {
-			when_invalid(&mut self.state).expect("iterator invalidated due to error");
-			None
-		})
+		self.state
+			.cursor
+			.item()
+			.map(OwnedKeyVal::from)
+			.map(OwnedKeyVal::to_tuple)
+			.or_else(|| {
+				when_invalid(&mut self.state).expect("iterator invalidated due to error");
+				None
+			})
 	}
 }
 
@@ -101,5 +108,3 @@ fn into_direction(mode: &IteratorMode<'_>) -> Direction {
 		End | From(_, Reverse) => Reverse,
 	}
 }
-
-fn into_keyval((key, val): Item<'_>) -> KeyVal { (Vec::from(key), Vec::from(val)) }
diff --git a/src/database/map.rs b/src/database/map.rs
index f7b16b7e..73cf5107 100644
--- a/src/database/map.rs
+++ b/src/database/map.rs
@@ -1,11 +1,16 @@
-use std::{future::Future, pin::Pin, sync::Arc};
+use std::{future::Future, mem::size_of, pin::Pin, sync::Arc};
 
 use conduit::{utils, Result};
 use rocksdb::{
 	AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions,
 };
 
-use crate::{or_else, result, watchers::Watchers, Engine, Handle, Iter};
+use crate::{
+	or_else, result,
+	slice::{Byte, Key, KeyVal, OwnedKey, OwnedKeyValPair, OwnedVal, Val},
+	watchers::Watchers,
+	Engine, Handle, Iter,
+};
 
 pub struct Map {
 	name: String,
@@ -16,9 +21,7 @@ pub struct Map {
 	read_options: ReadOptions,
 }
 
-pub(crate) type KeyVal = (Key, Val);
-pub(crate) type Val = Vec<u8>;
-pub(crate) type Key = Vec<u8>;
+type OwnedKeyValPairIter<'a> = Box<dyn Iterator<Item = OwnedKeyValPair> + Send + 'a>;
 
 impl Map {
 	pub(crate) fn open(db: &Arc<Engine>, name: &str) -> Result<Arc<Self>> {
@@ -32,19 +35,19 @@
 		}))
 	}
 
-	pub fn get(&self, key: &[u8]) -> Result<Option<Handle<'_>>> {
+	pub fn get(&self, key: &Key) -> Result<Option<Handle<'_>>> {
 		let read_options = &self.read_options;
 		let res = self.db.db.get_pinned_cf_opt(&self.cf(), key, read_options);
 
 		Ok(result(res)?.map(Handle::from))
 	}
 
-	pub fn multi_get(&self, keys: &[&[u8]]) -> Result<Vec<Option<Vec<u8>>>> {
+	pub fn multi_get(&self, keys: &[&Key]) -> Result<Vec<Option<OwnedVal>>> {
 		// Optimization can be `true` if key vector is pre-sorted **by the column
 		// comparator**.
 		const SORTED: bool = false;
 
-		let mut ret: Vec<Option<Vec<u8>>> = Vec::with_capacity(keys.len());
+		let mut ret: Vec<Option<OwnedVal>> = Vec::with_capacity(keys.len());
 		let read_options = &self.read_options;
 		for res in self
 			.db
@@ -61,7 +64,7 @@
 		Ok(ret)
 	}
 
-	pub fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
+	pub fn insert(&self, key: &Key, value: &Val) -> Result<()> {
 		let write_options = &self.write_options;
 		self.db
 			.db
@@ -77,9 +80,12 @@
 		Ok(())
 	}
 
-	pub fn insert_batch(&self, iter: &mut dyn Iterator<Item = KeyVal>) -> Result<()> {
+	pub fn insert_batch<'a, I>(&'a self, iter: I) -> Result<()>
+	where
+		I: Iterator<Item = KeyVal<'a>>,
+	{
 		let mut batch = WriteBatchWithTransaction::<false>::default();
-		for (key, value) in iter {
+		for KeyVal(key, value) in iter {
 			batch.put_cf(&self.cf(), key, value);
 		}
 
@@ -93,7 +99,7 @@
 		result(res)
 	}
 
-	pub fn remove(&self, key: &[u8]) -> Result<()> {
+	pub fn remove(&self, key: &Key) -> Result<()> {
 		let write_options = &self.write_options;
 		let res = self.db.db.delete_cf_opt(&self.cf(), key, write_options);
 
@@ -104,7 +110,10 @@
 		result(res)
 	}
 
-	pub fn remove_batch(&self, iter: &mut dyn Iterator<Item = Key>) -> Result<()> {
+	pub fn remove_batch<'a, I>(&'a self, iter: I) -> Result<()>
+	where
+		I: Iterator<Item = &'a Key>,
+	{
 		let mut batch = WriteBatchWithTransaction::<false>::default();
 		for key in iter {
 			batch.delete_cf(&self.cf(), key);
 		}
@@ -120,14 +129,14 @@
 		result(res)
 	}
 
-	pub fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
+	pub fn iter(&self) -> OwnedKeyValPairIter<'_> {
 		let mode = IteratorMode::Start;
 		let read_options = read_options_default();
 		Box::new(Iter::new(&self.db, &self.cf, read_options, &mode))
 	}
 
-	pub fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
-		let direction = if backwards {
+	pub fn iter_from(&self, from: &Key, reverse: bool) -> OwnedKeyValPairIter<'_> {
+		let direction = if reverse {
 			Direction::Reverse
 		} else {
 			Direction::Forward
@@ -137,13 +146,13 @@
 		Box::new(Iter::new(&self.db, &self.cf, read_options, &mode))
 	}
 
-	pub fn scan_prefix<'a>(&'a self, prefix: Vec<u8>) -> Box<dyn Iterator<Item = KeyVal> + 'a> {
+	pub fn scan_prefix(&self, prefix: OwnedKey) -> OwnedKeyValPairIter<'_> {
 		let mode = IteratorMode::From(&prefix, Direction::Forward);
 		let read_options = read_options_default();
 		Box::new(Iter::new(&self.db, &self.cf, read_options, &mode).take_while(move |(k, _)| k.starts_with(&prefix)))
 	}
 
-	pub fn increment(&self, key: &[u8]) -> Result<[u8; 8]> {
+	pub fn increment(&self, key: &Key) -> Result<[Byte; size_of::<u64>()]> {
 		let old = self.get(key)?;
 		let new = utils::increment(old.as_deref());
 		self.insert(key, &new)?;
@@ -155,10 +164,13 @@
 		Ok(new)
 	}
 
-	pub fn increment_batch(&self, iter: &mut dyn Iterator<Item = Key>) -> Result<()> {
+	pub fn increment_batch<'a, I>(&'a self, iter: I) -> Result<()>
+	where
+		I: Iterator<Item = &'a Key>,
+	{
 		let mut batch = WriteBatchWithTransaction::<false>::default();
 		for key in iter {
-			let old = self.get(&key)?;
+			let old = self.get(key)?;
 			let new = utils::increment(old.as_deref());
 			batch.put_cf(&self.cf(), key, new);
 		}
@@ -173,7 +185,7 @@
 		result(res)
 	}
 
-	pub fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
+	pub fn watch_prefix<'a>(&'a self, prefix: &Key) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
 		self.watchers.watch(prefix)
 	}
 
@@ -184,8 +196,8 @@
 }
 
 impl<'a> IntoIterator for &'a Map {
-	type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
-	type Item = KeyVal;
+	type IntoIter = Box<dyn Iterator<Item = Self::Item> + Send + 'a>;
+	type Item = OwnedKeyValPair;
 
 	fn into_iter(self) -> Self::IntoIter { self.iter() }
 }
diff --git a/src/database/mod.rs b/src/database/mod.rs
index 39566896..283224f6 100644
--- a/src/database/mod.rs
+++ b/src/database/mod.rs
@@ -6,6 +6,7 @@ mod iter;
 mod map;
 pub mod maps;
 mod opts;
+mod slice;
 mod util;
 mod watchers;
 
@@ -17,6 +18,7 @@ pub(crate) use engine::Engine;
 pub use handle::Handle;
 pub use iter::Iter;
 pub use map::Map;
+pub use slice::{Key, KeyVal, OwnedKey, OwnedKeyVal, OwnedVal, Val};
 pub(crate) use util::{or_else, result};
 
 conduit::mod_ctor! {}
diff --git a/src/database/slice.rs b/src/database/slice.rs
new file mode 100644
index 00000000..770f1957
--- /dev/null
+++ b/src/database/slice.rs
@@ -0,0 +1,64 @@
+pub struct OwnedKeyVal(pub OwnedKey, pub OwnedVal);
+pub(crate) type OwnedKeyValPair = (OwnedKey, OwnedVal);
+pub type OwnedVal = Vec<Byte>;
+pub type OwnedKey = Vec<Byte>;
+
+pub struct KeyVal<'item>(pub &'item Key, pub &'item Val);
+pub(crate) type KeyValPair<'item> = (&'item Key, &'item Val);
+pub type Val = [Byte];
+pub type Key = [Byte];
+
+pub(crate) type Byte = u8;
+
+impl OwnedKeyVal {
+	#[inline]
+	#[must_use]
+	pub fn as_slice(&self) -> KeyVal<'_> { KeyVal(&self.0, &self.1) }
+
+	#[inline]
+	#[must_use]
+	pub fn to_tuple(self) -> OwnedKeyValPair { (self.0, self.1) }
+}
+
+impl From<OwnedKeyValPair> for OwnedKeyVal {
+	#[inline]
+	fn from((key, val): OwnedKeyValPair) -> Self { Self(key, val) }
+}
+
+impl From<&KeyVal<'_>> for OwnedKeyVal {
+	fn from(slice: &KeyVal<'_>) -> Self { slice.to_owned() }
+}
+
+impl From<KeyValPair<'_>> for OwnedKeyVal {
+	fn from((key, val): KeyValPair<'_>) -> Self { Self(Vec::from(key), Vec::from(val)) }
+}
+
+impl From<OwnedKeyVal> for OwnedKeyValPair {
+	#[inline]
+	fn from(val: OwnedKeyVal) -> Self { val.to_tuple() }
+}
+
+impl KeyVal<'_> {
+	#[inline]
+	#[must_use]
+	pub fn to_owned(&self) -> OwnedKeyVal { OwnedKeyVal::from(self) }
+
+	#[inline]
+	#[must_use]
+	pub fn as_tuple(&self) -> KeyValPair<'_> { (self.0, self.1) }
+}
+
+impl<'a> From<&'a OwnedKeyVal> for KeyVal<'a> {
+	#[inline]
+	fn from(owned: &'a OwnedKeyVal) -> Self { owned.as_slice() }
+}
+
+impl<'a> From<&'a OwnedKeyValPair> for KeyVal<'a> {
+	#[inline]
+	fn from((key, val): &'a OwnedKeyValPair) -> Self { KeyVal(key.as_slice(), val.as_slice()) }
+}
+
+impl<'a> From<KeyValPair<'a>> for KeyVal<'a> {
+	#[inline]
+	fn from((key, val): KeyValPair<'a>) -> Self { KeyVal(key, val) }
+}
diff --git a/src/service/globals/migrations.rs b/src/service/globals/migrations.rs
index 4109017d..a98eec4e 100644
--- a/src/service/globals/migrations.rs
+++ b/src/service/globals/migrations.rs
@@ -466,47 +466,53 @@ async fn db_lt_8(db: &Arc<Database>, _config: &Config) -> Result<()> {
 		info!("Migration: 8");
 	}
 	// Update pduids db layout
-	let mut batch = pduid_pdu.iter().filter_map(|(key, v)| {
-		if !key.starts_with(b"!") {
-			return None;
-		}
-		let mut parts = key.splitn(2, |&b| b == 0xFF);
-		let room_id = parts.next().unwrap();
-		let count = parts.next().unwrap();
+	let batch = pduid_pdu
+		.iter()
+		.filter_map(|(key, v)| {
+			if !key.starts_with(b"!") {
+				return None;
+			}
+			let mut parts = key.splitn(2, |&b| b == 0xFF);
+			let room_id = parts.next().unwrap();
+			let count = parts.next().unwrap();
 
-		let short_room_id = roomid_shortroomid
-			.get(room_id)
-			.unwrap()
-			.expect("shortroomid should exist");
+			let short_room_id = roomid_shortroomid
+				.get(room_id)
+				.unwrap()
+				.expect("shortroomid should exist");
 
-		let mut new_key = short_room_id.to_vec();
-		new_key.extend_from_slice(count);
+			let mut new_key = short_room_id.to_vec();
+			new_key.extend_from_slice(count);
 
-		Some((new_key, v))
-	});
+			Some(database::OwnedKeyVal(new_key, v))
+		})
+		.collect::<Vec<_>>();
 
-	pduid_pdu.insert_batch(&mut batch)?;
+	pduid_pdu.insert_batch(batch.iter().map(database::KeyVal::from))?;
 
-	let mut batch2 = eventid_pduid.iter().filter_map(|(k, value)| {
-		if !value.starts_with(b"!") {
-			return None;
-		}
-		let mut parts = value.splitn(2, |&b| b == 0xFF);
-		let room_id = parts.next().unwrap();
-		let count = parts.next().unwrap();
+	let batch2 = eventid_pduid
+		.iter()
+		.filter_map(|(k, value)| {
+			if !value.starts_with(b"!") {
+				return None;
+			}
+			let mut parts = value.splitn(2, |&b| b == 0xFF);
+			let room_id = parts.next().unwrap();
+			let count = parts.next().unwrap();
 
-		let short_room_id = roomid_shortroomid
-			.get(room_id)
-			.unwrap()
-			.expect("shortroomid should exist");
+			let short_room_id = roomid_shortroomid
+				.get(room_id)
+				.unwrap()
+				.expect("shortroomid should exist");
 
-		let mut new_value = short_room_id.to_vec();
-		new_value.extend_from_slice(count);
+			let mut new_value = short_room_id.to_vec();
+			new_value.extend_from_slice(count);
 
-		Some((k, new_value))
-	});
+			Some(database::OwnedKeyVal(k, new_value))
+		})
+		.collect::<Vec<_>>();
 
-	eventid_pduid.insert_batch(&mut batch2)?;
+	eventid_pduid.insert_batch(batch2.iter().map(database::KeyVal::from))?;
 
 	services().globals.bump_database_version(8)?;
 	info!("Migration: 7 -> 8 finished");
@@ -538,12 +544,13 @@ async fn db_lt_9(db: &Arc<Database>, _config: &Config) -> Result<()> {
 			new_key.extend_from_slice(word);
 			new_key.push(0xFF);
 			new_key.extend_from_slice(pdu_id_count);
-			Some((new_key, Vec::new()))
+			Some(database::OwnedKeyVal(new_key, Vec::<u8>::new()))
 		})
 		.peekable();
 
 	while iter.peek().is_some() {
-		tokenids.insert_batch(&mut iter.by_ref().take(1000))?;
+		let batch = iter.by_ref().take(1000).collect::<Vec<_>>();
+		tokenids.insert_batch(batch.iter().map(database::KeyVal::from))?;
 		debug!("Inserted smaller batch");
 	}
 
diff --git a/src/service/rooms/search/data.rs b/src/service/rooms/search/data.rs
index fe5d3edf..79b23cba 100644
--- a/src/service/rooms/search/data.rs
+++ b/src/service/rooms/search/data.rs
@@ -20,15 +20,18 @@ impl Data {
 	}
 
 	pub(super) fn index_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> {
-		let mut batch = tokenize(message_body).map(|word| {
-			let mut key = shortroomid.to_be_bytes().to_vec();
-			key.extend_from_slice(word.as_bytes());
-			key.push(0xFF);
-			key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here
-			(key, Vec::new())
-		});
+		let batch = tokenize(message_body)
+			.map(|word| {
+				let mut key = shortroomid.to_be_bytes().to_vec();
+				key.extend_from_slice(word.as_bytes());
+				key.push(0xFF);
+				key.extend_from_slice(pdu_id); // TODO: currently we save the room id a second time here
+				(key, Vec::<u8>::new())
+			})
+			.collect::<Vec<_>>();
 
-		self.tokenids.insert_batch(&mut batch)
+		self.tokenids
+			.insert_batch(batch.iter().map(database::KeyVal::from))
 	}
 
 	pub(super) fn deindex_pdu(&self, shortroomid: u64, pdu_id: &[u8], message_body: &str) -> Result<()> {
diff --git a/src/service/rooms/timeline/data.rs b/src/service/rooms/timeline/data.rs
index 6609bde8..023457fa 100644
--- a/src/service/rooms/timeline/data.rs
+++ b/src/service/rooms/timeline/data.rs
@@ -268,9 +268,9 @@ impl Data {
 		}
 
 		self.userroomid_notificationcount
-			.increment_batch(&mut notifies_batch.into_iter())?;
+			.increment_batch(notifies_batch.iter().map(Vec::as_slice))?;
 		self.userroomid_highlightcount
-			.increment_batch(&mut highlights_batch.into_iter())?;
+			.increment_batch(highlights_batch.iter().map(Vec::as_slice))?;
 		Ok(())
 	}
 }
diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs
index 20e5b77a..22b69818 100644
--- a/src/service/sending/data.rs
+++ b/src/service/sending/data.rs
@@ -87,7 +87,7 @@ impl Data {
 			keys.push(key);
 		}
 		self.servernameevent_data
-			.insert_batch(&mut batch.into_iter())?;
+			.insert_batch(batch.iter().map(database::KeyVal::from))?;
 
 		Ok(keys)
 	}
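
Usage note (not part of the patch): a minimal sketch of how call sites adopt the new slice types, mirroring the changes in migrations.rs, search/data.rs and sending/data.rs above. The map handle `some_map` and the key/value literals are hypothetical; the snippet assumes it runs inside a function returning conduit::Result.

    // Build owned pairs first, then hand borrowed KeyVal views to insert_batch,
    // which now takes `I: Iterator<Item = KeyVal<'a>>` instead of `&mut dyn Iterator`.
    let batch = vec![
        database::OwnedKeyVal(b"key1".to_vec(), b"val1".to_vec()),
        database::OwnedKeyVal(b"key2".to_vec(), b"val2".to_vec()),
    ];
    some_map.insert_batch(batch.iter().map(database::KeyVal::from))?;

    // Borrowed-key batches work the same way for remove_batch/increment_batch,
    // which now take `I: Iterator<Item = &'a Key>` (i.e. &[u8]).
    let keys: Vec<Vec<u8>> = vec![b"counter1".to_vec(), b"counter2".to_vec()];
    some_map.increment_batch(keys.iter().map(Vec::as_slice))?;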