diff --git a/.gitignore b/.gitignore index ea8c4bf..531a66a 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ /target +callgrind* diff --git a/Cargo.lock b/Cargo.lock index 6d79cff..428a775 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -262,6 +262,7 @@ dependencies = [ "textwrap", "unicode-width", "vec_map", + "yaml-rust", ] [[package]] @@ -273,6 +274,21 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "crossbeam-epoch" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "lazy_static", + "maybe-uninit", + "memoffset", + "scopeguard", +] + [[package]] name = "crossbeam-utils" version = "0.7.2" @@ -547,12 +563,27 @@ version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08" +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400" +[[package]] +name = "memoffset" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c198b026e1bbf08a937e94c6c60f9ec4a2267f5b0d2eec9c1b21b061ce2be55f" +dependencies = [ + "autocfg", +] + [[package]] name = "miniz_oxide" version = "0.4.0" @@ -666,12 +697,13 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" [[package]] name = "polling" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e09dffb745feffca5be3dea51c02b7b368c4597ab0219a82acaf9799ab3e0d1" +checksum = "d10bd4578b2ca39fa2581c058921cb50ad226a8999829ba595e1665bcfdaf4a8" dependencies = [ "cfg-if", "libc", + "log", "wepoll-sys-stjepang", "winapi", ] @@ -759,6 +791,15 @@ version = "0.1.57" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" +[[package]] +name = "remem" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab4a09a8f306c4c15760db502c6c2aa2f5ea71edeb185d1b6fba293b066a9cff" +dependencies = [ + "crossbeam-epoch", +] + [[package]] name = "ring" version = "0.16.15" @@ -792,6 +833,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" +[[package]] +name = "scopeguard" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" + [[package]] name = "serde" version = "1.0.115" @@ -913,8 +960,12 @@ dependencies = [ name = "torment-cli" version = "0.1.0" dependencies = [ + "bytes", "clap", + "rand", "torment-core", + "torment-peer", + "torment-storage", ] [[package]] @@ -935,6 +986,18 @@ dependencies = [ "url", ] +[[package]] +name = "torment-daemon" +version = "0.1.0" +dependencies = [ + "bytes", + "polling", + "rand", + "torment-core", + "torment-manager", + "torment-peer", +] + [[package]] name = "torment-dht" version = "0.1.0" @@ -962,9 +1025,36 @@ dependencies = [ "torment-dht", ] +[[package]] +name = "torment-manager" +version = "0.1.0" +dependencies = [ + "bytes", + "lazy_static", + "ring", + "torment-core", + "torment-peer", + "torment-storage", + "url", +] + [[package]] name = "torment-peer" version = "0.1.0" +dependencies = [ + "bytes", + "torment-core", +] + +[[package]] +name = "torment-storage" +version = "0.1.0" +dependencies = [ + "bytes", + "lazy_static", + "remem", + "torment-core", +] [[package]] name = "unicode-bidi" @@ -1143,3 +1233,9 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "yaml-rust" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992" diff --git a/Cargo.toml b/Cargo.toml index 5668755..f8645ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,5 +5,8 @@ members = [ "torment-dht", "torment-dht-node", "torment-bencode", - "torment-peer" + "torment-peer", + "torment-storage", + "torment-manager", + "torment-daemon" ] \ No newline at end of file diff --git a/README.md b/README.md index ed0457d..1c29a82 100644 --- a/README.md +++ b/README.md @@ -2,4 +2,8 @@ A torrent client -It's written in Rust, that's the only selling point really \ No newline at end of file +It's written in Rust, that's the only selling point really + +## Slightly important + +Torment uses `eT` as peer id prefix, in both DHT and BitTorrent traffic, DHT uses `eT` and BitTorrent will use `eT<2 char ASCII major><2 char ASCII minor><2 char ASCII patch>`, I assume 96 bits will be enough to differentiate the 3 users that will be using Torment \ No newline at end of file diff --git a/torment-cli/Cargo.toml b/torment-cli/Cargo.toml index 7fb298f..d70a437 100644 --- a/torment-cli/Cargo.toml +++ b/torment-cli/Cargo.toml @@ -7,5 +7,9 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -clap = "2.33.3" -torment-core = { path = "../torment-core" } \ No newline at end of file +clap = {version = "2.33.3", features = ["yaml"]} +torment-core = { path = "../torment-core" } +torment-peer = { path = "../torment-peer" } +torment-storage = { path = "../torment-storage" } +rand = "0.7.3" +bytes = "0.5.6" \ No newline at end of file diff --git a/torment-cli/cli.yml b/torment-cli/cli.yml new file mode 100644 index 0000000..3926e6d --- /dev/null +++ b/torment-cli/cli.yml @@ -0,0 +1,20 @@ +name: torment-cli +version: "0.1.0" +about: "A torrent client" +subcommands: + - torrent: + about: "Show details about a torrent" + args: + - file: + required: true + - download: + about: "Show details about a torrent" + args: + - file: + help: Set the torrent to download + required: true + - peer: + short: p + long: peer + help: Set a custom peer + takes_value: true \ No newline at end of file diff --git a/torment-cli/src/download.rs b/torment-cli/src/download.rs new file mode 100644 index 0000000..d44b4e0 --- /dev/null +++ b/torment-cli/src/download.rs @@ -0,0 +1,144 @@ +use bytes::{Bytes, BytesMut}; +use rand::random; +use std::cmp::min; +use std::convert::TryInto; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpStream}; +use std::process::exit; +use torment_core::infohash::v1::U160; +use torment_core::infohash::InfoHashCapable; +use torment_core::metainfo::Torrent; +use torment_core::peer_id; +use torment_peer::message::Message::Request; +use torment_peer::message::{Handshake, Message, SelectionMessage}; +use torment_peer::{Peer, PeerProtocol}; +use torment_storage::ToStorageMap; + +pub fn download(torrent: Torrent, peers: Vec) { + println!("Starting download of {}", torrent.name()); + if peers.len() == 0 { + println!("No peers given, stopping"); + exit(1); + } + let peer = peers.first().unwrap(); + let mut stream = TcpStream::connect(peer).expect("Failed to open tcp connection to peer"); + stream.set_nodelay(true).expect("Failed setting nodelay"); + let id = U160::from(peer_id(random())); + + let our_header = Handshake::new(id, torrent.info_hash()); + stream.write(&our_header.to_bytes()); + let mut buffer = [0u8; 4096]; + let mut done_header = false; + let mut header = Handshake::new(U160::random(), U160::random()); + let mut peer = None; + let mut message_buffer = BytesMut::with_capacity(4096 * 10); + let mut storage = torrent.to_storage_map("/tmp", false); + let mut asked = 0; + let request_length = 2u32.pow(14); + let piece_length = torrent.meta_info().piece_length() as u32; + let mut progress = 0; + let mut header_buffer = BytesMut::new(); + loop { + let size = stream.read(&mut buffer).expect("Failed to receive data"); + if done_header == false { + header_buffer.extend_from_slice(&buffer[..size]); + if header_buffer.len() < 68 { + continue; + } + + header = Handshake::from_bytes(Bytes::from(buffer[..size].to_vec())) + .expect("Failed to parse header"); + if header.info_hash() != torrent.info_hash() { + panic!( + "Peer wants to serve different torrent ({}) :(", + header.info_hash() + ); + } + + println!( + "Peer[{}] connected", + String::from_utf8_lossy(&header.peer_id().to_bytes()) + ); + peer = Some(Peer::new( + stream.peer_addr().unwrap(), + PeerProtocol::TCP, + header, + torrent.meta_info(), + )); + done_header = true; + // stream.write_all(&*Message::Request(SelectionMessage::new(0, 0, 1024)).to_bytes()); + + continue; + } + + let mut peer = peer.as_mut().unwrap(); + message_buffer.extend_from_slice(&buffer[..size]); + while message_buffer.len() >= 4 { + let length = u32::from_be_bytes(message_buffer[..4].try_into().unwrap()); + if length == 0 { + message_buffer.split_to(4); + continue; + } + + if message_buffer.len() < (length + 4) as usize { + break; + } + + let message = message_buffer.split_to((4 + length) as usize).freeze(); + + let msg = Message::from_bytes(message.slice(4..)).expect("Failed parsing message"); + // println!("=> {:?}", msg); + if msg == Message::Unchoke { + stream.write_all(&Message::Interested.to_length_prefixed_bytes()); + } + peer.process(msg); + } + + let mut messages = vec![]; + let was_above_90 = peer.count_open_requests() > 90; + while !peer.is_choked() + && peer.count_open_requests() < 500 + && !was_above_90 + && asked < torrent.meta_info().pieces() + { + let curr_piece_length = if asked + 1 == torrent.meta_info().pieces() { + piece_length - (storage.size() as u32 % piece_length) + } else { + piece_length + }; + let size = min(request_length, curr_piece_length - progress); + let sel = SelectionMessage::new(asked as u32, progress, size); + peer.requested(sel); + let msg = Message::Request(sel); + // println!("<= {:?}", msg); + messages.extend(msg.to_length_prefixed_bytes()); + progress += size; + if progress >= curr_piece_length { + asked += 1; + progress = 0; + } + } + + if messages.len() > 0 { + stream + .write_all(&messages) + .expect("Failed writing to network socket"); + } + + while let Some(piece) = peer.next_piece() { + // println!("> {} {} {}", piece.index(), piece.offset(), piece.length()); + storage + .write( + piece.index() as usize, + piece.offset() as usize, + piece.piece(), + ) + .expect("Failed writing piece to storage"); + } + + if peer.count_open_requests() == 0 && asked > 1 { + println!("done"); + break; + } + } +} diff --git a/torment-cli/src/main.rs b/torment-cli/src/main.rs index 1367467..42c0556 100644 --- a/torment-cli/src/main.rs +++ b/torment-cli/src/main.rs @@ -1,18 +1,17 @@ -use clap::{App, Arg, SubCommand}; +use clap::load_yaml; +use clap::App; use std::fs::File; use std::io::Read; +use std::net::SocketAddr; use std::path::Path; +use std::str::FromStr; use torment_core::metainfo::Torrent; +pub mod download; + fn main() { - let matches = App::new("torment-cli") - .version(env!("CARGO_PKG_VERSION")) - .subcommand( - SubCommand::with_name("torrent") - .about("Shows details about torrent file") - .arg(Arg::with_name("file").required(true)), - ) - .get_matches(); + let yml = load_yaml!("../cli.yml"); + let matches = App::from_yaml(yml).get_matches(); match matches.subcommand() { ("torrent", Some(subcmd)) => { @@ -34,6 +33,38 @@ fn main() { } } + ("download", Some(subcmd)) => { + let file = subcmd.value_of("file").unwrap(); + let peers = subcmd + .values_of("peer") + .map(|values| { + values + .into_iter() + .filter_map(|res| match SocketAddr::from_str(res) { + Ok(addr) => Some(addr), + Err(err) => { + println!("Failed to parse {}: {}", res, err); + None + } + }) + .collect() + }) + .unwrap_or(vec![]); + let mut bytes: Vec = vec![]; + File::open(file).unwrap().read_to_end(&mut bytes).unwrap(); + match Torrent::from_bytes( + &bytes, + Path::new(file) + .file_stem() + .and_then(|file_name| file_name.to_str()) + .map(|str| str.to_string()), + ) { + Ok(torrent) => download::download(torrent, peers), + + Err(err) => println!("Error: {}", err), + } + } + _ => {} } } diff --git a/torment-core/src/lib.rs b/torment-core/src/lib.rs index 8e37b20..3e7ee53 100644 --- a/torment-core/src/lib.rs +++ b/torment-core/src/lib.rs @@ -1,16 +1,42 @@ #![allow(dead_code)] use crate::infohash::v1::U160; use crate::infohash::InfoHashCapable; +use bytes::BytesMut; +use serde::export::fmt::Debug; use serde_derive::{Deserialize, Serialize}; +use std::cmp::min; use std::convert::TryInto; use std::fmt::{Display, Formatter}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; +use std::str::FromStr; pub mod infohash; pub mod ip; pub mod metainfo; pub mod utils; +pub fn peer_id(input: [u8; 12]) -> [u8; 20] { + let peer_id = format!( + "eT{:0>2}{:0>2}{:0>2}AAAAAAAAAAAA", + env!("CARGO_PKG_VERSION_MAJOR"), + env!("CARGO_PKG_VERSION_MINOR"), + env!("CARGO_PKG_VERSION_PATCH") + ); + + let mut peer_id: [u8; 20] = peer_id.as_bytes().try_into().unwrap(); + &peer_id[8..20].copy_from_slice(&input); + peer_id +} + +pub fn compact_peer_id() -> [u8; 4] { + [ + b'e', + b'T', + u8::from_str(env!("CARGO_PKG_VERSION_MAJOR")).unwrap(), + u8::from_str(env!("CARGO_PKG_VERSION_MINOR")).unwrap(), + ] +} + pub trait CompactContact: Sized { fn to_compact_contact(&self) -> Vec; fn from_compact_contact>(input: T) -> Result; @@ -94,6 +120,66 @@ impl ContactInfo { } } +#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)] +pub struct Bitfield(BytesMut); + +impl Bitfield { + pub fn new>(field: T) -> Bitfield { + Bitfield(BytesMut::from(field.as_ref())) + } + + pub fn with_size(size: usize) -> Bitfield { + Bitfield({ + let mut field = BytesMut::new(); + field.resize((size / 8) + if size % 8 > 0 { 1 } else { 0 }, 0); + field + }) + } + + pub fn set(&mut self, index: u32) { + let byte_index = index / 8; + let bitmask = 1 << (7 - index % 8); + self.0[byte_index as usize] |= bitmask; + } + + pub fn unset(&mut self, index: u32) { + let byte_index = index / 8; + let bitmask = 1 << (8 - index % 8); + self.0[byte_index as usize] &= !bitmask; + } + + pub fn get(&self, index: u32) -> bool { + let byte_index = index / 8; + let bitmask = 1 << (7 - (index % 8)); + (self.0[byte_index as usize] & bitmask) > 0 + } + + pub fn add>(&mut self, field: T) { + let field = field.as_ref(); + let max = min(self.0.len(), field.len()); + for i in 0..max { + self.0[i] |= field[i] + } + } +} + +impl Debug for Bitfield { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Bitfield(")?; + for byte in &self.0 { + write!(f, "{:b}", byte)?; + } + + write!(f, ")") + } +} + +impl AsRef<[u8]> for Bitfield { + fn as_ref(&self) -> &[u8] { + self.0.as_ref() + } +} + #[derive(Debug, Default)] pub struct PeerStorage {} @@ -114,3 +200,30 @@ impl PeerStorage { vec![] } } + +#[cfg(test)] +mod tests { + use crate::{compact_peer_id, peer_id, Bitfield}; + use rand::random; + + #[test] + fn test_peer_id() { + let x = peer_id(random()); + assert_eq!(String::from_utf8_lossy(&x[0..8]), "eT000100"); + assert_eq!(&compact_peer_id(), b"eT\x00\x01"); + } + + fn test_bitfield() { + let mut field = Bitfield::with_size(3); + field.set(0); + assert!(field.get(0)); + field.unset(0); + assert_eq!(false, field.get(0)); + field.set(2); + + let other_field = Bitfield::with_size(3); + + field.add(other_field); + assert_eq!(true, field.get(2)); + } +} diff --git a/torment-core/src/metainfo.rs b/torment-core/src/metainfo.rs index 0c09f16..4af151e 100644 --- a/torment-core/src/metainfo.rs +++ b/torment-core/src/metainfo.rs @@ -37,6 +37,7 @@ impl Display for MetaInfoParsingError { } } +#[derive(Clone)] pub struct Torrent { name: String, announce_list: Vec>, @@ -116,8 +117,25 @@ impl Torrent { bencoded, }) } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn info_hash(&self) -> U160 { + self.info.info_hash() + } + + pub fn meta_info(&self) -> &MetaInfo { + &self.info + } + + pub fn announce_list(&self) -> &Vec> { + self.announce_list.as_ref() + } } +#[derive(Clone)] pub struct MetaInfo { info_hash: U160, bencoded: BencodeValue, @@ -126,6 +144,28 @@ pub struct MetaInfo { object: MetaInfoObject, } +impl MetaInfo { + pub fn info_hash(&self) -> U160 { + self.info_hash + } + + pub fn piece_length(&self) -> usize { + self.piece_length + } + + pub fn pieces(&self) -> usize { + self.pieces.len() / 20 + } + + pub fn hash(&self, index: usize) -> Bytes { + self.pieces.slice(index * 20..(index + 1) * 20) + } + + pub fn object(&self) -> &MetaInfoObject { + &self.object + } +} + #[derive(Clone, Debug)] pub enum MetaInfoObject { Files(Vec), @@ -233,4 +273,12 @@ impl MetaFile { .ok_or(MetaInfoParsingError::WrongType)?, }) } + + pub fn path(&self) -> &[String] { + &self.path + } + + pub fn length(&self) -> usize { + self.length + } } diff --git a/torment-core/src/utils.rs b/torment-core/src/utils.rs index f99be75..4eec3ce 100644 --- a/torment-core/src/utils.rs +++ b/torment-core/src/utils.rs @@ -1,6 +1,6 @@ use std::collections::{BTreeMap, BTreeSet}; use std::fmt::{Debug, Formatter}; -use std::time::Instant; +use std::time::{Duration, Instant}; #[derive(Default)] pub struct EphemeralMap { @@ -103,10 +103,28 @@ impl EphemeralMap { self.get(key).is_some() } + pub fn get_and_bump(&mut self, key: &K, from_now: Duration) -> Option<&V> { + if self.get_node(key).is_none() { + return None; + } + + self.update_expiry(&key, Instant::now() + from_now); + self.get(key) + } + pub fn get(&self, key: &K) -> Option<&V> { self.get_node(key).map(|x| &x.value) } + pub fn get_and_bump_mut(&mut self, key: &K, from_now: Duration) -> Option<&mut V> { + if self.get_node(key).is_none() { + return None; + } + + self.update_expiry(&key, Instant::now() + from_now); + self.get_mut(key) + } + pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { self.get_node_mut(key).map(|x| &mut x.value) } diff --git a/torment-daemon/Cargo.toml b/torment-daemon/Cargo.toml new file mode 100644 index 0000000..96ed7bb --- /dev/null +++ b/torment-daemon/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "torment-daemon" +version = "0.1.0" +authors = ["eater <=@eater.me>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +torment-core = { path = "../torment-core" } +torment-peer = { path = "../torment-peer" } +torment-manager = { path = "../torment-manager" } +bytes = "0.5.6" +polling = "0.1.6" +rand = "0.7.3" \ No newline at end of file diff --git a/torment-daemon/src/main.rs b/torment-daemon/src/main.rs new file mode 100644 index 0000000..7f81a23 --- /dev/null +++ b/torment-daemon/src/main.rs @@ -0,0 +1,151 @@ +use bytes::BytesMut; +use polling::{Event, Poller}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::fs::File; +use std::io::{Read, Write}; +use std::net::TcpStream; +use std::option::Option::Some; +use std::path::PathBuf; +use std::process::exit; +use std::result::Result::Ok; +use std::time::Duration; +use torment_core::infohash::v1::U160; +use torment_core::metainfo::Torrent; +use torment_core::peer_id; +use torment_manager::session_manager::SessionManager; +use torment_manager::torrent_manager::{TorrentManager, TorrentTarget}; +use torment_peer::message::{Handshake, Message}; +use torment_peer::PeerProtocol::TCP; + +struct TcpState { + builder: BytesMut, + handshake: Option, + stream: TcpStream, +} + +fn main() { + let mut session_manager = SessionManager::new(); + + let mut buffer = vec![]; + File::open("/home/eater/Downloads/[Commie] Senyuu. - 23 [150B93D5].mkv.torrent") + .unwrap() + .read_to_end(&mut buffer) + .unwrap(); + let torrent_manager = TorrentManager::from_torrent( + Torrent::from_bytes( + buffer, + Some("[Commie] Senyuu. - 23 [150B93D5].mkv".to_string()), + ) + .unwrap(), + TorrentTarget { + path: PathBuf::from("/tmp"), + is_base_path: true, + }, + None, + ); + + let peer_id = U160::from(peer_id(rand::random())); + let info_hash = torrent_manager.info_hash(); + session_manager.add_torrent_manager(torrent_manager); + + let mut tcp_stream_ktorrent = TcpStream::connect("127.0.0.1:6881").unwrap(); + tcp_stream_ktorrent.set_nodelay(true).unwrap(); + tcp_stream_ktorrent + .write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref()) + .unwrap(); + + let mut tcp_stream_transmission = TcpStream::connect("192.168.188.100:51413").unwrap(); + tcp_stream_transmission.set_nodelay(true).unwrap(); + tcp_stream_transmission + .write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref()) + .unwrap(); + + let mut buffer = vec![0u8; 4096 * 10]; + let mut poller = Poller::new().unwrap(); + poller.insert(&tcp_stream_ktorrent).unwrap(); + poller.insert(&tcp_stream_transmission).unwrap(); + poller.interest(&tcp_stream_ktorrent, Event::readable(0)); + poller.interest(&tcp_stream_transmission, Event::readable(1)); + + let mut items = vec![ + TcpState { + builder: Default::default(), + handshake: None, + stream: tcp_stream_ktorrent, + }, + TcpState { + builder: Default::default(), + handshake: None, + stream: tcp_stream_transmission, + }, + ]; + + let mut peer_map: HashMap<(U160, U160), usize> = Default::default(); + + loop { + let mut events: Vec = vec![]; + poller.wait(&mut events, Some(Duration::from_secs(10))); + + for event in events { + println!("Event => {:?}", event); + if !event.readable { + continue; + } + + let item = &mut items[event.key]; + let packet = item.stream.read(&mut buffer).unwrap(); + item.builder.extend_from_slice(&buffer[..packet]); + let handshake = if let Some(handshake) = &item.handshake { + handshake + } else { + if item.builder.len() >= 68 { + item.handshake = + Some(Handshake::from_bytes(item.builder.split_to(68).freeze()).unwrap()); + let handshake = item.handshake.as_ref().unwrap(); + println!("{} => {:?}", item.stream.peer_addr().unwrap(), handshake); + peer_map.insert((handshake.info_hash(), handshake.peer_id()), event.key); + session_manager.handshake(*handshake, item.stream.peer_addr().unwrap(), TCP); + handshake + } else { + continue; + } + }; + + while item.builder.len() >= 4 { + let len = u32::from_be_bytes(item.builder[..4].try_into().unwrap()); + if len + 4 > item.builder.len() as u32 { + break; + } + + if len == 0 { + item.builder.split_to(4); + continue; + } + + let message_bytes = item.builder.split_to((4 + len) as usize).freeze(); + let msg = Message::from_bytes(message_bytes.slice(4..)).unwrap(); + println!("{} => {:?}", item.stream.peer_addr().unwrap(), msg); + if !session_manager.process(info_hash, handshake.peer_id(), msg) { + exit(1); + } + } + + poller.interest(&item.stream, Event::readable(event.key)); + } + + while let Some(queued) = session_manager.next() { + if let Some(key) = peer_map.get(&(queued.info_hash, queued.peer_id)) { + let item = &mut items[*key]; + println!("{} <= {:?}", queued.addr, queued.message); + + item.stream + .write_all(&queued.message.to_length_prefixed_bytes()) + .unwrap(); + } + } + + println!("=> Running house keeping"); + session_manager.house_keeping(); + } +} diff --git a/torment-dht/src/krpc.rs b/torment-dht/src/krpc.rs index eacecff..48ca072 100644 --- a/torment-dht/src/krpc.rs +++ b/torment-dht/src/krpc.rs @@ -6,17 +6,15 @@ use std::option::Option::Some; use std::str::FromStr; use torment_core::infohash::v1::U160; use torment_core::infohash::InfoHashCapable; -use torment_core::{CompactContact, ContactInfo, ParsingError}; +use torment_core::{compact_peer_id, CompactContact, ContactInfo, ParsingError}; pub use bendy::decoding::FromBencode; pub use bendy::encoding::ToBencode; use std::net::SocketAddr; -const VERSION: &'static [u8] = &[b'e', b't', 0, 0]; - fn ver() -> Option { // None - Some(String::from_utf8_lossy(VERSION).to_string()) + Some(String::from_utf8_lossy(&compact_peer_id()).to_string()) } #[derive(Debug, Clone, Eq, PartialEq)] diff --git a/torment-manager/Cargo.toml b/torment-manager/Cargo.toml new file mode 100644 index 0000000..715ef45 --- /dev/null +++ b/torment-manager/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "torment-manager" +version = "0.1.0" +authors = ["eater <=@eater.me>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +torment-core = { path = "../torment-core" } +torment-peer = { path = "../torment-peer" } +torment-storage = { path = "../torment-storage" } +bytes = "0.5.6" +url = "2.1.1" +lazy_static = "1.4.0" +ring = "0.16.15" \ No newline at end of file diff --git a/torment-manager/src/lib.rs b/torment-manager/src/lib.rs new file mode 100644 index 0000000..e3a308a --- /dev/null +++ b/torment-manager/src/lib.rs @@ -0,0 +1,3 @@ +pub mod session_manager; +pub mod torrent_manager; +pub mod tracker_manager; diff --git a/torment-manager/src/session_manager.rs b/torment-manager/src/session_manager.rs new file mode 100644 index 0000000..d5eeaa6 --- /dev/null +++ b/torment-manager/src/session_manager.rs @@ -0,0 +1,86 @@ +use crate::torrent_manager::TorrentManager; +use std::collections::{HashMap, VecDeque}; +use std::net::SocketAddr; +use std::option::Option::Some; +use torment_core::infohash::v1::U160; +use torment_peer::message::{Handshake, Message}; +use torment_peer::PeerProtocol; + +pub struct QueuedMessage { + pub message: Message, + pub info_hash: U160, + pub peer_id: U160, + pub addr: SocketAddr, +} + +pub struct SessionManager { + torrents: HashMap, + peer_socket: HashMap<(U160, U160), SocketAddr>, + message_queue: VecDeque, +} + +impl SessionManager { + pub fn add_torrent_manager(&mut self, torrent: TorrentManager) { + self.torrents.insert(torrent.info_hash(), torrent); + } + + fn remove_torrent_manager(&mut self, info_hash: U160) -> Option { + self.torrents.remove(&info_hash) + } + + pub fn handshake( + &mut self, + handshake: Handshake, + addr: SocketAddr, + protocol: PeerProtocol, + ) -> bool { + if let Some(torrent_manager) = self.torrents.get_mut(&handshake.info_hash()) { + self.peer_socket + .insert((handshake.peer_id(), handshake.info_hash()), addr); + torrent_manager.handshake(handshake, addr, protocol) + } else { + false + } + } + + pub fn process(&mut self, torrent: U160, peer_id: U160, message: Message) -> bool { + if let Some(torrent_manager) = self.torrents.get_mut(&torrent) { + if torrent_manager.process(peer_id, message) { + while let Some((peer_id, message)) = torrent_manager.next() { + if let Some(addr) = self.peer_socket.get(&(peer_id, torrent)).copied() { + self.message_queue.push_back(QueuedMessage { + message, + addr, + peer_id, + info_hash: torrent, + }); + } + } + + true + } else { + false + } + } else { + false + } + } + + pub fn new() -> SessionManager { + SessionManager { + torrents: Default::default(), + peer_socket: Default::default(), + message_queue: Default::default(), + } + } + + pub fn next(&mut self) -> Option { + self.message_queue.pop_front() + } + + pub fn house_keeping(&mut self) { + for (_, torrent) in &mut self.torrents { + torrent.house_keeping(); + } + } +} diff --git a/torment-manager/src/torrent_manager.rs b/torment-manager/src/torrent_manager.rs new file mode 100644 index 0000000..f37e659 --- /dev/null +++ b/torment-manager/src/torrent_manager.rs @@ -0,0 +1,275 @@ +use crate::tracker_manager::{TrackerId, TrackerManager}; +use bytes::Bytes; +use ring::digest::{digest, SHA1_FOR_LEGACY_USE_ONLY}; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::net::SocketAddr; +use std::ops::Index; +use std::option::Option::Some; +use std::path::PathBuf; +use torment_core::infohash::v1::U160; +use torment_core::metainfo::{MetaInfo, Torrent}; +use torment_core::Bitfield; +use torment_peer::message::{Handshake, Message, PieceMessage, SelectionMessage}; +use torment_peer::{Peer, PeerProtocol}; +use torment_storage::{StorageMap, ToStorageMap}; + +#[derive(Debug)] +pub enum Source { + Torrent(TorrentSource), + MetaInfo(MetaInfoSource), +} + +#[derive(Debug)] +pub struct MetaInfoSource { + info_hash: U160, +} + +#[derive(Debug)] +pub struct TorrentSource { + file: Option, + torrent: Option, +} + +#[derive(Debug)] +pub struct TorrentTarget { + pub path: PathBuf, + pub is_base_path: bool, +} + +#[derive(Debug)] +pub struct TorrentManager { + info_hash: U160, + bitfield: Bitfield, + trackers: Vec>, + source: Source, + target: TorrentTarget, + peers: HashMap, + queue: VecDeque<(U160, Message)>, + storage_map: StorageMap, +} + +pub const REQUEST_SIZE: usize = 16384; + +impl TorrentManager { + pub fn info_hash(&self) -> U160 { + self.info_hash + } + + pub fn from_torrent( + torrent: Torrent, + target: TorrentTarget, + tracker_manager: Option<&mut TrackerManager>, + ) -> TorrentManager { + let trackers = tracker_manager + .map(|manager| { + torrent + .announce_list() + .iter() + .map(|tier_list| { + tier_list + .iter() + .filter_map(|tracker| manager.get_tracker_id(tracker)) + .collect::>() + }) + .collect::>() + }) + .unwrap_or(vec![]); + + TorrentManager { + info_hash: torrent.info_hash(), + bitfield: Bitfield::with_size(torrent.meta_info().pieces()), + trackers, + source: Source::Torrent(TorrentSource { + file: None, + torrent: Some(torrent.clone()), + }), + storage_map: torrent.to_storage_map(&target.path, target.is_base_path), + target, + peers: HashMap::new(), + queue: Default::default(), + } + } + + pub fn handshake( + &mut self, + handshake: Handshake, + addr: SocketAddr, + protocol: PeerProtocol, + ) -> bool { + if handshake.info_hash() != self.info_hash { + return false; + } + + if let Some(peer) = self.peers.get(&handshake.peer_id()) { + if peer.addr() != addr { + return false; + } + } + + let meta_info = self.meta_info(); + let peer = Peer::new(addr, protocol, handshake, meta_info); + self.peers.insert(peer.id(), peer); + + true + } + + fn meta_info(&mut self) -> &MetaInfo { + if let Source::Torrent(torrent) = &self.source { + return torrent + .torrent + .as_ref() + .expect("No torrent loaded uh oh missing functionality") + .meta_info(); + } + + panic!("Can't resolve MetaInfo for torrent") + } + + pub fn process(&mut self, peer_id: U160, message: Message) -> bool { + let mut queue = vec![]; + let ok = if let Some(peer) = self.peers.get_mut(&peer_id) { + if peer.process(message) { + while let Some(message) = peer.next() { + self.queue.push_back((peer_id, message)); + } + + while let Some(piece) = peer.next_piece() { + if self.storage_map.has_piece(piece.index() as usize) { + continue; + } + + if self + .storage_map + .write( + piece.index() as usize, + piece.offset() as usize, + piece.piece(), + ) + .unwrap() + { + queue.push(piece.index() as usize); + } + } + + while let Some(piece_request) = peer.next_request() { + if !self.bitfield.get(piece_request.index()) { + continue; + } + + let mut buffer = vec![0u8; piece_request.length() as usize]; + self.storage_map + .read( + piece_request.index() as usize, + piece_request.offset() as usize, + &mut buffer, + ) + .unwrap(); + + self.queue.push_back(( + peer_id, + Message::Piece(PieceMessage { + index: piece_request.index(), + offset: piece_request.offset(), + piece: Bytes::from(buffer), + }), + )) + } + + true + } else { + false + } + } else { + false + }; + + for have in queue { + let piece_hash = self.meta_info().hash(have); + let piece_data = self.storage_map.read_piece(have).unwrap(); + let res = digest(&SHA1_FOR_LEGACY_USE_ONLY, &piece_data); + if piece_hash != res.as_ref() { + println!("=> Piece#{} failed verification", have); + self.storage_map.wipe_piece(have); + continue; + } else { + println!("=> Piece#{} verified", have); + } + + self.bitfield.set(have as u32); + + let keys = self.peers.keys().copied().collect::>(); + for key in keys { + self.queue.push_back((key, Message::Have(have as u32))); + + if !self.peers[&key].has_piece(have as u32) && self.peers[&key].we_choked() { + self.peers.get_mut(&key).unwrap().set_we_choked(false); + self.queue.push_back((key, Message::Unchoke)); + } + } + } + + ok + } + + pub fn next(&mut self) -> Option<(U160, Message)> { + self.queue.pop_front() + } + + pub fn house_keeping(&mut self) { + let mut peers = self + .peers + .iter() + .filter_map(|(_, peer)| { + if peer.has()1 + + if peer.is_choked() { + None + } else { + Some(peer.id()) + } + }) + .collect::>(); + + let pieces = self.meta_info().pieces() as u32; + for i in 0u32..pieces { + if self.bitfield.get(i) { + continue; + } + + let length = self.storage_map.get_piece_length(i as usize); + let mut offset = 0; + + for peer in &peers.clone() { + if !self.peers[peer].has_piece(i) { + continue; + } + + while offset < length && self.peers[peer].count_open_requests() < 25 { + if !self.storage_map.has_piece_bit(i as usize, offset) { + let request_length = if (offset + REQUEST_SIZE) > length { + length - offset + } else { + REQUEST_SIZE + }; + + let msg = SelectionMessage::new(i, offset as u32, request_length as u32); + + if self.peers.get_mut(peer).unwrap().requested(msg) { + self.queue.push_back((*peer, Message::Request(msg))); + } + } + + offset += REQUEST_SIZE; + } + + if self.peers[peer].count_open_requests() >= 25 { + peers.remove(peer); + } + + if offset >= length { + break; + } + } + } + } +} diff --git a/torment-manager/src/tracker_manager.rs b/torment-manager/src/tracker_manager.rs new file mode 100644 index 0000000..15d407b --- /dev/null +++ b/torment-manager/src/tracker_manager.rs @@ -0,0 +1,34 @@ +use std::collections::HashMap; +use std::sync::atomic::{AtomicUsize, Ordering}; +use url::Url; + +pub struct TrackerManager { + counter: AtomicUsize, + trackers: HashMap, + url_index: HashMap, +} + +pub struct Tracker { + url: Url, +} + +pub type TrackerId = usize; + +impl TrackerManager { + pub fn get_tracker_id(&mut self, url: &Url) -> Option { + // dht is not a tracker bye + if url.scheme() == "dht" { + return None; + } + + let url_str = url.as_str(); + if let Some(id) = self.url_index.get(url_str) { + return Some(*id); + } + + let new_id = self.counter.fetch_add(1, Ordering::AcqRel); + self.url_index.insert(url_str.to_string(), new_id); + + Some(new_id) + } +} diff --git a/torment-peer/Cargo.toml b/torment-peer/Cargo.toml index 6b48d61..c5502c2 100644 --- a/torment-peer/Cargo.toml +++ b/torment-peer/Cargo.toml @@ -7,3 +7,5 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +torment-core = { path = "../torment-core" } +bytes = "0.5.6" \ No newline at end of file diff --git a/torment-peer/src/lib.rs b/torment-peer/src/lib.rs index 613c696..ca8840c 100644 --- a/torment-peer/src/lib.rs +++ b/torment-peer/src/lib.rs @@ -1,4 +1,170 @@ -struct Peer {} +use crate::message::{Handshake, Message, PieceMessage, SelectionMessage}; +use std::collections::{HashSet, VecDeque}; +use std::net::SocketAddr; +use torment_core::infohash::v1::U160; +use torment_core::metainfo::MetaInfo; +use torment_core::Bitfield; + +pub mod message; + +#[derive(Copy, Clone, Debug)] +pub struct PeerState { + chocked: bool, + interested: bool, +} + +impl PeerState { + const fn new() -> PeerState { + PeerState { + chocked: true, + interested: false, + } + } +} + +impl Default for PeerState { + fn default() -> Self { + Self::new() + } +} + +#[derive(Debug, Copy, Clone)] +pub enum PeerProtocol { + TCP, + MuTP, +} + +#[derive(Debug, Clone)] +pub struct Peer { + id: U160, + info_hash: U160, + address: SocketAddr, + protocol: PeerProtocol, + our_state: PeerState, + their_state: PeerState, + has: Bitfield, + request_state: HashSet, + request_queue: VecDeque, + requested_queue: HashSet, + received_pieces: VecDeque, + queue: VecDeque, +} + +impl Peer { + pub fn id(&self) -> U160 { + self.id + } + + pub fn new( + address: SocketAddr, + protocol: PeerProtocol, + header: Handshake, + meta_info: &MetaInfo, + ) -> Self { + Peer { + id: header.peer_id(), + info_hash: header.info_hash(), + address, + protocol, + our_state: PeerState::new(), + their_state: PeerState::new(), + has: Bitfield::with_size(meta_info.pieces()), + request_state: Default::default(), + request_queue: Default::default(), + requested_queue: Default::default(), + received_pieces: Default::default(), + queue: Default::default(), + } + } + + pub fn has(&mut self) -> Bitfield { + self.has.clone() + } + + pub fn process(&mut self, message: Message) -> bool { + match message { + Message::Choke => self.their_state.chocked = true, + Message::Unchoke => self.their_state.chocked = false, + Message::Interested => self.their_state.interested = true, + Message::NotInterested => self.their_state.interested = false, + Message::Have(nr) => self.has.set(nr), + Message::Bitfield(bitfield) => self.has.add(bitfield), + Message::Request(selection) => { + self.request_queue.push_back(selection); + self.request_state.insert(selection); + } + Message::Cancel(selection) => { + self.request_state.remove(&selection); + } + Message::Piece(piece) => { + self.requested_queue.remove(&SelectionMessage::new( + piece.index(), + piece.offset(), + piece.length(), + )); + self.received_pieces.push_back(piece); + } + } + + true + } + + pub fn count_open_requests(&self) -> usize { + self.requested_queue.len() + } + + pub fn is_choked(&self) -> bool { + self.their_state.chocked + } + + pub fn is_interested(&self) -> bool { + self.their_state.interested + } + + pub fn we_interested(&self) -> bool { + self.our_state.interested + } + + pub fn we_choked(&self) -> bool { + self.our_state.chocked + } + + pub fn set_we_choked(&mut self, choked: bool) { + self.our_state.chocked = choked; + } + + pub fn set_we_interested(&mut self, interested: bool) { + self.our_state.interested = interested; + } + + pub fn requested(&mut self, selection: SelectionMessage) -> bool { + self.requested_queue.insert(selection) + } + + pub fn next_piece(&mut self) -> Option { + self.received_pieces.pop_front() + } + + pub fn next_request(&mut self) -> Option { + while let Some(item) = self.request_queue.pop_front() { + if self.request_state.remove(&item) { + return Some(item); + } + } + None + } + + pub fn addr(&self) -> SocketAddr { + self.address + } + pub fn has_piece(&self, index: u32) -> bool { + self.has.get(index as u32) + } + + pub fn next(&mut self) -> Option { + self.queue.pop_front() + } +} #[cfg(test)] mod tests { diff --git a/torment-peer/src/message.rs b/torment-peer/src/message.rs new file mode 100644 index 0000000..36669cc --- /dev/null +++ b/torment-peer/src/message.rs @@ -0,0 +1,292 @@ +use bytes::{Bytes, BytesMut}; +use std::convert::TryInto; +use std::error::Error; +use std::fmt::{Debug, Display, Formatter}; +use torment_core::infohash::v1::U160; +use torment_core::infohash::InfoHashCapable; +use torment_core::Bitfield; + +#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)] +pub enum Message { + Choke, + Unchoke, + Interested, + NotInterested, + Have(u32), + Bitfield(Bitfield), + Request(SelectionMessage), + Cancel(SelectionMessage), + Piece(PieceMessage), +} + +#[derive(Clone, Debug)] +pub enum MessageParsingError { + TooShort, + UnknownType, + InvalidPrefix, +} + +impl Error for MessageParsingError {} + +impl Display for MessageParsingError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl Message { + pub fn from_bytes(bytes: Bytes) -> Result { + Ok(match bytes[0] { + 0 => Message::Choke, + 1 => Message::Unchoke, + 2 => Message::Interested, + 3 => Message::NotInterested, + 4 => { + if bytes.len() < 5 { + return Err(MessageParsingError::TooShort); + } + + Message::Have(u32::from_be_bytes(bytes[1..5].try_into().unwrap())) + } + + 5 => Message::Bitfield(Bitfield::new(bytes.slice(1..))), + + 6 | 8 => { + if bytes.len() < 13 { + return Err(MessageParsingError::TooShort); + } + + let selection = SelectionMessage { + index: u32::from_be_bytes(bytes[1..5].try_into().unwrap()), + offset: u32::from_be_bytes(bytes[5..9].try_into().unwrap()), + length: u32::from_be_bytes(bytes[9..13].try_into().unwrap()), + }; + + if bytes[0] == 6 { + Message::Request(selection) + } else { + Message::Cancel(selection) + } + } + + 7 => { + if bytes.len() < 9 { + return Err(MessageParsingError::TooShort); + } + + Message::Piece(PieceMessage { + index: u32::from_be_bytes(bytes[1..5].try_into().unwrap()), + offset: u32::from_be_bytes(bytes[5..9].try_into().unwrap()), + piece: bytes.slice(9..), + }) + } + + _ => return Err(MessageParsingError::UnknownType), + }) + } + + pub fn to_length_prefixed_bytes(&self) -> Vec { + let buffer = self.to_bytes(); + let size = (buffer.len() as u32).to_be_bytes(); + let mut map = Vec::with_capacity(buffer.len() + 4); + map.extend_from_slice(&size); + map.extend(buffer); + map + } + + pub fn to_bytes(&self) -> Bytes { + match self { + Message::Choke => Bytes::from_static(&[0u8]), + Message::Unchoke => Bytes::from_static(&[1u8]), + Message::Interested => Bytes::from_static(&[2u8]), + Message::NotInterested => Bytes::from_static(&[3u8]), + Message::Have(piece) => { + let mut buffer = BytesMut::with_capacity(5); + buffer.extend(&[4]); + buffer.extend_from_slice(&piece.to_be_bytes()); + buffer.freeze() + } + Message::Bitfield(bitfield) => { + let mut buffer = BytesMut::with_capacity(1 + bitfield.as_ref().len()); + buffer.extend(&[5]); + buffer.extend_from_slice(bitfield.as_ref()); + buffer.freeze() + } + Message::Request(request) => { + let mut buffer = BytesMut::with_capacity(13); + buffer.extend(&[6]); + buffer.extend_from_slice(&request.index.to_be_bytes()); + buffer.extend_from_slice(&request.offset.to_be_bytes()); + buffer.extend_from_slice(&request.length.to_be_bytes()); + buffer.freeze() + } + Message::Piece(piece) => { + let mut buffer = BytesMut::with_capacity(9 + piece.piece.len()); + buffer.extend(&[7]); + buffer.extend_from_slice(&piece.index.to_be_bytes()); + buffer.extend_from_slice(&piece.offset.to_be_bytes()); + buffer.extend_from_slice(piece.piece.as_ref()); + buffer.freeze() + } + Message::Cancel(cancel) => { + let mut buffer = BytesMut::with_capacity(13); + buffer.extend(&[8]); + buffer.extend_from_slice(&cancel.index.to_be_bytes()); + buffer.extend_from_slice(&cancel.offset.to_be_bytes()); + buffer.extend_from_slice(&cancel.length.to_be_bytes()); + buffer.freeze() + } + } + } +} + +#[derive(Clone, Debug, Copy)] +pub struct Handshake { + reserved: [u8; 8], + info_hash: U160, + peer_id: U160, +} + +impl Handshake { + pub fn from_bytes(bytes: Bytes) -> Result { + if bytes.len() < 68 { + return Err(MessageParsingError::TooShort); + } + + if &bytes[0..20] != b"\x13BitTorrent protocol" { + return Err(MessageParsingError::InvalidPrefix); + } + + Ok(Handshake { + reserved: bytes[20..28].try_into().unwrap(), + info_hash: U160::from_bytes(&bytes[28..48]).unwrap(), + peer_id: U160::from_bytes(&bytes[48..68]).unwrap(), + }) + } + + pub fn to_bytes(&self) -> [u8; 68] { + let mut header = + *b"\x13BitTorrent protocol\0\0\0\0\0\0\0\0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"; + header[28..48].copy_from_slice(self.info_hash.to_bytes().as_ref()); + header[48..68].copy_from_slice(self.peer_id.to_bytes().as_ref()); + header + } + + pub fn new(peer_id: U160, info_hash: U160) -> Handshake { + Handshake { + reserved: [0; 8], + info_hash, + peer_id, + } + } + + pub fn peer_id(&self) -> U160 { + self.peer_id + } + + pub fn info_hash(&self) -> U160 { + self.info_hash + } +} + +#[derive(Clone, Debug, Hash, Copy, Ord, PartialOrd, Eq, PartialEq)] +pub struct SelectionMessage { + index: u32, + offset: u32, + length: u32, +} + +impl SelectionMessage { + pub fn new(index: u32, offset: u32, length: u32) -> SelectionMessage { + SelectionMessage { + index, + offset, + length, + } + } + + pub fn index(&self) -> u32 { + self.index + } + + pub fn offset(&self) -> u32 { + self.offset + } + + pub fn length(&self) -> u32 { + self.length + } +} + +#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)] +pub struct PieceMessage { + pub index: u32, + pub offset: u32, + pub piece: Bytes, +} + +impl Debug for PieceMessage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PieceMessage") + .field("index", &self.index) + .field("offset", &self.offset) + .field("piece", &self.piece.len()) + .finish() + } +} + +impl PieceMessage { + pub fn index(&self) -> u32 { + self.index + } + + pub fn offset(&self) -> u32 { + self.offset + } + + pub fn length(&self) -> u32 { + self.piece.len() as u32 + } + + pub fn piece(&self) -> Bytes { + self.piece.clone() + } +} + +#[cfg(test)] +mod tests { + use crate::message::{Message, PieceMessage, SelectionMessage}; + use bytes::Bytes; + use torment_core::Bitfield; + + #[test] + fn round_trip() { + let msgs = [ + Message::Choke, + Message::Unchoke, + Message::Interested, + Message::NotInterested, + Message::Have(42), + Message::Bitfield(Bitfield::with_size(4)), + Message::Request(SelectionMessage { + index: 69, + offset: 1337, + length: 42, + }), + Message::Piece(PieceMessage { + index: 69, + offset: 1337, + piece: Bytes::from_static(b"hewwo"), + }), + Message::Cancel(SelectionMessage { + index: 69, + offset: 1337, + length: 42, + }), + ]; + + for msg in &msgs { + assert_eq!(msg, &Message::from_bytes(msg.to_bytes()).unwrap()) + } + } +} diff --git a/torment-storage/Cargo.toml b/torment-storage/Cargo.toml new file mode 100644 index 0000000..2d9fecd --- /dev/null +++ b/torment-storage/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "torment-storage" +version = "0.1.0" +authors = ["eater <=@eater.me>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +torment-core = { path = "../torment-core" } +remem = "0.1.0" +bytes = "0.5.6" +lazy_static = "1.4.0" diff --git a/torment-storage/src/lib.rs b/torment-storage/src/lib.rs new file mode 100644 index 0000000..6f8cdb1 --- /dev/null +++ b/torment-storage/src/lib.rs @@ -0,0 +1,348 @@ +use bytes::{Bytes, BytesMut}; +use lazy_static::lazy_static; +use remem::{ItemGuard, Pool}; +use std::borrow::Borrow; +use std::cmp::min; +use std::collections::{BTreeMap, HashMap, HashSet}; +use std::fmt::{Debug, Formatter}; +use std::fs::{DirBuilder, File, OpenOptions}; +use std::io; +use std::io::{Read, Seek, SeekFrom, Write}; +use std::ops::{Add, Deref, Range}; +use std::path::{Path, PathBuf, MAIN_SEPARATOR}; +use std::rc::Rc; +use std::sync::Mutex; +use std::time::{Duration, Instant}; +use torment_core::metainfo::{MetaInfo, MetaInfoObject, Torrent}; +use torment_core::utils::EphemeralMap; + +lazy_static! { + static ref MEMORY_POOL: Pool> = Pool::new(|| vec![0u8; 4194304]); +} + +#[derive(Debug)] +pub struct StorageMap { + base_path: PathBuf, + piece_length: usize, + pieces: usize, + open_files: HashMap>>, + buffer: HashMap, + mapping: BTreeMap, + size: usize, +} + +pub struct StoragePiece { + index: usize, + pieces: usize, + ranges: HashSet, + buffer: ItemGuard<'static, Vec>, +} + +impl StoragePiece { + fn is_complete(&self) -> bool { + self.pieces == self.ranges.len() + } +} + +impl Debug for StoragePiece { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StoragePiece") + .field("index", &self.index) + .field("ranges", &self.ranges) + .finish() + } +} + +impl StorageMap { + fn get_offsets(&self, offset: usize, length: usize) -> Vec { + let x = 0..1; + let end = offset + length; + let mut range = self.mapping.range(offset..); + let mut items = vec![]; + while let Some((item_end, _)) = range.next() { + items.push(*item_end); + + if *item_end > end { + break; + } + } + + items + } + + fn get_file(&mut self, item: &StorageMapping) -> io::Result>> { + if let Some(file) = self.open_files.get(&item.offset) { + return Ok(Rc::clone(file)); + } + + let path = item.path.join(&format!("{}", MAIN_SEPARATOR)); + let file_path = self.base_path.join(path); + println!("Opening file: {:?}", file_path); + + if let Some(parent) = file_path.parent() { + DirBuilder::new().recursive(true).create(parent)?; + } + + let file = OpenOptions::new() + .write(true) + .read(true) + .create(true) + .open(file_path.as_path())?; + + let rc = Rc::new(Mutex::new(file)); + self.open_files.insert(item.offset, Rc::clone(&rc)); + Ok(rc) + } + + fn internal_write>(&mut self, offset: usize, data: T) -> io::Result<()> { + let data = data.as_ref(); + let mut written = 0; + for item_end in self.get_offsets(offset, data.len()) { + let item = self.mapping.get(&item_end).unwrap().clone(); + let file = self.get_file(&item)?; + let mut file = file.lock().unwrap(); + let curr_offset = (offset + written) - item.offset; + file.seek(SeekFrom::Start(curr_offset as u64))?; + let to_write = min(item.length - curr_offset, data.len() - written); + file.write_all(&data[written..written + to_write])?; + written += to_write; + } + + assert_eq!(written, data.len(), "Failed to write all data"); + + Ok(()) + } + + fn internal_read>(&mut self, offset: usize, mut data: T) -> io::Result<()> { + let data = data.as_mut(); + let mut readed = 0; + for item_end in self.get_offsets(offset, data.len()) { + let item = self.mapping.get(&item_end).unwrap().clone(); + let file = self.get_file(&item)?; + let mut file = file.lock().unwrap(); + let curr_offset = (offset + readed) - item.offset; + file.seek(SeekFrom::Start(curr_offset as u64))?; + let to_read = min(item.length - curr_offset, data.len() - readed); + file.read_exact(&mut data[readed..readed + to_read])?; + readed += to_read; + } + + assert_eq!(readed, data.len(), "Failed to read all data"); + + Ok(()) + } + + pub fn has_piece(&self, index: usize) -> bool { + if let Some(piece) = self.buffer.get(&index) { + piece.is_complete() + } else { + false + } + } + + pub fn read_piece(&mut self, index: usize) -> io::Result> { + let mut bytes = vec![0; self.get_piece_length(index)]; + self.read(index, 0, &mut bytes); + Ok(bytes) + } + + pub fn read>( + &mut self, + index: usize, + offset: usize, + mut data: T, + ) -> io::Result<()> { + let data = data.as_mut(); + self.internal_read((index * self.piece_length) + offset, data) + } + + pub fn write>( + &mut self, + index: usize, + offset: usize, + data: T, + ) -> io::Result { + let data = data.as_ref(); + let item = if let Some(item) = self.buffer.get_mut(&index) { + item + } else { + let request_size = 2usize.pow(14); + let piece_length = self.get_piece_length(index); + let pieces = (piece_length / request_size) + + if piece_length % request_size > 0 { + 1 + } else { + 0 + }; + + self.buffer.insert(index, { + StoragePiece { + index, + ranges: Default::default(), + buffer: MEMORY_POOL.get(), + pieces, + } + }); + + self.buffer.get_mut(&index).unwrap() + }; + + item.buffer[offset..offset + data.len()].copy_from_slice(data); + item.ranges.insert(offset); + + if item.is_complete() { + if let Some(item) = self.buffer.remove(&index) { + self.internal_write( + item.index * self.piece_length, + &item.buffer[0..self.get_piece_length(item.index)], + )?; + } + + Ok(true) + } else { + Ok(false) + } + } + + pub fn wipe_piece(&mut self, index: usize) { + self.buffer.remove(&index); + } + + pub fn get_piece_length(&self, index: usize) -> usize { + if index + 1 == self.pieces { + let len = (self.size % self.piece_length); + if len == 0 { + self.piece_length + } else { + len + } + } else { + self.piece_length + } + } + + pub fn has_piece_bit(&self, index: usize, offset: usize) -> bool { + self.buffer + .get(&index) + .map(|item| item.ranges.contains(&offset)) + .unwrap_or(false) + } + + pub fn house_keeping(&mut self) { + // self.open_files.clean() + } + + pub fn size(&self) -> usize { + self.size + } +} + +pub trait ToStorageMap { + fn to_storage_map>(&self, path: P, is_base_path: bool) -> StorageMap; +} + +impl ToStorageMap for Torrent { + fn to_storage_map>(&self, path: P, is_base_path: bool) -> StorageMap { + let name = self.name().clone(); + match self.meta_info().object() { + MetaInfoObject::File(size) => { + if is_base_path { + StorageMapBuilder::create(path, self.meta_info().piece_length()) + .insert(vec![name.to_string()], *size) + .build() + } else { + let path = path.as_ref().to_path_buf(); + let parent = path.parent().unwrap_or(if cfg!(target_os = "win") { + Path::new("C:\\") + } else { + Path::new("/") + }); + + StorageMapBuilder::create(parent, self.meta_info().piece_length()) + .insert( + vec![path + .file_name() + .map(|os_str| os_str.to_string_lossy().to_string()) + .unwrap_or_else(|| self.name().to_string())], + *size, + ) + .build() + } + } + + MetaInfoObject::Files(files) => { + let mut builder = if is_base_path { + StorageMapBuilder::create(path, self.meta_info().piece_length()) + } else { + StorageMapBuilder::create( + path.as_ref().join(self.name()), + self.meta_info().piece_length(), + ) + }; + + for file in files { + builder = builder.insert(file.path().to_vec(), file.length()); + } + + builder.build() + } + } + } +} + +#[derive(Debug, Clone)] +pub struct StorageMapping { + path: Vec, + length: usize, + offset: usize, +} + +pub struct StorageMapBuilder { + base_path: PathBuf, + offset: usize, + piece_length: usize, + items: BTreeMap, +} + +impl StorageMapBuilder { + pub fn create>(path: P, piece_length: usize) -> StorageMapBuilder { + StorageMapBuilder { + base_path: path.as_ref().to_path_buf(), + offset: 0, + piece_length, + items: BTreeMap::new(), + } + } + + pub fn insert(mut self, path: Vec, length: usize) -> Self { + let offset = self.offset; + self.offset += length; + self.items.insert( + self.offset - 1, + StorageMapping { + offset, + length, + path, + }, + ); + + self + } + + pub fn build(self) -> StorageMap { + StorageMap { + base_path: self.base_path, + piece_length: self.piece_length, + pieces: (self.offset / self.piece_length) + + if self.offset % self.piece_length > 0 { + 1 + } else { + 0 + }, + open_files: Default::default(), + buffer: Default::default(), + mapping: self.items, + size: self.offset, + } + } +}