Compare commits
10 commits
alpine-pac
...
federation
Author | SHA1 | Date | |
---|---|---|---|
|
d5a9c98657 | ||
|
395b466b4a | ||
|
0376b58006 | ||
|
78c1e2f427 | ||
|
6614b8f6bf | ||
|
c2fa8e6f8d | ||
|
b8108f5897 | ||
|
cf8358cbe6 | ||
|
7ecc570bb8 | ||
|
002799177d |
9 changed files with 480 additions and 394 deletions
212
.github/workflows/ci.yml
vendored
212
.github/workflows/ci.yml
vendored
|
@ -26,11 +26,9 @@ permissions:
|
||||||
contents: read
|
contents: read
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
ci:
|
setup:
|
||||||
name: CI and Artifacts
|
name: CI Setup
|
||||||
|
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Sync repository
|
- name: Sync repository
|
||||||
uses: actions/checkout@v4
|
uses: actions/checkout@v4
|
||||||
|
@ -94,194 +92,78 @@ jobs:
|
||||||
./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation
|
./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation
|
||||||
|
|
||||||
|
|
||||||
|
build-and-test:
|
||||||
|
name: CI and Artifacts
|
||||||
|
needs: setup
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
strategy:
|
||||||
|
matrix:
|
||||||
|
target: [
|
||||||
|
"static-x86_64-unknown-linux-musl",
|
||||||
|
"static-x86_64-unknown-linux-musl-jemalloc",
|
||||||
|
"static-x86_64-unknown-linux-musl-hmalloc",
|
||||||
|
"static-aarch64-unknown-linux-musl",
|
||||||
|
"static-aarch64-unknown-linux-musl-jemalloc",
|
||||||
|
"static-aarch64-unknown-linux-musl-hmalloc",
|
||||||
|
]
|
||||||
|
oci-target: [
|
||||||
|
"x86_64-unknown-linux-gnu",
|
||||||
|
"x86_64-unknown-linux-musl",
|
||||||
|
"x86_64-unknown-linux-musl-jemalloc",
|
||||||
|
"x86_64-unknown-linux-musl-hmalloc",
|
||||||
|
"aarch64-unknown-linux-musl",
|
||||||
|
"aarch64-unknown-linux-musl-jemalloc",
|
||||||
|
"aarch64-unknown-linux-musl-hmalloc",
|
||||||
|
]
|
||||||
|
|
||||||
|
steps:
|
||||||
- name: Perform continuous integration
|
- name: Perform continuous integration
|
||||||
run: direnv exec . engage
|
run: direnv exec . engage
|
||||||
|
|
||||||
|
|
||||||
- name: Build static-x86_64-unknown-linux-musl and Create static deb-x86_64-unknown-linux-musl
|
- name: Build static artifacts
|
||||||
run: |
|
run: |
|
||||||
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl
|
./bin/nix-build-and-cache .#${{ matrix.target }}
|
||||||
mkdir -p target/release
|
mkdir -p target/release
|
||||||
cp -v -f result/bin/conduit target/release
|
cp -v -f result/bin/conduit target/release
|
||||||
direnv exec . cargo deb --no-build
|
direnv exec . cargo deb --no-build --output target/debian/${{ matrix.target }}.deb
|
||||||
|
|
||||||
- name: Upload artifact static-x86_64-unknown-linux-musl
|
- name: Upload static artifacts
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: static-x86_64-unknown-linux-musl
|
name: ${{ matrix.target }}
|
||||||
path: result/bin/conduit
|
path: result/bin/conduit
|
||||||
if-no-files-found: error
|
if-no-files-found: error
|
||||||
|
|
||||||
- name: Upload artifact deb-x86_64-unknown-linux-musl
|
- name: Upload static deb artifacts
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: x86_64-unknown-linux-musl.deb
|
name: ${{ matrix.target }}.deb
|
||||||
path: target/debian/*.deb
|
path: target/debian/${{ matrix.target }}.deb
|
||||||
if-no-files-found: error
|
if-no-files-found: error
|
||||||
|
|
||||||
- name: Build static-x86_64-unknown-linux-musl-jemalloc and Create static deb-x86_64-unknown-linux-musl-jemalloc
|
|
||||||
|
- name: Build OCI images
|
||||||
run: |
|
run: |
|
||||||
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-jemalloc
|
./bin/nix-build-and-cache .#oci-image-${{ matrix.oci-target }}
|
||||||
mkdir -p target/release
|
cp -v -f result oci-image-${{ matrix.oci-target }}.tar.gz
|
||||||
cp -v -f result/bin/conduit target/release
|
|
||||||
direnv exec . cargo deb --no-build
|
|
||||||
|
|
||||||
- name: Upload artifact static-x86_64-unknown-linux-musl-jemalloc
|
- name: Upload OCI image artifacts
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
with:
|
with:
|
||||||
name: static-x86_64-unknown-linux-musl-jemalloc
|
name: oci-image-${{ matrix.oci-target }}
|
||||||
path: result/bin/conduit
|
path: oci-image-${{ matrix.oci-target }}.tar.gz
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
- name: Upload artifact deb-x86_64-unknown-linux-musl-jemalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: x86_64-unknown-linux-musl-jemalloc.deb
|
|
||||||
path: target/debian/*.deb
|
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
- name: Build static-x86_64-unknown-linux-musl-hmalloc and Create static deb-x86_64-unknown-linux-musl-hmalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-hmalloc
|
|
||||||
mkdir -p target/release
|
|
||||||
cp -v -f result/bin/conduit target/release
|
|
||||||
direnv exec . cargo deb --no-build
|
|
||||||
|
|
||||||
- name: Upload artifact static-x86_64-unknown-linux-musl-hmalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: static-x86_64-unknown-linux-musl-hmalloc
|
|
||||||
path: result/bin/conduit
|
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
- name: Upload artifact deb-x86_64-unknown-linux-musl-hmalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: x86_64-unknown-linux-musl-hmalloc.deb
|
|
||||||
path: target/debian/*.deb
|
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
|
|
||||||
- name: Build static-aarch64-unknown-linux-musl
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl
|
|
||||||
|
|
||||||
- name: Upload artifact static-aarch64-unknown-linux-musl
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: static-aarch64-unknown-linux-musl
|
|
||||||
path: result/bin/conduit
|
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
- name: Build static-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
|
|
||||||
- name: Upload artifact static-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: static-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
path: result/bin/conduit
|
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
- name: Build static-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
|
|
||||||
- name: Upload artifact static-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: static-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
path: result/bin/conduit
|
|
||||||
if-no-files-found: error
|
|
||||||
|
|
||||||
|
|
||||||
- name: Build oci-image-x86_64-unknown-linux-gnu
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#oci-image
|
|
||||||
cp -v -f result oci-image-amd64.tar.gz
|
|
||||||
|
|
||||||
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: oci-image-x86_64-unknown-linux-gnu
|
|
||||||
path: oci-image-amd64.tar.gz
|
|
||||||
if-no-files-found: error
|
|
||||||
# don't compress again
|
|
||||||
compression-level: 0
|
|
||||||
|
|
||||||
- name: Build oci-image-x86_64-unknown-linux-gnu-jemalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#oci-image-jemalloc
|
|
||||||
cp -v -f result oci-image-amd64.tar.gz
|
|
||||||
|
|
||||||
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-jemalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: oci-image-x86_64-unknown-linux-gnu-jemalloc
|
|
||||||
path: oci-image-amd64.tar.gz
|
|
||||||
if-no-files-found: error
|
|
||||||
# don't compress again
|
|
||||||
compression-level: 0
|
|
||||||
|
|
||||||
- name: Build oci-image-x86_64-unknown-linux-gnu-hmalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#oci-image-hmalloc
|
|
||||||
cp -v -f result oci-image-amd64.tar.gz
|
|
||||||
|
|
||||||
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-hmalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: oci-image-x86_64-unknown-linux-gnu-hmalloc
|
|
||||||
path: oci-image-amd64.tar.gz
|
|
||||||
if-no-files-found: error
|
if-no-files-found: error
|
||||||
# don't compress again
|
# don't compress again
|
||||||
compression-level: 0
|
compression-level: 0
|
||||||
|
|
||||||
|
|
||||||
- name: Build oci-image-aarch64-unknown-linux-musl
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl
|
|
||||||
cp -v -f result oci-image-arm64v8.tar.gz
|
|
||||||
|
|
||||||
- name: Upload artifact oci-image-aarch64-unknown-linux-musl
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: oci-image-aarch64-unknown-linux-musl
|
|
||||||
path: oci-image-arm64v8.tar.gz
|
|
||||||
if-no-files-found: error
|
|
||||||
# don't compress again
|
|
||||||
compression-level: 0
|
|
||||||
|
|
||||||
- name: Build oci-image-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
cp -v -f result oci-image-arm64v8.tar.gz
|
|
||||||
|
|
||||||
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: oci-image-aarch64-unknown-linux-musl-jemalloc
|
|
||||||
path: oci-image-arm64v8.tar.gz
|
|
||||||
if-no-files-found: error
|
|
||||||
# don't compress again
|
|
||||||
compression-level: 0
|
|
||||||
|
|
||||||
- name: Build oci-image-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
run: |
|
|
||||||
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
cp -v -f result oci-image-arm64v8.tar.gz
|
|
||||||
|
|
||||||
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
uses: actions/upload-artifact@v4
|
|
||||||
with:
|
|
||||||
name: oci-image-aarch64-unknown-linux-musl-hmalloc
|
|
||||||
path: oci-image-arm64v8.tar.gz
|
|
||||||
if-no-files-found: error
|
|
||||||
# don't compress again
|
|
||||||
compression-level: 0
|
|
||||||
|
|
||||||
|
|
||||||
|
publish:
|
||||||
|
needs: build-and-test
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
- name: Extract metadata for Dockerhub
|
- name: Extract metadata for Dockerhub
|
||||||
env:
|
env:
|
||||||
REGISTRY: registry.hub.docker.com
|
REGISTRY: registry.hub.docker.com
|
||||||
|
@ -378,7 +260,7 @@ jobs:
|
||||||
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
|
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
|
||||||
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
|
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
|
||||||
# Tag "main" as latest (stable branch)
|
# Tag "main" as latest (stable branch)
|
||||||
if [[ -n "$GITHUB_REF_NAME" = "main" ]]; then
|
if [[ "$GITHUB_REF_NAME" = "main" ]]; then
|
||||||
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
|
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
|
||||||
docker manifest push $IMAGE_NAME:latest
|
docker manifest push $IMAGE_NAME:latest
|
||||||
fi
|
fi
|
||||||
|
|
35
Cargo.lock
generated
35
Cargo.lock
generated
|
@ -111,6 +111,17 @@ dependencies = [
|
||||||
"zstd-safe",
|
"zstd-safe",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "async-recursion"
|
||||||
|
version = "1.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn 2.0.58",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.80"
|
version = "0.1.80"
|
||||||
|
@ -1045,9 +1056,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hickory-proto"
|
name = "hickory-proto"
|
||||||
version = "0.24.0"
|
version = "0.24.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
|
||||||
checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-recursion",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"data-encoding",
|
"data-encoding",
|
||||||
|
@ -1055,7 +1066,7 @@ dependencies = [
|
||||||
"futures-channel",
|
"futures-channel",
|
||||||
"futures-io",
|
"futures-io",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"idna 0.4.0",
|
"idna",
|
||||||
"ipnet",
|
"ipnet",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"rand",
|
"rand",
|
||||||
|
@ -1069,8 +1080,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hickory-resolver"
|
name = "hickory-resolver"
|
||||||
version = "0.24.0"
|
version = "0.24.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
|
||||||
checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
@ -1311,16 +1321,6 @@ dependencies = [
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "idna"
|
|
||||||
version = "0.4.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
|
|
||||||
dependencies = [
|
|
||||||
"unicode-bidi",
|
|
||||||
"unicode-normalization",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "idna"
|
name = "idna"
|
||||||
version = "0.5.0"
|
version = "0.5.0"
|
||||||
|
@ -2285,8 +2285,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "reqwest"
|
name = "reqwest"
|
||||||
version = "0.11.27"
|
version = "0.11.27"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "git+https://github.com/girlbossceo/reqwest?rev=319335e000fdea2e3d01f44245c8a21864d0c1c3#319335e000fdea2e3d01f44245c8a21864d0c1c3"
|
||||||
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"base64 0.21.7",
|
"base64 0.21.7",
|
||||||
|
@ -3763,7 +3762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
|
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"form_urlencoded",
|
"form_urlencoded",
|
||||||
"idna 0.5.0",
|
"idna",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"serde",
|
"serde",
|
||||||
]
|
]
|
||||||
|
|
13
Cargo.toml
13
Cargo.toml
|
@ -29,9 +29,6 @@ base64 = "0.22.0"
|
||||||
# Used when hashing the state
|
# Used when hashing the state
|
||||||
ring = "0.17.8"
|
ring = "0.17.8"
|
||||||
|
|
||||||
# Used when querying the SRV record of other servers
|
|
||||||
hickory-resolver = "0.24.0"
|
|
||||||
|
|
||||||
# Used to find matching events for appservices
|
# Used to find matching events for appservices
|
||||||
regex = "1.10.4"
|
regex = "1.10.4"
|
||||||
|
|
||||||
|
@ -107,9 +104,11 @@ version = "0.14"
|
||||||
features = ["server", "http1", "http2"]
|
features = ["server", "http1", "http2"]
|
||||||
|
|
||||||
[dependencies.reqwest]
|
[dependencies.reqwest]
|
||||||
version = "0.11.27"
|
#version = "0.11.27"
|
||||||
|
git = "https://github.com/girlbossceo/reqwest"
|
||||||
|
rev = "319335e000fdea2e3d01f44245c8a21864d0c1c3"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["rustls-tls-native-roots", "socks", "trust-dns"]
|
features = ["rustls-tls-native-roots", "socks", "hickory-dns"]
|
||||||
|
|
||||||
# all the serde stuff
|
# all the serde stuff
|
||||||
# Used for pdu definition
|
# Used for pdu definition
|
||||||
|
@ -272,6 +271,10 @@ features = [
|
||||||
"unstable-extensible-events",
|
"unstable-extensible-events",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[dependencies.hickory-resolver]
|
||||||
|
git = "https://github.com/hickory-dns/hickory-dns"
|
||||||
|
rev = "94ac564c3f677e038f7255ddb762e9301d0f2c5d"
|
||||||
|
|
||||||
[dependencies.rust-rocksdb]
|
[dependencies.rust-rocksdb]
|
||||||
git = "https://github.com/zaidoon1/rust-rocksdb"
|
git = "https://github.com/zaidoon1/rust-rocksdb"
|
||||||
branch = "master"
|
branch = "master"
|
||||||
|
|
|
@ -477,19 +477,19 @@ allow_profile_lookup_federation_requests = true
|
||||||
|
|
||||||
# Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most
|
# Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most
|
||||||
# administrators; this is by design. Only decrease this if you are using an external DNS cache.
|
# administrators; this is by design. Only decrease this if you are using an external DNS cache.
|
||||||
#dns_min_ttl = 60 * 90
|
#dns_min_ttl = 10800
|
||||||
|
|
||||||
# Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for
|
# Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for
|
||||||
# the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation
|
# the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation
|
||||||
# and aggressively cached rather than constantly rechecked.
|
# and aggressively cached rather than constantly rechecked.
|
||||||
#dns_min_ttl_nxdomain = 60 * 60 * 24 * 3
|
#dns_min_ttl_nxdomain = 86400
|
||||||
|
|
||||||
# The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can
|
# The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can
|
||||||
# take up to several seconds for some domains, so this value should not be too low.
|
# take up to several seconds for some domains, so this value should not be too low.
|
||||||
#dns_timeout = 5
|
#dns_timeout = 10
|
||||||
|
|
||||||
# Number of retries after a timeout.
|
# Number of retries after a timeout.
|
||||||
#dns_attempts = 5
|
#dns_attempts = 10
|
||||||
|
|
||||||
# Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver.
|
# Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver.
|
||||||
#dns_tcp_fallback = true
|
#dns_tcp_fallback = true
|
||||||
|
@ -498,7 +498,7 @@ allow_profile_lookup_federation_requests = true
|
||||||
# This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response.
|
# This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response.
|
||||||
#
|
#
|
||||||
# The default is to query one nameserver and stop (false).
|
# The default is to query one nameserver and stop (false).
|
||||||
#query_all_nameservers = false
|
#query_all_nameservers = true
|
||||||
|
|
||||||
|
|
||||||
### Request Timeouts, Connection Timeouts, and Connection Pooling
|
### Request Timeouts, Connection Timeouts, and Connection Pooling
|
||||||
|
|
|
@ -1554,7 +1554,8 @@ pub async fn create_invite_route(body: Ruma<create_invite::v2::Request>) -> Resu
|
||||||
.contains(&server.to_owned())
|
.contains(&server.to_owned())
|
||||||
{
|
{
|
||||||
warn!(
|
warn!(
|
||||||
"Received federated/remote invite from banned server {sender_servername} for room ID {}. Rejecting.",
|
"Received federated/remote invite from server {sender_servername} for room ID {} which has a banned \
|
||||||
|
server name. Rejecting.",
|
||||||
body.room_id
|
body.room_id
|
||||||
);
|
);
|
||||||
return Err(Error::BadRequest(
|
return Err(Error::BadRequest(
|
||||||
|
|
|
@ -100,7 +100,7 @@ pub struct Config {
|
||||||
pub dns_timeout: u64,
|
pub dns_timeout: u64,
|
||||||
#[serde(default = "true_fn")]
|
#[serde(default = "true_fn")]
|
||||||
pub dns_tcp_fallback: bool,
|
pub dns_tcp_fallback: bool,
|
||||||
#[serde(default)]
|
#[serde(default = "true_fn")]
|
||||||
pub query_all_nameservers: bool,
|
pub query_all_nameservers: bool,
|
||||||
#[serde(default = "default_max_request_size")]
|
#[serde(default = "default_max_request_size")]
|
||||||
pub max_request_size: u32,
|
pub max_request_size: u32,
|
||||||
|
@ -851,13 +851,13 @@ fn default_cleanup_second_interval() -> u32 {
|
||||||
|
|
||||||
fn default_dns_cache_entries() -> u32 { 12288 }
|
fn default_dns_cache_entries() -> u32 { 12288 }
|
||||||
|
|
||||||
fn default_dns_min_ttl() -> u64 { 60 * 90 }
|
fn default_dns_min_ttl() -> u64 { 60 * 180 }
|
||||||
|
|
||||||
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 * 3 }
|
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 }
|
||||||
|
|
||||||
fn default_dns_attempts() -> u16 { 5 }
|
fn default_dns_attempts() -> u16 { 10 }
|
||||||
|
|
||||||
fn default_dns_timeout() -> u64 { 5 }
|
fn default_dns_timeout() -> u64 { 10 }
|
||||||
|
|
||||||
fn default_max_request_size() -> u32 {
|
fn default_max_request_size() -> u32 {
|
||||||
20 * 1024 * 1024 // Default to 20 MB
|
20 * 1024 * 1024 // Default to 20 MB
|
||||||
|
|
|
@ -4,7 +4,7 @@ use crate::{
|
||||||
database::KeyValueDatabase,
|
database::KeyValueDatabase,
|
||||||
service::{
|
service::{
|
||||||
self,
|
self,
|
||||||
sending::{OutgoingKind, SendingEventType},
|
sending::{OutgoingDestination, SendingEventType},
|
||||||
},
|
},
|
||||||
services, utils, Error, Result,
|
services, utils, Error, Result,
|
||||||
};
|
};
|
||||||
|
@ -12,7 +12,7 @@ use crate::{
|
||||||
impl service::sending::Data for KeyValueDatabase {
|
impl service::sending::Data for KeyValueDatabase {
|
||||||
fn active_requests<'a>(
|
fn active_requests<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> {
|
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a> {
|
||||||
Box::new(
|
Box::new(
|
||||||
self.servercurrentevent_data
|
self.servercurrentevent_data
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -21,7 +21,7 @@ impl service::sending::Data for KeyValueDatabase {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn active_requests_for<'a>(
|
fn active_requests_for<'a>(
|
||||||
&'a self, outgoing_kind: &OutgoingKind,
|
&'a self, outgoing_kind: &OutgoingDestination,
|
||||||
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
|
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
|
||||||
let prefix = outgoing_kind.get_prefix();
|
let prefix = outgoing_kind.get_prefix();
|
||||||
Box::new(
|
Box::new(
|
||||||
|
@ -33,7 +33,7 @@ impl service::sending::Data for KeyValueDatabase {
|
||||||
|
|
||||||
fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) }
|
fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) }
|
||||||
|
|
||||||
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> {
|
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
|
||||||
let prefix = outgoing_kind.get_prefix();
|
let prefix = outgoing_kind.get_prefix();
|
||||||
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
|
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
|
||||||
self.servercurrentevent_data.remove(&key)?;
|
self.servercurrentevent_data.remove(&key)?;
|
||||||
|
@ -42,7 +42,7 @@ impl service::sending::Data for KeyValueDatabase {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> {
|
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
|
||||||
let prefix = outgoing_kind.get_prefix();
|
let prefix = outgoing_kind.get_prefix();
|
||||||
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) {
|
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) {
|
||||||
self.servercurrentevent_data.remove(&key).unwrap();
|
self.servercurrentevent_data.remove(&key).unwrap();
|
||||||
|
@ -55,7 +55,7 @@ impl service::sending::Data for KeyValueDatabase {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
|
fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
|
||||||
let mut batch = Vec::new();
|
let mut batch = Vec::new();
|
||||||
let mut keys = Vec::new();
|
let mut keys = Vec::new();
|
||||||
for (outgoing_kind, event) in requests {
|
for (outgoing_kind, event) in requests {
|
||||||
|
@ -79,7 +79,7 @@ impl service::sending::Data for KeyValueDatabase {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn queued_requests<'a>(
|
fn queued_requests<'a>(
|
||||||
&'a self, outgoing_kind: &OutgoingKind,
|
&'a self, outgoing_kind: &OutgoingDestination,
|
||||||
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
|
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
|
||||||
let prefix = outgoing_kind.get_prefix();
|
let prefix = outgoing_kind.get_prefix();
|
||||||
return Box::new(
|
return Box::new(
|
||||||
|
@ -122,7 +122,7 @@ impl service::sending::Data for KeyValueDatabase {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(key))]
|
#[tracing::instrument(skip(key))]
|
||||||
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind, SendingEventType)> {
|
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingDestination, SendingEventType)> {
|
||||||
// Appservices start with a plus
|
// Appservices start with a plus
|
||||||
Ok::<_, Error>(if key.starts_with(b"+") {
|
Ok::<_, Error>(if key.starts_with(b"+") {
|
||||||
let mut parts = key[1..].splitn(2, |&b| b == 0xFF);
|
let mut parts = key[1..].splitn(2, |&b| b == 0xFF);
|
||||||
|
@ -136,7 +136,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
|
||||||
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
|
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
|
||||||
|
|
||||||
(
|
(
|
||||||
OutgoingKind::Appservice(server),
|
OutgoingDestination::Appservice(server),
|
||||||
if value.is_empty() {
|
if value.is_empty() {
|
||||||
SendingEventType::Pdu(event.to_vec())
|
SendingEventType::Pdu(event.to_vec())
|
||||||
} else {
|
} else {
|
||||||
|
@ -163,7 +163,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
|
||||||
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
|
||||||
|
|
||||||
(
|
(
|
||||||
OutgoingKind::Push(user_id, pushkey_string),
|
OutgoingDestination::Push(user_id, pushkey_string),
|
||||||
if value.is_empty() {
|
if value.is_empty() {
|
||||||
SendingEventType::Pdu(event.to_vec())
|
SendingEventType::Pdu(event.to_vec())
|
||||||
} else {
|
} else {
|
||||||
|
@ -183,7 +183,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
|
||||||
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
|
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
|
||||||
|
|
||||||
(
|
(
|
||||||
OutgoingKind::Normal(
|
OutgoingDestination::Normal(
|
||||||
ServerName::parse(server)
|
ServerName::parse(server)
|
||||||
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
|
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
|
||||||
),
|
),
|
||||||
|
|
|
@ -1,20 +1,20 @@
|
||||||
use ruma::ServerName;
|
use ruma::ServerName;
|
||||||
|
|
||||||
use super::{OutgoingKind, SendingEventType};
|
use super::{OutgoingDestination, SendingEventType};
|
||||||
use crate::Result;
|
use crate::Result;
|
||||||
|
|
||||||
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>;
|
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a>;
|
||||||
type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
|
type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
|
||||||
|
|
||||||
pub trait Data: Send + Sync {
|
pub trait Data: Send + Sync {
|
||||||
fn active_requests(&self) -> OutgoingSendingIter<'_>;
|
fn active_requests(&self) -> OutgoingSendingIter<'_>;
|
||||||
fn active_requests_for(&self, outgoing_kind: &OutgoingKind) -> SendingEventTypeIter<'_>;
|
fn active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> SendingEventTypeIter<'_>;
|
||||||
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
|
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
|
||||||
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>;
|
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
|
||||||
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>;
|
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
|
||||||
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
|
fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
|
||||||
fn queued_requests<'a>(
|
fn queued_requests<'a>(
|
||||||
&'a self, outgoing_kind: &OutgoingKind,
|
&'a self, outgoing_kind: &OutgoingDestination,
|
||||||
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
|
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
|
||||||
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
|
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
|
||||||
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;
|
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
use std::{
|
use std::{
|
||||||
cmp,
|
cmp,
|
||||||
collections::{BTreeMap, HashMap, HashSet},
|
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
|
||||||
fmt::Debug,
|
fmt::Debug,
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
|
@ -25,7 +25,7 @@ use ruma::{
|
||||||
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
|
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
|
||||||
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
|
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
|
||||||
};
|
};
|
||||||
use tokio::sync::{mpsc, Mutex, Semaphore};
|
use tokio::sync::{oneshot, Mutex, Semaphore};
|
||||||
use tracing::{error, warn};
|
use tracing::{error, warn};
|
||||||
|
|
||||||
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
|
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
|
||||||
|
@ -42,15 +42,15 @@ pub struct Service {
|
||||||
|
|
||||||
/// The state for a given state hash.
|
/// The state for a given state hash.
|
||||||
pub(super) maximum_requests: Arc<Semaphore>,
|
pub(super) maximum_requests: Arc<Semaphore>,
|
||||||
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
|
pub sender: loole::Sender<(OutgoingDestination, SendingEventType, Vec<u8>)>,
|
||||||
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
|
receiver: Mutex<loole::Receiver<(OutgoingDestination, SendingEventType, Vec<u8>)>>,
|
||||||
startup_netburst: bool,
|
startup_netburst: bool,
|
||||||
startup_netburst_keep: i64,
|
startup_netburst_keep: i64,
|
||||||
timeout: u64,
|
timeout: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
|
||||||
pub enum OutgoingKind {
|
pub enum OutgoingDestination {
|
||||||
Appservice(String),
|
Appservice(String),
|
||||||
Push(OwnedUserId, String), // user and pushkey
|
Push(OwnedUserId, String), // user and pushkey
|
||||||
Normal(OwnedServerName),
|
Normal(OwnedServerName),
|
||||||
|
@ -65,14 +65,31 @@ pub enum SendingEventType {
|
||||||
}
|
}
|
||||||
|
|
||||||
enum TransactionStatus {
|
enum TransactionStatus {
|
||||||
|
/// Currently running (for the first time)
|
||||||
Running,
|
Running,
|
||||||
Failed(u32, Instant), // number of times failed, time of last failure
|
/// Failed, backing off for a retry
|
||||||
Retrying(u32), // number of times failed
|
Failed {
|
||||||
|
failures: u32,
|
||||||
|
waker: Option<oneshot::Sender<()>>,
|
||||||
|
},
|
||||||
|
/// Currently retrying
|
||||||
|
Retrying {
|
||||||
|
/// number of times failed
|
||||||
|
failures: u32,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A control-flow enum to dictate what the handler should do after (trying to)
|
||||||
|
/// prepare a transaction
|
||||||
|
enum TransactionPrepOutcome {
|
||||||
|
Send(Vec<SendingEventType>),
|
||||||
|
Wake(OutgoingDestination),
|
||||||
|
Nothing,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Service {
|
impl Service {
|
||||||
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
|
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
|
||||||
let (sender, receiver) = mpsc::unbounded_channel();
|
let (sender, receiver) = loole::unbounded();
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
db,
|
db,
|
||||||
sender,
|
sender,
|
||||||
|
@ -86,7 +103,7 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
|
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
|
||||||
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
|
||||||
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
|
let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey);
|
||||||
let event = SendingEventType::Pdu(pdu_id.to_owned());
|
let event = SendingEventType::Pdu(pdu_id.to_owned());
|
||||||
let _cork = services().globals.db.cork()?;
|
let _cork = services().globals.db.cork()?;
|
||||||
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
||||||
|
@ -99,7 +116,7 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
|
||||||
let outgoing_kind = OutgoingKind::Appservice(appservice_id);
|
let outgoing_kind = OutgoingDestination::Appservice(appservice_id);
|
||||||
let event = SendingEventType::Pdu(pdu_id);
|
let event = SendingEventType::Pdu(pdu_id);
|
||||||
let _cork = services().globals.db.cork()?;
|
let _cork = services().globals.db.cork()?;
|
||||||
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
||||||
|
@ -126,7 +143,7 @@ impl Service {
|
||||||
pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
|
pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
|
||||||
let requests = servers
|
let requests = servers
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
|
.map(|server| (OutgoingDestination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let _cork = services().globals.db.cork()?;
|
let _cork = services().globals.db.cork()?;
|
||||||
let keys = self.db.queue_requests(
|
let keys = self.db.queue_requests(
|
||||||
|
@ -146,7 +163,7 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, server, serialized))]
|
#[tracing::instrument(skip(self, server, serialized))]
|
||||||
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
|
||||||
let outgoing_kind = OutgoingKind::Normal(server.to_owned());
|
let outgoing_kind = OutgoingDestination::Normal(server.to_owned());
|
||||||
let event = SendingEventType::Edu(serialized);
|
let event = SendingEventType::Edu(serialized);
|
||||||
let _cork = services().globals.db.cork()?;
|
let _cork = services().globals.db.cork()?;
|
||||||
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
|
||||||
|
@ -173,7 +190,7 @@ impl Service {
|
||||||
pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> {
|
pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> {
|
||||||
let requests = servers
|
let requests = servers
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Edu(serialized.clone())))
|
.map(|server| (OutgoingDestination::Normal(server), SendingEventType::Edu(serialized.clone())))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let _cork = services().globals.db.cork()?;
|
let _cork = services().globals.db.cork()?;
|
||||||
let keys = self.db.queue_requests(
|
let keys = self.db.queue_requests(
|
||||||
|
@ -205,7 +222,7 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, servers))]
|
#[tracing::instrument(skip(self, servers))]
|
||||||
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
|
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
|
||||||
let requests = servers.into_iter().map(OutgoingKind::Normal);
|
let requests = servers.into_iter().map(OutgoingDestination::Normal);
|
||||||
|
|
||||||
for outgoing_kind in requests {
|
for outgoing_kind in requests {
|
||||||
self.sender
|
self.sender
|
||||||
|
@ -221,7 +238,7 @@ impl Service {
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
|
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
|
||||||
self.db
|
self.db
|
||||||
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?;
|
.delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -274,14 +291,17 @@ impl Service {
|
||||||
|
|
||||||
#[tracing::instrument(skip(self), name = "sender")]
|
#[tracing::instrument(skip(self), name = "sender")]
|
||||||
async fn handler(&self) -> Result<()> {
|
async fn handler(&self) -> Result<()> {
|
||||||
let mut receiver = self.receiver.lock().await;
|
let new_transactions = self.receiver.lock().await;
|
||||||
|
let (waking_sender, waking_receiver) = loole::unbounded();
|
||||||
|
|
||||||
let mut futures = FuturesUnordered::new();
|
let mut outgoing = FuturesUnordered::new();
|
||||||
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
|
let mut retrying = FuturesUnordered::new();
|
||||||
|
|
||||||
|
let mut current_transaction_status = HashMap::<OutgoingDestination, TransactionStatus>::new();
|
||||||
|
|
||||||
// Retry requests we could not finish yet
|
// Retry requests we could not finish yet
|
||||||
if self.startup_netburst {
|
if self.startup_netburst {
|
||||||
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
|
let mut initial_transactions = HashMap::<OutgoingDestination, Vec<SendingEventType>>::new();
|
||||||
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
|
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
|
||||||
let entry = initial_transactions
|
let entry = initial_transactions
|
||||||
.entry(outgoing_kind.clone())
|
.entry(outgoing_kind.clone())
|
||||||
|
@ -300,13 +320,14 @@ impl Service {
|
||||||
|
|
||||||
for (outgoing_kind, events) in initial_transactions {
|
for (outgoing_kind, events) in initial_transactions {
|
||||||
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
||||||
futures.push(handle_events(outgoing_kind.clone(), events));
|
outgoing.push(handle_events(outgoing_kind.clone(), events));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
Some(response) = futures.next() => {
|
Some(response) = outgoing.next() => {
|
||||||
|
// Outgoing transaction succeeded
|
||||||
match response {
|
match response {
|
||||||
Ok(outgoing_kind) => {
|
Ok(outgoing_kind) => {
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().globals.db.cork();
|
||||||
|
@ -322,51 +343,155 @@ impl Service {
|
||||||
if !new_events.is_empty() {
|
if !new_events.is_empty() {
|
||||||
// Insert pdus we found
|
// Insert pdus we found
|
||||||
self.db.mark_as_active(&new_events)?;
|
self.db.mark_as_active(&new_events)?;
|
||||||
futures.push(handle_events(
|
|
||||||
outgoing_kind.clone(),
|
// Clear retries
|
||||||
|
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
|
||||||
|
|
||||||
|
outgoing.push(handle_events(
|
||||||
|
outgoing_kind,
|
||||||
new_events.into_iter().map(|(event, _)| event).collect(),
|
new_events.into_iter().map(|(event, _)| event).collect(),
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
current_transaction_status.remove(&outgoing_kind);
|
current_transaction_status.remove(&outgoing_kind);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err((outgoing_kind, _)) => {
|
// Outgoing transaction failed
|
||||||
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e {
|
Err((destination, err)) => {
|
||||||
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
|
// Set status to Failed, create timer
|
||||||
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()),
|
let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone());
|
||||||
TransactionStatus::Failed(_, _) => {
|
|
||||||
error!("Request that was not even running failed?!");
|
// Add timer to loop
|
||||||
return
|
retrying.push(timer);
|
||||||
},
|
|
||||||
});
|
warn!("Outgoing request to {destination} failed: {err}");
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
Some((outgoing_kind, event, key)) = receiver.recv() => {
|
|
||||||
if let Ok(Some(events)) = self.select_events(
|
// Transaction retry timers firing
|
||||||
&outgoing_kind,
|
Some(dest) = retrying.next() => {
|
||||||
|
// Transition Failed => Retrying, return pending old transaction events
|
||||||
|
match self.select_events(
|
||||||
|
&dest,
|
||||||
|
vec![], // will be ignored because fresh == false
|
||||||
|
&mut current_transaction_status,
|
||||||
|
false,
|
||||||
|
) {
|
||||||
|
Ok(TransactionPrepOutcome::Send(events)) => {
|
||||||
|
outgoing.push(handle_events(dest, events));
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
// Unreachable because fresh == false
|
||||||
|
unreachable!("select_events on a stale transaction {} did not return ::Send", dest)
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(err) => {
|
||||||
|
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err);
|
||||||
|
|
||||||
|
// transaction dropped, so drop destination as well.
|
||||||
|
current_transaction_status.remove(&dest);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// Explicit wakeups, makes a backoff timer return immediately
|
||||||
|
Ok(outgoing) = waking_receiver.recv_async() => {
|
||||||
|
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) {
|
||||||
|
if let Some(waker) = waker.take() {
|
||||||
|
_ = waker.send(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
// New transactions to be sent out (from server/user activity)
|
||||||
|
event = new_transactions.recv_async() => {
|
||||||
|
if let Ok((dest, event, key)) = event {
|
||||||
|
match self.select_events(
|
||||||
|
&dest,
|
||||||
vec![(event, key)],
|
vec![(event, key)],
|
||||||
&mut current_transaction_status,
|
&mut current_transaction_status,
|
||||||
) {
|
true) {
|
||||||
futures.push(handle_events(outgoing_kind, events));
|
Ok(TransactionPrepOutcome::Send(events)) => {
|
||||||
|
outgoing.push(handle_events(dest, events));
|
||||||
|
},
|
||||||
|
Ok(TransactionPrepOutcome::Wake(dest)) => {
|
||||||
|
waking_sender.send(dest).expect("nothing closes this channel but ourselves");
|
||||||
|
},
|
||||||
|
Ok(TransactionPrepOutcome::Nothing) => {},
|
||||||
|
Err(err) => {
|
||||||
|
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generates timer/oneshot, alters status to reflect Failed
|
||||||
|
///
|
||||||
|
/// Returns timer/oneshot future to wake up loop for next retry
|
||||||
|
fn mark_failed_and_backoff(
|
||||||
|
status: &mut HashMap<OutgoingDestination, TransactionStatus>, dest: OutgoingDestination,
|
||||||
|
) -> impl std::future::Future<Output = OutgoingDestination> {
|
||||||
|
let now = Instant::now();
|
||||||
|
|
||||||
|
let entry = status
|
||||||
|
.get_mut(&dest)
|
||||||
|
.expect("guaranteed to be set before this function");
|
||||||
|
|
||||||
|
let failures = match entry {
|
||||||
|
// Running -> Failed
|
||||||
|
TransactionStatus::Running => 1,
|
||||||
|
// Retrying -> Failed
|
||||||
|
TransactionStatus::Retrying {
|
||||||
|
failures,
|
||||||
|
} => *failures + 1,
|
||||||
|
|
||||||
|
// The transition of Failed -> Retrying is handled by handle_events
|
||||||
|
TransactionStatus::Failed {
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
unreachable!(
|
||||||
|
"TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \
|
||||||
|
bailing..."
|
||||||
|
)
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
|
||||||
|
|
||||||
|
// Exponential backoff, clamp upper value to one day
|
||||||
|
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY);
|
||||||
|
|
||||||
|
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup);
|
||||||
|
|
||||||
|
*entry = TransactionStatus::Failed {
|
||||||
|
failures,
|
||||||
|
waker: Some(waker),
|
||||||
|
};
|
||||||
|
|
||||||
|
fut
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This prepares a transaction, checks the transaction state, and selects
|
||||||
|
/// appropriate events.
|
||||||
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
|
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
|
||||||
fn select_events(
|
fn select_events(
|
||||||
&self,
|
&self,
|
||||||
outgoing_kind: &OutgoingKind,
|
outgoing_kind: &OutgoingDestination,
|
||||||
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
|
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
|
||||||
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
|
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>,
|
||||||
) -> Result<Option<Vec<SendingEventType>>> {
|
fresh: bool, // Wether or not this transaction came from server activity.
|
||||||
let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?;
|
) -> Result<TransactionPrepOutcome> {
|
||||||
|
let (allow, retry, wake_up) =
|
||||||
|
self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?;
|
||||||
|
|
||||||
// Nothing can be done for this remote, bail out.
|
// Nothing can be done for this remote, bail out.
|
||||||
if !allow {
|
if wake_up {
|
||||||
return Ok(None);
|
return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone()));
|
||||||
|
} else if !allow {
|
||||||
|
return Ok(TransactionPrepOutcome::Nothing);
|
||||||
}
|
}
|
||||||
|
|
||||||
let _cork = services().globals.db.cork();
|
let _cork = services().globals.db.cork();
|
||||||
|
@ -374,12 +499,14 @@ impl Service {
|
||||||
|
|
||||||
// Must retry any previous transaction for this remote.
|
// Must retry any previous transaction for this remote.
|
||||||
if retry {
|
if retry {
|
||||||
self.db
|
// We retry the previous transaction
|
||||||
|
for (_, e) in self
|
||||||
|
.db
|
||||||
.active_requests_for(outgoing_kind)
|
.active_requests_for(outgoing_kind)
|
||||||
.filter_map(Result::ok)
|
.filter_map(Result::ok)
|
||||||
.for_each(|(_, e)| events.push(e));
|
{
|
||||||
|
events.push(e);
|
||||||
return Ok(Some(events));
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compose the next transaction
|
// Compose the next transaction
|
||||||
|
@ -392,43 +519,79 @@ impl Service {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add EDU's into the transaction
|
// Add EDU's into the transaction
|
||||||
if let OutgoingKind::Normal(server_name) = outgoing_kind {
|
if let OutgoingDestination::Normal(server_name) = outgoing_kind {
|
||||||
if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
|
if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
|
||||||
events.extend(select_edus.into_iter().map(SendingEventType::Edu));
|
events.extend(select_edus.into_iter().map(SendingEventType::Edu));
|
||||||
self.db.set_latest_educount(server_name, last_count)?;
|
self.db.set_latest_educount(server_name, last_count)?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Some(events))
|
Ok(TransactionPrepOutcome::Send(events))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
|
#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
|
||||||
fn select_events_current(
|
fn select_events_current(
|
||||||
&self, outgoing_kind: OutgoingKind, current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
|
&self, outgoing_kind: OutgoingDestination,
|
||||||
) -> Result<(bool, bool)> {
|
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, fresh: bool,
|
||||||
let (mut allow, mut retry) = (true, false);
|
) -> Result<(bool, bool, bool)> {
|
||||||
current_transaction_status
|
let (mut allow, mut retry, mut wake_up) = (true, false, false);
|
||||||
.entry(outgoing_kind)
|
|
||||||
|
let entry = current_transaction_status.entry(outgoing_kind);
|
||||||
|
|
||||||
|
if fresh {
|
||||||
|
// If its fresh, we initialise the status if we need to.
|
||||||
|
//
|
||||||
|
// We do nothing if it is already running or retrying.
|
||||||
|
//
|
||||||
|
// We return with a wake if it is in the Failed state.
|
||||||
|
entry
|
||||||
.and_modify(|e| match e {
|
.and_modify(|e| match e {
|
||||||
TransactionStatus::Failed(tries, time) => {
|
TransactionStatus::Running
|
||||||
// Fail if a request has failed recently (exponential backoff)
|
| TransactionStatus::Retrying {
|
||||||
const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
|
..
|
||||||
let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries);
|
} => {
|
||||||
min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION);
|
|
||||||
if time.elapsed() < min_elapsed_duration {
|
|
||||||
allow = false;
|
|
||||||
} else {
|
|
||||||
retry = true;
|
|
||||||
*e = TransactionStatus::Retrying(*tries);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
TransactionStatus::Running | TransactionStatus::Retrying(_) => {
|
|
||||||
allow = false; // already running
|
allow = false; // already running
|
||||||
},
|
},
|
||||||
|
TransactionStatus::Failed {
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
// currently sleeping
|
||||||
|
wake_up = true;
|
||||||
|
},
|
||||||
})
|
})
|
||||||
.or_insert(TransactionStatus::Running);
|
.or_insert(TransactionStatus::Running);
|
||||||
|
} else {
|
||||||
|
// If it's not fresh, we expect an entry.
|
||||||
|
//
|
||||||
|
// We also expect us to be the only one who are touching this destination right
|
||||||
|
// now, and its a stale transaction, so it must be in the Failed state
|
||||||
|
match entry {
|
||||||
|
Entry::Occupied(mut e) => {
|
||||||
|
let e = e.get_mut();
|
||||||
|
match e {
|
||||||
|
TransactionStatus::Failed {
|
||||||
|
failures,
|
||||||
|
..
|
||||||
|
} => {
|
||||||
|
*e = TransactionStatus::Retrying {
|
||||||
|
failures: *failures,
|
||||||
|
};
|
||||||
|
retry = true;
|
||||||
|
},
|
||||||
|
|
||||||
Ok((allow, retry))
|
_ => unreachable!(
|
||||||
|
"Encountered bad state when preparing stale transaction: expected Failed state, got \
|
||||||
|
Running or Retrying"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Entry::Vacant(_) => unreachable!(
|
||||||
|
"Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((allow, retry, wake_up))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(self, server_name))]
|
#[tracing::instrument(skip(self, server_name))]
|
||||||
|
@ -594,19 +757,21 @@ pub fn select_edus_receipts(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_events(
|
async fn handle_events(
|
||||||
kind: OutgoingKind, events: Vec<SendingEventType>,
|
kind: OutgoingDestination, events: Vec<SendingEventType>,
|
||||||
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
|
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
|
||||||
match kind {
|
match kind {
|
||||||
OutgoingKind::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
|
OutgoingDestination::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
|
||||||
OutgoingKind::Push(ref userid, ref pushkey) => handle_events_kind_push(&kind, userid, pushkey, events).await,
|
OutgoingDestination::Push(ref userid, ref pushkey) => {
|
||||||
OutgoingKind::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
|
handle_events_kind_push(&kind, userid, pushkey, events).await
|
||||||
|
},
|
||||||
|
OutgoingDestination::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(skip(kind, events))]
|
#[tracing::instrument(skip(kind, events))]
|
||||||
async fn handle_events_kind_appservice(
|
async fn handle_events_kind_appservice(
|
||||||
kind: &OutgoingKind, id: &String, events: Vec<SendingEventType>,
|
kind: &OutgoingDestination, id: &String, events: Vec<SendingEventType>,
|
||||||
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
|
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
|
||||||
let mut pdu_jsons = Vec::new();
|
let mut pdu_jsons = Vec::new();
|
||||||
|
|
||||||
for event in &events {
|
for event in &events {
|
||||||
|
@ -674,8 +839,8 @@ async fn handle_events_kind_appservice(
|
||||||
|
|
||||||
#[tracing::instrument(skip(kind, events))]
|
#[tracing::instrument(skip(kind, events))]
|
||||||
async fn handle_events_kind_push(
|
async fn handle_events_kind_push(
|
||||||
kind: &OutgoingKind, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
|
kind: &OutgoingDestination, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
|
||||||
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
|
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
|
||||||
let mut pdus = Vec::new();
|
let mut pdus = Vec::new();
|
||||||
|
|
||||||
for event in &events {
|
for event in &events {
|
||||||
|
@ -715,7 +880,7 @@ async fn handle_events_kind_push(
|
||||||
let Some(pusher) = services()
|
let Some(pusher) = services()
|
||||||
.pusher
|
.pusher
|
||||||
.get_pusher(userid, pushkey)
|
.get_pusher(userid, pushkey)
|
||||||
.map_err(|e| (kind.clone(), e))?
|
.map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))?
|
||||||
else {
|
else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
@ -752,8 +917,8 @@ async fn handle_events_kind_push(
|
||||||
|
|
||||||
#[tracing::instrument(skip(kind, events), name = "")]
|
#[tracing::instrument(skip(kind, events), name = "")]
|
||||||
async fn handle_events_kind_normal(
|
async fn handle_events_kind_normal(
|
||||||
kind: &OutgoingKind, dest: &OwnedServerName, events: Vec<SendingEventType>,
|
kind: &OutgoingDestination, dest: &OwnedServerName, events: Vec<SendingEventType>,
|
||||||
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
|
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
|
||||||
let mut edu_jsons = Vec::new();
|
let mut edu_jsons = Vec::new();
|
||||||
let mut pdu_jsons = Vec::new();
|
let mut pdu_jsons = Vec::new();
|
||||||
|
|
||||||
|
@ -826,23 +991,23 @@ async fn handle_events_kind_normal(
|
||||||
response
|
response
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OutgoingKind {
|
impl OutgoingDestination {
|
||||||
#[tracing::instrument(skip(self))]
|
#[tracing::instrument(skip(self))]
|
||||||
pub fn get_prefix(&self) -> Vec<u8> {
|
pub fn get_prefix(&self) -> Vec<u8> {
|
||||||
let mut prefix = match self {
|
let mut prefix = match self {
|
||||||
OutgoingKind::Appservice(server) => {
|
OutgoingDestination::Appservice(server) => {
|
||||||
let mut p = b"+".to_vec();
|
let mut p = b"+".to_vec();
|
||||||
p.extend_from_slice(server.as_bytes());
|
p.extend_from_slice(server.as_bytes());
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Push(user, pushkey) => {
|
OutgoingDestination::Push(user, pushkey) => {
|
||||||
let mut p = b"$".to_vec();
|
let mut p = b"$".to_vec();
|
||||||
p.extend_from_slice(user.as_bytes());
|
p.extend_from_slice(user.as_bytes());
|
||||||
p.push(0xFF);
|
p.push(0xFF);
|
||||||
p.extend_from_slice(pushkey.as_bytes());
|
p.extend_from_slice(pushkey.as_bytes());
|
||||||
p
|
p
|
||||||
},
|
},
|
||||||
OutgoingKind::Normal(server) => {
|
OutgoingDestination::Normal(server) => {
|
||||||
let mut p = Vec::new();
|
let mut p = Vec::new();
|
||||||
p.extend_from_slice(server.as_bytes());
|
p.extend_from_slice(server.as_bytes());
|
||||||
p
|
p
|
||||||
|
@ -852,4 +1017,40 @@ impl OutgoingKind {
|
||||||
|
|
||||||
prefix
|
prefix
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This wraps the OutgoingDestination key in an interruptible sleep future.
|
||||||
|
///
|
||||||
|
/// The first return value is the future, the second is the oneshot that
|
||||||
|
/// interrupts that future, and causes it to return instantly.
|
||||||
|
fn wrap_in_interruptible_sleep(
|
||||||
|
self, at: Instant,
|
||||||
|
) -> (impl std::future::Future<Output = Self>, oneshot::Sender<()>) {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
let at = tokio::time::Instant::from_std(at);
|
||||||
|
|
||||||
|
(
|
||||||
|
async move {
|
||||||
|
_ = tokio::time::timeout_at(at, rx).await;
|
||||||
|
|
||||||
|
self
|
||||||
|
},
|
||||||
|
tx,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::fmt::Display for OutgoingDestination {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
match self {
|
||||||
|
OutgoingDestination::Appservice(appservice_id) => {
|
||||||
|
write!(f, "Appservice (ID {:?})", appservice_id)
|
||||||
|
},
|
||||||
|
OutgoingDestination::Push(user, push_key) => {
|
||||||
|
write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key)
|
||||||
|
},
|
||||||
|
OutgoingDestination::Normal(server) => {
|
||||||
|
write!(f, "Matrix Server ({:?})", server)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue