add database get_batch stream wrapper
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
9f7a4a012b
commit
14e3b242df
3 changed files with 28 additions and 19 deletions
|
@ -1,8 +1,8 @@
|
||||||
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write};
|
use std::{convert::AsRef, fmt::Debug, future::Future, io::Write};
|
||||||
|
|
||||||
use arrayvec::ArrayVec;
|
use arrayvec::ArrayVec;
|
||||||
use conduit::{err, implement, Result};
|
use conduit::{err, implement, utils::IterStream, Result};
|
||||||
use futures::future::ready;
|
use futures::{future::ready, Stream};
|
||||||
use rocksdb::DBPinnableSlice;
|
use rocksdb::DBPinnableSlice;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
|
@ -50,6 +50,7 @@ where
|
||||||
/// Fetch a value from the database into cache, returning a reference-handle
|
/// Fetch a value from the database into cache, returning a reference-handle
|
||||||
/// asynchronously. The key is referenced directly to perform the query.
|
/// asynchronously. The key is referenced directly to perform the query.
|
||||||
#[implement(super::Map)]
|
#[implement(super::Map)]
|
||||||
|
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
|
||||||
pub fn get<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
|
pub fn get<K>(&self, key: &K) -> impl Future<Output = Result<Handle<'_>>> + Send
|
||||||
where
|
where
|
||||||
K: AsRef<[u8]> + ?Sized + Debug,
|
K: AsRef<[u8]> + ?Sized + Debug,
|
||||||
|
@ -61,10 +62,9 @@ where
|
||||||
/// The key is referenced directly to perform the query. This is a thread-
|
/// The key is referenced directly to perform the query. This is a thread-
|
||||||
/// blocking call.
|
/// blocking call.
|
||||||
#[implement(super::Map)]
|
#[implement(super::Map)]
|
||||||
#[tracing::instrument(skip(self, key), fields(%self), level = "trace")]
|
|
||||||
pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
|
pub fn get_blocking<K>(&self, key: &K) -> Result<Handle<'_>>
|
||||||
where
|
where
|
||||||
K: AsRef<[u8]> + ?Sized + Debug,
|
K: AsRef<[u8]> + ?Sized,
|
||||||
{
|
{
|
||||||
let res = self
|
let res = self
|
||||||
.db
|
.db
|
||||||
|
@ -76,10 +76,19 @@ where
|
||||||
|
|
||||||
#[implement(super::Map)]
|
#[implement(super::Map)]
|
||||||
#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
|
#[tracing::instrument(skip(self, keys), fields(%self), level = "trace")]
|
||||||
pub fn get_batch_blocking<'a, I, K>(&self, keys: I) -> Vec<Result<Handle<'_>>>
|
pub fn get_batch<'a, I, K>(&self, keys: I) -> impl Stream<Item = Result<Handle<'_>>>
|
||||||
where
|
where
|
||||||
I: Iterator<Item = &'a K> + ExactSizeIterator + Send + Debug,
|
I: Iterator<Item = &'a K> + ExactSizeIterator + Send + Debug,
|
||||||
K: AsRef<[u8]> + Sized + Debug + 'a,
|
K: AsRef<[u8]> + Send + Sync + Sized + Debug + 'a,
|
||||||
|
{
|
||||||
|
self.get_batch_blocking(keys).stream()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[implement(super::Map)]
|
||||||
|
pub fn get_batch_blocking<'a, I, K>(&self, keys: I) -> impl Iterator<Item = Result<Handle<'_>>>
|
||||||
|
where
|
||||||
|
I: Iterator<Item = &'a K> + ExactSizeIterator + Send,
|
||||||
|
K: AsRef<[u8]> + Sized + 'a,
|
||||||
{
|
{
|
||||||
// Optimization can be `true` if key vector is pre-sorted **by the column
|
// Optimization can be `true` if key vector is pre-sorted **by the column
|
||||||
// comparator**.
|
// comparator**.
|
||||||
|
@ -91,7 +100,6 @@ where
|
||||||
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
|
.batched_multi_get_cf_opt(&self.cf(), keys, SORTED, read_options)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(into_result_handle)
|
.map(into_result_handle)
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn into_result_handle(result: RocksdbResult<'_>) -> Result<Handle<'_>> {
|
fn into_result_handle(result: RocksdbResult<'_>) -> Result<Handle<'_>> {
|
||||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
||||||
};
|
};
|
||||||
|
|
||||||
use conduit::{debug, debug_error, trace, utils::IterStream, validated, warn, Err, Result};
|
use conduit::{debug, debug_error, trace, utils::IterStream, validated, warn, Err, Result};
|
||||||
use futures::Stream;
|
use futures::{Stream, StreamExt};
|
||||||
use ruma::{EventId, RoomId};
|
use ruma::{EventId, RoomId};
|
||||||
|
|
||||||
use self::data::Data;
|
use self::data::Data;
|
||||||
|
@ -69,15 +69,15 @@ impl Service {
|
||||||
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
|
const BUCKET: BTreeSet<(u64, &EventId)> = BTreeSet::new();
|
||||||
|
|
||||||
let started = std::time::Instant::now();
|
let started = std::time::Instant::now();
|
||||||
let mut buckets = [BUCKET; NUM_BUCKETS];
|
let mut starting_ids = self
|
||||||
for (i, &short) in self
|
|
||||||
.services
|
.services
|
||||||
.short
|
.short
|
||||||
.multi_get_or_create_shorteventid(starting_events)
|
.multi_get_or_create_shorteventid(starting_events)
|
||||||
.await
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
.enumerate()
|
||||||
{
|
.boxed();
|
||||||
|
|
||||||
|
let mut buckets = [BUCKET; NUM_BUCKETS];
|
||||||
|
while let Some((i, short)) = starting_ids.next().await {
|
||||||
let bucket: usize = short.try_into()?;
|
let bucket: usize = short.try_into()?;
|
||||||
let bucket: usize = validated!(bucket % NUM_BUCKETS);
|
let bucket: usize = validated!(bucket % NUM_BUCKETS);
|
||||||
buckets[bucket].insert((short, starting_events[i]));
|
buckets[bucket].insert((short, starting_events[i]));
|
||||||
|
|
|
@ -3,6 +3,7 @@ use std::{mem::size_of_val, sync::Arc};
|
||||||
pub use conduit::pdu::{ShortEventId, ShortId, ShortRoomId};
|
pub use conduit::pdu::{ShortEventId, ShortId, ShortRoomId};
|
||||||
use conduit::{err, implement, utils, Result};
|
use conduit::{err, implement, utils, Result};
|
||||||
use database::{Deserialized, Map};
|
use database::{Deserialized, Map};
|
||||||
|
use futures::{Stream, StreamExt};
|
||||||
use ruma::{events::StateEventType, EventId, RoomId};
|
use ruma::{events::StateEventType, EventId, RoomId};
|
||||||
|
|
||||||
use crate::{globals, Dep};
|
use crate::{globals, Dep};
|
||||||
|
@ -71,11 +72,12 @@ pub async fn get_or_create_shorteventid(&self, event_id: &EventId) -> ShortEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
pub async fn multi_get_or_create_shorteventid(&self, event_ids: &[&EventId]) -> Vec<ShortEventId> {
|
pub fn multi_get_or_create_shorteventid<'a>(
|
||||||
|
&'a self, event_ids: &'a [&EventId],
|
||||||
|
) -> impl Stream<Item = ShortEventId> + Send + 'a {
|
||||||
self.db
|
self.db
|
||||||
.eventid_shorteventid
|
.eventid_shorteventid
|
||||||
.get_batch_blocking(event_ids.iter())
|
.get_batch(event_ids.iter())
|
||||||
.into_iter()
|
|
||||||
.enumerate()
|
.enumerate()
|
||||||
.map(|(i, result)| match result {
|
.map(|(i, result)| match result {
|
||||||
Ok(ref short) => utils::u64_from_u8(short),
|
Ok(ref short) => utils::u64_from_u8(short),
|
||||||
|
@ -95,7 +97,6 @@ pub async fn multi_get_or_create_shorteventid(&self, event_ids: &[&EventId]) ->
|
||||||
short
|
short
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
.collect()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
|
@ -163,10 +164,10 @@ pub async fn multi_get_eventid_from_short(&self, shorteventid: &[ShortEventId])
|
||||||
|
|
||||||
self.db
|
self.db
|
||||||
.shorteventid_eventid
|
.shorteventid_eventid
|
||||||
.get_batch_blocking(keys.iter())
|
.get_batch(keys.iter())
|
||||||
.into_iter()
|
|
||||||
.map(Deserialized::deserialized)
|
.map(Deserialized::deserialized)
|
||||||
.collect()
|
.collect()
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[implement(Service)]
|
#[implement(Service)]
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue