Compare commits

...
Sign in to create a new pull request.

10 commits

Author SHA1 Message Date
strawberry
d5a9c98657 make federation retry timer-based
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 22:14:30 -04:00
strawberry
395b466b4a rename OutgoingKind enum to OutgoingDestination
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 20:11:18 -04:00
strawberry
0376b58006 use latest main rev for hickory (and for reqwest)
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 20:05:56 -04:00
strawberry
78c1e2f427 adjust DNS default config options
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 19:49:19 -04:00
strawberry
6614b8f6bf ci: remove download env
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 19:15:12 -04:00
strawberry
c2fa8e6f8d split up CI steps
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 17:59:01 -04:00
strawberry
b8108f5897 cargo fmt
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 17:50:34 -04:00
morguldir
cf8358cbe6 Remove extra test flag when publishing to ghcr in the CI
test -n checks if a string is longer than non-zero, but we just need a compare

Signed-off-by: morguldir <morguldir@protonmail.com>
2024-04-17 17:22:52 -04:00
strawberry
7ecc570bb8 Revert "dont use loole for sending channel code"
This reverts commit d0a9666a29.
2024-04-17 15:16:01 -04:00
strawberry
002799177d fix wrong warn message
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 15:15:52 -04:00
9 changed files with 480 additions and 394 deletions

View file

@ -26,11 +26,9 @@ permissions:
contents: read contents: read
jobs: jobs:
ci: setup:
name: CI and Artifacts name: CI Setup
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Sync repository - name: Sync repository
uses: actions/checkout@v4 uses: actions/checkout@v4
@ -94,291 +92,175 @@ jobs:
./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation ./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation
build-and-test:
name: CI and Artifacts
needs: setup
runs-on: ubuntu-latest
strategy:
matrix:
target: [
"static-x86_64-unknown-linux-musl",
"static-x86_64-unknown-linux-musl-jemalloc",
"static-x86_64-unknown-linux-musl-hmalloc",
"static-aarch64-unknown-linux-musl",
"static-aarch64-unknown-linux-musl-jemalloc",
"static-aarch64-unknown-linux-musl-hmalloc",
]
oci-target: [
"x86_64-unknown-linux-gnu",
"x86_64-unknown-linux-musl",
"x86_64-unknown-linux-musl-jemalloc",
"x86_64-unknown-linux-musl-hmalloc",
"aarch64-unknown-linux-musl",
"aarch64-unknown-linux-musl-jemalloc",
"aarch64-unknown-linux-musl-hmalloc",
]
steps:
- name: Perform continuous integration - name: Perform continuous integration
run: direnv exec . engage run: direnv exec . engage
- name: Build static-x86_64-unknown-linux-musl and Create static deb-x86_64-unknown-linux-musl - name: Build static artifacts
run: | run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl ./bin/nix-build-and-cache .#${{ matrix.target }}
mkdir -p target/release mkdir -p target/release
cp -v -f result/bin/conduit target/release cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build direnv exec . cargo deb --no-build --output target/debian/${{ matrix.target }}.deb
- name: Upload artifact static-x86_64-unknown-linux-musl - name: Upload static artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: static-x86_64-unknown-linux-musl name: ${{ matrix.target }}
path: result/bin/conduit path: result/bin/conduit
if-no-files-found: error if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl - name: Upload static deb artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: x86_64-unknown-linux-musl.deb name: ${{ matrix.target }}.deb
path: target/debian/*.deb path: target/debian/${{ matrix.target }}.deb
if-no-files-found: error if-no-files-found: error
- name: Build static-x86_64-unknown-linux-musl-jemalloc and Create static deb-x86_64-unknown-linux-musl-jemalloc
- name: Build OCI images
run: | run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-jemalloc ./bin/nix-build-and-cache .#oci-image-${{ matrix.oci-target }}
mkdir -p target/release cp -v -f result oci-image-${{ matrix.oci-target }}.tar.gz
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
- name: Upload artifact static-x86_64-unknown-linux-musl-jemalloc - name: Upload OCI image artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: static-x86_64-unknown-linux-musl-jemalloc name: oci-image-${{ matrix.oci-target }}
path: result/bin/conduit path: oci-image-${{ matrix.oci-target }}.tar.gz
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl-jemalloc.deb
path: target/debian/*.deb
if-no-files-found: error
- name: Build static-x86_64-unknown-linux-musl-hmalloc and Create static deb-x86_64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-hmalloc
mkdir -p target/release
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
- name: Upload artifact static-x86_64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: static-x86_64-unknown-linux-musl-hmalloc
path: result/bin/conduit
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl-hmalloc.deb
path: target/debian/*.deb
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl
- name: Upload artifact static-aarch64-unknown-linux-musl
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl
path: result/bin/conduit
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl-jemalloc
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-jemalloc
- name: Upload artifact static-aarch64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl-jemalloc
path: result/bin/conduit
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-hmalloc
- name: Upload artifact static-aarch64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl-hmalloc
path: result/bin/conduit
if-no-files-found: error
- name: Build oci-image-x86_64-unknown-linux-gnu
run: |
./bin/nix-build-and-cache .#oci-image
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu
path: oci-image-amd64.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-x86_64-unknown-linux-gnu-jemalloc
run: |
./bin/nix-build-and-cache .#oci-image-jemalloc
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-jemalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu-jemalloc
path: oci-image-amd64.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-x86_64-unknown-linux-gnu-hmalloc
run: |
./bin/nix-build-and-cache .#oci-image-hmalloc
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-hmalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu-hmalloc
path: oci-image-amd64.tar.gz
if-no-files-found: error if-no-files-found: error
# don't compress again # don't compress again
compression-level: 0 compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl-jemalloc
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-jemalloc
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl-jemalloc
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-hmalloc
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl-hmalloc
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Extract metadata for Dockerhub publish:
env: needs: build-and-test
REGISTRY: registry.hub.docker.com runs-on: ubuntu-latest
IMAGE_NAME: ${{ github.repository }} steps:
id: meta-dockerhub - name: Extract metadata for Dockerhub
uses: docker/metadata-action@v5 env:
with: REGISTRY: registry.hub.docker.com
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} IMAGE_NAME: ${{ github.repository }}
id: meta-dockerhub
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Extract metadata for GitHub Container Registry - name: Extract metadata for GitHub Container Registry
env: env:
REGISTRY: ghcr.io REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }} IMAGE_NAME: ${{ github.repository }}
id: meta-ghcr id: meta-ghcr
uses: docker/metadata-action@v5 uses: docker/metadata-action@v5
with: with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Login to Dockerhub - name: Login to Dockerhub
env: env:
DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }} DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }}
if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }} if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }}
uses: docker/login-action@v3 uses: docker/login-action@v3
with: with:
# username is not really a secret # username is not really a secret
username: ${{ vars.DOCKER_USERNAME }} username: ${{ vars.DOCKER_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }} password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Login to GitHub Container Registry - name: Login to GitHub Container Registry
if: github.event_name != 'pull_request' if: github.event_name != 'pull_request'
uses: docker/login-action@v3 uses: docker/login-action@v3
env: env:
REGISTRY: ghcr.io REGISTRY: ghcr.io
with: with:
registry: ${{ env.REGISTRY }} registry: ${{ env.REGISTRY }}
username: ${{ github.repository_owner }} username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }} password: ${{ secrets.GITHUB_TOKEN }}
- name: Publish to Dockerhub - name: Publish to Dockerhub
env: env:
DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }} DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }}
IMAGE_NAME: docker.io/${{ github.repository }} IMAGE_NAME: docker.io/${{ github.repository }}
IMAGE_SUFFIX_AMD64: amd64 IMAGE_SUFFIX_AMD64: amd64
IMAGE_SUFFIX_ARM64V8: arm64v8 IMAGE_SUFFIX_ARM64V8: arm64v8
if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }} if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }}
run: | run: |
docker load -i oci-image-amd64.tar.gz docker load -i oci-image-amd64.tar.gz
IMAGE_ID_AMD64=$(docker images -q conduit:main) IMAGE_ID_AMD64=$(docker images -q conduit:main)
docker load -i oci-image-arm64v8.tar.gz docker load -i oci-image-arm64v8.tar.gz
IMAGE_ID_ARM64V8=$(docker images -q conduit:main) IMAGE_ID_ARM64V8=$(docker images -q conduit:main)
# Tag and push the architecture specific images # Tag and push the architecture specific images
docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
# Tag the multi-arch image # Tag the multi-arch image
docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_SHA docker manifest push $IMAGE_NAME:$GITHUB_SHA
# Tag and push the git ref # Tag and push the git ref
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch) # Tag "main" as latest (stable branch)
if [[ "$GITHUB_REF_NAME" = "main" ]]; then if [[ "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest docker manifest push $IMAGE_NAME:latest
fi fi
- name: Publish to GitHub Container Registry - name: Publish to GitHub Container Registry
if: github.event_name != 'pull_request' if: github.event_name != 'pull_request'
env: env:
IMAGE_NAME: ghcr.io/${{ github.repository }} IMAGE_NAME: ghcr.io/${{ github.repository }}
IMAGE_SUFFIX_AMD64: amd64 IMAGE_SUFFIX_AMD64: amd64
IMAGE_SUFFIX_ARM64V8: arm64v8 IMAGE_SUFFIX_ARM64V8: arm64v8
run: | run: |
docker load -i oci-image-amd64.tar.gz docker load -i oci-image-amd64.tar.gz
IMAGE_ID_AMD64=$(docker images -q conduit:main) IMAGE_ID_AMD64=$(docker images -q conduit:main)
docker load -i oci-image-arm64v8.tar.gz docker load -i oci-image-arm64v8.tar.gz
IMAGE_ID_ARM64V8=$(docker images -q conduit:main) IMAGE_ID_ARM64V8=$(docker images -q conduit:main)
# Tag and push the architecture specific images # Tag and push the architecture specific images
docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
# Tag the multi-arch image # Tag the multi-arch image
docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_SHA docker manifest push $IMAGE_NAME:$GITHUB_SHA
# Tag and push the git ref # Tag and push the git ref
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch) # Tag "main" as latest (stable branch)
if [[ -n "$GITHUB_REF_NAME" = "main" ]]; then if [[ "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest docker manifest push $IMAGE_NAME:latest
fi fi

35
Cargo.lock generated
View file

@ -111,6 +111,17 @@ dependencies = [
"zstd-safe", "zstd-safe",
] ]
[[package]]
name = "async-recursion"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.80" version = "0.1.80"
@ -1045,9 +1056,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "hickory-proto" name = "hickory-proto"
version = "0.24.0" version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf"
dependencies = [ dependencies = [
"async-recursion",
"async-trait", "async-trait",
"cfg-if", "cfg-if",
"data-encoding", "data-encoding",
@ -1055,7 +1066,7 @@ dependencies = [
"futures-channel", "futures-channel",
"futures-io", "futures-io",
"futures-util", "futures-util",
"idna 0.4.0", "idna",
"ipnet", "ipnet",
"once_cell", "once_cell",
"rand", "rand",
@ -1069,8 +1080,7 @@ dependencies = [
[[package]] [[package]]
name = "hickory-resolver" name = "hickory-resolver"
version = "0.24.0" version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"futures-util", "futures-util",
@ -1311,16 +1321,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "idna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.5.0" version = "0.5.0"
@ -2285,8 +2285,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.11.27" version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/girlbossceo/reqwest?rev=319335e000fdea2e3d01f44245c8a21864d0c1c3#319335e000fdea2e3d01f44245c8a21864d0c1c3"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"base64 0.21.7", "base64 0.21.7",
@ -3763,7 +3762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
dependencies = [ dependencies = [
"form_urlencoded", "form_urlencoded",
"idna 0.5.0", "idna",
"percent-encoding", "percent-encoding",
"serde", "serde",
] ]

View file

@ -29,9 +29,6 @@ base64 = "0.22.0"
# Used when hashing the state # Used when hashing the state
ring = "0.17.8" ring = "0.17.8"
# Used when querying the SRV record of other servers
hickory-resolver = "0.24.0"
# Used to find matching events for appservices # Used to find matching events for appservices
regex = "1.10.4" regex = "1.10.4"
@ -107,9 +104,11 @@ version = "0.14"
features = ["server", "http1", "http2"] features = ["server", "http1", "http2"]
[dependencies.reqwest] [dependencies.reqwest]
version = "0.11.27" #version = "0.11.27"
git = "https://github.com/girlbossceo/reqwest"
rev = "319335e000fdea2e3d01f44245c8a21864d0c1c3"
default-features = false default-features = false
features = ["rustls-tls-native-roots", "socks", "trust-dns"] features = ["rustls-tls-native-roots", "socks", "hickory-dns"]
# all the serde stuff # all the serde stuff
# Used for pdu definition # Used for pdu definition
@ -272,6 +271,10 @@ features = [
"unstable-extensible-events", "unstable-extensible-events",
] ]
[dependencies.hickory-resolver]
git = "https://github.com/hickory-dns/hickory-dns"
rev = "94ac564c3f677e038f7255ddb762e9301d0f2c5d"
[dependencies.rust-rocksdb] [dependencies.rust-rocksdb]
git = "https://github.com/zaidoon1/rust-rocksdb" git = "https://github.com/zaidoon1/rust-rocksdb"
branch = "master" branch = "master"

View file

@ -477,19 +477,19 @@ allow_profile_lookup_federation_requests = true
# Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most # Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most
# administrators; this is by design. Only decrease this if you are using an external DNS cache. # administrators; this is by design. Only decrease this if you are using an external DNS cache.
#dns_min_ttl = 60 * 90 #dns_min_ttl = 10800
# Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for # Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for
# the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation # the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation
# and aggressively cached rather than constantly rechecked. # and aggressively cached rather than constantly rechecked.
#dns_min_ttl_nxdomain = 60 * 60 * 24 * 3 #dns_min_ttl_nxdomain = 86400
# The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can # The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can
# take up to several seconds for some domains, so this value should not be too low. # take up to several seconds for some domains, so this value should not be too low.
#dns_timeout = 5 #dns_timeout = 10
# Number of retries after a timeout. # Number of retries after a timeout.
#dns_attempts = 5 #dns_attempts = 10
# Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver. # Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver.
#dns_tcp_fallback = true #dns_tcp_fallback = true
@ -498,7 +498,7 @@ allow_profile_lookup_federation_requests = true
# This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response. # This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response.
# #
# The default is to query one nameserver and stop (false). # The default is to query one nameserver and stop (false).
#query_all_nameservers = false #query_all_nameservers = true
### Request Timeouts, Connection Timeouts, and Connection Pooling ### Request Timeouts, Connection Timeouts, and Connection Pooling

View file

@ -1554,7 +1554,8 @@ pub async fn create_invite_route(body: Ruma<create_invite::v2::Request>) -> Resu
.contains(&server.to_owned()) .contains(&server.to_owned())
{ {
warn!( warn!(
"Received federated/remote invite from banned server {sender_servername} for room ID {}. Rejecting.", "Received federated/remote invite from server {sender_servername} for room ID {} which has a banned \
server name. Rejecting.",
body.room_id body.room_id
); );
return Err(Error::BadRequest( return Err(Error::BadRequest(

View file

@ -100,7 +100,7 @@ pub struct Config {
pub dns_timeout: u64, pub dns_timeout: u64,
#[serde(default = "true_fn")] #[serde(default = "true_fn")]
pub dns_tcp_fallback: bool, pub dns_tcp_fallback: bool,
#[serde(default)] #[serde(default = "true_fn")]
pub query_all_nameservers: bool, pub query_all_nameservers: bool,
#[serde(default = "default_max_request_size")] #[serde(default = "default_max_request_size")]
pub max_request_size: u32, pub max_request_size: u32,
@ -851,13 +851,13 @@ fn default_cleanup_second_interval() -> u32 {
fn default_dns_cache_entries() -> u32 { 12288 } fn default_dns_cache_entries() -> u32 { 12288 }
fn default_dns_min_ttl() -> u64 { 60 * 90 } fn default_dns_min_ttl() -> u64 { 60 * 180 }
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 * 3 } fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 }
fn default_dns_attempts() -> u16 { 5 } fn default_dns_attempts() -> u16 { 10 }
fn default_dns_timeout() -> u64 { 5 } fn default_dns_timeout() -> u64 { 10 }
fn default_max_request_size() -> u32 { fn default_max_request_size() -> u32 {
20 * 1024 * 1024 // Default to 20 MB 20 * 1024 * 1024 // Default to 20 MB

View file

@ -4,7 +4,7 @@ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{ service::{
self, self,
sending::{OutgoingKind, SendingEventType}, sending::{OutgoingDestination, SendingEventType},
}, },
services, utils, Error, Result, services, utils, Error, Result,
}; };
@ -12,7 +12,7 @@ use crate::{
impl service::sending::Data for KeyValueDatabase { impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>( fn active_requests<'a>(
&'a self, &'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a> {
Box::new( Box::new(
self.servercurrentevent_data self.servercurrentevent_data
.iter() .iter()
@ -21,7 +21,7 @@ impl service::sending::Data for KeyValueDatabase {
} }
fn active_requests_for<'a>( fn active_requests_for<'a>(
&'a self, outgoing_kind: &OutgoingKind, &'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
Box::new( Box::new(
@ -33,7 +33,7 @@ impl service::sending::Data for KeyValueDatabase {
fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) } fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) }
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
self.servercurrentevent_data.remove(&key)?; self.servercurrentevent_data.remove(&key)?;
@ -42,7 +42,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) {
self.servercurrentevent_data.remove(&key).unwrap(); self.servercurrentevent_data.remove(&key).unwrap();
@ -55,7 +55,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>> { fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
let mut batch = Vec::new(); let mut batch = Vec::new();
let mut keys = Vec::new(); let mut keys = Vec::new();
for (outgoing_kind, event) in requests { for (outgoing_kind, event) in requests {
@ -79,7 +79,7 @@ impl service::sending::Data for KeyValueDatabase {
} }
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, outgoing_kind: &OutgoingKind, &'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
return Box::new( return Box::new(
@ -122,7 +122,7 @@ impl service::sending::Data for KeyValueDatabase {
} }
#[tracing::instrument(skip(key))] #[tracing::instrument(skip(key))]
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind, SendingEventType)> { fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingDestination, SendingEventType)> {
// Appservices start with a plus // Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") { Ok::<_, Error>(if key.starts_with(b"+") {
let mut parts = key[1..].splitn(2, |&b| b == 0xFF); let mut parts = key[1..].splitn(2, |&b| b == 0xFF);
@ -136,7 +136,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?; .map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
( (
OutgoingKind::Appservice(server), OutgoingDestination::Appservice(server),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -163,7 +163,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
( (
OutgoingKind::Push(user_id, pushkey_string), OutgoingDestination::Push(user_id, pushkey_string),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -183,7 +183,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?; .map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
( (
OutgoingKind::Normal( OutgoingDestination::Normal(
ServerName::parse(server) ServerName::parse(server)
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?, .map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
), ),

View file

@ -1,20 +1,20 @@
use ruma::ServerName; use ruma::ServerName;
use super::{OutgoingKind, SendingEventType}; use super::{OutgoingDestination, SendingEventType};
use crate::Result; use crate::Result;
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>; type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a>;
type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>; type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
fn active_requests(&self) -> OutgoingSendingIter<'_>; fn active_requests(&self) -> OutgoingSendingIter<'_>;
fn active_requests_for(&self, outgoing_kind: &OutgoingKind) -> SendingEventTypeIter<'_>; fn active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> SendingEventTypeIter<'_>;
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>; fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>>; fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, outgoing_kind: &OutgoingKind, &'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>; fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>; fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;

View file

@ -1,6 +1,6 @@
use std::{ use std::{
cmp, cmp,
collections::{BTreeMap, HashMap, HashSet}, collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
@ -25,7 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
}; };
use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio::sync::{oneshot, Mutex, Semaphore};
use tracing::{error, warn}; use tracing::{error, warn};
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -42,15 +42,15 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: loole::Sender<(OutgoingDestination, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<loole::Receiver<(OutgoingDestination, SendingEventType, Vec<u8>)>>,
startup_netburst: bool, startup_netburst: bool,
startup_netburst_keep: i64, startup_netburst_keep: i64,
timeout: u64, timeout: u64,
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind { pub enum OutgoingDestination {
Appservice(String), Appservice(String),
Push(OwnedUserId, String), // user and pushkey Push(OwnedUserId, String), // user and pushkey
Normal(OwnedServerName), Normal(OwnedServerName),
@ -65,14 +65,31 @@ pub enum SendingEventType {
} }
enum TransactionStatus { enum TransactionStatus {
/// Currently running (for the first time)
Running, Running,
Failed(u32, Instant), // number of times failed, time of last failure /// Failed, backing off for a retry
Retrying(u32), // number of times failed Failed {
failures: u32,
waker: Option<oneshot::Sender<()>>,
},
/// Currently retrying
Retrying {
/// number of times failed
failures: u32,
},
}
/// A control-flow enum to dictate what the handler should do after (trying to)
/// prepare a transaction
enum TransactionPrepOutcome {
Send(Vec<SendingEventType>),
Wake(OutgoingDestination),
Nothing,
} }
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
sender, sender,
@ -86,7 +103,7 @@ impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey))] #[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned()); let event = SendingEventType::Pdu(pdu_id.to_owned());
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -99,7 +116,7 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id); let outgoing_kind = OutgoingDestination::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id); let event = SendingEventType::Pdu(pdu_id);
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -126,7 +143,7 @@ impl Service {
pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> { pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
let requests = servers let requests = servers
.into_iter() .into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) .map(|server| (OutgoingDestination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests( let keys = self.db.queue_requests(
@ -146,7 +163,7 @@ impl Service {
#[tracing::instrument(skip(self, server, serialized))] #[tracing::instrument(skip(self, server, serialized))]
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> { pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let outgoing_kind = OutgoingDestination::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized); let event = SendingEventType::Edu(serialized);
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -173,7 +190,7 @@ impl Service {
pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> { pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> {
let requests = servers let requests = servers
.into_iter() .into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Edu(serialized.clone()))) .map(|server| (OutgoingDestination::Normal(server), SendingEventType::Edu(serialized.clone())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests( let keys = self.db.queue_requests(
@ -205,7 +222,7 @@ impl Service {
#[tracing::instrument(skip(self, servers))] #[tracing::instrument(skip(self, servers))]
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> { pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
let requests = servers.into_iter().map(OutgoingKind::Normal); let requests = servers.into_iter().map(OutgoingDestination::Normal);
for outgoing_kind in requests { for outgoing_kind in requests {
self.sender self.sender
@ -221,7 +238,7 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
self.db self.db
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; .delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?;
Ok(()) Ok(())
} }
@ -274,14 +291,17 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let new_transactions = self.receiver.lock().await;
let (waking_sender, waking_receiver) = loole::unbounded();
let mut futures = FuturesUnordered::new(); let mut outgoing = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut retrying = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingDestination, TransactionStatus>::new();
// Retry requests we could not finish yet // Retry requests we could not finish yet
if self.startup_netburst { if self.startup_netburst {
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new(); let mut initial_transactions = HashMap::<OutgoingDestination, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) { for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
let entry = initial_transactions let entry = initial_transactions
.entry(outgoing_kind.clone()) .entry(outgoing_kind.clone())
@ -300,13 +320,14 @@ impl Service {
for (outgoing_kind, events) in initial_transactions { for (outgoing_kind, events) in initial_transactions {
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(handle_events(outgoing_kind.clone(), events)); outgoing.push(handle_events(outgoing_kind.clone(), events));
} }
} }
loop { loop {
tokio::select! { tokio::select! {
Some(response) = futures.next() => { Some(response) = outgoing.next() => {
// Outgoing transaction succeeded
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
let _cork = services().globals.db.cork(); let _cork = services().globals.db.cork();
@ -322,51 +343,155 @@ impl Service {
if !new_events.is_empty() { if !new_events.is_empty() {
// Insert pdus we found // Insert pdus we found
self.db.mark_as_active(&new_events)?; self.db.mark_as_active(&new_events)?;
futures.push(handle_events(
outgoing_kind.clone(), // Clear retries
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
outgoing.push(handle_events(
outgoing_kind,
new_events.into_iter().map(|(event, _)| event).collect(), new_events.into_iter().map(|(event, _)| event).collect(),
)); ));
} else { } else {
current_transaction_status.remove(&outgoing_kind); current_transaction_status.remove(&outgoing_kind);
} }
} }
Err((outgoing_kind, _)) => { // Outgoing transaction failed
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { Err((destination, err)) => {
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), // Set status to Failed, create timer
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone());
TransactionStatus::Failed(_, _) => {
error!("Request that was not even running failed?!"); // Add timer to loop
return retrying.push(timer);
},
}); warn!("Outgoing request to {destination} failed: {err}");
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
if let Ok(Some(events)) = self.select_events( // Transaction retry timers firing
&outgoing_kind, Some(dest) = retrying.next() => {
// Transition Failed => Retrying, return pending old transaction events
match self.select_events(
&dest,
vec![], // will be ignored because fresh == false
&mut current_transaction_status,
false,
) {
Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
}
Ok(_) => {
// Unreachable because fresh == false
unreachable!("select_events on a stale transaction {} did not return ::Send", dest)
}
Err(err) => {
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err);
// transaction dropped, so drop destination as well.
current_transaction_status.remove(&dest);
}
}
},
// Explicit wakeups, makes a backoff timer return immediately
Ok(outgoing) = waking_receiver.recv_async() => {
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) {
if let Some(waker) = waker.take() {
_ = waker.send(());
}
}
},
// New transactions to be sent out (from server/user activity)
event = new_transactions.recv_async() => {
if let Ok((dest, event, key)) = event {
match self.select_events(
&dest,
vec![(event, key)], vec![(event, key)],
&mut current_transaction_status, &mut current_transaction_status,
) { true) {
futures.push(handle_events(outgoing_kind, events)); Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
},
Ok(TransactionPrepOutcome::Wake(dest)) => {
waking_sender.send(dest).expect("nothing closes this channel but ourselves");
},
Ok(TransactionPrepOutcome::Nothing) => {},
Err(err) => {
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err);
}
}
} }
} }
} }
} }
} }
/// Generates timer/oneshot, alters status to reflect Failed
///
/// Returns timer/oneshot future to wake up loop for next retry
fn mark_failed_and_backoff(
status: &mut HashMap<OutgoingDestination, TransactionStatus>, dest: OutgoingDestination,
) -> impl std::future::Future<Output = OutgoingDestination> {
let now = Instant::now();
let entry = status
.get_mut(&dest)
.expect("guaranteed to be set before this function");
let failures = match entry {
// Running -> Failed
TransactionStatus::Running => 1,
// Retrying -> Failed
TransactionStatus::Retrying {
failures,
} => *failures + 1,
// The transition of Failed -> Retrying is handled by handle_events
TransactionStatus::Failed {
..
} => {
unreachable!(
"TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \
bailing..."
)
},
};
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
// Exponential backoff, clamp upper value to one day
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY);
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup);
*entry = TransactionStatus::Failed {
failures,
waker: Some(waker),
};
fut
}
/// This prepares a transaction, checks the transaction state, and selects
/// appropriate events.
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events( fn select_events(
&self, &self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>, current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>,
) -> Result<Option<Vec<SendingEventType>>> { fresh: bool, // Wether or not this transaction came from server activity.
let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?; ) -> Result<TransactionPrepOutcome> {
let (allow, retry, wake_up) =
self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?;
// Nothing can be done for this remote, bail out. // Nothing can be done for this remote, bail out.
if !allow { if wake_up {
return Ok(None); return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone()));
} else if !allow {
return Ok(TransactionPrepOutcome::Nothing);
} }
let _cork = services().globals.db.cork(); let _cork = services().globals.db.cork();
@ -374,12 +499,14 @@ impl Service {
// Must retry any previous transaction for this remote. // Must retry any previous transaction for this remote.
if retry { if retry {
self.db // We retry the previous transaction
for (_, e) in self
.db
.active_requests_for(outgoing_kind) .active_requests_for(outgoing_kind)
.filter_map(Result::ok) .filter_map(Result::ok)
.for_each(|(_, e)| events.push(e)); {
events.push(e);
return Ok(Some(events)); }
} }
// Compose the next transaction // Compose the next transaction
@ -392,43 +519,79 @@ impl Service {
} }
// Add EDU's into the transaction // Add EDU's into the transaction
if let OutgoingKind::Normal(server_name) = outgoing_kind { if let OutgoingDestination::Normal(server_name) = outgoing_kind {
if let Ok((select_edus, last_count)) = self.select_edus(server_name) { if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
events.extend(select_edus.into_iter().map(SendingEventType::Edu)); events.extend(select_edus.into_iter().map(SendingEventType::Edu));
self.db.set_latest_educount(server_name, last_count)?; self.db.set_latest_educount(server_name, last_count)?;
} }
} }
Ok(Some(events)) Ok(TransactionPrepOutcome::Send(events))
} }
#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
fn select_events_current( fn select_events_current(
&self, outgoing_kind: OutgoingKind, current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>, &self, outgoing_kind: OutgoingDestination,
) -> Result<(bool, bool)> { current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, fresh: bool,
let (mut allow, mut retry) = (true, false); ) -> Result<(bool, bool, bool)> {
current_transaction_status let (mut allow, mut retry, mut wake_up) = (true, false, false);
.entry(outgoing_kind)
.and_modify(|e| match e { let entry = current_transaction_status.entry(outgoing_kind);
TransactionStatus::Failed(tries, time) => {
// Fail if a request has failed recently (exponential backoff) if fresh {
const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); // If its fresh, we initialise the status if we need to.
let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); //
min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION); // We do nothing if it is already running or retrying.
if time.elapsed() < min_elapsed_duration { //
allow = false; // We return with a wake if it is in the Failed state.
} else { entry
retry = true; .and_modify(|e| match e {
*e = TransactionStatus::Retrying(*tries); TransactionStatus::Running
| TransactionStatus::Retrying {
..
} => {
allow = false; // already running
},
TransactionStatus::Failed {
..
} => {
// currently sleeping
wake_up = true;
},
})
.or_insert(TransactionStatus::Running);
} else {
// If it's not fresh, we expect an entry.
//
// We also expect us to be the only one who are touching this destination right
// now, and its a stale transaction, so it must be in the Failed state
match entry {
Entry::Occupied(mut e) => {
let e = e.get_mut();
match e {
TransactionStatus::Failed {
failures,
..
} => {
*e = TransactionStatus::Retrying {
failures: *failures,
};
retry = true;
},
_ => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got \
Running or Retrying"
),
} }
}, },
TransactionStatus::Running | TransactionStatus::Retrying(_) => { Entry::Vacant(_) => unreachable!(
allow = false; // already running "Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"
}, ),
}) }
.or_insert(TransactionStatus::Running); }
Ok((allow, retry)) Ok((allow, retry, wake_up))
} }
#[tracing::instrument(skip(self, server_name))] #[tracing::instrument(skip(self, server_name))]
@ -594,19 +757,21 @@ pub fn select_edus_receipts(
} }
async fn handle_events( async fn handle_events(
kind: OutgoingKind, events: Vec<SendingEventType>, kind: OutgoingDestination, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
match kind { match kind {
OutgoingKind::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await, OutgoingDestination::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
OutgoingKind::Push(ref userid, ref pushkey) => handle_events_kind_push(&kind, userid, pushkey, events).await, OutgoingDestination::Push(ref userid, ref pushkey) => {
OutgoingKind::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await, handle_events_kind_push(&kind, userid, pushkey, events).await
},
OutgoingDestination::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
} }
} }
#[tracing::instrument(skip(kind, events))] #[tracing::instrument(skip(kind, events))]
async fn handle_events_kind_appservice( async fn handle_events_kind_appservice(
kind: &OutgoingKind, id: &String, events: Vec<SendingEventType>, kind: &OutgoingDestination, id: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
for event in &events { for event in &events {
@ -674,8 +839,8 @@ async fn handle_events_kind_appservice(
#[tracing::instrument(skip(kind, events))] #[tracing::instrument(skip(kind, events))]
async fn handle_events_kind_push( async fn handle_events_kind_push(
kind: &OutgoingKind, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>, kind: &OutgoingDestination, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut pdus = Vec::new(); let mut pdus = Vec::new();
for event in &events { for event in &events {
@ -715,7 +880,7 @@ async fn handle_events_kind_push(
let Some(pusher) = services() let Some(pusher) = services()
.pusher .pusher
.get_pusher(userid, pushkey) .get_pusher(userid, pushkey)
.map_err(|e| (kind.clone(), e))? .map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))?
else { else {
continue; continue;
}; };
@ -752,8 +917,8 @@ async fn handle_events_kind_push(
#[tracing::instrument(skip(kind, events), name = "")] #[tracing::instrument(skip(kind, events), name = "")]
async fn handle_events_kind_normal( async fn handle_events_kind_normal(
kind: &OutgoingKind, dest: &OwnedServerName, events: Vec<SendingEventType>, kind: &OutgoingDestination, dest: &OwnedServerName, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut edu_jsons = Vec::new(); let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
@ -826,23 +991,23 @@ async fn handle_events_kind_normal(
response response
} }
impl OutgoingKind { impl OutgoingDestination {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn get_prefix(&self) -> Vec<u8> { pub fn get_prefix(&self) -> Vec<u8> {
let mut prefix = match self { let mut prefix = match self {
OutgoingKind::Appservice(server) => { OutgoingDestination::Appservice(server) => {
let mut p = b"+".to_vec(); let mut p = b"+".to_vec();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
}, },
OutgoingKind::Push(user, pushkey) => { OutgoingDestination::Push(user, pushkey) => {
let mut p = b"$".to_vec(); let mut p = b"$".to_vec();
p.extend_from_slice(user.as_bytes()); p.extend_from_slice(user.as_bytes());
p.push(0xFF); p.push(0xFF);
p.extend_from_slice(pushkey.as_bytes()); p.extend_from_slice(pushkey.as_bytes());
p p
}, },
OutgoingKind::Normal(server) => { OutgoingDestination::Normal(server) => {
let mut p = Vec::new(); let mut p = Vec::new();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
@ -852,4 +1017,40 @@ impl OutgoingKind {
prefix prefix
} }
/// This wraps the OutgoingDestination key in an interruptible sleep future.
///
/// The first return value is the future, the second is the oneshot that
/// interrupts that future, and causes it to return instantly.
fn wrap_in_interruptible_sleep(
self, at: Instant,
) -> (impl std::future::Future<Output = Self>, oneshot::Sender<()>) {
let (tx, rx) = oneshot::channel();
let at = tokio::time::Instant::from_std(at);
(
async move {
_ = tokio::time::timeout_at(at, rx).await;
self
},
tx,
)
}
}
impl std::fmt::Display for OutgoingDestination {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingDestination::Appservice(appservice_id) => {
write!(f, "Appservice (ID {:?})", appservice_id)
},
OutgoingDestination::Push(user, push_key) => {
write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key)
},
OutgoingDestination::Normal(server) => {
write!(f, "Matrix Server ({:?})", server)
},
}
}
} }