From c7cc7ff9edd2300dedcd30b69218719c85cc7b35 Mon Sep 17 00:00:00 2001 From: eater <=@eater.me> Date: Thu, 3 Sep 2020 09:10:01 +0200 Subject: [PATCH] wip --- Cargo.lock | 571 +++++++++++++++++++++++- Cargo.toml | 3 +- torment-cli/src/download.rs | 14 +- torment-core/src/consts.rs | 1 + torment-core/src/lib.rs | 87 ++-- torment-core/src/metainfo.rs | 14 +- torment-core/src/peer_storage.rs | 122 +++++ torment-core/src/utils.rs | 4 + torment-daemon/src/main.rs | 153 +------ torment-dht-node/src/main.rs | 4 +- torment-dht/src/host_node.rs | 32 +- torment-manager/Cargo.toml | 7 +- torment-manager/src/consts.rs | 2 + torment-manager/src/lib.rs | 3 + torment-manager/src/session_manager.rs | 86 +++- torment-manager/src/torment_instance.rs | 432 ++++++++++++++++++ torment-manager/src/torrent_manager.rs | 226 +++++++--- torment-manager/src/tracker_manager.rs | 141 +++++- torment-peer/src/lib.rs | 71 ++- torment-peer/src/message.rs | 2 +- torment-storage/src/lib.rs | 87 ++-- torment-tracker/Cargo.toml | 13 + torment-tracker/src/lib.rs | 269 +++++++++++ 23 files changed, 2054 insertions(+), 290 deletions(-) create mode 100644 torment-core/src/consts.rs create mode 100644 torment-core/src/peer_storage.rs create mode 100644 torment-manager/src/consts.rs create mode 100644 torment-manager/src/torment_instance.rs create mode 100644 torment-tracker/Cargo.toml create mode 100644 torment-tracker/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 428a775..4833441 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -21,7 +21,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -75,7 +75,7 @@ dependencies = [ "socket2", "vec-arena", "wepoll-sys-stjepang", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -135,7 +135,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -274,6 +274,32 @@ dependencies = [ "cache-padded", ] +[[package]] +name = "core-foundation" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d24c7a13c43e870e37c1556b74555437870a04514f7685f5b354e090567171" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "core-foundation-sys" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3a71ab494c0b5b860bdc8407ae08978052417070c2ced38573a9157ad75b8ac" + +[[package]] +name = "crossbeam-channel" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ee0cc8804d5393478d743b035099520087a5186f3b93fa58cec08fa62407b6" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-epoch" version = "0.8.2" @@ -300,6 +326,21 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "dtoa" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "134951f4028bdadb9b84baf4232681efbf277da25144b9b0ad65df75946c422b" + +[[package]] +name = "encoding_rs" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a51b8cf747471cb9499b6d59e59b0444f4c90eba8968c4e44874e92b5b64ace2" +dependencies = [ + "cfg-if", +] + [[package]] name = "event-listener" version = "2.4.0" @@ -334,6 +375,43 @@ version = "1.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bd3bdaaf0a72155260a1c098989b60db1cbb22d6a628e64f16237aa4da93cc7" +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + [[package]] name = "futures" version = "0.3.5" @@ -461,6 +539,25 @@ version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "aaf91faf136cb47367fa430cd46e37a788775e7fa104f8b4bcb3861dc389b724" +[[package]] +name = "h2" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "993f9e0baeed60001cf565546b0d3dbe6a6ad23f2bd31644a133c641eccf6d53" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "futures-sink", + "futures-util", + "http", + "indexmap", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.8.2" @@ -485,6 +582,70 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" +[[package]] +name = "http" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d569972648b2c512421b5f2a405ad6ac9666547189d0c5477a3f200f3e02f9" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + +[[package]] +name = "http-body" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" +dependencies = [ + "bytes", + "http", +] + +[[package]] +name = "httparse" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9" + +[[package]] +name = "hyper" +version = "0.13.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e68a8dd9716185d9e64ea473ea6ef63529252e3e27623295a0378a19665d5eb" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body", + "httparse", + "itoa", + "pin-project", + "socket2", + "time", + "tokio", + "tower-service", + "tracing", + "want", +] + +[[package]] +name = "hyper-tls" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d979acc56dcb5b8dddba3917601745e877576475aa046df3226eabdecef78eed" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-tls", +] + [[package]] name = "idna" version = "0.2.0" @@ -506,6 +667,21 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + +[[package]] +name = "ipnet" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47be2f14c678be2fdcab04ab1171db51b2762ce6f0a8ee87c8dd4a04ed216135" + [[package]] name = "itoa" version = "0.4.6" @@ -521,6 +697,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -584,6 +770,22 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mime" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" + +[[package]] +name = "mime_guess" +version = "2.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2684d4c2e97d99848d30b324b00c8fcc7e5c897b7cbb5819b09e7c90e8baf212" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "miniz_oxide" version = "0.4.0" @@ -593,6 +795,37 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.6.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" +dependencies = [ + "cfg-if", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow", + "net2", + "slab", + "winapi 0.2.8", +] + +[[package]] +name = "miow" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + [[package]] name = "multitask" version = "0.2.0" @@ -604,6 +837,35 @@ dependencies = [ "fastrand", ] +[[package]] +name = "native-tls" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b0d88c06fe90d5ee94048ba40409ef1d9315d86f6f38c2efdaad4fb50c58b2d" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + +[[package]] +name = "net2" +version = "0.2.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ba7c918ac76704fb42afcbbb43891e72731f3dcca3bef2a19786297baf14af7" +dependencies = [ + "cfg-if", + "libc", + "winapi 0.3.9", +] + [[package]] name = "num-integer" version = "0.1.43" @@ -645,6 +907,39 @@ version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b631f7e854af39a1739f401cf34a8a013dfe09eac4fa4dba91e9768bd28168d" +[[package]] +name = "openssl" +version = "0.10.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d575eff3665419f9b83678ff2815858ad9d11567e082f5ac1814baba4e2bcb4" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "lazy_static", + "libc", + "openssl-sys", +] + +[[package]] +name = "openssl-probe" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77af24da69f9d9341038eba93a073b1fdaaa1b788221b00a69bce9e762cb32de" + +[[package]] +name = "openssl-sys" +version = "0.9.58" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a842db4709b604f0fe5d1170ae3565899be2ad3d9cbc72dedc789ac0511f78de" +dependencies = [ + "autocfg", + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "parking" version = "1.0.6" @@ -695,6 +990,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "pkg-config" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d36492546b6af1463394d46f0c834346f31548646f6ba10849802c9c9a27ac33" + [[package]] name = "polling" version = "0.1.6" @@ -705,7 +1006,7 @@ dependencies = [ "libc", "log", "wepoll-sys-stjepang", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -800,6 +1101,50 @@ dependencies = [ "crossbeam-epoch", ] +[[package]] +name = "remove_dir_all" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "reqwest" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9eaa17ac5d7b838b7503d118fa16ad88f440498bf9ffe5424e621f93190d61e" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "http", + "http-body", + "hyper", + "hyper-tls", + "ipnet", + "js-sys", + "lazy_static", + "log", + "mime", + "mime_guess", + "native-tls", + "percent-encoding", + "pin-project-lite", + "serde", + "serde_urlencoded", + "tokio", + "tokio-tls", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg", +] + [[package]] name = "ring" version = "0.16.15" @@ -812,7 +1157,7 @@ dependencies = [ "spin", "untrusted", "web-sys", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -827,6 +1172,16 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +[[package]] +name = "schannel" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f05ba609c234e60bee0d547fe94a4c7e9da733d1c962cf6e59efa4cd9c8bc75" +dependencies = [ + "lazy_static", + "winapi 0.3.9", +] + [[package]] name = "scoped-tls" version = "1.0.0" @@ -839,6 +1194,29 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" +[[package]] +name = "security-framework" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64808902d7d99f78eaddd2b4e2509713babc3dc3c85ad6f4c447680f3c01e535" +dependencies = [ + "bitflags", + "core-foundation", + "core-foundation-sys", + "libc", + "security-framework-sys", +] + +[[package]] +name = "security-framework-sys" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17bf11d99252f512695eb468de5516e5cf75455521e69dfe343f3b74e4748405" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "serde" version = "1.0.115" @@ -871,6 +1249,18 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_urlencoded" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ec5d77e2d4c73717816afac02670d5c4f534ea95ed430442cad02e7a6e32c97" +dependencies = [ + "dtoa", + "itoa", + "serde", + "url", +] + [[package]] name = "slab" version = "0.4.2" @@ -886,7 +1276,7 @@ dependencies = [ "cfg-if", "libc", "redox_syscall", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -924,6 +1314,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "tempfile" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" +dependencies = [ + "cfg-if", + "libc", + "rand", + "redox_syscall", + "remove_dir_all", + "winapi 0.3.9", +] + [[package]] name = "textwrap" version = "0.11.0" @@ -940,7 +1344,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" dependencies = [ "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -949,6 +1353,48 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "238ce071d267c5710f9d31451efec16c5ee22de34df17cc05e56cbc92e967117" +[[package]] +name = "tokio" +version = "0.2.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d34ca54d84bf2b5b4d7d31e901a8464f7b60ac145a284fba25ceb801f2ddccd" +dependencies = [ + "bytes", + "fnv", + "futures-core", + "iovec", + "lazy_static", + "memchr", + "mio", + "num_cpus", + "pin-project-lite", + "slab", +] + +[[package]] +name = "tokio-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a70f4fcd7b3b24fb194f837560168208f669ca8cb70d0c4b862944452396343" +dependencies = [ + "native-tls", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "log", + "pin-project-lite", + "tokio", +] + [[package]] name = "torment-bencode" version = "0.1.0" @@ -1030,11 +1476,16 @@ name = "torment-manager" version = "0.1.0" dependencies = [ "bytes", + "crossbeam-channel", "lazy_static", + "polling", + "rand", + "reqwest", "ring", "torment-core", "torment-peer", "torment-storage", + "torment-tracker", "url", ] @@ -1056,6 +1507,57 @@ dependencies = [ "torment-core", ] +[[package]] +name = "torment-tracker" +version = "0.1.0" +dependencies = [ + "reqwest", + "torment-bencode", + "torment-core", + "url", +] + +[[package]] +name = "tower-service" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e987b6bf443f4b5b3b6f38704195592cca41c5bb7aedd3c3693c7081f8289860" + +[[package]] +name = "tracing" +version = "0.1.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d79ca061b032d6ce30c660fded31189ca0b9922bf483cd70759f13a2d86786c" +dependencies = [ + "cfg-if", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-core" +version = "0.1.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f0e00789804e99b20f12bc7003ca416309d28a6f495d6af58d1e2c2842461b5" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "try-lock" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" + +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.4" @@ -1103,6 +1605,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "vcpkg" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" + [[package]] name = "vec-arena" version = "0.5.2" @@ -1115,12 +1623,28 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + [[package]] name = "waker-fn" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9571542c2ce85ce642e6b58b3364da2fb53526360dfb7c211add4f5c23105ff7" +[[package]] +name = "want" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0" +dependencies = [ + "log", + "try-lock", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -1134,6 +1658,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0563a9a4b071746dd5aedbc3a28c6fe9be4586fb3fbadb67c400d4f53c6b16c" dependencies = [ "cfg-if", + "serde", + "serde_json", "wasm-bindgen-macro", ] @@ -1212,6 +1738,12 @@ dependencies = [ "cc", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1222,6 +1754,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1234,6 +1772,25 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "winreg" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0120db82e8a1e0b9fb3345a539c478767c0048d842860994d96113d5b667bd69" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "yaml-rust" version = "0.3.5" diff --git a/Cargo.toml b/Cargo.toml index f8645ce..6ae1c34 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ members = [ "torment-peer", "torment-storage", "torment-manager", - "torment-daemon" + "torment-daemon", + "torment-tracker", ] \ No newline at end of file diff --git a/torment-cli/src/download.rs b/torment-cli/src/download.rs index d44b4e0..dc5c741 100644 --- a/torment-cli/src/download.rs +++ b/torment-cli/src/download.rs @@ -1,4 +1,4 @@ -use bytes::{Bytes, BytesMut}; +use bytes::{Buf, Bytes, BytesMut}; use rand::random; use std::cmp::min; use std::convert::TryInto; @@ -9,7 +9,6 @@ use torment_core::infohash::v1::U160; use torment_core::infohash::InfoHashCapable; use torment_core::metainfo::Torrent; use torment_core::peer_id; -use torment_peer::message::Message::Request; use torment_peer::message::{Handshake, Message, SelectionMessage}; use torment_peer::{Peer, PeerProtocol}; use torment_storage::ToStorageMap; @@ -26,9 +25,10 @@ pub fn download(torrent: Torrent, peers: Vec) { let id = U160::from(peer_id(random())); let our_header = Handshake::new(id, torrent.info_hash()); - stream.write(&our_header.to_bytes()); + stream.write(&our_header.to_bytes()).unwrap(); let mut buffer = [0u8; 4096]; let mut done_header = false; + #[allow(unused_assignments)] let mut header = Handshake::new(U160::random(), U160::random()); let mut peer = None; let mut message_buffer = BytesMut::with_capacity(4096 * 10); @@ -71,12 +71,12 @@ pub fn download(torrent: Torrent, peers: Vec) { continue; } - let mut peer = peer.as_mut().unwrap(); + let peer = peer.as_mut().unwrap(); message_buffer.extend_from_slice(&buffer[..size]); while message_buffer.len() >= 4 { let length = u32::from_be_bytes(message_buffer[..4].try_into().unwrap()); if length == 0 { - message_buffer.split_to(4); + message_buffer.advance(4); continue; } @@ -89,7 +89,9 @@ pub fn download(torrent: Torrent, peers: Vec) { let msg = Message::from_bytes(message.slice(4..)).expect("Failed parsing message"); // println!("=> {:?}", msg); if msg == Message::Unchoke { - stream.write_all(&Message::Interested.to_length_prefixed_bytes()); + stream + .write_all(&Message::Interested.to_length_prefixed_bytes()) + .unwrap(); } peer.process(msg); } diff --git a/torment-core/src/consts.rs b/torment-core/src/consts.rs new file mode 100644 index 0000000..c863a4f --- /dev/null +++ b/torment-core/src/consts.rs @@ -0,0 +1 @@ +pub const REQUEST_SIZE: usize = 16384; diff --git a/torment-core/src/lib.rs b/torment-core/src/lib.rs index 3e7ee53..0a3776a 100644 --- a/torment-core/src/lib.rs +++ b/torment-core/src/lib.rs @@ -10,11 +10,15 @@ use std::fmt::{Display, Formatter}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; use std::str::FromStr; +mod consts; pub mod infohash; pub mod ip; pub mod metainfo; +pub mod peer_storage; pub mod utils; +pub use consts::*; + pub fn peer_id(input: [u8; 12]) -> [u8; 20] { let peer_id = format!( "eT{:0>2}{:0>2}{:0>2}AAAAAAAAAAAA", @@ -121,34 +125,44 @@ impl ContactInfo { } #[derive(Clone, Ord, PartialOrd, Eq, PartialEq)] -pub struct Bitfield(BytesMut); +pub struct Bitfield(BytesMut, usize); impl Bitfield { - pub fn new>(field: T) -> Bitfield { - Bitfield(BytesMut::from(field.as_ref())) + pub fn size(&self) -> usize { + self.1 + } + + pub fn new>(field: T, size: usize) -> Bitfield { + Bitfield(BytesMut::from(field.as_ref()), size) } pub fn with_size(size: usize) -> Bitfield { - Bitfield({ - let mut field = BytesMut::new(); - field.resize((size / 8) + if size % 8 > 0 { 1 } else { 0 }, 0); - field - }) + Bitfield( + { + let mut field = BytesMut::new(); + field.resize((size / 8) + if size % 8 > 0 { 1 } else { 0 }, 0); + field + }, + size, + ) } pub fn set(&mut self, index: u32) { + debug_assert!(index < self.1 as u32, "index out of bounds"); let byte_index = index / 8; - let bitmask = 1 << (7 - index % 8); + let bitmask = 1 << (7 - (index % 8)); self.0[byte_index as usize] |= bitmask; } pub fn unset(&mut self, index: u32) { + debug_assert!(index < self.1 as u32, "index out of bounds"); let byte_index = index / 8; - let bitmask = 1 << (8 - index % 8); + let bitmask = 1 << (7 - (index % 8)); self.0[byte_index as usize] &= !bitmask; } pub fn get(&self, index: u32) -> bool { + debug_assert!(index < self.1 as u32, "index out of bounds"); let byte_index = index / 8; let bitmask = 1 << (7 - (index % 8)); (self.0[byte_index as usize] & bitmask) > 0 @@ -161,13 +175,40 @@ impl Bitfield { self.0[i] |= field[i] } } + + pub fn all(&self) -> bool { + for i in 0..self.0.len() { + let byte = self.0[i]; + if i + 1 == self.0.len() { + let left = self.1 % 8; + if left != 0 { + return byte == 255 ^ (255 >> left); + } + } + + if byte != u8::MAX { + return false; + } + } + + return true; + } } impl Debug for Bitfield { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Bitfield(")?; - for byte in &self.0 { - write!(f, "{:b}", byte)?; + for i in 0..self.0.len() { + let byte = self.0[i]; + if i + 1 == self.0.len() { + write!( + f, + "{}", + &format!("{:0>8b}", byte)[0..(if self.1 % 8 == 0 { 8 } else { self.1 % 8 })] + )?; + } else { + write!(f, "{:b}", byte)?; + } } write!(f, ")") @@ -180,9 +221,6 @@ impl AsRef<[u8]> for Bitfield { } } -#[derive(Debug, Default)] -pub struct PeerStorage {} - #[derive(Ord, PartialOrd, Eq, PartialEq, Copy, Clone)] pub enum LookupFilter { IPv6, @@ -190,17 +228,6 @@ pub enum LookupFilter { All, } -impl PeerStorage { - pub fn new() -> PeerStorage { - PeerStorage {} - } - - pub fn add_peers(&mut self, _info_hash: U160, _peers: Vec) {} - pub fn get_peers(&self, _info_hash: U160, _filter: LookupFilter) -> Vec { - vec![] - } -} - #[cfg(test)] mod tests { use crate::{compact_peer_id, peer_id, Bitfield}; @@ -213,6 +240,7 @@ mod tests { assert_eq!(&compact_peer_id(), b"eT\x00\x01"); } + #[test] fn test_bitfield() { let mut field = Bitfield::with_size(3); field.set(0); @@ -225,5 +253,12 @@ mod tests { field.add(other_field); assert_eq!(true, field.get(2)); + + assert!(!field.all()); + field.set(1); + field.set(0); + + println!("{} = {:?}", field.as_ref()[0], field); + assert!(field.all()); } } diff --git a/torment-core/src/metainfo.rs b/torment-core/src/metainfo.rs index 4af151e..f474c79 100644 --- a/torment-core/src/metainfo.rs +++ b/torment-core/src/metainfo.rs @@ -79,7 +79,7 @@ impl Torrent { .or(name) .unwrap_or(info.info_hash.to_string()); - let announce_list = if let Some(announce) = dict.get(b"announce-list") { + let mut announce_list = if let Some(announce) = dict.get(b"announce-list") { announce .list() .map(|list| -> Result>, MetaInfoParsingError> { @@ -110,6 +110,18 @@ impl Torrent { vec![] }; + if let Some(item) = dict.get(b"announce") { + if let Some(Ok(url)) = item.string() { + if let Ok(url) = Url::parse(&url) { + if announce_list.is_empty() { + announce_list.push(HashSet::new()); + } + + announce_list[0].insert(url); + } + } + } + Ok(Torrent { name, announce_list, diff --git a/torment-core/src/peer_storage.rs b/torment-core/src/peer_storage.rs new file mode 100644 index 0000000..64a439b --- /dev/null +++ b/torment-core/src/peer_storage.rs @@ -0,0 +1,122 @@ +use crate::infohash::v1::U160; +use crate::LookupFilter; +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; +use std::ops::Add; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; +use url::Url; + +#[derive(Debug, Default)] +pub struct PeerStorage { + torrents: HashMap, +} + +#[derive(Debug, Default)] +pub struct PeerCollection { + sets: Vec, + peers: HashSet, + peers6: HashSet, +} + +#[derive(Debug)] +pub struct PeerSet { + source: PeerStorageSource, + expires: Instant, + peers: Vec, +} + +impl PeerCollection { + fn add_peers(&mut self, peers: Vec, source: PeerStorageSource) { + let set = PeerSet { + source, + peers, + expires: Instant::now().add(Duration::from_secs(60 * 30)), + }; + + for peer in &set.peers { + if peer.is_ipv4() { + self.peers.insert(*peer); + } else { + self.peers6.insert(*peer); + } + } + + self.sets.push(set); + } +} + +#[derive(Debug, Clone)] +pub enum PeerStorageSource { + DHT, + Tracker(Url), +} + +impl PeerStorage { + pub fn new() -> PeerStorage { + PeerStorage { + torrents: Default::default(), + } + } + + pub fn add_peers( + &mut self, + info_hash: U160, + peers: Vec, + source: PeerStorageSource, + ) { + self.torrents + .entry(info_hash) + .or_default() + .add_peers(peers, source) + } + + pub fn get_peers(&self, info_hash: U160, filter: LookupFilter) -> Vec { + let collection = if let Some(collection) = self.torrents.get(&info_hash) { + collection + } else { + return vec![]; + }; + + let mut buffer = vec![]; + + if filter != LookupFilter::IPv6 { + buffer.extend(&collection.peers); + } + + if filter != LookupFilter::IPv4 { + buffer.extend(&collection.peers6); + } + + buffer + } +} + +#[derive(Debug, Clone)] +pub struct PeerStorageHolder { + inner: Arc>, +} + +impl PeerStorageHolder { + pub fn new(peer_storage: PeerStorage) -> PeerStorageHolder { + PeerStorageHolder { + inner: Arc::new(RwLock::new(peer_storage)), + } + } + + pub fn get_peers(&self, info_hash: U160, filter: LookupFilter) -> Vec { + self.inner.read().unwrap().get_peers(info_hash, filter) + } + + pub fn add_peers( + &mut self, + info_hash: U160, + peers: Vec, + source: PeerStorageSource, + ) { + self.inner + .write() + .unwrap() + .add_peers(info_hash, peers, source) + } +} diff --git a/torment-core/src/utils.rs b/torment-core/src/utils.rs index 4eec3ce..44d6d31 100644 --- a/torment-core/src/utils.rs +++ b/torment-core/src/utils.rs @@ -259,6 +259,10 @@ impl Debug for EphemeralSet { } impl EphemeralSet { + pub fn len(&self) -> usize { + self.0.len() + } + pub fn new() -> Self { EphemeralSet(EphemeralMap::new()) } diff --git a/torment-daemon/src/main.rs b/torment-daemon/src/main.rs index 7f81a23..21260a9 100644 --- a/torment-daemon/src/main.rs +++ b/torment-daemon/src/main.rs @@ -1,151 +1,42 @@ -use bytes::BytesMut; -use polling::{Event, Poller}; -use std::collections::HashMap; -use std::convert::TryInto; +use rand::random; use std::fs::File; -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::option::Option::Some; -use std::path::PathBuf; -use std::process::exit; -use std::result::Result::Ok; -use std::time::Duration; +use std::io::Read; +use std::net::IpAddr; +use std::str::FromStr; use torment_core::infohash::v1::U160; use torment_core::metainfo::Torrent; use torment_core::peer_id; +use torment_core::peer_storage::{PeerStorage, PeerStorageHolder}; use torment_manager::session_manager::SessionManager; +use torment_manager::torment_instance::TormentInstance; use torment_manager::torrent_manager::{TorrentManager, TorrentTarget}; -use torment_peer::message::{Handshake, Message}; -use torment_peer::PeerProtocol::TCP; - -struct TcpState { - builder: BytesMut, - handshake: Option, - stream: TcpStream, -} +use torment_manager::tracker_manager::TrackerManager; fn main() { - let mut session_manager = SessionManager::new(); + let ip = IpAddr::from_str("0.0.0.0").unwrap(); + let peer_id = U160::from(peer_id(random())); + + let mut session_manager = SessionManager::new(ip, 50002, peer_id); - let mut buffer = vec![]; - File::open("/home/eater/Downloads/[Commie] Senyuu. - 23 [150B93D5].mkv.torrent") + let mut file = vec![]; + File::open("test.torrent") .unwrap() - .read_to_end(&mut buffer) + .read_to_end(&mut file) .unwrap(); + let torrent = Torrent::from_bytes(file, Some("test".to_string())).unwrap(); + + println!("Downloading {:#?}", torrent); let torrent_manager = TorrentManager::from_torrent( - Torrent::from_bytes( - buffer, - Some("[Commie] Senyuu. - 23 [150B93D5].mkv".to_string()), - ) - .unwrap(), + torrent, TorrentTarget { - path: PathBuf::from("/tmp"), + path: "/tmp/test".into(), is_base_path: true, }, - None, + Some(session_manager.tracker_manager_mut()), ); - let peer_id = U160::from(peer_id(rand::random())); - let info_hash = torrent_manager.info_hash(); session_manager.add_torrent_manager(torrent_manager); - let mut tcp_stream_ktorrent = TcpStream::connect("127.0.0.1:6881").unwrap(); - tcp_stream_ktorrent.set_nodelay(true).unwrap(); - tcp_stream_ktorrent - .write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref()) - .unwrap(); - - let mut tcp_stream_transmission = TcpStream::connect("192.168.188.100:51413").unwrap(); - tcp_stream_transmission.set_nodelay(true).unwrap(); - tcp_stream_transmission - .write_all(Handshake::new(peer_id, info_hash).to_bytes().as_ref()) - .unwrap(); - - let mut buffer = vec![0u8; 4096 * 10]; - let mut poller = Poller::new().unwrap(); - poller.insert(&tcp_stream_ktorrent).unwrap(); - poller.insert(&tcp_stream_transmission).unwrap(); - poller.interest(&tcp_stream_ktorrent, Event::readable(0)); - poller.interest(&tcp_stream_transmission, Event::readable(1)); - - let mut items = vec![ - TcpState { - builder: Default::default(), - handshake: None, - stream: tcp_stream_ktorrent, - }, - TcpState { - builder: Default::default(), - handshake: None, - stream: tcp_stream_transmission, - }, - ]; - - let mut peer_map: HashMap<(U160, U160), usize> = Default::default(); - - loop { - let mut events: Vec = vec![]; - poller.wait(&mut events, Some(Duration::from_secs(10))); - - for event in events { - println!("Event => {:?}", event); - if !event.readable { - continue; - } - - let item = &mut items[event.key]; - let packet = item.stream.read(&mut buffer).unwrap(); - item.builder.extend_from_slice(&buffer[..packet]); - let handshake = if let Some(handshake) = &item.handshake { - handshake - } else { - if item.builder.len() >= 68 { - item.handshake = - Some(Handshake::from_bytes(item.builder.split_to(68).freeze()).unwrap()); - let handshake = item.handshake.as_ref().unwrap(); - println!("{} => {:?}", item.stream.peer_addr().unwrap(), handshake); - peer_map.insert((handshake.info_hash(), handshake.peer_id()), event.key); - session_manager.handshake(*handshake, item.stream.peer_addr().unwrap(), TCP); - handshake - } else { - continue; - } - }; - - while item.builder.len() >= 4 { - let len = u32::from_be_bytes(item.builder[..4].try_into().unwrap()); - if len + 4 > item.builder.len() as u32 { - break; - } - - if len == 0 { - item.builder.split_to(4); - continue; - } - - let message_bytes = item.builder.split_to((4 + len) as usize).freeze(); - let msg = Message::from_bytes(message_bytes.slice(4..)).unwrap(); - println!("{} => {:?}", item.stream.peer_addr().unwrap(), msg); - if !session_manager.process(info_hash, handshake.peer_id(), msg) { - exit(1); - } - } - - poller.interest(&item.stream, Event::readable(event.key)); - } - - while let Some(queued) = session_manager.next() { - if let Some(key) = peer_map.get(&(queued.info_hash, queued.peer_id)) { - let item = &mut items[*key]; - println!("{} <= {:?}", queued.addr, queued.message); - - item.stream - .write_all(&queued.message.to_length_prefixed_bytes()) - .unwrap(); - } - } - - println!("=> Running house keeping"); - session_manager.house_keeping(); - } + let mut instance = TormentInstance::new(50002, session_manager); + instance.logic_loop(); } diff --git a/torment-dht-node/src/main.rs b/torment-dht-node/src/main.rs index 575709d..9110eef 100644 --- a/torment-dht-node/src/main.rs +++ b/torment-dht-node/src/main.rs @@ -6,6 +6,7 @@ use std::net::SocketAddr; use std::pin::Pin; use std::str::FromStr; use std::time::{Duration, Instant}; +use torment_core::peer_storage::{PeerStorage, PeerStorageHolder}; use torment_dht::host_node::HostNode; use torment_dht::krpc::{FromBencode, Message, ToBencode}; @@ -22,7 +23,8 @@ async fn main() { let socket = UdpSocket::bind(SocketAddr::from_str("[::]:50002").unwrap()) .await .unwrap(); - let mut node = HostNode::new(Default::default(), Some(50002)); + let peer_storage = PeerStorage::new(); + let mut node = HostNode::new(PeerStorageHolder::new(peer_storage), Some(50002)); node.add_bootstrap(SocketAddr::from_str("67.215.246.10:6881").unwrap(), None); node.add_bootstrap( SocketAddr::from_str("[2001:41d0:c:5ac:5::1]:6881").unwrap(), diff --git a/torment-dht/src/host_node.rs b/torment-dht/src/host_node.rs index d4d1f8a..0db7bfe 100644 --- a/torment-dht/src/host_node.rs +++ b/torment-dht/src/host_node.rs @@ -6,13 +6,12 @@ use std::collections::{HashSet, VecDeque}; use std::convert::TryInto; use std::net::{IpAddr, SocketAddr}; use std::ops::Add; -use std::rc::Rc; -use std::sync::RwLock; use std::time::{Duration, Instant}; use torment_core::infohash::v1::U160; use torment_core::ip::IpAddrExt; +use torment_core::peer_storage::{PeerStorageHolder, PeerStorageSource}; use torment_core::utils::{EphemeralMap, EphemeralSet}; -use torment_core::{ContactInfo, LookupFilter, PeerStorage}; +use torment_core::{ContactInfo, LookupFilter}; #[derive(Debug)] enum OpenRequest { @@ -34,7 +33,7 @@ pub struct HostNode { ipv6_table: Table, nodes: EphemeralMap<(U160, IpAddr, u16), PeerNode>, queue: VecDeque<(Message, SocketAddr)>, - peer_storage: Rc>, + peer_storage: PeerStorageHolder, } #[derive(Debug)] @@ -92,7 +91,7 @@ impl PeerNode { } impl HostNode { - pub fn new(peer_storage: Rc>, port: Option) -> HostNode { + pub fn new(peer_storage: PeerStorageHolder, port: Option) -> HostNode { let id = U160::random(); HostNode { id, @@ -108,7 +107,7 @@ impl HostNode { } pub fn from_persistent_state( - peer_storage: Rc>, + peer_storage: PeerStorageHolder, port: Option, state: PersistentState, ) -> HostNode { @@ -351,10 +350,11 @@ impl HostNode { } if get_peers.values.len() > 0 { - self.peer_storage - .write() - .unwrap() - .add_peers(info_hash, get_peers.values.clone()); + self.peer_storage.add_peers( + info_hash, + get_peers.values.clone(), + PeerStorageSource::DHT, + ); } for node in &get_peers.nodes.nodes { @@ -502,7 +502,7 @@ impl HostNode { get_peers .want .unwrap_or_else(|| if from.is_ipv4() { Want::N4 } else { Want::N6 }); - let peers = self.peer_storage.read().unwrap().get_peers( + let peers = self.peer_storage.get_peers( get_peers.info_hash, if table == Want::N4 { LookupFilter::IPv4 @@ -611,7 +611,7 @@ impl HostNode { self.queue .push_back((Message::error(t, 203, "Bad token".to_string()), from)) } else { - self.peer_storage.write().unwrap().add_peers( + self.peer_storage.add_peers( announce.info_hash, vec![SocketAddr::new( from.ip(), @@ -621,6 +621,7 @@ impl HostNode { announce.port }, )], + PeerStorageSource::DHT, ); self.queue .push_back((Message::response_empty(self.id, t), from)) @@ -686,13 +687,12 @@ mod test { use crate::host_node::HostNode; use crate::krpc::Message; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; - use std::rc::Rc; - use std::sync::RwLock; use torment_core::infohash::v1::U160; - use torment_core::{ContactInfo, PeerStorage}; + use torment_core::peer_storage::{PeerStorage, PeerStorageHolder}; + use torment_core::ContactInfo; fn get_host() -> HostNode { - let peer_storage = Rc::new(RwLock::new(PeerStorage::new())); + let peer_storage = PeerStorageHolder::new(PeerStorage::new()); HostNode::new(peer_storage, None) } diff --git a/torment-manager/Cargo.toml b/torment-manager/Cargo.toml index 715ef45..2c13771 100644 --- a/torment-manager/Cargo.toml +++ b/torment-manager/Cargo.toml @@ -10,7 +10,12 @@ edition = "2018" torment-core = { path = "../torment-core" } torment-peer = { path = "../torment-peer" } torment-storage = { path = "../torment-storage" } +torment-tracker = { path = "../torment-tracker" } +reqwest = { version = "0.10.8", features = ["blocking"] } +crossbeam-channel = "0.4.3" bytes = "0.5.6" url = "2.1.1" lazy_static = "1.4.0" -ring = "0.16.15" \ No newline at end of file +ring = "0.16.15" +polling = "0.1.6" +rand = "0.7.3" \ No newline at end of file diff --git a/torment-manager/src/consts.rs b/torment-manager/src/consts.rs new file mode 100644 index 0000000..36ef3ae --- /dev/null +++ b/torment-manager/src/consts.rs @@ -0,0 +1,2 @@ +pub const MAX_OPEN_REQUESTS: usize = 250; +pub const MAX_CONNECTIONS_PER_THREAD: usize = 100; diff --git a/torment-manager/src/lib.rs b/torment-manager/src/lib.rs index e3a308a..6f4e098 100644 --- a/torment-manager/src/lib.rs +++ b/torment-manager/src/lib.rs @@ -1,3 +1,6 @@ +mod consts; pub mod session_manager; +pub mod torment_instance; pub mod torrent_manager; pub mod tracker_manager; +pub use consts::*; diff --git a/torment-manager/src/session_manager.rs b/torment-manager/src/session_manager.rs index d5eeaa6..7906b53 100644 --- a/torment-manager/src/session_manager.rs +++ b/torment-manager/src/session_manager.rs @@ -1,8 +1,10 @@ use crate::torrent_manager::TorrentManager; -use std::collections::{HashMap, VecDeque}; -use std::net::SocketAddr; +use crate::tracker_manager::TrackerManager; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::net::{IpAddr, SocketAddr}; use std::option::Option::Some; use torment_core::infohash::v1::U160; +use torment_core::peer_storage::{PeerStorage, PeerStorageHolder}; use torment_peer::message::{Handshake, Message}; use torment_peer::PeerProtocol; @@ -14,17 +16,60 @@ pub struct QueuedMessage { } pub struct SessionManager { + id: U160, torrents: HashMap, + torrents_ids: HashSet, peer_socket: HashMap<(U160, U160), SocketAddr>, message_queue: VecDeque, + tracker_manager: TrackerManager, + pub peer_storage: PeerStorageHolder, } impl SessionManager { + pub fn peer_count(&self, info_hash: U160) -> usize { + self.torrents[&info_hash].peer_count() + } + + pub fn torrents(&self) -> Vec { + self.torrents_ids.iter().copied().collect() + } + + pub fn torrent_manager(&self, info_hash: U160) -> &TorrentManager { + &self.torrents[&info_hash] + } + + pub fn id(&self) -> U160 { + self.id + } + + pub fn tracker_manager(&self) -> &TrackerManager { + &self.tracker_manager + } + + pub fn tracker_manager_mut(&mut self) -> &mut TrackerManager { + &mut self.tracker_manager + } + + pub fn is_done(&self, info_hash: U160) -> bool { + if let Some(torrent) = self.torrents.get(&info_hash) { + torrent.is_done() + } else { + false + } + } + + pub fn dump_queue(&mut self) -> VecDeque { + std::mem::replace(&mut self.message_queue, VecDeque::new()) + } + pub fn add_torrent_manager(&mut self, torrent: TorrentManager) { + self.torrents_ids.insert(torrent.info_hash()); self.torrents.insert(torrent.info_hash(), torrent); } + #[allow(dead_code)] fn remove_torrent_manager(&mut self, info_hash: U160) -> Option { + self.torrents_ids.remove(&info_hash); self.torrents.remove(&info_hash) } @@ -46,7 +91,8 @@ impl SessionManager { pub fn process(&mut self, torrent: U160, peer_id: U160, message: Message) -> bool { if let Some(torrent_manager) = self.torrents.get_mut(&torrent) { if torrent_manager.process(peer_id, message) { - while let Some((peer_id, message)) = torrent_manager.next() { + let queue = torrent_manager.dump_queue(); + for (peer_id, message) in queue { if let Some(addr) = self.peer_socket.get(&(peer_id, torrent)).copied() { self.message_queue.push_back(QueuedMessage { message, @@ -66,9 +112,14 @@ impl SessionManager { } } - pub fn new() -> SessionManager { + pub fn new(ip: IpAddr, port: u16, peer_id: U160) -> SessionManager { + let peer_storage = PeerStorageHolder::new(PeerStorage::new()); SessionManager { + id: peer_id, torrents: Default::default(), + tracker_manager: TrackerManager::new(ip, port, peer_id, peer_storage.clone()), + peer_storage, + torrents_ids: Default::default(), peer_socket: Default::default(), message_queue: Default::default(), } @@ -79,8 +130,33 @@ impl SessionManager { } pub fn house_keeping(&mut self) { - for (_, torrent) in &mut self.torrents { + for (info_hash, torrent) in &mut self.torrents { torrent.house_keeping(); + + let queue = torrent.dump_queue(); + for (peer_id, message) in queue { + if let Some(addr) = self.peer_socket.get(&(peer_id, *info_hash)).copied() { + self.message_queue.push_back(QueuedMessage { + message, + addr, + peer_id, + info_hash: *info_hash, + }); + } + } + } + } + + pub fn announce(&mut self) { + println!("Announcing torrents..."); + for info_hash in self.torrents_ids.clone() { + let trackers = self.torrents[&info_hash].trackers().clone(); + for tracker_tier in trackers { + for tracker in tracker_tier { + self.tracker_manager + .announce(tracker, &self.torrents[&info_hash]); + } + } } } } diff --git a/torment-manager/src/torment_instance.rs b/torment-manager/src/torment_instance.rs new file mode 100644 index 0000000..b0a9c44 --- /dev/null +++ b/torment-manager/src/torment_instance.rs @@ -0,0 +1,432 @@ +#![allow(dead_code)] + +use crate::session_manager::SessionManager; +use bytes::{Buf, Bytes, BytesMut}; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; +use polling::{Event, Poller}; +use rand::seq::SliceRandom; +use std::cmp::max; +use std::collections::{HashMap, HashSet, VecDeque}; +use std::convert::TryInto; +use std::io::{ErrorKind, Read, Write}; +use std::net::{IpAddr, Ipv6Addr, SocketAddr, TcpListener, TcpStream}; +use std::ops::Add; +use std::option::Option::Some; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::thread::JoinHandle; +use std::time::{Duration, Instant}; +use torment_core::infohash::v1::U160; +use torment_core::LookupFilter; +use torment_peer::message::{Handshake, Message}; +use torment_peer::PeerProtocol; +use torment_peer::PeerProtocol::TCP; + +pub struct TormentInstance { + id: U160, + port: u16, + dht_enabled: bool, + connections: OffshoreConnections, + tracking: HashMap, + handshake_sent: HashSet, + session: SessionManager, +} + +struct OffshoreConnections { + control_receiver: Receiver, + thread: JoinHandle<()>, + thread_sender: Sender, +} + +enum ControlMessage { + Data(SocketAddr, Bytes), + Handshake(Handshake, SocketAddr, PeerProtocol), + Own(TcpStream, SocketAddr), + Disown(SocketAddr), +} + +struct Stream { + buffer: BytesMut, + addr: SocketAddr, + stream: TcpStream, + handshake_received: bool, + id: usize, + handshake_sent: bool, +} + +fn connection_pool_thread( + port: u16, + control_sender: Sender, + thread_receiver: Receiver, +) { + let mut streams: HashMap = HashMap::new(); + let counter = AtomicUsize::new(0); + let mut index: HashMap = HashMap::new(); + let listener = + TcpListener::bind(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), port)).unwrap(); + + let (notify_sender, notify_receiver) = bounded(1); + let (notify_restart_sender, notify_restart_receiver) = bounded(1); + + let poller = Arc::new(Poller::new().unwrap()); + let thread_receiver_clone = thread_receiver.clone(); + let poller_clone = Arc::clone(&poller); + + std::thread::Builder::new() + .name(format!("io/notify")) + .spawn(move || { + for message in thread_receiver { + // println!("Received channel message, notifying wait"); + poller_clone.notify().unwrap(); + notify_sender.send(message).unwrap(); + notify_restart_receiver.recv().unwrap(); + } + }); + + poller.insert(&listener).unwrap(); + poller.interest(&listener, Event::readable(usize::MAX)); + + let mut buffer = [0u8; 1024 * 1024]; + loop { + let mut events = vec![]; + match poller.wait(&mut events, None) { + Ok(_) => {} + Err(err) if err.kind() == ErrorKind::Interrupted => {} + Err(err) => Err(err).unwrap(), + } + + // println!("Events => {:?}", events); + + for event in events { + if !event.readable { + continue; + } + + if event.key == usize::MAX { + while let Ok((stream, addr)) = listener.accept() { + let id = counter.fetch_add(1, Ordering::AcqRel); + stream.set_nodelay(true).unwrap(); + poller.insert(&stream).unwrap(); + poller.interest(&stream, Event::readable(id)).unwrap(); + streams.insert( + addr, + Stream { + addr, + handshake_received: false, + handshake_sent: false, + buffer: BytesMut::new(), + stream, + id, + }, + ); + index.insert(id, addr); + } + + poller + .interest(&listener, Event::readable(usize::MAX)) + .unwrap(); + continue; + } + + let addr = index[&event.key]; + let size = streams + .get_mut(&addr) + .unwrap() + .stream + .read(&mut buffer) + .unwrap_or(0); + + if size == 0 { + control_sender.send(ControlMessage::Disown(addr)).unwrap(); + index.remove(&event.key); + if let Some(stream) = streams.remove(&addr) { + poller.remove(&stream.stream).unwrap(); + } + println!("{} == Disconnected", addr); + continue; + } + + // println!("{} => {} bytes", addr, size); + + let stream = streams.get_mut(&addr).unwrap(); + stream.buffer.extend_from_slice(&buffer[0..size]); + + if stream.buffer.len() >= 68 && !stream.handshake_received { + let handshake = Handshake::from_bytes(stream.buffer.split_to(68).freeze()); + let handshake = match handshake { + Err(_) => { + control_sender.send(ControlMessage::Disown(addr)).unwrap(); + index.remove(&event.key); + if let Some(stream) = streams.remove(&addr) { + poller.remove(&stream.stream).unwrap(); + } + + continue; + } + Ok(hand) => hand, + }; + + println!("{} => {:?}", stream.addr, handshake); + control_sender + .send(ControlMessage::Handshake(handshake, stream.addr, TCP)) + .unwrap(); + + stream.handshake_received = true; + } + + poller + .interest(&stream.stream, Event::readable(event.key)) + .unwrap(); + + if !stream.handshake_received { + continue; + } + + while stream.buffer.len() >= 4 { + let len = u32::from_be_bytes(stream.buffer[..4].try_into().unwrap()); + if len == 0 { + stream.buffer.advance(4); + continue; + } + + if len + 4 > stream.buffer.len() as u32 { + break; + } + + let mut message_bytes = stream.buffer.split_to((4 + len) as usize); + message_bytes.advance(4); + control_sender + .send(ControlMessage::Data(stream.addr, message_bytes.freeze())) + .unwrap(); + } + } + + let mut msgs = vec![]; + if let Ok(message) = notify_receiver.try_recv() { + msgs.push(message); + } + + while let Ok(message) = thread_receiver_clone.try_recv() { + msgs.push(message); + } + + for message in msgs { + match message { + ControlMessage::Data(target, data) => { + let ok = if let Some(stream) = streams.get_mut(&target) { + if stream.stream.write_all(&data).is_err() { + control_sender + .send(ControlMessage::Disown(stream.addr)) + .unwrap(); + + false + } else { + true + } + } else { + true + }; + + if !ok { + if let Some(stream) = streams.remove(&target) { + poller.remove(&stream.stream).unwrap(); + index.remove(&stream.id); + println!("{} == Disconnected", stream.addr); + } + } + } + + ControlMessage::Disown(addr) => { + if let Some(stream) = streams.remove(&addr) { + index.remove(&stream.id); + poller.remove(&stream.stream).unwrap(); + } + } + + ControlMessage::Own(stream, addr) => { + let id = counter.fetch_add(1, Ordering::AcqRel); + poller.insert(&stream).unwrap(); + poller.interest(&stream, Event::readable(id)).unwrap(); + streams.insert( + addr, + Stream { + addr, + handshake_received: false, + handshake_sent: true, + buffer: BytesMut::new(), + stream, + id, + }, + ); + index.insert(id, addr); + } + + _ => {} + }; + } + + notify_restart_sender.send(()).unwrap(); + } +} + +impl TormentInstance { + pub fn new(port: u16, session: SessionManager) -> TormentInstance { + let (control_sender, control_receiver) = unbounded(); + + let (thread_sender, thread_receiver) = unbounded(); + let thread = std::thread::Builder::new() + .name(format!("io/poll")) + .spawn(move || connection_pool_thread(port, control_sender, thread_receiver)) + .unwrap(); + + TormentInstance { + id: session.id(), + port, + dht_enabled: false, + connections: OffshoreConnections { + control_receiver, + thread, + thread_sender, + }, + tracking: Default::default(), + handshake_sent: Default::default(), + session, + } + } + + pub fn tracker_logic(&mut self) { + self.session.announce(); + self.session.tracker_manager_mut().house_keeping(); + + for torrent in self.session.torrents() { + let peer_count = self.session.peer_count(torrent); + if peer_count > 25 { + continue; + } + + let mut peers = self + .session + .peer_storage + .get_peers(torrent, LookupFilter::All); + + println!("have {} peers for {}", peers.len(), torrent); + + peers.shuffle(&mut rand::thread_rng()); + + let mut todo = max(0, 25 - peer_count); + while todo > 0 && peers.len() > 0 { + let peer = peers.pop().unwrap(); + if self.tracking.contains_key(&peer) { + continue; + } + + let thread_sender = self.connections.thread_sender.clone(); + let peer_id = self.id; + + self.handshake_sent.insert(peer); + + std::thread::spawn(move || { + // println!("Trying connection with {}", peer); + if let Ok(mut stream) = TcpStream::connect(peer) { + stream.set_nodelay(true).unwrap(); + stream + .write_all(&Handshake::new(peer_id, torrent).to_bytes()) + .unwrap(); + + // println!("{} <= Connecting", peer); + + thread_sender + .send(ControlMessage::Own(stream, peer)) + .unwrap(); + } + }); + + todo -= 1; + } + } + } + + pub fn logic_loop(&mut self) { + let mut next_house_keeping = Instant::now(); + + loop { + if next_house_keeping < Instant::now() { + self.session.house_keeping(); + self.tracker_logic(); + next_house_keeping = Instant::now().add(Duration::from_secs(10)); + } + + while next_house_keeping > Instant::now() { + if let Ok(message) = self + .connections + .control_receiver + .recv_timeout(Duration::from_secs(1)) + { + match message { + ControlMessage::Handshake(handshake, addr, protocol) => { + let handshake_sent = self.handshake_sent.remove(&addr); + + self.tracking + .insert(addr, (handshake.peer_id(), handshake.info_hash())); + if self.session.handshake(handshake, addr, protocol) { + if !handshake_sent { + self.connections + .thread_sender + .send(ControlMessage::Data( + addr, + Bytes::from( + Handshake::new(self.id, handshake.info_hash()) + .to_bytes() + .to_vec(), + ), + )) + .unwrap(); + } + } + } + + ControlMessage::Data(sock_addr, data) => { + if let Some((peer_id, info_hash)) = + self.tracking.get(&sock_addr).copied() + { + let message = Message::from_bytes(data); + // println!("{} => {:?}", sock_addr, message); + if message.is_err() { + continue; + } + + self.session.process(info_hash, peer_id, message.unwrap()); + } else { + // println!("{} => ????", sock_addr); + } + } + + _ => {} + } + } + + let queue: HashMap<_, VecDeque<_>> = + self.session + .dump_queue() + .into_iter() + .fold(HashMap::new(), |mut map, item| { + map.entry(item.addr).or_default().push_back(item.message); + map + }); + for (addr, messages) in queue { + let mut bytes = BytesMut::new(); + + for msg in messages { + // println!("{} <= {:?}", addr, msg); + bytes.extend_from_slice(&msg.to_length_prefixed_bytes()); + } + + self.connections + .thread_sender + .send(ControlMessage::Data(addr, bytes.freeze())) + .unwrap() + } + } + } + } +} diff --git a/torment-manager/src/torrent_manager.rs b/torment-manager/src/torrent_manager.rs index f37e659..db39d9f 100644 --- a/torment-manager/src/torrent_manager.rs +++ b/torment-manager/src/torrent_manager.rs @@ -1,15 +1,19 @@ use crate::tracker_manager::{TrackerId, TrackerManager}; +use crate::MAX_OPEN_REQUESTS; use bytes::Bytes; use ring::digest::{digest, SHA1_FOR_LEGACY_USE_ONLY}; +use std::cmp::max; use std::collections::{HashMap, HashSet, VecDeque}; use std::net::SocketAddr; -use std::ops::Index; +use std::ops::Add; use std::option::Option::Some; use std::path::PathBuf; +use std::time::{Duration, Instant}; use torment_core::infohash::v1::U160; use torment_core::metainfo::{MetaInfo, Torrent}; -use torment_core::Bitfield; -use torment_peer::message::{Handshake, Message, PieceMessage, SelectionMessage}; +use torment_core::utils::EphemeralSet; +use torment_core::{Bitfield, REQUEST_SIZE}; +use torment_peer::message::{Handshake, Message, PieceMessage}; use torment_peer::{Peer, PeerProtocol}; use torment_storage::{StorageMap, ToStorageMap}; @@ -46,15 +50,53 @@ pub struct TorrentManager { peers: HashMap, queue: VecDeque<(U160, Message)>, storage_map: StorageMap, + requests_queue: VecDeque, + open_requests: HashMap>, + uploaded: usize, + downloaded: usize, } -pub const REQUEST_SIZE: usize = 16384; - impl TorrentManager { + pub fn peer_count(&self) -> usize { + self.peers.len() + } + + pub fn trackers(&self) -> &Vec> { + &self.trackers + } + pub fn info_hash(&self) -> U160 { self.info_hash } + pub fn is_done(&self) -> bool { + self.bitfield.all() + } + + pub fn uploaded(&self) -> usize { + self.uploaded + } + + pub fn downloaded(&self) -> usize { + self.downloaded + } + + pub fn bytes_left(&self) -> i64 { + let mut size = self.storage_map.size() as i64; + let piece_length = self.storage_map.get_piece_length(0) as i64; + for i in 0..self.bitfield.size() as u32 { + if self.bitfield.get(i) { + size -= piece_length; + } + } + + max(0, size) + } + + pub fn dump_queue(&mut self) -> VecDeque<(U160, Message)> { + std::mem::replace(&mut self.queue, VecDeque::new()) + } + pub fn from_torrent( torrent: Torrent, target: TorrentTarget, @@ -84,9 +126,13 @@ impl TorrentManager { torrent: Some(torrent.clone()), }), storage_map: torrent.to_storage_map(&target.path, target.is_base_path), + requests_queue: Default::default(), target, peers: HashMap::new(), queue: Default::default(), + open_requests: Default::default(), + uploaded: 0, + downloaded: 0, } } @@ -107,7 +153,8 @@ impl TorrentManager { } let meta_info = self.meta_info(); - let peer = Peer::new(addr, protocol, handshake, meta_info); + let mut peer = Peer::new(addr, protocol, handshake, meta_info); + peer.send_bitfield(&self.bitfield); self.peers.insert(peer.id(), peer); true @@ -127,12 +174,12 @@ impl TorrentManager { pub fn process(&mut self, peer_id: U160, message: Message) -> bool { let mut queue = vec![]; + if let Message::Piece(piece) = &message { + self.downloaded += piece.piece.len() + } + let ok = if let Some(peer) = self.peers.get_mut(&peer_id) { if peer.process(message) { - while let Some(message) = peer.next() { - self.queue.push_back((peer_id, message)); - } - while let Some(piece) = peer.next_piece() { if self.storage_map.has_piece(piece.index() as usize) { continue; @@ -151,6 +198,10 @@ impl TorrentManager { } } + while let Some(_) = peer.next_have() { + // Something i guess + } + while let Some(piece_request) = peer.next_request() { if !self.bitfield.get(piece_request.index()) { continue; @@ -175,6 +226,9 @@ impl TorrentManager { )) } + self.queue_requests(peer_id); + let msgs = self.peers.get_mut(&peer_id).unwrap().dump_queue(); + self.queue_messages(peer_id, msgs); true } else { false @@ -183,6 +237,7 @@ impl TorrentManager { false }; + // offload to io thread? for have in queue { let piece_hash = self.meta_info().hash(have); let piece_data = self.storage_map.read_piece(have).unwrap(); @@ -203,7 +258,6 @@ impl TorrentManager { if !self.peers[&key].has_piece(have as u32) && self.peers[&key].we_choked() { self.peers.get_mut(&key).unwrap().set_we_choked(false); - self.queue.push_back((key, Message::Unchoke)); } } } @@ -211,65 +265,133 @@ impl TorrentManager { ok } + fn queue_messages(&mut self, peer_id: U160, messages: VecDeque) { + for msg in messages { + if let Message::Piece(piece) = &msg { + self.uploaded += piece.piece.len(); + } + + self.queue.push_back((peer_id, msg)); + } + } + pub fn next(&mut self) -> Option<(U160, Message)> { self.queue.pop_front() } - pub fn house_keeping(&mut self) { - let mut peers = self - .peers - .iter() - .filter_map(|(_, peer)| { - if peer.has()1 - - if peer.is_choked() { - None - } else { - Some(peer.id()) - } - }) - .collect::>(); - - let pieces = self.meta_info().pieces() as u32; - for i in 0u32..pieces { - if self.bitfield.get(i) { + fn get_next_request_piece_for_peer(&mut self, peer: &U160) -> Option<(u32, u32, u32)> { + let mut queue_index = 0; + while let Some(index) = self.requests_queue.get(queue_index).copied() { + if !self.peers[peer].has_piece(index) { + queue_index += 1; continue; } - let length = self.storage_map.get_piece_length(i as usize); - let mut offset = 0; + let bits = self.storage_map.get_bits_in_pieces(index as usize); + if self.open_requests.get(&index).map_or(0, |set| set.len()) >= bits { + self.requests_queue.remove(0); + } - for peer in &peers.clone() { - if !self.peers[peer].has_piece(i) { + let set = self + .open_requests + .entry(index) + .or_insert(EphemeralSet::new()); + for i in 0..bits as u32 { + let bit_offset = i * REQUEST_SIZE as u32; + if set.contains(&bit_offset) { continue; } - while offset < length && self.peers[peer].count_open_requests() < 25 { - if !self.storage_map.has_piece_bit(i as usize, offset) { - let request_length = if (offset + REQUEST_SIZE) > length { - length - offset - } else { - REQUEST_SIZE - }; + set.insert(bit_offset, Instant::now().add(Duration::from_secs(10))); + return Some(( + index, + bit_offset, + self.storage_map + .get_bit_length(index as usize, bit_offset as usize), + )); + } + } - let msg = SelectionMessage::new(i, offset as u32, request_length as u32); + None + } - if self.peers.get_mut(peer).unwrap().requested(msg) { - self.queue.push_back((*peer, Message::Request(msg))); - } - } + pub fn house_keeping(&mut self) { + let mut map: HashMap = HashMap::new(); + let mut interested_peers = HashSet::new(); + if self.peers.len() <= 0 { + return; + } - offset += REQUEST_SIZE; - } + let mut done = 0; - if self.peers[peer].count_open_requests() >= 25 { - peers.remove(peer); - } + for i in 0..self.meta_info().pieces() as u32 { + if self.bitfield.get(i) { + done += 1; + // Don't need to queue anything + continue; + } - if offset >= length { - break; + let mut entries = 0; + for (_, peer) in &mut self.peers { + if peer.has_piece(i) { + entries += 1; + interested_peers.insert(peer.id()); } } + + if entries == 0 { + // Can't queue entries without peers + continue; + } + + map.insert(i, entries); + } + + println!( + "{}%", + (done as f64 / self.meta_info().pieces() as f64) * 100f64 + ); + + let mut pieces: Vec<_> = map.keys().copied().collect(); + pieces.sort_by_key(|piece| map[piece]); + self.requests_queue = VecDeque::from(pieces); + + for (_, peer) in &mut self.peers { + if !interested_peers.contains(&peer.id()) || self.requests_queue.len() == 0 { + peer.lost_interest(); + continue; + } + + peer.interested(); + if peer.is_choked() { + // Don't send requests if they're choked + continue; + } + } + + let peer_ids = self.peers.keys().copied().collect::>(); + for peer_id in peer_ids { + self.queue_requests(peer_id); + let message_queue = self.peers.get_mut(&peer_id).unwrap().dump_queue(); + self.queue_messages(peer_id, message_queue); + } + } + + fn queue_requests(&mut self, peer_id: U160) { + if self.peers[&peer_id].is_choked() { + return; + } + + let amount = MAX_OPEN_REQUESTS - self.peers[&peer_id].count_open_requests(); + for _ in 0..amount { + if let Some((index, offset, length)) = self.get_next_request_piece_for_peer(&peer_id) { + self.peers + .get_mut(&peer_id) + .unwrap() + .request(index, offset, length); + } else { + break; + } } } } diff --git a/torment-manager/src/tracker_manager.rs b/torment-manager/src/tracker_manager.rs index 15d407b..833de56 100644 --- a/torment-manager/src/tracker_manager.rs +++ b/torment-manager/src/tracker_manager.rs @@ -1,20 +1,94 @@ +use crate::torrent_manager::TorrentManager; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use reqwest::blocking::Client; +use reqwest::header::HeaderMap; use std::collections::HashMap; +use std::net::IpAddr; use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::RwLock; +use std::thread::JoinHandle; +use torment_core::infohash::v1::U160; +use torment_core::peer_storage::PeerStorageHolder; +use torment_tracker::{ + block_do_announce, Tracker, TrackerAnnounceRequest, TrackerAnnounceResponse, +}; use url::Url; pub struct TrackerManager { + id: U160, + ip: IpAddr, + port: u16, counter: AtomicUsize, - trackers: HashMap, + trackers: HashMap>, url_index: HashMap, + peer_storage: PeerStorageHolder, + tracker_offshore_state: TrackerOffshoreState, } -pub struct Tracker { - url: Url, +struct TrackerOffshoreState { + threads: Vec>, + tracker_request_sender: Sender<(usize, TrackerAnnounceRequest)>, + tracker_request_receiver: Receiver<(usize, TrackerAnnounceRequest)>, + tracker_response_sender: Sender<(usize, TrackerAnnounceResponse)>, + tracker_response_receiver: Receiver<(usize, TrackerAnnounceResponse)>, +} + +impl TrackerOffshoreState { + fn new() -> TrackerOffshoreState { + let (req_send, req_recv) = unbounded(); + let (resp_send, resp_recv) = unbounded(); + + TrackerOffshoreState { + threads: vec![], + tracker_request_sender: req_send, + tracker_request_receiver: req_recv, + tracker_response_sender: resp_send, + tracker_response_receiver: resp_recv, + } + } } pub type TrackerId = usize; +fn tracker_thread( + input: Receiver<(usize, TrackerAnnounceRequest)>, + output: Sender<(usize, TrackerAnnounceResponse)>, +) { + let mut map = HeaderMap::new(); + map.insert( + reqwest::header::USER_AGENT, + format!("eater's Torment/{}", env!("CARGO_PKG_VERSION")) + .parse() + .unwrap(), + ); + let client = Client::builder().default_headers(map).build().unwrap(); + + for (tracker_id, req) in input { + output + .send((tracker_id, block_do_announce(req, &client))) + .unwrap(); + } +} + impl TrackerManager { + pub fn new( + ip: IpAddr, + port: u16, + peer_id: U160, + peer_storage: PeerStorageHolder, + ) -> TrackerManager { + TrackerManager { + id: peer_id, + ip, + port, + counter: Default::default(), + trackers: Default::default(), + url_index: Default::default(), + peer_storage, + tracker_offshore_state: TrackerOffshoreState::new(), + } + } + pub fn get_tracker_id(&mut self, url: &Url) -> Option { // dht is not a tracker bye if url.scheme() == "dht" { @@ -28,7 +102,68 @@ impl TrackerManager { let new_id = self.counter.fetch_add(1, Ordering::AcqRel); self.url_index.insert(url_str.to_string(), new_id); + self.trackers.insert( + new_id, + RwLock::new(Tracker::new(url.clone(), self.peer_storage.clone())), + ); Some(new_id) } + + pub fn announce(&mut self, tracker_id: usize, torrent: &TorrentManager) { + let id = torrent.info_hash(); + { + let tracker = &self.trackers[&tracker_id].read().unwrap(); + if !tracker.needs_update(id) { + return; + } + } + + let tracker = &mut self.trackers[&tracker_id].write().unwrap(); + println!( + "Queueing announce {} on {}", + torrent.info_hash(), + tracker.url() + ); + + let req = tracker.create_update_request( + id, + self.id, + self.ip, + self.port, + torrent.uploaded(), + torrent.downloaded(), + torrent.bytes_left(), + ); + + self.tracker_offshore_state + .tracker_request_sender + .send((tracker_id, req)) + .unwrap() + } + + pub fn house_keeping(&mut self) { + while self.tracker_offshore_state.threads.len() < 5 { + let sender = self.tracker_offshore_state.tracker_response_sender.clone(); + let receiver = self.tracker_offshore_state.tracker_request_receiver.clone(); + + self.tracker_offshore_state.threads.push( + std::thread::Builder::new() + .name(format!( + "tracker/{}", + self.tracker_offshore_state.threads.len() + )) + .spawn(|| tracker_thread(receiver, sender)) + .unwrap(), + ); + } + + while let Ok((tracker_id, resp)) = self + .tracker_offshore_state + .tracker_response_receiver + .try_recv() + { + self.trackers[&tracker_id].write().unwrap().process(resp); + } + } } diff --git a/torment-peer/src/lib.rs b/torment-peer/src/lib.rs index ca8840c..9eec180 100644 --- a/torment-peer/src/lib.rs +++ b/torment-peer/src/lib.rs @@ -48,6 +48,7 @@ pub struct Peer { requested_queue: HashSet, received_pieces: VecDeque, queue: VecDeque, + have_queue: VecDeque, } impl Peer { @@ -55,6 +56,10 @@ impl Peer { self.id } + pub fn dump_queue(&mut self) -> VecDeque { + std::mem::replace(&mut self.queue, VecDeque::new()) + } + pub fn new( address: SocketAddr, protocol: PeerProtocol, @@ -74,6 +79,7 @@ impl Peer { requested_queue: Default::default(), received_pieces: Default::default(), queue: Default::default(), + have_queue: Default::default(), } } @@ -87,7 +93,10 @@ impl Peer { Message::Unchoke => self.their_state.chocked = false, Message::Interested => self.their_state.interested = true, Message::NotInterested => self.their_state.interested = false, - Message::Have(nr) => self.has.set(nr), + Message::Have(nr) => { + self.has.set(nr); + self.have_queue.push_back(nr); + } Message::Bitfield(bitfield) => self.has.add(bitfield), Message::Request(selection) => { self.request_queue.push_back(selection); @@ -130,17 +139,68 @@ impl Peer { } pub fn set_we_choked(&mut self, choked: bool) { + if self.our_state.chocked != choked { + self.queue.push_back(if choked { + Message::Choke + } else { + Message::Unchoke + }) + } self.our_state.chocked = choked; } + pub fn send_bitfield(&mut self, bitfield: &Bitfield) { + self.queue.push_back(Message::Bitfield(bitfield.clone())); + } + pub fn set_we_interested(&mut self, interested: bool) { self.our_state.interested = interested; } + pub fn request(&mut self, index: u32, offset: u32, length: u32) { + let msg = SelectionMessage::new(index, offset, length); + if self.requested(msg) { + self.queue.push_back(Message::Request(msg)); + } + } + pub fn requested(&mut self, selection: SelectionMessage) -> bool { self.requested_queue.insert(selection) } + pub fn unchoke(&mut self) { + if !self.our_state.chocked { + return; + } + + self.queue.push_back(Message::Unchoke); + } + + pub fn choke(&mut self) { + if self.our_state.chocked { + return; + } + + self.queue.push_back(Message::Choke); + } + + pub fn interested(&mut self) { + if self.our_state.interested { + return; + } + + self.our_state.interested = true; + self.queue.push_back(Message::Interested); + } + + pub fn lost_interest(&mut self) { + if !self.our_state.interested { + return; + } + + self.queue.push_back(Message::NotInterested); + } + pub fn next_piece(&mut self) -> Option { self.received_pieces.pop_front() } @@ -157,6 +217,7 @@ impl Peer { pub fn addr(&self) -> SocketAddr { self.address } + pub fn has_piece(&self, index: u32) -> bool { self.has.get(index as u32) } @@ -164,12 +225,8 @@ impl Peer { pub fn next(&mut self) -> Option { self.queue.pop_front() } -} -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); + pub fn next_have(&mut self) -> Option { + self.have_queue.pop_front() } } diff --git a/torment-peer/src/message.rs b/torment-peer/src/message.rs index 36669cc..c50651a 100644 --- a/torment-peer/src/message.rs +++ b/torment-peer/src/message.rs @@ -49,7 +49,7 @@ impl Message { Message::Have(u32::from_be_bytes(bytes[1..5].try_into().unwrap())) } - 5 => Message::Bitfield(Bitfield::new(bytes.slice(1..))), + 5 => Message::Bitfield(Bitfield::new(bytes.slice(1..), bytes.len() - 1)), 6 | 8 => { if bytes.len() < 13 { diff --git a/torment-storage/src/lib.rs b/torment-storage/src/lib.rs index 6f8cdb1..8ddc231 100644 --- a/torment-storage/src/lib.rs +++ b/torment-storage/src/lib.rs @@ -1,20 +1,16 @@ -use bytes::{Bytes, BytesMut}; use lazy_static::lazy_static; use remem::{ItemGuard, Pool}; -use std::borrow::Borrow; use std::cmp::min; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::{Debug, Formatter}; use std::fs::{DirBuilder, File, OpenOptions}; use std::io; use std::io::{Read, Seek, SeekFrom, Write}; -use std::ops::{Add, Deref, Range}; use std::path::{Path, PathBuf, MAIN_SEPARATOR}; use std::rc::Rc; use std::sync::Mutex; -use std::time::{Duration, Instant}; -use torment_core::metainfo::{MetaInfo, MetaInfoObject, Torrent}; -use torment_core::utils::EphemeralMap; +use torment_core::metainfo::{MetaInfoObject, Torrent}; +use torment_core::REQUEST_SIZE; lazy_static! { static ref MEMORY_POOL: Pool> = Pool::new(|| vec![0u8; 4194304]); @@ -24,6 +20,7 @@ lazy_static! { pub struct StorageMap { base_path: PathBuf, piece_length: usize, + bits_in_piece: usize, pieces: usize, open_files: HashMap>>, buffer: HashMap, @@ -33,14 +30,14 @@ pub struct StorageMap { pub struct StoragePiece { index: usize, - pieces: usize, + bits: usize, ranges: HashSet, buffer: ItemGuard<'static, Vec>, } impl StoragePiece { fn is_complete(&self) -> bool { - self.pieces == self.ranges.len() + self.bits == self.ranges.len() } } @@ -55,7 +52,6 @@ impl Debug for StoragePiece { impl StorageMap { fn get_offsets(&self, offset: usize, length: usize) -> Vec { - let x = 0..1; let end = offset + length; let mut range = self.mapping.range(offset..); let mut items = vec![]; @@ -142,7 +138,7 @@ impl StorageMap { pub fn read_piece(&mut self, index: usize) -> io::Result> { let mut bytes = vec![0; self.get_piece_length(index)]; - self.read(index, 0, &mut bytes); + self.read(index, 0, &mut bytes)?; Ok(bytes) } @@ -166,21 +162,14 @@ impl StorageMap { let item = if let Some(item) = self.buffer.get_mut(&index) { item } else { - let request_size = 2usize.pow(14); - let piece_length = self.get_piece_length(index); - let pieces = (piece_length / request_size) - + if piece_length % request_size > 0 { - 1 - } else { - 0 - }; + let bits = self.get_bits_in_pieces(index); self.buffer.insert(index, { StoragePiece { index, ranges: Default::default(), buffer: MEMORY_POOL.get(), - pieces, + bits, } }); @@ -209,16 +198,40 @@ impl StorageMap { } pub fn get_piece_length(&self, index: usize) -> usize { - if index + 1 == self.pieces { - let len = (self.size % self.piece_length); - if len == 0 { - self.piece_length + if index + 1 < self.pieces { + return self.piece_length; + } + + let len = self.size % self.piece_length; + if len == 0 { + self.piece_length + } else { + len + } + } + + pub fn get_bits_in_pieces(&self, index: usize) -> usize { + if index + 1 < self.pieces { + return self.bits_in_piece; + } + + let piece_length = self.get_piece_length(index); + + (piece_length / REQUEST_SIZE) + + if piece_length % REQUEST_SIZE > 0 { + 1 } else { - len + 0 } - } else { - self.piece_length + } + + pub fn get_bit_length(&self, index: usize, offset: usize) -> u32 { + let piece_length = self.get_piece_length(index); + if offset + REQUEST_SIZE <= piece_length { + return REQUEST_SIZE as u32; } + + (piece_length % REQUEST_SIZE) as u32 } pub fn has_piece_bit(&self, index: usize, offset: usize) -> bool { @@ -330,15 +343,25 @@ impl StorageMapBuilder { } pub fn build(self) -> StorageMap { + let pieces = (self.offset / self.piece_length) + + if self.offset % self.piece_length > 0 { + 1 + } else { + 0 + }; + + let bits_in_piece = (self.piece_length / REQUEST_SIZE) + + if (self.piece_length % REQUEST_SIZE) > 0 { + 1 + } else { + 0 + }; + StorageMap { base_path: self.base_path, piece_length: self.piece_length, - pieces: (self.offset / self.piece_length) - + if self.offset % self.piece_length > 0 { - 1 - } else { - 0 - }, + pieces, + bits_in_piece, open_files: Default::default(), buffer: Default::default(), mapping: self.items, diff --git a/torment-tracker/Cargo.toml b/torment-tracker/Cargo.toml new file mode 100644 index 0000000..b1d1352 --- /dev/null +++ b/torment-tracker/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "torment-tracker" +version = "0.1.0" +authors = ["eater <=@eater.me>"] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +torment-core = { path = "../torment-core" } +torment-bencode = { path = "../torment-bencode" } +reqwest = { version = "0.10", features = ["blocking"] } +url = "2.1.1" \ No newline at end of file diff --git a/torment-tracker/src/lib.rs b/torment-tracker/src/lib.rs new file mode 100644 index 0000000..2c0416d --- /dev/null +++ b/torment-tracker/src/lib.rs @@ -0,0 +1,269 @@ +use reqwest::blocking::Client; +use reqwest::{Method, Url}; +use std::collections::HashMap; +use std::convert::TryInto; +use std::net::{IpAddr, SocketAddr}; +use std::ops::Add; +use std::str::FromStr; +use std::time::{Duration, Instant}; +use torment_bencode::BencodeValue; +use torment_core::infohash::v1::U160; +use torment_core::infohash::InfoHashCapable; +use torment_core::peer_storage::{PeerStorageHolder, PeerStorageSource}; +use torment_core::CompactContact; + +pub struct Tracker { + url: Url, + is_udp: bool, + // last_scrape: Option, + last_announce: HashMap, + requested_interval: Duration, + failure: HashMap, + peer_storage: PeerStorageHolder, + // supports_scrape: bool, +} + +pub enum TrackerAnnounceRequest { + HTTP(Url, U160), + UDP(Url, U160), +} + +#[derive(Debug)] +pub struct TrackerAnnounceResponse { + info_hash: U160, + // url: Url, + interval: Option, + source: PeerStorageSource, + result: Result, String>, +} + +impl Tracker { + pub fn url(&self) -> &Url { + &self.url + } + + pub fn new(url: Url, peer_storage: PeerStorageHolder) -> Tracker { + Tracker { + is_udp: url.scheme() == "udp", + url, + last_announce: HashMap::new(), + requested_interval: Duration::from_secs(30 * 60), + failure: HashMap::new(), + peer_storage, + } + } + + pub fn needs_update(&self, info_hash: U160) -> bool { + self.last_announce + .get(&info_hash) + .map(|inst| inst.add(self.requested_interval) < Instant::now()) + .unwrap_or(true) + } + + pub fn process(&mut self, response: TrackerAnnounceResponse) { + match response.result { + Ok(peers) => { + self.peer_storage + .add_peers(response.info_hash, peers, response.source); + } + + Err(err) => { + self.failure.insert(response.info_hash, err); + } + } + + if let Some(interval) = response.interval { + self.requested_interval = interval; + } else { + self.requested_interval = Duration::from_secs(60 * 30); + } + } + + pub fn create_update_request( + &mut self, + info_hash: U160, + peer_id: U160, + ip: IpAddr, + port: u16, + uploaded: usize, + downloaded: usize, + left: i64, + ) -> TrackerAnnounceRequest { + if self.is_udp { + return TrackerAnnounceRequest::UDP(self.url.clone(), info_hash); + } + + let mut url = self.url.clone(); + info_hash.to_byte_array(); + { + let mut query = url.query_pairs_mut(); + query.append_pair("ip", &format!("{}", ip)); + query.append_pair("port", &format!("{}", port)); + query.append_pair("uploaded", &format!("{}", uploaded)); + query.append_pair("downloaded", &format!("{}", downloaded)); + query.append_pair("left", &format!("{}", left)); + } + + let encoded_info_hash = url::form_urlencoded::byte_serialize(&info_hash.to_bytes()).fold( + String::new(), + |mut coll, curr| { + coll.push_str(curr); + coll + }, + ); + + let encoded_peer_id = url::form_urlencoded::byte_serialize(&peer_id.to_bytes()).fold( + String::new(), + |mut coll, curr| { + coll.push_str(curr); + coll + }, + ); + let query = url.query().unwrap_or(""); + url.set_query(Some(&format!( + "{}&info_hash={}&peer_id={}", + query, encoded_info_hash, encoded_peer_id + ))); + + self.last_announce.insert(info_hash, Instant::now()); + + TrackerAnnounceRequest::HTTP(url, info_hash) + } +} + +pub fn block_do_announce( + request: TrackerAnnounceRequest, + client: &Client, +) -> TrackerAnnounceResponse { + let (url, info_hash) = match request { + TrackerAnnounceRequest::HTTP(url, info_hash) => (url, info_hash), + TrackerAnnounceRequest::UDP(url, info_hash) => { + return TrackerAnnounceResponse { + info_hash, + interval: None, + source: PeerStorageSource::Tracker(url.clone()), + // url, + result: Err(format!("UDP trackers not supported")), + }; + } + }; + + println!("Announcing {} on {}", info_hash, url); + + let res = block_do_http_response_parse(url.clone(), client); + + TrackerAnnounceResponse { + info_hash, + source: PeerStorageSource::Tracker(url.clone()), + // url, + interval: res.as_ref().map(|x| x.0).ok(), + result: res.map(|x| x.1), + } +} + +pub fn block_do_http_response_parse( + url: Url, + http_client: &Client, +) -> Result<(Duration, Vec), String> { + let resp = match http_client.request(Method::GET, url).send() { + Ok(resp) => resp, + + Err(err) => { + return Err(format!("{}", err)); + } + }; + + let body = resp.bytes(); + if let Err(err) = body { + return Err(format!("{}", err)); + } + + let body = body.unwrap(); + let body = BencodeValue::decode(body); + if let Err(err) = body { + return Err(format!("Bencoding error: {}", err)); + } + + let body = body.unwrap(); + let body = body.dict(); + if body.is_none() { + return Err(format!("Tracker gave invalid response")); + } + + let body = body.unwrap(); + + if let Some(failure) = body.get(b"failure reason") { + let failure = failure.string().and_then(|x| x.ok()).unwrap_or(format!( + "Tracker failed to respond with valid failure, but did respond with a failure" + )); + + return Err(failure); + } + + let interval = body + .get(b"interval") + .and_then(|item| item.int()) + .unwrap_or(30 * 60); + + let interval = Duration::from_secs(interval as u64); + + let peers = body.get(b"peers"); + + if peers.is_none() { + return Ok((interval, vec![])); + } + + let peers = peers.unwrap(); + + let peers: Vec = match peers { + BencodeValue::List(old) => { + let mut buffer = vec![]; + for item in old.iter() { + let dict = if let Some(dict) = item.dict() { + dict + } else { + continue; + }; + + let ip = if let Some(BencodeValue::Bytes(ip)) = dict.get(b"ip") { + ip.clone() + } else { + continue; + }; + + let ip = if let Some(ip) = IpAddr::from_str(&String::from_utf8_lossy(&ip)).ok() { + ip + } else { + continue; + }; + + let port: u16 = if let Some(BencodeValue::Int(nr)) = dict.get(b"port") { + if *nr < u16::MAX as i64 { + if let Ok(port) = (*nr).try_into() { + port + } else { + continue; + } + } else { + continue; + } + } else { + continue; + }; + + buffer.push(SocketAddr::new(ip, port)); + } + + buffer + } + BencodeValue::Bytes(compact) => compact + .chunks(6) + .filter_map(|x| SocketAddr::from_compact_contact(x).ok()) + .collect(), + _ => { + return Err(format!("Tracker failed to respond with valid response")); + } + }; + + Ok((interval, peers)) +}