de-global services

Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
Jason Volk 2024-07-27 07:17:07 +00:00
parent 7e50db4193
commit 2f85a5c1ac
36 changed files with 327 additions and 336 deletions

View file

@ -6,6 +6,7 @@ use axum::{
};
use axum_client_ip::SecureClientIpSource;
use conduit::{error, Result, Server};
use conduit_service::Services;
use http::{
header::{self, HeaderName},
HeaderValue, Method, StatusCode,
@ -34,7 +35,8 @@ const CONDUWUIT_CSP: &[&str] = &[
const CONDUWUIT_PERMISSIONS_POLICY: &[&str] = &["interest-cohort=()", "browsing-topics=()"];
pub(crate) fn build(server: &Arc<Server>) -> Result<Router> {
pub(crate) fn build(services: &Arc<Services>) -> Result<Router> {
let server = &services.server;
let layers = ServiceBuilder::new();
#[cfg(feature = "sentry_telemetry")]
@ -83,7 +85,7 @@ pub(crate) fn build(server: &Arc<Server>) -> Result<Router> {
.layer(body_limit_layer(server))
.layer(CatchPanicLayer::custom(catch_panic));
Ok(router::build(server).layer(layers))
Ok(router::build(services).layer(layers))
}
#[cfg(any(feature = "zstd_compression", feature = "gzip_compression", feature = "brotli_compression"))]
@ -151,12 +153,14 @@ fn body_limit_layer(server: &Server) -> DefaultBodyLimit { DefaultBodyLimit::max
#[allow(clippy::needless_pass_by_value)]
#[tracing::instrument(skip_all, name = "panic")]
fn catch_panic(err: Box<dyn Any + Send + 'static>) -> http::Response<http_body_util::Full<bytes::Bytes>> {
conduit_service::services()
.server
.metrics
.requests_panic
.fetch_add(1, std::sync::atomic::Ordering::Release);
//TODO: XXX
/*
conduit_service::services()
.server
.metrics
.requests_panic
.fetch_add(1, std::sync::atomic::Ordering::Release);
*/
let details = if let Some(s) = err.downcast_ref::<String>() {
s.clone()
} else if let Some(s) = err.downcast_ref::<&str>() {

View file

@ -11,22 +11,23 @@ extern crate conduit_core as conduit;
use std::{future::Future, pin::Pin, sync::Arc};
use conduit::{Result, Server};
use conduit_service::Services;
conduit::mod_ctor! {}
conduit::mod_dtor! {}
conduit::rustc_flags_capture! {}
#[no_mangle]
pub extern "Rust" fn start(server: &Arc<Server>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
pub extern "Rust" fn start(server: &Arc<Server>) -> Pin<Box<dyn Future<Output = Result<Arc<Services>>> + Send>> {
Box::pin(run::start(server.clone()))
}
#[no_mangle]
pub extern "Rust" fn stop(server: &Arc<Server>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
Box::pin(run::stop(server.clone()))
pub extern "Rust" fn stop(services: Arc<Services>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
Box::pin(run::stop(services))
}
#[no_mangle]
pub extern "Rust" fn run(server: &Arc<Server>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
Box::pin(run::run(server.clone()))
pub extern "Rust" fn run(services: &Arc<Services>) -> Pin<Box<dyn Future<Output = Result<()>> + Send>> {
Box::pin(run::run(services.clone()))
}

View file

@ -1,20 +1,20 @@
use std::sync::Arc;
use axum::{response::IntoResponse, routing::get, Router};
use conduit::{Error, Server};
use conduit::Error;
use conduit_api::State;
use conduit_service::Services;
use http::{StatusCode, Uri};
use ruma::api::client::error::ErrorKind;
extern crate conduit_api as api;
extern crate conduit_service as service;
pub(crate) fn build(services: &Arc<Services>) -> Router {
let router = Router::<State>::new();
let state = services.clone();
pub(crate) fn build(server: &Arc<Server>) -> Router {
let router = Router::<api::State>::new();
api::router::build(router, server)
conduit_api::router::build(router, &services.server)
.route("/", get(it_works))
.fallback(not_found)
.with_state(service::services())
.with_state(state)
}
async fn not_found(_uri: Uri) -> impl IntoResponse {

View file

@ -1,28 +1,30 @@
use std::{sync::Arc, time::Duration};
extern crate conduit_admin as admin;
extern crate conduit_core as conduit;
extern crate conduit_service as service;
use std::{
sync::{atomic::Ordering, Arc},
time::Duration,
};
use axum_server::Handle as ServerHandle;
use conduit::{debug, debug_error, debug_info, error, info, Error, Result, Server};
use service::Services;
use tokio::{
sync::broadcast::{self, Sender},
task::JoinHandle,
};
extern crate conduit_admin as admin;
extern crate conduit_core as conduit;
extern crate conduit_service as service;
use std::sync::atomic::Ordering;
use conduit::{debug, debug_info, error, info, Error, Result, Server};
use crate::serve;
/// Main loop base
#[tracing::instrument(skip_all)]
pub(crate) async fn run(server: Arc<Server>) -> Result<()> {
pub(crate) async fn run(services: Arc<Services>) -> Result<()> {
let server = &services.server;
debug!("Start");
// Install the admin room callback here for now
admin::init().await;
admin::init(&services.admin).await;
// Setup shutdown/signal handling
let handle = ServerHandle::new();
@ -33,13 +35,13 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<()> {
let mut listener = server
.runtime()
.spawn(serve::serve(server.clone(), handle.clone(), tx.subscribe()));
.spawn(serve::serve(services.clone(), handle.clone(), tx.subscribe()));
// Focal point
debug!("Running");
let res = tokio::select! {
res = &mut listener => res.map_err(Error::from).unwrap_or_else(Err),
res = service::services().poll() => handle_services_poll(&server, res, listener).await,
res = services.poll() => handle_services_poll(server, res, listener).await,
};
// Join the signal handler before we leave.
@ -47,7 +49,7 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<()> {
_ = sigs.await;
// Remove the admin room callback
admin::fini().await;
admin::fini(&services.admin).await;
debug_info!("Finish");
res
@ -55,26 +57,33 @@ pub(crate) async fn run(server: Arc<Server>) -> Result<()> {
/// Async initializations
#[tracing::instrument(skip_all)]
pub(crate) async fn start(server: Arc<Server>) -> Result<()> {
pub(crate) async fn start(server: Arc<Server>) -> Result<Arc<Services>> {
debug!("Starting...");
service::start(&server).await?;
let services = Services::build(server).await?.start().await?;
#[cfg(feature = "systemd")]
sd_notify::notify(true, &[sd_notify::NotifyState::Ready]).expect("failed to notify systemd of ready state");
debug!("Started");
Ok(())
Ok(services)
}
/// Async destructions
#[tracing::instrument(skip_all)]
pub(crate) async fn stop(_server: Arc<Server>) -> Result<()> {
pub(crate) async fn stop(services: Arc<Services>) -> Result<()> {
debug!("Shutting down...");
// Wait for all completions before dropping or we'll lose them to the module
// unload and explode.
service::stop().await;
services.stop().await;
if let Err(services) = Arc::try_unwrap(services) {
debug_error!(
"{} dangling references to Services after shutdown",
Arc::strong_count(&services)
);
}
debug!("Cleaning up...");

View file

@ -5,22 +5,26 @@ mod unix;
use std::sync::Arc;
use axum_server::Handle as ServerHandle;
use conduit::{Result, Server};
use conduit::Result;
use conduit_service::Services;
use tokio::sync::broadcast;
use crate::layers;
use super::layers;
/// Serve clients
pub(super) async fn serve(server: Arc<Server>, handle: ServerHandle, shutdown: broadcast::Receiver<()>) -> Result<()> {
pub(super) async fn serve(
services: Arc<Services>, handle: ServerHandle, shutdown: broadcast::Receiver<()>,
) -> Result<()> {
let server = &services.server;
let config = &server.config;
let addrs = config.get_bind_addrs();
let app = layers::build(&server)?;
let app = layers::build(&services)?;
if cfg!(unix) && config.unix_socket_path.is_some() {
unix::serve(&server, app, shutdown).await
unix::serve(server, app, shutdown).await
} else if config.tls.is_some() {
tls::serve(&server, app, handle, addrs).await
tls::serve(server, app, handle, addrs).await
} else {
plain::serve(&server, app, handle, addrs).await
plain::serve(server, app, handle, addrs).await
}
}