feat: add threadpool for iterator threads, bug fixes, tracing_flame support

This commit is contained in:
Timo Kösters 2021-07-29 08:36:01 +02:00
parent e0072eff63
commit 5e924227b6
No known key found for this signature in database
GPG key ID: 24DA7517711A2BA4
26 changed files with 472 additions and 228 deletions

View file

@ -3,9 +3,8 @@ use crate::{database::Config, Result};
use crossbeam::channel::{
bounded, unbounded, Receiver as ChannelReceiver, Sender as ChannelSender, TryRecvError,
};
use log::debug;
use parking_lot::{Mutex, MutexGuard, RwLock};
use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension};
use rusqlite::{params, Connection, DatabaseName::Main, OptionalExtension, Params};
use std::{
collections::HashMap,
future::Future,
@ -13,10 +12,11 @@ use std::{
path::{Path, PathBuf},
pin::Pin,
sync::Arc,
thread,
time::{Duration, Instant},
};
use threadpool::ThreadPool;
use tokio::sync::oneshot::Sender;
use tracing::{debug, warn};
struct Pool {
writer: Mutex<Connection>,
@ -86,9 +86,9 @@ impl Deref for RecycledConn {
impl Drop for RecycledConn {
fn drop(&mut self) {
if let Some(conn) = self.0.take() {
log::debug!("Recycled connection");
debug!("Recycled connection");
if let Err(e) = self.1.send(conn) {
log::warn!("Recycling a connection led to the following error: {:?}", e)
warn!("Recycling a connection led to the following error: {:?}", e)
}
}
}
@ -149,14 +149,14 @@ impl Pool {
}
}
log::debug!("read_lock: All permanent readers locked, obtaining spillover reader...");
debug!("read_lock: All permanent readers locked, obtaining spillover reader...");
// We didn't get a connection from the permanent pool, so we'll dumpster-dive for recycled connections.
// Either we have a connection or we dont, if we don't, we make a new one.
let conn = match self.spills.try_take() {
Some(conn) => conn,
None => {
log::debug!("read_lock: No recycled connections left, creating new one...");
debug!("read_lock: No recycled connections left, creating new one...");
Self::prepare_conn(&self.path, None).unwrap()
}
};
@ -169,7 +169,7 @@ impl Pool {
// If the spillover readers are more than the number of total readers, there might be a problem.
if now_count > self.readers.len() {
log::warn!(
warn!(
"Database is under high load. Consider increasing sqlite_read_pool_size ({} spillover readers exist)",
now_count
);
@ -182,6 +182,7 @@ impl Pool {
pub struct Engine {
pool: Pool,
iter_pool: Mutex<ThreadPool>,
}
impl DatabaseEngine for Engine {
@ -195,7 +196,10 @@ impl DatabaseEngine for Engine {
pool.write_lock()
.execute("CREATE TABLE IF NOT EXISTS _noop (\"key\" INT)", params![])?;
let arc = Arc::new(Engine { pool });
let arc = Arc::new(Engine {
pool,
iter_pool: Mutex::new(ThreadPool::new(10)),
});
Ok(arc)
}
@ -259,7 +263,7 @@ impl Engine {
}
}
log::debug!("Reaped {} connections", reaped);
debug!("Reaped {} connections", reaped);
}
}
@ -272,6 +276,7 @@ pub struct SqliteTable {
type TupleOfBytes = (Vec<u8>, Vec<u8>);
impl SqliteTable {
#[tracing::instrument(skip(self, guard, key))]
fn get_with_guard(&self, guard: &Connection, key: &[u8]) -> Result<Option<Vec<u8>>> {
Ok(guard
.prepare(format!("SELECT value FROM {} WHERE key = ?", self.name).as_str())?
@ -279,6 +284,7 @@ impl SqliteTable {
.optional()?)
}
#[tracing::instrument(skip(self, guard, key, value))]
fn insert_with_guard(&self, guard: &Connection, key: &[u8], value: &[u8]) -> Result<()> {
guard.execute(
format!(
@ -291,41 +297,67 @@ impl SqliteTable {
Ok(())
}
fn _iter_from_thread<F>(&self, f: F) -> Box<dyn Iterator<Item = TupleOfBytes> + Send>
where
F: (for<'a> FnOnce(&'a Connection, ChannelSender<TupleOfBytes>)) + Send + 'static,
{
#[tracing::instrument(skip(self, sql, param))]
fn iter_from_thread(
&self,
sql: String,
param: Option<Vec<u8>>,
) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + Sync> {
let (s, r) = bounded::<TupleOfBytes>(5);
let engine = self.engine.clone();
let engine = Arc::clone(&self.engine);
thread::spawn(move || {
let _ = f(&engine.pool.read_lock(), s);
});
let lock = self.engine.iter_pool.lock();
if lock.active_count() < lock.max_count() {
lock.execute(move || {
if let Some(param) = param {
iter_from_thread_work(&engine.pool.read_lock(), &s, &sql, [param]);
} else {
iter_from_thread_work(&engine.pool.read_lock(), &s, &sql, []);
}
});
} else {
std::thread::spawn(move || {
if let Some(param) = param {
iter_from_thread_work(&engine.pool.read_lock(), &s, &sql, [param]);
} else {
iter_from_thread_work(&engine.pool.read_lock(), &s, &sql, []);
}
});
}
Box::new(r.into_iter())
}
}
macro_rules! iter_from_thread {
($self:expr, $sql:expr, $param:expr) => {
$self._iter_from_thread(move |guard, s| {
let _ = guard
.prepare($sql)
.unwrap()
.query_map($param, |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(|r| r.unwrap())
.try_for_each(|bob| s.send(bob));
})
};
fn iter_from_thread_work<P>(
guard: &HoldingConn<'_>,
s: &ChannelSender<(Vec<u8>, Vec<u8>)>,
sql: &str,
params: P,
) where
P: Params,
{
for bob in guard
.prepare(sql)
.unwrap()
.query_map(params, |row| Ok((row.get_unwrap(0), row.get_unwrap(1))))
.unwrap()
.map(|r| r.unwrap())
{
if s.send(bob).is_err() {
return;
}
}
}
impl Tree for SqliteTable {
#[tracing::instrument(skip(self, key))]
fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>> {
self.get_with_guard(&self.engine.pool.read_lock(), key)
}
#[tracing::instrument(skip(self, key, value))]
fn insert(&self, key: &[u8], value: &[u8]) -> Result<()> {
let guard = self.engine.pool.write_lock();
@ -365,6 +397,7 @@ impl Tree for SqliteTable {
Ok(())
}
#[tracing::instrument(skip(self, key))]
fn remove(&self, key: &[u8]) -> Result<()> {
let guard = self.engine.pool.write_lock();
@ -385,15 +418,13 @@ impl Tree for SqliteTable {
Ok(())
}
#[tracing::instrument(skip(self))]
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + 'a> {
let name = self.name.clone();
iter_from_thread!(
self,
format!("SELECT key, value FROM {}", name).as_str(),
params![]
)
self.iter_from_thread(format!("SELECT key, value FROM {}", name), None)
}
#[tracing::instrument(skip(self, from, backwards))]
fn iter_from<'a>(
&'a self,
from: &[u8],
@ -402,28 +433,25 @@ impl Tree for SqliteTable {
let name = self.name.clone();
let from = from.to_vec(); // TODO change interface?
if backwards {
iter_from_thread!(
self,
self.iter_from_thread(
format!(
"SELECT key, value FROM {} WHERE key <= ? ORDER BY key DESC",
name
)
.as_str(),
[from]
),
Some(from),
)
} else {
iter_from_thread!(
self,
self.iter_from_thread(
format!(
"SELECT key, value FROM {} WHERE key >= ? ORDER BY key ASC",
name
)
.as_str(),
[from]
),
Some(from),
)
}
}
#[tracing::instrument(skip(self, key))]
fn increment(&self, key: &[u8]) -> Result<Vec<u8>> {
let guard = self.engine.pool.write_lock();
@ -446,18 +474,17 @@ impl Tree for SqliteTable {
Ok(new)
}
#[tracing::instrument(skip(self, prefix))]
fn scan_prefix<'a>(
&'a self,
prefix: Vec<u8>,
) -> Box<dyn Iterator<Item = TupleOfBytes> + Send + 'a> {
// let name = self.name.clone();
// iter_from_thread!(
// self,
// self.iter_from_thread(
// format!(
// "SELECT key, value FROM {} WHERE key BETWEEN ?1 AND ?1 || X'FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF' ORDER BY key ASC",
// name
// )
// .as_str(),
// [prefix]
// )
Box::new(
@ -466,6 +493,7 @@ impl Tree for SqliteTable {
)
}
#[tracing::instrument(skip(self, prefix))]
fn watch_prefix<'a>(&'a self, prefix: &[u8]) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>> {
let (tx, rx) = tokio::sync::oneshot::channel();
@ -481,6 +509,7 @@ impl Tree for SqliteTable {
})
}
#[tracing::instrument(skip(self))]
fn clear(&self) -> Result<()> {
debug!("clear: running");
self.engine