diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f2cc992..8b0d4e32 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -26,11 +26,9 @@ permissions: contents: read jobs: - ci: - name: CI and Artifacts - + setup: + name: CI Setup runs-on: ubuntu-latest - steps: - name: Sync repository uses: actions/checkout@v4 @@ -94,291 +92,175 @@ jobs: ./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation + build-and-test: + name: CI and Artifacts + needs: setup + runs-on: ubuntu-latest + strategy: + matrix: + target: [ + "static-x86_64-unknown-linux-musl", + "static-x86_64-unknown-linux-musl-jemalloc", + "static-x86_64-unknown-linux-musl-hmalloc", + "static-aarch64-unknown-linux-musl", + "static-aarch64-unknown-linux-musl-jemalloc", + "static-aarch64-unknown-linux-musl-hmalloc", + ] + oci-target: [ + "x86_64-unknown-linux-gnu", + "x86_64-unknown-linux-musl", + "x86_64-unknown-linux-musl-jemalloc", + "x86_64-unknown-linux-musl-hmalloc", + "aarch64-unknown-linux-musl", + "aarch64-unknown-linux-musl-jemalloc", + "aarch64-unknown-linux-musl-hmalloc", + ] + + steps: - name: Perform continuous integration run: direnv exec . engage - - name: Build static-x86_64-unknown-linux-musl and Create static deb-x86_64-unknown-linux-musl + - name: Build static artifacts run: | - ./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl + ./bin/nix-build-and-cache .#${{ matrix.target }} mkdir -p target/release cp -v -f result/bin/conduit target/release - direnv exec . cargo deb --no-build + direnv exec . cargo deb --no-build --output target/debian/${{ matrix.target }}.deb - - name: Upload artifact static-x86_64-unknown-linux-musl + - name: Upload static artifacts uses: actions/upload-artifact@v4 with: - name: static-x86_64-unknown-linux-musl + name: ${{ matrix.target }} path: result/bin/conduit if-no-files-found: error - - name: Upload artifact deb-x86_64-unknown-linux-musl + - name: Upload static deb artifacts uses: actions/upload-artifact@v4 with: - name: x86_64-unknown-linux-musl.deb - path: target/debian/*.deb + name: ${{ matrix.target }}.deb + path: target/debian/${{ matrix.target }}.deb if-no-files-found: error - - name: Build static-x86_64-unknown-linux-musl-jemalloc and Create static deb-x86_64-unknown-linux-musl-jemalloc + + - name: Build OCI images run: | - ./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-jemalloc - mkdir -p target/release - cp -v -f result/bin/conduit target/release - direnv exec . cargo deb --no-build + ./bin/nix-build-and-cache .#oci-image-${{ matrix.oci-target }} + cp -v -f result oci-image-${{ matrix.oci-target }}.tar.gz - - name: Upload artifact static-x86_64-unknown-linux-musl-jemalloc + - name: Upload OCI image artifacts uses: actions/upload-artifact@v4 with: - name: static-x86_64-unknown-linux-musl-jemalloc - path: result/bin/conduit - if-no-files-found: error - - - name: Upload artifact deb-x86_64-unknown-linux-musl-jemalloc - uses: actions/upload-artifact@v4 - with: - name: x86_64-unknown-linux-musl-jemalloc.deb - path: target/debian/*.deb - if-no-files-found: error - - - name: Build static-x86_64-unknown-linux-musl-hmalloc and Create static deb-x86_64-unknown-linux-musl-hmalloc - run: | - ./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-hmalloc - mkdir -p target/release - cp -v -f result/bin/conduit target/release - direnv exec . cargo deb --no-build - - - name: Upload artifact static-x86_64-unknown-linux-musl-hmalloc - uses: actions/upload-artifact@v4 - with: - name: static-x86_64-unknown-linux-musl-hmalloc - path: result/bin/conduit - if-no-files-found: error - - - name: Upload artifact deb-x86_64-unknown-linux-musl-hmalloc - uses: actions/upload-artifact@v4 - with: - name: x86_64-unknown-linux-musl-hmalloc.deb - path: target/debian/*.deb - if-no-files-found: error - - - - name: Build static-aarch64-unknown-linux-musl - run: | - ./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl - - - name: Upload artifact static-aarch64-unknown-linux-musl - uses: actions/upload-artifact@v4 - with: - name: static-aarch64-unknown-linux-musl - path: result/bin/conduit - if-no-files-found: error - - - name: Build static-aarch64-unknown-linux-musl-jemalloc - run: | - ./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-jemalloc - - - name: Upload artifact static-aarch64-unknown-linux-musl-jemalloc - uses: actions/upload-artifact@v4 - with: - name: static-aarch64-unknown-linux-musl-jemalloc - path: result/bin/conduit - if-no-files-found: error - - - name: Build static-aarch64-unknown-linux-musl-hmalloc - run: | - ./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-hmalloc - - - name: Upload artifact static-aarch64-unknown-linux-musl-hmalloc - uses: actions/upload-artifact@v4 - with: - name: static-aarch64-unknown-linux-musl-hmalloc - path: result/bin/conduit - if-no-files-found: error - - - - name: Build oci-image-x86_64-unknown-linux-gnu - run: | - ./bin/nix-build-and-cache .#oci-image - cp -v -f result oci-image-amd64.tar.gz - - - name: Upload artifact oci-image-x86_64-unknown-linux-gnu - uses: actions/upload-artifact@v4 - with: - name: oci-image-x86_64-unknown-linux-gnu - path: oci-image-amd64.tar.gz - if-no-files-found: error - # don't compress again - compression-level: 0 - - - name: Build oci-image-x86_64-unknown-linux-gnu-jemalloc - run: | - ./bin/nix-build-and-cache .#oci-image-jemalloc - cp -v -f result oci-image-amd64.tar.gz - - - name: Upload artifact oci-image-x86_64-unknown-linux-gnu-jemalloc - uses: actions/upload-artifact@v4 - with: - name: oci-image-x86_64-unknown-linux-gnu-jemalloc - path: oci-image-amd64.tar.gz - if-no-files-found: error - # don't compress again - compression-level: 0 - - - name: Build oci-image-x86_64-unknown-linux-gnu-hmalloc - run: | - ./bin/nix-build-and-cache .#oci-image-hmalloc - cp -v -f result oci-image-amd64.tar.gz - - - name: Upload artifact oci-image-x86_64-unknown-linux-gnu-hmalloc - uses: actions/upload-artifact@v4 - with: - name: oci-image-x86_64-unknown-linux-gnu-hmalloc - path: oci-image-amd64.tar.gz + name: oci-image-${{ matrix.oci-target }} + path: oci-image-${{ matrix.oci-target }}.tar.gz if-no-files-found: error # don't compress again compression-level: 0 - - name: Build oci-image-aarch64-unknown-linux-musl - run: | - ./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl - cp -v -f result oci-image-arm64v8.tar.gz - - - name: Upload artifact oci-image-aarch64-unknown-linux-musl - uses: actions/upload-artifact@v4 - with: - name: oci-image-aarch64-unknown-linux-musl - path: oci-image-arm64v8.tar.gz - if-no-files-found: error - # don't compress again - compression-level: 0 - - - name: Build oci-image-aarch64-unknown-linux-musl-jemalloc - run: | - ./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-jemalloc - cp -v -f result oci-image-arm64v8.tar.gz - - - name: Upload artifact oci-image-aarch64-unknown-linux-musl-jemalloc - uses: actions/upload-artifact@v4 - with: - name: oci-image-aarch64-unknown-linux-musl-jemalloc - path: oci-image-arm64v8.tar.gz - if-no-files-found: error - # don't compress again - compression-level: 0 - - - name: Build oci-image-aarch64-unknown-linux-musl-hmalloc - run: | - ./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-hmalloc - cp -v -f result oci-image-arm64v8.tar.gz - - - name: Upload artifact oci-image-aarch64-unknown-linux-musl-hmalloc - uses: actions/upload-artifact@v4 - with: - name: oci-image-aarch64-unknown-linux-musl-hmalloc - path: oci-image-arm64v8.tar.gz - if-no-files-found: error - # don't compress again - compression-level: 0 - - name: Extract metadata for Dockerhub - env: - REGISTRY: registry.hub.docker.com - IMAGE_NAME: ${{ github.repository }} - id: meta-dockerhub - uses: docker/metadata-action@v5 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + publish: + needs: build-and-test + runs-on: ubuntu-latest + steps: + - name: Extract metadata for Dockerhub + env: + REGISTRY: registry.hub.docker.com + IMAGE_NAME: ${{ github.repository }} + id: meta-dockerhub + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - - name: Extract metadata for GitHub Container Registry - env: - REGISTRY: ghcr.io - IMAGE_NAME: ${{ github.repository }} - id: meta-ghcr - uses: docker/metadata-action@v5 - with: - images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} + - name: Extract metadata for GitHub Container Registry + env: + REGISTRY: ghcr.io + IMAGE_NAME: ${{ github.repository }} + id: meta-ghcr + uses: docker/metadata-action@v5 + with: + images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }} - - name: Login to Dockerhub - env: - DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} - DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }} - if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }} - uses: docker/login-action@v3 - with: - # username is not really a secret - username: ${{ vars.DOCKER_USERNAME }} - password: ${{ secrets.DOCKERHUB_TOKEN }} + - name: Login to Dockerhub + env: + DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} + DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }} + if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }} + uses: docker/login-action@v3 + with: + # username is not really a secret + username: ${{ vars.DOCKER_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} - - name: Login to GitHub Container Registry - if: github.event_name != 'pull_request' - uses: docker/login-action@v3 - env: - REGISTRY: ghcr.io - with: - registry: ${{ env.REGISTRY }} - username: ${{ github.repository_owner }} - password: ${{ secrets.GITHUB_TOKEN }} + - name: Login to GitHub Container Registry + if: github.event_name != 'pull_request' + uses: docker/login-action@v3 + env: + REGISTRY: ghcr.io + with: + registry: ${{ env.REGISTRY }} + username: ${{ github.repository_owner }} + password: ${{ secrets.GITHUB_TOKEN }} - - name: Publish to Dockerhub - env: - DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} - DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }} - IMAGE_NAME: docker.io/${{ github.repository }} - IMAGE_SUFFIX_AMD64: amd64 - IMAGE_SUFFIX_ARM64V8: arm64v8 - if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }} - run: | - docker load -i oci-image-amd64.tar.gz - IMAGE_ID_AMD64=$(docker images -q conduit:main) - docker load -i oci-image-arm64v8.tar.gz - IMAGE_ID_ARM64V8=$(docker images -q conduit:main) + - name: Publish to Dockerhub + env: + DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }} + DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }} + IMAGE_NAME: docker.io/${{ github.repository }} + IMAGE_SUFFIX_AMD64: amd64 + IMAGE_SUFFIX_ARM64V8: arm64v8 + if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }} + run: | + docker load -i oci-image-amd64.tar.gz + IMAGE_ID_AMD64=$(docker images -q conduit:main) + docker load -i oci-image-arm64v8.tar.gz + IMAGE_ID_ARM64V8=$(docker images -q conduit:main) - # Tag and push the architecture specific images - docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 - docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 - docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - # Tag the multi-arch image - docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker manifest push $IMAGE_NAME:$GITHUB_SHA - # Tag and push the git ref - docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME - # Tag "main" as latest (stable branch) - if [[ "$GITHUB_REF_NAME" = "main" ]]; then - docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker manifest push $IMAGE_NAME:latest - fi + # Tag and push the architecture specific images + docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 + docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 + docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + # Tag the multi-arch image + docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker manifest push $IMAGE_NAME:$GITHUB_SHA + # Tag and push the git ref + docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME + # Tag "main" as latest (stable branch) + if [[ "$GITHUB_REF_NAME" = "main" ]]; then + docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker manifest push $IMAGE_NAME:latest + fi - - name: Publish to GitHub Container Registry - if: github.event_name != 'pull_request' - env: - IMAGE_NAME: ghcr.io/${{ github.repository }} - IMAGE_SUFFIX_AMD64: amd64 - IMAGE_SUFFIX_ARM64V8: arm64v8 - run: | - docker load -i oci-image-amd64.tar.gz - IMAGE_ID_AMD64=$(docker images -q conduit:main) - docker load -i oci-image-arm64v8.tar.gz - IMAGE_ID_ARM64V8=$(docker images -q conduit:main) + - name: Publish to GitHub Container Registry + if: github.event_name != 'pull_request' + env: + IMAGE_NAME: ghcr.io/${{ github.repository }} + IMAGE_SUFFIX_AMD64: amd64 + IMAGE_SUFFIX_ARM64V8: arm64v8 + run: | + docker load -i oci-image-amd64.tar.gz + IMAGE_ID_AMD64=$(docker images -q conduit:main) + docker load -i oci-image-arm64v8.tar.gz + IMAGE_ID_ARM64V8=$(docker images -q conduit:main) - # Tag and push the architecture specific images - docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 - docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 - docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - # Tag the multi-arch image - docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker manifest push $IMAGE_NAME:$GITHUB_SHA - # Tag and push the git ref - docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME - # Tag "main" as latest (stable branch) - if [[ -n "$GITHUB_REF_NAME" = "main" ]]; then - docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 - docker manifest push $IMAGE_NAME:latest - fi + # Tag and push the architecture specific images + docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 + docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 + docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + # Tag the multi-arch image + docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker manifest push $IMAGE_NAME:$GITHUB_SHA + # Tag and push the git ref + docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME + # Tag "main" as latest (stable branch) + if [[ "$GITHUB_REF_NAME" = "main" ]]; then + docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 + docker manifest push $IMAGE_NAME:latest + fi diff --git a/Cargo.lock b/Cargo.lock index f0bec7f2..bdf9d374 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,6 +111,17 @@ dependencies = [ "zstd-safe", ] +[[package]] +name = "async-recursion" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.58", +] + [[package]] name = "async-trait" version = "0.1.80" @@ -1045,9 +1056,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hickory-proto" version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf" +source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d" dependencies = [ + "async-recursion", "async-trait", "cfg-if", "data-encoding", @@ -1055,7 +1066,7 @@ dependencies = [ "futures-channel", "futures-io", "futures-util", - "idna 0.4.0", + "idna", "ipnet", "once_cell", "rand", @@ -1069,8 +1080,7 @@ dependencies = [ [[package]] name = "hickory-resolver" version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8" +source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d" dependencies = [ "cfg-if", "futures-util", @@ -1311,16 +1321,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "idna" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "idna" version = "0.5.0" @@ -2285,8 +2285,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" [[package]] name = "reqwest" version = "0.11.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +source = "git+https://github.com/girlbossceo/reqwest?rev=319335e000fdea2e3d01f44245c8a21864d0c1c3#319335e000fdea2e3d01f44245c8a21864d0c1c3" dependencies = [ "async-compression", "base64 0.21.7", @@ -3763,7 +3762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" dependencies = [ "form_urlencoded", - "idna 0.5.0", + "idna", "percent-encoding", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index fa0e6577..e9d6b11b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,9 +29,6 @@ base64 = "0.22.0" # Used when hashing the state ring = "0.17.8" -# Used when querying the SRV record of other servers -hickory-resolver = "0.24.0" - # Used to find matching events for appservices regex = "1.10.4" @@ -107,9 +104,11 @@ version = "0.14" features = ["server", "http1", "http2"] [dependencies.reqwest] -version = "0.11.27" +#version = "0.11.27" +git = "https://github.com/girlbossceo/reqwest" +rev = "319335e000fdea2e3d01f44245c8a21864d0c1c3" default-features = false -features = ["rustls-tls-native-roots", "socks", "trust-dns"] +features = ["rustls-tls-native-roots", "socks", "hickory-dns"] # all the serde stuff # Used for pdu definition @@ -272,6 +271,10 @@ features = [ "unstable-extensible-events", ] +[dependencies.hickory-resolver] +git = "https://github.com/hickory-dns/hickory-dns" +rev = "94ac564c3f677e038f7255ddb762e9301d0f2c5d" + [dependencies.rust-rocksdb] git = "https://github.com/zaidoon1/rust-rocksdb" branch = "master" diff --git a/conduwuit-example.toml b/conduwuit-example.toml index f077ebdf..65a012a9 100644 --- a/conduwuit-example.toml +++ b/conduwuit-example.toml @@ -477,19 +477,19 @@ allow_profile_lookup_federation_requests = true # Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most # administrators; this is by design. Only decrease this if you are using an external DNS cache. -#dns_min_ttl = 60 * 90 +#dns_min_ttl = 10800 # Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for # the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation # and aggressively cached rather than constantly rechecked. -#dns_min_ttl_nxdomain = 60 * 60 * 24 * 3 +#dns_min_ttl_nxdomain = 86400 # The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can # take up to several seconds for some domains, so this value should not be too low. -#dns_timeout = 5 +#dns_timeout = 10 # Number of retries after a timeout. -#dns_attempts = 5 +#dns_attempts = 10 # Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver. #dns_tcp_fallback = true @@ -498,7 +498,7 @@ allow_profile_lookup_federation_requests = true # This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response. # # The default is to query one nameserver and stop (false). -#query_all_nameservers = false +#query_all_nameservers = true ### Request Timeouts, Connection Timeouts, and Connection Pooling diff --git a/src/api/server_server.rs b/src/api/server_server.rs index ee57e0f6..12de8d45 100644 --- a/src/api/server_server.rs +++ b/src/api/server_server.rs @@ -1554,7 +1554,8 @@ pub async fn create_invite_route(body: Ruma) -> Resu .contains(&server.to_owned()) { warn!( - "Received federated/remote invite from banned server {sender_servername} for room ID {}. Rejecting.", + "Received federated/remote invite from server {sender_servername} for room ID {} which has a banned \ + server name. Rejecting.", body.room_id ); return Err(Error::BadRequest( diff --git a/src/config/mod.rs b/src/config/mod.rs index 8fe0823d..f6ec335a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -100,7 +100,7 @@ pub struct Config { pub dns_timeout: u64, #[serde(default = "true_fn")] pub dns_tcp_fallback: bool, - #[serde(default)] + #[serde(default = "true_fn")] pub query_all_nameservers: bool, #[serde(default = "default_max_request_size")] pub max_request_size: u32, @@ -851,13 +851,13 @@ fn default_cleanup_second_interval() -> u32 { fn default_dns_cache_entries() -> u32 { 12288 } -fn default_dns_min_ttl() -> u64 { 60 * 90 } +fn default_dns_min_ttl() -> u64 { 60 * 180 } -fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 * 3 } +fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 } -fn default_dns_attempts() -> u16 { 5 } +fn default_dns_attempts() -> u16 { 10 } -fn default_dns_timeout() -> u64 { 5 } +fn default_dns_timeout() -> u64 { 10 } fn default_max_request_size() -> u32 { 20 * 1024 * 1024 // Default to 20 MB diff --git a/src/database/key_value/sending.rs b/src/database/key_value/sending.rs index 9f871ee7..78dfd197 100644 --- a/src/database/key_value/sending.rs +++ b/src/database/key_value/sending.rs @@ -4,7 +4,7 @@ use crate::{ database::KeyValueDatabase, service::{ self, - sending::{OutgoingKind, SendingEventType}, + sending::{OutgoingDestination, SendingEventType}, }, services, utils, Error, Result, }; @@ -12,7 +12,7 @@ use crate::{ impl service::sending::Data for KeyValueDatabase { fn active_requests<'a>( &'a self, - ) -> Box, OutgoingKind, SendingEventType)>> + 'a> { + ) -> Box, OutgoingDestination, SendingEventType)>> + 'a> { Box::new( self.servercurrentevent_data .iter() @@ -21,7 +21,7 @@ impl service::sending::Data for KeyValueDatabase { } fn active_requests_for<'a>( - &'a self, outgoing_kind: &OutgoingKind, + &'a self, outgoing_kind: &OutgoingDestination, ) -> Box, SendingEventType)>> + 'a> { let prefix = outgoing_kind.get_prefix(); Box::new( @@ -33,7 +33,7 @@ impl service::sending::Data for KeyValueDatabase { fn delete_active_request(&self, key: Vec) -> Result<()> { self.servercurrentevent_data.remove(&key) } - fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { + fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> { let prefix = outgoing_kind.get_prefix(); for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { self.servercurrentevent_data.remove(&key)?; @@ -42,7 +42,7 @@ impl service::sending::Data for KeyValueDatabase { Ok(()) } - fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { + fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> { let prefix = outgoing_kind.get_prefix(); for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { self.servercurrentevent_data.remove(&key).unwrap(); @@ -55,7 +55,7 @@ impl service::sending::Data for KeyValueDatabase { Ok(()) } - fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result>> { + fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result>> { let mut batch = Vec::new(); let mut keys = Vec::new(); for (outgoing_kind, event) in requests { @@ -79,7 +79,7 @@ impl service::sending::Data for KeyValueDatabase { } fn queued_requests<'a>( - &'a self, outgoing_kind: &OutgoingKind, + &'a self, outgoing_kind: &OutgoingDestination, ) -> Box)>> + 'a> { let prefix = outgoing_kind.get_prefix(); return Box::new( @@ -122,7 +122,7 @@ impl service::sending::Data for KeyValueDatabase { } #[tracing::instrument(skip(key))] -fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(OutgoingKind, SendingEventType)> { +fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(OutgoingDestination, SendingEventType)> { // Appservices start with a plus Ok::<_, Error>(if key.starts_with(b"+") { let mut parts = key[1..].splitn(2, |&b| b == 0xFF); @@ -136,7 +136,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(OutgoingKind, .map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?; ( - OutgoingKind::Appservice(server), + OutgoingDestination::Appservice(server), if value.is_empty() { SendingEventType::Pdu(event.to_vec()) } else { @@ -163,7 +163,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(OutgoingKind, .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; ( - OutgoingKind::Push(user_id, pushkey_string), + OutgoingDestination::Push(user_id, pushkey_string), if value.is_empty() { SendingEventType::Pdu(event.to_vec()) } else { @@ -183,7 +183,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec) -> Result<(OutgoingKind, .map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?; ( - OutgoingKind::Normal( + OutgoingDestination::Normal( ServerName::parse(server) .map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?, ), diff --git a/src/service/sending/data.rs b/src/service/sending/data.rs index 46f3cd71..d5b0923e 100644 --- a/src/service/sending/data.rs +++ b/src/service/sending/data.rs @@ -1,20 +1,20 @@ use ruma::ServerName; -use super::{OutgoingKind, SendingEventType}; +use super::{OutgoingDestination, SendingEventType}; use crate::Result; -type OutgoingSendingIter<'a> = Box, OutgoingKind, SendingEventType)>> + 'a>; +type OutgoingSendingIter<'a> = Box, OutgoingDestination, SendingEventType)>> + 'a>; type SendingEventTypeIter<'a> = Box, SendingEventType)>> + 'a>; pub trait Data: Send + Sync { fn active_requests(&self) -> OutgoingSendingIter<'_>; - fn active_requests_for(&self, outgoing_kind: &OutgoingKind) -> SendingEventTypeIter<'_>; + fn active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> SendingEventTypeIter<'_>; fn delete_active_request(&self, key: Vec) -> Result<()>; - fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; - fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; - fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result>>; + fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>; + fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>; + fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result>>; fn queued_requests<'a>( - &'a self, outgoing_kind: &OutgoingKind, + &'a self, outgoing_kind: &OutgoingDestination, ) -> Box)>> + 'a>; fn mark_as_active(&self, events: &[(SendingEventType, Vec)]) -> Result<()>; fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>; diff --git a/src/service/sending/mod.rs b/src/service/sending/mod.rs index 2323aa57..73cde12e 100644 --- a/src/service/sending/mod.rs +++ b/src/service/sending/mod.rs @@ -1,6 +1,6 @@ use std::{ cmp, - collections::{BTreeMap, HashMap, HashSet}, + collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}, fmt::Debug, sync::Arc, time::{Duration, Instant}, @@ -25,7 +25,7 @@ use ruma::{ events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, }; -use tokio::sync::{mpsc, Mutex, Semaphore}; +use tokio::sync::{oneshot, Mutex, Semaphore}; use tracing::{error, warn}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; @@ -42,15 +42,15 @@ pub struct Service { /// The state for a given state hash. pub(super) maximum_requests: Arc, - pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec)>, - receiver: Mutex)>>, + pub sender: loole::Sender<(OutgoingDestination, SendingEventType, Vec)>, + receiver: Mutex)>>, startup_netburst: bool, startup_netburst_keep: i64, timeout: u64, } #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub enum OutgoingKind { +pub enum OutgoingDestination { Appservice(String), Push(OwnedUserId, String), // user and pushkey Normal(OwnedServerName), @@ -65,14 +65,31 @@ pub enum SendingEventType { } enum TransactionStatus { + /// Currently running (for the first time) Running, - Failed(u32, Instant), // number of times failed, time of last failure - Retrying(u32), // number of times failed + /// Failed, backing off for a retry + Failed { + failures: u32, + waker: Option>, + }, + /// Currently retrying + Retrying { + /// number of times failed + failures: u32, + }, +} + +/// A control-flow enum to dictate what the handler should do after (trying to) +/// prepare a transaction +enum TransactionPrepOutcome { + Send(Vec), + Wake(OutgoingDestination), + Nothing, } impl Service { pub fn build(db: &'static dyn Data, config: &Config) -> Arc { - let (sender, receiver) = mpsc::unbounded_channel(); + let (sender, receiver) = loole::unbounded(); Arc::new(Self { db, sender, @@ -86,7 +103,7 @@ impl Service { #[tracing::instrument(skip(self, pdu_id, user, pushkey))] pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { - let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); + let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey); let event = SendingEventType::Pdu(pdu_id.to_owned()); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; @@ -99,7 +116,7 @@ impl Service { #[tracing::instrument(skip(self))] pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec) -> Result<()> { - let outgoing_kind = OutgoingKind::Appservice(appservice_id); + let outgoing_kind = OutgoingDestination::Appservice(appservice_id); let event = SendingEventType::Pdu(pdu_id); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; @@ -126,7 +143,7 @@ impl Service { pub fn send_pdu_servers>(&self, servers: I, pdu_id: &[u8]) -> Result<()> { let requests = servers .into_iter() - .map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) + .map(|server| (OutgoingDestination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) .collect::>(); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests( @@ -146,7 +163,7 @@ impl Service { #[tracing::instrument(skip(self, server, serialized))] pub fn send_edu_server(&self, server: &ServerName, serialized: Vec) -> Result<()> { - let outgoing_kind = OutgoingKind::Normal(server.to_owned()); + let outgoing_kind = OutgoingDestination::Normal(server.to_owned()); let event = SendingEventType::Edu(serialized); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; @@ -173,7 +190,7 @@ impl Service { pub fn send_edu_servers>(&self, servers: I, serialized: Vec) -> Result<()> { let requests = servers .into_iter() - .map(|server| (OutgoingKind::Normal(server), SendingEventType::Edu(serialized.clone()))) + .map(|server| (OutgoingDestination::Normal(server), SendingEventType::Edu(serialized.clone()))) .collect::>(); let _cork = services().globals.db.cork()?; let keys = self.db.queue_requests( @@ -205,7 +222,7 @@ impl Service { #[tracing::instrument(skip(self, servers))] pub fn flush_servers>(&self, servers: I) -> Result<()> { - let requests = servers.into_iter().map(OutgoingKind::Normal); + let requests = servers.into_iter().map(OutgoingDestination::Normal); for outgoing_kind in requests { self.sender @@ -221,7 +238,7 @@ impl Service { #[tracing::instrument(skip(self))] pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { self.db - .delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; + .delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?; Ok(()) } @@ -274,14 +291,17 @@ impl Service { #[tracing::instrument(skip(self), name = "sender")] async fn handler(&self) -> Result<()> { - let mut receiver = self.receiver.lock().await; + let new_transactions = self.receiver.lock().await; + let (waking_sender, waking_receiver) = loole::unbounded(); - let mut futures = FuturesUnordered::new(); - let mut current_transaction_status = HashMap::::new(); + let mut outgoing = FuturesUnordered::new(); + let mut retrying = FuturesUnordered::new(); + + let mut current_transaction_status = HashMap::::new(); // Retry requests we could not finish yet if self.startup_netburst { - let mut initial_transactions = HashMap::>::new(); + let mut initial_transactions = HashMap::>::new(); for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) { let entry = initial_transactions .entry(outgoing_kind.clone()) @@ -300,13 +320,14 @@ impl Service { for (outgoing_kind, events) in initial_transactions { current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); - futures.push(handle_events(outgoing_kind.clone(), events)); + outgoing.push(handle_events(outgoing_kind.clone(), events)); } } loop { tokio::select! { - Some(response) = futures.next() => { + Some(response) = outgoing.next() => { + // Outgoing transaction succeeded match response { Ok(outgoing_kind) => { let _cork = services().globals.db.cork(); @@ -322,51 +343,155 @@ impl Service { if !new_events.is_empty() { // Insert pdus we found self.db.mark_as_active(&new_events)?; - futures.push(handle_events( - outgoing_kind.clone(), + + // Clear retries + current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); + + outgoing.push(handle_events( + outgoing_kind, new_events.into_iter().map(|(event, _)| event).collect(), )); } else { current_transaction_status.remove(&outgoing_kind); } } - Err((outgoing_kind, _)) => { - current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { - TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), - TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), - TransactionStatus::Failed(_, _) => { - error!("Request that was not even running failed?!"); - return - }, - }); + // Outgoing transaction failed + Err((destination, err)) => { + // Set status to Failed, create timer + let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone()); + + // Add timer to loop + retrying.push(timer); + + warn!("Outgoing request to {destination} failed: {err}"); } }; }, - Some((outgoing_kind, event, key)) = receiver.recv() => { - if let Ok(Some(events)) = self.select_events( - &outgoing_kind, + + // Transaction retry timers firing + Some(dest) = retrying.next() => { + // Transition Failed => Retrying, return pending old transaction events + match self.select_events( + &dest, + vec![], // will be ignored because fresh == false + &mut current_transaction_status, + false, + ) { + Ok(TransactionPrepOutcome::Send(events)) => { + outgoing.push(handle_events(dest, events)); + } + Ok(_) => { + // Unreachable because fresh == false + unreachable!("select_events on a stale transaction {} did not return ::Send", dest) + } + + Err(err) => { + error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err); + + // transaction dropped, so drop destination as well. + current_transaction_status.remove(&dest); + } + } + }, + + // Explicit wakeups, makes a backoff timer return immediately + Ok(outgoing) = waking_receiver.recv_async() => { + if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) { + if let Some(waker) = waker.take() { + _ = waker.send(()); + } + } + }, + + // New transactions to be sent out (from server/user activity) + event = new_transactions.recv_async() => { + if let Ok((dest, event, key)) = event { + match self.select_events( + &dest, vec![(event, key)], &mut current_transaction_status, - ) { - futures.push(handle_events(outgoing_kind, events)); + true) { + Ok(TransactionPrepOutcome::Send(events)) => { + outgoing.push(handle_events(dest, events)); + }, + Ok(TransactionPrepOutcome::Wake(dest)) => { + waking_sender.send(dest).expect("nothing closes this channel but ourselves"); + }, + Ok(TransactionPrepOutcome::Nothing) => {}, + Err(err) => { + error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err); + } + } } } } } } + /// Generates timer/oneshot, alters status to reflect Failed + /// + /// Returns timer/oneshot future to wake up loop for next retry + fn mark_failed_and_backoff( + status: &mut HashMap, dest: OutgoingDestination, + ) -> impl std::future::Future { + let now = Instant::now(); + + let entry = status + .get_mut(&dest) + .expect("guaranteed to be set before this function"); + + let failures = match entry { + // Running -> Failed + TransactionStatus::Running => 1, + // Retrying -> Failed + TransactionStatus::Retrying { + failures, + } => *failures + 1, + + // The transition of Failed -> Retrying is handled by handle_events + TransactionStatus::Failed { + .. + } => { + unreachable!( + "TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \ + bailing..." + ) + }, + }; + + const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24); + + // Exponential backoff, clamp upper value to one day + let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY); + + let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup); + + *entry = TransactionStatus::Failed { + failures, + waker: Some(waker), + }; + + fut + } + + /// This prepares a transaction, checks the transaction state, and selects + /// appropriate events. #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] fn select_events( &self, - outgoing_kind: &OutgoingKind, + outgoing_kind: &OutgoingDestination, new_events: Vec<(SendingEventType, Vec)>, // Events we want to send: event and full key - current_transaction_status: &mut HashMap, - ) -> Result>> { - let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?; + current_transaction_status: &mut HashMap, + fresh: bool, // Wether or not this transaction came from server activity. + ) -> Result { + let (allow, retry, wake_up) = + self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?; // Nothing can be done for this remote, bail out. - if !allow { - return Ok(None); + if wake_up { + return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone())); + } else if !allow { + return Ok(TransactionPrepOutcome::Nothing); } let _cork = services().globals.db.cork(); @@ -374,12 +499,14 @@ impl Service { // Must retry any previous transaction for this remote. if retry { - self.db + // We retry the previous transaction + for (_, e) in self + .db .active_requests_for(outgoing_kind) .filter_map(Result::ok) - .for_each(|(_, e)| events.push(e)); - - return Ok(Some(events)); + { + events.push(e); + } } // Compose the next transaction @@ -392,43 +519,79 @@ impl Service { } // Add EDU's into the transaction - if let OutgoingKind::Normal(server_name) = outgoing_kind { + if let OutgoingDestination::Normal(server_name) = outgoing_kind { if let Ok((select_edus, last_count)) = self.select_edus(server_name) { events.extend(select_edus.into_iter().map(SendingEventType::Edu)); self.db.set_latest_educount(server_name, last_count)?; } } - Ok(Some(events)) + Ok(TransactionPrepOutcome::Send(events)) } #[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))] fn select_events_current( - &self, outgoing_kind: OutgoingKind, current_transaction_status: &mut HashMap, - ) -> Result<(bool, bool)> { - let (mut allow, mut retry) = (true, false); - current_transaction_status - .entry(outgoing_kind) - .and_modify(|e| match e { - TransactionStatus::Failed(tries, time) => { - // Fail if a request has failed recently (exponential backoff) - const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); - let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); - min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION); - if time.elapsed() < min_elapsed_duration { - allow = false; - } else { - retry = true; - *e = TransactionStatus::Retrying(*tries); + &self, outgoing_kind: OutgoingDestination, + current_transaction_status: &mut HashMap, fresh: bool, + ) -> Result<(bool, bool, bool)> { + let (mut allow, mut retry, mut wake_up) = (true, false, false); + + let entry = current_transaction_status.entry(outgoing_kind); + + if fresh { + // If its fresh, we initialise the status if we need to. + // + // We do nothing if it is already running or retrying. + // + // We return with a wake if it is in the Failed state. + entry + .and_modify(|e| match e { + TransactionStatus::Running + | TransactionStatus::Retrying { + .. + } => { + allow = false; // already running + }, + TransactionStatus::Failed { + .. + } => { + // currently sleeping + wake_up = true; + }, + }) + .or_insert(TransactionStatus::Running); + } else { + // If it's not fresh, we expect an entry. + // + // We also expect us to be the only one who are touching this destination right + // now, and its a stale transaction, so it must be in the Failed state + match entry { + Entry::Occupied(mut e) => { + let e = e.get_mut(); + match e { + TransactionStatus::Failed { + failures, + .. + } => { + *e = TransactionStatus::Retrying { + failures: *failures, + }; + retry = true; + }, + + _ => unreachable!( + "Encountered bad state when preparing stale transaction: expected Failed state, got \ + Running or Retrying" + ), } }, - TransactionStatus::Running | TransactionStatus::Retrying(_) => { - allow = false; // already running - }, - }) - .or_insert(TransactionStatus::Running); + Entry::Vacant(_) => unreachable!( + "Encountered bad state when preparing stale transaction: expected Failed state, got vacant state" + ), + } + } - Ok((allow, retry)) + Ok((allow, retry, wake_up)) } #[tracing::instrument(skip(self, server_name))] @@ -594,19 +757,21 @@ pub fn select_edus_receipts( } async fn handle_events( - kind: OutgoingKind, events: Vec, -) -> Result { + kind: OutgoingDestination, events: Vec, +) -> Result { match kind { - OutgoingKind::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await, - OutgoingKind::Push(ref userid, ref pushkey) => handle_events_kind_push(&kind, userid, pushkey, events).await, - OutgoingKind::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await, + OutgoingDestination::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await, + OutgoingDestination::Push(ref userid, ref pushkey) => { + handle_events_kind_push(&kind, userid, pushkey, events).await + }, + OutgoingDestination::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await, } } #[tracing::instrument(skip(kind, events))] async fn handle_events_kind_appservice( - kind: &OutgoingKind, id: &String, events: Vec, -) -> Result { + kind: &OutgoingDestination, id: &String, events: Vec, +) -> Result { let mut pdu_jsons = Vec::new(); for event in &events { @@ -674,8 +839,8 @@ async fn handle_events_kind_appservice( #[tracing::instrument(skip(kind, events))] async fn handle_events_kind_push( - kind: &OutgoingKind, userid: &OwnedUserId, pushkey: &String, events: Vec, -) -> Result { + kind: &OutgoingDestination, userid: &OwnedUserId, pushkey: &String, events: Vec, +) -> Result { let mut pdus = Vec::new(); for event in &events { @@ -715,7 +880,7 @@ async fn handle_events_kind_push( let Some(pusher) = services() .pusher .get_pusher(userid, pushkey) - .map_err(|e| (kind.clone(), e))? + .map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))? else { continue; }; @@ -752,8 +917,8 @@ async fn handle_events_kind_push( #[tracing::instrument(skip(kind, events), name = "")] async fn handle_events_kind_normal( - kind: &OutgoingKind, dest: &OwnedServerName, events: Vec, -) -> Result { + kind: &OutgoingDestination, dest: &OwnedServerName, events: Vec, +) -> Result { let mut edu_jsons = Vec::new(); let mut pdu_jsons = Vec::new(); @@ -826,23 +991,23 @@ async fn handle_events_kind_normal( response } -impl OutgoingKind { +impl OutgoingDestination { #[tracing::instrument(skip(self))] pub fn get_prefix(&self) -> Vec { let mut prefix = match self { - OutgoingKind::Appservice(server) => { + OutgoingDestination::Appservice(server) => { let mut p = b"+".to_vec(); p.extend_from_slice(server.as_bytes()); p }, - OutgoingKind::Push(user, pushkey) => { + OutgoingDestination::Push(user, pushkey) => { let mut p = b"$".to_vec(); p.extend_from_slice(user.as_bytes()); p.push(0xFF); p.extend_from_slice(pushkey.as_bytes()); p }, - OutgoingKind::Normal(server) => { + OutgoingDestination::Normal(server) => { let mut p = Vec::new(); p.extend_from_slice(server.as_bytes()); p @@ -852,4 +1017,40 @@ impl OutgoingKind { prefix } + + /// This wraps the OutgoingDestination key in an interruptible sleep future. + /// + /// The first return value is the future, the second is the oneshot that + /// interrupts that future, and causes it to return instantly. + fn wrap_in_interruptible_sleep( + self, at: Instant, + ) -> (impl std::future::Future, oneshot::Sender<()>) { + let (tx, rx) = oneshot::channel(); + let at = tokio::time::Instant::from_std(at); + + ( + async move { + _ = tokio::time::timeout_at(at, rx).await; + + self + }, + tx, + ) + } +} + +impl std::fmt::Display for OutgoingDestination { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OutgoingDestination::Appservice(appservice_id) => { + write!(f, "Appservice (ID {:?})", appservice_id) + }, + OutgoingDestination::Push(user, push_key) => { + write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key) + }, + OutgoingDestination::Normal(server) => { + write!(f, "Matrix Server ({:?})", server) + }, + } + } }