remove unnecessary cf arc refcnt workaround
log errors and panics propagating through the request task join Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
7ce782ddf4
commit
b4d22bd05e
5 changed files with 53 additions and 28 deletions
|
@ -30,13 +30,13 @@ use crate::{
|
||||||
};
|
};
|
||||||
|
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
|
pub(crate) db: Db,
|
||||||
|
pub(crate) pool: Arc<Pool>,
|
||||||
|
pub(crate) ctx: Arc<Context>,
|
||||||
pub(super) read_only: bool,
|
pub(super) read_only: bool,
|
||||||
pub(super) secondary: bool,
|
pub(super) secondary: bool,
|
||||||
pub(crate) checksums: bool,
|
pub(crate) checksums: bool,
|
||||||
corks: AtomicU32,
|
corks: AtomicU32,
|
||||||
pub(crate) db: Db,
|
|
||||||
pub(crate) pool: Arc<Pool>,
|
|
||||||
pub(crate) ctx: Arc<Context>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
|
pub(crate) type Db = DBWithThreadMode<MultiThreaded>;
|
||||||
|
|
|
@ -56,13 +56,13 @@ pub(crate) async fn open(ctx: Arc<Context>, desc: &[Descriptor]) -> Result<Arc<S
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
|
db,
|
||||||
|
pool: ctx.pool.clone(),
|
||||||
|
ctx: ctx.clone(),
|
||||||
read_only: config.rocksdb_read_only,
|
read_only: config.rocksdb_read_only,
|
||||||
secondary: config.rocksdb_secondary,
|
secondary: config.rocksdb_secondary,
|
||||||
checksums: config.rocksdb_checksums,
|
checksums: config.rocksdb_checksums,
|
||||||
corks: AtomicU32::new(0),
|
corks: AtomicU32::new(0),
|
||||||
pool: ctx.pool.clone(),
|
|
||||||
db,
|
|
||||||
ctx,
|
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,24 +44,24 @@ use crate::{watchers::Watchers, Engine};
|
||||||
|
|
||||||
pub struct Map {
|
pub struct Map {
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
db: Arc<Engine>,
|
|
||||||
cf: Arc<ColumnFamily>,
|
|
||||||
watchers: Watchers,
|
watchers: Watchers,
|
||||||
write_options: WriteOptions,
|
cf: Arc<ColumnFamily>,
|
||||||
|
db: Arc<Engine>,
|
||||||
read_options: ReadOptions,
|
read_options: ReadOptions,
|
||||||
cache_read_options: ReadOptions,
|
cache_read_options: ReadOptions,
|
||||||
|
write_options: WriteOptions,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Map {
|
impl Map {
|
||||||
pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
|
pub(crate) fn open(db: &Arc<Engine>, name: &'static str) -> Result<Arc<Self>> {
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
name,
|
name,
|
||||||
db: db.clone(),
|
|
||||||
cf: open::open(db, name),
|
|
||||||
watchers: Watchers::default(),
|
watchers: Watchers::default(),
|
||||||
write_options: write_options_default(db),
|
cf: open::open(db, name),
|
||||||
|
db: db.clone(),
|
||||||
read_options: read_options_default(db),
|
read_options: read_options_default(db),
|
||||||
cache_read_options: cache_read_options_default(db),
|
cache_read_options: cache_read_options_default(db),
|
||||||
|
write_options: write_options_default(db),
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,8 +30,5 @@ pub(super) fn open(db: &Arc<Engine>, name: &str) -> Arc<ColumnFamily> {
|
||||||
// lifetime parameter. We should not hold this handle, even in its Arc, after
|
// lifetime parameter. We should not hold this handle, even in its Arc, after
|
||||||
// closing the database (dropping `Engine`). Since `Arc<Engine>` is a sibling
|
// closing the database (dropping `Engine`). Since `Arc<Engine>` is a sibling
|
||||||
// member along with this handle in `Map`, that is prevented.
|
// member along with this handle in `Map`, that is prevented.
|
||||||
unsafe {
|
unsafe { Arc::from_raw(cf_ptr) }
|
||||||
Arc::increment_strong_count(cf_ptr);
|
|
||||||
Arc::from_raw(cf_ptr)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,7 @@
|
||||||
use std::sync::{atomic::Ordering, Arc};
|
use std::{
|
||||||
|
fmt::Debug,
|
||||||
|
sync::{atomic::Ordering, Arc},
|
||||||
|
};
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::State,
|
extract::State,
|
||||||
|
@ -12,16 +15,16 @@ use http::{Method, StatusCode, Uri};
|
||||||
level = "debug",
|
level = "debug",
|
||||||
skip_all,
|
skip_all,
|
||||||
fields(
|
fields(
|
||||||
handled = %services
|
|
||||||
.server
|
|
||||||
.metrics
|
|
||||||
.requests_handle_finished
|
|
||||||
.fetch_add(1, Ordering::Relaxed),
|
|
||||||
active = %services
|
active = %services
|
||||||
.server
|
.server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_active
|
.requests_handle_active
|
||||||
.fetch_add(1, Ordering::Relaxed),
|
.fetch_add(1, Ordering::Relaxed),
|
||||||
|
handled = %services
|
||||||
|
.server
|
||||||
|
.metrics
|
||||||
|
.requests_handle_finished
|
||||||
|
.load(Ordering::Relaxed),
|
||||||
)
|
)
|
||||||
)]
|
)]
|
||||||
pub(crate) async fn handle(
|
pub(crate) async fn handle(
|
||||||
|
@ -31,6 +34,10 @@ pub(crate) async fn handle(
|
||||||
) -> Result<Response, StatusCode> {
|
) -> Result<Response, StatusCode> {
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
conduwuit::defer! {{
|
conduwuit::defer! {{
|
||||||
|
_ = services.server
|
||||||
|
.metrics
|
||||||
|
.requests_handle_finished
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
_ = services.server
|
_ = services.server
|
||||||
.metrics
|
.metrics
|
||||||
.requests_handle_active
|
.requests_handle_active
|
||||||
|
@ -47,21 +54,35 @@ pub(crate) async fn handle(
|
||||||
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
return Err(StatusCode::SERVICE_UNAVAILABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
let method = req.method().clone();
|
|
||||||
let uri = req.uri().clone();
|
let uri = req.uri().clone();
|
||||||
services
|
let method = req.method().clone();
|
||||||
|
let services_ = services.clone();
|
||||||
|
let task = services
|
||||||
.server
|
.server
|
||||||
.runtime()
|
.runtime()
|
||||||
.spawn(next.run(req))
|
.spawn(async move { execute(services_, req, next).await });
|
||||||
.await
|
|
||||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
|
task.await
|
||||||
.and_then(|result| handle_result(&method, &uri, result))
|
.map_err(unhandled)
|
||||||
|
.and_then(move |result| handle_result(&method, &uri, result))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn execute(
|
||||||
|
// we made a safety contract that Services will not go out of scope
|
||||||
|
// during the request; this ensures a reference is accounted for at
|
||||||
|
// the base frame of the task regardless of its detachment.
|
||||||
|
_services: Arc<Services>,
|
||||||
|
req: http::Request<axum::body::Body>,
|
||||||
|
next: axum::middleware::Next,
|
||||||
|
) -> Response {
|
||||||
|
next.run(req).await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Response, StatusCode> {
|
fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Response, StatusCode> {
|
||||||
let status = result.status();
|
let status = result.status();
|
||||||
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
|
let reason = status.canonical_reason().unwrap_or("Unknown Reason");
|
||||||
let code = status.as_u16();
|
let code = status.as_u16();
|
||||||
|
|
||||||
if status.is_server_error() {
|
if status.is_server_error() {
|
||||||
error!(method = ?method, uri = ?uri, "{code} {reason}");
|
error!(method = ?method, uri = ?uri, "{code} {reason}");
|
||||||
} else if status.is_client_error() {
|
} else if status.is_client_error() {
|
||||||
|
@ -78,3 +99,10 @@ fn handle_result(method: &Method, uri: &Uri, result: Response) -> Result<Respons
|
||||||
|
|
||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cold]
|
||||||
|
fn unhandled<Error: Debug>(e: Error) -> StatusCode {
|
||||||
|
error!("unhandled error or panic during request: {e:?}");
|
||||||
|
|
||||||
|
StatusCode::INTERNAL_SERVER_ERROR
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue