add optional support for tokio-console
This turned out to be quite hairy, mostly because we need to apply the config's log level filter to the actual logs (stdout and, optionally sentry), but do not want to filter out the tokio tracing events needed by the console_subscriber. I hit several edge cases in tracing getting this to work, and we now depend on a git version of tracing with a backported patch :(
This commit is contained in:
parent
341bafb91e
commit
8a5599adf9
8 changed files with 488 additions and 105 deletions
|
@ -40,8 +40,8 @@ use tokio::time::{interval, Instant};
|
|||
use tracing::{debug, error, warn};
|
||||
|
||||
use crate::{
|
||||
database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error, Result, Services,
|
||||
SERVICES,
|
||||
database::migrations::migrations, service::rooms::timeline::PduCount, services, Config, Error,
|
||||
LogLevelReloadHandles, Result, Services, SERVICES,
|
||||
};
|
||||
|
||||
pub(crate) struct KeyValueDatabase {
|
||||
|
@ -203,13 +203,7 @@ struct CheckForUpdatesResponse {
|
|||
impl KeyValueDatabase {
|
||||
/// Load an existing database or create a new one.
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub(crate) async fn load_or_create(
|
||||
config: Config,
|
||||
tracing_reload_handler: tracing_subscriber::reload::Handle<
|
||||
tracing_subscriber::EnvFilter,
|
||||
tracing_subscriber::Registry,
|
||||
>,
|
||||
) -> Result<()> {
|
||||
pub(crate) async fn load_or_create(config: Config, tracing_reload_handler: LogLevelReloadHandles) -> Result<()> {
|
||||
Self::check_db_setup(&config)?;
|
||||
|
||||
if !Path::new(&config.database_path).exists() {
|
||||
|
|
105
src/main.rs
105
src/main.rs
|
@ -6,7 +6,7 @@ use std::os::unix::fs::PermissionsExt as _; /* not unix specific, just only for
|
|||
// Not async due to services() being used in many closures, and async closures
|
||||
// are not stable as of writing This is the case for every other occurence of
|
||||
// sync Mutex/RwLock, except for database related ones
|
||||
use std::sync::RwLock;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::{any::Any, io, net::SocketAddr, sync::atomic, time::Duration};
|
||||
|
||||
use api::ruma_wrapper::{Ruma, RumaResponse};
|
||||
|
@ -79,7 +79,7 @@ struct Server {
|
|||
|
||||
runtime: tokio::runtime::Runtime,
|
||||
|
||||
tracing_reload_handle: reload::Handle<EnvFilter, Registry>,
|
||||
tracing_reload_handle: LogLevelReloadHandles,
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
_sentry_guard: Option<sentry::ClientInitGuard>,
|
||||
|
@ -547,7 +547,56 @@ fn init_sentry(config: &Config) -> sentry::ClientInitGuard {
|
|||
))
|
||||
}
|
||||
|
||||
fn init_tracing_sub(config: &Config) -> reload::Handle<EnvFilter, Registry> {
|
||||
// We need to store a reload::Handle value, but can't name it's type explicitly
|
||||
// because the S type parameter depends on the subscriber's previous layers. In
|
||||
// our case, this includes unnameable 'impl Trait' types.
|
||||
//
|
||||
// This is fixed[1] in the unreleased tracing-subscriber from the master branch,
|
||||
// which removes the S parameter. Unfortunately can't use it without pulling in
|
||||
// a version of tracing that's incompatible with the rest of our deps.
|
||||
//
|
||||
// To work around this, we define an trait without the S paramter that forwards
|
||||
// to the reload::Handle::reload method, and then store the handle as a trait
|
||||
// object.
|
||||
//
|
||||
// [1]: https://github.com/tokio-rs/tracing/pull/1035/commits/8a87ea52425098d3ef8f56d92358c2f6c144a28f
|
||||
trait ReloadHandle<L> {
|
||||
fn reload(&self, new_value: L) -> Result<(), reload::Error>;
|
||||
}
|
||||
|
||||
impl<L, S> ReloadHandle<L> for reload::Handle<L, S> {
|
||||
fn reload(&self, new_value: L) -> Result<(), reload::Error> { reload::Handle::reload(self, new_value) }
|
||||
}
|
||||
|
||||
struct LogLevelReloadHandlesInner {
|
||||
handles: Vec<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>,
|
||||
}
|
||||
|
||||
/// Wrapper to allow reloading the filter on several several
|
||||
/// [`tracing_subscriber::reload::Handle`]s at once, with the same value.
|
||||
#[derive(Clone)]
|
||||
struct LogLevelReloadHandles {
|
||||
inner: Arc<LogLevelReloadHandlesInner>,
|
||||
}
|
||||
|
||||
impl LogLevelReloadHandles {
|
||||
fn new(handles: Vec<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>) -> LogLevelReloadHandles {
|
||||
LogLevelReloadHandles {
|
||||
inner: Arc::new(LogLevelReloadHandlesInner {
|
||||
handles,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn reload(&self, new_value: &EnvFilter) -> Result<(), reload::Error> {
|
||||
for handle in &self.inner.handles {
|
||||
handle.reload(new_value.clone())?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn init_tracing_sub(config: &Config) -> LogLevelReloadHandles {
|
||||
let registry = Registry::default();
|
||||
let fmt_layer = tracing_subscriber::fmt::Layer::new();
|
||||
let filter_layer = match EnvFilter::try_new(&config.log) {
|
||||
|
@ -558,35 +607,40 @@ fn init_tracing_sub(config: &Config) -> reload::Handle<EnvFilter, Registry> {
|
|||
},
|
||||
};
|
||||
|
||||
let (reload_filter, reload_handle) = reload::Layer::new(filter_layer);
|
||||
let mut reload_handles = Vec::<Box<dyn ReloadHandle<EnvFilter> + Send + Sync>>::new();
|
||||
let subscriber = registry;
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
let sentry_layer = sentry_tracing::layer();
|
||||
|
||||
let subscriber;
|
||||
|
||||
#[allow(clippy::unnecessary_operation)] // error[E0658]: attributes on expressions are experimental
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
{
|
||||
subscriber = registry
|
||||
.with(reload_filter)
|
||||
.with(fmt_layer)
|
||||
.with(sentry_layer);
|
||||
#[cfg(feature = "tokio_console")]
|
||||
let subscriber = {
|
||||
let console_layer = console_subscriber::spawn();
|
||||
subscriber.with(console_layer)
|
||||
};
|
||||
|
||||
#[allow(clippy::unnecessary_operation)] // error[E0658]: attributes on expressions are experimental
|
||||
#[cfg(not(feature = "sentry_telemetry"))]
|
||||
{
|
||||
subscriber = registry.with(reload_filter).with(fmt_layer);
|
||||
let (fmt_reload_filter, fmt_reload_handle) = reload::Layer::new(filter_layer.clone());
|
||||
reload_handles.push(Box::new(fmt_reload_handle));
|
||||
let subscriber = subscriber.with(fmt_layer.with_filter(fmt_reload_filter));
|
||||
|
||||
#[cfg(feature = "sentry_telemetry")]
|
||||
let subscriber = {
|
||||
let sentry_layer = sentry_tracing::layer();
|
||||
let (sentry_reload_filter, sentry_reload_handle) = reload::Layer::new(filter_layer);
|
||||
reload_handles.push(Box::new(sentry_reload_handle));
|
||||
subscriber.with(sentry_layer.with_filter(sentry_reload_filter))
|
||||
};
|
||||
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
|
||||
reload_handle
|
||||
#[cfg(all(feature = "tokio_console", feature = "release_max_log_level"))]
|
||||
error!(
|
||||
"'tokio_console' feature and 'release_max_log_level' feature are incompatible, because console-subscriber \
|
||||
needs access to trace-level events. 'release_max_log_level' must be disabled to use tokio-console."
|
||||
);
|
||||
|
||||
LogLevelReloadHandles::new(reload_handles)
|
||||
}
|
||||
|
||||
#[cfg(feature = "perf_measurements")]
|
||||
fn init_tracing_jaeger(config: &Config) -> reload::Handle<EnvFilter, Registry> {
|
||||
fn init_tracing_jaeger(config: &Config) -> LogLevelReloadHandles {
|
||||
opentelemetry::global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
|
||||
let tracer = opentelemetry_jaeger::new_agent_pipeline()
|
||||
.with_auto_split_batch(true)
|
||||
|
@ -609,11 +663,12 @@ fn init_tracing_jaeger(config: &Config) -> reload::Handle<EnvFilter, Registry> {
|
|||
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
|
||||
reload_handle
|
||||
LogLevelReloadHandles::new(vec![Box::new(reload_handle)])
|
||||
}
|
||||
|
||||
// TODO: tokio-console here?
|
||||
#[cfg(feature = "perf_measurements")]
|
||||
fn init_tracing_flame(_config: &Config) -> reload::Handle<EnvFilter, Registry> {
|
||||
fn init_tracing_flame(_config: &Config) -> LogLevelReloadHandles {
|
||||
let registry = Registry::default();
|
||||
let (flame_layer, _guard) = tracing_flame::FlameLayer::with_file("./tracing.folded").unwrap();
|
||||
let flame_layer = flame_layer.with_empty_samples(false);
|
||||
|
@ -626,7 +681,7 @@ fn init_tracing_flame(_config: &Config) -> reload::Handle<EnvFilter, Registry> {
|
|||
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
|
||||
reload_handle
|
||||
LogLevelReloadHandles::new(vec![Box::new(reload_handle)])
|
||||
}
|
||||
|
||||
// This is needed for opening lots of file descriptors, which tends to
|
||||
|
|
|
@ -331,7 +331,7 @@ pub(crate) async fn change_log_level(
|
|||
match services()
|
||||
.globals
|
||||
.tracing_reload_handle
|
||||
.modify(|filter| *filter = old_filter_layer)
|
||||
.reload(&old_filter_layer)
|
||||
{
|
||||
Ok(()) => {
|
||||
return Ok(RoomMessageEventContent::text_plain(format!(
|
||||
|
@ -360,7 +360,7 @@ pub(crate) async fn change_log_level(
|
|||
match services()
|
||||
.globals
|
||||
.tracing_reload_handle
|
||||
.modify(|filter| *filter = new_filter_layer)
|
||||
.reload(&new_filter_layer)
|
||||
{
|
||||
Ok(()) => {
|
||||
return Ok(RoomMessageEventContent::text_plain("Successfully changed log level"));
|
||||
|
|
|
@ -27,10 +27,9 @@ use ruma::{
|
|||
};
|
||||
use tokio::sync::{broadcast, watch::Receiver, Mutex, RwLock};
|
||||
use tracing::{error, info, trace};
|
||||
use tracing_subscriber::{EnvFilter, Registry};
|
||||
use url::Url;
|
||||
|
||||
use crate::{services, Config, Result};
|
||||
use crate::{services, Config, LogLevelReloadHandles, Result};
|
||||
|
||||
mod client;
|
||||
mod data;
|
||||
|
@ -45,7 +44,7 @@ type SyncHandle = (
|
|||
pub(crate) struct Service<'a> {
|
||||
pub(crate) db: &'static dyn Data,
|
||||
|
||||
pub(crate) tracing_reload_handle: tracing_subscriber::reload::Handle<EnvFilter, Registry>,
|
||||
pub(crate) tracing_reload_handle: LogLevelReloadHandles,
|
||||
pub(crate) config: Config,
|
||||
pub(crate) cidr_range_denylist: Vec<IPAddress>,
|
||||
keypair: Arc<ruma::signatures::Ed25519KeyPair>,
|
||||
|
@ -99,8 +98,7 @@ impl Default for RotationHandler {
|
|||
|
||||
impl Service<'_> {
|
||||
pub(crate) fn load(
|
||||
db: &'static dyn Data, config: &Config,
|
||||
tracing_reload_handle: tracing_subscriber::reload::Handle<EnvFilter, Registry>,
|
||||
db: &'static dyn Data, config: &Config, tracing_reload_handle: LogLevelReloadHandles,
|
||||
) -> Result<Self> {
|
||||
let keypair = db.load_keypair();
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ use std::{
|
|||
use lru_cache::LruCache;
|
||||
use tokio::sync::{broadcast, Mutex, RwLock};
|
||||
|
||||
use crate::{Config, Result};
|
||||
use crate::{Config, LogLevelReloadHandles, Result};
|
||||
|
||||
pub(crate) mod account_data;
|
||||
pub(crate) mod admin;
|
||||
|
@ -55,11 +55,7 @@ impl Services<'_> {
|
|||
+ sending::Data
|
||||
+ 'static,
|
||||
>(
|
||||
db: &'static D, config: &Config,
|
||||
tracing_reload_handle: tracing_subscriber::reload::Handle<
|
||||
tracing_subscriber::EnvFilter,
|
||||
tracing_subscriber::Registry,
|
||||
>,
|
||||
db: &'static D, config: &Config, tracing_reload_handle: LogLevelReloadHandles,
|
||||
) -> Result<Self> {
|
||||
Ok(Self {
|
||||
appservice: appservice::Service::build(db)?,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue