Compare commits
1 commit
ef40af30f4
...
991e9a5844
Author | SHA1 | Date | |
---|---|---|---|
991e9a5844 |
25 changed files with 136346 additions and 21 deletions
100
Cargo.lock
generated
100
Cargo.lock
generated
|
@ -262,6 +262,7 @@ dependencies = [
|
|||
"textwrap",
|
||||
"unicode-width",
|
||||
"vec_map",
|
||||
"yaml-rust",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -273,6 +274,21 @@ dependencies = [
|
|||
"cache-padded",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-epoch"
|
||||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
"cfg-if",
|
||||
"crossbeam-utils",
|
||||
"lazy_static",
|
||||
"maybe-uninit",
|
||||
"memoffset",
|
||||
"scopeguard",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "crossbeam-utils"
|
||||
version = "0.7.2"
|
||||
|
@ -547,12 +563,27 @@ version = "0.1.8"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7ffc5c5338469d4d3ea17d269fa8ea3512ad247247c30bd2df69e68309ed0a08"
|
||||
|
||||
[[package]]
|
||||
name = "maybe-uninit"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.3.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3728d817d99e5ac407411fa471ff9800a778d88a24685968b36824eaf4bee400"
|
||||
|
||||
[[package]]
|
||||
name = "memoffset"
|
||||
version = "0.5.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c198b026e1bbf08a937e94c6c60f9ec4a2267f5b0d2eec9c1b21b061ce2be55f"
|
||||
dependencies = [
|
||||
"autocfg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.4.0"
|
||||
|
@ -666,12 +697,13 @@ checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
|
|||
|
||||
[[package]]
|
||||
name = "polling"
|
||||
version = "0.1.5"
|
||||
version = "0.1.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e09dffb745feffca5be3dea51c02b7b368c4597ab0219a82acaf9799ab3e0d1"
|
||||
checksum = "d10bd4578b2ca39fa2581c058921cb50ad226a8999829ba595e1665bcfdaf4a8"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"libc",
|
||||
"log",
|
||||
"wepoll-sys-stjepang",
|
||||
"winapi",
|
||||
]
|
||||
|
@ -759,6 +791,15 @@ version = "0.1.57"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
|
||||
|
||||
[[package]]
|
||||
name = "remem"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ab4a09a8f306c4c15760db502c6c2aa2f5ea71edeb185d1b6fba293b066a9cff"
|
||||
dependencies = [
|
||||
"crossbeam-epoch",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.16.15"
|
||||
|
@ -792,6 +833,12 @@ version = "1.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2"
|
||||
|
||||
[[package]]
|
||||
name = "scopeguard"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
|
||||
|
||||
[[package]]
|
||||
name = "serde"
|
||||
version = "1.0.115"
|
||||
|
@ -913,8 +960,12 @@ dependencies = [
|
|||
name = "torment-cli"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"clap",
|
||||
"rand",
|
||||
"torment-core",
|
||||
"torment-peer",
|
||||
"torment-storage",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -935,6 +986,18 @@ dependencies = [
|
|||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "torment-daemon"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"polling",
|
||||
"rand",
|
||||
"torment-core",
|
||||
"torment-manager",
|
||||
"torment-peer",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "torment-dht"
|
||||
version = "0.1.0"
|
||||
|
@ -962,9 +1025,36 @@ dependencies = [
|
|||
"torment-dht",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "torment-manager"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"lazy_static",
|
||||
"ring",
|
||||
"torment-core",
|
||||
"torment-peer",
|
||||
"torment-storage",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "torment-peer"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"torment-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "torment-storage"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"lazy_static",
|
||||
"remem",
|
||||
"torment-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "unicode-bidi"
|
||||
|
@ -1143,3 +1233,9 @@ name = "winapi-x86_64-pc-windows-gnu"
|
|||
version = "0.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
|
||||
|
||||
[[package]]
|
||||
name = "yaml-rust"
|
||||
version = "0.3.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e66366e18dc58b46801afbf2ca7661a9f59cc8c5962c29892b6039b4f86fa992"
|
||||
|
|
|
@ -5,5 +5,8 @@ members = [
|
|||
"torment-dht",
|
||||
"torment-dht-node",
|
||||
"torment-bencode",
|
||||
"torment-peer"
|
||||
"torment-peer",
|
||||
"torment-storage",
|
||||
"torment-manager",
|
||||
"torment-daemon"
|
||||
]
|
|
@ -3,3 +3,7 @@
|
|||
A torrent client
|
||||
|
||||
It's written in Rust, that's the only selling point really
|
||||
|
||||
## Slightly important
|
||||
|
||||
Torment uses `eT` as peer id prefix, in both DHT and BitTorrent traffic, DHT uses `eT<major u8><minor u8>` and BitTorrent will use `eT<2 char ASCII major><2 char ASCII minor><2 char ASCII patch>`, I assume 96 bits will be enough to differentiate the 3 users that will be using Torment
|
128337
callgrind.out.19249
Normal file
128337
callgrind.out.19249
Normal file
File diff suppressed because it is too large
Load diff
6108
nice.svg
Normal file
6108
nice.svg
Normal file
File diff suppressed because it is too large
Load diff
After Width: | Height: | Size: 601 KiB |
|
@ -7,5 +7,9 @@ edition = "2018"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
clap = "2.33.3"
|
||||
clap = {version = "2.33.3", features = ["yaml"]}
|
||||
torment-core = { path = "../torment-core" }
|
||||
torment-peer = { path = "../torment-peer" }
|
||||
torment-storage = { path = "../torment-storage" }
|
||||
rand = "0.7.3"
|
||||
bytes = "0.5.6"
|
20
torment-cli/cli.yml
Normal file
20
torment-cli/cli.yml
Normal file
|
@ -0,0 +1,20 @@
|
|||
name: torment-cli
|
||||
version: "0.1.0"
|
||||
about: "A torrent client"
|
||||
subcommands:
|
||||
- torrent:
|
||||
about: "Show details about a torrent"
|
||||
args:
|
||||
- file:
|
||||
required: true
|
||||
- download:
|
||||
about: "Show details about a torrent"
|
||||
args:
|
||||
- file:
|
||||
help: Set the torrent to download
|
||||
required: true
|
||||
- peer:
|
||||
short: p
|
||||
long: peer
|
||||
help: Set a custom peer
|
||||
takes_value: true
|
144
torment-cli/src/download.rs
Normal file
144
torment-cli/src/download.rs
Normal file
|
@ -0,0 +1,144 @@
|
|||
use bytes::{Bytes, BytesMut};
|
||||
use rand::random;
|
||||
use std::cmp::min;
|
||||
use std::convert::TryInto;
|
||||
use std::io::{Read, Write};
|
||||
use std::net::{SocketAddr, TcpStream};
|
||||
use std::process::exit;
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_core::infohash::InfoHashCapable;
|
||||
use torment_core::metainfo::Torrent;
|
||||
use torment_core::peer_id;
|
||||
use torment_peer::message::Message::Request;
|
||||
use torment_peer::message::{Handshake, Message, SelectionMessage};
|
||||
use torment_peer::{Peer, PeerProtocol};
|
||||
use torment_storage::ToStorageMap;
|
||||
|
||||
pub fn download(torrent: Torrent, peers: Vec<SocketAddr>) {
|
||||
println!("Starting download of {}", torrent.name());
|
||||
if peers.len() == 0 {
|
||||
println!("No peers given, stopping");
|
||||
exit(1);
|
||||
}
|
||||
let peer = peers.first().unwrap();
|
||||
let mut stream = TcpStream::connect(peer).expect("Failed to open tcp connection to peer");
|
||||
stream.set_nodelay(true).expect("Failed setting nodelay");
|
||||
let id = U160::from(peer_id(random()));
|
||||
|
||||
let our_header = Handshake::new(id, torrent.info_hash());
|
||||
stream.write(&our_header.to_bytes());
|
||||
let mut buffer = [0u8; 4096];
|
||||
let mut done_header = false;
|
||||
let mut header = Handshake::new(U160::random(), U160::random());
|
||||
let mut peer = None;
|
||||
let mut message_buffer = BytesMut::with_capacity(4096 * 10);
|
||||
let mut storage = torrent.to_storage_map("/tmp", false);
|
||||
let mut asked = 0;
|
||||
let request_length = 2u32.pow(14);
|
||||
let piece_length = torrent.meta_info().piece_length() as u32;
|
||||
let mut progress = 0;
|
||||
let mut header_buffer = BytesMut::new();
|
||||
loop {
|
||||
let size = stream.read(&mut buffer).expect("Failed to receive data");
|
||||
if done_header == false {
|
||||
header_buffer.extend_from_slice(&buffer[..size]);
|
||||
if header_buffer.len() < 68 {
|
||||
continue;
|
||||
}
|
||||
|
||||
header = Handshake::from_bytes(Bytes::from(buffer[..size].to_vec()))
|
||||
.expect("Failed to parse header");
|
||||
if header.info_hash() != torrent.info_hash() {
|
||||
panic!(
|
||||
"Peer wants to serve different torrent ({}) :(",
|
||||
header.info_hash()
|
||||
);
|
||||
}
|
||||
|
||||
println!(
|
||||
"Peer[{}] connected",
|
||||
String::from_utf8_lossy(&header.peer_id().to_bytes())
|
||||
);
|
||||
peer = Some(Peer::new(
|
||||
stream.peer_addr().unwrap(),
|
||||
PeerProtocol::TCP,
|
||||
header,
|
||||
torrent.meta_info(),
|
||||
));
|
||||
done_header = true;
|
||||
// stream.write_all(&*Message::Request(SelectionMessage::new(0, 0, 1024)).to_bytes());
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut peer = peer.as_mut().unwrap();
|
||||
message_buffer.extend_from_slice(&buffer[..size]);
|
||||
while message_buffer.len() >= 4 {
|
||||
let length = u32::from_be_bytes(message_buffer[..4].try_into().unwrap());
|
||||
if length == 0 {
|
||||
message_buffer.split_to(4);
|
||||
continue;
|
||||
}
|
||||
|
||||
if message_buffer.len() < (length + 4) as usize {
|
||||
break;
|
||||
}
|
||||
|
||||
let message = message_buffer.split_to((4 + length) as usize).freeze();
|
||||
|
||||
let msg = Message::from_bytes(message.slice(4..)).expect("Failed parsing message");
|
||||
// println!("=> {:?}", msg);
|
||||
if msg == Message::Unchoke {
|
||||
stream.write_all(&Message::Interested.to_length_prefixed_bytes());
|
||||
}
|
||||
peer.process(msg);
|
||||
}
|
||||
|
||||
let mut messages = vec![];
|
||||
let was_above_90 = peer.count_open_requests() > 90;
|
||||
while !peer.is_choked()
|
||||
&& peer.count_open_requests() < 500
|
||||
&& !was_above_90
|
||||
&& asked < torrent.meta_info().pieces()
|
||||
{
|
||||
let curr_piece_length = if asked + 1 == torrent.meta_info().pieces() {
|
||||
piece_length - (storage.size() as u32 % piece_length)
|
||||
} else {
|
||||
piece_length
|
||||
};
|
||||
let size = min(request_length, curr_piece_length - progress);
|
||||
let sel = SelectionMessage::new(asked as u32, progress, size);
|
||||
peer.requested(sel);
|
||||
let msg = Message::Request(sel);
|
||||
// println!("<= {:?}", msg);
|
||||
messages.extend(msg.to_length_prefixed_bytes());
|
||||
progress += size;
|
||||
if progress >= curr_piece_length {
|
||||
asked += 1;
|
||||
progress = 0;
|
||||
}
|
||||
}
|
||||
|
||||
if messages.len() > 0 {
|
||||
stream
|
||||
.write_all(&messages)
|
||||
.expect("Failed writing to network socket");
|
||||
}
|
||||
|
||||
while let Some(piece) = peer.next_piece() {
|
||||
// println!("> {} {} {}", piece.index(), piece.offset(), piece.length());
|
||||
storage
|
||||
.write(
|
||||
piece.index() as usize,
|
||||
piece.offset() as usize,
|
||||
piece.piece(),
|
||||
)
|
||||
.expect("Failed writing piece to storage");
|
||||
}
|
||||
|
||||
if peer.count_open_requests() == 0 && asked > 1 {
|
||||
println!("done");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,18 +1,17 @@
|
|||
use clap::{App, Arg, SubCommand};
|
||||
use clap::load_yaml;
|
||||
use clap::App;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::net::SocketAddr;
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use torment_core::metainfo::Torrent;
|
||||
|
||||
pub mod download;
|
||||
|
||||
fn main() {
|
||||
let matches = App::new("torment-cli")
|
||||
.version(env!("CARGO_PKG_VERSION"))
|
||||
.subcommand(
|
||||
SubCommand::with_name("torrent")
|
||||
.about("Shows details about torrent file")
|
||||
.arg(Arg::with_name("file").required(true)),
|
||||
)
|
||||
.get_matches();
|
||||
let yml = load_yaml!("../cli.yml");
|
||||
let matches = App::from_yaml(yml).get_matches();
|
||||
|
||||
match matches.subcommand() {
|
||||
("torrent", Some(subcmd)) => {
|
||||
|
@ -34,6 +33,38 @@ fn main() {
|
|||
}
|
||||
}
|
||||
|
||||
("download", Some(subcmd)) => {
|
||||
let file = subcmd.value_of("file").unwrap();
|
||||
let peers = subcmd
|
||||
.values_of("peer")
|
||||
.map(|values| {
|
||||
values
|
||||
.into_iter()
|
||||
.filter_map(|res| match SocketAddr::from_str(res) {
|
||||
Ok(addr) => Some(addr),
|
||||
Err(err) => {
|
||||
println!("Failed to parse {}: {}", res, err);
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or(vec![]);
|
||||
let mut bytes: Vec<u8> = vec![];
|
||||
File::open(file).unwrap().read_to_end(&mut bytes).unwrap();
|
||||
match Torrent::from_bytes(
|
||||
&bytes,
|
||||
Path::new(file)
|
||||
.file_stem()
|
||||
.and_then(|file_name| file_name.to_str())
|
||||
.map(|str| str.to_string()),
|
||||
) {
|
||||
Ok(torrent) => download::download(torrent, peers),
|
||||
|
||||
Err(err) => println!("Error: {}", err),
|
||||
}
|
||||
}
|
||||
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,42 @@
|
|||
#![allow(dead_code)]
|
||||
use crate::infohash::v1::U160;
|
||||
use crate::infohash::InfoHashCapable;
|
||||
use bytes::BytesMut;
|
||||
use serde::export::fmt::Debug;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use std::cmp::min;
|
||||
use std::convert::TryInto;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
|
||||
use std::str::FromStr;
|
||||
|
||||
pub mod infohash;
|
||||
pub mod ip;
|
||||
pub mod metainfo;
|
||||
pub mod utils;
|
||||
|
||||
pub fn peer_id(input: [u8; 12]) -> [u8; 20] {
|
||||
let peer_id = format!(
|
||||
"eT{:0>2}{:0>2}{:0>2}AAAAAAAAAAAA",
|
||||
env!("CARGO_PKG_VERSION_MAJOR"),
|
||||
env!("CARGO_PKG_VERSION_MINOR"),
|
||||
env!("CARGO_PKG_VERSION_PATCH")
|
||||
);
|
||||
|
||||
let mut peer_id: [u8; 20] = peer_id.as_bytes().try_into().unwrap();
|
||||
&peer_id[8..20].copy_from_slice(&input);
|
||||
peer_id
|
||||
}
|
||||
|
||||
pub fn compact_peer_id() -> [u8; 4] {
|
||||
[
|
||||
b'e',
|
||||
b'T',
|
||||
u8::from_str(env!("CARGO_PKG_VERSION_MAJOR")).unwrap(),
|
||||
u8::from_str(env!("CARGO_PKG_VERSION_MINOR")).unwrap(),
|
||||
]
|
||||
}
|
||||
|
||||
pub trait CompactContact: Sized {
|
||||
fn to_compact_contact(&self) -> Vec<u8>;
|
||||
fn from_compact_contact<T: AsRef<[u8]>>(input: T) -> Result<Self, ParsingError>;
|
||||
|
@ -94,6 +120,66 @@ impl ContactInfo {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
|
||||
pub struct Bitfield(BytesMut);
|
||||
|
||||
impl Bitfield {
|
||||
pub fn new<T: AsRef<[u8]>>(field: T) -> Bitfield {
|
||||
Bitfield(BytesMut::from(field.as_ref()))
|
||||
}
|
||||
|
||||
pub fn with_size(size: usize) -> Bitfield {
|
||||
Bitfield({
|
||||
let mut field = BytesMut::new();
|
||||
field.resize((size / 8) + if size % 8 > 0 { 1 } else { 0 }, 0);
|
||||
field
|
||||
})
|
||||
}
|
||||
|
||||
pub fn set(&mut self, index: u32) {
|
||||
let byte_index = index / 8;
|
||||
let bitmask = 1 << (7 - index % 8);
|
||||
self.0[byte_index as usize] |= bitmask;
|
||||
}
|
||||
|
||||
pub fn unset(&mut self, index: u32) {
|
||||
let byte_index = index / 8;
|
||||
let bitmask = 1 << (8 - index % 8);
|
||||
self.0[byte_index as usize] &= !bitmask;
|
||||
}
|
||||
|
||||
pub fn get(&self, index: u32) -> bool {
|
||||
let byte_index = index / 8;
|
||||
let bitmask = 1 << (7 - (index % 8));
|
||||
(self.0[byte_index as usize] & bitmask) > 0
|
||||
}
|
||||
|
||||
pub fn add<T: AsRef<[u8]>>(&mut self, field: T) {
|
||||
let field = field.as_ref();
|
||||
let max = min(self.0.len(), field.len());
|
||||
for i in 0..max {
|
||||
self.0[i] |= field[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for Bitfield {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "Bitfield(")?;
|
||||
for byte in &self.0 {
|
||||
write!(f, "{:b}", byte)?;
|
||||
}
|
||||
|
||||
write!(f, ")")
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRef<[u8]> for Bitfield {
|
||||
fn as_ref(&self) -> &[u8] {
|
||||
self.0.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct PeerStorage {}
|
||||
|
||||
|
@ -114,3 +200,30 @@ impl PeerStorage {
|
|||
vec![]
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::{compact_peer_id, peer_id, Bitfield};
|
||||
use rand::random;
|
||||
|
||||
#[test]
|
||||
fn test_peer_id() {
|
||||
let x = peer_id(random());
|
||||
assert_eq!(String::from_utf8_lossy(&x[0..8]), "eT000100");
|
||||
assert_eq!(&compact_peer_id(), b"eT\x00\x01");
|
||||
}
|
||||
|
||||
fn test_bitfield() {
|
||||
let mut field = Bitfield::with_size(3);
|
||||
field.set(0);
|
||||
assert!(field.get(0));
|
||||
field.unset(0);
|
||||
assert_eq!(false, field.get(0));
|
||||
field.set(2);
|
||||
|
||||
let other_field = Bitfield::with_size(3);
|
||||
|
||||
field.add(other_field);
|
||||
assert_eq!(true, field.get(2));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,6 +37,7 @@ impl Display for MetaInfoParsingError {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Torrent {
|
||||
name: String,
|
||||
announce_list: Vec<HashSet<Url>>,
|
||||
|
@ -116,8 +117,25 @@ impl Torrent {
|
|||
bencoded,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
pub fn info_hash(&self) -> U160 {
|
||||
self.info.info_hash()
|
||||
}
|
||||
|
||||
pub fn meta_info(&self) -> &MetaInfo {
|
||||
&self.info
|
||||
}
|
||||
|
||||
pub fn announce_list(&self) -> &Vec<HashSet<Url>> {
|
||||
self.announce_list.as_ref()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct MetaInfo {
|
||||
info_hash: U160,
|
||||
bencoded: BencodeValue,
|
||||
|
@ -126,6 +144,28 @@ pub struct MetaInfo {
|
|||
object: MetaInfoObject,
|
||||
}
|
||||
|
||||
impl MetaInfo {
|
||||
pub fn info_hash(&self) -> U160 {
|
||||
self.info_hash
|
||||
}
|
||||
|
||||
pub fn piece_length(&self) -> usize {
|
||||
self.piece_length
|
||||
}
|
||||
|
||||
pub fn pieces(&self) -> usize {
|
||||
self.pieces.len() / 20
|
||||
}
|
||||
|
||||
pub fn hash(&self, index: usize) -> Bytes {
|
||||
self.pieces.slice(index * 20..(index + 1) * 20)
|
||||
}
|
||||
|
||||
pub fn object(&self) -> &MetaInfoObject {
|
||||
&self.object
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum MetaInfoObject {
|
||||
Files(Vec<MetaFile>),
|
||||
|
@ -233,4 +273,12 @@ impl MetaFile {
|
|||
.ok_or(MetaInfoParsingError::WrongType)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn path(&self) -> &[String] {
|
||||
&self.path
|
||||
}
|
||||
|
||||
pub fn length(&self) -> usize {
|
||||
self.length
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::time::Instant;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct EphemeralMap<K: Ord + Clone, V> {
|
||||
|
@ -103,10 +103,28 @@ impl<K: Ord + Clone + Debug, V> EphemeralMap<K, V> {
|
|||
self.get(key).is_some()
|
||||
}
|
||||
|
||||
pub fn get_and_bump(&mut self, key: &K, from_now: Duration) -> Option<&V> {
|
||||
if self.get_node(key).is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.update_expiry(&key, Instant::now() + from_now);
|
||||
self.get(key)
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &K) -> Option<&V> {
|
||||
self.get_node(key).map(|x| &x.value)
|
||||
}
|
||||
|
||||
pub fn get_and_bump_mut(&mut self, key: &K, from_now: Duration) -> Option<&mut V> {
|
||||
if self.get_node(key).is_none() {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.update_expiry(&key, Instant::now() + from_now);
|
||||
self.get_mut(key)
|
||||
}
|
||||
|
||||
pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
|
||||
self.get_node_mut(key).map(|x| &mut x.value)
|
||||
}
|
||||
|
|
15
torment-daemon/Cargo.toml
Normal file
15
torment-daemon/Cargo.toml
Normal file
|
@ -0,0 +1,15 @@
|
|||
[package]
|
||||
name = "torment-daemon"
|
||||
version = "0.1.0"
|
||||
authors = ["eater <=@eater.me>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
torment-core = { path = "../torment-core" }
|
||||
torment-peer = { path = "../torment-peer" }
|
||||
torment-manager = { path = "../torment-manager" }
|
||||
bytes = "0.5.6"
|
||||
polling = "0.1.6"
|
||||
rand = "0.7.3"
|
151
torment-daemon/src/main.rs
Normal file
151
torment-daemon/src/main.rs
Normal file
|
@ -0,0 +1,151 @@
|
|||
use bytes::BytesMut;
|
||||
use polling::{Event, Poller};
|
||||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::fs::File;
|
||||
use std::io::{Read, Write};
|
||||
use std::net::TcpStream;
|
||||
use std::option::Option::Some;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::result::Result::Ok;
|
||||
use std::time::Duration;
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_core::metainfo::Torrent;
|
||||
use torment_core::peer_id;
|
||||
use torment_manager::session_manager::SessionManager;
|
||||
use torment_manager::torrent_manager::{TorrentManager, TorrentTarget};
|
||||
use torment_peer::message::{Handshake, Message};
|
||||
use torment_peer::PeerProtocol::TCP;
|
||||
|
||||
struct TcpState {
|
||||
builder: BytesMut,
|
||||
handshake: Option<Handshake>,
|
||||
stream: TcpStream,
|
||||
}
|
||||
|
||||
fn main() {
|
||||
let mut session_manager = SessionManager::new();
|
||||
|
||||
let mut buffer = vec![];
|
||||
File::open("/home/eater/Downloads/[Commie] Senyuu. - 23 [150B93D5].mkv.torrent")
|
||||
.unwrap()
|
||||
.read_to_end(&mut buffer)
|
||||
.unwrap();
|
||||
let torrent_manager = TorrentManager::from_torrent(
|
||||
Torrent::from_bytes(
|
||||
buffer,
|
||||
Some("[Commie] Senyuu. - 23 [150B93D5].mkv".to_string()),
|
||||
)
|
||||
.unwrap(),
|
||||
TorrentTarget {
|
||||
path: PathBuf::from("/tmp"),
|
||||
is_base_path: true,
|
||||
},
|
||||
None,
|
||||
);
|
||||
|
||||
let peer_id = U160::from(peer_id(rand::random()));
|
||||
let info_hash = torrent_manager.info_hash();
|
||||
session_manager.add_torrent_manager(torrent_manager);
|
||||
|
||||
let mut tcp_stream_ktorrent = TcpStream::connect("127.0.0.1:6881").unwrap();
|
||||
tcp_stream_ktorrent.set_nodelay(true).unwrap();
|
||||
tcp_stream_ktorrent
|
||||
.write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref())
|
||||
.unwrap();
|
||||
|
||||
let mut tcp_stream_transmission = TcpStream::connect("192.168.188.100:51413").unwrap();
|
||||
tcp_stream_transmission.set_nodelay(true).unwrap();
|
||||
tcp_stream_transmission
|
||||
.write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref())
|
||||
.unwrap();
|
||||
|
||||
let mut buffer = vec![0u8; 4096 * 10];
|
||||
let mut poller = Poller::new().unwrap();
|
||||
poller.insert(&tcp_stream_ktorrent).unwrap();
|
||||
poller.insert(&tcp_stream_transmission).unwrap();
|
||||
poller.interest(&tcp_stream_ktorrent, Event::readable(0));
|
||||
poller.interest(&tcp_stream_transmission, Event::readable(1));
|
||||
|
||||
let mut items = vec![
|
||||
TcpState {
|
||||
builder: Default::default(),
|
||||
handshake: None,
|
||||
stream: tcp_stream_ktorrent,
|
||||
},
|
||||
TcpState {
|
||||
builder: Default::default(),
|
||||
handshake: None,
|
||||
stream: tcp_stream_transmission,
|
||||
},
|
||||
];
|
||||
|
||||
let mut peer_map: HashMap<(U160, U160), usize> = Default::default();
|
||||
|
||||
loop {
|
||||
let mut events: Vec<Event> = vec![];
|
||||
poller.wait(&mut events, Some(Duration::from_secs(10)));
|
||||
|
||||
for event in events {
|
||||
println!("Event => {:?}", event);
|
||||
if !event.readable {
|
||||
continue;
|
||||
}
|
||||
|
||||
let item = &mut items[event.key];
|
||||
let packet = item.stream.read(&mut buffer).unwrap();
|
||||
item.builder.extend_from_slice(&buffer[..packet]);
|
||||
let handshake = if let Some(handshake) = &item.handshake {
|
||||
handshake
|
||||
} else {
|
||||
if item.builder.len() >= 68 {
|
||||
item.handshake =
|
||||
Some(Handshake::from_bytes(item.builder.split_to(68).freeze()).unwrap());
|
||||
let handshake = item.handshake.as_ref().unwrap();
|
||||
println!("{} => {:?}", item.stream.peer_addr().unwrap(), handshake);
|
||||
peer_map.insert((handshake.info_hash(), handshake.peer_id()), event.key);
|
||||
session_manager.handshake(*handshake, item.stream.peer_addr().unwrap(), TCP);
|
||||
handshake
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
while item.builder.len() >= 4 {
|
||||
let len = u32::from_be_bytes(item.builder[..4].try_into().unwrap());
|
||||
if len + 4 > item.builder.len() as u32 {
|
||||
break;
|
||||
}
|
||||
|
||||
if len == 0 {
|
||||
item.builder.split_to(4);
|
||||
continue;
|
||||
}
|
||||
|
||||
let message_bytes = item.builder.split_to((4 + len) as usize).freeze();
|
||||
let msg = Message::from_bytes(message_bytes.slice(4..)).unwrap();
|
||||
println!("{} => {:?}", item.stream.peer_addr().unwrap(), msg);
|
||||
if !session_manager.process(info_hash, handshake.peer_id(), msg) {
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
|
||||
poller.interest(&item.stream, Event::readable(event.key));
|
||||
}
|
||||
|
||||
while let Some(queued) = session_manager.next() {
|
||||
if let Some(key) = peer_map.get(&(queued.info_hash, queued.peer_id)) {
|
||||
let item = &mut items[*key];
|
||||
println!("{} <= {:?}", queued.addr, queued.message);
|
||||
|
||||
item.stream
|
||||
.write_all(&queued.message.to_length_prefixed_bytes())
|
||||
.unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
println!("=> Running house keeping");
|
||||
session_manager.house_keeping();
|
||||
}
|
||||
}
|
|
@ -6,17 +6,15 @@ use std::option::Option::Some;
|
|||
use std::str::FromStr;
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_core::infohash::InfoHashCapable;
|
||||
use torment_core::{CompactContact, ContactInfo, ParsingError};
|
||||
use torment_core::{compact_peer_id, CompactContact, ContactInfo, ParsingError};
|
||||
|
||||
pub use bendy::decoding::FromBencode;
|
||||
pub use bendy::encoding::ToBencode;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
const VERSION: &'static [u8] = &[b'e', b't', 0, 0];
|
||||
|
||||
fn ver() -> Option<String> {
|
||||
// None
|
||||
Some(String::from_utf8_lossy(VERSION).to_string())
|
||||
Some(String::from_utf8_lossy(&compact_peer_id()).to_string())
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||
|
|
16
torment-manager/Cargo.toml
Normal file
16
torment-manager/Cargo.toml
Normal file
|
@ -0,0 +1,16 @@
|
|||
[package]
|
||||
name = "torment-manager"
|
||||
version = "0.1.0"
|
||||
authors = ["eater <=@eater.me>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
torment-core = { path = "../torment-core" }
|
||||
torment-peer = { path = "../torment-peer" }
|
||||
torment-storage = { path = "../torment-storage" }
|
||||
bytes = "0.5.6"
|
||||
url = "2.1.1"
|
||||
lazy_static = "1.4.0"
|
||||
ring = "0.16.15"
|
3
torment-manager/src/lib.rs
Normal file
3
torment-manager/src/lib.rs
Normal file
|
@ -0,0 +1,3 @@
|
|||
pub mod session_manager;
|
||||
pub mod torrent_manager;
|
||||
pub mod tracker_manager;
|
86
torment-manager/src/session_manager.rs
Normal file
86
torment-manager/src/session_manager.rs
Normal file
|
@ -0,0 +1,86 @@
|
|||
use crate::torrent_manager::TorrentManager;
|
||||
use std::collections::{HashMap, VecDeque};
|
||||
use std::net::SocketAddr;
|
||||
use std::option::Option::Some;
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_peer::message::{Handshake, Message};
|
||||
use torment_peer::PeerProtocol;
|
||||
|
||||
pub struct QueuedMessage {
|
||||
pub message: Message,
|
||||
pub info_hash: U160,
|
||||
pub peer_id: U160,
|
||||
pub addr: SocketAddr,
|
||||
}
|
||||
|
||||
pub struct SessionManager {
|
||||
torrents: HashMap<U160, TorrentManager>,
|
||||
peer_socket: HashMap<(U160, U160), SocketAddr>,
|
||||
message_queue: VecDeque<QueuedMessage>,
|
||||
}
|
||||
|
||||
impl SessionManager {
|
||||
pub fn add_torrent_manager(&mut self, torrent: TorrentManager) {
|
||||
self.torrents.insert(torrent.info_hash(), torrent);
|
||||
}
|
||||
|
||||
fn remove_torrent_manager(&mut self, info_hash: U160) -> Option<TorrentManager> {
|
||||
self.torrents.remove(&info_hash)
|
||||
}
|
||||
|
||||
pub fn handshake(
|
||||
&mut self,
|
||||
handshake: Handshake,
|
||||
addr: SocketAddr,
|
||||
protocol: PeerProtocol,
|
||||
) -> bool {
|
||||
if let Some(torrent_manager) = self.torrents.get_mut(&handshake.info_hash()) {
|
||||
self.peer_socket
|
||||
.insert((handshake.peer_id(), handshake.info_hash()), addr);
|
||||
torrent_manager.handshake(handshake, addr, protocol)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn process(&mut self, torrent: U160, peer_id: U160, message: Message) -> bool {
|
||||
if let Some(torrent_manager) = self.torrents.get_mut(&torrent) {
|
||||
if torrent_manager.process(peer_id, message) {
|
||||
while let Some((peer_id, message)) = torrent_manager.next() {
|
||||
if let Some(addr) = self.peer_socket.get(&(peer_id, torrent)).copied() {
|
||||
self.message_queue.push_back(QueuedMessage {
|
||||
message,
|
||||
addr,
|
||||
peer_id,
|
||||
info_hash: torrent,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new() -> SessionManager {
|
||||
SessionManager {
|
||||
torrents: Default::default(),
|
||||
peer_socket: Default::default(),
|
||||
message_queue: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn next(&mut self) -> Option<QueuedMessage> {
|
||||
self.message_queue.pop_front()
|
||||
}
|
||||
|
||||
pub fn house_keeping(&mut self) {
|
||||
for (_, torrent) in &mut self.torrents {
|
||||
torrent.house_keeping();
|
||||
}
|
||||
}
|
||||
}
|
275
torment-manager/src/torrent_manager.rs
Normal file
275
torment-manager/src/torrent_manager.rs
Normal file
|
@ -0,0 +1,275 @@
|
|||
use crate::tracker_manager::{TrackerId, TrackerManager};
|
||||
use bytes::Bytes;
|
||||
use ring::digest::{digest, SHA1_FOR_LEGACY_USE_ONLY};
|
||||
use std::collections::{HashMap, HashSet, VecDeque};
|
||||
use std::net::SocketAddr;
|
||||
use std::ops::Index;
|
||||
use std::option::Option::Some;
|
||||
use std::path::PathBuf;
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_core::metainfo::{MetaInfo, Torrent};
|
||||
use torment_core::Bitfield;
|
||||
use torment_peer::message::{Handshake, Message, PieceMessage, SelectionMessage};
|
||||
use torment_peer::{Peer, PeerProtocol};
|
||||
use torment_storage::{StorageMap, ToStorageMap};
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum Source {
|
||||
Torrent(TorrentSource),
|
||||
MetaInfo(MetaInfoSource),
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MetaInfoSource {
|
||||
info_hash: U160,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TorrentSource {
|
||||
file: Option<PathBuf>,
|
||||
torrent: Option<Torrent>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TorrentTarget {
|
||||
pub path: PathBuf,
|
||||
pub is_base_path: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct TorrentManager {
|
||||
info_hash: U160,
|
||||
bitfield: Bitfield,
|
||||
trackers: Vec<Vec<TrackerId>>,
|
||||
source: Source,
|
||||
target: TorrentTarget,
|
||||
peers: HashMap<U160, Peer>,
|
||||
queue: VecDeque<(U160, Message)>,
|
||||
storage_map: StorageMap,
|
||||
}
|
||||
|
||||
pub const REQUEST_SIZE: usize = 16384;
|
||||
|
||||
impl TorrentManager {
|
||||
pub fn info_hash(&self) -> U160 {
|
||||
self.info_hash
|
||||
}
|
||||
|
||||
pub fn from_torrent(
|
||||
torrent: Torrent,
|
||||
target: TorrentTarget,
|
||||
tracker_manager: Option<&mut TrackerManager>,
|
||||
) -> TorrentManager {
|
||||
let trackers = tracker_manager
|
||||
.map(|manager| {
|
||||
torrent
|
||||
.announce_list()
|
||||
.iter()
|
||||
.map(|tier_list| {
|
||||
tier_list
|
||||
.iter()
|
||||
.filter_map(|tracker| manager.get_tracker_id(tracker))
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
})
|
||||
.unwrap_or(vec![]);
|
||||
|
||||
TorrentManager {
|
||||
info_hash: torrent.info_hash(),
|
||||
bitfield: Bitfield::with_size(torrent.meta_info().pieces()),
|
||||
trackers,
|
||||
source: Source::Torrent(TorrentSource {
|
||||
file: None,
|
||||
torrent: Some(torrent.clone()),
|
||||
}),
|
||||
storage_map: torrent.to_storage_map(&target.path, target.is_base_path),
|
||||
target,
|
||||
peers: HashMap::new(),
|
||||
queue: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handshake(
|
||||
&mut self,
|
||||
handshake: Handshake,
|
||||
addr: SocketAddr,
|
||||
protocol: PeerProtocol,
|
||||
) -> bool {
|
||||
if handshake.info_hash() != self.info_hash {
|
||||
return false;
|
||||
}
|
||||
|
||||
if let Some(peer) = self.peers.get(&handshake.peer_id()) {
|
||||
if peer.addr() != addr {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
let meta_info = self.meta_info();
|
||||
let peer = Peer::new(addr, protocol, handshake, meta_info);
|
||||
self.peers.insert(peer.id(), peer);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
fn meta_info(&mut self) -> &MetaInfo {
|
||||
if let Source::Torrent(torrent) = &self.source {
|
||||
return torrent
|
||||
.torrent
|
||||
.as_ref()
|
||||
.expect("No torrent loaded uh oh missing functionality")
|
||||
.meta_info();
|
||||
}
|
||||
|
||||
panic!("Can't resolve MetaInfo for torrent")
|
||||
}
|
||||
|
||||
pub fn process(&mut self, peer_id: U160, message: Message) -> bool {
|
||||
let mut queue = vec![];
|
||||
let ok = if let Some(peer) = self.peers.get_mut(&peer_id) {
|
||||
if peer.process(message) {
|
||||
while let Some(message) = peer.next() {
|
||||
self.queue.push_back((peer_id, message));
|
||||
}
|
||||
|
||||
while let Some(piece) = peer.next_piece() {
|
||||
if self.storage_map.has_piece(piece.index() as usize) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if self
|
||||
.storage_map
|
||||
.write(
|
||||
piece.index() as usize,
|
||||
piece.offset() as usize,
|
||||
piece.piece(),
|
||||
)
|
||||
.unwrap()
|
||||
{
|
||||
queue.push(piece.index() as usize);
|
||||
}
|
||||
}
|
||||
|
||||
while let Some(piece_request) = peer.next_request() {
|
||||
if !self.bitfield.get(piece_request.index()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut buffer = vec![0u8; piece_request.length() as usize];
|
||||
self.storage_map
|
||||
.read(
|
||||
piece_request.index() as usize,
|
||||
piece_request.offset() as usize,
|
||||
&mut buffer,
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
self.queue.push_back((
|
||||
peer_id,
|
||||
Message::Piece(PieceMessage {
|
||||
index: piece_request.index(),
|
||||
offset: piece_request.offset(),
|
||||
piece: Bytes::from(buffer),
|
||||
}),
|
||||
))
|
||||
}
|
||||
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
} else {
|
||||
false
|
||||
};
|
||||
|
||||
for have in queue {
|
||||
let piece_hash = self.meta_info().hash(have);
|
||||
let piece_data = self.storage_map.read_piece(have).unwrap();
|
||||
let res = digest(&SHA1_FOR_LEGACY_USE_ONLY, &piece_data);
|
||||
if piece_hash != res.as_ref() {
|
||||
println!("=> Piece#{} failed verification", have);
|
||||
self.storage_map.wipe_piece(have);
|
||||
continue;
|
||||
} else {
|
||||
println!("=> Piece#{} verified", have);
|
||||
}
|
||||
|
||||
self.bitfield.set(have as u32);
|
||||
|
||||
let keys = self.peers.keys().copied().collect::<Vec<_>>();
|
||||
for key in keys {
|
||||
self.queue.push_back((key, Message::Have(have as u32)));
|
||||
|
||||
if !self.peers[&key].has_piece(have as u32) && self.peers[&key].we_choked() {
|
||||
self.peers.get_mut(&key).unwrap().set_we_choked(false);
|
||||
self.queue.push_back((key, Message::Unchoke));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ok
|
||||
}
|
||||
|
||||
pub fn next(&mut self) -> Option<(U160, Message)> {
|
||||
self.queue.pop_front()
|
||||
}
|
||||
|
||||
pub fn house_keeping(&mut self) {
|
||||
let mut peers = self
|
||||
.peers
|
||||
.iter()
|
||||
.filter_map(|(_, peer)| {
|
||||
if peer.has()1
|
||||
|
||||
if peer.is_choked() {
|
||||
None
|
||||
} else {
|
||||
Some(peer.id())
|
||||
}
|
||||
})
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let pieces = self.meta_info().pieces() as u32;
|
||||
for i in 0u32..pieces {
|
||||
if self.bitfield.get(i) {
|
||||
continue;
|
||||
}
|
||||
|
||||
let length = self.storage_map.get_piece_length(i as usize);
|
||||
let mut offset = 0;
|
||||
|
||||
for peer in &peers.clone() {
|
||||
if !self.peers[peer].has_piece(i) {
|
||||
continue;
|
||||
}
|
||||
|
||||
while offset < length && self.peers[peer].count_open_requests() < 25 {
|
||||
if !self.storage_map.has_piece_bit(i as usize, offset) {
|
||||
let request_length = if (offset + REQUEST_SIZE) > length {
|
||||
length - offset
|
||||
} else {
|
||||
REQUEST_SIZE
|
||||
};
|
||||
|
||||
let msg = SelectionMessage::new(i, offset as u32, request_length as u32);
|
||||
|
||||
if self.peers.get_mut(peer).unwrap().requested(msg) {
|
||||
self.queue.push_back((*peer, Message::Request(msg)));
|
||||
}
|
||||
}
|
||||
|
||||
offset += REQUEST_SIZE;
|
||||
}
|
||||
|
||||
if self.peers[peer].count_open_requests() >= 25 {
|
||||
peers.remove(peer);
|
||||
}
|
||||
|
||||
if offset >= length {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
34
torment-manager/src/tracker_manager.rs
Normal file
34
torment-manager/src/tracker_manager.rs
Normal file
|
@ -0,0 +1,34 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use url::Url;
|
||||
|
||||
pub struct TrackerManager {
|
||||
counter: AtomicUsize,
|
||||
trackers: HashMap<usize, Tracker>,
|
||||
url_index: HashMap<String, usize>,
|
||||
}
|
||||
|
||||
pub struct Tracker {
|
||||
url: Url,
|
||||
}
|
||||
|
||||
pub type TrackerId = usize;
|
||||
|
||||
impl TrackerManager {
|
||||
pub fn get_tracker_id(&mut self, url: &Url) -> Option<TrackerId> {
|
||||
// dht is not a tracker bye
|
||||
if url.scheme() == "dht" {
|
||||
return None;
|
||||
}
|
||||
|
||||
let url_str = url.as_str();
|
||||
if let Some(id) = self.url_index.get(url_str) {
|
||||
return Some(*id);
|
||||
}
|
||||
|
||||
let new_id = self.counter.fetch_add(1, Ordering::AcqRel);
|
||||
self.url_index.insert(url_str.to_string(), new_id);
|
||||
|
||||
Some(new_id)
|
||||
}
|
||||
}
|
|
@ -7,3 +7,5 @@ edition = "2018"
|
|||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
torment-core = { path = "../torment-core" }
|
||||
bytes = "0.5.6"
|
|
@ -1,4 +1,170 @@
|
|||
struct Peer {}
|
||||
use crate::message::{Handshake, Message, PieceMessage, SelectionMessage};
|
||||
use std::collections::{HashSet, VecDeque};
|
||||
use std::net::SocketAddr;
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_core::metainfo::MetaInfo;
|
||||
use torment_core::Bitfield;
|
||||
|
||||
pub mod message;
|
||||
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
pub struct PeerState {
|
||||
chocked: bool,
|
||||
interested: bool,
|
||||
}
|
||||
|
||||
impl PeerState {
|
||||
const fn new() -> PeerState {
|
||||
PeerState {
|
||||
chocked: true,
|
||||
interested: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PeerState {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub enum PeerProtocol {
|
||||
TCP,
|
||||
MuTP,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Peer {
|
||||
id: U160,
|
||||
info_hash: U160,
|
||||
address: SocketAddr,
|
||||
protocol: PeerProtocol,
|
||||
our_state: PeerState,
|
||||
their_state: PeerState,
|
||||
has: Bitfield,
|
||||
request_state: HashSet<SelectionMessage>,
|
||||
request_queue: VecDeque<SelectionMessage>,
|
||||
requested_queue: HashSet<SelectionMessage>,
|
||||
received_pieces: VecDeque<PieceMessage>,
|
||||
queue: VecDeque<Message>,
|
||||
}
|
||||
|
||||
impl Peer {
|
||||
pub fn id(&self) -> U160 {
|
||||
self.id
|
||||
}
|
||||
|
||||
pub fn new(
|
||||
address: SocketAddr,
|
||||
protocol: PeerProtocol,
|
||||
header: Handshake,
|
||||
meta_info: &MetaInfo,
|
||||
) -> Self {
|
||||
Peer {
|
||||
id: header.peer_id(),
|
||||
info_hash: header.info_hash(),
|
||||
address,
|
||||
protocol,
|
||||
our_state: PeerState::new(),
|
||||
their_state: PeerState::new(),
|
||||
has: Bitfield::with_size(meta_info.pieces()),
|
||||
request_state: Default::default(),
|
||||
request_queue: Default::default(),
|
||||
requested_queue: Default::default(),
|
||||
received_pieces: Default::default(),
|
||||
queue: Default::default(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has(&mut self) -> Bitfield {
|
||||
self.has.clone()
|
||||
}
|
||||
|
||||
pub fn process(&mut self, message: Message) -> bool {
|
||||
match message {
|
||||
Message::Choke => self.their_state.chocked = true,
|
||||
Message::Unchoke => self.their_state.chocked = false,
|
||||
Message::Interested => self.their_state.interested = true,
|
||||
Message::NotInterested => self.their_state.interested = false,
|
||||
Message::Have(nr) => self.has.set(nr),
|
||||
Message::Bitfield(bitfield) => self.has.add(bitfield),
|
||||
Message::Request(selection) => {
|
||||
self.request_queue.push_back(selection);
|
||||
self.request_state.insert(selection);
|
||||
}
|
||||
Message::Cancel(selection) => {
|
||||
self.request_state.remove(&selection);
|
||||
}
|
||||
Message::Piece(piece) => {
|
||||
self.requested_queue.remove(&SelectionMessage::new(
|
||||
piece.index(),
|
||||
piece.offset(),
|
||||
piece.length(),
|
||||
));
|
||||
self.received_pieces.push_back(piece);
|
||||
}
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub fn count_open_requests(&self) -> usize {
|
||||
self.requested_queue.len()
|
||||
}
|
||||
|
||||
pub fn is_choked(&self) -> bool {
|
||||
self.their_state.chocked
|
||||
}
|
||||
|
||||
pub fn is_interested(&self) -> bool {
|
||||
self.their_state.interested
|
||||
}
|
||||
|
||||
pub fn we_interested(&self) -> bool {
|
||||
self.our_state.interested
|
||||
}
|
||||
|
||||
pub fn we_choked(&self) -> bool {
|
||||
self.our_state.chocked
|
||||
}
|
||||
|
||||
pub fn set_we_choked(&mut self, choked: bool) {
|
||||
self.our_state.chocked = choked;
|
||||
}
|
||||
|
||||
pub fn set_we_interested(&mut self, interested: bool) {
|
||||
self.our_state.interested = interested;
|
||||
}
|
||||
|
||||
pub fn requested(&mut self, selection: SelectionMessage) -> bool {
|
||||
self.requested_queue.insert(selection)
|
||||
}
|
||||
|
||||
pub fn next_piece(&mut self) -> Option<PieceMessage> {
|
||||
self.received_pieces.pop_front()
|
||||
}
|
||||
|
||||
pub fn next_request(&mut self) -> Option<SelectionMessage> {
|
||||
while let Some(item) = self.request_queue.pop_front() {
|
||||
if self.request_state.remove(&item) {
|
||||
return Some(item);
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn addr(&self) -> SocketAddr {
|
||||
self.address
|
||||
}
|
||||
pub fn has_piece(&self, index: u32) -> bool {
|
||||
self.has.get(index as u32)
|
||||
}
|
||||
|
||||
pub fn next(&mut self) -> Option<Message> {
|
||||
self.queue.pop_front()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
|
292
torment-peer/src/message.rs
Normal file
292
torment-peer/src/message.rs
Normal file
|
@ -0,0 +1,292 @@
|
|||
use bytes::{Bytes, BytesMut};
|
||||
use std::convert::TryInto;
|
||||
use std::error::Error;
|
||||
use std::fmt::{Debug, Display, Formatter};
|
||||
use torment_core::infohash::v1::U160;
|
||||
use torment_core::infohash::InfoHashCapable;
|
||||
use torment_core::Bitfield;
|
||||
|
||||
#[derive(Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
|
||||
pub enum Message {
|
||||
Choke,
|
||||
Unchoke,
|
||||
Interested,
|
||||
NotInterested,
|
||||
Have(u32),
|
||||
Bitfield(Bitfield),
|
||||
Request(SelectionMessage),
|
||||
Cancel(SelectionMessage),
|
||||
Piece(PieceMessage),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum MessageParsingError {
|
||||
TooShort,
|
||||
UnknownType,
|
||||
InvalidPrefix,
|
||||
}
|
||||
|
||||
impl Error for MessageParsingError {}
|
||||
|
||||
impl Display for MessageParsingError {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
write!(f, "{:?}", self)
|
||||
}
|
||||
}
|
||||
|
||||
impl Message {
|
||||
pub fn from_bytes(bytes: Bytes) -> Result<Message, MessageParsingError> {
|
||||
Ok(match bytes[0] {
|
||||
0 => Message::Choke,
|
||||
1 => Message::Unchoke,
|
||||
2 => Message::Interested,
|
||||
3 => Message::NotInterested,
|
||||
4 => {
|
||||
if bytes.len() < 5 {
|
||||
return Err(MessageParsingError::TooShort);
|
||||
}
|
||||
|
||||
Message::Have(u32::from_be_bytes(bytes[1..5].try_into().unwrap()))
|
||||
}
|
||||
|
||||
5 => Message::Bitfield(Bitfield::new(bytes.slice(1..))),
|
||||
|
||||
6 | 8 => {
|
||||
if bytes.len() < 13 {
|
||||
return Err(MessageParsingError::TooShort);
|
||||
}
|
||||
|
||||
let selection = SelectionMessage {
|
||||
index: u32::from_be_bytes(bytes[1..5].try_into().unwrap()),
|
||||
offset: u32::from_be_bytes(bytes[5..9].try_into().unwrap()),
|
||||
length: u32::from_be_bytes(bytes[9..13].try_into().unwrap()),
|
||||
};
|
||||
|
||||
if bytes[0] == 6 {
|
||||
Message::Request(selection)
|
||||
} else {
|
||||
Message::Cancel(selection)
|
||||
}
|
||||
}
|
||||
|
||||
7 => {
|
||||
if bytes.len() < 9 {
|
||||
return Err(MessageParsingError::TooShort);
|
||||
}
|
||||
|
||||
Message::Piece(PieceMessage {
|
||||
index: u32::from_be_bytes(bytes[1..5].try_into().unwrap()),
|
||||
offset: u32::from_be_bytes(bytes[5..9].try_into().unwrap()),
|
||||
piece: bytes.slice(9..),
|
||||
})
|
||||
}
|
||||
|
||||
_ => return Err(MessageParsingError::UnknownType),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn to_length_prefixed_bytes(&self) -> Vec<u8> {
|
||||
let buffer = self.to_bytes();
|
||||
let size = (buffer.len() as u32).to_be_bytes();
|
||||
let mut map = Vec::with_capacity(buffer.len() + 4);
|
||||
map.extend_from_slice(&size);
|
||||
map.extend(buffer);
|
||||
map
|
||||
}
|
||||
|
||||
pub fn to_bytes(&self) -> Bytes {
|
||||
match self {
|
||||
Message::Choke => Bytes::from_static(&[0u8]),
|
||||
Message::Unchoke => Bytes::from_static(&[1u8]),
|
||||
Message::Interested => Bytes::from_static(&[2u8]),
|
||||
Message::NotInterested => Bytes::from_static(&[3u8]),
|
||||
Message::Have(piece) => {
|
||||
let mut buffer = BytesMut::with_capacity(5);
|
||||
buffer.extend(&[4]);
|
||||
buffer.extend_from_slice(&piece.to_be_bytes());
|
||||
buffer.freeze()
|
||||
}
|
||||
Message::Bitfield(bitfield) => {
|
||||
let mut buffer = BytesMut::with_capacity(1 + bitfield.as_ref().len());
|
||||
buffer.extend(&[5]);
|
||||
buffer.extend_from_slice(bitfield.as_ref());
|
||||
buffer.freeze()
|
||||
}
|
||||
Message::Request(request) => {
|
||||
let mut buffer = BytesMut::with_capacity(13);
|
||||
buffer.extend(&[6]);
|
||||
buffer.extend_from_slice(&request.index.to_be_bytes());
|
||||
buffer.extend_from_slice(&request.offset.to_be_bytes());
|
||||
buffer.extend_from_slice(&request.length.to_be_bytes());
|
||||
buffer.freeze()
|
||||
}
|
||||
Message::Piece(piece) => {
|
||||
let mut buffer = BytesMut::with_capacity(9 + piece.piece.len());
|
||||
buffer.extend(&[7]);
|
||||
buffer.extend_from_slice(&piece.index.to_be_bytes());
|
||||
buffer.extend_from_slice(&piece.offset.to_be_bytes());
|
||||
buffer.extend_from_slice(piece.piece.as_ref());
|
||||
buffer.freeze()
|
||||
}
|
||||
Message::Cancel(cancel) => {
|
||||
let mut buffer = BytesMut::with_capacity(13);
|
||||
buffer.extend(&[8]);
|
||||
buffer.extend_from_slice(&cancel.index.to_be_bytes());
|
||||
buffer.extend_from_slice(&cancel.offset.to_be_bytes());
|
||||
buffer.extend_from_slice(&cancel.length.to_be_bytes());
|
||||
buffer.freeze()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Copy)]
|
||||
pub struct Handshake {
|
||||
reserved: [u8; 8],
|
||||
info_hash: U160,
|
||||
peer_id: U160,
|
||||
}
|
||||
|
||||
impl Handshake {
|
||||
pub fn from_bytes(bytes: Bytes) -> Result<Handshake, MessageParsingError> {
|
||||
if bytes.len() < 68 {
|
||||
return Err(MessageParsingError::TooShort);
|
||||
}
|
||||
|
||||
if &bytes[0..20] != b"\x13BitTorrent protocol" {
|
||||
return Err(MessageParsingError::InvalidPrefix);
|
||||
}
|
||||
|
||||
Ok(Handshake {
|
||||
reserved: bytes[20..28].try_into().unwrap(),
|
||||
info_hash: U160::from_bytes(&bytes[28..48]).unwrap(),
|
||||
peer_id: U160::from_bytes(&bytes[48..68]).unwrap(),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn to_bytes(&self) -> [u8; 68] {
|
||||
let mut header =
|
||||
*b"\x13BitTorrent protocol\0\0\0\0\0\0\0\0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
|
||||
header[28..48].copy_from_slice(self.info_hash.to_bytes().as_ref());
|
||||
header[48..68].copy_from_slice(self.peer_id.to_bytes().as_ref());
|
||||
header
|
||||
}
|
||||
|
||||
pub fn new(peer_id: U160, info_hash: U160) -> Handshake {
|
||||
Handshake {
|
||||
reserved: [0; 8],
|
||||
info_hash,
|
||||
peer_id,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn peer_id(&self) -> U160 {
|
||||
self.peer_id
|
||||
}
|
||||
|
||||
pub fn info_hash(&self) -> U160 {
|
||||
self.info_hash
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Hash, Copy, Ord, PartialOrd, Eq, PartialEq)]
|
||||
pub struct SelectionMessage {
|
||||
index: u32,
|
||||
offset: u32,
|
||||
length: u32,
|
||||
}
|
||||
|
||||
impl SelectionMessage {
|
||||
pub fn new(index: u32, offset: u32, length: u32) -> SelectionMessage {
|
||||
SelectionMessage {
|
||||
index,
|
||||
offset,
|
||||
length,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn index(&self) -> u32 {
|
||||
self.index
|
||||
}
|
||||
|
||||
pub fn offset(&self) -> u32 {
|
||||
self.offset
|
||||
}
|
||||
|
||||
pub fn length(&self) -> u32 {
|
||||
self.length
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Ord, PartialOrd, Eq, PartialEq)]
|
||||
pub struct PieceMessage {
|
||||
pub index: u32,
|
||||
pub offset: u32,
|
||||
pub piece: Bytes,
|
||||
}
|
||||
|
||||
impl Debug for PieceMessage {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("PieceMessage")
|
||||
.field("index", &self.index)
|
||||
.field("offset", &self.offset)
|
||||
.field("piece", &self.piece.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl PieceMessage {
|
||||
pub fn index(&self) -> u32 {
|
||||
self.index
|
||||
}
|
||||
|
||||
pub fn offset(&self) -> u32 {
|
||||
self.offset
|
||||
}
|
||||
|
||||
pub fn length(&self) -> u32 {
|
||||
self.piece.len() as u32
|
||||
}
|
||||
|
||||
pub fn piece(&self) -> Bytes {
|
||||
self.piece.clone()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::message::{Message, PieceMessage, SelectionMessage};
|
||||
use bytes::Bytes;
|
||||
use torment_core::Bitfield;
|
||||
|
||||
#[test]
|
||||
fn round_trip() {
|
||||
let msgs = [
|
||||
Message::Choke,
|
||||
Message::Unchoke,
|
||||
Message::Interested,
|
||||
Message::NotInterested,
|
||||
Message::Have(42),
|
||||
Message::Bitfield(Bitfield::with_size(4)),
|
||||
Message::Request(SelectionMessage {
|
||||
index: 69,
|
||||
offset: 1337,
|
||||
length: 42,
|
||||
}),
|
||||
Message::Piece(PieceMessage {
|
||||
index: 69,
|
||||
offset: 1337,
|
||||
piece: Bytes::from_static(b"hewwo"),
|
||||
}),
|
||||
Message::Cancel(SelectionMessage {
|
||||
index: 69,
|
||||
offset: 1337,
|
||||
length: 42,
|
||||
}),
|
||||
];
|
||||
|
||||
for msg in &msgs {
|
||||
assert_eq!(msg, &Message::from_bytes(msg.to_bytes()).unwrap())
|
||||
}
|
||||
}
|
||||
}
|
13
torment-storage/Cargo.toml
Normal file
13
torment-storage/Cargo.toml
Normal file
|
@ -0,0 +1,13 @@
|
|||
[package]
|
||||
name = "torment-storage"
|
||||
version = "0.1.0"
|
||||
authors = ["eater <=@eater.me>"]
|
||||
edition = "2018"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
torment-core = { path = "../torment-core" }
|
||||
remem = "0.1.0"
|
||||
bytes = "0.5.6"
|
||||
lazy_static = "1.4.0"
|
348
torment-storage/src/lib.rs
Normal file
348
torment-storage/src/lib.rs
Normal file
|
@ -0,0 +1,348 @@
|
|||
use bytes::{Bytes, BytesMut};
|
||||
use lazy_static::lazy_static;
|
||||
use remem::{ItemGuard, Pool};
|
||||
use std::borrow::Borrow;
|
||||
use std::cmp::min;
|
||||
use std::collections::{BTreeMap, HashMap, HashSet};
|
||||
use std::fmt::{Debug, Formatter};
|
||||
use std::fs::{DirBuilder, File, OpenOptions};
|
||||
use std::io;
|
||||
use std::io::{Read, Seek, SeekFrom, Write};
|
||||
use std::ops::{Add, Deref, Range};
|
||||
use std::path::{Path, PathBuf, MAIN_SEPARATOR};
|
||||
use std::rc::Rc;
|
||||
use std::sync::Mutex;
|
||||
use std::time::{Duration, Instant};
|
||||
use torment_core::metainfo::{MetaInfo, MetaInfoObject, Torrent};
|
||||
use torment_core::utils::EphemeralMap;
|
||||
|
||||
lazy_static! {
|
||||
static ref MEMORY_POOL: Pool<Vec<u8>> = Pool::new(|| vec![0u8; 4194304]);
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct StorageMap {
|
||||
base_path: PathBuf,
|
||||
piece_length: usize,
|
||||
pieces: usize,
|
||||
open_files: HashMap<usize, Rc<Mutex<File>>>,
|
||||
buffer: HashMap<usize, StoragePiece>,
|
||||
mapping: BTreeMap<usize, StorageMapping>,
|
||||
size: usize,
|
||||
}
|
||||
|
||||
pub struct StoragePiece {
|
||||
index: usize,
|
||||
pieces: usize,
|
||||
ranges: HashSet<usize>,
|
||||
buffer: ItemGuard<'static, Vec<u8>>,
|
||||
}
|
||||
|
||||
impl StoragePiece {
|
||||
fn is_complete(&self) -> bool {
|
||||
self.pieces == self.ranges.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl Debug for StoragePiece {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("StoragePiece")
|
||||
.field("index", &self.index)
|
||||
.field("ranges", &self.ranges)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl StorageMap {
|
||||
fn get_offsets(&self, offset: usize, length: usize) -> Vec<usize> {
|
||||
let x = 0..1;
|
||||
let end = offset + length;
|
||||
let mut range = self.mapping.range(offset..);
|
||||
let mut items = vec![];
|
||||
while let Some((item_end, _)) = range.next() {
|
||||
items.push(*item_end);
|
||||
|
||||
if *item_end > end {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
items
|
||||
}
|
||||
|
||||
fn get_file(&mut self, item: &StorageMapping) -> io::Result<Rc<Mutex<File>>> {
|
||||
if let Some(file) = self.open_files.get(&item.offset) {
|
||||
return Ok(Rc::clone(file));
|
||||
}
|
||||
|
||||
let path = item.path.join(&format!("{}", MAIN_SEPARATOR));
|
||||
let file_path = self.base_path.join(path);
|
||||
println!("Opening file: {:?}", file_path);
|
||||
|
||||
if let Some(parent) = file_path.parent() {
|
||||
DirBuilder::new().recursive(true).create(parent)?;
|
||||
}
|
||||
|
||||
let file = OpenOptions::new()
|
||||
.write(true)
|
||||
.read(true)
|
||||
.create(true)
|
||||
.open(file_path.as_path())?;
|
||||
|
||||
let rc = Rc::new(Mutex::new(file));
|
||||
self.open_files.insert(item.offset, Rc::clone(&rc));
|
||||
Ok(rc)
|
||||
}
|
||||
|
||||
fn internal_write<T: AsRef<[u8]>>(&mut self, offset: usize, data: T) -> io::Result<()> {
|
||||
let data = data.as_ref();
|
||||
let mut written = 0;
|
||||
for item_end in self.get_offsets(offset, data.len()) {
|
||||
let item = self.mapping.get(&item_end).unwrap().clone();
|
||||
let file = self.get_file(&item)?;
|
||||
let mut file = file.lock().unwrap();
|
||||
let curr_offset = (offset + written) - item.offset;
|
||||
file.seek(SeekFrom::Start(curr_offset as u64))?;
|
||||
let to_write = min(item.length - curr_offset, data.len() - written);
|
||||
file.write_all(&data[written..written + to_write])?;
|
||||
written += to_write;
|
||||
}
|
||||
|
||||
assert_eq!(written, data.len(), "Failed to write all data");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn internal_read<T: AsMut<[u8]>>(&mut self, offset: usize, mut data: T) -> io::Result<()> {
|
||||
let data = data.as_mut();
|
||||
let mut readed = 0;
|
||||
for item_end in self.get_offsets(offset, data.len()) {
|
||||
let item = self.mapping.get(&item_end).unwrap().clone();
|
||||
let file = self.get_file(&item)?;
|
||||
let mut file = file.lock().unwrap();
|
||||
let curr_offset = (offset + readed) - item.offset;
|
||||
file.seek(SeekFrom::Start(curr_offset as u64))?;
|
||||
let to_read = min(item.length - curr_offset, data.len() - readed);
|
||||
file.read_exact(&mut data[readed..readed + to_read])?;
|
||||
readed += to_read;
|
||||
}
|
||||
|
||||
assert_eq!(readed, data.len(), "Failed to read all data");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn has_piece(&self, index: usize) -> bool {
|
||||
if let Some(piece) = self.buffer.get(&index) {
|
||||
piece.is_complete()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
pub fn read_piece(&mut self, index: usize) -> io::Result<Vec<u8>> {
|
||||
let mut bytes = vec![0; self.get_piece_length(index)];
|
||||
self.read(index, 0, &mut bytes);
|
||||
Ok(bytes)
|
||||
}
|
||||
|
||||
pub fn read<T: AsMut<[u8]>>(
|
||||
&mut self,
|
||||
index: usize,
|
||||
offset: usize,
|
||||
mut data: T,
|
||||
) -> io::Result<()> {
|
||||
let data = data.as_mut();
|
||||
self.internal_read((index * self.piece_length) + offset, data)
|
||||
}
|
||||
|
||||
pub fn write<T: AsRef<[u8]>>(
|
||||
&mut self,
|
||||
index: usize,
|
||||
offset: usize,
|
||||
data: T,
|
||||
) -> io::Result<bool> {
|
||||
let data = data.as_ref();
|
||||
let item = if let Some(item) = self.buffer.get_mut(&index) {
|
||||
item
|
||||
} else {
|
||||
let request_size = 2usize.pow(14);
|
||||
let piece_length = self.get_piece_length(index);
|
||||
let pieces = (piece_length / request_size)
|
||||
+ if piece_length % request_size > 0 {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
|
||||
self.buffer.insert(index, {
|
||||
StoragePiece {
|
||||
index,
|
||||
ranges: Default::default(),
|
||||
buffer: MEMORY_POOL.get(),
|
||||
pieces,
|
||||
}
|
||||
});
|
||||
|
||||
self.buffer.get_mut(&index).unwrap()
|
||||
};
|
||||
|
||||
item.buffer[offset..offset + data.len()].copy_from_slice(data);
|
||||
item.ranges.insert(offset);
|
||||
|
||||
if item.is_complete() {
|
||||
if let Some(item) = self.buffer.remove(&index) {
|
||||
self.internal_write(
|
||||
item.index * self.piece_length,
|
||||
&item.buffer[0..self.get_piece_length(item.index)],
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wipe_piece(&mut self, index: usize) {
|
||||
self.buffer.remove(&index);
|
||||
}
|
||||
|
||||
pub fn get_piece_length(&self, index: usize) -> usize {
|
||||
if index + 1 == self.pieces {
|
||||
let len = (self.size % self.piece_length);
|
||||
if len == 0 {
|
||||
self.piece_length
|
||||
} else {
|
||||
len
|
||||
}
|
||||
} else {
|
||||
self.piece_length
|
||||
}
|
||||
}
|
||||
|
||||
pub fn has_piece_bit(&self, index: usize, offset: usize) -> bool {
|
||||
self.buffer
|
||||
.get(&index)
|
||||
.map(|item| item.ranges.contains(&offset))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
pub fn house_keeping(&mut self) {
|
||||
// self.open_files.clean()
|
||||
}
|
||||
|
||||
pub fn size(&self) -> usize {
|
||||
self.size
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ToStorageMap {
|
||||
fn to_storage_map<P: AsRef<Path>>(&self, path: P, is_base_path: bool) -> StorageMap;
|
||||
}
|
||||
|
||||
impl ToStorageMap for Torrent {
|
||||
fn to_storage_map<P: AsRef<Path>>(&self, path: P, is_base_path: bool) -> StorageMap {
|
||||
let name = self.name().clone();
|
||||
match self.meta_info().object() {
|
||||
MetaInfoObject::File(size) => {
|
||||
if is_base_path {
|
||||
StorageMapBuilder::create(path, self.meta_info().piece_length())
|
||||
.insert(vec![name.to_string()], *size)
|
||||
.build()
|
||||
} else {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
let parent = path.parent().unwrap_or(if cfg!(target_os = "win") {
|
||||
Path::new("C:\\")
|
||||
} else {
|
||||
Path::new("/")
|
||||
});
|
||||
|
||||
StorageMapBuilder::create(parent, self.meta_info().piece_length())
|
||||
.insert(
|
||||
vec![path
|
||||
.file_name()
|
||||
.map(|os_str| os_str.to_string_lossy().to_string())
|
||||
.unwrap_or_else(|| self.name().to_string())],
|
||||
*size,
|
||||
)
|
||||
.build()
|
||||
}
|
||||
}
|
||||
|
||||
MetaInfoObject::Files(files) => {
|
||||
let mut builder = if is_base_path {
|
||||
StorageMapBuilder::create(path, self.meta_info().piece_length())
|
||||
} else {
|
||||
StorageMapBuilder::create(
|
||||
path.as_ref().join(self.name()),
|
||||
self.meta_info().piece_length(),
|
||||
)
|
||||
};
|
||||
|
||||
for file in files {
|
||||
builder = builder.insert(file.path().to_vec(), file.length());
|
||||
}
|
||||
|
||||
builder.build()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct StorageMapping {
|
||||
path: Vec<String>,
|
||||
length: usize,
|
||||
offset: usize,
|
||||
}
|
||||
|
||||
pub struct StorageMapBuilder {
|
||||
base_path: PathBuf,
|
||||
offset: usize,
|
||||
piece_length: usize,
|
||||
items: BTreeMap<usize, StorageMapping>,
|
||||
}
|
||||
|
||||
impl StorageMapBuilder {
|
||||
pub fn create<P: AsRef<Path>>(path: P, piece_length: usize) -> StorageMapBuilder {
|
||||
StorageMapBuilder {
|
||||
base_path: path.as_ref().to_path_buf(),
|
||||
offset: 0,
|
||||
piece_length,
|
||||
items: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(mut self, path: Vec<String>, length: usize) -> Self {
|
||||
let offset = self.offset;
|
||||
self.offset += length;
|
||||
self.items.insert(
|
||||
self.offset - 1,
|
||||
StorageMapping {
|
||||
offset,
|
||||
length,
|
||||
path,
|
||||
},
|
||||
);
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(self) -> StorageMap {
|
||||
StorageMap {
|
||||
base_path: self.base_path,
|
||||
piece_length: self.piece_length,
|
||||
pieces: (self.offset / self.piece_length)
|
||||
+ if self.offset % self.piece_length > 0 {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
},
|
||||
open_files: Default::default(),
|
||||
buffer: Default::default(),
|
||||
mapping: self.items,
|
||||
size: self.offset,
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue