de-global some services in services
Signed-off-by: Jason Volk <jason@zemos.net>
This commit is contained in:
parent
3ccd9ea326
commit
29fc5b9b52
4 changed files with 23 additions and 26 deletions
|
@ -14,7 +14,7 @@ pub struct Data {
|
||||||
servercurrentevent_data: Arc<Map>,
|
servercurrentevent_data: Arc<Map>,
|
||||||
servernameevent_data: Arc<Map>,
|
servernameevent_data: Arc<Map>,
|
||||||
servername_educount: Arc<Map>,
|
servername_educount: Arc<Map>,
|
||||||
_db: Arc<Database>,
|
pub(super) db: Arc<Database>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Data {
|
impl Data {
|
||||||
|
@ -23,7 +23,7 @@ impl Data {
|
||||||
servercurrentevent_data: db["servercurrentevent_data"].clone(),
|
servercurrentevent_data: db["servercurrentevent_data"].clone(),
|
||||||
servernameevent_data: db["servernameevent_data"].clone(),
|
servernameevent_data: db["servernameevent_data"].clone(),
|
||||||
servername_educount: db["servername_educount"].clone(),
|
servername_educount: db["servername_educount"].clone(),
|
||||||
_db: db,
|
db,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,9 +3,9 @@ mod data;
|
||||||
mod send;
|
mod send;
|
||||||
mod sender;
|
mod sender;
|
||||||
|
|
||||||
use std::fmt::Debug;
|
use std::{fmt::Debug, sync::Arc};
|
||||||
|
|
||||||
use conduit::{err, Result};
|
use conduit::{err, Result, Server};
|
||||||
use ruma::{
|
use ruma::{
|
||||||
api::{appservice::Registration, OutgoingRequest},
|
api::{appservice::Registration, OutgoingRequest},
|
||||||
OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
|
OwnedServerName, OwnedUserId, RoomId, ServerName, UserId,
|
||||||
|
@ -18,12 +18,11 @@ use crate::{server_is_ours, services};
|
||||||
|
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub db: data::Data,
|
pub db: data::Data,
|
||||||
|
server: Arc<Server>,
|
||||||
|
|
||||||
/// The state for a given state hash.
|
/// The state for a given state hash.
|
||||||
sender: loole::Sender<Msg>,
|
sender: loole::Sender<Msg>,
|
||||||
receiver: Mutex<loole::Receiver<Msg>>,
|
receiver: Mutex<loole::Receiver<Msg>>,
|
||||||
startup_netburst: bool,
|
|
||||||
startup_netburst_keep: i64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||||
|
@ -53,7 +52,7 @@ impl Service {
|
||||||
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
||||||
let dest = Destination::Push(user.to_owned(), pushkey);
|
let dest = Destination::Push(user.to_owned(), pushkey);
|
||||||
let event = SendingEvent::Pdu(pdu_id.to_owned());
|
let event = SendingEvent::Pdu(pdu_id.to_owned());
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
||||||
self.dispatch(Msg {
|
self.dispatch(Msg {
|
||||||
dest,
|
dest,
|
||||||
|
@ -66,7 +65,7 @@ impl Service {
|
||||||
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
||||||
let dest = Destination::Appservice(appservice_id);
|
let dest = Destination::Appservice(appservice_id);
|
||||||
let event = SendingEvent::Pdu(pdu_id);
|
let event = SendingEvent::Pdu(pdu_id);
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
||||||
self.dispatch(Msg {
|
self.dispatch(Msg {
|
||||||
dest,
|
dest,
|
||||||
|
@ -93,7 +92,7 @@ impl Service {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned())))
|
.map(|server| (Destination::Normal(server), SendingEvent::Pdu(pdu_id.to_owned())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let keys = self.db.queue_requests(
|
let keys = self.db.queue_requests(
|
||||||
&requests
|
&requests
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -115,7 +114,7 @@ impl Service {
|
||||||
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
||||||
let dest = Destination::Normal(server.to_owned());
|
let dest = Destination::Normal(server.to_owned());
|
||||||
let event = SendingEvent::Edu(serialized);
|
let event = SendingEvent::Edu(serialized);
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&dest, event.clone())])?;
|
||||||
self.dispatch(Msg {
|
self.dispatch(Msg {
|
||||||
dest,
|
dest,
|
||||||
|
@ -142,7 +141,7 @@ impl Service {
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone())))
|
.map(|server| (Destination::Normal(server), SendingEvent::Edu(serialized.clone())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let keys = self.db.queue_requests(
|
let keys = self.db.queue_requests(
|
||||||
&requests
|
&requests
|
||||||
.iter()
|
.iter()
|
||||||
|
|
|
@ -49,14 +49,12 @@ const CLEANUP_TIMEOUT_MS: u64 = 3500;
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl crate::Service for Service {
|
impl crate::Service for Service {
|
||||||
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
fn build(args: crate::Args<'_>) -> Result<Arc<Self>> {
|
||||||
let config = &args.server.config;
|
|
||||||
let (sender, receiver) = loole::unbounded();
|
let (sender, receiver) = loole::unbounded();
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
db: Data::new(args.db.clone()),
|
db: Data::new(args.db.clone()),
|
||||||
|
server: args.server.clone(),
|
||||||
sender,
|
sender,
|
||||||
receiver: Mutex::new(receiver),
|
receiver: Mutex::new(receiver),
|
||||||
startup_netburst: config.startup_netburst,
|
|
||||||
startup_netburst_keep: config.startup_netburst_keep,
|
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,7 +117,7 @@ impl Service {
|
||||||
fn handle_response_ok(
|
fn handle_response_ok(
|
||||||
&self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
|
&self, dest: &Destination, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus,
|
||||||
) {
|
) {
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
self.db
|
self.db
|
||||||
.delete_all_active_requests_for(dest)
|
.delete_all_active_requests_for(dest)
|
||||||
.expect("all active requests deleted");
|
.expect("all active requests deleted");
|
||||||
|
@ -174,11 +172,11 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn initial_requests(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
|
fn initial_requests(&self, futures: &SendingFutures<'_>, statuses: &mut CurTransactionStatus) {
|
||||||
let keep = usize::try_from(self.startup_netburst_keep).unwrap_or(usize::MAX);
|
let keep = usize::try_from(self.server.config.startup_netburst_keep).unwrap_or(usize::MAX);
|
||||||
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
|
let mut txns = HashMap::<Destination, Vec<SendingEvent>>::new();
|
||||||
for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) {
|
for (key, dest, event) in self.db.active_requests().filter_map(Result::ok) {
|
||||||
let entry = txns.entry(dest.clone()).or_default();
|
let entry = txns.entry(dest.clone()).or_default();
|
||||||
if self.startup_netburst_keep >= 0 && entry.len() >= keep {
|
if self.server.config.startup_netburst_keep >= 0 && entry.len() >= keep {
|
||||||
warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key));
|
warn!("Dropping unsent event {:?} {:?}", dest, String::from_utf8_lossy(&key));
|
||||||
self.db
|
self.db
|
||||||
.delete_active_request(&key)
|
.delete_active_request(&key)
|
||||||
|
@ -189,7 +187,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (dest, events) in txns {
|
for (dest, events) in txns {
|
||||||
if self.startup_netburst && !events.is_empty() {
|
if self.server.config.startup_netburst && !events.is_empty() {
|
||||||
statuses.insert(dest.clone(), TransactionStatus::Running);
|
statuses.insert(dest.clone(), TransactionStatus::Running);
|
||||||
futures.push(Box::pin(send_events(dest.clone(), events)));
|
futures.push(Box::pin(send_events(dest.clone(), events)));
|
||||||
}
|
}
|
||||||
|
@ -210,7 +208,7 @@ impl Service {
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
let mut events = Vec::new();
|
let mut events = Vec::new();
|
||||||
|
|
||||||
// Must retry any previous transaction for this remote.
|
// Must retry any previous transaction for this remote.
|
||||||
|
@ -224,7 +222,7 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compose the next transaction
|
// Compose the next transaction
|
||||||
let _cork = services().db.cork();
|
let _cork = self.db.db.cork();
|
||||||
if !new_events.is_empty() {
|
if !new_events.is_empty() {
|
||||||
self.db.mark_as_active(&new_events)?;
|
self.db.mark_as_active(&new_events)?;
|
||||||
for (e, _) in new_events {
|
for (e, _) in new_events {
|
||||||
|
@ -251,8 +249,8 @@ impl Service {
|
||||||
.and_modify(|e| match e {
|
.and_modify(|e| match e {
|
||||||
TransactionStatus::Failed(tries, time) => {
|
TransactionStatus::Failed(tries, time) => {
|
||||||
// Fail if a request has failed recently (exponential backoff)
|
// Fail if a request has failed recently (exponential backoff)
|
||||||
let min = services().globals.config.sender_timeout;
|
let min = self.server.config.sender_timeout;
|
||||||
let max = services().globals.config.sender_retry_backoff_limit;
|
let max = self.server.config.sender_retry_backoff_limit;
|
||||||
if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) {
|
if continue_exponential_backoff_secs(min, max, time.elapsed(), *tries) {
|
||||||
allow = false;
|
allow = false;
|
||||||
} else {
|
} else {
|
||||||
|
@ -288,7 +286,7 @@ impl Service {
|
||||||
.filter(|user_id| user_is_local(user_id)),
|
.filter(|user_id| user_is_local(user_id)),
|
||||||
);
|
);
|
||||||
|
|
||||||
if services().globals.allow_outgoing_read_receipts()
|
if self.server.config.allow_outgoing_read_receipts
|
||||||
&& !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)?
|
&& !select_edus_receipts(&room_id, since, &mut max_edu_count, &mut events)?
|
||||||
{
|
{
|
||||||
break;
|
break;
|
||||||
|
@ -311,7 +309,7 @@ impl Service {
|
||||||
events.push(serde_json::to_vec(&edu).expect("json can be serialized"));
|
events.push(serde_json::to_vec(&edu).expect("json can be serialized"));
|
||||||
}
|
}
|
||||||
|
|
||||||
if services().globals.allow_outgoing_presence() {
|
if self.server.config.allow_outgoing_presence {
|
||||||
select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?;
|
select_edus_presence(server_name, since, &mut max_edu_count, &mut events)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -617,7 +615,7 @@ async fn send_events_dest_normal(
|
||||||
&services().client.sender,
|
&services().client.sender,
|
||||||
server,
|
server,
|
||||||
send_transaction_message::v1::Request {
|
send_transaction_message::v1::Request {
|
||||||
origin: services().globals.server_name().to_owned(),
|
origin: services().server.config.server_name.clone(),
|
||||||
pdus: pdu_jsons,
|
pdus: pdu_jsons,
|
||||||
edus: edu_jsons,
|
edus: edu_jsons,
|
||||||
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
origin_server_ts: MilliSecondsSinceUnixEpoch::now(),
|
||||||
|
|
|
@ -52,9 +52,9 @@ impl Services {
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
globals: build!(globals::Service),
|
||||||
resolver: build!(resolver::Service),
|
resolver: build!(resolver::Service),
|
||||||
client: build!(client::Service),
|
client: build!(client::Service),
|
||||||
globals: build!(globals::Service),
|
|
||||||
rooms: rooms::Service {
|
rooms: rooms::Service {
|
||||||
alias: build!(rooms::alias::Service),
|
alias: build!(rooms::alias::Service),
|
||||||
auth_chain: build!(rooms::auth_chain::Service),
|
auth_chain: build!(rooms::auth_chain::Service),
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue