use crate::tracker_manager::{TrackerId, TrackerManager}; use bytes::Bytes; use ring::digest::{digest, SHA1_FOR_LEGACY_USE_ONLY}; use std::collections::{HashMap, HashSet, VecDeque}; use std::net::SocketAddr; use std::ops::Index; use std::option::Option::Some; use std::path::PathBuf; use torment_core::infohash::v1::U160; use torment_core::metainfo::{MetaInfo, Torrent}; use torment_core::Bitfield; use torment_peer::message::{Handshake, Message, PieceMessage, SelectionMessage}; use torment_peer::{Peer, PeerProtocol}; use torment_storage::{StorageMap, ToStorageMap}; #[derive(Debug)] pub enum Source { Torrent(TorrentSource), MetaInfo(MetaInfoSource), } #[derive(Debug)] pub struct MetaInfoSource { info_hash: U160, } #[derive(Debug)] pub struct TorrentSource { file: Option, torrent: Option, } #[derive(Debug)] pub struct TorrentTarget { pub path: PathBuf, pub is_base_path: bool, } #[derive(Debug)] pub struct TorrentManager { info_hash: U160, bitfield: Bitfield, trackers: Vec>, source: Source, target: TorrentTarget, peers: HashMap, queue: VecDeque<(U160, Message)>, storage_map: StorageMap, } pub const REQUEST_SIZE: usize = 16384; impl TorrentManager { pub fn info_hash(&self) -> U160 { self.info_hash } pub fn from_torrent( torrent: Torrent, target: TorrentTarget, tracker_manager: Option<&mut TrackerManager>, ) -> TorrentManager { let trackers = tracker_manager .map(|manager| { torrent .announce_list() .iter() .map(|tier_list| { tier_list .iter() .filter_map(|tracker| manager.get_tracker_id(tracker)) .collect::>() }) .collect::>() }) .unwrap_or(vec![]); TorrentManager { info_hash: torrent.info_hash(), bitfield: Bitfield::with_size(torrent.meta_info().pieces()), trackers, source: Source::Torrent(TorrentSource { file: None, torrent: Some(torrent.clone()), }), storage_map: torrent.to_storage_map(&target.path, target.is_base_path), target, peers: HashMap::new(), queue: Default::default(), } } pub fn handshake( &mut self, handshake: Handshake, addr: SocketAddr, protocol: PeerProtocol, ) -> bool { if handshake.info_hash() != self.info_hash { return false; } if let Some(peer) = self.peers.get(&handshake.peer_id()) { if peer.addr() != addr { return false; } } let meta_info = self.meta_info(); let peer = Peer::new(addr, protocol, handshake, meta_info); self.peers.insert(peer.id(), peer); true } fn meta_info(&mut self) -> &MetaInfo { if let Source::Torrent(torrent) = &self.source { return torrent .torrent .as_ref() .expect("No torrent loaded uh oh missing functionality") .meta_info(); } panic!("Can't resolve MetaInfo for torrent") } pub fn process(&mut self, peer_id: U160, message: Message) -> bool { let mut queue = vec![]; let ok = if let Some(peer) = self.peers.get_mut(&peer_id) { if peer.process(message) { while let Some(message) = peer.next() { self.queue.push_back((peer_id, message)); } while let Some(piece) = peer.next_piece() { if self.storage_map.has_piece(piece.index() as usize) { continue; } if self .storage_map .write( piece.index() as usize, piece.offset() as usize, piece.piece(), ) .unwrap() { queue.push(piece.index() as usize); } } while let Some(piece_request) = peer.next_request() { if !self.bitfield.get(piece_request.index()) { continue; } let mut buffer = vec![0u8; piece_request.length() as usize]; self.storage_map .read( piece_request.index() as usize, piece_request.offset() as usize, &mut buffer, ) .unwrap(); self.queue.push_back(( peer_id, Message::Piece(PieceMessage { index: piece_request.index(), offset: piece_request.offset(), piece: Bytes::from(buffer), }), )) } true } else { false } } else { false }; for have in queue { let piece_hash = self.meta_info().hash(have); let piece_data = self.storage_map.read_piece(have).unwrap(); let res = digest(&SHA1_FOR_LEGACY_USE_ONLY, &piece_data); if piece_hash != res.as_ref() { println!("=> Piece#{} failed verification", have); self.storage_map.wipe_piece(have); continue; } else { println!("=> Piece#{} verified", have); } self.bitfield.set(have as u32); let keys = self.peers.keys().copied().collect::>(); for key in keys { self.queue.push_back((key, Message::Have(have as u32))); if !self.peers[&key].has_piece(have as u32) && self.peers[&key].we_choked() { self.peers.get_mut(&key).unwrap().set_we_choked(false); self.queue.push_back((key, Message::Unchoke)); } } } ok } pub fn next(&mut self) -> Option<(U160, Message)> { self.queue.pop_front() } pub fn house_keeping(&mut self) { let mut peers = self .peers .iter() .filter_map(|(_, peer)| { if peer.has()1 if peer.is_choked() { None } else { Some(peer.id()) } }) .collect::>(); let pieces = self.meta_info().pieces() as u32; for i in 0u32..pieces { if self.bitfield.get(i) { continue; } let length = self.storage_map.get_piece_length(i as usize); let mut offset = 0; for peer in &peers.clone() { if !self.peers[peer].has_piece(i) { continue; } while offset < length && self.peers[peer].count_open_requests() < 25 { if !self.storage_map.has_piece_bit(i as usize, offset) { let request_length = if (offset + REQUEST_SIZE) > length { length - offset } else { REQUEST_SIZE }; let msg = SelectionMessage::new(i, offset as u32, request_length as u32); if self.peers.get_mut(peer).unwrap().requested(msg) { self.queue.push_back((*peer, Message::Request(msg))); } } offset += REQUEST_SIZE; } if self.peers[peer].count_open_requests() >= 25 { peers.remove(peer); } if offset >= length { break; } } } } }