Compare commits

...

1 commit

Author SHA1 Message Date
991e9a5844
wip 2020-09-01 23:48:09 +02:00
25 changed files with 136346 additions and 21 deletions

100
Cargo.lock generated
View file

@ -262,6 +262,7 @@ dependencies = [
"textwrap",
"unicode-width",
"vec_map",
"yaml-rust",
]
[[package]]
@ -273,6 +274,21 @@ dependencies = [
"cache-padded",
]
[[package]]
name = "crossbeam-epoch"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"lazy_static",
"maybe-uninit",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
@ -547,12 +563,27 @@ version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]]
name = "memchr"
version = "2.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
[[package]]
name = "memoffset"
version = "0.5.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c198b026e1bbf08a937e94c6c60f9ec4a2267f5b0d2eec9c1b21b061ce2be55f"
dependencies = [
"autocfg",
]
[[package]]
name = "miniz_oxide"
version = "0.4.0"
@ -666,12 +697,13 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "polling"
version = "0.1.5"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e09dffb745feffca5be3dea51c02b7b368c4597ab0219a82acaf9799ab3e0d1"
checksum = "d10bd4578b2ca39fa2581c058921cb50ad226a8999829ba595e1665bcfdaf4a8"
dependencies = [
"cfg-if",
"libc",
"log",
"wepoll-sys-stjepang",
"winapi",
]
@ -759,6 +791,15 @@ version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "remem"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab4a09a8f306c4c15760db502c6c2aa2f5ea71edeb185d1b6fba293b066a9cff"
dependencies = [
"crossbeam-epoch",
]
[[package]]
name = "ring"
version = "0.16.15"
@ -792,6 +833,12 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
[[package]]
name = "scopeguard"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
version = "1.0.115"
@ -913,8 +960,12 @@ dependencies = [
name = "torment-cli"
version = "0.1.0"
dependencies = [
"bytes",
"clap",
"rand",
"torment-core",
"torment-peer",
"torment-storage",
]
[[package]]
@ -935,6 +986,18 @@ dependencies = [
"url",
]
[[package]]
name = "torment-daemon"
version = "0.1.0"
dependencies = [
"bytes",
"polling",
"rand",
"torment-core",
"torment-manager",
"torment-peer",
]
[[package]]
name = "torment-dht"
version = "0.1.0"
@ -962,9 +1025,36 @@ dependencies = [
"torment-dht",
]
[[package]]
name = "torment-manager"
version = "0.1.0"
dependencies = [
"bytes",
"lazy_static",
"ring",
"torment-core",
"torment-peer",
"torment-storage",
"url",
]
[[package]]
name = "torment-peer"
version = "0.1.0"
dependencies = [
"bytes",
"torment-core",
]
[[package]]
name = "torment-storage"
version = "0.1.0"
dependencies = [
"bytes",
"lazy_static",
"remem",
"torment-core",
]
[[package]]
name = "unicode-bidi"
@ -1143,3 +1233,9 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "yaml-rust"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992"

View file

@ -5,5 +5,8 @@ members = [
"torment-dht",
"torment-dht-node",
"torment-bencode",
"torment-peer"
"torment-peer",
"torment-storage",
"torment-manager",
"torment-daemon"
]

View file

@ -2,4 +2,8 @@
A torrent client
It's written in Rust, that's the only selling point really
It's written in Rust, that's the only selling point really
## Slightly important
Torment uses `eT` as peer id prefix, in both DHT and BitTorrent traffic, DHT uses `eT<major u8><minor u8>` and BitTorrent will use `eT<2 char ASCII major><2 char ASCII minor><2 char ASCII patch>`, I assume 96 bits will be enough to differentiate the 3 users that will be using Torment

128337
callgrind.out.19249 Normal file

File diff suppressed because it is too large Load diff

6108
nice.svg Normal file

File diff suppressed because it is too large Load diff

After

Width:  |  Height:  |  Size: 601 KiB

View file

@ -7,5 +7,9 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
clap = "2.33.3"
torment-core = { path = "../torment-core" }
clap = {version = "2.33.3", features = ["yaml"]}
torment-core = { path = "../torment-core" }
torment-peer = { path = "../torment-peer" }
torment-storage = { path = "../torment-storage" }
rand = "0.7.3"
bytes = "0.5.6"

20
torment-cli/cli.yml Normal file
View file

@ -0,0 +1,20 @@
name: torment-cli
version: "0.1.0"
about: "A torrent client"
subcommands:
- torrent:
about: "Show details about a torrent"
args:
- file:
required: true
- download:
about: "Show details about a torrent"
args:
- file:
help: Set the torrent to download
required: true
- peer:
short: p
long: peer
help: Set a custom peer
takes_value: true

144
torment-cli/src/download.rs Normal file
View file

@ -0,0 +1,144 @@
use bytes::{Bytes, BytesMut};
use rand::random;
use std::cmp::min;
use std::convert::TryInto;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream};
use std::process::exit;
use torment_core::infohash::v1::U160;
use torment_core::infohash::InfoHashCapable;
use torment_core::metainfo::Torrent;
use torment_core::peer_id;
use torment_peer::message::Message::Request;
use torment_peer::message::{Handshake, Message, SelectionMessage};
use torment_peer::{Peer, PeerProtocol};
use torment_storage::ToStorageMap;
pub fn download(torrent: Torrent, peers: Vec<SocketAddr>) {
println!("Starting download of {}", torrent.name());
if peers.len() == 0 {
println!("No peers given, stopping");
exit(1);
}
let peer = peers.first().unwrap();
let mut stream = TcpStream::connect(peer).expect("Failed to open tcp connection to peer");
stream.set_nodelay(true).expect("Failed setting nodelay");
let id = U160::from(peer_id(random()));
let our_header = Handshake::new(id, torrent.info_hash());
stream.write(&our_header.to_bytes());
let mut buffer = [0u8; 4096];
let mut done_header = false;
let mut header = Handshake::new(U160::random(), U160::random());
let mut peer = None;
let mut message_buffer = BytesMut::with_capacity(4096 * 10);
let mut storage = torrent.to_storage_map("/tmp", false);
let mut asked = 0;
let request_length = 2u32.pow(14);
let piece_length = torrent.meta_info().piece_length() as u32;
let mut progress = 0;
let mut header_buffer = BytesMut::new();
loop {
let size = stream.read(&mut buffer).expect("Failed to receive data");
if done_header == false {
header_buffer.extend_from_slice(&buffer[..size]);
if header_buffer.len() < 68 {
continue;
}
header = Handshake::from_bytes(Bytes::from(buffer[..size].to_vec()))
.expect("Failed to parse header");
if header.info_hash() != torrent.info_hash() {
panic!(
"Peer wants to serve different torrent ({}) :(",
header.info_hash()
);
}
println!(
"Peer[{}] connected",
String::from_utf8_lossy(&header.peer_id().to_bytes())
);
peer = Some(Peer::new(
stream.peer_addr().unwrap(),
PeerProtocol::TCP,
header,
torrent.meta_info(),
));
done_header = true;
// stream.write_all(&*Message::Request(SelectionMessage::new(0, 0, 1024)).to_bytes());
continue;
}
let mut peer = peer.as_mut().unwrap();
message_buffer.extend_from_slice(&buffer[..size]);
while message_buffer.len() >= 4 {
let length = u32::from_be_bytes(message_buffer[..4].try_into().unwrap());
if length == 0 {
message_buffer.split_to(4);
continue;
}
if message_buffer.len() < (length + 4) as usize {
break;
}
let message = message_buffer.split_to((4 + length) as usize).freeze();
let msg = Message::from_bytes(message.slice(4..)).expect("Failed parsing message");
// println!("=> {:?}", msg);
if msg == Message::Unchoke {
stream.write_all(&Message::Interested.to_length_prefixed_bytes());
}
peer.process(msg);
}
let mut messages = vec![];
let was_above_90 = peer.count_open_requests() > 90;
while !peer.is_choked()
&& peer.count_open_requests() < 500
&& !was_above_90
&& asked < torrent.meta_info().pieces()
{
let curr_piece_length = if asked + 1 == torrent.meta_info().pieces() {
piece_length - (storage.size() as u32 % piece_length)
} else {
piece_length
};
let size = min(request_length, curr_piece_length - progress);
let sel = SelectionMessage::new(asked as u32, progress, size);
peer.requested(sel);
let msg = Message::Request(sel);
// println!("<= {:?}", msg);
messages.extend(msg.to_length_prefixed_bytes());
progress += size;
if progress >= curr_piece_length {
asked += 1;
progress = 0;
}
}
if messages.len() > 0 {
stream
.write_all(&messages)
.expect("Failed writing to network socket");
}
while let Some(piece) = peer.next_piece() {
// println!("> {} {} {}", piece.index(), piece.offset(), piece.length());
storage
.write(
piece.index() as usize,
piece.offset() as usize,
piece.piece(),
)
.expect("Failed writing piece to storage");
}
if peer.count_open_requests() == 0 && asked > 1 {
println!("done");
break;
}
}
}

View file

@ -1,18 +1,17 @@
use clap::{App, Arg, SubCommand};
use clap::load_yaml;
use clap::App;
use std::fs::File;
use std::io::Read;
use std::net::SocketAddr;
use std::path::Path;
use std::str::FromStr;
use torment_core::metainfo::Torrent;
pub mod download;
fn main() {
let matches = App::new("torment-cli")
.version(env!("CARGO_PKG_VERSION"))
.subcommand(
SubCommand::with_name("torrent")
.about("Shows details about torrent file")
.arg(Arg::with_name("file").required(true)),
)
.get_matches();
let yml = load_yaml!("../cli.yml");
let matches = App::from_yaml(yml).get_matches();
match matches.subcommand() {
("torrent", Some(subcmd)) => {
@ -34,6 +33,38 @@ fn main() {
}
}
("download", Some(subcmd)) => {
let file = subcmd.value_of("file").unwrap();
let peers = subcmd
.values_of("peer")
.map(|values| {
values
.into_iter()
.filter_map(|res| match SocketAddr::from_str(res) {
Ok(addr) => Some(addr),
Err(err) => {
println!("Failed to parse {}: {}", res, err);
None
}
})
.collect()
})
.unwrap_or(vec![]);
let mut bytes: Vec<u8> = vec![];
File::open(file).unwrap().read_to_end(&mut bytes).unwrap();
match Torrent::from_bytes(
&bytes,
Path::new(file)
.file_stem()
.and_then(|file_name| file_name.to_str())
.map(|str| str.to_string()),
) {
Ok(torrent) => download::download(torrent, peers),
Err(err) => println!("Error: {}", err),
}
}
_ => {}
}
}

View file

@ -1,16 +1,42 @@
#![allow(dead_code)]
use crate::infohash::v1::U160;
use crate::infohash::InfoHashCapable;
use bytes::BytesMut;
use serde::export::fmt::Debug;
use serde_derive::{Deserialize, Serialize};
use std::cmp::min;
use std::convert::TryInto;
use std::fmt::{Display, Formatter};
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::str::FromStr;
pub mod infohash;
pub mod ip;
pub mod metainfo;
pub mod utils;
pub fn peer_id(input: [u8; 12]) -> [u8; 20] {
let peer_id = format!(
"eT{:0>2}{:0>2}{:0>2}AAAAAAAAAAAA",
env!("CARGO_PKG_VERSION_MAJOR"),
env!("CARGO_PKG_VERSION_MINOR"),
env!("CARGO_PKG_VERSION_PATCH")
);
let mut peer_id: [u8; 20] = peer_id.as_bytes().try_into().unwrap();
&peer_id[8..20].copy_from_slice(&input);
peer_id
}
pub fn compact_peer_id() -> [u8; 4] {
[
b'e',
b'T',
u8::from_str(env!("CARGO_PKG_VERSION_MAJOR")).unwrap(),
u8::from_str(env!("CARGO_PKG_VERSION_MINOR")).unwrap(),
]
}
pub trait CompactContact: Sized {
fn to_compact_contact(&self) -> Vec<u8>;
fn from_compact_contact<T: AsRef<[u8]>>(input: T) -> Result<Self, ParsingError>;
@ -94,6 +120,66 @@ impl ContactInfo {
}
}
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct Bitfield(BytesMut);
impl Bitfield {
pub fn new<T: AsRef<[u8]>>(field: T) -> Bitfield {
Bitfield(BytesMut::from(field.as_ref()))
}
pub fn with_size(size: usize) -> Bitfield {
Bitfield({
let mut field = BytesMut::new();
field.resize((size / 8) + if size % 8 > 0 { 1 } else { 0 }, 0);
field
})
}
pub fn set(&mut self, index: u32) {
let byte_index = index / 8;
let bitmask = 1 << (7 - index % 8);
self.0[byte_index as usize] |= bitmask;
}
pub fn unset(&mut self, index: u32) {
let byte_index = index / 8;
let bitmask = 1 << (8 - index % 8);
self.0[byte_index as usize] &= !bitmask;
}
pub fn get(&self, index: u32) -> bool {
let byte_index = index / 8;
let bitmask = 1 << (7 - (index % 8));
(self.0[byte_index as usize] & bitmask) > 0
}
pub fn add<T: AsRef<[u8]>>(&mut self, field: T) {
let field = field.as_ref();
let max = min(self.0.len(), field.len());
for i in 0..max {
self.0[i] |= field[i]
}
}
}
impl Debug for Bitfield {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Bitfield(")?;
for byte in &self.0 {
write!(f, "{:b}", byte)?;
}
write!(f, ")")
}
}
impl AsRef<[u8]> for Bitfield {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
#[derive(Debug, Default)]
pub struct PeerStorage {}
@ -114,3 +200,30 @@ impl PeerStorage {
vec![]
}
}
#[cfg(test)]
mod tests {
use crate::{compact_peer_id, peer_id, Bitfield};
use rand::random;
#[test]
fn test_peer_id() {
let x = peer_id(random());
assert_eq!(String::from_utf8_lossy(&x[0..8]), "eT000100");
assert_eq!(&compact_peer_id(), b"eT\x00\x01");
}
fn test_bitfield() {
let mut field = Bitfield::with_size(3);
field.set(0);
assert!(field.get(0));
field.unset(0);
assert_eq!(false, field.get(0));
field.set(2);
let other_field = Bitfield::with_size(3);
field.add(other_field);
assert_eq!(true, field.get(2));
}
}

View file

@ -37,6 +37,7 @@ impl Display for MetaInfoParsingError {
}
}
#[derive(Clone)]
pub struct Torrent {
name: String,
announce_list: Vec<HashSet<Url>>,
@ -116,8 +117,25 @@ impl Torrent {
bencoded,
})
}
pub fn name(&self) -> &str {
&self.name
}
pub fn info_hash(&self) -> U160 {
self.info.info_hash()
}
pub fn meta_info(&self) -> &MetaInfo {
&self.info
}
pub fn announce_list(&self) -> &Vec<HashSet<Url>> {
self.announce_list.as_ref()
}
}
#[derive(Clone)]
pub struct MetaInfo {
info_hash: U160,
bencoded: BencodeValue,
@ -126,6 +144,28 @@ pub struct MetaInfo {
object: MetaInfoObject,
}
impl MetaInfo {
pub fn info_hash(&self) -> U160 {
self.info_hash
}
pub fn piece_length(&self) -> usize {
self.piece_length
}
pub fn pieces(&self) -> usize {
self.pieces.len() / 20
}
pub fn hash(&self, index: usize) -> Bytes {
self.pieces.slice(index * 20..(index + 1) * 20)
}
pub fn object(&self) -> &MetaInfoObject {
&self.object
}
}
#[derive(Clone, Debug)]
pub enum MetaInfoObject {
Files(Vec<MetaFile>),
@ -233,4 +273,12 @@ impl MetaFile {
.ok_or(MetaInfoParsingError::WrongType)?,
})
}
pub fn path(&self) -> &[String] {
&self.path
}
pub fn length(&self) -> usize {
self.length
}
}

View file

@ -1,6 +1,6 @@
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::{Debug, Formatter};
use std::time::Instant;
use std::time::{Duration, Instant};
#[derive(Default)]
pub struct EphemeralMap<K: Ord + Clone, V> {
@ -103,10 +103,28 @@ impl<K: Ord + Clone + Debug, V> EphemeralMap<K, V> {
self.get(key).is_some()
}
pub fn get_and_bump(&mut self, key: &K, from_now: Duration) -> Option<&V> {
if self.get_node(key).is_none() {
return None;
}
self.update_expiry(&key, Instant::now() + from_now);
self.get(key)
}
pub fn get(&self, key: &K) -> Option<&V> {
self.get_node(key).map(|x| &x.value)
}
pub fn get_and_bump_mut(&mut self, key: &K, from_now: Duration) -> Option<&mut V> {
if self.get_node(key).is_none() {
return None;
}
self.update_expiry(&key, Instant::now() + from_now);
self.get_mut(key)
}
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
self.get_node_mut(key).map(|x| &mut x.value)
}

15
torment-daemon/Cargo.toml Normal file
View file

@ -0,0 +1,15 @@
[package]
name = "torment-daemon"
version = "0.1.0"
authors = ["eater <=@eater.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
torment-core = { path = "../torment-core" }
torment-peer = { path = "../torment-peer" }
torment-manager = { path = "../torment-manager" }
bytes = "0.5.6"
polling = "0.1.6"
rand = "0.7.3"

151
torment-daemon/src/main.rs Normal file
View file

@ -0,0 +1,151 @@
use bytes::BytesMut;
use polling::{Event, Poller};
use std::collections::HashMap;
use std::convert::TryInto;
use std::fs::File;
use std::io::{Read, Write};
use std::net::TcpStream;
use std::option::Option::Some;
use std::path::PathBuf;
use std::process::exit;
use std::result::Result::Ok;
use std::time::Duration;
use torment_core::infohash::v1::U160;
use torment_core::metainfo::Torrent;
use torment_core::peer_id;
use torment_manager::session_manager::SessionManager;
use torment_manager::torrent_manager::{TorrentManager, TorrentTarget};
use torment_peer::message::{Handshake, Message};
use torment_peer::PeerProtocol::TCP;
struct TcpState {
builder: BytesMut,
handshake: Option<Handshake>,
stream: TcpStream,
}
fn main() {
let mut session_manager = SessionManager::new();
let mut buffer = vec![];
File::open("/home/eater/Downloads/[Commie] Senyuu. - 23 [150B93D5].mkv.torrent")
.unwrap()
.read_to_end(&mut buffer)
.unwrap();
let torrent_manager = TorrentManager::from_torrent(
Torrent::from_bytes(
buffer,
Some("[Commie] Senyuu. - 23 [150B93D5].mkv".to_string()),
)
.unwrap(),
TorrentTarget {
path: PathBuf::from("/tmp"),
is_base_path: true,
},
None,
);
let peer_id = U160::from(peer_id(rand::random()));
let info_hash = torrent_manager.info_hash();
session_manager.add_torrent_manager(torrent_manager);
let mut tcp_stream_ktorrent = TcpStream::connect("127.0.0.1:6881").unwrap();
tcp_stream_ktorrent.set_nodelay(true).unwrap();
tcp_stream_ktorrent
.write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref())
.unwrap();
let mut tcp_stream_transmission = TcpStream::connect("192.168.188.100:51413").unwrap();
tcp_stream_transmission.set_nodelay(true).unwrap();
tcp_stream_transmission
.write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref())
.unwrap();
let mut buffer = vec![0u8; 4096 * 10];
let mut poller = Poller::new().unwrap();
poller.insert(&tcp_stream_ktorrent).unwrap();
poller.insert(&tcp_stream_transmission).unwrap();
poller.interest(&tcp_stream_ktorrent, Event::readable(0));
poller.interest(&tcp_stream_transmission, Event::readable(1));
let mut items = vec![
TcpState {
builder: Default::default(),
handshake: None,
stream: tcp_stream_ktorrent,
},
TcpState {
builder: Default::default(),
handshake: None,
stream: tcp_stream_transmission,
},
];
let mut peer_map: HashMap<(U160, U160), usize> = Default::default();
loop {
let mut events: Vec<Event> = vec![];
poller.wait(&mut events, Some(Duration::from_secs(10)));
for event in events {
println!("Event => {:?}", event);
if !event.readable {
continue;
}
let item = &mut items[event.key];
let packet = item.stream.read(&mut buffer).unwrap();
item.builder.extend_from_slice(&buffer[..packet]);
let handshake = if let Some(handshake) = &item.handshake {
handshake
} else {
if item.builder.len() >= 68 {
item.handshake =
Some(Handshake::from_bytes(item.builder.split_to(68).freeze()).unwrap());
let handshake = item.handshake.as_ref().unwrap();
println!("{} => {:?}", item.stream.peer_addr().unwrap(), handshake);
peer_map.insert((handshake.info_hash(), handshake.peer_id()), event.key);
session_manager.handshake(*handshake, item.stream.peer_addr().unwrap(), TCP);
handshake
} else {
continue;
}
};
while item.builder.len() >= 4 {
let len = u32::from_be_bytes(item.builder[..4].try_into().unwrap());
if len + 4 > item.builder.len() as u32 {
break;
}
if len == 0 {
item.builder.split_to(4);
continue;
}
let message_bytes = item.builder.split_to((4 + len) as usize).freeze();
let msg = Message::from_bytes(message_bytes.slice(4..)).unwrap();
println!("{} => {:?}", item.stream.peer_addr().unwrap(), msg);
if !session_manager.process(info_hash, handshake.peer_id(), msg) {
exit(1);
}
}
poller.interest(&item.stream, Event::readable(event.key));
}
while let Some(queued) = session_manager.next() {
if let Some(key) = peer_map.get(&(queued.info_hash, queued.peer_id)) {
let item = &mut items[*key];
println!("{} <= {:?}", queued.addr, queued.message);
item.stream
.write_all(&queued.message.to_length_prefixed_bytes())
.unwrap();
}
}
println!("=> Running house keeping");
session_manager.house_keeping();
}
}

View file

@ -6,17 +6,15 @@ use std::option::Option::Some;
use std::str::FromStr;
use torment_core::infohash::v1::U160;
use torment_core::infohash::InfoHashCapable;
use torment_core::{CompactContact, ContactInfo, ParsingError};
use torment_core::{compact_peer_id, CompactContact, ContactInfo, ParsingError};
pub use bendy::decoding::FromBencode;
pub use bendy::encoding::ToBencode;
use std::net::SocketAddr;
const VERSION: &'static [u8] = &[b'e', b't', 0, 0];
fn ver() -> Option<String> {
// None
Some(String::from_utf8_lossy(VERSION).to_string())
Some(String::from_utf8_lossy(&compact_peer_id()).to_string())
}
#[derive(Debug, Clone, Eq, PartialEq)]

View file

@ -0,0 +1,16 @@
[package]
name = "torment-manager"
version = "0.1.0"
authors = ["eater <=@eater.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
torment-core = { path = "../torment-core" }
torment-peer = { path = "../torment-peer" }
torment-storage = { path = "../torment-storage" }
bytes = "0.5.6"
url = "2.1.1"
lazy_static = "1.4.0"
ring = "0.16.15"

View file

@ -0,0 +1,3 @@
pub mod session_manager;
pub mod torrent_manager;
pub mod tracker_manager;

View file

@ -0,0 +1,86 @@
use crate::torrent_manager::TorrentManager;
use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::option::Option::Some;
use torment_core::infohash::v1::U160;
use torment_peer::message::{Handshake, Message};
use torment_peer::PeerProtocol;
pub struct QueuedMessage {
pub message: Message,
pub info_hash: U160,
pub peer_id: U160,
pub addr: SocketAddr,
}
pub struct SessionManager {
torrents: HashMap<U160, TorrentManager>,
peer_socket: HashMap<(U160, U160), SocketAddr>,
message_queue: VecDeque<QueuedMessage>,
}
impl SessionManager {
pub fn add_torrent_manager(&mut self, torrent: TorrentManager) {
self.torrents.insert(torrent.info_hash(), torrent);
}
fn remove_torrent_manager(&mut self, info_hash: U160) -> Option<TorrentManager> {
self.torrents.remove(&info_hash)
}
pub fn handshake(
&mut self,
handshake: Handshake,
addr: SocketAddr,
protocol: PeerProtocol,
) -> bool {
if let Some(torrent_manager) = self.torrents.get_mut(&handshake.info_hash()) {
self.peer_socket
.insert((handshake.peer_id(), handshake.info_hash()), addr);
torrent_manager.handshake(handshake, addr, protocol)
} else {
false
}
}
pub fn process(&mut self, torrent: U160, peer_id: U160, message: Message) -> bool {
if let Some(torrent_manager) = self.torrents.get_mut(&torrent) {
if torrent_manager.process(peer_id, message) {
while let Some((peer_id, message)) = torrent_manager.next() {
if let Some(addr) = self.peer_socket.get(&(peer_id, torrent)).copied() {
self.message_queue.push_back(QueuedMessage {
message,
addr,
peer_id,
info_hash: torrent,
});
}
}
true
} else {
false
}
} else {
false
}
}
pub fn new() -> SessionManager {
SessionManager {
torrents: Default::default(),
peer_socket: Default::default(),
message_queue: Default::default(),
}
}
pub fn next(&mut self) -> Option<QueuedMessage> {
self.message_queue.pop_front()
}
pub fn house_keeping(&mut self) {
for (_, torrent) in &mut self.torrents {
torrent.house_keeping();
}
}
}

View file

@ -0,0 +1,275 @@
use crate::tracker_manager::{TrackerId, TrackerManager};
use bytes::Bytes;
use ring::digest::{digest, SHA1_FOR_LEGACY_USE_ONLY};
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::SocketAddr;
use std::ops::Index;
use std::option::Option::Some;
use std::path::PathBuf;
use torment_core::infohash::v1::U160;
use torment_core::metainfo::{MetaInfo, Torrent};
use torment_core::Bitfield;
use torment_peer::message::{Handshake, Message, PieceMessage, SelectionMessage};
use torment_peer::{Peer, PeerProtocol};
use torment_storage::{StorageMap, ToStorageMap};
#[derive(Debug)]
pub enum Source {
Torrent(TorrentSource),
MetaInfo(MetaInfoSource),
}
#[derive(Debug)]
pub struct MetaInfoSource {
info_hash: U160,
}
#[derive(Debug)]
pub struct TorrentSource {
file: Option<PathBuf>,
torrent: Option<Torrent>,
}
#[derive(Debug)]
pub struct TorrentTarget {
pub path: PathBuf,
pub is_base_path: bool,
}
#[derive(Debug)]
pub struct TorrentManager {
info_hash: U160,
bitfield: Bitfield,
trackers: Vec<Vec<TrackerId>>,
source: Source,
target: TorrentTarget,
peers: HashMap<U160, Peer>,
queue: VecDeque<(U160, Message)>,
storage_map: StorageMap,
}
pub const REQUEST_SIZE: usize = 16384;
impl TorrentManager {
pub fn info_hash(&self) -> U160 {
self.info_hash
}
pub fn from_torrent(
torrent: Torrent,
target: TorrentTarget,
tracker_manager: Option<&mut TrackerManager>,
) -> TorrentManager {
let trackers = tracker_manager
.map(|manager| {
torrent
.announce_list()
.iter()
.map(|tier_list| {
tier_list
.iter()
.filter_map(|tracker| manager.get_tracker_id(tracker))
.collect::<Vec<_>>()
})
.collect::<Vec<_>>()
})
.unwrap_or(vec![]);
TorrentManager {
info_hash: torrent.info_hash(),
bitfield: Bitfield::with_size(torrent.meta_info().pieces()),
trackers,
source: Source::Torrent(TorrentSource {
file: None,
torrent: Some(torrent.clone()),
}),
storage_map: torrent.to_storage_map(&target.path, target.is_base_path),
target,
peers: HashMap::new(),
queue: Default::default(),
}
}
pub fn handshake(
&mut self,
handshake: Handshake,
addr: SocketAddr,
protocol: PeerProtocol,
) -> bool {
if handshake.info_hash() != self.info_hash {
return false;
}
if let Some(peer) = self.peers.get(&handshake.peer_id()) {
if peer.addr() != addr {
return false;
}
}
let meta_info = self.meta_info();
let peer = Peer::new(addr, protocol, handshake, meta_info);
self.peers.insert(peer.id(), peer);
true
}
fn meta_info(&mut self) -> &MetaInfo {
if let Source::Torrent(torrent) = &self.source {
return torrent
.torrent
.as_ref()
.expect("No torrent loaded uh oh missing functionality")
.meta_info();
}
panic!("Can't resolve MetaInfo for torrent")
}
pub fn process(&mut self, peer_id: U160, message: Message) -> bool {
let mut queue = vec![];
let ok = if let Some(peer) = self.peers.get_mut(&peer_id) {
if peer.process(message) {
while let Some(message) = peer.next() {
self.queue.push_back((peer_id, message));
}
while let Some(piece) = peer.next_piece() {
if self.storage_map.has_piece(piece.index() as usize) {
continue;
}
if self
.storage_map
.write(
piece.index() as usize,
piece.offset() as usize,
piece.piece(),
)
.unwrap()
{
queue.push(piece.index() as usize);
}
}
while let Some(piece_request) = peer.next_request() {
if !self.bitfield.get(piece_request.index()) {
continue;
}
let mut buffer = vec![0u8; piece_request.length() as usize];
self.storage_map
.read(
piece_request.index() as usize,
piece_request.offset() as usize,
&mut buffer,
)
.unwrap();
self.queue.push_back((
peer_id,
Message::Piece(PieceMessage {
index: piece_request.index(),
offset: piece_request.offset(),
piece: Bytes::from(buffer),
}),
))
}
true
} else {
false
}
} else {
false
};
for have in queue {
let piece_hash = self.meta_info().hash(have);
let piece_data = self.storage_map.read_piece(have).unwrap();
let res = digest(&SHA1_FOR_LEGACY_USE_ONLY, &piece_data);
if piece_hash != res.as_ref() {
println!("=> Piece#{} failed verification", have);
self.storage_map.wipe_piece(have);
continue;
} else {
println!("=> Piece#{} verified", have);
}
self.bitfield.set(have as u32);
let keys = self.peers.keys().copied().collect::<Vec<_>>();
for key in keys {
self.queue.push_back((key, Message::Have(have as u32)));
if !self.peers[&key].has_piece(have as u32) && self.peers[&key].we_choked() {
self.peers.get_mut(&key).unwrap().set_we_choked(false);
self.queue.push_back((key, Message::Unchoke));
}
}
}
ok
}
pub fn next(&mut self) -> Option<(U160, Message)> {
self.queue.pop_front()
}
pub fn house_keeping(&mut self) {
let mut peers = self
.peers
.iter()
.filter_map(|(_, peer)| {
if peer.has()1
if peer.is_choked() {
None
} else {
Some(peer.id())
}
})
.collect::<HashSet<_>>();
let pieces = self.meta_info().pieces() as u32;
for i in 0u32..pieces {
if self.bitfield.get(i) {
continue;
}
let length = self.storage_map.get_piece_length(i as usize);
let mut offset = 0;
for peer in &peers.clone() {
if !self.peers[peer].has_piece(i) {
continue;
}
while offset < length && self.peers[peer].count_open_requests() < 25 {
if !self.storage_map.has_piece_bit(i as usize, offset) {
let request_length = if (offset + REQUEST_SIZE) > length {
length - offset
} else {
REQUEST_SIZE
};
let msg = SelectionMessage::new(i, offset as u32, request_length as u32);
if self.peers.get_mut(peer).unwrap().requested(msg) {
self.queue.push_back((*peer, Message::Request(msg)));
}
}
offset += REQUEST_SIZE;
}
if self.peers[peer].count_open_requests() >= 25 {
peers.remove(peer);
}
if offset >= length {
break;
}
}
}
}
}

View file

@ -0,0 +1,34 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use url::Url;
pub struct TrackerManager {
counter: AtomicUsize,
trackers: HashMap<usize, Tracker>,
url_index: HashMap<String, usize>,
}
pub struct Tracker {
url: Url,
}
pub type TrackerId = usize;
impl TrackerManager {
pub fn get_tracker_id(&mut self, url: &Url) -> Option<TrackerId> {
// dht is not a tracker bye
if url.scheme() == "dht" {
return None;
}
let url_str = url.as_str();
if let Some(id) = self.url_index.get(url_str) {
return Some(*id);
}
let new_id = self.counter.fetch_add(1, Ordering::AcqRel);
self.url_index.insert(url_str.to_string(), new_id);
Some(new_id)
}
}

View file

@ -7,3 +7,5 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
torment-core = { path = "../torment-core" }
bytes = "0.5.6"

View file

@ -1,4 +1,170 @@
struct Peer {}
use crate::message::{Handshake, Message, PieceMessage, SelectionMessage};
use std::collections::{HashSet, VecDeque};
use std::net::SocketAddr;
use torment_core::infohash::v1::U160;
use torment_core::metainfo::MetaInfo;
use torment_core::Bitfield;
pub mod message;
#[derive(Copy, Clone, Debug)]
pub struct PeerState {
chocked: bool,
interested: bool,
}
impl PeerState {
const fn new() -> PeerState {
PeerState {
chocked: true,
interested: false,
}
}
}
impl Default for PeerState {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Copy, Clone)]
pub enum PeerProtocol {
TCP,
MuTP,
}
#[derive(Debug, Clone)]
pub struct Peer {
id: U160,
info_hash: U160,
address: SocketAddr,
protocol: PeerProtocol,
our_state: PeerState,
their_state: PeerState,
has: Bitfield,
request_state: HashSet<SelectionMessage>,
request_queue: VecDeque<SelectionMessage>,
requested_queue: HashSet<SelectionMessage>,
received_pieces: VecDeque<PieceMessage>,
queue: VecDeque<Message>,
}
impl Peer {
pub fn id(&self) -> U160 {
self.id
}
pub fn new(
address: SocketAddr,
protocol: PeerProtocol,
header: Handshake,
meta_info: &MetaInfo,
) -> Self {
Peer {
id: header.peer_id(),
info_hash: header.info_hash(),
address,
protocol,
our_state: PeerState::new(),
their_state: PeerState::new(),
has: Bitfield::with_size(meta_info.pieces()),
request_state: Default::default(),
request_queue: Default::default(),
requested_queue: Default::default(),
received_pieces: Default::default(),
queue: Default::default(),
}
}
pub fn has(&mut self) -> Bitfield {
self.has.clone()
}
pub fn process(&mut self, message: Message) -> bool {
match message {
Message::Choke => self.their_state.chocked = true,
Message::Unchoke => self.their_state.chocked = false,
Message::Interested => self.their_state.interested = true,
Message::NotInterested => self.their_state.interested = false,
Message::Have(nr) => self.has.set(nr),
Message::Bitfield(bitfield) => self.has.add(bitfield),
Message::Request(selection) => {
self.request_queue.push_back(selection);
self.request_state.insert(selection);
}
Message::Cancel(selection) => {
self.request_state.remove(&selection);
}
Message::Piece(piece) => {
self.requested_queue.remove(&SelectionMessage::new(
piece.index(),
piece.offset(),
piece.length(),
));
self.received_pieces.push_back(piece);
}
}
true
}
pub fn count_open_requests(&self) -> usize {
self.requested_queue.len()
}
pub fn is_choked(&self) -> bool {
self.their_state.chocked
}
pub fn is_interested(&self) -> bool {
self.their_state.interested
}
pub fn we_interested(&self) -> bool {
self.our_state.interested
}
pub fn we_choked(&self) -> bool {
self.our_state.chocked
}
pub fn set_we_choked(&mut self, choked: bool) {
self.our_state.chocked = choked;
}
pub fn set_we_interested(&mut self, interested: bool) {
self.our_state.interested = interested;
}
pub fn requested(&mut self, selection: SelectionMessage) -> bool {
self.requested_queue.insert(selection)
}
pub fn next_piece(&mut self) -> Option<PieceMessage> {
self.received_pieces.pop_front()
}
pub fn next_request(&mut self) -> Option<SelectionMessage> {
while let Some(item) = self.request_queue.pop_front() {
if self.request_state.remove(&item) {
return Some(item);
}
}
None
}
pub fn addr(&self) -> SocketAddr {
self.address
}
pub fn has_piece(&self, index: u32) -> bool {
self.has.get(index as u32)
}
pub fn next(&mut self) -> Option<Message> {
self.queue.pop_front()
}
}
#[cfg(test)]
mod tests {

292
torment-peer/src/message.rs Normal file
View file

@ -0,0 +1,292 @@
use bytes::{Bytes, BytesMut};
use std::convert::TryInto;
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use torment_core::infohash::v1::U160;
use torment_core::infohash::InfoHashCapable;
use torment_core::Bitfield;
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
pub enum Message {
Choke,
Unchoke,
Interested,
NotInterested,
Have(u32),
Bitfield(Bitfield),
Request(SelectionMessage),
Cancel(SelectionMessage),
Piece(PieceMessage),
}
#[derive(Clone, Debug)]
pub enum MessageParsingError {
TooShort,
UnknownType,
InvalidPrefix,
}
impl Error for MessageParsingError {}
impl Display for MessageParsingError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
impl Message {
pub fn from_bytes(bytes: Bytes) -> Result<Message, MessageParsingError> {
Ok(match bytes[0] {
0 => Message::Choke,
1 => Message::Unchoke,
2 => Message::Interested,
3 => Message::NotInterested,
4 => {
if bytes.len() < 5 {
return Err(MessageParsingError::TooShort);
}
Message::Have(u32::from_be_bytes(bytes[1..5].try_into().unwrap()))
}
5 => Message::Bitfield(Bitfield::new(bytes.slice(1..))),
6 | 8 => {
if bytes.len() < 13 {
return Err(MessageParsingError::TooShort);
}
let selection = SelectionMessage {
index: u32::from_be_bytes(bytes[1..5].try_into().unwrap()),
offset: u32::from_be_bytes(bytes[5..9].try_into().unwrap()),
length: u32::from_be_bytes(bytes[9..13].try_into().unwrap()),
};
if bytes[0] == 6 {
Message::Request(selection)
} else {
Message::Cancel(selection)
}
}
7 => {
if bytes.len() < 9 {
return Err(MessageParsingError::TooShort);
}
Message::Piece(PieceMessage {
index: u32::from_be_bytes(bytes[1..5].try_into().unwrap()),
offset: u32::from_be_bytes(bytes[5..9].try_into().unwrap()),
piece: bytes.slice(9..),
})
}
_ => return Err(MessageParsingError::UnknownType),
})
}
pub fn to_length_prefixed_bytes(&self) -> Vec<u8> {
let buffer = self.to_bytes();
let size = (buffer.len() as u32).to_be_bytes();
let mut map = Vec::with_capacity(buffer.len() + 4);
map.extend_from_slice(&size);
map.extend(buffer);
map
}
pub fn to_bytes(&self) -> Bytes {
match self {
Message::Choke => Bytes::from_static(&[0u8]),
Message::Unchoke => Bytes::from_static(&[1u8]),
Message::Interested => Bytes::from_static(&[2u8]),
Message::NotInterested => Bytes::from_static(&[3u8]),
Message::Have(piece) => {
let mut buffer = BytesMut::with_capacity(5);
buffer.extend(&[4]);
buffer.extend_from_slice(&piece.to_be_bytes());
buffer.freeze()
}
Message::Bitfield(bitfield) => {
let mut buffer = BytesMut::with_capacity(1 + bitfield.as_ref().len());
buffer.extend(&[5]);
buffer.extend_from_slice(bitfield.as_ref());
buffer.freeze()
}
Message::Request(request) => {
let mut buffer = BytesMut::with_capacity(13);
buffer.extend(&[6]);
buffer.extend_from_slice(&request.index.to_be_bytes());
buffer.extend_from_slice(&request.offset.to_be_bytes());
buffer.extend_from_slice(&request.length.to_be_bytes());
buffer.freeze()
}
Message::Piece(piece) => {
let mut buffer = BytesMut::with_capacity(9 + piece.piece.len());
buffer.extend(&[7]);
buffer.extend_from_slice(&piece.index.to_be_bytes());
buffer.extend_from_slice(&piece.offset.to_be_bytes());
buffer.extend_from_slice(piece.piece.as_ref());
buffer.freeze()
}
Message::Cancel(cancel) => {
let mut buffer = BytesMut::with_capacity(13);
buffer.extend(&[8]);
buffer.extend_from_slice(&cancel.index.to_be_bytes());
buffer.extend_from_slice(&cancel.offset.to_be_bytes());
buffer.extend_from_slice(&cancel.length.to_be_bytes());
buffer.freeze()
}
}
}
}
#[derive(Clone, Debug, Copy)]
pub struct Handshake {
reserved: [u8; 8],
info_hash: U160,
peer_id: U160,
}
impl Handshake {
pub fn from_bytes(bytes: Bytes) -> Result<Handshake, MessageParsingError> {
if bytes.len() < 68 {
return Err(MessageParsingError::TooShort);
}
if &bytes[0..20] != b"\x13BitTorrent protocol" {
return Err(MessageParsingError::InvalidPrefix);
}
Ok(Handshake {
reserved: bytes[20..28].try_into().unwrap(),
info_hash: U160::from_bytes(&bytes[28..48]).unwrap(),
peer_id: U160::from_bytes(&bytes[48..68]).unwrap(),
})
}
pub fn to_bytes(&self) -> [u8; 68] {
let mut header =
*b"\x13BitTorrent protocol\0\0\0\0\0\0\0\0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
header[28..48].copy_from_slice(self.info_hash.to_bytes().as_ref());
header[48..68].copy_from_slice(self.peer_id.to_bytes().as_ref());
header
}
pub fn new(peer_id: U160, info_hash: U160) -> Handshake {
Handshake {
reserved: [0; 8],
info_hash,
peer_id,
}
}
pub fn peer_id(&self) -> U160 {
self.peer_id
}
pub fn info_hash(&self) -> U160 {
self.info_hash
}
}
#[derive(Clone, Debug, Hash, Copy, Ord, PartialOrd, Eq, PartialEq)]
pub struct SelectionMessage {
index: u32,
offset: u32,
length: u32,
}
impl SelectionMessage {
pub fn new(index: u32, offset: u32, length: u32) -> SelectionMessage {
SelectionMessage {
index,
offset,
length,
}
}
pub fn index(&self) -> u32 {
self.index
}
pub fn offset(&self) -> u32 {
self.offset
}
pub fn length(&self) -> u32 {
self.length
}
}
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
pub struct PieceMessage {
pub index: u32,
pub offset: u32,
pub piece: Bytes,
}
impl Debug for PieceMessage {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PieceMessage")
.field("index", &self.index)
.field("offset", &self.offset)
.field("piece", &self.piece.len())
.finish()
}
}
impl PieceMessage {
pub fn index(&self) -> u32 {
self.index
}
pub fn offset(&self) -> u32 {
self.offset
}
pub fn length(&self) -> u32 {
self.piece.len() as u32
}
pub fn piece(&self) -> Bytes {
self.piece.clone()
}
}
#[cfg(test)]
mod tests {
use crate::message::{Message, PieceMessage, SelectionMessage};
use bytes::Bytes;
use torment_core::Bitfield;
#[test]
fn round_trip() {
let msgs = [
Message::Choke,
Message::Unchoke,
Message::Interested,
Message::NotInterested,
Message::Have(42),
Message::Bitfield(Bitfield::with_size(4)),
Message::Request(SelectionMessage {
index: 69,
offset: 1337,
length: 42,
}),
Message::Piece(PieceMessage {
index: 69,
offset: 1337,
piece: Bytes::from_static(b"hewwo"),
}),
Message::Cancel(SelectionMessage {
index: 69,
offset: 1337,
length: 42,
}),
];
for msg in &msgs {
assert_eq!(msg, &Message::from_bytes(msg.to_bytes()).unwrap())
}
}
}

View file

@ -0,0 +1,13 @@
[package]
name = "torment-storage"
version = "0.1.0"
authors = ["eater <=@eater.me>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
torment-core = { path = "../torment-core" }
remem = "0.1.0"
bytes = "0.5.6"
lazy_static = "1.4.0"

348
torment-storage/src/lib.rs Normal file
View file

@ -0,0 +1,348 @@
use bytes::{Bytes, BytesMut};
use lazy_static::lazy_static;
use remem::{ItemGuard, Pool};
use std::borrow::Borrow;
use std::cmp::min;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::fmt::{Debug, Formatter};
use std::fs::{DirBuilder, File, OpenOptions};
use std::io;
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Add, Deref, Range};
use std::path::{Path, PathBuf, MAIN_SEPARATOR};
use std::rc::Rc;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use torment_core::metainfo::{MetaInfo, MetaInfoObject, Torrent};
use torment_core::utils::EphemeralMap;
lazy_static! {
static ref MEMORY_POOL: Pool<Vec<u8>> = Pool::new(|| vec![0u8; 4194304]);
}
#[derive(Debug)]
pub struct StorageMap {
base_path: PathBuf,
piece_length: usize,
pieces: usize,
open_files: HashMap<usize, Rc<Mutex<File>>>,
buffer: HashMap<usize, StoragePiece>,
mapping: BTreeMap<usize, StorageMapping>,
size: usize,
}
pub struct StoragePiece {
index: usize,
pieces: usize,
ranges: HashSet<usize>,
buffer: ItemGuard<'static, Vec<u8>>,
}
impl StoragePiece {
fn is_complete(&self) -> bool {
self.pieces == self.ranges.len()
}
}
impl Debug for StoragePiece {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StoragePiece")
.field("index", &self.index)
.field("ranges", &self.ranges)
.finish()
}
}
impl StorageMap {
fn get_offsets(&self, offset: usize, length: usize) -> Vec<usize> {
let x = 0..1;
let end = offset + length;
let mut range = self.mapping.range(offset..);
let mut items = vec![];
while let Some((item_end, _)) = range.next() {
items.push(*item_end);
if *item_end > end {
break;
}
}
items
}
fn get_file(&mut self, item: &StorageMapping) -> io::Result<Rc<Mutex<File>>> {
if let Some(file) = self.open_files.get(&item.offset) {
return Ok(Rc::clone(file));
}
let path = item.path.join(&format!("{}", MAIN_SEPARATOR));
let file_path = self.base_path.join(path);
println!("Opening file: {:?}", file_path);
if let Some(parent) = file_path.parent() {
DirBuilder::new().recursive(true).create(parent)?;
}
let file = OpenOptions::new()
.write(true)
.read(true)
.create(true)
.open(file_path.as_path())?;
let rc = Rc::new(Mutex::new(file));
self.open_files.insert(item.offset, Rc::clone(&rc));
Ok(rc)
}
fn internal_write<T: AsRef<[u8]>>(&mut self, offset: usize, data: T) -> io::Result<()> {
let data = data.as_ref();
let mut written = 0;
for item_end in self.get_offsets(offset, data.len()) {
let item = self.mapping.get(&item_end).unwrap().clone();
let file = self.get_file(&item)?;
let mut file = file.lock().unwrap();
let curr_offset = (offset + written) - item.offset;
file.seek(SeekFrom::Start(curr_offset as u64))?;
let to_write = min(item.length - curr_offset, data.len() - written);
file.write_all(&data[written..written + to_write])?;
written += to_write;
}
assert_eq!(written, data.len(), "Failed to write all data");
Ok(())
}
fn internal_read<T: AsMut<[u8]>>(&mut self, offset: usize, mut data: T) -> io::Result<()> {
let data = data.as_mut();
let mut readed = 0;
for item_end in self.get_offsets(offset, data.len()) {
let item = self.mapping.get(&item_end).unwrap().clone();
let file = self.get_file(&item)?;
let mut file = file.lock().unwrap();
let curr_offset = (offset + readed) - item.offset;
file.seek(SeekFrom::Start(curr_offset as u64))?;
let to_read = min(item.length - curr_offset, data.len() - readed);
file.read_exact(&mut data[readed..readed + to_read])?;
readed += to_read;
}
assert_eq!(readed, data.len(), "Failed to read all data");
Ok(())
}
pub fn has_piece(&self, index: usize) -> bool {
if let Some(piece) = self.buffer.get(&index) {
piece.is_complete()
} else {
false
}
}
pub fn read_piece(&mut self, index: usize) -> io::Result<Vec<u8>> {
let mut bytes = vec![0; self.get_piece_length(index)];
self.read(index, 0, &mut bytes);
Ok(bytes)
}
pub fn read<T: AsMut<[u8]>>(
&mut self,
index: usize,
offset: usize,
mut data: T,
) -> io::Result<()> {
let data = data.as_mut();
self.internal_read((index * self.piece_length) + offset, data)
}
pub fn write<T: AsRef<[u8]>>(
&mut self,
index: usize,
offset: usize,
data: T,
) -> io::Result<bool> {
let data = data.as_ref();
let item = if let Some(item) = self.buffer.get_mut(&index) {
item
} else {
let request_size = 2usize.pow(14);
let piece_length = self.get_piece_length(index);
let pieces = (piece_length / request_size)
+ if piece_length % request_size > 0 {
1
} else {
0
};
self.buffer.insert(index, {
StoragePiece {
index,
ranges: Default::default(),
buffer: MEMORY_POOL.get(),
pieces,
}
});
self.buffer.get_mut(&index).unwrap()
};
item.buffer[offset..offset + data.len()].copy_from_slice(data);
item.ranges.insert(offset);
if item.is_complete() {
if let Some(item) = self.buffer.remove(&index) {
self.internal_write(
item.index * self.piece_length,
&item.buffer[0..self.get_piece_length(item.index)],
)?;
}
Ok(true)
} else {
Ok(false)
}
}
pub fn wipe_piece(&mut self, index: usize) {
self.buffer.remove(&index);
}
pub fn get_piece_length(&self, index: usize) -> usize {
if index + 1 == self.pieces {
let len = (self.size % self.piece_length);
if len == 0 {
self.piece_length
} else {
len
}
} else {
self.piece_length
}
}
pub fn has_piece_bit(&self, index: usize, offset: usize) -> bool {
self.buffer
.get(&index)
.map(|item| item.ranges.contains(&offset))
.unwrap_or(false)
}
pub fn house_keeping(&mut self) {
// self.open_files.clean()
}
pub fn size(&self) -> usize {
self.size
}
}
pub trait ToStorageMap {
fn to_storage_map<P: AsRef<Path>>(&self, path: P, is_base_path: bool) -> StorageMap;
}
impl ToStorageMap for Torrent {
fn to_storage_map<P: AsRef<Path>>(&self, path: P, is_base_path: bool) -> StorageMap {
let name = self.name().clone();
match self.meta_info().object() {
MetaInfoObject::File(size) => {
if is_base_path {
StorageMapBuilder::create(path, self.meta_info().piece_length())
.insert(vec![name.to_string()], *size)
.build()
} else {
let path = path.as_ref().to_path_buf();
let parent = path.parent().unwrap_or(if cfg!(target_os = "win") {
Path::new("C:\\")
} else {
Path::new("/")
});
StorageMapBuilder::create(parent, self.meta_info().piece_length())
.insert(
vec![path
.file_name()
.map(|os_str| os_str.to_string_lossy().to_string())
.unwrap_or_else(|| self.name().to_string())],
*size,
)
.build()
}
}
MetaInfoObject::Files(files) => {
let mut builder = if is_base_path {
StorageMapBuilder::create(path, self.meta_info().piece_length())
} else {
StorageMapBuilder::create(
path.as_ref().join(self.name()),
self.meta_info().piece_length(),
)
};
for file in files {
builder = builder.insert(file.path().to_vec(), file.length());
}
builder.build()
}
}
}
}
#[derive(Debug, Clone)]
pub struct StorageMapping {
path: Vec<String>,
length: usize,
offset: usize,
}
pub struct StorageMapBuilder {
base_path: PathBuf,
offset: usize,
piece_length: usize,
items: BTreeMap<usize, StorageMapping>,
}
impl StorageMapBuilder {
pub fn create<P: AsRef<Path>>(path: P, piece_length: usize) -> StorageMapBuilder {
StorageMapBuilder {
base_path: path.as_ref().to_path_buf(),
offset: 0,
piece_length,
items: BTreeMap::new(),
}
}
pub fn insert(mut self, path: Vec<String>, length: usize) -> Self {
let offset = self.offset;
self.offset += length;
self.items.insert(
self.offset - 1,
StorageMapping {
offset,
length,
path,
},
);
self
}
pub fn build(self) -> StorageMap {
StorageMap {
base_path: self.base_path,
piece_length: self.piece_length,
pieces: (self.offset / self.piece_length)
+ if self.offset % self.piece_length > 0 {
1
} else {
0
},
open_files: Default::default(),
buffer: Default::default(),
mapping: self.items,
size: self.offset,
}
}
}