add FIFO compaction for persistent-cache descriptor; comments/cleanup
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
a57336ec13
commit
17003ba773
4 changed files with 46 additions and 17 deletions
|
@ -1,8 +1,8 @@
|
||||||
use conduwuit::{Config, Result, err, utils::math::Expected};
|
use conduwuit::{Config, Result, err, utils::math::Expected};
|
||||||
use rocksdb::{
|
use rocksdb::{
|
||||||
BlockBasedIndexType, BlockBasedOptions, BlockBasedPinningTier, Cache,
|
BlockBasedIndexType, BlockBasedOptions, BlockBasedPinningTier, Cache,
|
||||||
DBCompressionType as CompressionType, DataBlockIndexType, LruCacheOptions, Options,
|
DBCompressionType as CompressionType, DataBlockIndexType, FifoCompactOptions,
|
||||||
UniversalCompactOptions, UniversalCompactionStopStyle,
|
LruCacheOptions, Options, UniversalCompactOptions, UniversalCompactionStopStyle,
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::descriptor::{CacheDisp, Descriptor};
|
use super::descriptor::{CacheDisp, Descriptor};
|
||||||
|
@ -16,7 +16,7 @@ pub(super) const SENTINEL_COMPRESSION_LEVEL: i32 = 32767;
|
||||||
pub(crate) fn cf_options(ctx: &Context, opts: Options, desc: &Descriptor) -> Result<Options> {
|
pub(crate) fn cf_options(ctx: &Context, opts: Options, desc: &Descriptor) -> Result<Options> {
|
||||||
let cache = get_cache(ctx, desc);
|
let cache = get_cache(ctx, desc);
|
||||||
let config = &ctx.server.config;
|
let config = &ctx.server.config;
|
||||||
descriptor_cf_options(opts, desc.clone(), config, cache.as_ref())
|
descriptor_cf_options(opts, *desc, config, cache.as_ref())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn descriptor_cf_options(
|
fn descriptor_cf_options(
|
||||||
|
@ -46,6 +46,7 @@ fn descriptor_cf_options(
|
||||||
opts.set_compaction_style(desc.compaction);
|
opts.set_compaction_style(desc.compaction);
|
||||||
opts.set_compaction_pri(desc.compaction_pri);
|
opts.set_compaction_pri(desc.compaction_pri);
|
||||||
opts.set_universal_compaction_options(&uc_options(&desc));
|
opts.set_universal_compaction_options(&uc_options(&desc));
|
||||||
|
opts.set_fifo_compaction_options(&fifo_options(&desc));
|
||||||
|
|
||||||
let compression_shape: Vec<_> = desc
|
let compression_shape: Vec<_> = desc
|
||||||
.compression_shape
|
.compression_shape
|
||||||
|
@ -142,6 +143,13 @@ fn set_compression(desc: &mut Descriptor, config: &Config) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn fifo_options(desc: &Descriptor) -> FifoCompactOptions {
|
||||||
|
let mut opts = FifoCompactOptions::default();
|
||||||
|
opts.set_max_table_files_size(desc.limit_size);
|
||||||
|
|
||||||
|
opts
|
||||||
|
}
|
||||||
|
|
||||||
fn uc_options(desc: &Descriptor) -> UniversalCompactOptions {
|
fn uc_options(desc: &Descriptor) -> UniversalCompactOptions {
|
||||||
let mut opts = UniversalCompactOptions::default();
|
let mut opts = UniversalCompactOptions::default();
|
||||||
opts.set_stop_style(UniversalCompactionStopStyle::Total);
|
opts.set_stop_style(UniversalCompactionStopStyle::Total);
|
||||||
|
|
|
@ -6,14 +6,8 @@ use rocksdb::{
|
||||||
|
|
||||||
use super::cf_opts::SENTINEL_COMPRESSION_LEVEL;
|
use super::cf_opts::SENTINEL_COMPRESSION_LEVEL;
|
||||||
|
|
||||||
|
/// Column Descriptor
|
||||||
#[derive(Debug, Clone, Copy)]
|
#[derive(Debug, Clone, Copy)]
|
||||||
pub(crate) enum CacheDisp {
|
|
||||||
Unique,
|
|
||||||
Shared,
|
|
||||||
SharedWith(&'static str),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub(crate) struct Descriptor {
|
pub(crate) struct Descriptor {
|
||||||
pub(crate) name: &'static str,
|
pub(crate) name: &'static str,
|
||||||
pub(crate) dropped: bool,
|
pub(crate) dropped: bool,
|
||||||
|
@ -30,6 +24,7 @@ pub(crate) struct Descriptor {
|
||||||
pub(crate) file_shape: i32,
|
pub(crate) file_shape: i32,
|
||||||
pub(crate) level0_width: i32,
|
pub(crate) level0_width: i32,
|
||||||
pub(crate) merge_width: (i32, i32),
|
pub(crate) merge_width: (i32, i32),
|
||||||
|
pub(crate) limit_size: u64,
|
||||||
pub(crate) ttl: u64,
|
pub(crate) ttl: u64,
|
||||||
pub(crate) compaction: CompactionStyle,
|
pub(crate) compaction: CompactionStyle,
|
||||||
pub(crate) compaction_pri: CompactionPri,
|
pub(crate) compaction_pri: CompactionPri,
|
||||||
|
@ -46,7 +41,16 @@ pub(crate) struct Descriptor {
|
||||||
pub(crate) auto_readahead_max: usize,
|
pub(crate) auto_readahead_max: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) static BASE: Descriptor = Descriptor {
|
/// Cache Disposition
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub(crate) enum CacheDisp {
|
||||||
|
Unique,
|
||||||
|
Shared,
|
||||||
|
SharedWith(&'static str),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Base descriptor supplying common defaults to all derived descriptors.
|
||||||
|
static BASE: Descriptor = Descriptor {
|
||||||
name: EMPTY,
|
name: EMPTY,
|
||||||
dropped: false,
|
dropped: false,
|
||||||
cache_disp: CacheDisp::Shared,
|
cache_disp: CacheDisp::Shared,
|
||||||
|
@ -62,6 +66,7 @@ pub(crate) static BASE: Descriptor = Descriptor {
|
||||||
file_shape: 2,
|
file_shape: 2,
|
||||||
level0_width: 2,
|
level0_width: 2,
|
||||||
merge_width: (2, 16),
|
merge_width: (2, 16),
|
||||||
|
limit_size: 0,
|
||||||
ttl: 60 * 60 * 24 * 21,
|
ttl: 60 * 60 * 24 * 21,
|
||||||
compaction: CompactionStyle::Level,
|
compaction: CompactionStyle::Level,
|
||||||
compaction_pri: CompactionPri::MinOverlappingRatio,
|
compaction_pri: CompactionPri::MinOverlappingRatio,
|
||||||
|
@ -78,6 +83,10 @@ pub(crate) static BASE: Descriptor = Descriptor {
|
||||||
auto_readahead_max: 1024 * 1024 * 2,
|
auto_readahead_max: 1024 * 1024 * 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Tombstone descriptor for columns which have been or will be deleted.
|
||||||
|
pub(crate) static DROPPED: Descriptor = Descriptor { dropped: true, ..BASE };
|
||||||
|
|
||||||
|
/// Descriptor for large datasets with random updates across the keyspace.
|
||||||
pub(crate) static RANDOM: Descriptor = Descriptor {
|
pub(crate) static RANDOM: Descriptor = Descriptor {
|
||||||
compaction_pri: CompactionPri::OldestSmallestSeqFirst,
|
compaction_pri: CompactionPri::OldestSmallestSeqFirst,
|
||||||
write_size: 1024 * 1024 * 32,
|
write_size: 1024 * 1024 * 32,
|
||||||
|
@ -88,6 +97,7 @@ pub(crate) static RANDOM: Descriptor = Descriptor {
|
||||||
..BASE
|
..BASE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Descriptor for large datasets with updates to the end of the keyspace.
|
||||||
pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
|
pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
|
||||||
compaction_pri: CompactionPri::OldestLargestSeqFirst,
|
compaction_pri: CompactionPri::OldestLargestSeqFirst,
|
||||||
write_size: 1024 * 1024 * 64,
|
write_size: 1024 * 1024 * 64,
|
||||||
|
@ -101,6 +111,7 @@ pub(crate) static SEQUENTIAL: Descriptor = Descriptor {
|
||||||
..BASE
|
..BASE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Descriptor for small datasets with random updates across the keyspace.
|
||||||
pub(crate) static RANDOM_SMALL: Descriptor = Descriptor {
|
pub(crate) static RANDOM_SMALL: Descriptor = Descriptor {
|
||||||
compaction: CompactionStyle::Universal,
|
compaction: CompactionStyle::Universal,
|
||||||
write_size: 1024 * 1024 * 16,
|
write_size: 1024 * 1024 * 16,
|
||||||
|
@ -117,6 +128,7 @@ pub(crate) static RANDOM_SMALL: Descriptor = Descriptor {
|
||||||
..RANDOM
|
..RANDOM
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Descriptor for small datasets with updates to the end of the keyspace.
|
||||||
pub(crate) static SEQUENTIAL_SMALL: Descriptor = Descriptor {
|
pub(crate) static SEQUENTIAL_SMALL: Descriptor = Descriptor {
|
||||||
compaction: CompactionStyle::Universal,
|
compaction: CompactionStyle::Universal,
|
||||||
write_size: 1024 * 1024 * 16,
|
write_size: 1024 * 1024 * 16,
|
||||||
|
@ -132,3 +144,14 @@ pub(crate) static SEQUENTIAL_SMALL: Descriptor = Descriptor {
|
||||||
compressed_index: false,
|
compressed_index: false,
|
||||||
..SEQUENTIAL
|
..SEQUENTIAL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Descriptor for small persistent caches with random updates. Oldest entries
|
||||||
|
/// are deleted after limit_size reached.
|
||||||
|
pub(crate) static RANDOM_SMALL_CACHE: Descriptor = Descriptor {
|
||||||
|
compaction: CompactionStyle::Fifo,
|
||||||
|
cache_disp: CacheDisp::Unique,
|
||||||
|
limit_size: 1024 * 1024 * 64,
|
||||||
|
ttl: 60 * 60 * 24 * 14,
|
||||||
|
file_shape: 2,
|
||||||
|
..RANDOM_SMALL
|
||||||
|
};
|
||||||
|
|
|
@ -101,13 +101,11 @@ fn configure_cfds(
|
||||||
debug!("Creating new column {name:?} not previously found in existing database.");
|
debug!("Creating new column {name:?} not previously found in existing database.");
|
||||||
});
|
});
|
||||||
|
|
||||||
let missing_descriptors = missing
|
let missing_descriptors = missing.clone().map(|_| descriptor::DROPPED);
|
||||||
.clone()
|
|
||||||
.map(|_| Descriptor { dropped: true, ..descriptor::BASE });
|
|
||||||
|
|
||||||
let cfopts: Vec<_> = desc
|
let cfopts: Vec<_> = desc
|
||||||
.iter()
|
.iter()
|
||||||
.cloned()
|
.copied()
|
||||||
.chain(missing_descriptors)
|
.chain(missing_descriptors)
|
||||||
.map(|ref desc| cf_options(ctx, db_opts.clone(), desc))
|
.map(|ref desc| cf_options(ctx, db_opts.clone(), desc))
|
||||||
.collect::<Result<_>>()?;
|
.collect::<Result<_>>()?;
|
||||||
|
|
|
@ -233,7 +233,7 @@ pub(super) static MAPS: &[Descriptor] = &[
|
||||||
},
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "servername_destination",
|
name: "servername_destination",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL_CACHE
|
||||||
},
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "servername_educount",
|
name: "servername_educount",
|
||||||
|
@ -241,7 +241,7 @@ pub(super) static MAPS: &[Descriptor] = &[
|
||||||
},
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "servername_override",
|
name: "servername_override",
|
||||||
..descriptor::RANDOM_SMALL
|
..descriptor::RANDOM_SMALL_CACHE
|
||||||
},
|
},
|
||||||
Descriptor {
|
Descriptor {
|
||||||
name: "servernameevent_data",
|
name: "servernameevent_data",
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue