Skip to main content

hashiverse_server_lib/transport/ddos/
ipset_ddos.rs

1//! # Kernel-level DDoS protection backed by Linux `ipset` + `iptables`
2//!
3//! The production implementation of
4//! [`hashiverse_lib::transport::ddos::ddos::DdosProtection`] used by the real server.
5//! Layered on top of the in-RAM scoring logic from `hashiverse-lib`:
6//!
7//! 1. Per-IP `DdosScore` accumulates penalties for bad requests (e.g. invalid PoW,
8//!    malformed packets) with linear time decay from
9//!    [`hashiverse_lib::tools::config::SERVER_DDOS_DECAY_PER_SECOND`].
10//! 2. When a score crosses
11//!    [`hashiverse_lib::tools::config::SERVER_DDOS_SCORE_THRESHOLD`], the IP is
12//!    shelled out to `ipset add` against the set named by
13//!    [`hashiverse_lib::tools::config::SERVER_DDOS_IPSET_SET_NAME`], which an
14//!    operator-configured `iptables` rule then drops at the kernel.
15//! 3. A short (≥10 s) throttle around the `ipset` call prevents hammering the
16//!    subprocess in edge cases.
17//!
//! Per-IP concurrent-connection caps are enforced via a `HashMap<String, usize>`
//! guarded by a `parking_lot::Mutex`, so that a single IP can never hold more than
//! [`hashiverse_lib::tools::config::SERVER_DDOS_MAX_CONNECTIONS_PER_IP`] connections
//! at once. The `NET_ADMIN` capability is required on the container — see the
//! operator docs.
22
23use hashiverse_lib::tools::time_provider::moka_clock::TimeProviderMokaClock;
24use hashiverse_lib::tools::time_provider::time_provider::TimeProvider;
25use hashiverse_lib::transport::ddos::ddos::{DdosProtection, DdosScore};
26use parking_lot::Mutex;
27use log::{info, warn};
28use moka::sync::Cache;
29use std::collections::HashMap;
30use std::process::Command;
31use std::sync::Arc;
32use std::time::Duration;
33
34/// Production DDoS protection backed by Linux `ipset`.
35///
36/// Per-IP scores use linear time decay: each `allow_request` adds 1.0 point,
37/// each `report_bad_request` adds `bad_request_penalty` points, and the score
38/// drains at `decay_per_second` points/second.  This means sustained low-rate
39/// traffic stabilises below the threshold while bursts trigger quickly.
40///
41/// When a score first crosses `score_threshold`, the IP is added to the named
42/// ipset via `ipset add <set_name> <ip> --exist`.  A second 10-second moka cache
43/// (`ipset_throttle`) prevents hammering the ipset command.
44///
45/// `try_acquire_connection` additionally enforces a per-IP connection cap
46/// (`max_connections_per_ip`).
47pub struct IpsetDdosProtection {
48    set_name: String,
49    score_threshold: f64,
50    decay_per_second: f64,
51    bad_request_penalty: f64,
52    max_connections_per_ip: usize,
53    scores: Cache<String, Arc<Mutex<DdosScore>>>,
54    ipset_throttle: Cache<String, ()>,
55    connections: Mutex<HashMap<String, usize>>,
56    time_provider: Arc<dyn TimeProvider>,
57}
58
59impl IpsetDdosProtection {
60    pub fn new(set_name: impl Into<String>, score_threshold: f64, decay_per_second: f64, bad_request_penalty: f64, max_connections_per_ip: usize, time_provider: Arc<dyn TimeProvider>) -> Self {
61        let set_name = set_name.into();
62
63        // Idle expiry: time for a maxed-out score to fully decay, with 2x margin
64        let idle_secs = if decay_per_second > 0.0 {
65            (score_threshold / decay_per_second * 2.0).ceil() as u64
66        } else {
67            3600
68        };
69
70        let result = Command::new("ipset").args(["create", &set_name, "hash:ip", "timeout", &idle_secs.to_string(), "--exist"]).status();
71        match result {
72            Ok(status) if status.success() => info!("DDoS: ipset set '{}' ready", set_name),
73            Ok(status) => warn!("DDoS: ipset create '{}' failed with status {}", set_name, status),
74            Err(e) => warn!("DDoS: failed to run ipset create '{}': {}", set_name, e),
75        }
76
77        // Set up our iptables rules for both docker (FORWARD) and metal (INPUT) servers.
78        for chain in ["INPUT", "FORWARD"] {
79            // -D silently fails if the rule doesn't exist, so delete then re-insert is safe
80            let _ = Command::new("iptables").args(["-D", chain, "-m", "set", "--match-set", &set_name, "src", "-j", "DROP"]).status();
81            let result = Command::new("iptables").args(["-I", chain, "-m", "set", "--match-set", &set_name, "src", "-j", "DROP"]).status();
82            match result {
83                Ok(status) if status.success() => info!("DDoS: iptables {} rule for '{}' installed", chain, set_name),
84                Ok(status) => warn!("DDoS: iptables -I {} for '{}' failed with status {}", chain, set_name, status),
85                Err(e) => warn!("DDoS: failed to run iptables {} for '{}': {}", chain, set_name, e),
86            }
87        }
88
89        // Score idle-eviction and the ipset throttle run on our TimeProvider, not wall time.
90        let scores = Cache::builder()
91            .time_to_idle(Duration::from_secs(idle_secs))
92            .external_clock(Arc::new(TimeProviderMokaClock::new(time_provider.clone())))
93            .build();
94        let ipset_throttle = Cache::builder()
95            .time_to_live(Duration::from_secs(10))
96            .external_clock(Arc::new(TimeProviderMokaClock::new(time_provider.clone())))
97            .build();
98
99        Self {
100            set_name,
101            score_threshold,
102            decay_per_second,
103            bad_request_penalty,
104            max_connections_per_ip,
105            scores,
106            ipset_throttle,
107            connections: Mutex::new(HashMap::new()),
108            time_provider,
109        }
110    }
111
112    fn increment_score(&self, ip: &str, points: f64) -> f64 {
113        let now = self.time_provider.current_time_millis();
114        let entry = self.scores.get_with(ip.to_string(), || Arc::new(Mutex::new(DdosScore::new())));
115        entry.lock().increment(points, self.decay_per_second, now)
116    }
117
118    fn is_score_banned(&self, ip: &str) -> bool {
119        let now = self.time_provider.current_time_millis();
120        self.scores
121            .get(ip)
122            .map(|entry| entry.lock().current(self.decay_per_second, now) >= self.score_threshold)
123            .unwrap_or(false)
124    }
125
126    fn maybe_call_ipset(&self, ip: &str) {
127        // Have we already spawned this IP address?
128        {
129            if self.ipset_throttle.contains_key(ip) {
130                return;
131            }
132            self.ipset_throttle.insert(ip.to_string(), ());
133        }
134
135        // Add to ipset asynchronously
136        {
137            let set_name = self.set_name.clone();
138            let ip = ip.to_string();
139
140            tokio::spawn(async move {
141                info!("Banning DDoS ip: {}", ip);
142                match tokio::process::Command::new("ipset").args(["add", &set_name, &ip, "--exist"]).status().await {
143                    Ok(status) if status.success() => info!("DDoS: banned {} via ipset set '{}'", ip, set_name),
144                    Ok(status) => warn!("DDoS: ipset add {} failed with status {}", ip, status),
145                    Err(e) => warn!("DDoS: failed to run ipset for {}: {}", ip, e),
146                }
147            });
148        }
149    }
150}
151
152impl DdosProtection for IpsetDdosProtection {
153    fn allow_request(&self, ip: &str) -> bool {
154        let score = self.increment_score(ip, 1.0);
155        if score >= self.score_threshold {
156            self.maybe_call_ipset(ip);
157            false
158        }
159        else {
160            true
161        }
162    }
163
164    fn report_bad_request(&self, ip: &str) {
165        let score = self.increment_score(ip, self.bad_request_penalty);
166        if score >= self.score_threshold {
167            self.maybe_call_ipset(ip);
168        }
169    }
170
171    fn try_acquire_connection(&self, ip: &str) -> bool {
172        if self.is_score_banned(ip) {
173            return false;
174        }
175        let mut connections = self.connections.lock();
176        let count = connections.entry(ip.to_string()).or_insert(0);
177        if *count >= self.max_connections_per_ip {
178            return false;
179        }
180        *count += 1;
181        true
182    }
183
184    fn release_connection(&self, ip: &str) {
185        let mut connections = self.connections.lock();
186        if let Some(count) = connections.get_mut(ip) {
187            *count = count.saturating_sub(1);
188            if *count == 0 {
189                connections.remove(ip);
190            }
191        }
192    }
193}