diff --git a/Cargo.lock b/Cargo.lock index 9c8596aa..08d8b167 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -375,6 +375,8 @@ dependencies = [ "heed", "hmac", "http", + "hyper", + "hyperlocal", "image", "jsonwebtoken", "lazy_static", @@ -1031,6 +1033,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "hmac" version = "0.12.1" @@ -1093,9 +1101,9 @@ checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "hyper" -version = "0.14.26" +version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab302d72a6f11a3b910431ff93aae7e773078c769f0a3ef15fb9ec692ed147d4" +checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ "bytes", "futures-channel", @@ -1128,6 +1136,17 @@ dependencies = [ "tokio-rustls 0.23.4", ] +[[package]] +name = "hyperlocal" +version = "0.8.0" +source = "git+https://github.com/softprops/hyperlocal?rev=2ee4d149644600d326559af0d2b235c945b05c04#2ee4d149644600d326559af0d2b235c945b05c04" +dependencies = [ + "hex", + "hyper", + "pin-project-lite", + "tokio", +] + [[package]] name = "idna" version = "0.2.3" diff --git a/Cargo.toml b/Cargo.toml index ff1785e4..a163eda2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,10 @@ ruma = { git = "https://github.com/ruma/ruma", rev = "b4853aa8fa5e3a24e3689fc880 #ruma = { path = "../ruma/crates/ruma", features = ["compat", "rand", "appservice-api-c", "client-api", "federation-api", "push-gateway-api-c", "state-res", "unstable-msc2448", "unstable-msc3575", "unstable-exhaustive-types", "ring-compat", "unstable-unspecified" ] } # Async runtime and utilities +hyperlocal = { git = "https://github.com/softprops/hyperlocal", rev = "2ee4d149644600d326559af0d2b235c945b05c04", features = [ + "server", +] } +hyper = { version = "0.14", features = ["server", "http1"] } tokio = { version = "1.28.1", features = ["fs", "macros", "signal", "sync"] } # Used for storing data permanently #sled = { version = "0.34.7", features = ["compression", "no_metrics"], optional = true } diff --git a/DEPLOY.md b/DEPLOY.md index cb318eee..74d4bbaa 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -118,6 +118,8 @@ After=network.target Environment="CONDUIT_CONFIG=/etc/matrix-conduit/conduit.toml" User=conduit Group=conduit +RuntimeDirectory=conduit +RuntimeDirectoryMode=0750 Restart=always ExecStart=/usr/local/bin/matrix-conduit @@ -223,9 +225,15 @@ Listen 8448 ServerName your.server.name # EDIT THIS AllowEncodedSlashes NoDecode + +# TCP ProxyPass /_matrix/ http://127.0.0.1:6167/_matrix/ timeout=300 nocanon ProxyPassReverse /_matrix/ http://127.0.0.1:6167/_matrix/ +# UNIX socket +#ProxyPass /_matrix/ unix:/run/conduit/conduit.sock|http://127.0.0.1:6167/_matrix/ nocanon +#ProxyPassReverse /_matrix/ unix:/run/conduit/conduit.sock|http://127.0.0.1:6167/_matrix/ + ``` @@ -245,7 +253,11 @@ Create `/etc/caddy/conf.d/conduit_caddyfile` and enter this (substitute for your ```caddy your.server.name, your.server.name:8448 { + # TCP reverse_proxy /_matrix/* 127.0.0.1:6167 + + # UNIX socket + #reverse_proxy /_matrix/* unix//run/conduit/conduit.sock } ``` @@ -272,8 +284,18 @@ server { # Increase this to allow posting large files such as videos client_max_body_size 20M; + # UNIX socket + #upstream backend { + # server unix:/run/conduit/conduit.sock; + #} + location /_matrix/ { + # TCP proxy_pass http://127.0.0.1:6167$request_uri; + + # UNIX socket + #proxy_pass http://backend; + proxy_set_header Host $http_host; proxy_buffering off; proxy_read_timeout 5m; diff --git a/conduit-example.toml b/conduit-example.toml index 836db654..3747258a 100644 --- a/conduit-example.toml +++ b/conduit-example.toml @@ -55,3 +55,10 @@ trusted_servers = ["matrix.org"] address = "127.0.0.1" # This makes sure Conduit can only be reached using the reverse proxy #address = "0.0.0.0" # If Conduit is running in a container, make sure the reverse proxy (ie. Traefik) can reach it. + +# Uncomment unix_socket_path to listen on a UNIX socket at the specified path. +# If listening on a UNIX socket, you must remove the 'address' key if defined and add your +# reverse proxy (nginx/Caddy/Apache/etc) to the 'conduit' group, unless world RW +# permissions are specified with unix_socket_perms (666 minimum). +#unix_socket_path = "/run/conduit/conduit.sock" +#unix_socket_perms = 660 \ No newline at end of file diff --git a/src/config/mod.rs b/src/config/mod.rs index a4d7cca4..e8ff4504 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -2,11 +2,13 @@ use std::{ collections::BTreeMap, fmt, net::{IpAddr, Ipv4Addr}, + path::PathBuf, }; +use figment::Figment; use ruma::{OwnedServerName, RoomVersionId}; use serde::{de::IgnoredAny, Deserialize}; -use tracing::warn; +use tracing::{error, warn}; mod proxy; @@ -19,7 +21,9 @@ pub struct Config { #[serde(default = "default_port")] pub port: u16, pub tls: Option, - + pub unix_socket_path: Option, + #[serde(default = "default_unix_socket_perms")] + pub unix_socket_perms: u32, pub server_name: OwnedServerName, #[serde(default = "default_database_backend")] pub database_backend: String, @@ -102,7 +106,7 @@ impl Config { .keys() .filter(|key| DEPRECATED_KEYS.iter().any(|s| s == key)) { - warn!("Config parameter {} is deprecated", key); + warn!("Config parameter \"{}\" is deprecated.", key); was_deprecated = true; } @@ -110,6 +114,19 @@ impl Config { warn!("Read conduit documentation and check your configuration if any new configuration parameters should be adjusted"); } } + + /// Checks the presence of the `address` and `unix_socket_path` keys in the raw_config, exiting the process if both keys were detected. + pub fn error_dual_listening(&self, raw_config: Figment) -> Result<(), ()> { + let check_address = raw_config.find_value("address"); + let check_unix_socket = raw_config.find_value("unix_socket_path"); + + if check_address.is_ok() && check_unix_socket.is_ok() { + error!("TOML keys \"address\" and \"unix_socket_path\" were both defined. Please specify only one option."); + return Err(()); + } + + Ok(()) + } } impl fmt::Display for Config { @@ -223,6 +240,10 @@ fn default_port() -> u16 { 8000 } +fn default_unix_socket_perms() -> u32 { + 660 +} + fn default_database_backend() -> String { "sqlite".to_owned() } diff --git a/src/database/mod.rs b/src/database/mod.rs index e247d9f0..ebe944c3 100644 --- a/src/database/mod.rs +++ b/src/database/mod.rs @@ -1071,33 +1071,46 @@ impl KeyValueDatabase { let timer_interval = Duration::from_secs(services().globals.config.cleanup_second_interval as u64); + fn perform_cleanup() { + let start = Instant::now(); + if let Err(e) = services().globals.cleanup() { + error!(target: "database-cleanup", "Ran into an error during cleanup: {}", e); + } else { + debug!(target: "database-cleanup", "Finished cleanup in {:#?}.", start.elapsed()); + } + } + tokio::spawn(async move { let mut i = interval(timer_interval); #[cfg(unix)] - let mut s = signal(SignalKind::hangup()).unwrap(); + let mut hangup = signal(SignalKind::hangup()).unwrap(); + let mut ctrl_c = signal(SignalKind::interrupt()).unwrap(); + let mut terminate = signal(SignalKind::terminate()).unwrap(); loop { #[cfg(unix)] tokio::select! { _ = i.tick() => { - debug!("cleanup: Timer ticked"); + debug!(target: "database-cleanup", "Timer ticked"); } - _ = s.recv() => { - debug!("cleanup: Received SIGHUP"); + _ = hangup.recv() => { + debug!(target: "database-cleanup","Received SIGHUP"); + } + _ = ctrl_c.recv() => { + debug!(target: "database-cleanup", "Received Ctrl+C, performing last cleanup"); + perform_cleanup(); + } + _ = terminate.recv() => { + debug!(target: "database-cleanup","Received SIGTERM, performing last cleanup"); + perform_cleanup(); } }; #[cfg(not(unix))] { i.tick().await; - debug!("cleanup: Timer ticked") - } - - let start = Instant::now(); - if let Err(e) = services().globals.cleanup() { - error!("cleanup: Errored: {}", e); - } else { - debug!("cleanup: Finished in {:?}", start.elapsed()); + debug!(target: "database-cleanup", "Timer ticked") } + perform_cleanup(); } }); } diff --git a/src/main.rs b/src/main.rs index c74d6ddb..06a5a3ca 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,10 @@ #![allow(clippy::suspicious_else_formatting)] #![deny(clippy::dbg_macro)] -use std::{future::Future, io, net::SocketAddr, sync::atomic, time::Duration}; +use std::{ + fs::Permissions, future::Future, io, net::SocketAddr, os::unix::fs::PermissionsExt, + sync::atomic, time::Duration, +}; use axum::{ extract::{DefaultBodyLimit, FromRequestParts, MatchedPath}, @@ -26,6 +29,8 @@ use http::{ header::{self, HeaderName}, Method, StatusCode, Uri, }; +use hyper::Server; +use hyperlocal::SocketIncoming; use ruma::api::{ client::{ error::{Error as RumaError, ErrorBody, ErrorKind}, @@ -33,7 +38,7 @@ use ruma::api::{ }, IncomingRequest, }; -use tokio::signal; +use tokio::{net::UnixListener, signal, sync::oneshot}; use tower::ServiceBuilder; use tower_http::{ cors::{self, CorsLayer}, @@ -43,6 +48,8 @@ use tower_http::{ use tracing::{debug, error, info, warn}; use tracing_subscriber::{prelude::*, EnvFilter}; +use tokio::sync::oneshot::Sender; + pub use conduit::*; // Re-export everything from the library crate #[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))] @@ -69,12 +76,10 @@ async fn main() { Ok(s) => s, Err(e) => { eprintln!("It looks like your config is invalid. The following error occurred: {e}"); - std::process::exit(1); + return; } }; - config.warn_deprecated(); - let log = format!("{},ruma_state_res=error,_=off,sled=off", config.log); if config.allow_jaeger { @@ -135,11 +140,15 @@ async fn main() { #[cfg(unix)] maximize_fd_limit().expect("should be able to increase the soft limit to the hard limit"); + config.warn_deprecated(); + if let Err(_) = config.error_dual_listening(raw_config) { + return; + }; + info!("Loading database"); if let Err(error) = KeyValueDatabase::load_or_create(config).await { error!(?error, "The database couldn't be loaded or created"); - - std::process::exit(1); + return; }; let config = &services().globals.config; @@ -200,26 +209,57 @@ async fn run_server() -> io::Result<()> { let app = routes().layer(middlewares).into_make_service(); let handle = ServerHandle::new(); + let (tx, rx) = oneshot::channel::<()>(); - tokio::spawn(shutdown_signal(handle.clone())); + tokio::spawn(shutdown_signal(handle.clone(), tx)); - match &config.tls { - Some(tls) => { - let conf = RustlsConfig::from_pem_file(&tls.certs, &tls.key).await?; - let server = bind_rustls(addr, conf).handle(handle).serve(app); - - #[cfg(feature = "systemd")] - let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); - - server.await? + if let Some(path) = &config.unix_socket_path { + if path.exists() { + warn!( + "UNIX socket path {:#?} already exists (unclean shutdown?), attempting to remove it.", + path.display() + ); + tokio::fs::remove_file(&path).await?; } - None => { - let server = bind(addr).handle(handle).serve(app); - #[cfg(feature = "systemd")] - let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + tokio::fs::create_dir_all(path.parent().unwrap()).await?; - server.await? + let socket_perms = config.unix_socket_perms.to_string(); + let octal_perms = u32::from_str_radix(&socket_perms, 8).unwrap(); + + let listener = UnixListener::bind(path.clone()).unwrap(); + tokio::fs::set_permissions(path, Permissions::from_mode(octal_perms)) + .await + .unwrap(); + let socket = SocketIncoming::from_listener(listener); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + let server = Server::builder(socket).serve(app); + let graceful = server.with_graceful_shutdown(async { + rx.await.ok(); + }); + + if let Err(e) = graceful.await { + error!("Server error: {:?}", e); + } + } else { + match &config.tls { + Some(tls) => { + let conf = RustlsConfig::from_pem_file(&tls.certs, &tls.key).await?; + let server = bind_rustls(addr, conf).handle(handle).serve(app); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + server.await? + } + None => { + let server = bind(addr).handle(handle).serve(app); + + #[cfg(feature = "systemd")] + let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Ready]); + server.await? + } } } @@ -439,7 +479,7 @@ fn routes() -> Router { .fallback(not_found) } -async fn shutdown_signal(handle: ServerHandle) { +async fn shutdown_signal(handle: ServerHandle, tx: Sender<()>) -> Result<()> { let ctrl_c = async { signal::ctrl_c() .await @@ -471,6 +511,9 @@ async fn shutdown_signal(handle: ServerHandle) { #[cfg(feature = "systemd")] let _ = sd_notify::notify(true, &[sd_notify::NotifyState::Stopping]); + tx.send(()).unwrap(); + + Ok(()) } async fn not_found(uri: Uri) -> impl IntoResponse { diff --git a/src/service/globals/mod.rs b/src/service/globals/mod.rs index 9bce8a2c..b5037d08 100644 --- a/src/service/globals/mod.rs +++ b/src/service/globals/mod.rs @@ -379,9 +379,26 @@ impl Service { &self.config.well_known_client } + pub fn unix_socket_path(&self) -> &Option { + &self.config.unix_socket_path + } + pub fn shutdown(&self) { self.shutdown.store(true, atomic::Ordering::Relaxed); // On shutdown + + if self.unix_socket_path().is_some() { + match &self.unix_socket_path() { + Some(path) => { + std::fs::remove_file(path.to_owned()).unwrap(); + } + None => error!( + "Unable to remove socket file at {:?} during shutdown.", + &self.unix_socket_path() + ), + }; + }; + info!(target: "shutdown-sync", "Received shutdown notification, notifying sync helpers..."); services().globals.rotate.fire(); }