Compare commits

This repository was archived on 2025-08-14. You can view files and clone it, but you cannot make any changes to its state, such as pushing or creating new issues, pull requests, or comments.

10 commits

Author SHA1 Message Date
strawberry
d5a9c98657 make federation retry timer-based
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 22:14:30 -04:00
strawberry
395b466b4a rename OutgoingKind enum to OutgoingDestination
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 20:11:18 -04:00
strawberry
0376b58006 use latest main rev for hickory (and for reqwest)
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 20:05:56 -04:00
strawberry
78c1e2f427 adjust DNS default config options
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 19:49:19 -04:00
strawberry
6614b8f6bf ci: remove download env
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 19:15:12 -04:00
strawberry
c2fa8e6f8d split up CI steps
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 17:59:01 -04:00
strawberry
b8108f5897 cargo fmt
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 17:50:34 -04:00
morguldir
cf8358cbe6 Remove extra test flag when publishing to ghcr in the CI
test -n checks whether a string has non-zero length, but we just need a string comparison

Signed-off-by: morguldir <morguldir@protonmail.com>
2024-04-17 17:22:52 -04:00
strawberry
7ecc570bb8 Revert "dont use loole for sending channel code"
This reverts commit d0a9666a29.
2024-04-17 15:16:01 -04:00
strawberry
002799177d fix wrong warn message
Signed-off-by: strawberry <strawberry@puppygock.gay>
2024-04-17 15:15:52 -04:00
9 changed files with 480 additions and 394 deletions

View file

@ -26,11 +26,9 @@ permissions:
contents: read contents: read
jobs: jobs:
ci: setup:
name: CI and Artifacts name: CI Setup
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Sync repository - name: Sync repository
uses: actions/checkout@v4 uses: actions/checkout@v4
@ -94,194 +92,78 @@ jobs:
./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation ./bin/nix-build-and-cache .#devShells.x86_64-linux.default.inputDerivation
build-and-test:
name: CI and Artifacts
needs: setup
runs-on: ubuntu-latest
strategy:
matrix:
target: [
"static-x86_64-unknown-linux-musl",
"static-x86_64-unknown-linux-musl-jemalloc",
"static-x86_64-unknown-linux-musl-hmalloc",
"static-aarch64-unknown-linux-musl",
"static-aarch64-unknown-linux-musl-jemalloc",
"static-aarch64-unknown-linux-musl-hmalloc",
]
oci-target: [
"x86_64-unknown-linux-gnu",
"x86_64-unknown-linux-musl",
"x86_64-unknown-linux-musl-jemalloc",
"x86_64-unknown-linux-musl-hmalloc",
"aarch64-unknown-linux-musl",
"aarch64-unknown-linux-musl-jemalloc",
"aarch64-unknown-linux-musl-hmalloc",
]
steps:
- name: Perform continuous integration - name: Perform continuous integration
run: direnv exec . engage run: direnv exec . engage
- name: Build static-x86_64-unknown-linux-musl and Create static deb-x86_64-unknown-linux-musl - name: Build static artifacts
run: | run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl ./bin/nix-build-and-cache .#${{ matrix.target }}
mkdir -p target/release mkdir -p target/release
cp -v -f result/bin/conduit target/release cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build direnv exec . cargo deb --no-build --output target/debian/${{ matrix.target }}.deb
- name: Upload artifact static-x86_64-unknown-linux-musl - name: Upload static artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: static-x86_64-unknown-linux-musl name: ${{ matrix.target }}
path: result/bin/conduit path: result/bin/conduit
if-no-files-found: error if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl - name: Upload static deb artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: x86_64-unknown-linux-musl.deb name: ${{ matrix.target }}.deb
path: target/debian/*.deb path: target/debian/${{ matrix.target }}.deb
if-no-files-found: error if-no-files-found: error
- name: Build static-x86_64-unknown-linux-musl-jemalloc and Create static deb-x86_64-unknown-linux-musl-jemalloc
- name: Build OCI images
run: | run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-jemalloc ./bin/nix-build-and-cache .#oci-image-${{ matrix.oci-target }}
mkdir -p target/release cp -v -f result oci-image-${{ matrix.oci-target }}.tar.gz
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
- name: Upload artifact static-x86_64-unknown-linux-musl-jemalloc - name: Upload OCI image artifacts
uses: actions/upload-artifact@v4 uses: actions/upload-artifact@v4
with: with:
name: static-x86_64-unknown-linux-musl-jemalloc name: oci-image-${{ matrix.oci-target }}
path: result/bin/conduit path: oci-image-${{ matrix.oci-target }}.tar.gz
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl-jemalloc.deb
path: target/debian/*.deb
if-no-files-found: error
- name: Build static-x86_64-unknown-linux-musl-hmalloc and Create static deb-x86_64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#static-x86_64-unknown-linux-musl-hmalloc
mkdir -p target/release
cp -v -f result/bin/conduit target/release
direnv exec . cargo deb --no-build
- name: Upload artifact static-x86_64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: static-x86_64-unknown-linux-musl-hmalloc
path: result/bin/conduit
if-no-files-found: error
- name: Upload artifact deb-x86_64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: x86_64-unknown-linux-musl-hmalloc.deb
path: target/debian/*.deb
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl
- name: Upload artifact static-aarch64-unknown-linux-musl
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl
path: result/bin/conduit
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl-jemalloc
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-jemalloc
- name: Upload artifact static-aarch64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl-jemalloc
path: result/bin/conduit
if-no-files-found: error
- name: Build static-aarch64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#static-aarch64-unknown-linux-musl-hmalloc
- name: Upload artifact static-aarch64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: static-aarch64-unknown-linux-musl-hmalloc
path: result/bin/conduit
if-no-files-found: error
- name: Build oci-image-x86_64-unknown-linux-gnu
run: |
./bin/nix-build-and-cache .#oci-image
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu
path: oci-image-amd64.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-x86_64-unknown-linux-gnu-jemalloc
run: |
./bin/nix-build-and-cache .#oci-image-jemalloc
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-jemalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu-jemalloc
path: oci-image-amd64.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-x86_64-unknown-linux-gnu-hmalloc
run: |
./bin/nix-build-and-cache .#oci-image-hmalloc
cp -v -f result oci-image-amd64.tar.gz
- name: Upload artifact oci-image-x86_64-unknown-linux-gnu-hmalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-x86_64-unknown-linux-gnu-hmalloc
path: oci-image-amd64.tar.gz
if-no-files-found: error if-no-files-found: error
# don't compress again # don't compress again
compression-level: 0 compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl-jemalloc
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-jemalloc
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-jemalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl-jemalloc
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
- name: Build oci-image-aarch64-unknown-linux-musl-hmalloc
run: |
./bin/nix-build-and-cache .#oci-image-aarch64-unknown-linux-musl-hmalloc
cp -v -f result oci-image-arm64v8.tar.gz
- name: Upload artifact oci-image-aarch64-unknown-linux-musl-hmalloc
uses: actions/upload-artifact@v4
with:
name: oci-image-aarch64-unknown-linux-musl-hmalloc
path: oci-image-arm64v8.tar.gz
if-no-files-found: error
# don't compress again
compression-level: 0
publish:
needs: build-and-test
runs-on: ubuntu-latest
steps:
- name: Extract metadata for Dockerhub - name: Extract metadata for Dockerhub
env: env:
REGISTRY: registry.hub.docker.com REGISTRY: registry.hub.docker.com
@ -378,7 +260,7 @@ jobs:
docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:$GITHUB_REF_NAME --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME docker manifest push $IMAGE_NAME:$GITHUB_REF_NAME
# Tag "main" as latest (stable branch) # Tag "main" as latest (stable branch)
if [[ -n "$GITHUB_REF_NAME" = "main" ]]; then if [[ "$GITHUB_REF_NAME" = "main" ]]; then
docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8 docker manifest create $IMAGE_NAME:latest --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_AMD64 --amend $IMAGE_NAME:$GITHUB_SHA-$IMAGE_SUFFIX_ARM64V8
docker manifest push $IMAGE_NAME:latest docker manifest push $IMAGE_NAME:latest
fi fi

Cargo.lock generated
View file

@ -111,6 +111,17 @@ dependencies = [
"zstd-safe", "zstd-safe",
] ]
[[package]]
name = "async-recursion"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30c5ef0ede93efbf733c1a727f3b6b5a1060bbedd5600183e66f6e4be4af0ec5"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.58",
]
[[package]] [[package]]
name = "async-trait" name = "async-trait"
version = "0.1.80" version = "0.1.80"
@ -1045,9 +1056,9 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]] [[package]]
name = "hickory-proto" name = "hickory-proto"
version = "0.24.0" version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
checksum = "091a6fbccf4860009355e3efc52ff4acf37a63489aad7435372d44ceeb6fbbcf"
dependencies = [ dependencies = [
"async-recursion",
"async-trait", "async-trait",
"cfg-if", "cfg-if",
"data-encoding", "data-encoding",
@ -1055,7 +1066,7 @@ dependencies = [
"futures-channel", "futures-channel",
"futures-io", "futures-io",
"futures-util", "futures-util",
"idna 0.4.0", "idna",
"ipnet", "ipnet",
"once_cell", "once_cell",
"rand", "rand",
@ -1069,8 +1080,7 @@ dependencies = [
[[package]] [[package]]
name = "hickory-resolver" name = "hickory-resolver"
version = "0.24.0" version = "0.24.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/hickory-dns/hickory-dns?rev=94ac564c3f677e038f7255ddb762e9301d0f2c5d#94ac564c3f677e038f7255ddb762e9301d0f2c5d"
checksum = "35b8f021164e6a984c9030023544c57789c51760065cd510572fedcfb04164e8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"futures-util", "futures-util",
@ -1311,16 +1321,6 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "idna"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d20d6b07bfbc108882d88ed8e37d39636dcc260e15e30c45e6ba089610b917c"
dependencies = [
"unicode-bidi",
"unicode-normalization",
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "0.5.0" version = "0.5.0"
@ -2285,8 +2285,7 @@ checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56"
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.11.27" version = "0.11.27"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://github.com/girlbossceo/reqwest?rev=319335e000fdea2e3d01f44245c8a21864d0c1c3#319335e000fdea2e3d01f44245c8a21864d0c1c3"
checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62"
dependencies = [ dependencies = [
"async-compression", "async-compression",
"base64 0.21.7", "base64 0.21.7",
@ -3763,7 +3762,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633" checksum = "31e6302e3bb753d46e83516cae55ae196fc0c309407cf11ab35cc51a4c2a4633"
dependencies = [ dependencies = [
"form_urlencoded", "form_urlencoded",
"idna 0.5.0", "idna",
"percent-encoding", "percent-encoding",
"serde", "serde",
] ]

View file

@ -29,9 +29,6 @@ base64 = "0.22.0"
# Used when hashing the state # Used when hashing the state
ring = "0.17.8" ring = "0.17.8"
# Used when querying the SRV record of other servers
hickory-resolver = "0.24.0"
# Used to find matching events for appservices # Used to find matching events for appservices
regex = "1.10.4" regex = "1.10.4"
@ -107,9 +104,11 @@ version = "0.14"
features = ["server", "http1", "http2"] features = ["server", "http1", "http2"]
[dependencies.reqwest] [dependencies.reqwest]
version = "0.11.27" #version = "0.11.27"
git = "https://github.com/girlbossceo/reqwest"
rev = "319335e000fdea2e3d01f44245c8a21864d0c1c3"
default-features = false default-features = false
features = ["rustls-tls-native-roots", "socks", "trust-dns"] features = ["rustls-tls-native-roots", "socks", "hickory-dns"]
# all the serde stuff # all the serde stuff
# Used for pdu definition # Used for pdu definition
@ -272,6 +271,10 @@ features = [
"unstable-extensible-events", "unstable-extensible-events",
] ]
[dependencies.hickory-resolver]
git = "https://github.com/hickory-dns/hickory-dns"
rev = "94ac564c3f677e038f7255ddb762e9301d0f2c5d"
[dependencies.rust-rocksdb] [dependencies.rust-rocksdb]
git = "https://github.com/zaidoon1/rust-rocksdb" git = "https://github.com/zaidoon1/rust-rocksdb"
branch = "master" branch = "master"

View file

@ -477,19 +477,19 @@ allow_profile_lookup_federation_requests = true
# Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most # Minimum time-to-live in seconds for entries in the DNS cache. The default may appear high to most
# administrators; this is by design. Only decrease this if you are using an external DNS cache. # administrators; this is by design. Only decrease this if you are using an external DNS cache.
#dns_min_ttl = 60 * 90 #dns_min_ttl = 10800
# Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for # Minimum time-to-live in seconds for NXDOMAIN entries in the DNS cache. This value is critical for
# the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation # the server to federate efficiently. NXDOMAIN's are assumed to not be returning to the federation
# and aggressively cached rather than constantly rechecked. # and aggressively cached rather than constantly rechecked.
#dns_min_ttl_nxdomain = 60 * 60 * 24 * 3 #dns_min_ttl_nxdomain = 86400
# The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can # The number of seconds to wait for a reply to a DNS query. Please note that recursive queries can
# take up to several seconds for some domains, so this value should not be too low. # take up to several seconds for some domains, so this value should not be too low.
#dns_timeout = 5 #dns_timeout = 10
# Number of retries after a timeout. # Number of retries after a timeout.
#dns_attempts = 5 #dns_attempts = 10
# Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver. # Fallback to TCP on DNS errors. Set this to false if unsupported by nameserver.
#dns_tcp_fallback = true #dns_tcp_fallback = true
@ -498,7 +498,7 @@ allow_profile_lookup_federation_requests = true
# This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response. # This can avoid useless DNS queries if the first nameserver responds with NXDOMAIN or an empty NOERROR response.
# #
# The default is to query one nameserver and stop (false). # The default is to query one nameserver and stop (false).
#query_all_nameservers = false #query_all_nameservers = true
### Request Timeouts, Connection Timeouts, and Connection Pooling ### Request Timeouts, Connection Timeouts, and Connection Pooling

View file

@ -1554,7 +1554,8 @@ pub async fn create_invite_route(body: Ruma<create_invite::v2::Request>) -> Resu
.contains(&server.to_owned()) .contains(&server.to_owned())
{ {
warn!( warn!(
"Received federated/remote invite from banned server {sender_servername} for room ID {}. Rejecting.", "Received federated/remote invite from server {sender_servername} for room ID {} which has a banned \
server name. Rejecting.",
body.room_id body.room_id
); );
return Err(Error::BadRequest( return Err(Error::BadRequest(

View file

@ -100,7 +100,7 @@ pub struct Config {
pub dns_timeout: u64, pub dns_timeout: u64,
#[serde(default = "true_fn")] #[serde(default = "true_fn")]
pub dns_tcp_fallback: bool, pub dns_tcp_fallback: bool,
#[serde(default)] #[serde(default = "true_fn")]
pub query_all_nameservers: bool, pub query_all_nameservers: bool,
#[serde(default = "default_max_request_size")] #[serde(default = "default_max_request_size")]
pub max_request_size: u32, pub max_request_size: u32,
@ -851,13 +851,13 @@ fn default_cleanup_second_interval() -> u32 {
fn default_dns_cache_entries() -> u32 { 12288 } fn default_dns_cache_entries() -> u32 { 12288 }
fn default_dns_min_ttl() -> u64 { 60 * 90 } fn default_dns_min_ttl() -> u64 { 60 * 180 }
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 * 3 } fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 }
fn default_dns_attempts() -> u16 { 5 } fn default_dns_attempts() -> u16 { 10 }
fn default_dns_timeout() -> u64 { 5 } fn default_dns_timeout() -> u64 { 10 }
fn default_max_request_size() -> u32 { fn default_max_request_size() -> u32 {
20 * 1024 * 1024 // Default to 20 MB 20 * 1024 * 1024 // Default to 20 MB
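The new defaults above are what the commented values in the example config now spell out literally (60 * 180 = 10800 s = 3 hours, 60 * 60 * 24 = 86400 s = 1 day). A minimal sketch of how serde's default = "..." attributes apply such defaults when fields are omitted — using a simplified stand-in struct rather than conduwuit's real Config, and assuming serde (derive) plus serde_json purely to keep the demo self-contained:

```rust
use serde::Deserialize;

// Simplified stand-in for the DNS-related fields; the real Config has many more.
#[derive(Debug, Deserialize)]
struct DnsConfig {
    #[serde(default = "default_dns_min_ttl")]
    dns_min_ttl: u64,
    #[serde(default = "default_dns_min_ttl_nxdomain")]
    dns_min_ttl_nxdomain: u64,
    #[serde(default = "default_dns_attempts")]
    dns_attempts: u16,
    #[serde(default = "default_dns_timeout")]
    dns_timeout: u64,
    // query_all_nameservers now defaults to true via a shared `true_fn` helper.
    #[serde(default = "true_fn")]
    query_all_nameservers: bool,
}

fn true_fn() -> bool { true }
fn default_dns_min_ttl() -> u64 { 60 * 180 }               // 10800 s = 3 hours
fn default_dns_min_ttl_nxdomain() -> u64 { 60 * 60 * 24 }  // 86400 s = 1 day
fn default_dns_attempts() -> u16 { 10 }
fn default_dns_timeout() -> u64 { 10 }

fn main() {
    // An empty document: every field falls back to its default function.
    let cfg: DnsConfig = serde_json::from_str("{}").unwrap();
    assert_eq!(cfg.dns_min_ttl, 10_800);
    assert_eq!(cfg.dns_min_ttl_nxdomain, 86_400);
    assert_eq!(cfg.dns_attempts, 10);
    assert_eq!(cfg.dns_timeout, 10);
    assert!(cfg.query_all_nameservers);
    println!("{cfg:?}");
}
```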

View file

@ -4,7 +4,7 @@ use crate::{
database::KeyValueDatabase, database::KeyValueDatabase,
service::{ service::{
self, self,
sending::{OutgoingKind, SendingEventType}, sending::{OutgoingDestination, SendingEventType},
}, },
services, utils, Error, Result, services, utils, Error, Result,
}; };
@ -12,7 +12,7 @@ use crate::{
impl service::sending::Data for KeyValueDatabase { impl service::sending::Data for KeyValueDatabase {
fn active_requests<'a>( fn active_requests<'a>(
&'a self, &'a self,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a> {
Box::new( Box::new(
self.servercurrentevent_data self.servercurrentevent_data
.iter() .iter()
@ -21,7 +21,7 @@ impl service::sending::Data for KeyValueDatabase {
} }
fn active_requests_for<'a>( fn active_requests_for<'a>(
&'a self, outgoing_kind: &OutgoingKind, &'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
Box::new( Box::new(
@ -33,7 +33,7 @@ impl service::sending::Data for KeyValueDatabase {
fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) } fn delete_active_request(&self, key: Vec<u8>) -> Result<()> { self.servercurrentevent_data.remove(&key) }
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix) {
self.servercurrentevent_data.remove(&key)?; self.servercurrentevent_data.remove(&key)?;
@ -42,7 +42,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()> { fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) { for (key, _) in self.servercurrentevent_data.scan_prefix(prefix.clone()) {
self.servercurrentevent_data.remove(&key).unwrap(); self.servercurrentevent_data.remove(&key).unwrap();
@ -55,7 +55,7 @@ impl service::sending::Data for KeyValueDatabase {
Ok(()) Ok(())
} }
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>> { fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>> {
let mut batch = Vec::new(); let mut batch = Vec::new();
let mut keys = Vec::new(); let mut keys = Vec::new();
for (outgoing_kind, event) in requests { for (outgoing_kind, event) in requests {
@ -79,7 +79,7 @@ impl service::sending::Data for KeyValueDatabase {
} }
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, outgoing_kind: &OutgoingKind, &'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> { ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a> {
let prefix = outgoing_kind.get_prefix(); let prefix = outgoing_kind.get_prefix();
return Box::new( return Box::new(
@ -122,7 +122,7 @@ impl service::sending::Data for KeyValueDatabase {
} }
#[tracing::instrument(skip(key))] #[tracing::instrument(skip(key))]
fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind, SendingEventType)> { fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingDestination, SendingEventType)> {
// Appservices start with a plus // Appservices start with a plus
Ok::<_, Error>(if key.starts_with(b"+") { Ok::<_, Error>(if key.starts_with(b"+") {
let mut parts = key[1..].splitn(2, |&b| b == 0xFF); let mut parts = key[1..].splitn(2, |&b| b == 0xFF);
@ -136,7 +136,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?; .map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
( (
OutgoingKind::Appservice(server), OutgoingDestination::Appservice(server),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -163,7 +163,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?; .ok_or_else(|| Error::bad_database("Invalid bytes in servercurrentpdus."))?;
( (
OutgoingKind::Push(user_id, pushkey_string), OutgoingDestination::Push(user_id, pushkey_string),
if value.is_empty() { if value.is_empty() {
SendingEventType::Pdu(event.to_vec()) SendingEventType::Pdu(event.to_vec())
} else { } else {
@ -183,7 +183,7 @@ fn parse_servercurrentevent(key: &[u8], value: Vec<u8>) -> Result<(OutgoingKind,
.map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?; .map_err(|_| Error::bad_database("Invalid server bytes in server_currenttransaction"))?;
( (
OutgoingKind::Normal( OutgoingDestination::Normal(
ServerName::parse(server) ServerName::parse(server)
.map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?, .map_err(|_| Error::bad_database("Invalid server string in server_currenttransaction"))?,
), ),
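The parsing above depends on a simple key layout: appservice keys start with +, push keys with $ (user and pushkey separated by 0xFF), plain server names carry no sigil, and 0xFF separates the destination prefix from the queued event. A small self-contained sketch of that layout and of prefix matching as scan_prefix sees it; the exact byte layout is inferred from the code shown here, so treat it as illustrative:

```rust
// Illustrative key construction for the three destination kinds,
// mirroring OutgoingDestination::get_prefix and parse_servercurrentevent.
fn appservice_prefix(id: &str) -> Vec<u8> {
    let mut p = b"+".to_vec();
    p.extend_from_slice(id.as_bytes());
    p.push(0xFF); // separator before the queued event bytes
    p
}

fn push_prefix(user_id: &str, pushkey: &str) -> Vec<u8> {
    let mut p = b"$".to_vec();
    p.extend_from_slice(user_id.as_bytes());
    p.push(0xFF);
    p.extend_from_slice(pushkey.as_bytes());
    p.push(0xFF);
    p
}

fn server_prefix(server: &str) -> Vec<u8> {
    let mut p = server.as_bytes().to_vec();
    p.push(0xFF);
    p
}

fn main() {
    // A queued key is the destination prefix followed by the event bytes (a PDU id here).
    let mut key = server_prefix("matrix.example.org");
    key.extend_from_slice(b"$event_id");

    // scan_prefix-style matching: keys for one destination share its prefix.
    assert!(key.starts_with(&server_prefix("matrix.example.org")));
    assert!(!key.starts_with(&appservice_prefix("bridge")));
    assert!(!key.starts_with(&push_prefix("@alice:example.org", "pushkey1")));
}
```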

View file

@ -1,20 +1,20 @@
use ruma::ServerName; use ruma::ServerName;
use super::{OutgoingKind, SendingEventType}; use super::{OutgoingDestination, SendingEventType};
use crate::Result; use crate::Result;
type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingKind, SendingEventType)>> + 'a>; type OutgoingSendingIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, OutgoingDestination, SendingEventType)>> + 'a>;
type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>; type SendingEventTypeIter<'a> = Box<dyn Iterator<Item = Result<(Vec<u8>, SendingEventType)>> + 'a>;
pub trait Data: Send + Sync { pub trait Data: Send + Sync {
fn active_requests(&self) -> OutgoingSendingIter<'_>; fn active_requests(&self) -> OutgoingSendingIter<'_>;
fn active_requests_for(&self, outgoing_kind: &OutgoingKind) -> SendingEventTypeIter<'_>; fn active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> SendingEventTypeIter<'_>;
fn delete_active_request(&self, key: Vec<u8>) -> Result<()>; fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_active_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn delete_all_requests_for(&self, outgoing_kind: &OutgoingKind) -> Result<()>; fn delete_all_requests_for(&self, outgoing_kind: &OutgoingDestination) -> Result<()>;
fn queue_requests(&self, requests: &[(&OutgoingKind, SendingEventType)]) -> Result<Vec<Vec<u8>>>; fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
fn queued_requests<'a>( fn queued_requests<'a>(
&'a self, outgoing_kind: &OutgoingKind, &'a self, outgoing_kind: &OutgoingDestination,
) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>; ) -> Box<dyn Iterator<Item = Result<(SendingEventType, Vec<u8>)>> + 'a>;
fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>; fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>; fn set_latest_educount(&self, server_name: &ServerName, educount: u64) -> Result<()>;
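Read as a whole, the trait describes one request lifecycle: queue new work durably, mark it active while a transaction is in flight, and delete it once the remote acknowledges. A rough sketch of that flow against the trait, with the crate-internal types replaced by minimal stand-ins so the example stands on its own (the real OutgoingDestination is defined in the sending module diff below):

```rust
// Stand-in types so the sketch compiles on its own; the real definitions
// live in service::sending and the key-value Data implementation above.
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

enum OutgoingDestination { Normal(String) }

#[derive(Clone)]
enum SendingEventType { Pdu(Vec<u8>) }

// Only the methods exercised here, mirroring the trait in the diff.
trait Data {
    fn queue_requests(&self, requests: &[(&OutgoingDestination, SendingEventType)]) -> Result<Vec<Vec<u8>>>;
    fn mark_as_active(&self, events: &[(SendingEventType, Vec<u8>)]) -> Result<()>;
    fn delete_active_request(&self, key: Vec<u8>) -> Result<()>;
}

// Rough lifecycle the sender drives for one destination.
fn send_one(db: &dyn Data, dest: OutgoingDestination, pdu_id: Vec<u8>) -> Result<()> {
    let event = SendingEventType::Pdu(pdu_id);
    // 1. Persist the request so it survives a restart (startup netburst picks it up).
    let keys = db.queue_requests(&[(&dest, event.clone())])?;
    // 2. Mark it active once a transaction containing it is in flight.
    db.mark_as_active(&[(event, keys[0].clone())])?;
    // 3. Drop it after the remote acknowledges the transaction.
    db.delete_active_request(keys[0].clone())
}

fn main() {}
```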

View file

@ -1,6 +1,6 @@
use std::{ use std::{
cmp, cmp,
collections::{BTreeMap, HashMap, HashSet}, collections::{hash_map::Entry, BTreeMap, HashMap, HashSet},
fmt::Debug, fmt::Debug,
sync::Arc, sync::Arc,
time::{Duration, Instant}, time::{Duration, Instant},
@ -25,7 +25,7 @@ use ruma::{
events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType}, events::{push_rules::PushRulesEvent, receipt::ReceiptType, AnySyncEphemeralRoomEvent, GlobalAccountDataEventType},
push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId, push, uint, MilliSecondsSinceUnixEpoch, OwnedServerName, OwnedUserId, RoomId, ServerName, UInt, UserId,
}; };
use tokio::sync::{mpsc, Mutex, Semaphore}; use tokio::sync::{oneshot, Mutex, Semaphore};
use tracing::{error, warn}; use tracing::{error, warn};
use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result}; use crate::{service::presence::Presence, services, utils::calculate_hash, Config, Error, PduEvent, Result};
@ -42,15 +42,15 @@ pub struct Service {
/// The state for a given state hash. /// The state for a given state hash.
pub(super) maximum_requests: Arc<Semaphore>, pub(super) maximum_requests: Arc<Semaphore>,
pub sender: mpsc::UnboundedSender<(OutgoingKind, SendingEventType, Vec<u8>)>, pub sender: loole::Sender<(OutgoingDestination, SendingEventType, Vec<u8>)>,
receiver: Mutex<mpsc::UnboundedReceiver<(OutgoingKind, SendingEventType, Vec<u8>)>>, receiver: Mutex<loole::Receiver<(OutgoingDestination, SendingEventType, Vec<u8>)>>,
startup_netburst: bool, startup_netburst: bool,
startup_netburst_keep: i64, startup_netburst_keep: i64,
timeout: u64, timeout: u64,
} }
#[derive(Clone, Debug, PartialEq, Eq, Hash)] #[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum OutgoingKind { pub enum OutgoingDestination {
Appservice(String), Appservice(String),
Push(OwnedUserId, String), // user and pushkey Push(OwnedUserId, String), // user and pushkey
Normal(OwnedServerName), Normal(OwnedServerName),
@ -65,14 +65,31 @@ pub enum SendingEventType {
} }
enum TransactionStatus { enum TransactionStatus {
/// Currently running (for the first time)
Running, Running,
Failed(u32, Instant), // number of times failed, time of last failure /// Failed, backing off for a retry
Retrying(u32), // number of times failed Failed {
failures: u32,
waker: Option<oneshot::Sender<()>>,
},
/// Currently retrying
Retrying {
/// number of times failed
failures: u32,
},
}
/// A control-flow enum to dictate what the handler should do after (trying to)
/// prepare a transaction
enum TransactionPrepOutcome {
Send(Vec<SendingEventType>),
Wake(OutgoingDestination),
Nothing,
} }
impl Service { impl Service {
pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> { pub fn build(db: &'static dyn Data, config: &Config) -> Arc<Self> {
let (sender, receiver) = mpsc::unbounded_channel(); let (sender, receiver) = loole::unbounded();
Arc::new(Self { Arc::new(Self {
db, db,
sender, sender,
@ -86,7 +103,7 @@ impl Service {
#[tracing::instrument(skip(self, pdu_id, user, pushkey))] #[tracing::instrument(skip(self, pdu_id, user, pushkey))]
pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> { pub fn send_pdu_push(&self, pdu_id: &[u8], user: &UserId, pushkey: String) -> Result<()> {
let outgoing_kind = OutgoingKind::Push(user.to_owned(), pushkey); let outgoing_kind = OutgoingDestination::Push(user.to_owned(), pushkey);
let event = SendingEventType::Pdu(pdu_id.to_owned()); let event = SendingEventType::Pdu(pdu_id.to_owned());
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -99,7 +116,7 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> { pub fn send_pdu_appservice(&self, appservice_id: String, pdu_id: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Appservice(appservice_id); let outgoing_kind = OutgoingDestination::Appservice(appservice_id);
let event = SendingEventType::Pdu(pdu_id); let event = SendingEventType::Pdu(pdu_id);
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -126,7 +143,7 @@ impl Service {
pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> { pub fn send_pdu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, pdu_id: &[u8]) -> Result<()> {
let requests = servers let requests = servers
.into_iter() .into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Pdu(pdu_id.to_owned()))) .map(|server| (OutgoingDestination::Normal(server), SendingEventType::Pdu(pdu_id.to_owned())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests( let keys = self.db.queue_requests(
@ -146,7 +163,7 @@ impl Service {
#[tracing::instrument(skip(self, server, serialized))] #[tracing::instrument(skip(self, server, serialized))]
pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> { pub fn send_edu_server(&self, server: &ServerName, serialized: Vec<u8>) -> Result<()> {
let outgoing_kind = OutgoingKind::Normal(server.to_owned()); let outgoing_kind = OutgoingDestination::Normal(server.to_owned());
let event = SendingEventType::Edu(serialized); let event = SendingEventType::Edu(serialized);
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?; let keys = self.db.queue_requests(&[(&outgoing_kind, event.clone())])?;
@ -173,7 +190,7 @@ impl Service {
pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> { pub fn send_edu_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I, serialized: Vec<u8>) -> Result<()> {
let requests = servers let requests = servers
.into_iter() .into_iter()
.map(|server| (OutgoingKind::Normal(server), SendingEventType::Edu(serialized.clone()))) .map(|server| (OutgoingDestination::Normal(server), SendingEventType::Edu(serialized.clone())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let _cork = services().globals.db.cork()?; let _cork = services().globals.db.cork()?;
let keys = self.db.queue_requests( let keys = self.db.queue_requests(
@ -205,7 +222,7 @@ impl Service {
#[tracing::instrument(skip(self, servers))] #[tracing::instrument(skip(self, servers))]
pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> { pub fn flush_servers<I: Iterator<Item = OwnedServerName>>(&self, servers: I) -> Result<()> {
let requests = servers.into_iter().map(OutgoingKind::Normal); let requests = servers.into_iter().map(OutgoingDestination::Normal);
for outgoing_kind in requests { for outgoing_kind in requests {
self.sender self.sender
@ -221,7 +238,7 @@ impl Service {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn cleanup_events(&self, appservice_id: String) -> Result<()> { pub fn cleanup_events(&self, appservice_id: String) -> Result<()> {
self.db self.db
.delete_all_requests_for(&OutgoingKind::Appservice(appservice_id))?; .delete_all_requests_for(&OutgoingDestination::Appservice(appservice_id))?;
Ok(()) Ok(())
} }
@ -274,14 +291,17 @@ impl Service {
#[tracing::instrument(skip(self), name = "sender")] #[tracing::instrument(skip(self), name = "sender")]
async fn handler(&self) -> Result<()> { async fn handler(&self) -> Result<()> {
let mut receiver = self.receiver.lock().await; let new_transactions = self.receiver.lock().await;
let (waking_sender, waking_receiver) = loole::unbounded();
let mut futures = FuturesUnordered::new(); let mut outgoing = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingKind, TransactionStatus>::new(); let mut retrying = FuturesUnordered::new();
let mut current_transaction_status = HashMap::<OutgoingDestination, TransactionStatus>::new();
// Retry requests we could not finish yet // Retry requests we could not finish yet
if self.startup_netburst { if self.startup_netburst {
let mut initial_transactions = HashMap::<OutgoingKind, Vec<SendingEventType>>::new(); let mut initial_transactions = HashMap::<OutgoingDestination, Vec<SendingEventType>>::new();
for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) { for (key, outgoing_kind, event) in self.db.active_requests().filter_map(Result::ok) {
let entry = initial_transactions let entry = initial_transactions
.entry(outgoing_kind.clone()) .entry(outgoing_kind.clone())
@ -300,13 +320,14 @@ impl Service {
for (outgoing_kind, events) in initial_transactions { for (outgoing_kind, events) in initial_transactions {
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running); current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
futures.push(handle_events(outgoing_kind.clone(), events)); outgoing.push(handle_events(outgoing_kind.clone(), events));
} }
} }
loop { loop {
tokio::select! { tokio::select! {
Some(response) = futures.next() => { Some(response) = outgoing.next() => {
// Outgoing transaction succeeded
match response { match response {
Ok(outgoing_kind) => { Ok(outgoing_kind) => {
let _cork = services().globals.db.cork(); let _cork = services().globals.db.cork();
@ -322,51 +343,155 @@ impl Service {
if !new_events.is_empty() { if !new_events.is_empty() {
// Insert pdus we found // Insert pdus we found
self.db.mark_as_active(&new_events)?; self.db.mark_as_active(&new_events)?;
futures.push(handle_events(
outgoing_kind.clone(), // Clear retries
current_transaction_status.insert(outgoing_kind.clone(), TransactionStatus::Running);
outgoing.push(handle_events(
outgoing_kind,
new_events.into_iter().map(|(event, _)| event).collect(), new_events.into_iter().map(|(event, _)| event).collect(),
)); ));
} else { } else {
current_transaction_status.remove(&outgoing_kind); current_transaction_status.remove(&outgoing_kind);
} }
} }
Err((outgoing_kind, _)) => { // Outgoing transaction failed
current_transaction_status.entry(outgoing_kind).and_modify(|e| *e = match e { Err((destination, err)) => {
TransactionStatus::Running => TransactionStatus::Failed(1, Instant::now()), // Set status to Failed, create timer
TransactionStatus::Retrying(n) => TransactionStatus::Failed(*n+1, Instant::now()), let timer = Self::mark_failed_and_backoff(&mut current_transaction_status, destination.clone());
TransactionStatus::Failed(_, _) => {
error!("Request that was not even running failed?!"); // Add timer to loop
return retrying.push(timer);
},
}); warn!("Outgoing request to {destination} failed: {err}");
} }
}; };
}, },
Some((outgoing_kind, event, key)) = receiver.recv() => {
if let Ok(Some(events)) = self.select_events( // Transaction retry timers firing
&outgoing_kind, Some(dest) = retrying.next() => {
// Transition Failed => Retrying, return pending old transaction events
match self.select_events(
&dest,
vec![], // will be ignored because fresh == false
&mut current_transaction_status,
false,
) {
Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
}
Ok(_) => {
// Unreachable because fresh == false
unreachable!("select_events on a stale transaction {} did not return ::Send", dest)
}
Err(err) => {
error!("Ignoring error in (stale) outgoing request ({}) handler: {}", dest, err);
// transaction dropped, so drop destination as well.
current_transaction_status.remove(&dest);
}
}
},
// Explicit wakeups, makes a backoff timer return immediately
Ok(outgoing) = waking_receiver.recv_async() => {
if let Some(TransactionStatus::Failed { waker, .. }) = current_transaction_status.get_mut(&outgoing) {
if let Some(waker) = waker.take() {
_ = waker.send(());
}
}
},
// New transactions to be sent out (from server/user activity)
event = new_transactions.recv_async() => {
if let Ok((dest, event, key)) = event {
match self.select_events(
&dest,
vec![(event, key)], vec![(event, key)],
&mut current_transaction_status, &mut current_transaction_status,
) { true) {
futures.push(handle_events(outgoing_kind, events)); Ok(TransactionPrepOutcome::Send(events)) => {
outgoing.push(handle_events(dest, events));
},
Ok(TransactionPrepOutcome::Wake(dest)) => {
waking_sender.send(dest).expect("nothing closes this channel but ourselves");
},
Ok(TransactionPrepOutcome::Nothing) => {},
Err(err) => {
error!("Ignoring error in (fresh) outgoing request ({}) handler: {}", dest, err);
}
}
} }
} }
} }
} }
} }
/// Generates timer/oneshot, alters status to reflect Failed
///
/// Returns timer/oneshot future to wake up loop for next retry
fn mark_failed_and_backoff(
status: &mut HashMap<OutgoingDestination, TransactionStatus>, dest: OutgoingDestination,
) -> impl std::future::Future<Output = OutgoingDestination> {
let now = Instant::now();
let entry = status
.get_mut(&dest)
.expect("guaranteed to be set before this function");
let failures = match entry {
// Running -> Failed
TransactionStatus::Running => 1,
// Retrying -> Failed
TransactionStatus::Retrying {
failures,
} => *failures + 1,
// The transition of Failed -> Retrying is handled by handle_events
TransactionStatus::Failed {
..
} => {
unreachable!(
"TransactionStatus in inconsistent state: Expected either Running or Retrying, got Failed, \
bailing..."
)
},
};
const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
// Exponential backoff, clamp upper value to one day
let next_wakeup = now + (Duration::from_secs(30) * failures * failures).min(ONE_DAY);
let (fut, waker) = dest.wrap_in_interruptible_sleep(next_wakeup);
*entry = TransactionStatus::Failed {
failures,
waker: Some(waker),
};
fut
}
/// This prepares a transaction, checks the transaction state, and selects
/// appropriate events.
#[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, new_events, current_transaction_status))]
fn select_events( fn select_events(
&self, &self,
outgoing_kind: &OutgoingKind, outgoing_kind: &OutgoingDestination,
new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key new_events: Vec<(SendingEventType, Vec<u8>)>, // Events we want to send: event and full key
current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>, current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>,
) -> Result<Option<Vec<SendingEventType>>> { fresh: bool, // Wether or not this transaction came from server activity.
let (allow, retry) = self.select_events_current(outgoing_kind.clone(), current_transaction_status)?; ) -> Result<TransactionPrepOutcome> {
let (allow, retry, wake_up) =
self.select_events_current(outgoing_kind.clone(), current_transaction_status, fresh)?;
// Nothing can be done for this remote, bail out. // Nothing can be done for this remote, bail out.
if !allow { if wake_up {
return Ok(None); return Ok(TransactionPrepOutcome::Wake(outgoing_kind.clone()));
} else if !allow {
return Ok(TransactionPrepOutcome::Nothing);
} }
let _cork = services().globals.db.cork(); let _cork = services().globals.db.cork();
@ -374,12 +499,14 @@ impl Service {
// Must retry any previous transaction for this remote. // Must retry any previous transaction for this remote.
if retry { if retry {
self.db // We retry the previous transaction
for (_, e) in self
.db
.active_requests_for(outgoing_kind) .active_requests_for(outgoing_kind)
.filter_map(Result::ok) .filter_map(Result::ok)
.for_each(|(_, e)| events.push(e)); {
events.push(e);
return Ok(Some(events)); }
} }
// Compose the next transaction // Compose the next transaction
@ -392,43 +519,79 @@ impl Service {
} }
// Add EDU's into the transaction // Add EDU's into the transaction
if let OutgoingKind::Normal(server_name) = outgoing_kind { if let OutgoingDestination::Normal(server_name) = outgoing_kind {
if let Ok((select_edus, last_count)) = self.select_edus(server_name) { if let Ok((select_edus, last_count)) = self.select_edus(server_name) {
events.extend(select_edus.into_iter().map(SendingEventType::Edu)); events.extend(select_edus.into_iter().map(SendingEventType::Edu));
self.db.set_latest_educount(server_name, last_count)?; self.db.set_latest_educount(server_name, last_count)?;
} }
} }
Ok(Some(events)) Ok(TransactionPrepOutcome::Send(events))
} }
#[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))] #[tracing::instrument(skip(self, outgoing_kind, current_transaction_status))]
fn select_events_current( fn select_events_current(
&self, outgoing_kind: OutgoingKind, current_transaction_status: &mut HashMap<OutgoingKind, TransactionStatus>, &self, outgoing_kind: OutgoingDestination,
) -> Result<(bool, bool)> { current_transaction_status: &mut HashMap<OutgoingDestination, TransactionStatus>, fresh: bool,
let (mut allow, mut retry) = (true, false); ) -> Result<(bool, bool, bool)> {
current_transaction_status let (mut allow, mut retry, mut wake_up) = (true, false, false);
.entry(outgoing_kind)
let entry = current_transaction_status.entry(outgoing_kind);
if fresh {
// If its fresh, we initialise the status if we need to.
//
// We do nothing if it is already running or retrying.
//
// We return with a wake if it is in the Failed state.
entry
.and_modify(|e| match e { .and_modify(|e| match e {
TransactionStatus::Failed(tries, time) => { TransactionStatus::Running
// Fail if a request has failed recently (exponential backoff) | TransactionStatus::Retrying {
const MAX_DURATION: Duration = Duration::from_secs(60 * 60 * 24); ..
let mut min_elapsed_duration = Duration::from_secs(self.timeout) * (*tries) * (*tries); } => {
min_elapsed_duration = cmp::min(min_elapsed_duration, MAX_DURATION);
if time.elapsed() < min_elapsed_duration {
allow = false;
} else {
retry = true;
*e = TransactionStatus::Retrying(*tries);
}
},
TransactionStatus::Running | TransactionStatus::Retrying(_) => {
allow = false; // already running allow = false; // already running
}, },
TransactionStatus::Failed {
..
} => {
// currently sleeping
wake_up = true;
},
}) })
.or_insert(TransactionStatus::Running); .or_insert(TransactionStatus::Running);
} else {
// If it's not fresh, we expect an entry.
//
// We also expect us to be the only one who are touching this destination right
// now, and its a stale transaction, so it must be in the Failed state
match entry {
Entry::Occupied(mut e) => {
let e = e.get_mut();
match e {
TransactionStatus::Failed {
failures,
..
} => {
*e = TransactionStatus::Retrying {
failures: *failures,
};
retry = true;
},
Ok((allow, retry)) _ => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got \
Running or Retrying"
),
}
},
Entry::Vacant(_) => unreachable!(
"Encountered bad state when preparing stale transaction: expected Failed state, got vacant state"
),
}
}
Ok((allow, retry, wake_up))
} }
#[tracing::instrument(skip(self, server_name))] #[tracing::instrument(skip(self, server_name))]
@ -594,19 +757,21 @@ pub fn select_edus_receipts(
} }
async fn handle_events( async fn handle_events(
kind: OutgoingKind, events: Vec<SendingEventType>, kind: OutgoingDestination, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
match kind { match kind {
OutgoingKind::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await, OutgoingDestination::Appservice(ref id) => handle_events_kind_appservice(&kind, id, events).await,
OutgoingKind::Push(ref userid, ref pushkey) => handle_events_kind_push(&kind, userid, pushkey, events).await, OutgoingDestination::Push(ref userid, ref pushkey) => {
OutgoingKind::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await, handle_events_kind_push(&kind, userid, pushkey, events).await
},
OutgoingDestination::Normal(ref server) => handle_events_kind_normal(&kind, server, events).await,
} }
} }
#[tracing::instrument(skip(kind, events))] #[tracing::instrument(skip(kind, events))]
async fn handle_events_kind_appservice( async fn handle_events_kind_appservice(
kind: &OutgoingKind, id: &String, events: Vec<SendingEventType>, kind: &OutgoingDestination, id: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
for event in &events { for event in &events {
@ -674,8 +839,8 @@ async fn handle_events_kind_appservice(
#[tracing::instrument(skip(kind, events))] #[tracing::instrument(skip(kind, events))]
async fn handle_events_kind_push( async fn handle_events_kind_push(
kind: &OutgoingKind, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>, kind: &OutgoingDestination, userid: &OwnedUserId, pushkey: &String, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut pdus = Vec::new(); let mut pdus = Vec::new();
for event in &events { for event in &events {
@ -715,7 +880,7 @@ async fn handle_events_kind_push(
let Some(pusher) = services() let Some(pusher) = services()
.pusher .pusher
.get_pusher(userid, pushkey) .get_pusher(userid, pushkey)
.map_err(|e| (kind.clone(), e))? .map_err(|e| (OutgoingDestination::Push(userid.clone(), pushkey.clone()), e))?
else { else {
continue; continue;
}; };
@ -752,8 +917,8 @@ async fn handle_events_kind_push(
#[tracing::instrument(skip(kind, events), name = "")] #[tracing::instrument(skip(kind, events), name = "")]
async fn handle_events_kind_normal( async fn handle_events_kind_normal(
kind: &OutgoingKind, dest: &OwnedServerName, events: Vec<SendingEventType>, kind: &OutgoingDestination, dest: &OwnedServerName, events: Vec<SendingEventType>,
) -> Result<OutgoingKind, (OutgoingKind, Error)> { ) -> Result<OutgoingDestination, (OutgoingDestination, Error)> {
let mut edu_jsons = Vec::new(); let mut edu_jsons = Vec::new();
let mut pdu_jsons = Vec::new(); let mut pdu_jsons = Vec::new();
@ -826,23 +991,23 @@ async fn handle_events_kind_normal(
response response
} }
impl OutgoingKind { impl OutgoingDestination {
#[tracing::instrument(skip(self))] #[tracing::instrument(skip(self))]
pub fn get_prefix(&self) -> Vec<u8> { pub fn get_prefix(&self) -> Vec<u8> {
let mut prefix = match self { let mut prefix = match self {
OutgoingKind::Appservice(server) => { OutgoingDestination::Appservice(server) => {
let mut p = b"+".to_vec(); let mut p = b"+".to_vec();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
}, },
OutgoingKind::Push(user, pushkey) => { OutgoingDestination::Push(user, pushkey) => {
let mut p = b"$".to_vec(); let mut p = b"$".to_vec();
p.extend_from_slice(user.as_bytes()); p.extend_from_slice(user.as_bytes());
p.push(0xFF); p.push(0xFF);
p.extend_from_slice(pushkey.as_bytes()); p.extend_from_slice(pushkey.as_bytes());
p p
}, },
OutgoingKind::Normal(server) => { OutgoingDestination::Normal(server) => {
let mut p = Vec::new(); let mut p = Vec::new();
p.extend_from_slice(server.as_bytes()); p.extend_from_slice(server.as_bytes());
p p
@ -852,4 +1017,40 @@ impl OutgoingKind {
prefix prefix
} }
/// This wraps the OutgoingDestination key in an interruptible sleep future.
///
/// The first return value is the future, the second is the oneshot that
/// interrupts that future, and causes it to return instantly.
fn wrap_in_interruptible_sleep(
self, at: Instant,
) -> (impl std::future::Future<Output = Self>, oneshot::Sender<()>) {
let (tx, rx) = oneshot::channel();
let at = tokio::time::Instant::from_std(at);
(
async move {
_ = tokio::time::timeout_at(at, rx).await;
self
},
tx,
)
}
}
impl std::fmt::Display for OutgoingDestination {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
OutgoingDestination::Appservice(appservice_id) => {
write!(f, "Appservice (ID {:?})", appservice_id)
},
OutgoingDestination::Push(user, push_key) => {
write!(f, "User Push Service (for {:?}, with key {:?})", user, push_key)
},
OutgoingDestination::Normal(server) => {
write!(f, "Matrix Server ({:?})", server)
},
}
}
} }
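The mechanism this commit introduces ("make federation retry timer-based") boils down to: a failed destination gets an exponential backoff of 30 s * failures², capped at one day; a timer future for it is pushed onto the retrying set; and a oneshot waker lets fresh activity for that destination cut the sleep short. A self-contained sketch of the same timer pattern, assuming tokio with the rt and macros features; simplified names, not the conduwuit implementation itself:

```rust
use std::time::{Duration, Instant};
use tokio::sync::oneshot;

// Backoff used above: 30 s * failures^2, clamped to one day.
fn backoff(failures: u32) -> Duration {
    const ONE_DAY: Duration = Duration::from_secs(60 * 60 * 24);
    (Duration::from_secs(30) * failures * failures).min(ONE_DAY)
}

// Sleep until `at`, but return early if the waker fires first,
// mirroring OutgoingDestination::wrap_in_interruptible_sleep.
fn interruptible_sleep(at: Instant) -> (impl std::future::Future<Output = ()>, oneshot::Sender<()>) {
    let (tx, rx) = oneshot::channel::<()>();
    let at = tokio::time::Instant::from_std(at);
    (
        async move {
            // Either the deadline passes or the sender fires; both wake the handler loop.
            let _ = tokio::time::timeout_at(at, rx).await;
        },
        tx,
    )
}

#[tokio::main]
async fn main() {
    // failures = 1, 2, 3 => 30 s, 120 s, 270 s
    assert_eq!(backoff(1), Duration::from_secs(30));
    assert_eq!(backoff(2), Duration::from_secs(120));
    assert_eq!(backoff(3), Duration::from_secs(270));

    // Schedule a long retry, then wake it immediately (like new events arriving).
    let (sleep, waker) = interruptible_sleep(Instant::now() + backoff(3));
    let _ = waker.send(());
    sleep.await; // returns right away instead of after 270 s
    println!("woken early");
}
```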