Compare commits

...
This repository has been archived on 2025-08-14. You can view files and clone it, but you cannot make any changes to it's state, such as pushing and creating new issues, pull requests or comments.

10 commits

Author SHA1 Message Date
strawberry
d5a9c98657 make federation retry timer-based
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 22:14:30 -04:00
strawberry
395b466b4a rename OutgoingKind enum to OutgoingDestination
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 20:11:18 -04:00
strawberry
0376b58006 use latest main rev for hickory (and for reqwest)
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 20:05:56 -04:00
strawberry
78c1e2f427 adjust DNS default config options
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 19:49:19 -04:00
strawberry
6614b8f6bf ci: remove download env
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 19:15:12 -04:00
strawberry
c2fa8e6f8d split up CI steps
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 17:59:01 -04:00
strawberry
b8108f5897 cargo fmt
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 17:50:34 -04:00
morguldir
cf8358cbe6 Remove extra test flag when publishing to ghcr in the CI
test -n checks if a string is longer than non-zero, but we just need a compare

Signed-off-by: morguldir <morguldir@protonmail.com>
2024-04-17 17:22:52 -04:00
strawberry
7ecc570bb8 Revert "dont use loole for sending channel code"
This reverts commit d0a9666a29.
2024-04-17 15:16:01 -04:00
strawberry
002799177d fix wrong warn message
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 15:15:52 -04:00
9 changed files with 480 additions and 394 deletions

View file

@ -26,11 +26,9 @@ permissions:
contents: read
jobs:
ci:
name: CI and Artifacts
setup:
name: CI Setup
runs-on: ubuntu-latest
steps:
- name: Sync repository
uses: actions/checkout@v4
@ -94,291 +92,175 @@ jobs:
./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation
build-and-test:
name: CI and Artifacts
needs: setup
runs-on: ubuntu-latest
strategy:
matrix:
target: [
"static-x86_64-unknown-linux-musl",
"static-x86_64-unknown-linux-musl-jemalloc",
"static-x86_64-unknown-linux-musl-hmalloc",
"static-aarch64-unknown-linux-musl",
"static-aarch64-unknown-linux-musl-jemalloc",
"static-aarch64-unknown-linux-musl-hmalloc",
]
oci-target: [
"x86_64-unknown-linux-gnu",
"x86_64-unknown-linux-musl",
"x86_64-unknown-linux-musl-jemalloc",
"x86_64-unknown-linux-musl-hmalloc",
"aarch64-unknown-linux-musl",
"aarch64-unknown-linux-musl-jemalloc",
"aarch64-unknown-linux-musl-hmalloc",
]
steps:
- name: Perform continuous integration
run: direnv exec . engage
- name: Build static-x86_64-unknown-linux-musl and Create static deb-x86_64-unknown-linux-musl
- name: Build static artifacts
run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl
./bin/nix-build-and-cache .#${{ matrix.target }}
mkdir -p target/release
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
direnv exec . cargo deb --no-build --output target/debian/${{ matrix.target }}.deb
- name: Upload artifact static-x86_64-unknown-linux-musl
- name: Upload static artifacts
uses: actions/upload-artifact@v4
with:
name: static-x86_64-unknown-linux-musl
name: ${{ matrix.target }}
path: result/bin/conduit
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl
- name: Upload static deb artifacts
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl.deb
path: target/debian/*.deb
name: ${{ matrix.target }}.deb
path: target/debian/${{ matrix.target }}.deb
if-no-files-found: error
- name: Build static-x86_64-unknown-linux-musl-jemalloc and Create static deb-x86_64-unknown-linux-musl-jemalloc
- name: Build OCI images
run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-jemalloc
mkdir -p target/release
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
./bin/nix-build-and-cache .#oci-image-${{ matrix.oci-target }}
cp -v -f result oci-image-${{ matrix.oci-target }}.tar.gz
- name: Upload artifact static-x86_64-unknown-linux-musl-jemalloc
- name: Upload OCI image artifacts
uses: actions/upload-artifact@v4
with:
name: static-x86_64-unknown-linux-musl-jemalloc
path: result/bin/conduit
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl-jemalloc.deb
path: target/debian/*.deb
if-no-files-found: error
- name: Build static-x86_64-unknown-linux-musl-hmalloc and Create static deb-x86_64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-hmalloc
mkdir -p target/release
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
- name: Upload artifact static-x86_64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: static-x86_64-unknown-linux-musl-hmalloc
path: result/bin/conduit
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl-hmalloc.deb
path: target/debian/*.deb
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl
- name: Upload artifact static-aarch64-unknown-linux-musl
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl
path: result/bin/conduit
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl-jemalloc
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-jemalloc
- name: Upload artifact static-aarch64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl-jemalloc
path: result/bin/conduit
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-hmalloc
- name: Upload artifact static-aarch64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl-hmalloc
path: result/bin/conduit
if-no-files-found: error
- name: Build oci-image-x86_64-unknown-linux-gnu
run: |
./bin/nix-build-and-cache .#oci-image
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu
path: oci-image-amd64.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-x86_64-unknown-linux-gnu-jemalloc
run: |
./bin/nix-build-and-cache .#oci-image-jemalloc
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-jemalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu-jemalloc
path: oci-image-amd64.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-x86_64-unknown-linux-gnu-hmalloc
run: |
./bin/nix-build-and-cache .#oci-image-hmalloc
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-hmalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu-hmalloc
path: oci-image-amd64.tar.gz
name: oci-image-${{ matrix.oci-target }}
path: oci-image-${{ matrix.oci-target }}.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl-jemalloc
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-jemalloc
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl-jemalloc
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-hmalloc
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl-hmalloc
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Extract metadata for Dockerhub
env:
REGISTRY: registry.hub.docker.com
IMAGE_NAME: ${{ github.repository }}
id: meta-dockerhub
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
publish:
needs: build-and-test
runs-on: ubuntu-latest
steps:
- name: Extract metadata for Dockerhub
env:
REGISTRY: registry.hub.docker.com
IMAGE_NAME: ${{ github.repository }}
id: meta-dockerhub
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Extract metadata for GitHub Container Registry
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
id: meta-ghcr
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Extract metadata for GitHub Container Registry
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
id: meta-ghcr
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
- name: Login to Dockerhub
env:
DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }}
if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }}
uses: docker/login-action@v3
with:
# username is not really a secret
username: ${{ vars.DOCKER_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Login to Dockerhub
env:
DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }}
if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }}
uses: docker/login-action@v3
with:
# username is not really a secret
username: ${{ vars.DOCKER_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Login to GitHub Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
env:
REGISTRY: ghcr.io
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Login to GitHub Container Registry
if: github.event_name != 'pull_request'
uses: docker/login-action@v3
env:
REGISTRY: ghcr.io
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.repository_owner }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Publish to Dockerhub
env:
DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }}
IMAGE_NAME: docker.io/${{ github.repository }}
IMAGE_SUFFIX_AMD64: amd64
IMAGE_SUFFIX_ARM64V8: arm64v8
if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }}
run: |
docker load -i oci-image-amd64.tar.gz
IMAGE_ID_AMD64=$(docker images -q conduit:main)
docker load -i oci-image-arm64v8.tar.gz
IMAGE_ID_ARM64V8=$(docker images -q conduit:main)
- name: Publish to Dockerhub
env:
DOCKERHUB_TOKEN: ${{ secrets.DOCKERHUB_TOKEN }}
DOCKER_USERNAME: ${{ vars.DOCKER_USERNAME }}
IMAGE_NAME: docker.io/${{ github.repository }}
IMAGE_SUFFIX_AMD64: amd64
IMAGE_SUFFIX_ARM64V8: arm64v8
if: ${{ (github.event_name != 'pull_request') && (env.DOCKER_USERNAME != '') && (env.DOCKERHUB_TOKEN != '') }}
run: |
docker load -i oci-image-amd64.tar.gz
IMAGE_ID_AMD64=$(docker images -q conduit:main)
docker load -i oci-image-arm64v8.tar.gz
IMAGE_ID_ARM64V8=$(docker images -q conduit:main)
# Tag and push the architecture specific images
docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
# Tag the multi-arch image
docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_SHA
# Tag and push the git ref
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch)
if [[ "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest
fi
# Tag and push the architecture specific images
docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
# Tag the multi-arch image
docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_SHA
# Tag and push the git ref
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch)
if [[ "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest
fi
- name: Publish to GitHub Container Registry
if: github.event_name != 'pull_request'
env:
IMAGE_NAME: ghcr.io/${{ github.repository }}
IMAGE_SUFFIX_AMD64: amd64
IMAGE_SUFFIX_ARM64V8: arm64v8
run: |
docker load -i oci-image-amd64.tar.gz
IMAGE_ID_AMD64=$(docker images -q conduit:main)
docker load -i oci-image-arm64v8.tar.gz
IMAGE_ID_ARM64V8=$(docker images -q conduit:main)
- name: Publish to GitHub Container Registry
if: github.event_name != 'pull_request'
env:
IMAGE_NAME: ghcr.io/${{ github.repository }}
IMAGE_SUFFIX_AMD64: amd64
IMAGE_SUFFIX_ARM64V8: arm64v8
run: |
docker load -i oci-image-amd64.tar.gz
IMAGE_ID_AMD64=$(docker images -q conduit:main)
docker load -i oci-image-arm64v8.tar.gz
IMAGE_ID_ARM64V8=$(docker images -q conduit:main)
# Tag and push the architecture specific images
docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
# Tag the multi-arch image
docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_SHA
# Tag and push the git ref
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch)
if [[ -n "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest
fi
# Tag and push the architecture specific images
docker tag $IMAGE_ID_AMD64 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker tag $IMAGE_ID_ARM64V8 $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64
docker push $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
# Tag the multi-arch image
docker manifest create $IMAGE_NAME:$GITHUB_SHA --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_SHA
# Tag and push the git ref
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch)
if [[ "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest
fi

35
Cargo.lock generated
View file

@ -111,6 +111,17 @@ dependencies = [
"zstd-safe",
]
[[package]]
name = "async-recursion"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
]
[[package]]
name = "async-trait"
version = "0.1.80"
@ -1045,9 +1056,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "hickory-proto"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf"
source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
dependencies = [
"async-recursion",
"async-trait",
"cfg-if",
"data-encoding",
@ -1055,7 +1066,7 @@ dependencies = [
"futures-channel",
"futures-io",
"futures-util",
"idna 0.4.0",
"idna",
"ipnet",
"once_cell",
"rand",
@ -1069,8 +1080,7 @@ dependencies = [
[[package]]
name = "hickory-resolver"
version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8"
source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
dependencies = [
"cfg-if",
"futures-util",
@ -1311,16 +1321,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "idna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "idna"
version = "0.5.0"
@ -2285,8 +2285,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]]
name = "reqwest"
version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
source = "git+https://github.com/girlbossceo/reqwest?rev=319335e000fdea2e3d01f44245c8a21864d0c1c3#319335e000fdea2e3d01f44245c8a21864d0c1c3"
dependencies = [
"async-compression",
"base64 0.21.7",
@ -3763,7 +3762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
dependencies = [
"form_urlencoded",
"idna 0.5.0",
"idna",
"percent-encoding",
"serde",
]

View file

@ -29,9 +29,6 @@ base64 = "0.22.0"
# Used when hashing the state
ring = "0.17.8"
# Used when querying the SRV record of other servers
hickory-resolver = "0.24.0"
# Used to find matching events for appservices
regex = "1.10.4"
@ -107,9 +104,11 @@ version = "0.14"
features = ["server", "http1", "http2"]
[dependencies.reqwest]
version = "0.11.27"
#version = "0.11.27"
git = "https://github.com/girlbossceo/reqwest"
rev = "319335e000fdea2e3d01f44245c8a21864d0c1c3"
default-features = false
features = ["rustls-tls-native-roots", "socks", "trust-dns"]
features = ["rustls-tls-native-roots", "socks", "hickory-dns"]
# all the serde stuff
# Used for pdu definition
@ -272,6 +271,10 @@ features = [
"unstable-extensible-events",
]
[dependencies.hickory-resolver]
git = "https://github.com/hickory-dns/hickory-dns"
rev = "94ac564c3f677e038f7255ddb762e9301d0f2c5d"
[dependencies.rust-rocksdb]
git = "https://github.com/zaidoon1/rust-rocksdb"
branch = "master"

View file

@ -477,19 +477,19 @@ allow_profile_lookup_federation_requests = true
# Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most
# administrators; this is by design. Only decrease this if you are using an external DNS cache.
#dns_min_ttl = 60 * 90
#dns_min_ttl = 10800
# Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for
# the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation
# and aggressively cached rather than constantly rechecked.
#dns_min_ttl_nxdomain = 60 * 60 * 24 * 3
#dns_min_ttl_nxdomain = 86400
# The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can
# take up to several seconds for some domains, so this value should not be too low.
#dns_timeout = 5
#dns_timeout = 10
# Number of retries after a timeout.
#dns_attempts = 5
#dns_attempts = 10
# Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver.
#dns_tcp_fallback = true
@ -498,7 +498,7 @@ allow_profile_lookup_federation_requests = true
# This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response.
#
# The default is to query one nameserver and stop (false).
#query_all_nameservers = false
#query_all_nameservers = true
### Request Timeouts, Connection Timeouts, and Connection Pooling

View file

@ -1554,7 +1554,8 @@ pub async fn create_invite_route(body: Ruma<create_invite::v2::Request>) -> Resu
.contains(&server.to_owned())
{
warn!(
"Received federated/remote invite from banned server {sender_servername} for room ID {}. Rejecting.",
"Received federated/remote invite from server {sender_servername} for room ID {} which has a banned \
server name. Rejecting.",
body.room_id
);
return Err(Error::BadRequest(

View file

@ -100,7 +100,7 @@ pub struct Config {
pub dns_timeout: u64,
#[serde(default = "true_fn")]
pub dns_tcp_fallback: bool,
#[serde(default)]
#[serde(default = "true_fn")]
pub query_all_nameservers: bool,
#[serde(default = "default_max_request_size")]
pub max_request_size: u32,
@ -851,13 +851,13 @@ fn default_cleanup_second_interval() -> u32 {
fn default_dns_cache_entries() -> u32 { 12288 }
fn default_dns_min_ttl() -> u64 { 60 * 90 }
fn default_dns_min_ttl() -> u64 { 60 * 180 }
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 * 3 }
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 }
fn default_dns_attempts() -> u16 { 5 }
fn default_dns_attempts() -> u16 { 10 }
fn default_dns_timeout() -> u64 { 5 }
fn default_dns_timeout() -> u64 { 10 }
fn default_max_request_size() -> u32 {
20 * 1024 * 1024 // Default to 20 MB

View file

@ -4,7 +4,7 @@ use crate::{
database::KeyValueDatabase,
service::{
self,
sending::{OutgoingKind, SendingEventType},
sending::{OutgoingDestination, SendingEventType},
},
services, utils, Error, Result,
};
@ -12,7 +12,7 @@ use crate::{
impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>(
&'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> {
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a> {
Box::new(
self.servercurrentevent_data
.iter()
@ -21,7 +21,7 @@ impl service::sending::Data for KeyValueDatabase {
}
fn active_requests_for<'a>(
&'a self, outgoing_kind: &OutgoingKind,
&'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
let prefix = outgoing_kind.get_prefix();
Box::new(
@ -33,7 +33,7 @@ impl service::sending::Data for KeyValueDatabase {
fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) }
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> {
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
self.servercurrentevent_data.remove(&key)?;
@ -42,7 +42,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(())
}
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> {
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) {
self.servercurrentevent_data.remove(&key).unwrap();
@ -55,7 +55,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(())
}
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
let mut batch = Vec::new();
let mut keys = Vec::new();
for (outgoing_kind, event) in requests {
@ -79,7 +79,7 @@ impl service::sending::Data for KeyValueDatabase {
}
fn queued_requests<'a>(
&'a self, outgoing_kind: &OutgoingKind,
&'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
let prefix = outgoing_kind.get_prefix();
return Box::new(
@ -122,7 +122,7 @@ impl service::sending::Data for KeyValueDatabase {
}
#[tracing::instrument(skip(key))]
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind, SendingEventType)> {
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingDestination, SendingEventType)> {
// Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") {
let mut parts = key[1..].splitn(2, |&b| b == 0xFF);
@ -136,7 +136,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
(
OutgoingKind::Appservice(server),
OutgoingDestination::Appservice(server),
if value.is_empty() {
SendingEventType::Pdu(event.to_vec())
} else {
@ -163,7 +163,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
(
OutgoingKind::Push(user_id, pushkey_string),
OutgoingDestination::Push(user_id, pushkey_string),
if value.is_empty() {
SendingEventType::Pdu(event.to_vec())
} else {
@ -183,7 +183,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
(
OutgoingKind::Normal(
OutgoingDestination::Normal(
ServerName::parse(server)
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
),

View file

@ -1,20 +1,20 @@
use ruma::ServerName;
use super::{OutgoingKind, SendingEventType};
use super::{OutgoingDestination, SendingEventType};
use crate::Result;
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>;
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a>;
type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
pub trait Data: Send + Sync {
fn active_requests(&self) -> OutgoingSendingIter<'_>;
fn active_requests_for(&self, outgoing_kind: &OutgoingKind) -> SendingEventTypeIter<'_>;
fn active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> SendingEventTypeIter<'_>;
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>;
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>;
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
fn queued_requests<'a>(
&'a self, outgoing_kind: &OutgoingKind,
&'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;

View file

@ -1,6 +1,6 @@
use std::{
cmp,
collections::{BTreeMap, HashMap, HashSet},
collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
fmt::Debug,
sync::Arc,
time::{Duration, Instant},
@ -25,7 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
};
use tokio::sync::{mpsc, Mutex, Semaphore};
use tokio::sync::{oneshot, Mutex, Semaphore};
use tracing::{error, warn};
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -42,15 +42,15 @@ pub struct Service {
/// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>,
pub sender: loole::Sender<(OutgoingDestination, SendingEventType, Vec<u8>)>,
receiver: Mutex<loole::Receiver<(OutgoingDestination, SendingEventType, Vec<u8>)>>,
startup_netburst: bool,
startup_netburst_keep: i64,
timeout: u64,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind {
pub enum OutgoingDestination {
Appservice(String),
Push(OwnedUserId, String), // user and pushkey
Normal(OwnedServerName),
@ -65,14 +65,31 @@ pub enum SendingEventType {
}
enum TransactionStatus {
/// Currently running (for the first time)
Running,
Failed(u32, Instant), // number of times failed, time of last failure
Retrying(u32), // number of times failed
/// Failed, backing off for a retry
Failed {
failures: u32,
waker: Option<oneshot::Sender<()>>,
},
/// Currently retrying
Retrying {
/// number of times failed
failures: u32,
},
}
/// A control-flow enum to dictate what the handler should do after (trying to)
/// prepare a transaction
enum TransactionPrepOutcome {
Send(Vec<SendingEventType>),
Wake(OutgoingDestination),
Nothing,
}
impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel();
let (sender, receiver) = loole::unbounded();
Arc::new(Self {
db,
sender,
@ -86,7 +103,7 @@ impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey);
let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned());
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -99,7 +116,7 @@ impl Service {
#[tracing::instrument(skip(self))]
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id);
let outgoing_kind = OutgoingDestination::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id);
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -126,7 +143,7 @@ impl Service {
pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
let requests = servers
.into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
.map(|server| (OutgoingDestination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
.collect::<Vec<_>>();
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(
@ -146,7 +163,7 @@ impl Service {
#[tracing::instrument(skip(self, server, serialized))]
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned());
let outgoing_kind = OutgoingDestination::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized);
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -173,7 +190,7 @@ impl Service {
pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> {
let requests = servers
.into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Edu(serialized.clone())))
.map(|server| (OutgoingDestination::Normal(server), SendingEventType::Edu(serialized.clone())))
.collect::<Vec<_>>();
let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(
@ -205,7 +222,7 @@ impl Service {
#[tracing::instrument(skip(self, servers))]
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
let requests = servers.into_iter().map(OutgoingKind::Normal);
let requests = servers.into_iter().map(OutgoingDestination::Normal);
for outgoing_kind in requests {
self.sender
@ -221,7 +238,7 @@ impl Service {
#[tracing::instrument(skip(self))]
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
self.db
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?;
.delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?;
Ok(())
}
@ -274,14 +291,17 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await;
let new_transactions = self.receiver.lock().await;
let (waking_sender, waking_receiver) = loole::unbounded();
let mut futures = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new();
let mut outgoing = FuturesUnordered::new();
let mut retrying = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingDestination, TransactionStatus>::new();
// Retry requests we could not finish yet
if self.startup_netburst {
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new();
let mut initial_transactions = HashMap::<OutgoingDestination, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
let entry = initial_transactions
.entry(outgoing_kind.clone())
@ -300,13 +320,14 @@ impl Service {
for (outgoing_kind, events) in initial_transactions {
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(handle_events(outgoing_kind.clone(), events));
outgoing.push(handle_events(outgoing_kind.clone(), events));
}
}
loop {
tokio::select! {
Some(response) = futures.next() => {
Some(response) = outgoing.next() => {
// Outgoing transaction succeeded
match response {
Ok(outgoing_kind) => {
let _cork = services().globals.db.cork();
@ -322,51 +343,155 @@ impl Service {
if !new_events.is_empty() {
// Insert pdus we found
self.db.mark_as_active(&new_events)?;
futures.push(handle_events(
outgoing_kind.clone(),
// Clear retries
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
outgoing.push(handle_events(
outgoing_kind,
new_events.into_iter().map(|(event, _)| event).collect(),
));
} else {
current_transaction_status.remove(&outgoing_kind);
}
}
Err((outgoing_kind, _)) => {
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e {
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()),
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()),
TransactionStatus::Failed(_, _) => {
error!("Request that was not even running failed?!");
return
},
});
// Outgoing transaction failed
Err((destination, err)) => {
// Set status to Failed, create timer
let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone());
// Add timer to loop
retrying.push(timer);
warn!("Outgoing request to {destination} failed: {err}");
}
};
},
Some((outgoing_kind, event, key)) = receiver.recv() => {
if let Ok(Some(events)) = self.select_events(
&outgoing_kind,
// Transaction retry timers firing
Some(dest) = retrying.next() => {
// Transition Failed => Retrying, return pending old transaction events
match self.select_events(
&dest,
vec![], // will be ignored because fresh == false
&mut current_transaction_status,
false,
) {
Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
}
Ok(_) => {
// Unreachable because fresh == false
unreachable!("select_events on a stale transaction {} did not return ::Send", dest)
}
Err(err) => {
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err);
// transaction dropped, so drop destination as well.
current_transaction_status.remove(&dest);
}
}
},
// Explicit wakeups, makes a backoff timer return immediately
Ok(outgoing) = waking_receiver.recv_async() => {
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) {
if let Some(waker) = waker.take() {
_ = waker.send(());
}
}
},
// New transactions to be sent out (from server/user activity)
event = new_transactions.recv_async() => {
if let Ok((dest, event, key)) = event {
match self.select_events(
&dest,
vec![(event, key)],
&mut current_transaction_status,
) {
futures.push(handle_events(outgoing_kind, events));
true) {
Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
},
Ok(TransactionPrepOutcome::Wake(dest)) => {
waking_sender.send(dest).expect("nothing closes this channel but ourselves");
},
Ok(TransactionPrepOutcome::Nothing) => {},
Err(err) => {
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err);
}
}
}
}
}
}
}
/// Generates timer/oneshot, alters status to reflect Failed
///
/// Returns timer/oneshot future to wake up loop for next retry
fn mark_failed_and_backoff(
status: &mut HashMap<OutgoingDestination, TransactionStatus>, dest: OutgoingDestination,
) -> impl std::future::Future<Output = OutgoingDestination> {
let now = Instant::now();
let entry = status
.get_mut(&dest)
.expect("guaranteed to be set before this function");
let failures = match entry {
// Running -> Failed
TransactionStatus::Running => 1,
// Retrying -> Failed
TransactionStatus::Retrying {
failures,
} => *failures + 1,
// The transition of Failed -> Retrying is handled by handle_events
TransactionStatus::Failed {
..
} => {
unreachable!(
"TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \
bailing..."
)
},
};
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
// Exponential backoff, clamp upper value to one day
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY);
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup);
*entry = TransactionStatus::Failed {
failures,
waker: Some(waker),
};
fut
}
/// This prepares a transaction, checks the transaction state, and selects
/// appropriate events.
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events(
&self,
outgoing_kind: &OutgoingKind,
outgoing_kind: &OutgoingDestination,
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
) -> Result<Option<Vec<SendingEventType>>> {
let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?;
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>,
fresh: bool, // Wether or not this transaction came from server activity.
) -> Result<TransactionPrepOutcome> {
let (allow, retry, wake_up) =
self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?;
// Nothing can be done for this remote, bail out.
if !allow {
return Ok(None);
if wake_up {
return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone()));
} else if !allow {
return Ok(TransactionPrepOutcome::Nothing);
}
let _cork = services().globals.db.cork();
@ -374,12 +499,14 @@ impl Service {
// Must retry any previous transaction for this remote.
if retry {
self.db
// We retry the previous transaction
for (_, e) in self
.db
.active_requests_for(outgoing_kind)
.filter_map(Result::ok)
.for_each(|(_, e)| events.push(e));
return Ok(Some(events));
{
events.push(e);
}
}
// Compose the next transaction
@ -392,43 +519,79 @@ impl Service {
}
// Add EDU's into the transaction
if let OutgoingKind::Normal(server_name) = outgoing_kind {
if let OutgoingDestination::Normal(server_name) = outgoing_kind {
if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
events.extend(select_edus.into_iter().map(SendingEventType::Edu));
self.db.set_latest_educount(server_name, last_count)?;
}
}
Ok(Some(events))
Ok(TransactionPrepOutcome::Send(events))
}
#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
fn select_events_current(
&self, outgoing_kind: OutgoingKind, current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>,
) -> Result<(bool, bool)> {
let (mut allow, mut retry) = (true, false);
current_transaction_status
.entry(outgoing_kind)
.and_modify(|e| match e {
TransactionStatus::Failed(tries, time) => {
// Fail if a request has failed recently (exponential backoff)
const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries);
min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION);
if time.elapsed() < min_elapsed_duration {
allow = false;
} else {
retry = true;
*e = TransactionStatus::Retrying(*tries);
&self, outgoing_kind: OutgoingDestination,
current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, fresh: bool,
) -> Result<(bool, bool, bool)> {
let (mut allow, mut retry, mut wake_up) = (true, false, false);
let entry = current_transaction_status.entry(outgoing_kind);
if fresh {
// If its fresh, we initialise the status if we need to.
//
// We do nothing if it is already running or retrying.
//
// We return with a wake if it is in the Failed state.
entry
.and_modify(|e| match e {
TransactionStatus::Running
| TransactionStatus::Retrying {
..
} => {
allow = false; // already running
},
TransactionStatus::Failed {
..
} => {
// currently sleeping
wake_up = true;
},
})
.or_insert(TransactionStatus::Running);
} else {
// If it's not fresh, we expect an entry.
//
// We also expect us to be the only one who are touching this destination right
// now, and its a stale transaction, so it must be in the Failed state
match entry {
Entry::Occupied(mut e) => {
let e = e.get_mut();
match e {
TransactionStatus::Failed {
failures,
..
} => {
*e = TransactionStatus::Retrying {
failures: *failures,
};
retry = true;
},
_ => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got \
Running or Retrying"
),
}
},
TransactionStatus::Running | TransactionStatus::Retrying(_) => {
allow = false; // already running
},
})
.or_insert(TransactionStatus::Running);
Entry::Vacant(_) => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"
),
}
}
Ok((allow, retry))
Ok((allow, retry, wake_up))
}
#[tracing::instrument(skip(self, server_name))]
@ -594,19 +757,21 @@ pub fn select_edus_receipts(
}
async fn handle_events(
kind: OutgoingKind, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
kind: OutgoingDestination, events: Vec<SendingEventType>,
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
match kind {
OutgoingKind::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
OutgoingKind::Push(ref userid, ref pushkey) => handle_events_kind_push(&kind, userid, pushkey, events).await,
OutgoingKind::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
OutgoingDestination::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
OutgoingDestination::Push(ref userid, ref pushkey) => {
handle_events_kind_push(&kind, userid, pushkey, events).await
},
OutgoingDestination::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
}
}
#[tracing::instrument(skip(kind, events))]
async fn handle_events_kind_appservice(
kind: &OutgoingKind, id: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
kind: &OutgoingDestination, id: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut pdu_jsons = Vec::new();
for event in &events {
@ -674,8 +839,8 @@ async fn handle_events_kind_appservice(
#[tracing::instrument(skip(kind, events))]
async fn handle_events_kind_push(
kind: &OutgoingKind, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
kind: &OutgoingDestination, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut pdus = Vec::new();
for event in &events {
@ -715,7 +880,7 @@ async fn handle_events_kind_push(
let Some(pusher) = services()
.pusher
.get_pusher(userid, pushkey)
.map_err(|e| (kind.clone(), e))?
.map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))?
else {
continue;
};
@ -752,8 +917,8 @@ async fn handle_events_kind_push(
#[tracing::instrument(skip(kind, events), name = "")]
async fn handle_events_kind_normal(
kind: &OutgoingKind, dest: &OwnedServerName, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> {
kind: &OutgoingDestination, dest: &OwnedServerName, events: Vec<SendingEventType>,
) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new();
@ -826,23 +991,23 @@ async fn handle_events_kind_normal(
response
}
impl OutgoingKind {
impl OutgoingDestination {
#[tracing::instrument(skip(self))]
pub fn get_prefix(&self) -> Vec<u8> {
let mut prefix = match self {
OutgoingKind::Appservice(server) => {
OutgoingDestination::Appservice(server) => {
let mut p = b"+".to_vec();
p.extend_from_slice(server.as_bytes());
p
},
OutgoingKind::Push(user, pushkey) => {
OutgoingDestination::Push(user, pushkey) => {
let mut p = b"$".to_vec();
p.extend_from_slice(user.as_bytes());
p.push(0xFF);
p.extend_from_slice(pushkey.as_bytes());
p
},
OutgoingKind::Normal(server) => {
OutgoingDestination::Normal(server) => {
let mut p = Vec::new();
p.extend_from_slice(server.as_bytes());
p
@ -852,4 +1017,40 @@ impl OutgoingKind {
prefix
}
/// This wraps the OutgoingDestination key in an interruptible sleep future.
///
/// The first return value is the future, the second is the oneshot that
/// interrupts that future, and causes it to return instantly.
fn wrap_in_interruptible_sleep(
self, at: Instant,
) -> (impl std::future::Future<Output = Self>, oneshot::Sender<()>) {
let (tx, rx) = oneshot::channel();
let at = tokio::time::Instant::from_std(at);
(
async move {
_ = tokio::time::timeout_at(at, rx).await;
self
},
tx,
)
}
}
impl std::fmt::Display for OutgoingDestination {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingDestination::Appservice(appservice_id) => {
write!(f, "Appservice (ID {:?})", appservice_id)
},
OutgoingDestination::Push(user, push_key) => {
write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key)
},
OutgoingDestination::Normal(server) => {
write!(f, "Matrix Server ({:?})", server)
},
}
}
}