diff --git a/Cargo.lock b/Cargo.lock index 00343168..50234e98 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,21 +27,44 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561" dependencies = [ - "aes-soft", - "aesni", + "aes-soft 0.6.4", + "aesni 0.10.0", "cipher", ] +[[package]] +name = "aes-ctr" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e5b0458ea3beae0d1d8c0f3946564f8e10f90646cf78c06b4351052058d1ee" +dependencies = [ + "aes-soft 0.3.3", + "aesni 0.6.0", + "ctr 0.3.2", + "stream-cipher", +] + [[package]] name = "aes-ctr" version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7729c3cde54d67063be556aeac75a81330d802f0259500ca40cb52967f975763" dependencies = [ - "aes-soft", - "aesni", + "aes-soft 0.6.4", + "aesni 0.10.0", "cipher", - "ctr", + "ctr 0.6.0", +] + +[[package]] +name = "aes-soft" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfd7e7ae3f9a1fb5c03b389fc6bb9a51400d0c13053f0dca698c832bfd893a0d" +dependencies = [ + "block-cipher-trait", + "byteorder", + "opaque-debug 0.2.3", ] [[package]] @@ -54,6 +77,17 @@ dependencies = [ "opaque-debug 0.3.0", ] +[[package]] +name = "aesni" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f70a6b5f971e473091ab7cfb5ffac6cde81666c4556751d8d5620ead8abf100" +dependencies = [ + "block-cipher-trait", + "opaque-debug 0.2.3", + "stream-cipher", +] + [[package]] name = "aesni" version = "0.10.0" @@ -223,6 +257,25 @@ dependencies = [ "generic-array 0.12.3", ] +[[package]] +name = "block-cipher-trait" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774" +dependencies = [ + "generic-array 0.12.3", +] + +[[package]] +name = "block-modes" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31aa8410095e39fdb732909fb5730a48d5bd7c2e3cd76bd1b07b3dbea130c529" +dependencies = [ + "block-cipher-trait", + "block-padding", +] + [[package]] name = "block-padding" version = "0.1.5" @@ -234,9 +287,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.6.0" +version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099e596ef14349721d9016f6b80dd3419ea1bf289ab9b44df8e4dfd3a005d5d9" +checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe" [[package]] name = "byte-tools" @@ -258,9 +311,9 @@ checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040" [[package]] name = "cc" -version = "1.0.66" +version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48" +checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" [[package]] name = "cesu8" @@ -384,13 +437,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3818dfca4b0cb5211a659bbcbb94225b7127407b2b135e650d717bfb78ab10d3" dependencies = [ "cookie", - "idna 0.2.1", + "idna 0.2.2", "log", "publicsuffix", "serde", "serde_json", "time 0.2.25", - "url 2.2.0", + "url 2.2.1", ] [[package]] @@ -462,6 +515,16 @@ dependencies = [ "subtle", ] +[[package]] +name = "ctr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "022cd691704491df67d25d006fe8eca083098253c4d43516c2206479c58c6736" +dependencies = [ + "block-cipher-trait", + "stream-cipher", +] + [[package]] name = "ctr" version = "0.6.0" @@ -532,6 +595,16 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" +[[package]] +name = "dns-sd" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d748509dea20228f63ba519bf142ce2593396386125b01f5b0d6412dab972087" +dependencies = [ + "libc", + "pkg-config", +] + [[package]] name = "either" version = "1.6.1" @@ -540,9 +613,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457" [[package]] name = "env_logger" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e" +checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f" dependencies = [ "atty", "humantime", @@ -586,7 +659,7 @@ checksum = "1d34cfa13a63ae058bfa601fe9e313bbdb3746427c1459185464ce0fcf62e1e8" dependencies = [ "cfg-if 1.0.0", "libc", - "redox_syscall 0.2.4", + "redox_syscall", "winapi", ] @@ -598,9 +671,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] name = "form_urlencoded" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ece68d15c92e84fa4f19d3780f1294e5ca82a78a6d515f1efaabcc144688be00" +checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191" dependencies = [ "matches", "percent-encoding 2.1.0", @@ -732,6 +805,15 @@ dependencies = [ "version_check", ] +[[package]] +name = "getopts" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14dbbfd5c71d70241ecf9e6f13737f7b5ce823821063188d7e46c41d371eebd5" +dependencies = [ + "unicode-width", +] + [[package]] name = "getrandom" version = "0.1.16" @@ -824,9 +906,9 @@ dependencies = [ [[package]] name = "gstreamer" -version = "0.16.5" +version = "0.16.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5d50f822055923f1cbede233aa5dfd4ee957cf328fb3076e330886094e11d6cf" +checksum = "9ff5d0f7ff308ae37e6eb47b6ded17785bdea06e438a708cd09e0288c1862f33" dependencies = [ "bitflags", "cfg-if 1.0.0", @@ -964,6 +1046,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" + [[package]] name = "hmac" version = "0.7.1" @@ -974,6 +1062,17 @@ dependencies = [ "digest", ] +[[package]] +name = "hostname" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867" +dependencies = [ + "libc", + "match_cfg", + "winapi", +] + [[package]] name = "http" version = "0.2.3" @@ -1029,7 +1128,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.5", + "pin-project", "socket2", "tokio", "tower-service", @@ -1056,15 +1155,36 @@ dependencies = [ [[package]] name = "idna" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de910d521f7cc3135c4de8db1cb910e0b5ed1dc6f57c381cd07e8e661ce10094" +checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21" dependencies = [ "matches", "unicode-bidi", "unicode-normalization", ] +[[package]] +name = "if-addrs" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28538916eb3f3976311f5dfbe67b5362d0add1293d0a9cad17debf86f8e3aa48" +dependencies = [ + "if-addrs-sys", + "libc", + "winapi", +] + +[[package]] +name = "if-addrs-sys" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de74b9dd780476e837e5eb5ab7c88b49ed304126e412030a0adba99c8efe79ea" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "indexmap" version = "1.6.1" @@ -1222,6 +1342,24 @@ dependencies = [ "winapi", ] +[[package]] +name = "libmdns" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b276920bfc6c9285e16ffd30ed410487f0185f383483f45a3446afc0554fded" +dependencies = [ + "byteorder", + "futures-util", + "hostname", + "if-addrs", + "log", + "multimap", + "quick-error", + "rand 0.8.3", + "socket2", + "tokio", +] + [[package]] name = "libpulse-binding" version = "2.23.0" @@ -1274,18 +1412,33 @@ dependencies = [ name = "librespot" version = "0.1.3" dependencies = [ + "base64 0.13.0", + "env_logger", + "futures", + "getopts", + "hex", + "hyper", "librespot-audio", + "librespot-connect", "librespot-core", "librespot-metadata", "librespot-playback", "librespot-protocol", + "log", + "num-bigint", + "protobuf", + "rand 0.7.3", + "rpassword", + "sha-1", + "tokio", + "url 1.7.2", ] [[package]] name = "librespot-audio" version = "0.1.3" dependencies = [ - "aes-ctr", + "aes-ctr 0.6.0", "bit-set", "byteorder", "bytes", @@ -1301,6 +1454,33 @@ dependencies = [ "vorbis", ] +[[package]] +name = "librespot-connect" +version = "0.1.3" +dependencies = [ + "aes-ctr 0.3.0", + "base64 0.13.0", + "block-modes", + "dns-sd", + "futures", + "hmac", + "hyper", + "libmdns", + "librespot-core", + "librespot-playback", + "librespot-protocol", + "log", + "num-bigint", + "protobuf", + "rand 0.7.3", + "serde", + "serde_derive", + "serde_json", + "sha-1", + "tokio", + "url 1.7.2", +] + [[package]] name = "librespot-core" version = "0.1.3" @@ -1434,6 +1614,12 @@ dependencies = [ "libc", ] +[[package]] +name = "match_cfg" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4" + [[package]] name = "matches" version = "0.1.8" @@ -1458,9 +1644,9 @@ dependencies = [ [[package]] name = "mio" -version = "0.7.7" +version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7" +checksum = "dc250d6848c90d719ea2ce34546fb5df7af1d3fd189d10bf7bad80bfcebecd95" dependencies = [ "libc", "log", @@ -1485,6 +1671,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0419348c027fa7be448d2ae7ea0e4e04c2334c31dc4e74ab29f00a2a7ca69204" +[[package]] +name = "multimap" +version = "0.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1255076139a83bb467426e7f8d0134968a8118844faa755985e077cf31850333" +dependencies = [ + "serde", +] + [[package]] name = "ndk" version = "0.2.1" @@ -1726,14 +1921,14 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.8.2" +version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272" +checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018" dependencies = [ "cfg-if 1.0.0", "instant", "libc", - "redox_syscall 0.1.57", + "redox_syscall", "smallvec", "winapi", ] @@ -1786,33 +1981,13 @@ dependencies = [ "ucd-trie", ] -[[package]] -name = "pin-project" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffbc8e94b38ea3d2d8ba92aea2983b503cd75d0888d75b86bb37970b5698e15" -dependencies = [ - "pin-project-internal 0.4.27", -] - [[package]] name = "pin-project" version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63" dependencies = [ - "pin-project-internal 1.0.5", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65ad2ae56b6abe3a1ee25f15ee605bacadb9a764edaba9c2bf4103800d4a1895" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -1963,10 +2138,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3bbaa49075179162b49acac1c6aa45fb4dafb5f13cf6794276d77bc7fd95757b" dependencies = [ "error-chain", - "idna 0.2.1", + "idna 0.2.2", "lazy_static", "regex", - "url 2.2.0", + "url 2.2.1", ] [[package]] @@ -1979,10 +2154,16 @@ dependencies = [ ] [[package]] -name = "quote" -version = "1.0.8" +name = "quick-error" +version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "991431c3519a3f36861882da93630ce66b52918dcf1b8e2fd66b397fc96f28df" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7" dependencies = [ "proc-macro2", ] @@ -2021,7 +2202,7 @@ checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e" dependencies = [ "libc", "rand_chacha 0.3.0", - "rand_core 0.6.1", + "rand_core 0.6.2", "rand_hc 0.3.0", ] @@ -2042,7 +2223,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d" dependencies = [ "ppv-lite86", - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] @@ -2071,9 +2252,9 @@ dependencies = [ [[package]] name = "rand_core" -version = "0.6.1" +version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5" +checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7" dependencies = [ "getrandom 0.2.2", ] @@ -2093,20 +2274,14 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73" dependencies = [ - "rand_core 0.6.1", + "rand_core 0.6.2", ] [[package]] name = "redox_syscall" -version = "0.1.57" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" - -[[package]] -name = "redox_syscall" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "05ec8ca9416c5ea37062b502703cd7fcb207736bc294f6e0cf367ac6fc234570" +checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9" dependencies = [ "bitflags", ] @@ -2168,6 +2343,16 @@ dependencies = [ "cpal", ] +[[package]] +name = "rpassword" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffc936cf8a7ea60c58f030fd36a612a48f440610214dc54bc36431f9ea0c3efb" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "rustc-demangle" version = "0.1.18" @@ -2386,6 +2571,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2" +[[package]] +name = "signal-hook-registry" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.2" @@ -2479,6 +2673,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" +[[package]] +name = "stream-cipher" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8131256a5896cabcf5eb04f4d6dacbe1aefda854b0d9896e09cb58829ec5638c" +dependencies = [ + "generic-array 0.12.3", +] + [[package]] name = "strsim" version = "0.9.3" @@ -2549,9 +2752,9 @@ dependencies = [ [[package]] name = "tar" -version = "0.4.32" +version = "0.4.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0313546c01d59e29be4f09687bcb4fb6690cec931cc3607b6aec7a0e417f4cc6" +checksum = "c0bcfbd6a598361fda270d82469fff3d65089dc33e175c9a131f7b4cd395f228" dependencies = [ "filetime", "libc", @@ -2567,7 +2770,7 @@ dependencies = [ "cfg-if 1.0.0", "libc", "rand 0.8.3", - "redox_syscall 0.2.4", + "redox_syscall", "remove_dir_all", "winapi", ] @@ -2583,18 +2786,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146" +checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.23" +version = "1.0.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1" +checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0" dependencies = [ "proc-macro2", "quote", @@ -2603,9 +2806,9 @@ dependencies = [ [[package]] name = "thread_local" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8208a331e1cb318dd5bd76951d2b8fc48ca38a69f5f4e4af1b6a9f8c6236915" +checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd" dependencies = [ "once_cell", ] @@ -2685,8 +2888,11 @@ dependencies = [ "memchr", "mio", "num_cpus", + "once_cell", "pin-project-lite", + "signal-hook-registry", "tokio-macros", + "winapi", ] [[package]] @@ -2731,9 +2937,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6" [[package]] name = "tracing" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d40a22fd029e33300d8d89a5cc8ffce18bb7c587662f54629e94c9de5487f3" +checksum = "f77d3842f76ca899ff2dbcf231c5c65813dea431301d6eb686279c15c4464f12" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", @@ -2751,11 +2957,11 @@ dependencies = [ [[package]] name = "tracing-futures" -version = "0.2.4" +version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" +checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2" dependencies = [ - "pin-project 0.4.27", + "pin-project", "tracing", ] @@ -2801,6 +3007,12 @@ version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796" +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + [[package]] name = "unicode-xid" version = "0.2.1" @@ -2836,7 +3048,7 @@ dependencies = [ "once_cell", "qstring", "rustls", - "url 2.2.0", + "url 2.2.1", "webpki", "webpki-roots", ] @@ -2854,12 +3066,12 @@ dependencies = [ [[package]] name = "url" -version = "2.2.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5909f2b0817350449ed73e8bcd81c8c3c8d9a7a5d8acba4b27db277f1868976e" +checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b" dependencies = [ "form_urlencoded", - "idna 0.2.1", + "idna 0.2.2", "matches", "percent-encoding 2.1.0", ] diff --git a/Cargo.toml b/Cargo.toml index a7ef8ed4..e4f9c51e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,18 +15,18 @@ edition = "2018" name = "librespot" path = "src/lib.rs" -# [[bin]] -# name = "librespot" -# path = "src/main.rs" -# doc = false +[[bin]] +name = "librespot" +path = "src/main.rs" +doc = false [dependencies.librespot-audio] path = "audio" version = "0.1.3" -# [dependencies.librespot-connect] -# path = "connect" -# version = "0.1.3" +[dependencies.librespot-connect] +path = "connect" +version = "0.1.3" [dependencies.librespot-core] path = "core" @@ -44,6 +44,22 @@ version = "0.1.3" path = "protocol" version = "0.1.3" +[dependencies] +base64 = "0.13" +env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]} +futures = "0.3" +getopts = "0.2" +hyper = "0.14" +log = "0.4" +num-bigint = "0.3" +protobuf = "~2.14.0" +rand = "0.7" +rpassword = "5.0" +tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "process"] } +url = "1.7" +sha-1 = "0.8" +hex = "0.4" + [features] alsa-backend = ["librespot-playback/alsa-backend"] portaudio-backend = ["librespot-playback/portaudio-backend"] diff --git a/connect/Cargo.toml b/connect/Cargo.toml index b5cd4fdb..1f73f01b 100644 --- a/connect/Cargo.toml +++ b/connect/Cargo.toml @@ -19,8 +19,8 @@ version = "0.1.3" [dependencies] base64 = "0.13" -futures = "0.1" -hyper = "0.12" +futures = "0.3" +hyper = { version = "0.14", features = ["server", "http1"] } log = "0.4" num-bigint = "0.3" protobuf = "~2.14.0" @@ -28,7 +28,7 @@ rand = "0.7" serde = "1.0" serde_derive = "1.0" serde_json = "1.0" -tokio = "0.1" +tokio = { version = "1.0", features = ["macros"] } url = "1.7" sha-1 = "0.8" hmac = "0.7" @@ -36,7 +36,7 @@ aes-ctr = "0.3" block-modes = "0.3" dns-sd = { version = "0.1.3", optional = true } -libmdns = { version = "0.2.7", optional = true } +libmdns = { version = "0.6", optional = true } [features] diff --git a/connect/src/discovery.rs b/connect/src/discovery.rs index f9414ee6..62310b2f 100644 --- a/connect/src/discovery.rs +++ b/connect/src/discovery.rs @@ -2,15 +2,18 @@ use aes_ctr::stream_cipher::generic_array::GenericArray; use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher}; use aes_ctr::Aes128Ctr; use base64; -use futures::sync::mpsc; -use futures::{Future, Poll, Stream}; +use futures::channel::{mpsc, oneshot}; +use futures::{Stream, StreamExt}; use hmac::{Hmac, Mac}; - -use hyper::{ - self, server::conn::Http, service::Service, Body, Method, Request, Response, StatusCode, -}; +use hyper::service::{make_service_fn, service_fn}; +use hyper::{Body, Method, Request, Response, StatusCode}; use sha1::{Digest, Sha1}; +use std::borrow::Cow; +use std::convert::Infallible; +use std::net::{Ipv4Addr, SocketAddr}; +use std::task::{Context, Poll}; + #[cfg(feature = "with-dns-sd")] use dns_sd::DNSService; @@ -21,8 +24,8 @@ use num_bigint::BigUint; use rand; use std::collections::BTreeMap; use std::io; +use std::pin::Pin; use std::sync::Arc; -use tokio::runtime::current_thread::Handle; use url; use librespot_core::authentication::Credentials; @@ -63,13 +66,8 @@ impl Discovery { (discovery, rx) } -} -impl Discovery { - fn handle_get_info( - &self, - _params: &BTreeMap, - ) -> ::futures::Finished, hyper::Error> { + fn handle_get_info(&self, _: BTreeMap, Cow<'_, str>>) -> Response { let public_key = self.0.public_key.to_bytes_be(); let public_key = base64::encode(&public_key); @@ -93,20 +91,20 @@ impl Discovery { }); let body = result.to_string(); - ::futures::finished(Response::new(Body::from(body))) + Response::new(Body::from(body)) } fn handle_add_user( &self, - params: &BTreeMap, - ) -> ::futures::Finished, hyper::Error> { - let username = params.get("userName").unwrap(); + params: BTreeMap, Cow<'_, str>>, + ) -> Response { + let username = params.get("userName").unwrap().as_ref(); let encrypted_blob = params.get("blob").unwrap(); let client_key = params.get("clientKey").unwrap(); - let encrypted_blob = base64::decode(encrypted_blob).unwrap(); + let encrypted_blob = base64::decode(encrypted_blob.as_bytes()).unwrap(); - let client_key = base64::decode(client_key).unwrap(); + let client_key = base64::decode(client_key.as_bytes()).unwrap(); let client_key = BigUint::from_bytes_be(&client_key); let shared_key = util::powm(&client_key, &self.0.private_key, &DH_PRIME); @@ -141,7 +139,7 @@ impl Discovery { }); let body = result.to_string(); - return ::futures::finished(Response::new(Body::from(body))); + return Response::new(Body::from(body)); } let decrypted = { @@ -155,7 +153,7 @@ impl Discovery { }; let credentials = - Credentials::with_blob(username.to_owned(), &decrypted, &self.0.device_id); + Credentials::with_blob(username.to_string(), &decrypted, &self.0.device_id); self.0.tx.unbounded_send(credentials).unwrap(); @@ -166,52 +164,39 @@ impl Discovery { }); let body = result.to_string(); - return ::futures::finished(Response::new(Body::from(body))); + Response::new(Body::from(body)) } - fn not_found(&self) -> ::futures::Finished, hyper::Error> { + fn not_found(&self) -> Response { let mut res = Response::default(); *res.status_mut() = StatusCode::NOT_FOUND; - ::futures::finished(res) + res } -} -impl Service for Discovery { - type ReqBody = Body; - type ResBody = Body; - type Error = hyper::Error; - - type Future = Box, Error = hyper::Error> + Send>; - fn call(&mut self, request: Request<(Self::ReqBody)>) -> Self::Future { + async fn call(self, request: Request) -> hyper::Result> { let mut params = BTreeMap::new(); let (parts, body) = request.into_parts(); if let Some(query) = parts.uri.query() { - params.extend(url::form_urlencoded::parse(query.as_bytes()).into_owned()); + let query_params = url::form_urlencoded::parse(query.as_bytes()); + params.extend(query_params); } if parts.method != Method::GET { debug!("{:?} {:?} {:?}", parts.method, parts.uri.path(), params); } - let this = self.clone(); - Box::new( - body.fold(Vec::new(), |mut acc, chunk| { - acc.extend_from_slice(chunk.as_ref()); - Ok::<_, hyper::Error>(acc) - }) - .map(move |body| { - params.extend(url::form_urlencoded::parse(&body).into_owned()); - params - }) - .and_then(move |params| { - match (parts.method, params.get("action").map(AsRef::as_ref)) { - (Method::GET, Some("getInfo")) => this.handle_get_info(¶ms), - (Method::POST, Some("addUser")) => this.handle_add_user(¶ms), - _ => this.not_found(), - } - }), + let body = hyper::body::to_bytes(body).await?; + + params.extend(url::form_urlencoded::parse(&body)); + + Ok( + match (parts.method, params.get("action").map(AsRef::as_ref)) { + (Method::GET, Some("getInfo")) => self.handle_get_info(params), + (Method::POST, Some("addUser")) => self.handle_add_user(params), + _ => self.not_found(), + }, ) } } @@ -220,48 +205,40 @@ impl Service for Discovery { pub struct DiscoveryStream { credentials: mpsc::UnboundedReceiver, _svc: DNSService, + _close_tx: oneshot::Sender, } #[cfg(not(feature = "with-dns-sd"))] pub struct DiscoveryStream { credentials: mpsc::UnboundedReceiver, _svc: libmdns::Service, + _close_tx: oneshot::Sender, } pub fn discovery( - handle: &Handle, config: ConnectConfig, device_id: String, port: u16, ) -> io::Result { let (discovery, creds_rx) = Discovery::new(config.clone(), device_id); + let (close_tx, close_rx) = oneshot::channel(); - let serve = { - let http = Http::new(); - http.serve_addr(&format!("0.0.0.0:{}", port).parse().unwrap(), move || { - Ok(discovery.clone()) - }) - .unwrap() - }; + let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port); - let s_port = serve.incoming_ref().local_addr().port(); + let make_service = make_service_fn(move |_| { + let discovery = discovery.clone(); + async move { Ok::<_, hyper::Error>(service_fn(move |request| discovery.clone().call(request))) } + }); + + let server = hyper::Server::bind(&address).serve(make_service); + + let s_port = server.local_addr().port(); debug!("Zeroconf server listening on 0.0.0.0:{}", s_port); - let server_future = { - let handle = handle.clone(); - serve - .for_each( - move |connecting: hyper::server::conn::Connecting< - hyper::server::conn::AddrStream, - futures::Failed<_, hyper::Error>, - >| { - handle.spawn(connecting.flatten().then(|_| Ok(()))).unwrap(); - Ok(()) - }, - ) - .then(|_| Ok(())) - }; - handle.spawn(server_future).unwrap(); + tokio::spawn(server.with_graceful_shutdown(async { + close_rx.await.unwrap_err(); + debug!("Shutting down discovery server"); + })); #[cfg(feature = "with-dns-sd")] let svc = DNSService::register( @@ -275,7 +252,7 @@ pub fn discovery( .unwrap(); #[cfg(not(feature = "with-dns-sd"))] - let responder = libmdns::Responder::spawn(&handle)?; + let responder = libmdns::Responder::spawn(&tokio::runtime::Handle::current())?; #[cfg(not(feature = "with-dns-sd"))] let svc = responder.register( @@ -288,14 +265,14 @@ pub fn discovery( Ok(DiscoveryStream { credentials: creds_rx, _svc: svc, + _close_tx: close_tx, }) } impl Stream for DiscoveryStream { type Item = Credentials; - type Error = (); - fn poll(&mut self) -> Poll, Self::Error> { - self.credentials.poll() + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.credentials.poll_next_unpin(cx) } } diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index 352a3fcf..2e3694e4 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1,26 +1,26 @@ -use std; +use std::pin::Pin; use std::time::{SystemTime, UNIX_EPOCH}; -use futures::future; -use futures::sync::mpsc; -use futures::{Async, Future, Poll, Sink, Stream}; -use protobuf::{self, Message}; -use rand; -use rand::seq::SliceRandom; -use serde_json; - use crate::context::StationContext; use crate::playback::mixer::Mixer; use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::protocol; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; +use futures::channel::mpsc; +use futures::future::{self, FusedFuture}; +use futures::stream::FusedStream; +use futures::{Future, FutureExt, StreamExt}; use librespot_core::config::{ConnectConfig, VolumeCtrl}; -use librespot_core::mercury::MercuryError; +use librespot_core::mercury::{MercuryError, MercurySender}; use librespot_core::session::Session; use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError}; use librespot_core::util::url_encode; use librespot_core::util::SeqGenerator; use librespot_core::version; +use protobuf::{self, Message}; +use rand; +use rand::seq::SliceRandom; +use serde_json; enum SpircPlayStatus { Stopped, @@ -40,7 +40,10 @@ enum SpircPlayStatus { }, } -pub struct SpircTask { +type BoxedFuture = Pin + Send>>; +type BoxedStream = Pin + Send>>; + +struct SpircTask { player: Player, mixer: Box, config: SpircTaskConfig, @@ -54,15 +57,15 @@ pub struct SpircTask { mixer_started: bool, play_status: SpircPlayStatus, - subscription: Box>, - sender: Box>, + subscription: BoxedStream, + sender: MercurySender, commands: mpsc::UnboundedReceiver, player_events: PlayerEventChannel, shutdown: bool, session: Session, - context_fut: Box>, - autoplay_fut: Box>, + context_fut: BoxedFuture>, + autoplay_fut: BoxedFuture>, context: Option, } @@ -246,7 +249,7 @@ impl Spirc { session: Session, player: Player, mixer: Box, - ) -> (Spirc, SpircTask) { + ) -> (Spirc, impl Future) { debug!("new Spirc[{}]", session.session_id()); let ident = session.device_id().to_owned(); @@ -255,22 +258,20 @@ impl Spirc { debug!("canonical_username: {}", url_encode(&session.username())); let uri = format!("hm://remote/user/{}/", url_encode(&session.username())); - let subscription = session.mercury().subscribe(&uri as &str); - let subscription = subscription - .map(|stream| stream.map_err(|_| MercuryError)) - .flatten_stream(); - let subscription = Box::new(subscription.map(|response| -> Frame { - let data = response.payload.first().unwrap(); - protobuf::parse_from_bytes(data).unwrap() - })); - - let sender = Box::new( + let subscription = Box::pin( session .mercury() - .sender(uri) - .with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())), + .subscribe(uri.clone()) + .map(Result::unwrap) + .flatten_stream() + .map(|response| -> Frame { + let data = response.payload.first().unwrap(); + protobuf::parse_from_bytes(data).unwrap() + }), ); + let sender = session.mercury().sender(uri); + let (cmd_tx, cmd_rx) = mpsc::unbounded(); let volume = config.volume; @@ -304,10 +305,10 @@ impl Spirc { player_events: player_events, shutdown: false, - session: session.clone(), + session: session, - context_fut: Box::new(future::empty()), - autoplay_fut: Box::new(future::empty()), + context_fut: Box::pin(future::pending()), + autoplay_fut: Box::pin(future::pending()), context: None, }; @@ -317,7 +318,7 @@ impl Spirc { task.hello(); - (spirc, task) + (spirc, task.run()) } pub fn play(&self) { @@ -346,114 +347,76 @@ impl Spirc { } } -impl Future for SpircTask { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - loop { - let mut progress = false; - - if self.session.is_invalid() { - return Ok(Async::Ready(())); - } - - if !self.shutdown { - match self.subscription.poll().unwrap() { - Async::Ready(Some(frame)) => { - progress = true; - self.handle_frame(frame); - } - Async::Ready(None) => { +impl SpircTask { + async fn run(mut self) { + while !self.session.is_invalid() && !self.shutdown { + tokio::select! { + frame = self.subscription.next() => match frame { + Some(frame) => self.handle_frame(frame), + None => { error!("subscription terminated"); - self.shutdown = true; - self.commands.close(); + break; } - Async::NotReady => (), - } - - match self.commands.poll().unwrap() { - Async::Ready(Some(command)) => { - progress = true; - self.handle_command(command); + }, + cmd = self.commands.next(), if !self.commands.is_terminated() => if let Some(cmd) = cmd { + self.handle_command(cmd); + }, + event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { + self.handle_player_event(event) + }, + result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() { + error!("Cannot flush spirc event sender."); + break; + }, + context = &mut self.context_fut, if !self.context_fut.is_terminated() => { + match context { + Ok(value) => { + let r_context = serde_json::from_value::(value); + self.context = match r_context { + Ok(context) => { + info!( + "Resolved {:?} tracks from <{:?}>", + context.tracks.len(), + self.state.get_context_uri(), + ); + Some(context) + } + Err(e) => { + error!("Unable to parse JSONContext {:?}", e); + None + } + }; + // It needn't be so verbose - can be as simple as + // if let Some(ref context) = r_context { + // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri); + // } + // self.context = r_context; + }, + Err(err) => { + error!("ContextError: {:?}", err) + } } - Async::Ready(None) => (), - Async::NotReady => (), - } - - match self.player_events.poll() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(None)) => (), - Err(_) => (), - Ok(Async::Ready(Some(event))) => { - progress = true; - self.handle_player_event(event); + }, + autoplay = &mut self.autoplay_fut, if !self.autoplay_fut.is_terminated() => { + match autoplay { + Ok(autoplay_station_uri) => { + info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri); + self.context_fut = self.resolve_station(&autoplay_station_uri); + }, + Err(err) => { + error!("AutoplayError: {:?}", err) + } } - } - // TODO: Refactor - match self.context_fut.poll() { - Ok(Async::Ready(value)) => { - let r_context = serde_json::from_value::(value.clone()); - self.context = match r_context { - Ok(context) => { - info!( - "Resolved {:?} tracks from <{:?}>", - context.tracks.len(), - self.state.get_context_uri(), - ); - Some(context) - } - Err(e) => { - error!("Unable to parse JSONContext {:?}\n{:?}", e, value); - None - } - }; - // It needn't be so verbose - can be as simple as - // if let Some(ref context) = r_context { - // info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri); - // } - // self.context = r_context; - - progress = true; - self.context_fut = Box::new(future::empty()); - } - Ok(Async::NotReady) => (), - Err(err) => { - self.context_fut = Box::new(future::empty()); - error!("ContextError: {:?}", err) - } - } - - match self.autoplay_fut.poll() { - Ok(Async::Ready(autoplay_station_uri)) => { - info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri); - self.context_fut = self.resolve_station(&autoplay_station_uri); - progress = true; - self.autoplay_fut = Box::new(future::empty()); - } - Ok(Async::NotReady) => (), - Err(err) => { - self.autoplay_fut = Box::new(future::empty()); - error!("AutoplayError: {:?}", err) - } - } - } - - let poll_sender = self.sender.poll_complete().unwrap(); - - // Only shutdown once we've flushed out all our messages - if self.shutdown && poll_sender.is_ready() { - return Ok(Async::Ready(())); - } - - if !progress { - return Ok(Async::NotReady); + }, + else => break } } - } -} -impl SpircTask { + if self.sender.flush().await.is_err() { + warn!("Cannot flush spirc event sender."); + } + } + fn now_ms(&mut self) -> i64 { let dur = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(dur) => dur, @@ -1060,52 +1023,53 @@ impl SpircTask { } } - fn resolve_station( - &self, - uri: &str, - ) -> Box> { + fn resolve_station(&self, uri: &str) -> BoxedFuture> { let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri); self.resolve_uri(&radio_uri) } - fn resolve_autoplay_uri( - &self, - uri: &str, - ) -> Box> { + fn resolve_autoplay_uri(&self, uri: &str) -> BoxedFuture> { let query_uri = format!("hm://autoplay-enabled/query?uri={}", uri); let request = self.session.mercury().get(query_uri); - Box::new(request.and_then(move |response| { - if response.status_code == 200 { + Box::pin( + async { + let response = request.await?; + + if response.status_code == 200 { + let data = response + .payload + .first() + .expect("Empty autoplay uri") + .to_vec(); + let autoplay_uri = String::from_utf8(data).unwrap(); + Ok(autoplay_uri) + } else { + warn!("No autoplay_uri found"); + Err(MercuryError) + } + } + .fuse(), + ) + } + + fn resolve_uri(&self, uri: &str) -> BoxedFuture> { + let request = self.session.mercury().get(uri); + + Box::pin( + async move { + let response = request.await?; + let data = response .payload .first() - .expect("Empty autoplay uri") - .to_vec(); - let autoplay_uri = String::from_utf8(data).unwrap(); - Ok(autoplay_uri) - } else { - warn!("No autoplay_uri found"); - Err(MercuryError) + .expect("Empty payload on context uri"); + let response: serde_json::Value = serde_json::from_slice(&data).unwrap(); + + Ok(response) } - })) - } - - fn resolve_uri( - &self, - uri: &str, - ) -> Box> { - let request = self.session.mercury().get(uri); - - Box::new(request.and_then(move |response| { - let data = response - .payload - .first() - .expect("Empty payload on context uri"); - let response: serde_json::Value = serde_json::from_slice(&data).unwrap(); - - Ok(response) - })) + .fuse(), + ) } fn update_tracks_from_context(&mut self) { @@ -1345,7 +1309,6 @@ impl<'a> CommandSender<'a> { self.frame.set_state(self.spirc.state.clone()); } - let send = self.spirc.sender.start_send(self.frame).unwrap(); - assert!(send.is_ready()); + self.spirc.sender.send(self.frame.write_to_bytes().unwrap()); } } diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 72360c97..4baa674f 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -102,46 +102,48 @@ impl MercuryManager { MercurySender::new(self.clone(), uri.into()) } - pub async fn subscribe>( + pub fn subscribe>( &self, uri: T, - ) -> Result, MercuryError> { + ) -> impl Future, MercuryError>> + 'static + { let uri = uri.into(); - let response = self - .request(MercuryRequest { - method: MercuryMethod::SUB, - uri: uri.clone(), - content_type: None, - payload: Vec::new(), - }) - .await?; - - let (tx, rx) = mpsc::unbounded(); - - let manager = self.clone(); - - manager.lock(move |inner| { - if !inner.invalid { - debug!("subscribed uri={} count={}", uri, response.payload.len()); - if !response.payload.is_empty() { - // Old subscription protocol, watch the provided list of URIs - for sub in response.payload { - let mut sub: protocol::pubsub::Subscription = - protobuf::parse_from_bytes(&sub).unwrap(); - let sub_uri = sub.take_uri(); - - debug!("subscribed sub_uri={}", sub_uri); - - inner.subscriptions.push((sub_uri, tx.clone())); - } - } else { - // New subscription protocol, watch the requested URI - inner.subscriptions.push((uri, tx)); - } - } + let request = self.request(MercuryRequest { + method: MercuryMethod::SUB, + uri: uri.clone(), + content_type: None, + payload: Vec::new(), }); - Ok(rx) + let manager = self.clone(); + async move { + let response = request.await?; + + let (tx, rx) = mpsc::unbounded(); + + manager.lock(move |inner| { + if !inner.invalid { + debug!("subscribed uri={} count={}", uri, response.payload.len()); + if !response.payload.is_empty() { + // Old subscription protocol, watch the provided list of URIs + for sub in response.payload { + let mut sub: protocol::pubsub::Subscription = + protobuf::parse_from_bytes(&sub).unwrap(); + let sub_uri = sub.take_uri(); + + debug!("subscribed sub_uri={}", sub_uri); + + inner.subscriptions.push((sub_uri, tx.clone())); + } + } else { + // New subscription protocol, watch the requested URI + inner.subscriptions.push((uri, tx)); + } + } + }); + + Ok(rx) + } } pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) { diff --git a/core/src/mercury/sender.rs b/core/src/mercury/sender.rs index 860c2f33..e276bcf1 100644 --- a/core/src/mercury/sender.rs +++ b/core/src/mercury/sender.rs @@ -1,5 +1,4 @@ -use futures::Sink; -use std::{collections::VecDeque, pin::Pin, task::Context}; +use std::collections::VecDeque; use super::*; @@ -18,6 +17,22 @@ impl MercurySender { pending: VecDeque::new(), } } + + pub fn is_flushed(&self) -> bool { + self.pending.is_empty() + } + + pub fn send(&mut self, item: Vec) { + let task = self.mercury.send(self.uri.clone(), item); + self.pending.push_back(task); + } + + pub async fn flush(&mut self) -> Result<(), MercuryError> { + for fut in self.pending.drain(..) { + fut.await?; + } + Ok(()) + } } impl Clone for MercurySender { @@ -29,39 +44,3 @@ impl Clone for MercurySender { } } } - -impl Sink> for MercurySender { - type Error = MercuryError; - - fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_flush(cx) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - loop { - match self.pending.front_mut() { - Some(task) => { - match Pin::new(task).poll(cx) { - Poll::Ready(Err(x)) => return Poll::Ready(Err(x)), - Poll::Pending => return Poll::Pending, - _ => (), - }; - } - None => { - return Poll::Ready(Ok(())); - } - } - self.pending.pop_front(); - } - } - - fn start_send(mut self: Pin<&mut Self>, item: Vec) -> Result<(), Self::Error> { - let task = self.mercury.send(self.uri.clone(), item); - self.pending.push_back(task); - Ok(()) - } -} diff --git a/core/src/session.rs b/core/src/session.rs index fd706798..b0eca0c0 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -39,6 +39,8 @@ struct SessionInternal { mercury: OnceCell, cache: Option>, + handle: tokio::runtime::Handle, + session_id: usize, } @@ -65,7 +67,13 @@ impl Session { cache.save_credentials(&reusable_credentials); } - let session = Session::create(conn, config, cache, reusable_credentials.username); + let session = Session::create( + conn, + config, + cache, + reusable_credentials.username, + tokio::runtime::Handle::current(), + ); Ok(session) } @@ -75,6 +83,7 @@ impl Session { config: SessionConfig, cache: Option, username: String, + handle: tokio::runtime::Handle, ) -> Session { let (sink, stream) = transport.split(); @@ -100,6 +109,8 @@ impl Session { channel: OnceCell::new(), mercury: OnceCell::new(), + handle, + session_id: session_id, })); @@ -139,7 +150,7 @@ impl Session { T: Future + Send + 'static, T::Output: Send + 'static, { - tokio::spawn(task); + self.0.handle.spawn(task); } fn debug_info(&self) { diff --git a/playback/src/player.rs b/playback/src/player.rs index 6f6a85ae..3ee5c989 100644 --- a/playback/src/player.rs +++ b/playback/src/player.rs @@ -7,7 +7,6 @@ use crate::audio::{ use crate::audio_backend::Sink; use crate::config::NormalisationType; use crate::config::{Bitrate, PlayerConfig}; -use crate::librespot_core::tokio; use crate::metadata::{AudioItem, FileFormat}; use crate::mixer::AudioFilter; use librespot_core::session::Session; @@ -15,25 +14,22 @@ use librespot_core::spotify_id::SpotifyId; use librespot_core::util::SeqGenerator; use byteorder::{LittleEndian, ReadBytesExt}; -use futures::{ - channel::{mpsc, oneshot}, - future, Future, Stream, StreamExt, -}; -use std::io::{Read, Seek, SeekFrom}; -use std::mem; +use futures::channel::{mpsc, oneshot}; +use futures::{future, Future, Stream, StreamExt, TryFutureExt}; +use std::borrow::Cow; + +use std::cmp::max; +use std::io::{self, Read, Seek, SeekFrom}; +use std::pin::Pin; +use std::task::{Context, Poll}; use std::time::{Duration, Instant}; -use std::{borrow::Cow, io}; -use std::{ - cmp::max, - pin::Pin, - task::{Context, Poll}, -}; +use std::{mem, thread}; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; pub struct Player { commands: Option>, - task_handle: Option>, + thread_handle: Option>, play_request_id_generator: SeqGenerator, } @@ -251,33 +247,33 @@ impl Player { let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded(); - debug!("new Player[{}]", session.session_id()); + let handle = thread::spawn(move || { + debug!("new Player[{}]", session.session_id()); - let internal = PlayerInternal { - session: session, - config: config, - commands: cmd_rx, + let internal = PlayerInternal { + session: session, + config: config, + commands: cmd_rx, - state: PlayerState::Stopped, - preload: PlayerPreload::None, - sink: sink_builder(), - sink_status: SinkStatus::Closed, - sink_event_callback: None, - audio_filter: audio_filter, - event_senders: [event_sender].to_vec(), - }; + state: PlayerState::Stopped, + preload: PlayerPreload::None, + sink: sink_builder(), + sink_status: SinkStatus::Closed, + sink_event_callback: None, + audio_filter: audio_filter, + event_senders: [event_sender].to_vec(), + }; - // While PlayerInternal is written as a future, it still contains blocking code. - // It must be run by using wait() in a dedicated thread. - let handle = tokio::spawn(async move { - internal.await; + // While PlayerInternal is written as a future, it still contains blocking code. + // It must be run by using wait() in a dedicated thread. + futures::executor::block_on(internal); debug!("PlayerInternal thread finished."); }); ( Player { commands: Some(cmd_tx), - task_handle: Some(handle), + thread_handle: Some(handle), play_request_id_generator: SeqGenerator::new(0), }, event_receiver, @@ -351,13 +347,11 @@ impl Drop for Player { fn drop(&mut self) { debug!("Shutting down player thread ..."); self.commands = None; - if let Some(handle) = self.task_handle.take() { - tokio::spawn(async { - match handle.await { - Ok(_) => (), - Err(_) => error!("Player thread panicked!"), - } - }); + if let Some(handle) = self.thread_handle.take() { + match handle.join() { + Ok(_) => (), + Err(_) => error!("Player thread panicked!"), + } } } } @@ -436,15 +430,23 @@ impl PlayerState { #[allow(dead_code)] fn is_stopped(&self) -> bool { - matches!(self, Self::Stopped) + use self::PlayerState::*; + match *self { + Stopped => true, + _ => false, + } } fn is_loading(&self) -> bool { - matches!(self, Self::Loading { .. }) + use self::PlayerState::*; + match *self { + Loading { .. } => true, + _ => false, + } } fn decoder(&mut self) -> Option<&mut Decoder> { - use PlayerState::*; + use self::PlayerState::*; match *self { Stopped | EndOfTrack { .. } | Loading { .. } => None, Paused { @@ -1243,9 +1245,10 @@ impl PlayerInternal { loaded_track .stream_loader_controller .set_random_access_mode(); - let _ = tokio::task::block_in_place(|| { - loaded_track.decoder.seek(position_ms as i64) - }); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking. + // But most likely the track is fully + // loaded already because we played + // to the end of it. loaded_track.stream_loader_controller.set_stream_mode(); loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms); } @@ -1278,7 +1281,7 @@ impl PlayerInternal { // we can use the current decoder. Ensure it's at the correct position. if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm { stream_loader_controller.set_random_access_mode(); - let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64)); + let _ = decoder.seek(position_ms as i64); // This may be blocking. stream_loader_controller.set_stream_mode(); *stream_position_pcm = Self::position_ms_to_pcm(position_ms); } @@ -1346,9 +1349,7 @@ impl PlayerInternal { loaded_track .stream_loader_controller .set_random_access_mode(); - let _ = tokio::task::block_in_place(|| { - loaded_track.decoder.seek(position_ms as i64) - }); + let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking loaded_track.stream_loader_controller.set_stream_mode(); } self.start_playback(track_id, play_request_id, *loaded_track, play); @@ -1563,7 +1564,7 @@ impl PlayerInternal { } } - pub fn load_track( + fn load_track( &self, spotify_id: SpotifyId, position_ms: u32, @@ -1574,22 +1575,23 @@ impl PlayerInternal { // easily. Instead we spawn a thread to do the work and return a one-shot channel as the // future to work with. - let session = self.session.clone(); - let config = self.config.clone(); + let loader = PlayerTrackLoader { + session: self.session.clone(), + config: self.config.clone(), + }; - async move { - let loader = PlayerTrackLoader { session, config }; + let (result_tx, result_rx) = oneshot::channel(); - let (result_tx, result_rx) = oneshot::channel(); - - tokio::spawn(async move { - if let Some(data) = loader.load_track(spotify_id, position_ms).await { + std::thread::spawn(move || { + futures::executor::block_on(loader.load_track(spotify_id, position_ms)).and_then( + move |data| { let _ = result_tx.send(data); - } - }); + Some(()) + }, + ); + }); - result_rx.await.map_err(|_| ()) - } + result_rx.map_err(|_| ()) } fn preload_data_before_playback(&mut self) { @@ -1615,9 +1617,7 @@ impl PlayerInternal { * bytes_per_second as f64) as usize, (READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, ); - tokio::task::block_in_place(|| { - stream_loader_controller.fetch_next_blocking(wait_for_data_length) - }); + stream_loader_controller.fetch_next_blocking(wait_for_data_length); } } } diff --git a/src/lib.rs b/src/lib.rs index 4304e187..7cdd3178 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ #![crate_name = "librespot"] pub extern crate librespot_audio as audio; -// pub extern crate librespot_connect as connect; +pub extern crate librespot_connect as connect; pub extern crate librespot_core as core; pub extern crate librespot_metadata as metadata; pub extern crate librespot_playback as playback; diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 00000000..1392c201 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,600 @@ +use futures::{channel::mpsc::UnboundedReceiver, future::FusedFuture, FutureExt, StreamExt}; +use librespot_playback::player::PlayerEvent; +use log::{error, info, warn}; +use sha1::{Digest, Sha1}; +use std::path::Path; +use std::process::exit; +use std::str::FromStr; +use std::{env, time::Instant}; +use std::{ + io::{stderr, Write}, + pin::Pin, +}; +use url::Url; + +use librespot::core::authentication::{get_credentials, Credentials}; +use librespot::core::cache::Cache; +use librespot::core::config::{ConnectConfig, DeviceType, SessionConfig, VolumeCtrl}; +use librespot::core::session::Session; +use librespot::core::version; + +use librespot::connect::spirc::Spirc; +use librespot::playback::audio_backend::{self, Sink, BACKENDS}; +use librespot::playback::config::{Bitrate, NormalisationType, PlayerConfig}; +use librespot::playback::mixer::{self, Mixer, MixerConfig}; +use librespot::playback::player::Player; + +mod player_event_handler; + +use player_event_handler::{emit_sink_event, run_program_on_events}; + +fn device_id(name: &str) -> String { + hex::encode(Sha1::digest(name.as_bytes())) +} + +fn usage(program: &str, opts: &getopts::Options) -> String { + let brief = format!("Usage: {} [options]", program); + opts.usage(&brief) +} + +fn setup_logging(verbose: bool) { + let mut builder = env_logger::Builder::new(); + match env::var("RUST_LOG") { + Ok(config) => { + builder.parse_filters(&config); + builder.init(); + + if verbose { + warn!("`--verbose` flag overidden by `RUST_LOG` environment variable"); + } + } + Err(_) => { + if verbose { + builder.parse_filters("libmdns=info,librespot=trace"); + } else { + builder.parse_filters("libmdns=info,librespot=info"); + } + builder.init(); + } + } +} + +fn list_backends() { + println!("Available Backends : "); + for (&(name, _), idx) in BACKENDS.iter().zip(0..) { + if idx == 0 { + println!("- {} (default)", name); + } else { + println!("- {}", name); + } + } +} + +#[derive(Clone)] +struct Setup { + backend: fn(Option) -> Box, + device: Option, + + mixer: fn(Option) -> Box, + + cache: Option, + player_config: PlayerConfig, + session_config: SessionConfig, + connect_config: ConnectConfig, + mixer_config: MixerConfig, + credentials: Option, + enable_discovery: bool, + zeroconf_port: u16, + player_event_program: Option, + emit_sink_events: bool, +} + +fn setup(args: &[String]) -> Setup { + let mut opts = getopts::Options::new(); + opts.optopt( + "c", + "cache", + "Path to a directory where files will be cached.", + "CACHE", + ).optopt( + "", + "system-cache", + "Path to a directory where system files (credentials, volume) will be cached. Can be different from cache option value", + "SYTEMCACHE", + ).optflag("", "disable-audio-cache", "Disable caching of the audio data.") + .reqopt("n", "name", "Device name", "NAME") + .optopt("", "device-type", "Displayed device type", "DEVICE_TYPE") + .optopt( + "b", + "bitrate", + "Bitrate (96, 160 or 320). Defaults to 160", + "BITRATE", + ) + .optopt( + "", + "onevent", + "Run PROGRAM when playback is about to begin.", + "PROGRAM", + ) + .optflag("", "emit-sink-events", "Run program set by --onevent before sink is opened and after it is closed.") + .optflag("v", "verbose", "Enable verbose output") + .optopt("u", "username", "Username to sign in with", "USERNAME") + .optopt("p", "password", "Password", "PASSWORD") + .optopt("", "proxy", "HTTP proxy to use when connecting", "PROXY") + .optopt("", "ap-port", "Connect to AP with specified port. If no AP with that port are present fallback AP will be used. Available ports are usually 80, 443 and 4070", "AP_PORT") + .optflag("", "disable-discovery", "Disable discovery mode") + .optopt( + "", + "backend", + "Audio backend to use. Use '?' to list options", + "BACKEND", + ) + .optopt( + "", + "device", + "Audio device to use. Use '?' to list options if using portaudio or alsa", + "DEVICE", + ) + .optopt("", "mixer", "Mixer to use (alsa or softvol)", "MIXER") + .optopt( + "m", + "mixer-name", + "Alsa mixer name, e.g \"PCM\" or \"Master\". Defaults to 'PCM'", + "MIXER_NAME", + ) + .optopt( + "", + "mixer-card", + "Alsa mixer card, e.g \"hw:0\" or similar from `aplay -l`. Defaults to 'default' ", + "MIXER_CARD", + ) + .optopt( + "", + "mixer-index", + "Alsa mixer index, Index of the cards mixer. Defaults to 0", + "MIXER_INDEX", + ) + .optflag( + "", + "mixer-linear-volume", + "Disable alsa's mapped volume scale (cubic). Default false", + ) + .optopt( + "", + "initial-volume", + "Initial volume in %, once connected (must be from 0 to 100)", + "VOLUME", + ) + .optopt( + "", + "zeroconf-port", + "The port the internal server advertised over zeroconf uses.", + "ZEROCONF_PORT", + ) + .optflag( + "", + "enable-volume-normalisation", + "Play all tracks at the same volume", + ) + .optopt( + "", + "normalisation-gain-type", + "Specify the normalisation gain type to use - [track, album]. Default is album.", + "GAIN_TYPE", + ) + .optopt( + "", + "normalisation-pregain", + "Pregain (dB) applied by volume normalisation", + "PREGAIN", + ) + .optopt( + "", + "volume-ctrl", + "Volume control type - [linear, log, fixed]. Default is logarithmic", + "VOLUME_CTRL" + ) + .optflag( + "", + "autoplay", + "autoplay similar songs when your music ends.", + ) + .optflag( + "", + "disable-gapless", + "disable gapless playback.", + ); + + let matches = match opts.parse(&args[1..]) { + Ok(m) => m, + Err(f) => { + writeln!( + stderr(), + "error: {}\n{}", + f.to_string(), + usage(&args[0], &opts) + ) + .unwrap(); + exit(1); + } + }; + + let verbose = matches.opt_present("verbose"); + setup_logging(verbose); + + info!( + "librespot {} ({}). Built on {}. Build ID: {}", + version::short_sha(), + version::commit_date(), + version::short_now(), + version::build_id() + ); + + let backend_name = matches.opt_str("backend"); + if backend_name == Some("?".into()) { + list_backends(); + exit(0); + } + + let backend = audio_backend::find(backend_name).expect("Invalid backend"); + + let device = matches.opt_str("device"); + if device == Some("?".into()) { + backend(device); + exit(0); + } + + let mixer_name = matches.opt_str("mixer"); + let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer"); + + let mixer_config = MixerConfig { + card: matches + .opt_str("mixer-card") + .unwrap_or_else(|| String::from("default")), + mixer: matches + .opt_str("mixer-name") + .unwrap_or_else(|| String::from("PCM")), + index: matches + .opt_str("mixer-index") + .map(|index| index.parse::().unwrap()) + .unwrap_or(0), + mapped_volume: !matches.opt_present("mixer-linear-volume"), + }; + + let cache = { + let audio_dir; + let system_dir; + if matches.opt_present("disable-audio-cache") { + audio_dir = None; + system_dir = matches + .opt_str("system-cache") + .or_else(|| matches.opt_str("c")) + .map(|p| p.into()); + } else { + let cache_dir = matches.opt_str("c"); + audio_dir = cache_dir + .as_ref() + .map(|p| AsRef::::as_ref(p).join("files")); + system_dir = matches + .opt_str("system-cache") + .or(cache_dir) + .map(|p| p.into()); + } + + match Cache::new(system_dir, audio_dir) { + Ok(cache) => Some(cache), + Err(e) => { + warn!("Cannot create cache: {}", e); + None + } + } + }; + + let initial_volume = matches + .opt_str("initial-volume") + .map(|volume| { + let volume = volume.parse::().unwrap(); + if volume > 100 { + panic!("Initial volume must be in the range 0-100"); + } + (volume as i32 * 0xFFFF / 100) as u16 + }) + .or_else(|| cache.as_ref().and_then(Cache::volume)) + .unwrap_or(0x8000); + + let zeroconf_port = matches + .opt_str("zeroconf-port") + .map(|port| port.parse::().unwrap()) + .unwrap_or(0); + + let name = matches.opt_str("name").unwrap(); + + let credentials = { + let cached_credentials = cache.as_ref().and_then(Cache::credentials); + + let password = |username: &String| -> String { + write!(stderr(), "Password for {}: ", username).unwrap(); + stderr().flush().unwrap(); + rpassword::read_password().unwrap() + }; + + get_credentials( + matches.opt_str("username"), + matches.opt_str("password"), + cached_credentials, + password, + ) + }; + + let session_config = { + let device_id = device_id(&name); + + SessionConfig { + user_agent: version::version_string(), + device_id: device_id, + proxy: matches.opt_str("proxy").or(std::env::var("http_proxy").ok()).map( + |s| { + match Url::parse(&s) { + Ok(url) => { + if url.host().is_none() || url.port_or_known_default().is_none() { + panic!("Invalid proxy url, only urls on the format \"http://host:port\" are allowed"); + } + + if url.scheme() != "http" { + panic!("Only unsecure http:// proxies are supported"); + } + url + }, + Err(err) => panic!("Invalid proxy url: {}, only urls on the format \"http://host:port\" are allowed", err) + } + }, + ), + ap_port: matches + .opt_str("ap-port") + .map(|port| port.parse::().expect("Invalid port")), + } + }; + + let player_config = { + let bitrate = matches + .opt_str("b") + .as_ref() + .map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate")) + .unwrap_or(Bitrate::default()); + let gain_type = matches + .opt_str("normalisation-gain-type") + .as_ref() + .map(|gain_type| { + NormalisationType::from_str(gain_type).expect("Invalid normalisation type") + }) + .unwrap_or(NormalisationType::default()); + PlayerConfig { + bitrate: bitrate, + gapless: !matches.opt_present("disable-gapless"), + normalisation: matches.opt_present("enable-volume-normalisation"), + normalisation_type: gain_type, + normalisation_pregain: matches + .opt_str("normalisation-pregain") + .map(|pregain| pregain.parse::().expect("Invalid pregain float value")) + .unwrap_or(PlayerConfig::default().normalisation_pregain), + } + }; + + let connect_config = { + let device_type = matches + .opt_str("device-type") + .as_ref() + .map(|device_type| DeviceType::from_str(device_type).expect("Invalid device type")) + .unwrap_or(DeviceType::default()); + + let volume_ctrl = matches + .opt_str("volume-ctrl") + .as_ref() + .map(|volume_ctrl| VolumeCtrl::from_str(volume_ctrl).expect("Invalid volume ctrl type")) + .unwrap_or(VolumeCtrl::default()); + + ConnectConfig { + name: name, + device_type: device_type, + volume: initial_volume, + volume_ctrl: volume_ctrl, + autoplay: matches.opt_present("autoplay"), + } + }; + + let enable_discovery = !matches.opt_present("disable-discovery"); + + Setup { + backend: backend, + cache: cache, + session_config: session_config, + player_config: player_config, + connect_config: connect_config, + credentials: credentials, + device: device, + enable_discovery: enable_discovery, + zeroconf_port: zeroconf_port, + mixer: mixer, + mixer_config: mixer_config, + player_event_program: matches.opt_str("onevent"), + emit_sink_events: matches.opt_present("emit-sink-events"), + } +} + +#[tokio::main] +async fn main() { + if env::var("RUST_BACKTRACE").is_err() { + env::set_var("RUST_BACKTRACE", "full") + } + + let args: Vec = std::env::args().collect(); + let setupp = setup(&args); + + let mut last_credentials = None; + let mut spirc: Option = None; + let mut spirc_task: Option> = None; + let mut player_event_channel: Option> = None; + let mut auto_connect_times: Vec = vec![]; + let mut discovery = None; + let mut connecting: Pin>> = + Box::pin(futures::future::pending()); + + if setupp.enable_discovery { + let config = setupp.connect_config.clone(); + let device_id = setupp.session_config.device_id.clone(); + + discovery = Some( + librespot_connect::discovery::discovery(config, device_id, setupp.zeroconf_port) + .unwrap(), + ); + } + + if let Some(credentials) = setupp.credentials { + last_credentials = Some(credentials.clone()); + connecting = Box::pin( + Session::connect( + setupp.session_config.clone(), + credentials, + setupp.cache.clone(), + ) + .fuse(), + ); + } + + loop { + tokio::select! { + credentials = async { discovery.as_mut().unwrap().next().await }, if discovery.is_some() => { + match credentials { + Some(credentials) => { + last_credentials = Some(credentials.clone()); + auto_connect_times.clear(); + + if let Some(spirc) = spirc.take() { + spirc.shutdown(); + } + if let Some(spirc_task) = spirc_task.take() { + // Continue shutdown in its own task + tokio::spawn(spirc_task); + } + + connecting = Box::pin(Session::connect( + setupp.session_config.clone(), + credentials, + setupp.cache.clone(), + ).fuse()); + }, + None => { + warn!("Discovery stopped!"); + discovery = None; + } + } + }, + session = &mut connecting, if !connecting.is_terminated() => match session { + Ok(session) => { + let mixer_config = setupp.mixer_config.clone(); + let mixer = (setupp.mixer)(Some(mixer_config)); + let player_config = setupp.player_config.clone(); + let connect_config = setupp.connect_config.clone(); + + let audio_filter = mixer.get_audio_filter(); + let backend = setupp.backend; + let device = setupp.device.clone(); + let (player, event_channel) = + Player::new(player_config, session.clone(), audio_filter, move || { + (backend)(device) + }); + + if setupp.emit_sink_events { + if let Some(player_event_program) = setupp.player_event_program.clone() { + player.set_sink_event_callback(Some(Box::new(move |sink_status| { + match emit_sink_event(sink_status, &player_event_program) { + Ok(e) if e.success() => (), + Ok(e) => { + if let Some(code) = e.code() { + warn!("Sink event prog returned exit code {}", code); + } else { + warn!("Sink event prog returned failure"); + } + } + Err(e) => { + warn!("Emitting sink event failed: {}", e); + } + } + }))); + } + }; + + let (spirc_, spirc_task_) = Spirc::new(connect_config, session, player, mixer); + + spirc = Some(spirc_); + spirc_task = Some(Box::pin(spirc_task_)); + player_event_channel = Some(event_channel); + }, + Err(e) => { + warn!("Connection failed: {}", e); + } + }, + _ = async { spirc_task.as_mut().unwrap().await }, if spirc_task.is_some() => { + spirc_task = None; + + warn!("Spirc shut down unexpectedly"); + while !auto_connect_times.is_empty() + && ((Instant::now() - auto_connect_times[0]).as_secs() > 600) + { + let _ = auto_connect_times.remove(0); + } + + if let Some(credentials) = last_credentials.clone() { + if auto_connect_times.len() >= 5 { + warn!("Spirc shut down too often. Not reconnecting automatically."); + } else { + auto_connect_times.push(Instant::now()); + + connecting = Box::pin(Session::connect( + setupp.session_config.clone(), + credentials, + setupp.cache.clone(), + ).fuse()); + } + } + }, + event = async { player_event_channel.as_mut().unwrap().next().await }, if player_event_channel.is_some() => match event { + Some(event) => { + if let Some(program) = &setupp.player_event_program { + if let Some(child) = run_program_on_events(event, program) { + let mut child = child.expect("program failed to start"); + + tokio::spawn(async move { + match child.wait().await { + Ok(status) if !status.success() => error!("child exited with status {:?}", status.code()), + Err(e) => error!("failed to wait on child process: {}", e), + _ => {} + } + }); + } + } + }, + None => { + player_event_channel = None; + } + }, + _ = tokio::signal::ctrl_c() => { + break; + } + } + } + + info!("Gracefully shutting down"); + + // Shutdown spirc if necessary + if let Some(spirc) = spirc { + spirc.shutdown(); + + if let Some(mut spirc_task) = spirc_task { + tokio::select! { + _ = tokio::signal::ctrl_c() => (), + _ = spirc_task.as_mut() => () + } + } + } +} diff --git a/src/player_event_handler.rs b/src/player_event_handler.rs index 102cf780..361e6b1a 100644 --- a/src/player_event_handler.rs +++ b/src/player_event_handler.rs @@ -2,22 +2,12 @@ use librespot::playback::player::PlayerEvent; use log::info; use std::collections::HashMap; use std::io; -use std::process::Command; -use tokio_process::{Child, CommandExt}; +use std::process::{Command, ExitStatus}; -use futures::Future; use librespot::playback::player::SinkStatus; +use tokio::process::{Child as AsyncChild, Command as AsyncCommand}; -fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result { - let mut v: Vec<&str> = program.split_whitespace().collect(); - info!("Running {:?} with environment variables {:?}", v, env_vars); - Command::new(&v.remove(0)) - .args(&v) - .envs(env_vars.iter()) - .spawn_async() -} - -pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option> { +pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option> { let mut env_vars = HashMap::new(); match event { PlayerEvent::Changed { @@ -68,10 +58,18 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option return None, } - Some(run_program(onevent, env_vars)) + + let mut v: Vec<&str> = onevent.split_whitespace().collect(); + info!("Running {:?} with environment variables {:?}", v, env_vars); + Some( + AsyncCommand::new(&v.remove(0)) + .args(&v) + .envs(env_vars.iter()) + .spawn(), + ) } -pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) { +pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) -> io::Result { let mut env_vars = HashMap::new(); env_vars.insert("PLAYER_EVENT", "sink".to_string()); let sink_status = match sink_status { @@ -80,6 +78,12 @@ pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) { SinkStatus::Closed => "closed", }; env_vars.insert("SINK_STATUS", sink_status.to_string()); + let mut v: Vec<&str> = onevent.split_whitespace().collect(); + info!("Running {:?} with environment variables {:?}", v, env_vars); - let _ = run_program(onevent, env_vars).and_then(|child| child.wait()); + Command::new(&v.remove(0)) + .args(&v) + .envs(env_vars.iter()) + .spawn()? + .wait() }