From 0522fe7d92abda7fe1478609f6cb4d334209a9fb Mon Sep 17 00:00:00 2001 From: Jason Volk Date: Tue, 2 Jul 2024 00:03:54 +0000 Subject: [PATCH] reimplement iterator from lowlevel Signed-off-by: Jason Volk --- src/database/iter.rs | 105 +++++++++++++++++++++++++++++++++++++++++++ src/database/map.rs | 37 ++++----------- src/database/mod.rs | 2 + 3 files changed, 115 insertions(+), 29 deletions(-) create mode 100644 src/database/iter.rs diff --git a/src/database/iter.rs b/src/database/iter.rs new file mode 100644 index 00000000..bbbab491 --- /dev/null +++ b/src/database/iter.rs @@ -0,0 +1,105 @@ +use std::{iter::FusedIterator, sync::Arc}; + +use conduit::Result; +use rocksdb::{ColumnFamily, DBRawIteratorWithThreadMode, Direction, IteratorMode, ReadOptions}; + +use crate::{engine::Db, map::KeyVal, result, Engine}; + +type Cursor<'cursor> = DBRawIteratorWithThreadMode<'cursor, Db>; +type Key<'item> = &'item [u8]; +type Val<'item> = &'item [u8]; +type Item<'item> = (Key<'item>, Val<'item>); + +struct State<'cursor> { + cursor: Cursor<'cursor>, + direction: Direction, + valid: bool, + init: bool, +} + +impl<'cursor> State<'cursor> { + pub(crate) fn new( + db: &'cursor Arc, cf: &'cursor Arc, opts: ReadOptions, mode: &IteratorMode<'_>, + ) -> Self { + let mut cursor = db.db.raw_iterator_cf_opt(&**cf, opts); + let direction = into_direction(mode); + let valid = seek_init(&mut cursor, mode); + Self { + cursor, + direction, + valid, + init: true, + } + } +} + +pub struct Iter<'cursor> { + state: State<'cursor>, +} + +impl<'cursor> Iter<'cursor> { + pub(crate) fn new( + db: &'cursor Arc, cf: &'cursor Arc, opts: ReadOptions, mode: &IteratorMode<'_>, + ) -> Self { + Self { + state: State::new(db, cf, opts, mode), + } + } +} + +impl Iterator for Iter<'_> { + type Item = KeyVal; + + fn next(&mut self) -> Option { + if !self.state.init && self.state.valid { + seek_next(&mut self.state.cursor, self.state.direction); + } else if self.state.init { + self.state.init = false; + } + + self.state.cursor.item().map(into_keyval).or_else(|| { + when_invalid(&mut self.state).expect("iterator invalidated due to error"); + None + }) + } +} + +impl FusedIterator for Iter<'_> {} + +fn when_invalid(state: &mut State<'_>) -> Result<()> { + state.valid = false; + result(state.cursor.status()) +} + +fn seek_next(cursor: &mut Cursor<'_>, direction: Direction) { + match direction { + Direction::Forward => cursor.next(), + Direction::Reverse => cursor.prev(), + } +} + +fn seek_init(cursor: &mut Cursor<'_>, mode: &IteratorMode<'_>) -> bool { + use Direction::{Forward, Reverse}; + use IteratorMode::{End, From, Start}; + + match mode { + Start => cursor.seek_to_first(), + End => cursor.seek_to_last(), + From(key, Forward) => cursor.seek(key), + From(key, Reverse) => cursor.seek_for_prev(key), + }; + + cursor.valid() +} + +fn into_direction(mode: &IteratorMode<'_>) -> Direction { + use Direction::{Forward, Reverse}; + use IteratorMode::{End, From, Start}; + + match mode { + Start | From(_, Forward) => Forward, + End | From(_, Reverse) => Reverse, + } +} + +fn into_keyval((key, val): Item<'_>) -> KeyVal { (Vec::from(key), Vec::from(val)) } diff --git a/src/database/map.rs b/src/database/map.rs index 8f21f76d..afd74080 100644 --- a/src/database/map.rs +++ b/src/database/map.rs @@ -5,7 +5,7 @@ use rocksdb::{ AsColumnFamilyRef, ColumnFamily, Direction, IteratorMode, ReadOptions, WriteBatchWithTransaction, WriteOptions, }; -use super::{or_else, result, watchers::Watchers, Engine}; +use crate::{or_else, result, watchers::Watchers, Engine, Iter}; pub struct Map { name: String, @@ -16,9 +16,9 @@ pub struct Map { read_options: ReadOptions, } -type Key = Vec; -type Val = Vec; -type KeyVal = (Key, Val); +pub(crate) type KeyVal = (Key, Val); +pub(crate) type Val = Vec; +pub(crate) type Key = Vec; impl Map { pub(crate) fn open(db: &Arc, name: &str) -> Result> { @@ -121,15 +121,9 @@ impl Map { } pub fn iter<'a>(&'a self) -> Box + 'a> { + let mode = IteratorMode::Start; let read_options = read_options_default(); - let it = self - .db - .db - .iterator_cf_opt(&self.cf(), read_options, IteratorMode::Start) - .map(Result::unwrap) - .map(|(k, v)| (Vec::from(k), Vec::from(v))); - - Box::new(it) + Box::new(Iter::new(&self.db, &self.cf, read_options, &mode)) } pub fn iter_from<'a>(&'a self, from: &[u8], backwards: bool) -> Box + 'a> { @@ -140,28 +134,13 @@ impl Map { }; let mode = IteratorMode::From(from, direction); let read_options = read_options_default(); - let it = self - .db - .db - .iterator_cf_opt(&self.cf(), read_options, mode) - .map(Result::unwrap) - .map(|(k, v)| (Vec::from(k), Vec::from(v))); - - Box::new(it) + Box::new(Iter::new(&self.db, &self.cf, read_options, &mode)) } pub fn scan_prefix<'a>(&'a self, prefix: Vec) -> Box + 'a> { let mode = IteratorMode::From(&prefix, Direction::Forward); let read_options = read_options_default(); - let it = self - .db - .db - .iterator_cf_opt(&self.cf(), read_options, mode) - .map(Result::unwrap) - .map(|(k, v)| (Vec::from(k), Vec::from(v))) - .take_while(move |(k, _)| k.starts_with(&prefix)); - - Box::new(it) + Box::new(Iter::new(&self.db, &self.cf, read_options, &mode).take_while(move |(k, _)| k.starts_with(&prefix))) } pub fn increment(&self, key: &[u8]) -> Result> { diff --git a/src/database/mod.rs b/src/database/mod.rs index e10f6cee..74e598b6 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1,6 +1,7 @@ mod cork; mod database; mod engine; +mod iter; mod map; pub mod maps; mod opts; @@ -12,6 +13,7 @@ extern crate rust_rocksdb as rocksdb; pub use database::Database; pub(crate) use engine::Engine; +pub use iter::Iter; pub use map::Map; pub(crate) use util::{or_else, result};