You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

648 lines
20 KiB
Rust

#![allow(dead_code)]
use std::cmp::min;
use std::collections::{BTreeMap, VecDeque};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::time::{Duration, Instant};
use torment_core::infohash::v1::U160;
use torment_core::ContactInfo;
use std::mem;
use std::ops::Add;
use std::ops::Bound::Included;
use std::option::Option::Some;
pub mod host_node;
pub mod krpc;
const DHT_K: usize = 8;
#[derive(Debug)]
pub struct Table {
id: U160,
buckets: BTreeMap<U160, Bucket>,
}
type NodeHandle = (U160, IpAddr, u16);
impl Default for Table {
fn default() -> Self {
Table::new()
}
}
impl Table {
pub fn new_with_id(id: U160) -> Table {
Table {
id,
buckets: {
let mut tree = BTreeMap::new();
tree.insert(U160::MAX, Bucket::new(U160::MIN, U160::MAX));
tree
},
}
}
pub fn new() -> Table {
Self::new_with_id(U160::random())
}
fn get_bucket_index(&self, id: U160) -> U160 {
*self.buckets.range(id..).next().unwrap().0
}
fn get_bucket(&self, id: U160) -> &Bucket {
self.buckets
.get(&self.get_bucket_index(id))
.expect("DHT corrupt")
}
fn get_bucket_mut(&mut self, id: U160) -> &mut Bucket {
self.buckets
.get_mut(&self.get_bucket_index(id))
.expect("DHT corrupt")
}
pub fn has_node(&self, contact_info: &ContactInfo) -> bool {
self.get_node(
contact_info.id,
contact_info.contact.ip(),
contact_info.contact.port(),
)
.is_some()
}
fn get_node(&self, id: U160, address: IpAddr, port: u16) -> Option<&Node> {
self.get_bucket(id).nodes.get(&(id, address, port))
}
pub fn find_node(&self, id: U160) -> Option<ContactInfo> {
self.get_bucket(id)
.nodes
.range((
Included(&(id, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)),
Included(&(
id,
IpAddr::V6(Ipv6Addr::new(
u16::MAX,
u16::MAX,
u16::MAX,
u16::MAX,
u16::MAX,
u16::MAX,
u16::MAX,
u16::MAX,
)),
u16::MAX,
)),
))
.next()
.map(|(_, node)| node.to_contact_info())
}
fn get_node_mut(&mut self, id: U160, address: IpAddr, port: u16) -> Option<&mut Node> {
self.get_bucket_mut(id).nodes.get_mut(&(id, address, port))
}
pub fn find_k_closest_nodes(
&self,
target: U160,
excluding: Option<(U160, IpAddr, u16)>,
) -> Vec<ContactInfo> {
let mut nodes = vec![];
let mut found_before = 0;
let mut found_after = 0;
let mut before_range = self.buckets.range(..target);
while let Some((_, bucket)) = before_range.next_back() {
let info = bucket.get_contact_info();
found_before += info.len();
nodes.extend(info);
if found_before >= DHT_K {
break;
}
}
let mut after_range = self.buckets.range(target..);
while let Some((_, bucket)) = after_range.next() {
let info = bucket.get_contact_info();
found_after += info.len();
nodes.extend(info);
if found_after >= DHT_K {
break;
}
}
nodes.sort_by_key(|x| x.id ^ target);
let nodes = if let Some(exclude) = excluding {
nodes
.iter()
.filter(|x| {
x.id != exclude.0
|| x.contact.ip() != exclude.1
|| x.contact.port() != exclude.2
})
.copied()
.collect()
} else {
nodes
};
nodes[..min(8, nodes.len())].to_vec()
}
pub fn handle_activity(&mut self, id: U160, address: IpAddr, port: u16) {
let bucket_has_activity = if let Some(node) = self.get_node_mut(id, address, port) {
node.handle_activity();
true
} else {
self.add_node(id, address, port)
};
if bucket_has_activity {
self.get_bucket_mut(id).last_activity = Some(Instant::now());
}
}
pub fn handle_outgoing_activity(&mut self, id: U160, address: IpAddr, port: u16) {
if let Some(node) = self.get_node_mut(id, address, port) {
node.handle_outgoing_activity();
}
}
pub fn add_incoming_node(&mut self, id: U160, address: IpAddr, port: u16) {
self.add_node(id, address, port);
self.handle_activity(id, address, port);
}
pub fn add_node(&mut self, id: U160, address: IpAddr, port: u16) -> bool {
let own_id = self.id;
if own_id == id {
return false;
}
let bucket = self.get_bucket_mut(id);
if bucket.add_node(id, address, port) {
return true;
}
if !(bucket.start <= own_id
&& own_id <= bucket.end
&& (bucket.end - bucket.start) > U160::ONE)
{
return false;
}
let low_end = bucket.start + (bucket.end - bucket.start).half();
fn split_node_map(
nodes: &mut BTreeMap<NodeHandle, Node>,
split_at: U160,
) -> (BTreeMap<NodeHandle, Node>, BTreeMap<NodeHandle, Node>) {
let mut high = BTreeMap::new();
let mut low = BTreeMap::new();
let keys = nodes.keys().copied().collect::<Vec<_>>();
for key in keys {
let node = nodes.remove(&key).unwrap();
if node.id <= split_at {
low.insert(key, node);
} else {
high.insert(key, node);
}
}
(low, high)
}
fn recount_nodes_per_id(nodes: &BTreeMap<NodeHandle, Node>) -> BTreeMap<U160, usize> {
nodes
.values()
.map(|x| x.id)
.fold(BTreeMap::new(), |mut c, i| {
c.entry(i).and_modify(|x| *x += 1).or_insert(1);
c
})
}
fn get_last_activity(nodes: &BTreeMap<NodeHandle, Node>) -> Option<Instant> {
nodes.values().filter_map(|x| x.last_activity).max()
}
let (low, high) = split_node_map(&mut bucket.nodes, low_end);
let (low_q, high_q) = split_node_map(&mut bucket.queue, low_end);
bucket.last_activity = get_last_activity(&high);
bucket.nodes_per_id = recount_nodes_per_id(&high);
bucket.nodes = high;
bucket.queue = high_q;
let low_start = bucket.start;
bucket.start = low_end + U160::ONE;
let mut new_bucket = Bucket::new(low_start, low_end);
new_bucket.last_activity = get_last_activity(&low);
new_bucket.nodes_per_id = recount_nodes_per_id(&low);
new_bucket.nodes = low;
new_bucket.queue = low_q;
self.buckets.insert(low_end, new_bucket);
self.add_node(id, address, port)
}
pub fn get_silent_nodes(&self) -> Vec<ContactInfo> {
self.iter()
.filter(|node| {
if let Some(last_activity) = node.last_activity {
if last_activity.add(Duration::from_secs(60 * 15)) < Instant::now() {
return true;
}
}
if let Some(last_outgoing_activity) = node.last_activity {
if node.last_activity.is_none()
&& last_outgoing_activity.add(Duration::from_secs(60)) < Instant::now()
{
return true;
}
}
false
})
.map(|node| node.to_contact_info())
.collect()
}
pub fn update_nodes(&mut self) {
for node in self.iter_mut() {
if node
.last_activity
.map(|x| x.add(Duration::from_secs(15 * 60)) < Instant::now())
.unwrap_or(false)
{
if let NodeState::Questioning(n) = node.state {
if n > 3 {
#[cfg(debug_assertions)]
{
if node.state != NodeState::Bad {
println!(
"Node[id={},addr={}] went from {:?} to {:?}",
node.id,
node.sock_addr(),
node.state,
NodeState::Bad
);
}
}
node.state = NodeState::Bad;
}
if node
.last_outgoing_activity
.map(|x| x.add(Duration::from_secs(3 * 60)) < Instant::now())
.unwrap_or(false)
{
#[cfg(debug_assertions)]
{
if node.state != NodeState::Bad {
println!(
"Node[id={},addr={}] went from {:?} to {:?}",
node.id,
node.sock_addr(),
node.state,
NodeState::Bad
);
}
}
node.state = NodeState::Bad;
}
} else {
#[cfg(debug_assertions)]
{
if node.state != NodeState::Questioning(0) {
println!(
"Node[id={},addr={}] went from {:?} to {:?}",
node.id,
node.sock_addr(),
node.state,
NodeState::Questioning(0)
);
}
}
node.state = NodeState::Questioning(0);
}
}
}
for (_, bucket) in &mut self.buckets {
if !bucket.queue.is_empty() {
let old_nodes = mem::replace(&mut bucket.nodes, BTreeMap::new());
let mut keys = bucket
.queue
.iter()
.map(|(key, node)| (key.clone(), node.first_activity))
.collect::<Vec<_>>();
keys.sort_by_key(|(_, first_activity)| *first_activity);
let mut keys = VecDeque::from(keys);
for (key, old_node) in old_nodes {
if bucket.queue.is_empty() {
bucket.nodes.insert(key, old_node);
continue;
}
if old_node.state == NodeState::Bad {
if let Some((key, _)) = keys.pop_front() {
if let Some(node) = bucket.queue.remove(&key) {
bucket.nodes.insert(key, node);
continue;
}
}
}
bucket.nodes.insert(key, old_node);
}
}
}
}
pub fn count_nodes(&self) -> usize {
self.buckets.values().map(|x| x.nodes.len()).sum()
}
fn iter(&self) -> impl Iterator<Item = &Node> {
self.buckets
.iter()
.flat_map(|(_, bucket)| bucket.nodes.iter().map(|(_, node)| node))
}
fn iter_mut(&mut self) -> impl Iterator<Item = &mut Node> {
self.buckets
.iter_mut()
.flat_map(|(_, bucket)| bucket.nodes.iter_mut().map(|(_, node)| node))
}
}
#[derive(Debug)]
struct Bucket {
start: U160,
end: U160,
last_activity: Option<Instant>,
nodes: BTreeMap<(U160, IpAddr, u16), Node>,
nodes_per_id: BTreeMap<U160, usize>,
queue: BTreeMap<(U160, IpAddr, u16), Node>,
}
impl Bucket {
fn new(start: U160, end: U160) -> Bucket {
Bucket {
start,
end,
last_activity: None,
nodes: BTreeMap::new(),
nodes_per_id: Default::default(),
queue: BTreeMap::new(),
}
}
fn get_contact_info(&self) -> Vec<ContactInfo> {
self.nodes
.iter()
.map(|(_, node)| ContactInfo {
id: node.id,
contact: SocketAddr::new(node.address, node.port),
})
.collect()
}
fn internal_add_node(&mut self, id: U160, address: IpAddr, port: u16) {
self.nodes_per_id
.entry(id)
.and_modify(|x| *x += 1)
.or_insert(1);
self.nodes
.insert((id, address, port), Node::new(id, address, port));
self.last_activity = None;
}
fn add_node(&mut self, id: U160, address: IpAddr, port: u16) -> bool {
// Discard after 4 nodes with the same id
if self.nodes_per_id.get(&id).unwrap_or(&0) >= &4 {
return true;
}
if self.nodes.len() < DHT_K {
self.internal_add_node(id, address, port);
true
} else {
let mut replace = None;
let mut has_questionable = false;
for (key, node) in &self.nodes {
if node.state == NodeState::Bad {
replace = Some(*key);
break;
}
if let NodeState::Questioning(_) = node.state {
has_questionable = true;
}
}
if let Some(key) = replace {
self.nodes.remove(&key);
self.nodes_per_id.entry(key.0).and_modify(|x| *x -= 1);
if Some(&0usize) == self.nodes_per_id.get(&key.0) {
self.nodes_per_id.remove(&key.0);
}
self.internal_add_node(id, address, port);
true
} else {
if has_questionable {
let len = self.queue.len();
let entry = self
.queue
.entry((id, address, port))
.and_modify(|node| node.handle_activity());
if len < DHT_K {
entry.or_insert_with(|| {
let mut queued_node = Node::new(id, address, port);
queued_node.set_queued();
queued_node
});
}
}
false
}
}
}
}
#[derive(Debug)]
struct Node {
id: U160,
address: IpAddr,
port: u16,
first_activity: Instant,
last_outgoing_activity: Option<Instant>,
last_activity: Option<Instant>,
is_queued: bool,
state: NodeState,
}
#[derive(Debug, Ord, PartialOrd, Eq, PartialEq, Copy, Clone)]
enum NodeState {
Good,
Bad,
Questioning(u16),
}
impl Node {
fn new(id: U160, address: IpAddr, port: u16) -> Self {
Node {
id,
address,
port,
first_activity: Instant::now(),
last_outgoing_activity: None,
last_activity: None,
is_queued: false,
state: NodeState::Questioning(0),
}
}
fn sock_addr(&self) -> SocketAddr {
SocketAddr::new(self.address, self.port)
}
fn handle_activity(&mut self) {
self.last_activity = Some(Instant::now());
#[cfg(debug_assertions)]
{
if self.state != NodeState::Good {
println!(
"Node[id={},addr={}] went from {:?} to {:?}",
self.id,
self.sock_addr(),
self.state,
NodeState::Good
);
}
}
self.state = NodeState::Good;
}
fn handle_outgoing_activity(&mut self) {
self.last_outgoing_activity = Some(Instant::now());
if let NodeState::Questioning(nr) = self.state {
#[cfg(debug_assertions)]
{
if self.state != NodeState::Questioning(nr + 1) {
println!(
"Node[id={},addr={}] went from {:?} to {:?}",
self.id,
self.sock_addr(),
self.state,
NodeState::Questioning(nr + 1)
);
}
}
self.state = NodeState::Questioning(nr + 1);
}
}
fn set_queued(&mut self) {
self.is_queued = true;
}
fn set_live(&mut self) {
self.is_queued = false;
}
fn is_queued(&self) -> bool {
self.is_queued
}
fn to_contact_info(&self) -> ContactInfo {
ContactInfo {
id: self.id,
contact: SocketAddr::new(self.address, self.port),
}
}
}
#[cfg(test)]
mod test {
use crate::Table;
use std::net::{IpAddr, Ipv4Addr};
use torment_core::infohash::v1::U160;
#[test]
fn test_table_fulfillment() {
let mut table = Table::new_with_id(U160::ZERO);
table.add_node(U160::ONE, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 1);
table.add_node(U160::ONE, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 2);
table.add_node(U160::ONE, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 3);
table.add_node(U160::ONE, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 4);
assert_eq!(4, table.count_nodes());
// Shouldn't be added anymore since we already have 4 nodes with the same id
table.add_node(U160::ONE, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 5);
assert_eq!(4, table.count_nodes());
// Don't add self
table.add_node(U160::ZERO, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 5);
assert_eq!(4, table.count_nodes());
let two = U160::ONE + U160::ONE;
table.add_node(two, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 5);
assert_eq!(5, table.count_nodes());
table.add_node(two, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 6);
table.add_node(two, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 7);
table.add_node(two, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 8);
// node_id: MAX
table.add_node(U160::MAX, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 9);
table.add_node(U160::MAX, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 10);
table.add_node(U160::MAX, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 11);
table.add_node(U160::MAX, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)), 12);
table.add_node(
U160::MAX - U160::ONE,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
13,
);
table.add_node(
U160::MAX - U160::ONE,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
14,
);
table.add_node(
U160::MAX - U160::ONE,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
15,
);
table.add_node(
U160::MAX - U160::ONE,
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
16,
);
// This one however should too, since we already know 8 nodes with U160::MIN as id
table.add_node(
U160::MAX - (U160::ONE + U160::ONE),
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 1)),
17,
);
assert_eq!(16, table.count_nodes());
}
}