From daf7ecd23a26f124b91c915ee8f39cc3d30b0840 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Sat, 20 Feb 2021 00:17:18 +0100 Subject: [PATCH] Migrate librespot-connect to tokio 1.0 --- Cargo.lock | 323 +++++++++++++++++++++++++++++---------- Cargo.toml | 6 +- connect/Cargo.toml | 8 +- connect/src/discovery.rs | 125 ++++++--------- connect/src/spirc.rs | 312 ++++++++++++++++++------------------- core/src/mercury/mod.rs | 72 ++++----- src/lib.rs | 2 +- 7 files changed, 478 insertions(+), 370 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 00343168..6b6e7af2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,21 +27,44 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561" dependencies = [ - "aes-soft", - "aesni", + "aes-soft 0.6.4", + "aesni 0.10.0", "cipher", ] +[[package]] +name = "aes-ctr" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e5b0458ea3beae0d1d8c0f3946564f8e10f90646cf78c06b4351052058d1ee" +dependencies = [ + "aes-soft 0.3.3", + "aesni 0.6.0", + "ctr 0.3.2", + "stream-cipher", +] + [[package]] name = "aes-ctr" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7729c3cde54d67063be556aeac75a81330d802f0259500ca40cb52967f975763" dependencies = [ - "aes-soft", - "aesni", + "aes-soft 0.6.4", + "aesni 0.10.0", "cipher", - "ctr", + "ctr 0.6.0", +] + +[[package]] +name = "aes-soft" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd7e7ae3f9a1fb5c03b389fc6bb9a51400d0c13053f0dca698c832bfd893a0d" +dependencies = [ + "block-cipher-trait", + "byteorder", + "opaque-debug 0.2.3", ] [[package]] @@ -54,6 +77,17 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "aesni" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f70a6b5f971e473091ab7cfb5ffac6cde81666c4556751d8d5620ead8abf100" +dependencies = [ + "block-cipher-trait", + "opaque-debug 0.2.3", + "stream-cipher", +] + [[package]] name = "aesni" version = "0.10.0" @@ -223,6 +257,25 @@ dependencies = [ "generic-array 0.12.3", ] +[[package]] +name = "block-cipher-trait" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774" +dependencies = [ + "generic-array 0.12.3", +] + +[[package]] +name = "block-modes" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31aa8410095e39fdb732909fb5730a48d5bd7c2e3cd76bd1b07b3dbea130c529" +dependencies = [ + "block-cipher-trait", + "block-padding", +] + [[package]] name = "block-padding" version = "0.1.5" @@ -234,9 +287,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.6.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099e596ef14349721d9016f6b80dd3419ea1bf289ab9b44df8e4dfd3a005d5d9" +checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" [[package]] name = "byte-tools" @@ -258,9 +311,9 @@ checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" [[package]] name = "cc" -version = "1.0.66" +version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48" +checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" [[package]] name = "cesu8" @@ -384,13 +437,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3818dfca4b0cb5211a659bbcbb94225b7127407b2b135e650d717bfb78ab10d3" dependencies = [ "cookie", - "idna 0.2.1", + "idna 0.2.2", "log", "publicsuffix", "serde", "serde_json", "time 0.2.25", - "url 2.2.0", + "url 2.2.1", ] [[package]] @@ -462,6 +515,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "022cd691704491df67d25d006fe8eca083098253c4d43516c2206479c58c6736" +dependencies = [ + "block-cipher-trait", + "stream-cipher", +] + [[package]] name = "ctr" version = "0.6.0" @@ -532,6 +595,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dns-sd" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d748509dea20228f63ba519bf142ce2593396386125b01f5b0d6412dab972087" +dependencies = [ + "libc", + "pkg-config", +] + [[package]] name = "either" version = "1.6.1" @@ -540,9 +613,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "env_logger" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e" +checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" dependencies = [ "atty", "humantime", @@ -586,7 +659,7 @@ checksum = "1d34cfa13a63ae058bfa601fe9e313bbdb3746427c1459185464ce0fcf62e1e8" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall 0.2.4", + "redox_syscall", "winapi", ] @@ -598,9 +671,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ece68d15c92e84fa4f19d3780f1294e5ca82a78a6d515f1efaabcc144688be00" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" dependencies = [ "matches", "percent-encoding 2.1.0", @@ -824,9 +897,9 @@ dependencies = [ [[package]] name = "gstreamer" -version = "0.16.5" +version = "0.16.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d50f822055923f1cbede233aa5dfd4ee957cf328fb3076e330886094e11d6cf" +checksum = "9ff5d0f7ff308ae37e6eb47b6ded17785bdea06e438a708cd09e0288c1862f33" dependencies = [ "bitflags", "cfg-if 1.0.0", @@ -974,6 +1047,17 @@ dependencies = [ "digest", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.3" @@ -1029,7 +1113,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.5", + "pin-project", "socket2", "tokio", "tower-service", @@ -1056,15 +1140,36 @@ dependencies = [ [[package]] name = "idna" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de910d521f7cc3135c4de8db1cb910e0b5ed1dc6f57c381cd07e8e661ce10094" +checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21" dependencies = [ "matches", "unicode-bidi", "unicode-normalization", ] +[[package]] +name = "if-addrs" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28538916eb3f3976311f5dfbe67b5362d0add1293d0a9cad17debf86f8e3aa48" +dependencies = [ + "if-addrs-sys", + "libc", + "winapi", +] + +[[package]] +name = "if-addrs-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de74b9dd780476e837e5eb5ab7c88b49ed304126e412030a0adba99c8efe79ea" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "indexmap" version = "1.6.1" @@ -1222,6 +1327,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "libmdns" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b276920bfc6c9285e16ffd30ed410487f0185f383483f45a3446afc0554fded" +dependencies = [ + "byteorder", + "futures-util", + "hostname", + "if-addrs", + "log", + "multimap", + "quick-error", + "rand 0.8.3", + "socket2", + "tokio", +] + [[package]] name = "libpulse-binding" version = "2.23.0" @@ -1275,6 +1398,7 @@ name = "librespot" version = "0.1.3" dependencies = [ "librespot-audio", + "librespot-connect", "librespot-core", "librespot-metadata", "librespot-playback", @@ -1285,7 +1409,7 @@ dependencies = [ name = "librespot-audio" version = "0.1.3" dependencies = [ - "aes-ctr", + "aes-ctr 0.6.0", "bit-set", "byteorder", "bytes", @@ -1301,6 +1425,33 @@ dependencies = [ "vorbis", ] +[[package]] +name = "librespot-connect" +version = "0.1.3" +dependencies = [ + "aes-ctr 0.3.0", + "base64 0.13.0", + "block-modes", + "dns-sd", + "futures", + "hmac", + "hyper", + "libmdns", + "librespot-core", + "librespot-playback", + "librespot-protocol", + "log", + "num-bigint", + "protobuf", + "rand 0.7.3", + "serde", + "serde_derive", + "serde_json", + "sha-1", + "tokio", + "url 1.7.2", +] + [[package]] name = "librespot-core" version = "0.1.3" @@ -1434,6 +1585,12 @@ dependencies = [ "libc", ] +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matches" version = "0.1.8" @@ -1458,9 +1615,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7" +checksum = "dc250d6848c90d719ea2ce34546fb5df7af1d3fd189d10bf7bad80bfcebecd95" dependencies = [ "libc", "log", @@ -1485,6 +1642,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0419348c027fa7be448d2ae7ea0e4e04c2334c31dc4e74ab29f00a2a7ca69204" +[[package]] +name = "multimap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1255076139a83bb467426e7f8d0134968a8118844faa755985e077cf31850333" +dependencies = [ + "serde", +] + [[package]] name = "ndk" version = "0.2.1" @@ -1726,14 +1892,14 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall 0.1.57", + "redox_syscall", "smallvec", "winapi", ] @@ -1786,33 +1952,13 @@ dependencies = [ "ucd-trie", ] -[[package]] -name = "pin-project" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffbc8e94b38ea3d2d8ba92aea2983b503cd75d0888d75b86bb37970b5698e15" -dependencies = [ - "pin-project-internal 0.4.27", -] - [[package]] name = "pin-project" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63" dependencies = [ - "pin-project-internal 1.0.5", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65ad2ae56b6abe3a1ee25f15ee605bacadb9a764edaba9c2bf4103800d4a1895" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -1963,10 +2109,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bbaa49075179162b49acac1c6aa45fb4dafb5f13cf6794276d77bc7fd95757b" dependencies = [ "error-chain", - "idna 0.2.1", + "idna 0.2.2", "lazy_static", "regex", - "url 2.2.0", + "url 2.2.1", ] [[package]] @@ -1979,10 +2125,16 @@ dependencies = [ ] [[package]] -name = "quote" -version = "1.0.8" +name = "quick-error" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "991431c3519a3f36861882da93630ce66b52918dcf1b8e2fd66b397fc96f28df" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" dependencies = [ "proc-macro2", ] @@ -2021,7 +2173,7 @@ checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", "rand_chacha 0.3.0", - "rand_core 0.6.1", + "rand_core 0.6.2", "rand_hc 0.3.0", ] @@ -2042,7 +2194,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] @@ -2071,9 +2223,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" +checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ "getrandom 0.2.2", ] @@ -2093,20 +2245,14 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] name = "redox_syscall" -version = "0.1.57" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" - -[[package]] -name = "redox_syscall" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ec8ca9416c5ea37062b502703cd7fcb207736bc294f6e0cf367ac6fc234570" +checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9" dependencies = [ "bitflags", ] @@ -2479,6 +2625,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "stream-cipher" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8131256a5896cabcf5eb04f4d6dacbe1aefda854b0d9896e09cb58829ec5638c" +dependencies = [ + "generic-array 0.12.3", +] + [[package]] name = "strsim" version = "0.9.3" @@ -2549,9 +2704,9 @@ dependencies = [ [[package]] name = "tar" -version = "0.4.32" +version = "0.4.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0313546c01d59e29be4f09687bcb4fb6690cec931cc3607b6aec7a0e417f4cc6" +checksum = "c0bcfbd6a598361fda270d82469fff3d65089dc33e175c9a131f7b4cd395f228" dependencies = [ "filetime", "libc", @@ -2567,7 +2722,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "rand 0.8.3", - "redox_syscall 0.2.4", + "redox_syscall", "remove_dir_all", "winapi", ] @@ -2583,18 +2738,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146" +checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1" +checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" dependencies = [ "proc-macro2", "quote", @@ -2603,9 +2758,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8208a331e1cb318dd5bd76951d2b8fc48ca38a69f5f4e4af1b6a9f8c6236915" +checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd" dependencies = [ "once_cell", ] @@ -2731,9 +2886,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d40a22fd029e33300d8d89a5cc8ffce18bb7c587662f54629e94c9de5487f3" +checksum = "f77d3842f76ca899ff2dbcf231c5c65813dea431301d6eb686279c15c4464f12" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", @@ -2751,11 +2906,11 @@ dependencies = [ [[package]] name = "tracing-futures" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 0.4.27", + "pin-project", "tracing", ] @@ -2836,7 +2991,7 @@ dependencies = [ "once_cell", "qstring", "rustls", - "url 2.2.0", + "url 2.2.1", "webpki", "webpki-roots", ] @@ -2854,12 +3009,12 @@ dependencies = [ [[package]] name = "url" -version = "2.2.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5909f2b0817350449ed73e8bcd81c8c3c8d9a7a5d8acba4b27db277f1868976e" +checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b" dependencies = [ "form_urlencoded", - "idna 0.2.1", + "idna 0.2.2", "matches", "percent-encoding 2.1.0", ] diff --git a/Cargo.toml b/Cargo.toml index a7ef8ed4..fa67c0fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,9 @@ path = "src/lib.rs" path = "audio" version = "0.1.3" -# [dependencies.librespot-connect] -# path = "connect" -# version = "0.1.3" +[dependencies.librespot-connect] +path = "connect" +version = "0.1.3" [dependencies.librespot-core] path = "core" diff --git a/connect/Cargo.toml b/connect/Cargo.toml index b5cd4fdb..1f73f01b 100644 --- a/connect/Cargo.toml +++ b/connect/Cargo.toml @@ -19,8 +19,8 @@ version = "0.1.3" [dependencies] base64 = "0.13" -futures = "0.1" -hyper = "0.12" +futures = "0.3" +hyper = { version = "0.14", features = ["server", "http1"] } log = "0.4" num-bigint = "0.3" protobuf = "~2.14.0" @@ -28,7 +28,7 @@ rand = "0.7" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -tokio = "0.1" +tokio = { version = "1.0", features = ["macros"] } url = "1.7" sha-1 = "0.8" hmac = "0.7" @@ -36,7 +36,7 @@ aes-ctr = "0.3" block-modes = "0.3" dns-sd = { version = "0.1.3", optional = true } -libmdns = { version = "0.2.7", optional = true } +libmdns = { version = "0.6", optional = true } [features] diff --git a/connect/src/discovery.rs b/connect/src/discovery.rs index f9414ee6..95597359 100644 --- a/connect/src/discovery.rs +++ b/connect/src/discovery.rs @@ -2,15 +2,17 @@ use aes_ctr::stream_cipher::generic_array::GenericArray; use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher}; use aes_ctr::Aes128Ctr; use base64; -use futures::sync::mpsc; -use futures::{Future, Poll, Stream}; +use futures::channel::mpsc; +use futures::{Stream, StreamExt}; use hmac::{Hmac, Mac}; - -use hyper::{ - self, server::conn::Http, service::Service, Body, Method, Request, Response, StatusCode, -}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, StatusCode}; use sha1::{Digest, Sha1}; +use std::borrow::Cow; +use std::net::{Ipv4Addr, SocketAddr}; +use std::task::{Context, Poll}; + #[cfg(feature = "with-dns-sd")] use dns_sd::DNSService; @@ -21,8 +23,8 @@ use num_bigint::BigUint; use rand; use std::collections::BTreeMap; use std::io; +use std::pin::Pin; use std::sync::Arc; -use tokio::runtime::current_thread::Handle; use url; use librespot_core::authentication::Credentials; @@ -63,13 +65,8 @@ impl Discovery { (discovery, rx) } -} -impl Discovery { - fn handle_get_info( - &self, - _params: &BTreeMap, - ) -> ::futures::Finished, hyper::Error> { + fn handle_get_info(&self, _: BTreeMap, Cow<'_, str>>) -> Response { let public_key = self.0.public_key.to_bytes_be(); let public_key = base64::encode(&public_key); @@ -93,20 +90,20 @@ impl Discovery { }); let body = result.to_string(); - ::futures::finished(Response::new(Body::from(body))) + Response::new(Body::from(body)) } fn handle_add_user( &self, - params: &BTreeMap, - ) -> ::futures::Finished, hyper::Error> { - let username = params.get("userName").unwrap(); + params: BTreeMap, Cow<'_, str>>, + ) -> Response { + let username = params.get("userName").unwrap().as_ref(); let encrypted_blob = params.get("blob").unwrap(); let client_key = params.get("clientKey").unwrap(); - let encrypted_blob = base64::decode(encrypted_blob).unwrap(); + let encrypted_blob = base64::decode(encrypted_blob.as_bytes()).unwrap(); - let client_key = base64::decode(client_key).unwrap(); + let client_key = base64::decode(client_key.as_bytes()).unwrap(); let client_key = BigUint::from_bytes_be(&client_key); let shared_key = util::powm(&client_key, &self.0.private_key, &DH_PRIME); @@ -141,7 +138,7 @@ impl Discovery { }); let body = result.to_string(); - return ::futures::finished(Response::new(Body::from(body))); + return Response::new(Body::from(body)); } let decrypted = { @@ -155,7 +152,7 @@ impl Discovery { }; let credentials = - Credentials::with_blob(username.to_owned(), &decrypted, &self.0.device_id); + Credentials::with_blob(username.to_string(), &decrypted, &self.0.device_id); self.0.tx.unbounded_send(credentials).unwrap(); @@ -166,52 +163,39 @@ impl Discovery { }); let body = result.to_string(); - return ::futures::finished(Response::new(Body::from(body))); + Response::new(Body::from(body)) } - fn not_found(&self) -> ::futures::Finished, hyper::Error> { + fn not_found(&self) -> Response { let mut res = Response::default(); *res.status_mut() = StatusCode::NOT_FOUND; - ::futures::finished(res) + res } -} -impl Service for Discovery { - type ReqBody = Body; - type ResBody = Body; - type Error = hyper::Error; - - type Future = Box, Error = hyper::Error> + Send>; - fn call(&mut self, request: Request<(Self::ReqBody)>) -> Self::Future { + async fn call(self, request: Request) -> hyper::Result> { let mut params = BTreeMap::new(); let (parts, body) = request.into_parts(); if let Some(query) = parts.uri.query() { - params.extend(url::form_urlencoded::parse(query.as_bytes()).into_owned()); + let query_params = url::form_urlencoded::parse(query.as_bytes()); + params.extend(query_params); } if parts.method != Method::GET { debug!("{:?} {:?} {:?}", parts.method, parts.uri.path(), params); } - let this = self.clone(); - Box::new( - body.fold(Vec::new(), |mut acc, chunk| { - acc.extend_from_slice(chunk.as_ref()); - Ok::<_, hyper::Error>(acc) - }) - .map(move |body| { - params.extend(url::form_urlencoded::parse(&body).into_owned()); - params - }) - .and_then(move |params| { - match (parts.method, params.get("action").map(AsRef::as_ref)) { - (Method::GET, Some("getInfo")) => this.handle_get_info(¶ms), - (Method::POST, Some("addUser")) => this.handle_add_user(¶ms), - _ => this.not_found(), - } - }), + let body = hyper::body::to_bytes(body).await?; + + params.extend(url::form_urlencoded::parse(&body)); + + Ok( + match (parts.method, params.get("action").map(AsRef::as_ref)) { + (Method::GET, Some("getInfo")) => self.handle_get_info(params), + (Method::POST, Some("addUser")) => self.handle_add_user(params), + _ => self.not_found(), + }, ) } } @@ -229,39 +213,25 @@ pub struct DiscoveryStream { } pub fn discovery( - handle: &Handle, config: ConnectConfig, device_id: String, port: u16, ) -> io::Result { let (discovery, creds_rx) = Discovery::new(config.clone(), device_id); - let serve = { - let http = Http::new(); - http.serve_addr(&format!("0.0.0.0:{}", port).parse().unwrap(), move || { - Ok(discovery.clone()) - }) - .unwrap() - }; + let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port); - let s_port = serve.incoming_ref().local_addr().port(); + let make_service = make_service_fn(move |_| { + let discovery = discovery.clone(); + async move { Ok::<_, hyper::Error>(service_fn(move |request| discovery.clone().call(request))) } + }); + + let server = hyper::Server::bind(&address).serve(make_service); + + let s_port = server.local_addr().port(); debug!("Zeroconf server listening on 0.0.0.0:{}", s_port); - let server_future = { - let handle = handle.clone(); - serve - .for_each( - move |connecting: hyper::server::conn::Connecting< - hyper::server::conn::AddrStream, - futures::Failed<_, hyper::Error>, - >| { - handle.spawn(connecting.flatten().then(|_| Ok(()))).unwrap(); - Ok(()) - }, - ) - .then(|_| Ok(())) - }; - handle.spawn(server_future).unwrap(); + tokio::spawn(server); #[cfg(feature = "with-dns-sd")] let svc = DNSService::register( @@ -275,7 +245,7 @@ pub fn discovery( .unwrap(); #[cfg(not(feature = "with-dns-sd"))] - let responder = libmdns::Responder::spawn(&handle)?; + let responder = libmdns::Responder::spawn(&tokio::runtime::Handle::current())?; #[cfg(not(feature = "with-dns-sd"))] let svc = responder.register( @@ -293,9 +263,8 @@ pub fn discovery( impl Stream for DiscoveryStream { type Item = Credentials; - type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { - self.credentials.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.credentials.poll_next_unpin(cx) } } diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 352a3fcf..5dc89599 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1,19 +1,15 @@ -use std; +use std::pin::Pin; use std::time::{SystemTime, UNIX_EPOCH}; -use futures::future; -use futures::sync::mpsc; -use futures::{Async, Future, Poll, Sink, Stream}; -use protobuf::{self, Message}; -use rand; -use rand::seq::SliceRandom; -use serde_json; - use crate::context::StationContext; use crate::playback::mixer::Mixer; use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::protocol; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; +use futures::channel::mpsc; +use futures::future::{self, FusedFuture}; +use futures::stream::FusedStream; +use futures::{Future, FutureExt, Sink, SinkExt, StreamExt}; use librespot_core::config::{ConnectConfig, VolumeCtrl}; use librespot_core::mercury::MercuryError; use librespot_core::session::Session; @@ -21,6 +17,10 @@ use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError}; use librespot_core::util::url_encode; use librespot_core::util::SeqGenerator; use librespot_core::version; +use protobuf::{self, Message}; +use rand; +use rand::seq::SliceRandom; +use serde_json; enum SpircPlayStatus { Stopped, @@ -40,7 +40,11 @@ enum SpircPlayStatus { }, } -pub struct SpircTask { +type BoxedFuture = Pin + Send>>; +type BoxedStream = Pin + Send>>; +type BoxedSink = Pin + Send>>; + +struct SpircTask { player: Player, mixer: Box, config: SpircTaskConfig, @@ -54,15 +58,16 @@ pub struct SpircTask { mixer_started: bool, play_status: SpircPlayStatus, - subscription: Box>, - sender: Box>, + sender_flushed: bool, + subscription: BoxedStream, + sender: BoxedSink, commands: mpsc::UnboundedReceiver, player_events: PlayerEventChannel, shutdown: bool, session: Session, - context_fut: Box>, - autoplay_fut: Box>, + context_fut: BoxedFuture>, + autoplay_fut: BoxedFuture>, context: Option, } @@ -246,7 +251,7 @@ impl Spirc { session: Session, player: Player, mixer: Box, - ) -> (Spirc, SpircTask) { + ) -> (Spirc, impl Future) { debug!("new Spirc[{}]", session.session_id()); let ident = session.device_id().to_owned(); @@ -255,20 +260,23 @@ impl Spirc { debug!("canonical_username: {}", url_encode(&session.username())); let uri = format!("hm://remote/user/{}/", url_encode(&session.username())); - let subscription = session.mercury().subscribe(&uri as &str); - let subscription = subscription - .map(|stream| stream.map_err(|_| MercuryError)) - .flatten_stream(); - let subscription = Box::new(subscription.map(|response| -> Frame { - let data = response.payload.first().unwrap(); - protobuf::parse_from_bytes(data).unwrap() - })); + let subscription = Box::pin( + session + .mercury() + .subscribe(uri.clone()) + .map(Result::unwrap) + .flatten_stream() + .map(|response| -> Frame { + let data = response.payload.first().unwrap(); + protobuf::parse_from_bytes(data).unwrap() + }), + ); - let sender = Box::new( + let sender = Box::pin( session .mercury() .sender(uri) - .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())), + .with(|frame: Frame| future::ready(Ok(frame.write_to_bytes().unwrap()))), ); let (cmd_tx, cmd_rx) = mpsc::unbounded(); @@ -303,11 +311,12 @@ impl Spirc { commands: cmd_rx, player_events: player_events, + sender_flushed: true, shutdown: false, - session: session.clone(), + session: session, - context_fut: Box::new(future::empty()), - autoplay_fut: Box::new(future::empty()), + context_fut: Box::pin(future::pending()), + autoplay_fut: Box::pin(future::pending()), context: None, }; @@ -317,7 +326,7 @@ impl Spirc { task.hello(); - (spirc, task) + (spirc, task.run()) } pub fn play(&self) { @@ -346,114 +355,80 @@ impl Spirc { } } -impl Future for SpircTask { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - loop { - let mut progress = false; - - if self.session.is_invalid() { - return Ok(Async::Ready(())); - } - - if !self.shutdown { - match self.subscription.poll().unwrap() { - Async::Ready(Some(frame)) => { - progress = true; - self.handle_frame(frame); - } - Async::Ready(None) => { +impl SpircTask { + async fn run(mut self) { + while !self.session.is_invalid() && !self.shutdown { + tokio::select! { + frame = self.subscription.next() => match frame { + Some(frame) => self.handle_frame(frame), + None => { error!("subscription terminated"); - self.shutdown = true; - self.commands.close(); + break; } - Async::NotReady => (), - } - - match self.commands.poll().unwrap() { - Async::Ready(Some(command)) => { - progress = true; - self.handle_command(command); + }, + cmd = self.commands.next(), if !self.commands.is_terminated() => if let Some(cmd) = cmd { + self.handle_command(cmd); + }, + event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { + self.handle_player_event(event) + }, + result = self.sender.flush(), if !self.sender_flushed => { + if result.is_err() { + error!("Cannot flush spirc event sender."); + break; } - Async::Ready(None) => (), - Async::NotReady => (), - } - match self.player_events.poll() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(None)) => (), - Err(_) => (), - Ok(Async::Ready(Some(event))) => { - progress = true; - self.handle_player_event(event); + self.sender_flushed = true; + }, + context = &mut self.context_fut, if !self.context_fut.is_terminated() => { + match context { + Ok(value) => { + let r_context = serde_json::from_value::(value); + self.context = match r_context { + Ok(context) => { + info!( + "Resolved {:?} tracks from <{:?}>", + context.tracks.len(), + self.state.get_context_uri(), + ); + Some(context) + } + Err(e) => { + error!("Unable to parse JSONContext {:?}", e); + None + } + }; + // It needn't be so verbose - can be as simple as + // if let Some(ref context) = r_context { + // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri); + // } + // self.context = r_context; + }, + Err(err) => { + error!("ContextError: {:?}", err) + } } - } - // TODO: Refactor - match self.context_fut.poll() { - Ok(Async::Ready(value)) => { - let r_context = serde_json::from_value::(value.clone()); - self.context = match r_context { - Ok(context) => { - info!( - "Resolved {:?} tracks from <{:?}>", - context.tracks.len(), - self.state.get_context_uri(), - ); - Some(context) - } - Err(e) => { - error!("Unable to parse JSONContext {:?}\n{:?}", e, value); - None - } - }; - // It needn't be so verbose - can be as simple as - // if let Some(ref context) = r_context { - // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri); - // } - // self.context = r_context; - - progress = true; - self.context_fut = Box::new(future::empty()); + }, + autoplay = &mut self.autoplay_fut, if !self.autoplay_fut.is_terminated() => { + match autoplay { + Ok(autoplay_station_uri) => { + info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri); + self.context_fut = self.resolve_station(&autoplay_station_uri); + }, + Err(err) => { + error!("AutoplayError: {:?}", err) + } } - Ok(Async::NotReady) => (), - Err(err) => { - self.context_fut = Box::new(future::empty()); - error!("ContextError: {:?}", err) - } - } - - match self.autoplay_fut.poll() { - Ok(Async::Ready(autoplay_station_uri)) => { - info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri); - self.context_fut = self.resolve_station(&autoplay_station_uri); - progress = true; - self.autoplay_fut = Box::new(future::empty()); - } - Ok(Async::NotReady) => (), - Err(err) => { - self.autoplay_fut = Box::new(future::empty()); - error!("AutoplayError: {:?}", err) - } - } - } - - let poll_sender = self.sender.poll_complete().unwrap(); - - // Only shutdown once we've flushed out all our messages - if self.shutdown && poll_sender.is_ready() { - return Ok(Async::Ready(())); - } - - if !progress { - return Ok(Async::NotReady); + }, + else => break } } - } -} -impl SpircTask { + if self.sender.close().await.is_err() { + warn!("Cannot close spirc event sender."); + } + } + fn now_ms(&mut self) -> i64 { let dur = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(dur) => dur, @@ -1060,52 +1035,53 @@ impl SpircTask { } } - fn resolve_station( - &self, - uri: &str, - ) -> Box> { + fn resolve_station(&self, uri: &str) -> BoxedFuture> { let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri); self.resolve_uri(&radio_uri) } - fn resolve_autoplay_uri( - &self, - uri: &str, - ) -> Box> { + fn resolve_autoplay_uri(&self, uri: &str) -> BoxedFuture> { let query_uri = format!("hm://autoplay-enabled/query?uri={}", uri); let request = self.session.mercury().get(query_uri); - Box::new(request.and_then(move |response| { - if response.status_code == 200 { + Box::pin( + async { + let response = request.await?; + + if response.status_code == 200 { + let data = response + .payload + .first() + .expect("Empty autoplay uri") + .to_vec(); + let autoplay_uri = String::from_utf8(data).unwrap(); + Ok(autoplay_uri) + } else { + warn!("No autoplay_uri found"); + Err(MercuryError) + } + } + .fuse(), + ) + } + + fn resolve_uri(&self, uri: &str) -> BoxedFuture> { + let request = self.session.mercury().get(uri); + + Box::pin( + async move { + let response = request.await?; + let data = response .payload .first() - .expect("Empty autoplay uri") - .to_vec(); - let autoplay_uri = String::from_utf8(data).unwrap(); - Ok(autoplay_uri) - } else { - warn!("No autoplay_uri found"); - Err(MercuryError) + .expect("Empty payload on context uri"); + let response: serde_json::Value = serde_json::from_slice(&data).unwrap(); + + Ok(response) } - })) - } - - fn resolve_uri( - &self, - uri: &str, - ) -> Box> { - let request = self.session.mercury().get(uri); - - Box::new(request.and_then(move |response| { - let data = response - .payload - .first() - .expect("Empty payload on context uri"); - let response: serde_json::Value = serde_json::from_slice(&data).unwrap(); - - Ok(response) - })) + .fuse(), + ) } fn update_tracks_from_context(&mut self) { @@ -1344,8 +1320,14 @@ impl<'a> CommandSender<'a> { if !self.frame.has_state() && self.spirc.device.get_is_active() { self.frame.set_state(self.spirc.state.clone()); } + let sender = &mut self.spirc.sender; - let send = self.spirc.sender.start_send(self.frame).unwrap(); - assert!(send.is_ready()); + future::poll_fn(|cx| sender.as_mut().poll_ready(cx)) + .now_or_never() + .unwrap() + .unwrap(); + + sender.as_mut().start_send(self.frame).unwrap(); + self.spirc.sender_flushed = false; } } diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 72360c97..4baa674f 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -102,46 +102,48 @@ impl MercuryManager { MercurySender::new(self.clone(), uri.into()) } - pub async fn subscribe>( + pub fn subscribe>( &self, uri: T, - ) -> Result, MercuryError> { + ) -> impl Future, MercuryError>> + 'static + { let uri = uri.into(); - let response = self - .request(MercuryRequest { - method: MercuryMethod::SUB, - uri: uri.clone(), - content_type: None, - payload: Vec::new(), - }) - .await?; - - let (tx, rx) = mpsc::unbounded(); - - let manager = self.clone(); - - manager.lock(move |inner| { - if !inner.invalid { - debug!("subscribed uri={} count={}", uri, response.payload.len()); - if !response.payload.is_empty() { - // Old subscription protocol, watch the provided list of URIs - for sub in response.payload { - let mut sub: protocol::pubsub::Subscription = - protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); - - debug!("subscribed sub_uri={}", sub_uri); - - inner.subscriptions.push((sub_uri, tx.clone())); - } - } else { - // New subscription protocol, watch the requested URI - inner.subscriptions.push((uri, tx)); - } - } + let request = self.request(MercuryRequest { + method: MercuryMethod::SUB, + uri: uri.clone(), + content_type: None, + payload: Vec::new(), }); - Ok(rx) + let manager = self.clone(); + async move { + let response = request.await?; + + let (tx, rx) = mpsc::unbounded(); + + manager.lock(move |inner| { + if !inner.invalid { + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if !response.payload.is_empty() { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub: protocol::pubsub::Subscription = + protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); + + debug!("subscribed sub_uri={}", sub_uri); + + inner.subscriptions.push((sub_uri, tx.clone())); + } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); + } + } + }); + + Ok(rx) + } } pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) { diff --git a/src/lib.rs b/src/lib.rs index 4304e187..7cdd3178 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ #![crate_name = "librespot"] pub extern crate librespot_audio as audio; -// pub extern crate librespot_connect as connect; +pub extern crate librespot_connect as connect; pub extern crate librespot_core as core; pub extern crate librespot_metadata as metadata; pub extern crate librespot_playback as playback;