rename src/bin to src/main
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
4aeec78ab4
commit
362649ff87
4 changed files with 0 additions and 0 deletions
124
src/main/Cargo.toml
Normal file
124
src/main/Cargo.toml
Normal file
|
@ -0,0 +1,124 @@
|
|||
[package]
|
||||
# TODO: when can we rename to conduwuit?
|
||||
name = "conduit"
|
||||
default-run = "conduit"
|
||||
description.workspace = true
|
||||
license.workspace = true
|
||||
authors.workspace = true
|
||||
homepage.workspace = true
|
||||
repository.workspace = true
|
||||
readme.workspace = true
|
||||
version.workspace = true
|
||||
edition.workspace = true
|
||||
rust-version.workspace = true
|
||||
metadata.crane.workspace = true
|
||||
|
||||
[package.metadata.deb]
|
||||
name = "conduwuit"
|
||||
maintainer = "strawberry <strawberry@puppygock.gay>"
|
||||
copyright = "2024, strawberry <strawberry@puppygock.gay>"
|
||||
license-file = ["../../LICENSE", "3"]
|
||||
depends = "$auto, ca-certificates"
|
||||
extended-description = """\
|
||||
a cool hard fork of Conduit, a Matrix homeserver written in Rust"""
|
||||
section = "net"
|
||||
priority = "optional"
|
||||
conf-files = ["/etc/conduwuit/conduwuit.toml"]
|
||||
maintainer-scripts = "../../debian/"
|
||||
systemd-units = { unit-name = "conduwuit", start = false }
|
||||
assets = [
|
||||
["../../debian/README.md", "usr/share/doc/conduwuit/README.Debian", "644"],
|
||||
["../../README.md", "usr/share/doc/conduwuit/", "644"],
|
||||
["../../target/release/conduwuit", "usr/sbin/conduwuit", "755"],
|
||||
["../../conduwuit-example.toml", "etc/conduwuit/conduwuit.toml", "640"],
|
||||
]
|
||||
|
||||
[features]
|
||||
default = [
|
||||
"sentry_telemetry",
|
||||
"release_max_log_level",
|
||||
]
|
||||
|
||||
# increases performance, reduces build times, and reduces binary size by not compiling or
|
||||
# genreating code for log level filters that users will generally not use (debug and trace)
|
||||
release_max_log_level = [
|
||||
"tracing/max_level_trace",
|
||||
"tracing/release_max_level_info",
|
||||
"log/max_level_trace",
|
||||
"log/release_max_level_info",
|
||||
]
|
||||
sentry_telemetry = [
|
||||
"dep:sentry",
|
||||
"dep:sentry-tracing",
|
||||
"dep:sentry-tower",
|
||||
]
|
||||
# enable the tokio_console server ncompatible with release_max_log_level
|
||||
tokio_console = [
|
||||
"dep:console-subscriber",
|
||||
"tokio/tracing",
|
||||
]
|
||||
perf_measurements = [
|
||||
"dep:opentelemetry",
|
||||
"dep:tracing-flame",
|
||||
"dep:tracing-opentelemetry",
|
||||
"dep:opentelemetry_sdk",
|
||||
"dep:opentelemetry-jaeger",
|
||||
]
|
||||
jemalloc = [
|
||||
"dep:tikv-jemallocator",
|
||||
]
|
||||
panic_trap = []
|
||||
mods = []
|
||||
|
||||
[dependencies]
|
||||
conduit-router.workspace = true
|
||||
conduit-admin.workspace = true
|
||||
conduit-api.workspace = true
|
||||
conduit-service.workspace = true
|
||||
conduit-database.workspace = true
|
||||
conduit-core.workspace = true
|
||||
|
||||
tokio.workspace = true
|
||||
log.workspace = true
|
||||
tracing.workspace = true
|
||||
tracing-subscriber.workspace = true
|
||||
clap.workspace = true
|
||||
num_cpus.workspace = true
|
||||
|
||||
opentelemetry.workspace = true
|
||||
opentelemetry.optional = true
|
||||
tracing-flame.workspace = true
|
||||
tracing-flame.optional = true
|
||||
tracing-opentelemetry.workspace = true
|
||||
tracing-opentelemetry.optional = true
|
||||
opentelemetry_sdk.workspace = true
|
||||
opentelemetry_sdk.optional = true
|
||||
opentelemetry-jaeger.workspace = true
|
||||
opentelemetry-jaeger.optional = true
|
||||
|
||||
sentry.workspace = true
|
||||
sentry.optional = true
|
||||
sentry-tracing.workspace = true
|
||||
sentry-tracing.optional = true
|
||||
sentry-tower.workspace = true
|
||||
sentry-tower.optional = true
|
||||
|
||||
tikv-jemallocator.workspace = true
|
||||
tikv-jemallocator.optional = true
|
||||
|
||||
tokio-metrics.workspace = true
|
||||
tokio-metrics.optional = true
|
||||
|
||||
console-subscriber.workspace = true
|
||||
console-subscriber.optional = true
|
||||
|
||||
[target.'cfg(all(not(target_env = "msvc"), target_os = "linux"))'.dependencies]
|
||||
hardened_malloc-rs.workspace = true
|
||||
hardened_malloc-rs.optional = true
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[[bin]]
|
||||
name = "conduit"
|
||||
path = "main.rs"
|
96
src/main/main.rs
Normal file
96
src/main/main.rs
Normal file
|
@ -0,0 +1,96 @@
|
|||
mod mods;
|
||||
mod server;
|
||||
|
||||
extern crate conduit_core as conduit;
|
||||
|
||||
use std::{cmp, sync::Arc, time::Duration};
|
||||
|
||||
use conduit::{debug_info, error, utils::clap, Error, Result};
|
||||
use server::Server;
|
||||
use tokio::runtime;
|
||||
|
||||
const WORKER_NAME: &str = "conduwuit:worker";
|
||||
const WORKER_MIN: usize = 2;
|
||||
const WORKER_KEEPALIVE_MS: u64 = 2500;
|
||||
|
||||
fn main() -> Result<(), Error> {
|
||||
let args = clap::parse();
|
||||
let runtime = runtime::Builder::new_multi_thread()
|
||||
.enable_io()
|
||||
.enable_time()
|
||||
.thread_name(WORKER_NAME)
|
||||
.worker_threads(cmp::max(WORKER_MIN, num_cpus::get()))
|
||||
.thread_keep_alive(Duration::from_millis(WORKER_KEEPALIVE_MS))
|
||||
.build()
|
||||
.expect("built runtime");
|
||||
|
||||
let handle = runtime.handle();
|
||||
let server: Arc<Server> = Server::build(args, Some(handle))?;
|
||||
runtime.block_on(async { async_main(server.clone()).await })?;
|
||||
|
||||
// explicit drop here to trace thread and tls dtors
|
||||
drop(runtime);
|
||||
|
||||
debug_info!("Exit");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Operate the server normally in release-mode static builds. This will start,
|
||||
/// run and stop the server within the asynchronous runtime.
|
||||
#[cfg(not(feature = "mods"))]
|
||||
async fn async_main(server: Arc<Server>) -> Result<(), Error> {
|
||||
extern crate conduit_router as router;
|
||||
use tracing::error;
|
||||
|
||||
if let Err(error) = router::start(&server.server).await {
|
||||
error!("Critical error starting server: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
if let Err(error) = router::run(&server.server).await {
|
||||
error!("Critical error running server: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
if let Err(error) = router::stop(&server.server).await {
|
||||
error!("Critical error stopping server: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
debug_info!("Exit runtime");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Operate the server in developer-mode dynamic builds. This will start, run,
|
||||
/// and hot-reload portions of the server as-needed before returning for an
|
||||
/// actual shutdown. This is not available in release-mode or static builds.
|
||||
#[cfg(feature = "mods")]
|
||||
async fn async_main(server: Arc<Server>) -> Result<(), Error> {
|
||||
let mut starts = true;
|
||||
let mut reloads = true;
|
||||
while reloads {
|
||||
if let Err(error) = mods::open(&server).await {
|
||||
error!("Loading router: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
let result = mods::run(&server, starts).await;
|
||||
if let Ok(result) = result {
|
||||
(starts, reloads) = result;
|
||||
}
|
||||
|
||||
let force = !reloads || result.is_err();
|
||||
if let Err(error) = mods::close(&server, force).await {
|
||||
error!("Unloading router: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
|
||||
if let Err(error) = result {
|
||||
error!("{error}");
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
debug_info!("Exit runtime");
|
||||
Ok(())
|
||||
}
|
129
src/main/mods.rs
Normal file
129
src/main/mods.rs
Normal file
|
@ -0,0 +1,129 @@
|
|||
#![cfg(feature = "mods")]
|
||||
#[cfg(not(any(clippy, debug_assertions, doctest, test)))]
|
||||
compile_error!("Feature 'mods' is only available in developer builds");
|
||||
|
||||
use std::{
|
||||
future::Future,
|
||||
pin::Pin,
|
||||
sync::{atomic::Ordering, Arc},
|
||||
};
|
||||
|
||||
use conduit::{mods, Error, Result};
|
||||
use tracing::{debug, error};
|
||||
|
||||
use crate::Server;
|
||||
|
||||
type RunFuncResult = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
|
||||
type RunFuncProto = fn(&Arc<conduit::Server>) -> RunFuncResult;
|
||||
|
||||
const RESTART_THRESH: &str = "conduit_service";
|
||||
const MODULE_NAMES: &[&str] = &[
|
||||
//"conduit_core",
|
||||
"conduit_database",
|
||||
"conduit_service",
|
||||
"conduit_api",
|
||||
"conduit_admin",
|
||||
"conduit_router",
|
||||
];
|
||||
|
||||
#[cfg(feature = "panic_trap")]
|
||||
conduit::mod_init! {{
|
||||
conduit::debug::set_panic_trap();
|
||||
}}
|
||||
|
||||
pub(crate) async fn run(server: &Arc<Server>, starts: bool) -> Result<(bool, bool), Error> {
|
||||
let main_lock = server.mods.read().await;
|
||||
let main_mod = (*main_lock).last().expect("main module loaded");
|
||||
if starts {
|
||||
let start = main_mod.get::<RunFuncProto>("start")?;
|
||||
if let Err(error) = start(&server.server).await {
|
||||
error!("Starting server: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
let run = main_mod.get::<RunFuncProto>("run")?;
|
||||
if let Err(error) = run(&server.server).await {
|
||||
error!("Running server: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
let reloads = server.server.reload.swap(false, Ordering::AcqRel);
|
||||
let stops = !reloads || stale(server).await? <= restart_thresh();
|
||||
let starts = reloads && stops;
|
||||
if stops {
|
||||
let stop = main_mod.get::<RunFuncProto>("stop")?;
|
||||
if let Err(error) = stop(&server.server).await {
|
||||
error!("Stopping server: {error}");
|
||||
return Err(error);
|
||||
}
|
||||
}
|
||||
|
||||
Ok((starts, reloads))
|
||||
}
|
||||
|
||||
pub(crate) async fn open(server: &Arc<Server>) -> Result<usize, Error> {
|
||||
let mut mods_lock = server.mods.write().await;
|
||||
let mods: &mut Vec<mods::Module> = &mut mods_lock;
|
||||
debug!(
|
||||
available = %available(),
|
||||
loaded = %mods.len(),
|
||||
"Loading modules",
|
||||
);
|
||||
|
||||
for (i, name) in MODULE_NAMES.iter().enumerate() {
|
||||
if mods.get(i).is_none() {
|
||||
mods.push(mods::Module::from_name(name)?);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(mods.len())
|
||||
}
|
||||
|
||||
pub(crate) async fn close(server: &Arc<Server>, force: bool) -> Result<usize, Error> {
|
||||
let stale = stale_count(server).await;
|
||||
let mut mods_lock = server.mods.write().await;
|
||||
let mods: &mut Vec<mods::Module> = &mut mods_lock;
|
||||
debug!(
|
||||
available = %available(),
|
||||
loaded = %mods.len(),
|
||||
stale = %stale,
|
||||
force,
|
||||
"Unloading modules",
|
||||
);
|
||||
|
||||
while mods.last().is_some() {
|
||||
let module = &mods.last().expect("module");
|
||||
if force || module.deleted()? {
|
||||
mods.pop();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(mods.len())
|
||||
}
|
||||
|
||||
async fn stale_count(server: &Arc<Server>) -> usize {
|
||||
let watermark = stale(server).await.unwrap_or(available());
|
||||
available() - watermark
|
||||
}
|
||||
|
||||
async fn stale(server: &Arc<Server>) -> Result<usize, Error> {
|
||||
let mods_lock = server.mods.read().await;
|
||||
let mods: &Vec<mods::Module> = &mods_lock;
|
||||
for (i, module) in mods.iter().enumerate() {
|
||||
if module.deleted()? {
|
||||
return Ok(i);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(mods.len())
|
||||
}
|
||||
|
||||
fn restart_thresh() -> usize {
|
||||
MODULE_NAMES
|
||||
.iter()
|
||||
.position(|&name| name.ends_with(RESTART_THRESH))
|
||||
.unwrap_or(MODULE_NAMES.len())
|
||||
}
|
||||
|
||||
const fn available() -> usize { MODULE_NAMES.len() }
|
186
src/main/server.rs
Normal file
186
src/main/server.rs
Normal file
|
@ -0,0 +1,186 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use conduit::{
|
||||
conduwuit_version,
|
||||
config::Config,
|
||||
info,
|
||||
log::{LogLevelReloadHandles, ReloadHandle},
|
||||
utils::{clap, maximize_fd_limit},
|
||||
Error, Result,
|
||||
};
|
||||
use tokio::runtime;
|
||||
use tracing_subscriber::{prelude::*, reload, EnvFilter, Registry};
|
||||
|
||||
/// Server runtime state; complete
|
||||
pub(crate) struct Server {
|
||||
/// Server runtime state; public portion
|
||||
pub(crate) server: Arc<conduit::Server>,
|
||||
|
||||
_tracing_flame_guard: TracingFlameGuard,
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
_sentry_guard: Option<sentry::ClientInitGuard>,
|
||||
|
||||
// Module instances; TODO: move to mods::loaded mgmt vector
|
||||
#[cfg(feature = "mods")]
|
||||
pub(crate) mods: tokio::sync::RwLock<Vec<conduit::mods::Module>>,
|
||||
}
|
||||
|
||||
impl Server {
|
||||
pub(crate) fn build(args: clap::Args, runtime: Option<&runtime::Handle>) -> Result<Arc<Server>, Error> {
|
||||
let config = Config::new(args.config)?;
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
let sentry_guard = init_sentry(&config);
|
||||
let (tracing_reload_handle, tracing_flame_guard) = init_tracing(&config);
|
||||
|
||||
config.check()?;
|
||||
#[cfg(unix)]
|
||||
maximize_fd_limit().expect("Unable to increase maximum soft and hard file descriptor limit");
|
||||
info!(
|
||||
server_name = %config.server_name,
|
||||
database_path = ?config.database_path,
|
||||
log_levels = %config.log,
|
||||
"{}",
|
||||
conduwuit_version(),
|
||||
);
|
||||
|
||||
Ok(Arc::new(Server {
|
||||
server: Arc::new(conduit::Server::new(config, runtime.cloned(), tracing_reload_handle)),
|
||||
|
||||
_tracing_flame_guard: tracing_flame_guard,
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
_sentry_guard: sentry_guard,
|
||||
|
||||
#[cfg(feature = "mods")]
|
||||
mods: tokio::sync::RwLock::new(Vec::new()),
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
fn init_sentry(config: &Config) -> Option<sentry::ClientInitGuard> {
|
||||
if !config.sentry {
|
||||
return None;
|
||||
}
|
||||
|
||||
let sentry_endpoint = config
|
||||
.sentry_endpoint
|
||||
.as_ref()
|
||||
.expect("init_sentry should only be called if sentry is enabled and this is not None")
|
||||
.as_str();
|
||||
|
||||
let server_name = if config.sentry_send_server_name {
|
||||
Some(config.server_name.to_string().into())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
Some(sentry::init((
|
||||
sentry_endpoint,
|
||||
sentry::ClientOptions {
|
||||
release: sentry::release_name!(),
|
||||
traces_sample_rate: config.sentry_traces_sample_rate,
|
||||
server_name,
|
||||
..Default::default()
|
||||
},
|
||||
)))
|
||||
}
|
||||
|
||||
#[cfg(feature = "perf_measurements")]
|
||||
type TracingFlameGuard = Option<tracing_flame::FlushGuard<std::io::BufWriter<std::fs::File>>>;
|
||||
#[cfg(not(feature = "perf_measurements"))]
|
||||
type TracingFlameGuard = ();
|
||||
|
||||
// clippy thinks the filter_layer clones are redundant if the next usage is
|
||||
// behind a disabled feature.
|
||||
#[allow(clippy::redundant_clone)]
|
||||
fn init_tracing(config: &Config) -> (LogLevelReloadHandles, TracingFlameGuard) {
|
||||
let registry = Registry::default();
|
||||
let fmt_layer = tracing_subscriber::fmt::Layer::new();
|
||||
let filter_layer = match EnvFilter::try_new(&config.log) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
eprintln!("It looks like your config is invalid. The following error occured while parsing it: {e}");
|
||||
EnvFilter::try_new("warn").unwrap()
|
||||
},
|
||||
};
|
||||
|
||||
let mut reload_handles = Vec::<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>::new();
|
||||
let subscriber = registry;
|
||||
|
||||
#[cfg(feature = "tokio_console")]
|
||||
let subscriber = {
|
||||
let console_layer = console_subscriber::spawn();
|
||||
subscriber.with(console_layer)
|
||||
};
|
||||
|
||||
let (fmt_reload_filter, fmt_reload_handle) = reload::Layer::new(filter_layer.clone());
|
||||
reload_handles.push(Box::new(fmt_reload_handle));
|
||||
let subscriber = subscriber.with(fmt_layer.with_filter(fmt_reload_filter));
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
let subscriber = {
|
||||
let sentry_layer = sentry_tracing::layer();
|
||||
let (sentry_reload_filter, sentry_reload_handle) = reload::Layer::new(filter_layer.clone());
|
||||
reload_handles.push(Box::new(sentry_reload_handle));
|
||||
subscriber.with(sentry_layer.with_filter(sentry_reload_filter))
|
||||
};
|
||||
|
||||
#[cfg(feature = "perf_measurements")]
|
||||
let (subscriber, flame_guard) = {
|
||||
let (flame_layer, flame_guard) = if config.tracing_flame {
|
||||
let flame_filter = match EnvFilter::try_new(&config.tracing_flame_filter) {
|
||||
Ok(flame_filter) => flame_filter,
|
||||
Err(e) => panic!("tracing_flame_filter config value is invalid: {e}"),
|
||||
};
|
||||
|
||||
let (flame_layer, flame_guard) =
|
||||
match tracing_flame::FlameLayer::with_file(&config.tracing_flame_output_path) {
|
||||
Ok(ok) => ok,
|
||||
Err(e) => {
|
||||
panic!("failed to initialize tracing-flame: {e}");
|
||||
},
|
||||
};
|
||||
let flame_layer = flame_layer
|
||||
.with_empty_samples(false)
|
||||
.with_filter(flame_filter);
|
||||
(Some(flame_layer), Some(flame_guard))
|
||||
} else {
|
||||
(None, None)
|
||||
};
|
||||
|
||||
let jaeger_layer = if config.allow_jaeger {
|
||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
||||
.with_auto_split_batch(true)
|
||||
.with_service_name("conduwuit")
|
||||
.install_batch(opentelemetry_sdk::runtime::Tokio)
|
||||
.unwrap();
|
||||
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
|
||||
|
||||
let (jaeger_reload_filter, jaeger_reload_handle) = reload::Layer::new(filter_layer);
|
||||
reload_handles.push(Box::new(jaeger_reload_handle));
|
||||
Some(telemetry.with_filter(jaeger_reload_filter))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let subscriber = subscriber.with(flame_layer).with(jaeger_layer);
|
||||
(subscriber, flame_guard)
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "perf_measurements"))]
|
||||
#[cfg_attr(not(feature = "perf_measurements"), allow(clippy::let_unit_value))]
|
||||
let flame_guard = ();
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
|
||||
#[cfg(all(feature = "tokio_console", feature = "release_max_log_level"))]
|
||||
tracing::error!(
|
||||
"'tokio_console' feature and 'release_max_log_level' feature are incompatible, because console-subscriber \
|
||||
needs access to trace-level events. 'release_max_log_level' must be disabled to use tokio-console."
|
||||
);
|
||||
|
||||
(LogLevelReloadHandles::new(reload_handles), flame_guard)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue