Merge pull request #606 from Johannesd3/tokio_connect_migration

Finish tokio migration
This commit is contained in:
Ash 2021-02-23 09:25:34 +01:00 committed by GitHub
commit c9b3b955bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 1264 additions and 500 deletions

380
Cargo.lock generated
View file

@ -27,21 +27,44 @@ version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561"
dependencies = [
"aes-soft",
"aesni",
"aes-soft 0.6.4",
"aesni 0.10.0",
"cipher",
]
[[package]]
name = "aes-ctr"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2e5b0458ea3beae0d1d8c0f3946564f8e10f90646cf78c06b4351052058d1ee"
dependencies = [
"aes-soft 0.3.3",
"aesni 0.6.0",
"ctr 0.3.2",
"stream-cipher",
]
[[package]]
name = "aes-ctr"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7729c3cde54d67063be556aeac75a81330d802f0259500ca40cb52967f975763"
dependencies = [
"aes-soft",
"aesni",
"aes-soft 0.6.4",
"aesni 0.10.0",
"cipher",
"ctr",
"ctr 0.6.0",
]
[[package]]
name = "aes-soft"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cfd7e7ae3f9a1fb5c03b389fc6bb9a51400d0c13053f0dca698c832bfd893a0d"
dependencies = [
"block-cipher-trait",
"byteorder",
"opaque-debug 0.2.3",
]
[[package]]
@ -54,6 +77,17 @@ dependencies = [
"opaque-debug 0.3.0",
]
[[package]]
name = "aesni"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f70a6b5f971e473091ab7cfb5ffac6cde81666c4556751d8d5620ead8abf100"
dependencies = [
"block-cipher-trait",
"opaque-debug 0.2.3",
"stream-cipher",
]
[[package]]
name = "aesni"
version = "0.10.0"
@ -223,6 +257,25 @@ dependencies = [
"generic-array 0.12.3",
]
[[package]]
name = "block-cipher-trait"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1c924d49bd09e7c06003acda26cd9742e796e34282ec6c1189404dee0c1f4774"
dependencies = [
"generic-array 0.12.3",
]
[[package]]
name = "block-modes"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31aa8410095e39fdb732909fb5730a48d5bd7c2e3cd76bd1b07b3dbea130c529"
dependencies = [
"block-cipher-trait",
"block-padding",
]
[[package]]
name = "block-padding"
version = "0.1.5"
@ -234,9 +287,9 @@ dependencies = [
[[package]]
name = "bumpalo"
version = "3.6.0"
version = "3.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099e596ef14349721d9016f6b80dd3419ea1bf289ab9b44df8e4dfd3a005d5d9"
checksum = "63396b8a4b9de3f4fdfb320ab6080762242f66a8ef174c49d8e19b674db4cdbe"
[[package]]
name = "byte-tools"
@ -258,9 +311,9 @@ checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
[[package]]
name = "cc"
version = "1.0.66"
version = "1.0.67"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c0496836a84f8d0495758516b8621a622beb77c0fed418570e50764093ced48"
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
[[package]]
name = "cesu8"
@ -384,13 +437,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3818dfca4b0cb5211a659bbcbb94225b7127407b2b135e650d717bfb78ab10d3"
dependencies = [
"cookie",
"idna 0.2.1",
"idna 0.2.2",
"log",
"publicsuffix",
"serde",
"serde_json",
"time 0.2.25",
"url 2.2.0",
"url 2.2.1",
]
[[package]]
@ -462,6 +515,16 @@ dependencies = [
"subtle",
]
[[package]]
name = "ctr"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "022cd691704491df67d25d006fe8eca083098253c4d43516c2206479c58c6736"
dependencies = [
"block-cipher-trait",
"stream-cipher",
]
[[package]]
name = "ctr"
version = "0.6.0"
@ -532,6 +595,16 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0"
[[package]]
name = "dns-sd"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d748509dea20228f63ba519bf142ce2593396386125b01f5b0d6412dab972087"
dependencies = [
"libc",
"pkg-config",
]
[[package]]
name = "either"
version = "1.6.1"
@ -540,9 +613,9 @@ checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "env_logger"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26ecb66b4bdca6c1409b40fb255eefc2bd4f6d135dab3c3124f80ffa2a9661e"
checksum = "17392a012ea30ef05a610aa97dfb49496e71c9f676b27879922ea5bdf60d9d3f"
dependencies = [
"atty",
"humantime",
@ -586,7 +659,7 @@ checksum = "1d34cfa13a63ae058bfa601fe9e313bbdb3746427c1459185464ce0fcf62e1e8"
dependencies = [
"cfg-if 1.0.0",
"libc",
"redox_syscall 0.2.4",
"redox_syscall",
"winapi",
]
@ -598,9 +671,9 @@ checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "form_urlencoded"
version = "1.0.0"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ece68d15c92e84fa4f19d3780f1294e5ca82a78a6d515f1efaabcc144688be00"
checksum = "5fc25a87fa4fd2094bffb06925852034d90a17f0d1e05197d4956d3555752191"
dependencies = [
"matches",
"percent-encoding 2.1.0",
@ -732,6 +805,15 @@ dependencies = [
"version_check",
]
[[package]]
name = "getopts"
version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14dbbfd5c71d70241ecf9e6f13737f7b5ce823821063188d7e46c41d371eebd5"
dependencies = [
"unicode-width",
]
[[package]]
name = "getrandom"
version = "0.1.16"
@ -824,9 +906,9 @@ dependencies = [
[[package]]
name = "gstreamer"
version = "0.16.5"
version = "0.16.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d50f822055923f1cbede233aa5dfd4ee957cf328fb3076e330886094e11d6cf"
checksum = "9ff5d0f7ff308ae37e6eb47b6ded17785bdea06e438a708cd09e0288c1862f33"
dependencies = [
"bitflags",
"cfg-if 1.0.0",
@ -964,6 +1046,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35"
[[package]]
name = "hmac"
version = "0.7.1"
@ -974,6 +1062,17 @@ dependencies = [
"digest",
]
[[package]]
name = "hostname"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c731c3e10504cc8ed35cfe2f1db4c9274c3d35fa486e3b31df46f068ef3e867"
dependencies = [
"libc",
"match_cfg",
"winapi",
]
[[package]]
name = "http"
version = "0.2.3"
@ -1029,7 +1128,7 @@ dependencies = [
"httparse",
"httpdate",
"itoa",
"pin-project 1.0.5",
"pin-project",
"socket2",
"tokio",
"tower-service",
@ -1056,15 +1155,36 @@ dependencies = [
[[package]]
name = "idna"
version = "0.2.1"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de910d521f7cc3135c4de8db1cb910e0b5ed1dc6f57c381cd07e8e661ce10094"
checksum = "89829a5d69c23d348314a7ac337fe39173b61149a9864deabd260983aed48c21"
dependencies = [
"matches",
"unicode-bidi",
"unicode-normalization",
]
[[package]]
name = "if-addrs"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28538916eb3f3976311f5dfbe67b5362d0add1293d0a9cad17debf86f8e3aa48"
dependencies = [
"if-addrs-sys",
"libc",
"winapi",
]
[[package]]
name = "if-addrs-sys"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de74b9dd780476e837e5eb5ab7c88b49ed304126e412030a0adba99c8efe79ea"
dependencies = [
"cc",
"libc",
]
[[package]]
name = "indexmap"
version = "1.6.1"
@ -1222,6 +1342,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "libmdns"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b276920bfc6c9285e16ffd30ed410487f0185f383483f45a3446afc0554fded"
dependencies = [
"byteorder",
"futures-util",
"hostname",
"if-addrs",
"log",
"multimap",
"quick-error",
"rand 0.8.3",
"socket2",
"tokio",
]
[[package]]
name = "libpulse-binding"
version = "2.23.0"
@ -1274,18 +1412,33 @@ dependencies = [
name = "librespot"
version = "0.1.3"
dependencies = [
"base64 0.13.0",
"env_logger",
"futures",
"getopts",
"hex",
"hyper",
"librespot-audio",
"librespot-connect",
"librespot-core",
"librespot-metadata",
"librespot-playback",
"librespot-protocol",
"log",
"num-bigint",
"protobuf",
"rand 0.7.3",
"rpassword",
"sha-1",
"tokio",
"url 1.7.2",
]
[[package]]
name = "librespot-audio"
version = "0.1.3"
dependencies = [
"aes-ctr",
"aes-ctr 0.6.0",
"bit-set",
"byteorder",
"bytes",
@ -1301,6 +1454,33 @@ dependencies = [
"vorbis",
]
[[package]]
name = "librespot-connect"
version = "0.1.3"
dependencies = [
"aes-ctr 0.3.0",
"base64 0.13.0",
"block-modes",
"dns-sd",
"futures",
"hmac",
"hyper",
"libmdns",
"librespot-core",
"librespot-playback",
"librespot-protocol",
"log",
"num-bigint",
"protobuf",
"rand 0.7.3",
"serde",
"serde_derive",
"serde_json",
"sha-1",
"tokio",
"url 1.7.2",
]
[[package]]
name = "librespot-core"
version = "0.1.3"
@ -1434,6 +1614,12 @@ dependencies = [
"libc",
]
[[package]]
name = "match_cfg"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffbee8634e0d45d258acb448e7eaab3fce7a0a467395d4d9f228e3c1f01fb2e4"
[[package]]
name = "matches"
version = "0.1.8"
@ -1458,9 +1644,9 @@ dependencies = [
[[package]]
name = "mio"
version = "0.7.7"
version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e50ae3f04d169fcc9bde0b547d1c205219b7157e07ded9c5aff03e0637cb3ed7"
checksum = "dc250d6848c90d719ea2ce34546fb5df7af1d3fd189d10bf7bad80bfcebecd95"
dependencies = [
"libc",
"log",
@ -1485,6 +1671,15 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0419348c027fa7be448d2ae7ea0e4e04c2334c31dc4e74ab29f00a2a7ca69204"
[[package]]
name = "multimap"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1255076139a83bb467426e7f8d0134968a8118844faa755985e077cf31850333"
dependencies = [
"serde",
]
[[package]]
name = "ndk"
version = "0.2.1"
@ -1726,14 +1921,14 @@ dependencies = [
[[package]]
name = "parking_lot_core"
version = "0.8.2"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccb628cad4f84851442432c60ad8e1f607e29752d0bf072cbd0baf28aa34272"
checksum = "fa7a782938e745763fe6907fc6ba86946d72f49fe7e21de074e08128a99fb018"
dependencies = [
"cfg-if 1.0.0",
"instant",
"libc",
"redox_syscall 0.1.57",
"redox_syscall",
"smallvec",
"winapi",
]
@ -1786,33 +1981,13 @@ dependencies = [
"ucd-trie",
]
[[package]]
name = "pin-project"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffbc8e94b38ea3d2d8ba92aea2983b503cd75d0888d75b86bb37970b5698e15"
dependencies = [
"pin-project-internal 0.4.27",
]
[[package]]
name = "pin-project"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96fa8ebb90271c4477f144354485b8068bd8f6b78b428b01ba892ca26caf0b63"
dependencies = [
"pin-project-internal 1.0.5",
]
[[package]]
name = "pin-project-internal"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "65ad2ae56b6abe3a1ee25f15ee605bacadb9a764edaba9c2bf4103800d4a1895"
dependencies = [
"proc-macro2",
"quote",
"syn",
"pin-project-internal",
]
[[package]]
@ -1963,10 +2138,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3bbaa49075179162b49acac1c6aa45fb4dafb5f13cf6794276d77bc7fd95757b"
dependencies = [
"error-chain",
"idna 0.2.1",
"idna 0.2.2",
"lazy_static",
"regex",
"url 2.2.0",
"url 2.2.1",
]
[[package]]
@ -1979,10 +2154,16 @@ dependencies = [
]
[[package]]
name = "quote"
version = "1.0.8"
name = "quick-error"
version = "1.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "991431c3519a3f36861882da93630ce66b52918dcf1b8e2fd66b397fc96f28df"
checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0"
[[package]]
name = "quote"
version = "1.0.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3d0b9745dc2debf507c8422de05d7226cc1f0644216dfdfead988f9b1ab32a7"
dependencies = [
"proc-macro2",
]
@ -2021,7 +2202,7 @@ checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
dependencies = [
"libc",
"rand_chacha 0.3.0",
"rand_core 0.6.1",
"rand_core 0.6.2",
"rand_hc 0.3.0",
]
@ -2042,7 +2223,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e12735cf05c9e10bf21534da50a147b924d555dc7a547c42e6bb2d5b6017ae0d"
dependencies = [
"ppv-lite86",
"rand_core 0.6.1",
"rand_core 0.6.2",
]
[[package]]
@ -2071,9 +2252,9 @@ dependencies = [
[[package]]
name = "rand_core"
version = "0.6.1"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c026d7df8b298d90ccbbc5190bd04d85e159eaf5576caeacf8741da93ccbd2e5"
checksum = "34cf66eb183df1c5876e2dcf6b13d57340741e8dc255b48e40a26de954d06ae7"
dependencies = [
"getrandom 0.2.2",
]
@ -2093,20 +2274,14 @@ version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3190ef7066a446f2e7f42e239d161e905420ccab01eb967c9eb27d21b2322a73"
dependencies = [
"rand_core 0.6.1",
"rand_core 0.6.2",
]
[[package]]
name = "redox_syscall"
version = "0.1.57"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]]
name = "redox_syscall"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05ec8ca9416c5ea37062b502703cd7fcb207736bc294f6e0cf367ac6fc234570"
checksum = "94341e4e44e24f6b591b59e47a8a027df12e008d73fd5672dbea9cc22f4507d9"
dependencies = [
"bitflags",
]
@ -2168,6 +2343,16 @@ dependencies = [
"cpal",
]
[[package]]
name = "rpassword"
version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffc936cf8a7ea60c58f030fd36a612a48f440610214dc54bc36431f9ea0c3efb"
dependencies = [
"libc",
"winapi",
]
[[package]]
name = "rustc-demangle"
version = "0.1.18"
@ -2386,6 +2571,15 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fdf1b9db47230893d76faad238fd6097fd6d6a9245cd7a4d90dbd639536bbd2"
[[package]]
name = "signal-hook-registry"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16f1d0fef1604ba8f7a073c7e701f213e056707210e9020af4528e0101ce11a6"
dependencies = [
"libc",
]
[[package]]
name = "slab"
version = "0.4.2"
@ -2479,6 +2673,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0"
[[package]]
name = "stream-cipher"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8131256a5896cabcf5eb04f4d6dacbe1aefda854b0d9896e09cb58829ec5638c"
dependencies = [
"generic-array 0.12.3",
]
[[package]]
name = "strsim"
version = "0.9.3"
@ -2549,9 +2752,9 @@ dependencies = [
[[package]]
name = "tar"
version = "0.4.32"
version = "0.4.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0313546c01d59e29be4f09687bcb4fb6690cec931cc3607b6aec7a0e417f4cc6"
checksum = "c0bcfbd6a598361fda270d82469fff3d65089dc33e175c9a131f7b4cd395f228"
dependencies = [
"filetime",
"libc",
@ -2567,7 +2770,7 @@ dependencies = [
"cfg-if 1.0.0",
"libc",
"rand 0.8.3",
"redox_syscall 0.2.4",
"redox_syscall",
"remove_dir_all",
"winapi",
]
@ -2583,18 +2786,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.23"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76cc616c6abf8c8928e2fdcc0dbfab37175edd8fb49a4641066ad1364fdab146"
checksum = "e0f4a65597094d4483ddaed134f409b2cb7c1beccf25201a9f73c719254fa98e"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.23"
version = "1.0.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9be73a2caec27583d0046ef3796c3794f868a5bc813db689eed00c7631275cd1"
checksum = "7765189610d8241a44529806d6fd1f2e0a08734313a35d5b3a556f92b381f3c0"
dependencies = [
"proc-macro2",
"quote",
@ -2603,9 +2806,9 @@ dependencies = [
[[package]]
name = "thread_local"
version = "1.1.2"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d8208a331e1cb318dd5bd76951d2b8fc48ca38a69f5f4e4af1b6a9f8c6236915"
checksum = "8018d24e04c95ac8790716a5987d0fec4f8b27249ffa0f7d33f1369bdfb88cbd"
dependencies = [
"once_cell",
]
@ -2685,8 +2888,11 @@ dependencies = [
"memchr",
"mio",
"num_cpus",
"once_cell",
"pin-project-lite",
"signal-hook-registry",
"tokio-macros",
"winapi",
]
[[package]]
@ -2731,9 +2937,9 @@ checksum = "360dfd1d6d30e05fda32ace2c8c70e9c0a9da713275777f5a4dbb8a1893930c6"
[[package]]
name = "tracing"
version = "0.1.23"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d40a22fd029e33300d8d89a5cc8ffce18bb7c587662f54629e94c9de5487f3"
checksum = "f77d3842f76ca899ff2dbcf231c5c65813dea431301d6eb686279c15c4464f12"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
@ -2751,11 +2957,11 @@ dependencies = [
[[package]]
name = "tracing-futures"
version = "0.2.4"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c"
checksum = "97d095ae15e245a057c8e8451bab9b3ee1e1f68e9ba2b4fbc18d0ac5237835f2"
dependencies = [
"pin-project 0.4.27",
"pin-project",
"tracing",
]
@ -2801,6 +3007,12 @@ version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb0d2e7be6ae3a5fa87eed5fb451aff96f2573d2694942e40543ae0bbe19c796"
[[package]]
name = "unicode-width"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3"
[[package]]
name = "unicode-xid"
version = "0.2.1"
@ -2836,7 +3048,7 @@ dependencies = [
"once_cell",
"qstring",
"rustls",
"url 2.2.0",
"url 2.2.1",
"webpki",
"webpki-roots",
]
@ -2854,12 +3066,12 @@ dependencies = [
[[package]]
name = "url"
version = "2.2.0"
version = "2.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5909f2b0817350449ed73e8bcd81c8c3c8d9a7a5d8acba4b27db277f1868976e"
checksum = "9ccd964113622c8e9322cfac19eb1004a07e636c545f325da085d5cdde6f1f8b"
dependencies = [
"form_urlencoded",
"idna 0.2.1",
"idna 0.2.2",
"matches",
"percent-encoding 2.1.0",
]

View file

@ -15,18 +15,18 @@ edition = "2018"
name = "librespot"
path = "src/lib.rs"
# [[bin]]
# name = "librespot"
# path = "src/main.rs"
# doc = false
[[bin]]
name = "librespot"
path = "src/main.rs"
doc = false
[dependencies.librespot-audio]
path = "audio"
version = "0.1.3"
# [dependencies.librespot-connect]
# path = "connect"
# version = "0.1.3"
[dependencies.librespot-connect]
path = "connect"
version = "0.1.3"
[dependencies.librespot-core]
path = "core"
@ -44,6 +44,22 @@ version = "0.1.3"
path = "protocol"
version = "0.1.3"
[dependencies]
base64 = "0.13"
env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]}
futures = "0.3"
getopts = "0.2"
hyper = "0.14"
log = "0.4"
num-bigint = "0.3"
protobuf = "~2.14.0"
rand = "0.7"
rpassword = "5.0"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "process"] }
url = "1.7"
sha-1 = "0.8"
hex = "0.4"
[features]
alsa-backend = ["librespot-playback/alsa-backend"]
portaudio-backend = ["librespot-playback/portaudio-backend"]

View file

@ -19,8 +19,8 @@ version = "0.1.3"
[dependencies]
base64 = "0.13"
futures = "0.1"
hyper = "0.12"
futures = "0.3"
hyper = { version = "0.14", features = ["server", "http1"] }
log = "0.4"
num-bigint = "0.3"
protobuf = "~2.14.0"
@ -28,7 +28,7 @@ rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio = "0.1"
tokio = { version = "1.0", features = ["macros"] }
url = "1.7"
sha-1 = "0.8"
hmac = "0.7"
@ -36,7 +36,7 @@ aes-ctr = "0.3"
block-modes = "0.3"
dns-sd = { version = "0.1.3", optional = true }
libmdns = { version = "0.2.7", optional = true }
libmdns = { version = "0.6", optional = true }
[features]

View file

@ -2,15 +2,18 @@ use aes_ctr::stream_cipher::generic_array::GenericArray;
use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher};
use aes_ctr::Aes128Ctr;
use base64;
use futures::sync::mpsc;
use futures::{Future, Poll, Stream};
use futures::channel::{mpsc, oneshot};
use futures::{Stream, StreamExt};
use hmac::{Hmac, Mac};
use hyper::{
self, server::conn::Http, service::Service, Body, Method, Request, Response, StatusCode,
};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, StatusCode};
use sha1::{Digest, Sha1};
use std::borrow::Cow;
use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddr};
use std::task::{Context, Poll};
#[cfg(feature = "with-dns-sd")]
use dns_sd::DNSService;
@ -21,8 +24,8 @@ use num_bigint::BigUint;
use rand;
use std::collections::BTreeMap;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use tokio::runtime::current_thread::Handle;
use url;
use librespot_core::authentication::Credentials;
@ -63,13 +66,8 @@ impl Discovery {
(discovery, rx)
}
}
impl Discovery {
fn handle_get_info(
&self,
_params: &BTreeMap<String, String>,
) -> ::futures::Finished<Response<hyper::Body>, hyper::Error> {
fn handle_get_info(&self, _: BTreeMap<Cow<'_, str>, Cow<'_, str>>) -> Response<hyper::Body> {
let public_key = self.0.public_key.to_bytes_be();
let public_key = base64::encode(&public_key);
@ -93,20 +91,20 @@ impl Discovery {
});
let body = result.to_string();
::futures::finished(Response::new(Body::from(body)))
Response::new(Body::from(body))
}
fn handle_add_user(
&self,
params: &BTreeMap<String, String>,
) -> ::futures::Finished<Response<hyper::Body>, hyper::Error> {
let username = params.get("userName").unwrap();
params: BTreeMap<Cow<'_, str>, Cow<'_, str>>,
) -> Response<hyper::Body> {
let username = params.get("userName").unwrap().as_ref();
let encrypted_blob = params.get("blob").unwrap();
let client_key = params.get("clientKey").unwrap();
let encrypted_blob = base64::decode(encrypted_blob).unwrap();
let encrypted_blob = base64::decode(encrypted_blob.as_bytes()).unwrap();
let client_key = base64::decode(client_key).unwrap();
let client_key = base64::decode(client_key.as_bytes()).unwrap();
let client_key = BigUint::from_bytes_be(&client_key);
let shared_key = util::powm(&client_key, &self.0.private_key, &DH_PRIME);
@ -141,7 +139,7 @@ impl Discovery {
});
let body = result.to_string();
return ::futures::finished(Response::new(Body::from(body)));
return Response::new(Body::from(body));
}
let decrypted = {
@ -155,7 +153,7 @@ impl Discovery {
};
let credentials =
Credentials::with_blob(username.to_owned(), &decrypted, &self.0.device_id);
Credentials::with_blob(username.to_string(), &decrypted, &self.0.device_id);
self.0.tx.unbounded_send(credentials).unwrap();
@ -166,52 +164,39 @@ impl Discovery {
});
let body = result.to_string();
return ::futures::finished(Response::new(Body::from(body)));
Response::new(Body::from(body))
}
fn not_found(&self) -> ::futures::Finished<Response<hyper::Body>, hyper::Error> {
fn not_found(&self) -> Response<hyper::Body> {
let mut res = Response::default();
*res.status_mut() = StatusCode::NOT_FOUND;
::futures::finished(res)
res
}
}
impl Service for Discovery {
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = Box<dyn Future<Item = Response<(Self::ResBody)>, Error = hyper::Error> + Send>;
fn call(&mut self, request: Request<(Self::ReqBody)>) -> Self::Future {
async fn call(self, request: Request<Body>) -> hyper::Result<Response<Body>> {
let mut params = BTreeMap::new();
let (parts, body) = request.into_parts();
if let Some(query) = parts.uri.query() {
params.extend(url::form_urlencoded::parse(query.as_bytes()).into_owned());
let query_params = url::form_urlencoded::parse(query.as_bytes());
params.extend(query_params);
}
if parts.method != Method::GET {
debug!("{:?} {:?} {:?}", parts.method, parts.uri.path(), params);
}
let this = self.clone();
Box::new(
body.fold(Vec::new(), |mut acc, chunk| {
acc.extend_from_slice(chunk.as_ref());
Ok::<_, hyper::Error>(acc)
})
.map(move |body| {
params.extend(url::form_urlencoded::parse(&body).into_owned());
params
})
.and_then(move |params| {
match (parts.method, params.get("action").map(AsRef::as_ref)) {
(Method::GET, Some("getInfo")) => this.handle_get_info(&params),
(Method::POST, Some("addUser")) => this.handle_add_user(&params),
_ => this.not_found(),
}
}),
let body = hyper::body::to_bytes(body).await?;
params.extend(url::form_urlencoded::parse(&body));
Ok(
match (parts.method, params.get("action").map(AsRef::as_ref)) {
(Method::GET, Some("getInfo")) => self.handle_get_info(params),
(Method::POST, Some("addUser")) => self.handle_add_user(params),
_ => self.not_found(),
},
)
}
}
@ -220,48 +205,40 @@ impl Service for Discovery {
pub struct DiscoveryStream {
credentials: mpsc::UnboundedReceiver<Credentials>,
_svc: DNSService,
_close_tx: oneshot::Sender<Infallible>,
}
#[cfg(not(feature = "with-dns-sd"))]
pub struct DiscoveryStream {
credentials: mpsc::UnboundedReceiver<Credentials>,
_svc: libmdns::Service,
_close_tx: oneshot::Sender<Infallible>,
}
pub fn discovery(
handle: &Handle,
config: ConnectConfig,
device_id: String,
port: u16,
) -> io::Result<DiscoveryStream> {
let (discovery, creds_rx) = Discovery::new(config.clone(), device_id);
let (close_tx, close_rx) = oneshot::channel();
let serve = {
let http = Http::new();
http.serve_addr(&format!("0.0.0.0:{}", port).parse().unwrap(), move || {
Ok(discovery.clone())
})
.unwrap()
};
let address = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), port);
let s_port = serve.incoming_ref().local_addr().port();
let make_service = make_service_fn(move |_| {
let discovery = discovery.clone();
async move { Ok::<_, hyper::Error>(service_fn(move |request| discovery.clone().call(request))) }
});
let server = hyper::Server::bind(&address).serve(make_service);
let s_port = server.local_addr().port();
debug!("Zeroconf server listening on 0.0.0.0:{}", s_port);
let server_future = {
let handle = handle.clone();
serve
.for_each(
move |connecting: hyper::server::conn::Connecting<
hyper::server::conn::AddrStream,
futures::Failed<_, hyper::Error>,
>| {
handle.spawn(connecting.flatten().then(|_| Ok(()))).unwrap();
Ok(())
},
)
.then(|_| Ok(()))
};
handle.spawn(server_future).unwrap();
tokio::spawn(server.with_graceful_shutdown(async {
close_rx.await.unwrap_err();
debug!("Shutting down discovery server");
}));
#[cfg(feature = "with-dns-sd")]
let svc = DNSService::register(
@ -275,7 +252,7 @@ pub fn discovery(
.unwrap();
#[cfg(not(feature = "with-dns-sd"))]
let responder = libmdns::Responder::spawn(&handle)?;
let responder = libmdns::Responder::spawn(&tokio::runtime::Handle::current())?;
#[cfg(not(feature = "with-dns-sd"))]
let svc = responder.register(
@ -288,14 +265,14 @@ pub fn discovery(
Ok(DiscoveryStream {
credentials: creds_rx,
_svc: svc,
_close_tx: close_tx,
})
}
impl Stream for DiscoveryStream {
type Item = Credentials;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
self.credentials.poll()
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.credentials.poll_next_unpin(cx)
}
}

View file

@ -1,26 +1,26 @@
use std;
use std::pin::Pin;
use std::time::{SystemTime, UNIX_EPOCH};
use futures::future;
use futures::sync::mpsc;
use futures::{Async, Future, Poll, Sink, Stream};
use protobuf::{self, Message};
use rand;
use rand::seq::SliceRandom;
use serde_json;
use crate::context::StationContext;
use crate::playback::mixer::Mixer;
use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
use crate::protocol;
use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
use futures::channel::mpsc;
use futures::future::{self, FusedFuture};
use futures::stream::FusedStream;
use futures::{Future, FutureExt, StreamExt};
use librespot_core::config::{ConnectConfig, VolumeCtrl};
use librespot_core::mercury::MercuryError;
use librespot_core::mercury::{MercuryError, MercurySender};
use librespot_core::session::Session;
use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError};
use librespot_core::util::url_encode;
use librespot_core::util::SeqGenerator;
use librespot_core::version;
use protobuf::{self, Message};
use rand;
use rand::seq::SliceRandom;
use serde_json;
enum SpircPlayStatus {
Stopped,
@ -40,7 +40,10 @@ enum SpircPlayStatus {
},
}
pub struct SpircTask {
type BoxedFuture<T> = Pin<Box<dyn FusedFuture<Output = T> + Send>>;
type BoxedStream<T> = Pin<Box<dyn FusedStream<Item = T> + Send>>;
struct SpircTask {
player: Player,
mixer: Box<dyn Mixer>,
config: SpircTaskConfig,
@ -54,15 +57,15 @@ pub struct SpircTask {
mixer_started: bool,
play_status: SpircPlayStatus,
subscription: Box<dyn Stream<Item = Frame, Error = MercuryError>>,
sender: Box<dyn Sink<SinkItem = Frame, SinkError = MercuryError>>,
subscription: BoxedStream<Frame>,
sender: MercurySender,
commands: mpsc::UnboundedReceiver<SpircCommand>,
player_events: PlayerEventChannel,
shutdown: bool,
session: Session,
context_fut: Box<dyn Future<Item = serde_json::Value, Error = MercuryError>>,
autoplay_fut: Box<dyn Future<Item = String, Error = MercuryError>>,
context_fut: BoxedFuture<Result<serde_json::Value, MercuryError>>,
autoplay_fut: BoxedFuture<Result<String, MercuryError>>,
context: Option<StationContext>,
}
@ -246,7 +249,7 @@ impl Spirc {
session: Session,
player: Player,
mixer: Box<dyn Mixer>,
) -> (Spirc, SpircTask) {
) -> (Spirc, impl Future<Output = ()>) {
debug!("new Spirc[{}]", session.session_id());
let ident = session.device_id().to_owned();
@ -255,22 +258,20 @@ impl Spirc {
debug!("canonical_username: {}", url_encode(&session.username()));
let uri = format!("hm://remote/user/{}/", url_encode(&session.username()));
let subscription = session.mercury().subscribe(&uri as &str);
let subscription = subscription
.map(|stream| stream.map_err(|_| MercuryError))
.flatten_stream();
let subscription = Box::new(subscription.map(|response| -> Frame {
let data = response.payload.first().unwrap();
protobuf::parse_from_bytes(data).unwrap()
}));
let sender = Box::new(
let subscription = Box::pin(
session
.mercury()
.sender(uri)
.with(|frame: Frame| Ok(frame.write_to_bytes().unwrap())),
.subscribe(uri.clone())
.map(Result::unwrap)
.flatten_stream()
.map(|response| -> Frame {
let data = response.payload.first().unwrap();
protobuf::parse_from_bytes(data).unwrap()
}),
);
let sender = session.mercury().sender(uri);
let (cmd_tx, cmd_rx) = mpsc::unbounded();
let volume = config.volume;
@ -304,10 +305,10 @@ impl Spirc {
player_events: player_events,
shutdown: false,
session: session.clone(),
session: session,
context_fut: Box::new(future::empty()),
autoplay_fut: Box::new(future::empty()),
context_fut: Box::pin(future::pending()),
autoplay_fut: Box::pin(future::pending()),
context: None,
};
@ -317,7 +318,7 @@ impl Spirc {
task.hello();
(spirc, task)
(spirc, task.run())
}
pub fn play(&self) {
@ -346,114 +347,76 @@ impl Spirc {
}
}
impl Future for SpircTask {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
let mut progress = false;
if self.session.is_invalid() {
return Ok(Async::Ready(()));
}
if !self.shutdown {
match self.subscription.poll().unwrap() {
Async::Ready(Some(frame)) => {
progress = true;
self.handle_frame(frame);
}
Async::Ready(None) => {
impl SpircTask {
async fn run(mut self) {
while !self.session.is_invalid() && !self.shutdown {
tokio::select! {
frame = self.subscription.next() => match frame {
Some(frame) => self.handle_frame(frame),
None => {
error!("subscription terminated");
self.shutdown = true;
self.commands.close();
break;
}
Async::NotReady => (),
}
match self.commands.poll().unwrap() {
Async::Ready(Some(command)) => {
progress = true;
self.handle_command(command);
},
cmd = self.commands.next(), if !self.commands.is_terminated() => if let Some(cmd) = cmd {
self.handle_command(cmd);
},
event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event {
self.handle_player_event(event)
},
result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() {
error!("Cannot flush spirc event sender.");
break;
},
context = &mut self.context_fut, if !self.context_fut.is_terminated() => {
match context {
Ok(value) => {
let r_context = serde_json::from_value::<StationContext>(value);
self.context = match r_context {
Ok(context) => {
info!(
"Resolved {:?} tracks from <{:?}>",
context.tracks.len(),
self.state.get_context_uri(),
);
Some(context)
}
Err(e) => {
error!("Unable to parse JSONContext {:?}", e);
None
}
};
// It needn't be so verbose - can be as simple as
// if let Some(ref context) = r_context {
// info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri);
// }
// self.context = r_context;
},
Err(err) => {
error!("ContextError: {:?}", err)
}
}
Async::Ready(None) => (),
Async::NotReady => (),
}
match self.player_events.poll() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(None)) => (),
Err(_) => (),
Ok(Async::Ready(Some(event))) => {
progress = true;
self.handle_player_event(event);
},
autoplay = &mut self.autoplay_fut, if !self.autoplay_fut.is_terminated() => {
match autoplay {
Ok(autoplay_station_uri) => {
info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri);
self.context_fut = self.resolve_station(&autoplay_station_uri);
},
Err(err) => {
error!("AutoplayError: {:?}", err)
}
}
}
// TODO: Refactor
match self.context_fut.poll() {
Ok(Async::Ready(value)) => {
let r_context = serde_json::from_value::<StationContext>(value.clone());
self.context = match r_context {
Ok(context) => {
info!(
"Resolved {:?} tracks from <{:?}>",
context.tracks.len(),
self.state.get_context_uri(),
);
Some(context)
}
Err(e) => {
error!("Unable to parse JSONContext {:?}\n{:?}", e, value);
None
}
};
// It needn't be so verbose - can be as simple as
// if let Some(ref context) = r_context {
// info!("Got {:?} tracks from <{}>", context.tracks.len(), context.uri);
// }
// self.context = r_context;
progress = true;
self.context_fut = Box::new(future::empty());
}
Ok(Async::NotReady) => (),
Err(err) => {
self.context_fut = Box::new(future::empty());
error!("ContextError: {:?}", err)
}
}
match self.autoplay_fut.poll() {
Ok(Async::Ready(autoplay_station_uri)) => {
info!("Autoplay uri resolved to <{:?}>", autoplay_station_uri);
self.context_fut = self.resolve_station(&autoplay_station_uri);
progress = true;
self.autoplay_fut = Box::new(future::empty());
}
Ok(Async::NotReady) => (),
Err(err) => {
self.autoplay_fut = Box::new(future::empty());
error!("AutoplayError: {:?}", err)
}
}
}
let poll_sender = self.sender.poll_complete().unwrap();
// Only shutdown once we've flushed out all our messages
if self.shutdown && poll_sender.is_ready() {
return Ok(Async::Ready(()));
}
if !progress {
return Ok(Async::NotReady);
},
else => break
}
}
}
}
impl SpircTask {
if self.sender.flush().await.is_err() {
warn!("Cannot flush spirc event sender.");
}
}
fn now_ms(&mut self) -> i64 {
let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(dur) => dur,
@ -1060,52 +1023,53 @@ impl SpircTask {
}
}
fn resolve_station(
&self,
uri: &str,
) -> Box<dyn Future<Item = serde_json::Value, Error = MercuryError>> {
fn resolve_station(&self, uri: &str) -> BoxedFuture<Result<serde_json::Value, MercuryError>> {
let radio_uri = format!("hm://radio-apollo/v3/stations/{}", uri);
self.resolve_uri(&radio_uri)
}
fn resolve_autoplay_uri(
&self,
uri: &str,
) -> Box<dyn Future<Item = String, Error = MercuryError>> {
fn resolve_autoplay_uri(&self, uri: &str) -> BoxedFuture<Result<String, MercuryError>> {
let query_uri = format!("hm://autoplay-enabled/query?uri={}", uri);
let request = self.session.mercury().get(query_uri);
Box::new(request.and_then(move |response| {
if response.status_code == 200 {
Box::pin(
async {
let response = request.await?;
if response.status_code == 200 {
let data = response
.payload
.first()
.expect("Empty autoplay uri")
.to_vec();
let autoplay_uri = String::from_utf8(data).unwrap();
Ok(autoplay_uri)
} else {
warn!("No autoplay_uri found");
Err(MercuryError)
}
}
.fuse(),
)
}
fn resolve_uri(&self, uri: &str) -> BoxedFuture<Result<serde_json::Value, MercuryError>> {
let request = self.session.mercury().get(uri);
Box::pin(
async move {
let response = request.await?;
let data = response
.payload
.first()
.expect("Empty autoplay uri")
.to_vec();
let autoplay_uri = String::from_utf8(data).unwrap();
Ok(autoplay_uri)
} else {
warn!("No autoplay_uri found");
Err(MercuryError)
.expect("Empty payload on context uri");
let response: serde_json::Value = serde_json::from_slice(&data).unwrap();
Ok(response)
}
}))
}
fn resolve_uri(
&self,
uri: &str,
) -> Box<dyn Future<Item = serde_json::Value, Error = MercuryError>> {
let request = self.session.mercury().get(uri);
Box::new(request.and_then(move |response| {
let data = response
.payload
.first()
.expect("Empty payload on context uri");
let response: serde_json::Value = serde_json::from_slice(&data).unwrap();
Ok(response)
}))
.fuse(),
)
}
fn update_tracks_from_context(&mut self) {
@ -1345,7 +1309,6 @@ impl<'a> CommandSender<'a> {
self.frame.set_state(self.spirc.state.clone());
}
let send = self.spirc.sender.start_send(self.frame).unwrap();
assert!(send.is_ready());
self.spirc.sender.send(self.frame.write_to_bytes().unwrap());
}
}

View file

@ -102,46 +102,48 @@ impl MercuryManager {
MercurySender::new(self.clone(), uri.into())
}
pub async fn subscribe<T: Into<String>>(
pub fn subscribe<T: Into<String>>(
&self,
uri: T,
) -> Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> {
) -> impl Future<Output = Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>> + 'static
{
let uri = uri.into();
let response = self
.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
})
.await?;
let (tx, rx) = mpsc::unbounded();
let manager = self.clone();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
}
let request = self.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
});
Ok(rx)
let manager = self.clone();
async move {
let response = request.await?;
let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
}
});
Ok(rx)
}
}
pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {

View file

@ -1,5 +1,4 @@
use futures::Sink;
use std::{collections::VecDeque, pin::Pin, task::Context};
use std::collections::VecDeque;
use super::*;
@ -18,6 +17,22 @@ impl MercurySender {
pending: VecDeque::new(),
}
}
pub fn is_flushed(&self) -> bool {
self.pending.is_empty()
}
pub fn send(&mut self, item: Vec<u8>) {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
}
pub async fn flush(&mut self) -> Result<(), MercuryError> {
for fut in self.pending.drain(..) {
fut.await?;
}
Ok(())
}
}
impl Clone for MercurySender {
@ -29,39 +44,3 @@ impl Clone for MercurySender {
}
}
}
impl Sink<Vec<u8>> for MercurySender {
type Error = MercuryError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match self.pending.front_mut() {
Some(task) => {
match Pin::new(task).poll(cx) {
Poll::Ready(Err(x)) => return Poll::Ready(Err(x)),
Poll::Pending => return Poll::Pending,
_ => (),
};
}
None => {
return Poll::Ready(Ok(()));
}
}
self.pending.pop_front();
}
}
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
Ok(())
}
}

View file

@ -39,6 +39,8 @@ struct SessionInternal {
mercury: OnceCell<MercuryManager>,
cache: Option<Arc<Cache>>,
handle: tokio::runtime::Handle,
session_id: usize,
}
@ -65,7 +67,13 @@ impl Session {
cache.save_credentials(&reusable_credentials);
}
let session = Session::create(conn, config, cache, reusable_credentials.username);
let session = Session::create(
conn,
config,
cache,
reusable_credentials.username,
tokio::runtime::Handle::current(),
);
Ok(session)
}
@ -75,6 +83,7 @@ impl Session {
config: SessionConfig,
cache: Option<Cache>,
username: String,
handle: tokio::runtime::Handle,
) -> Session {
let (sink, stream) = transport.split();
@ -100,6 +109,8 @@ impl Session {
channel: OnceCell::new(),
mercury: OnceCell::new(),
handle,
session_id: session_id,
}));
@ -139,7 +150,7 @@ impl Session {
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(task);
self.0.handle.spawn(task);
}
fn debug_info(&self) {

View file

@ -7,7 +7,6 @@ use crate::audio::{
use crate::audio_backend::Sink;
use crate::config::NormalisationType;
use crate::config::{Bitrate, PlayerConfig};
use crate::librespot_core::tokio;
use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter;
use librespot_core::session::Session;
@ -15,25 +14,22 @@ use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use byteorder::{LittleEndian, ReadBytesExt};
use futures::{
channel::{mpsc, oneshot},
future, Future, Stream, StreamExt,
};
use std::io::{Read, Seek, SeekFrom};
use std::mem;
use futures::channel::{mpsc, oneshot};
use futures::{future, Future, Stream, StreamExt, TryFutureExt};
use std::borrow::Cow;
use std::cmp::max;
use std::io::{self, Read, Seek, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{borrow::Cow, io};
use std::{
cmp::max,
pin::Pin,
task::{Context, Poll},
};
use std::{mem, thread};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
pub struct Player {
commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
task_handle: Option<tokio::task::JoinHandle<()>>,
thread_handle: Option<thread::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>,
}
@ -251,33 +247,33 @@ impl Player {
let (cmd_tx, cmd_rx) = mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded();
debug!("new Player[{}]", session.session_id());
let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id());
let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,
let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,
state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
let handle = tokio::spawn(async move {
internal.await;
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
futures::executor::block_on(internal);
debug!("PlayerInternal thread finished.");
});
(
Player {
commands: Some(cmd_tx),
task_handle: Some(handle),
thread_handle: Some(handle),
play_request_id_generator: SeqGenerator::new(0),
},
event_receiver,
@ -351,13 +347,11 @@ impl Drop for Player {
fn drop(&mut self) {
debug!("Shutting down player thread ...");
self.commands = None;
if let Some(handle) = self.task_handle.take() {
tokio::spawn(async {
match handle.await {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
});
if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
}
}
}
@ -436,15 +430,23 @@ impl PlayerState {
#[allow(dead_code)]
fn is_stopped(&self) -> bool {
matches!(self, Self::Stopped)
use self::PlayerState::*;
match *self {
Stopped => true,
_ => false,
}
}
fn is_loading(&self) -> bool {
matches!(self, Self::Loading { .. })
use self::PlayerState::*;
match *self {
Loading { .. } => true,
_ => false,
}
}
fn decoder(&mut self) -> Option<&mut Decoder> {
use PlayerState::*;
use self::PlayerState::*;
match *self {
Stopped | EndOfTrack { .. } | Loading { .. } => None,
Paused {
@ -1243,9 +1245,10 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
// But most likely the track is fully
// loaded already because we played
// to the end of it.
loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
@ -1278,7 +1281,7 @@ impl PlayerInternal {
// we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
stream_loader_controller.set_random_access_mode();
let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64));
let _ = decoder.seek(position_ms as i64); // This may be blocking.
stream_loader_controller.set_stream_mode();
*stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
@ -1346,9 +1349,7 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
loaded_track.stream_loader_controller.set_stream_mode();
}
self.start_playback(track_id, play_request_id, *loaded_track, play);
@ -1563,7 +1564,7 @@ impl PlayerInternal {
}
}
pub fn load_track(
fn load_track(
&self,
spotify_id: SpotifyId,
position_ms: u32,
@ -1574,22 +1575,23 @@ impl PlayerInternal {
// easily. Instead we spawn a thread to do the work and return a one-shot channel as the
// future to work with.
let session = self.session.clone();
let config = self.config.clone();
let loader = PlayerTrackLoader {
session: self.session.clone(),
config: self.config.clone(),
};
async move {
let loader = PlayerTrackLoader { session, config };
let (result_tx, result_rx) = oneshot::channel();
let (result_tx, result_rx) = oneshot::channel();
tokio::spawn(async move {
if let Some(data) = loader.load_track(spotify_id, position_ms).await {
std::thread::spawn(move || {
futures::executor::block_on(loader.load_track(spotify_id, position_ms)).and_then(
move |data| {
let _ = result_tx.send(data);
}
});
Some(())
},
);
});
result_rx.await.map_err(|_| ())
}
result_rx.map_err(|_| ())
}
fn preload_data_before_playback(&mut self) {
@ -1615,9 +1617,7 @@ impl PlayerInternal {
* bytes_per_second as f64) as usize,
(READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
);
tokio::task::block_in_place(|| {
stream_loader_controller.fetch_next_blocking(wait_for_data_length)
});
stream_loader_controller.fetch_next_blocking(wait_for_data_length);
}
}
}

View file

@ -1,7 +1,7 @@
#![crate_name = "librespot"]
pub extern crate librespot_audio as audio;
// pub extern crate librespot_connect as connect;
pub extern crate librespot_connect as connect;
pub extern crate librespot_core as core;
pub extern crate librespot_metadata as metadata;
pub extern crate librespot_playback as playback;

600
src/main.rs Normal file
View file

@ -0,0 +1,600 @@
use futures::{channel::mpsc::UnboundedReceiver, future::FusedFuture, FutureExt, StreamExt};
use librespot_playback::player::PlayerEvent;
use log::{error, info, warn};
use sha1::{Digest, Sha1};
use std::path::Path;
use std::process::exit;
use std::str::FromStr;
use std::{env, time::Instant};
use std::{
io::{stderr, Write},
pin::Pin,
};
use url::Url;
use librespot::core::authentication::{get_credentials, Credentials};
use librespot::core::cache::Cache;
use librespot::core::config::{ConnectConfig, DeviceType, SessionConfig, VolumeCtrl};
use librespot::core::session::Session;
use librespot::core::version;
use librespot::connect::spirc::Spirc;
use librespot::playback::audio_backend::{self, Sink, BACKENDS};
use librespot::playback::config::{Bitrate, NormalisationType, PlayerConfig};
use librespot::playback::mixer::{self, Mixer, MixerConfig};
use librespot::playback::player::Player;
mod player_event_handler;
use player_event_handler::{emit_sink_event, run_program_on_events};
fn device_id(name: &str) -> String {
hex::encode(Sha1::digest(name.as_bytes()))
}
fn usage(program: &str, opts: &getopts::Options) -> String {
let brief = format!("Usage: {} [options]", program);
opts.usage(&brief)
}
fn setup_logging(verbose: bool) {
let mut builder = env_logger::Builder::new();
match env::var("RUST_LOG") {
Ok(config) => {
builder.parse_filters(&config);
builder.init();
if verbose {
warn!("`--verbose` flag overidden by `RUST_LOG` environment variable");
}
}
Err(_) => {
if verbose {
builder.parse_filters("libmdns=info,librespot=trace");
} else {
builder.parse_filters("libmdns=info,librespot=info");
}
builder.init();
}
}
}
fn list_backends() {
println!("Available Backends : ");
for (&(name, _), idx) in BACKENDS.iter().zip(0..) {
if idx == 0 {
println!("- {} (default)", name);
} else {
println!("- {}", name);
}
}
}
#[derive(Clone)]
struct Setup {
backend: fn(Option<String>) -> Box<dyn Sink + Send + 'static>,
device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
cache: Option<Cache>,
player_config: PlayerConfig,
session_config: SessionConfig,
connect_config: ConnectConfig,
mixer_config: MixerConfig,
credentials: Option<Credentials>,
enable_discovery: bool,
zeroconf_port: u16,
player_event_program: Option<String>,
emit_sink_events: bool,
}
fn setup(args: &[String]) -> Setup {
let mut opts = getopts::Options::new();
opts.optopt(
"c",
"cache",
"Path to a directory where files will be cached.",
"CACHE",
).optopt(
"",
"system-cache",
"Path to a directory where system files (credentials, volume) will be cached. Can be different from cache option value",
"SYTEMCACHE",
).optflag("", "disable-audio-cache", "Disable caching of the audio data.")
.reqopt("n", "name", "Device name", "NAME")
.optopt("", "device-type", "Displayed device type", "DEVICE_TYPE")
.optopt(
"b",
"bitrate",
"Bitrate (96, 160 or 320). Defaults to 160",
"BITRATE",
)
.optopt(
"",
"onevent",
"Run PROGRAM when playback is about to begin.",
"PROGRAM",
)
.optflag("", "emit-sink-events", "Run program set by --onevent before sink is opened and after it is closed.")
.optflag("v", "verbose", "Enable verbose output")
.optopt("u", "username", "Username to sign in with", "USERNAME")
.optopt("p", "password", "Password", "PASSWORD")
.optopt("", "proxy", "HTTP proxy to use when connecting", "PROXY")
.optopt("", "ap-port", "Connect to AP with specified port. If no AP with that port are present fallback AP will be used. Available ports are usually 80, 443 and 4070", "AP_PORT")
.optflag("", "disable-discovery", "Disable discovery mode")
.optopt(
"",
"backend",
"Audio backend to use. Use '?' to list options",
"BACKEND",
)
.optopt(
"",
"device",
"Audio device to use. Use '?' to list options if using portaudio or alsa",
"DEVICE",
)
.optopt("", "mixer", "Mixer to use (alsa or softvol)", "MIXER")
.optopt(
"m",
"mixer-name",
"Alsa mixer name, e.g \"PCM\" or \"Master\". Defaults to 'PCM'",
"MIXER_NAME",
)
.optopt(
"",
"mixer-card",
"Alsa mixer card, e.g \"hw:0\" or similar from `aplay -l`. Defaults to 'default' ",
"MIXER_CARD",
)
.optopt(
"",
"mixer-index",
"Alsa mixer index, Index of the cards mixer. Defaults to 0",
"MIXER_INDEX",
)
.optflag(
"",
"mixer-linear-volume",
"Disable alsa's mapped volume scale (cubic). Default false",
)
.optopt(
"",
"initial-volume",
"Initial volume in %, once connected (must be from 0 to 100)",
"VOLUME",
)
.optopt(
"",
"zeroconf-port",
"The port the internal server advertised over zeroconf uses.",
"ZEROCONF_PORT",
)
.optflag(
"",
"enable-volume-normalisation",
"Play all tracks at the same volume",
)
.optopt(
"",
"normalisation-gain-type",
"Specify the normalisation gain type to use - [track, album]. Default is album.",
"GAIN_TYPE",
)
.optopt(
"",
"normalisation-pregain",
"Pregain (dB) applied by volume normalisation",
"PREGAIN",
)
.optopt(
"",
"volume-ctrl",
"Volume control type - [linear, log, fixed]. Default is logarithmic",
"VOLUME_CTRL"
)
.optflag(
"",
"autoplay",
"autoplay similar songs when your music ends.",
)
.optflag(
"",
"disable-gapless",
"disable gapless playback.",
);
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(f) => {
writeln!(
stderr(),
"error: {}\n{}",
f.to_string(),
usage(&args[0], &opts)
)
.unwrap();
exit(1);
}
};
let verbose = matches.opt_present("verbose");
setup_logging(verbose);
info!(
"librespot {} ({}). Built on {}. Build ID: {}",
version::short_sha(),
version::commit_date(),
version::short_now(),
version::build_id()
);
let backend_name = matches.opt_str("backend");
if backend_name == Some("?".into()) {
list_backends();
exit(0);
}
let backend = audio_backend::find(backend_name).expect("Invalid backend");
let device = matches.opt_str("device");
if device == Some("?".into()) {
backend(device);
exit(0);
}
let mixer_name = matches.opt_str("mixer");
let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer");
let mixer_config = MixerConfig {
card: matches
.opt_str("mixer-card")
.unwrap_or_else(|| String::from("default")),
mixer: matches
.opt_str("mixer-name")
.unwrap_or_else(|| String::from("PCM")),
index: matches
.opt_str("mixer-index")
.map(|index| index.parse::<u32>().unwrap())
.unwrap_or(0),
mapped_volume: !matches.opt_present("mixer-linear-volume"),
};
let cache = {
let audio_dir;
let system_dir;
if matches.opt_present("disable-audio-cache") {
audio_dir = None;
system_dir = matches
.opt_str("system-cache")
.or_else(|| matches.opt_str("c"))
.map(|p| p.into());
} else {
let cache_dir = matches.opt_str("c");
audio_dir = cache_dir
.as_ref()
.map(|p| AsRef::<Path>::as_ref(p).join("files"));
system_dir = matches
.opt_str("system-cache")
.or(cache_dir)
.map(|p| p.into());
}
match Cache::new(system_dir, audio_dir) {
Ok(cache) => Some(cache),
Err(e) => {
warn!("Cannot create cache: {}", e);
None
}
}
};
let initial_volume = matches
.opt_str("initial-volume")
.map(|volume| {
let volume = volume.parse::<u16>().unwrap();
if volume > 100 {
panic!("Initial volume must be in the range 0-100");
}
(volume as i32 * 0xFFFF / 100) as u16
})
.or_else(|| cache.as_ref().and_then(Cache::volume))
.unwrap_or(0x8000);
let zeroconf_port = matches
.opt_str("zeroconf-port")
.map(|port| port.parse::<u16>().unwrap())
.unwrap_or(0);
let name = matches.opt_str("name").unwrap();
let credentials = {
let cached_credentials = cache.as_ref().and_then(Cache::credentials);
let password = |username: &String| -> String {
write!(stderr(), "Password for {}: ", username).unwrap();
stderr().flush().unwrap();
rpassword::read_password().unwrap()
};
get_credentials(
matches.opt_str("username"),
matches.opt_str("password"),
cached_credentials,
password,
)
};
let session_config = {
let device_id = device_id(&name);
SessionConfig {
user_agent: version::version_string(),
device_id: device_id,
proxy: matches.opt_str("proxy").or(std::env::var("http_proxy").ok()).map(
|s| {
match Url::parse(&s) {
Ok(url) => {
if url.host().is_none() || url.port_or_known_default().is_none() {
panic!("Invalid proxy url, only urls on the format \"http://host:port\" are allowed");
}
if url.scheme() != "http" {
panic!("Only unsecure http:// proxies are supported");
}
url
},
Err(err) => panic!("Invalid proxy url: {}, only urls on the format \"http://host:port\" are allowed", err)
}
},
),
ap_port: matches
.opt_str("ap-port")
.map(|port| port.parse::<u16>().expect("Invalid port")),
}
};
let player_config = {
let bitrate = matches
.opt_str("b")
.as_ref()
.map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate"))
.unwrap_or(Bitrate::default());
let gain_type = matches
.opt_str("normalisation-gain-type")
.as_ref()
.map(|gain_type| {
NormalisationType::from_str(gain_type).expect("Invalid normalisation type")
})
.unwrap_or(NormalisationType::default());
PlayerConfig {
bitrate: bitrate,
gapless: !matches.opt_present("disable-gapless"),
normalisation: matches.opt_present("enable-volume-normalisation"),
normalisation_type: gain_type,
normalisation_pregain: matches
.opt_str("normalisation-pregain")
.map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value"))
.unwrap_or(PlayerConfig::default().normalisation_pregain),
}
};
let connect_config = {
let device_type = matches
.opt_str("device-type")
.as_ref()
.map(|device_type| DeviceType::from_str(device_type).expect("Invalid device type"))
.unwrap_or(DeviceType::default());
let volume_ctrl = matches
.opt_str("volume-ctrl")
.as_ref()
.map(|volume_ctrl| VolumeCtrl::from_str(volume_ctrl).expect("Invalid volume ctrl type"))
.unwrap_or(VolumeCtrl::default());
ConnectConfig {
name: name,
device_type: device_type,
volume: initial_volume,
volume_ctrl: volume_ctrl,
autoplay: matches.opt_present("autoplay"),
}
};
let enable_discovery = !matches.opt_present("disable-discovery");
Setup {
backend: backend,
cache: cache,
session_config: session_config,
player_config: player_config,
connect_config: connect_config,
credentials: credentials,
device: device,
enable_discovery: enable_discovery,
zeroconf_port: zeroconf_port,
mixer: mixer,
mixer_config: mixer_config,
player_event_program: matches.opt_str("onevent"),
emit_sink_events: matches.opt_present("emit-sink-events"),
}
}
#[tokio::main]
async fn main() {
if env::var("RUST_BACKTRACE").is_err() {
env::set_var("RUST_BACKTRACE", "full")
}
let args: Vec<String> = std::env::args().collect();
let setupp = setup(&args);
let mut last_credentials = None;
let mut spirc: Option<Spirc> = None;
let mut spirc_task: Option<Pin<_>> = None;
let mut player_event_channel: Option<UnboundedReceiver<PlayerEvent>> = None;
let mut auto_connect_times: Vec<Instant> = vec![];
let mut discovery = None;
let mut connecting: Pin<Box<dyn FusedFuture<Output = _>>> =
Box::pin(futures::future::pending());
if setupp.enable_discovery {
let config = setupp.connect_config.clone();
let device_id = setupp.session_config.device_id.clone();
discovery = Some(
librespot_connect::discovery::discovery(config, device_id, setupp.zeroconf_port)
.unwrap(),
);
}
if let Some(credentials) = setupp.credentials {
last_credentials = Some(credentials.clone());
connecting = Box::pin(
Session::connect(
setupp.session_config.clone(),
credentials,
setupp.cache.clone(),
)
.fuse(),
);
}
loop {
tokio::select! {
credentials = async { discovery.as_mut().unwrap().next().await }, if discovery.is_some() => {
match credentials {
Some(credentials) => {
last_credentials = Some(credentials.clone());
auto_connect_times.clear();
if let Some(spirc) = spirc.take() {
spirc.shutdown();
}
if let Some(spirc_task) = spirc_task.take() {
// Continue shutdown in its own task
tokio::spawn(spirc_task);
}
connecting = Box::pin(Session::connect(
setupp.session_config.clone(),
credentials,
setupp.cache.clone(),
).fuse());
},
None => {
warn!("Discovery stopped!");
discovery = None;
}
}
},
session = &mut connecting, if !connecting.is_terminated() => match session {
Ok(session) => {
let mixer_config = setupp.mixer_config.clone();
let mixer = (setupp.mixer)(Some(mixer_config));
let player_config = setupp.player_config.clone();
let connect_config = setupp.connect_config.clone();
let audio_filter = mixer.get_audio_filter();
let backend = setupp.backend;
let device = setupp.device.clone();
let (player, event_channel) =
Player::new(player_config, session.clone(), audio_filter, move || {
(backend)(device)
});
if setupp.emit_sink_events {
if let Some(player_event_program) = setupp.player_event_program.clone() {
player.set_sink_event_callback(Some(Box::new(move |sink_status| {
match emit_sink_event(sink_status, &player_event_program) {
Ok(e) if e.success() => (),
Ok(e) => {
if let Some(code) = e.code() {
warn!("Sink event prog returned exit code {}", code);
} else {
warn!("Sink event prog returned failure");
}
}
Err(e) => {
warn!("Emitting sink event failed: {}", e);
}
}
})));
}
};
let (spirc_, spirc_task_) = Spirc::new(connect_config, session, player, mixer);
spirc = Some(spirc_);
spirc_task = Some(Box::pin(spirc_task_));
player_event_channel = Some(event_channel);
},
Err(e) => {
warn!("Connection failed: {}", e);
}
},
_ = async { spirc_task.as_mut().unwrap().await }, if spirc_task.is_some() => {
spirc_task = None;
warn!("Spirc shut down unexpectedly");
while !auto_connect_times.is_empty()
&& ((Instant::now() - auto_connect_times[0]).as_secs() > 600)
{
let _ = auto_connect_times.remove(0);
}
if let Some(credentials) = last_credentials.clone() {
if auto_connect_times.len() >= 5 {
warn!("Spirc shut down too often. Not reconnecting automatically.");
} else {
auto_connect_times.push(Instant::now());
connecting = Box::pin(Session::connect(
setupp.session_config.clone(),
credentials,
setupp.cache.clone(),
).fuse());
}
}
},
event = async { player_event_channel.as_mut().unwrap().next().await }, if player_event_channel.is_some() => match event {
Some(event) => {
if let Some(program) = &setupp.player_event_program {
if let Some(child) = run_program_on_events(event, program) {
let mut child = child.expect("program failed to start");
tokio::spawn(async move {
match child.wait().await {
Ok(status) if !status.success() => error!("child exited with status {:?}", status.code()),
Err(e) => error!("failed to wait on child process: {}", e),
_ => {}
}
});
}
}
},
None => {
player_event_channel = None;
}
},
_ = tokio::signal::ctrl_c() => {
break;
}
}
}
info!("Gracefully shutting down");
// Shutdown spirc if necessary
if let Some(spirc) = spirc {
spirc.shutdown();
if let Some(mut spirc_task) = spirc_task {
tokio::select! {
_ = tokio::signal::ctrl_c() => (),
_ = spirc_task.as_mut() => ()
}
}
}
}

View file

@ -2,22 +2,12 @@ use librespot::playback::player::PlayerEvent;
use log::info;
use std::collections::HashMap;
use std::io;
use std::process::Command;
use tokio_process::{Child, CommandExt};
use std::process::{Command, ExitStatus};
use futures::Future;
use librespot::playback::player::SinkStatus;
use tokio::process::{Child as AsyncChild, Command as AsyncCommand};
fn run_program(program: &str, env_vars: HashMap<&str, String>) -> io::Result<Child> {
let mut v: Vec<&str> = program.split_whitespace().collect();
info!("Running {:?} with environment variables {:?}", v, env_vars);
Command::new(&v.remove(0))
.args(&v)
.envs(env_vars.iter())
.spawn_async()
}
pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option<io::Result<Child>> {
pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option<io::Result<AsyncChild>> {
let mut env_vars = HashMap::new();
match event {
PlayerEvent::Changed {
@ -68,10 +58,18 @@ pub fn run_program_on_events(event: PlayerEvent, onevent: &str) -> Option<io::Re
}
_ => return None,
}
Some(run_program(onevent, env_vars))
let mut v: Vec<&str> = onevent.split_whitespace().collect();
info!("Running {:?} with environment variables {:?}", v, env_vars);
Some(
AsyncCommand::new(&v.remove(0))
.args(&v)
.envs(env_vars.iter())
.spawn(),
)
}
pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) {
pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) -> io::Result<ExitStatus> {
let mut env_vars = HashMap::new();
env_vars.insert("PLAYER_EVENT", "sink".to_string());
let sink_status = match sink_status {
@ -80,6 +78,12 @@ pub fn emit_sink_event(sink_status: SinkStatus, onevent: &str) {
SinkStatus::Closed => "closed",
};
env_vars.insert("SINK_STATUS", sink_status.to_string());
let mut v: Vec<&str> = onevent.split_whitespace().collect();
info!("Running {:?} with environment variables {:?}", v, env_vars);
let _ = run_program(onevent, env_vars).and_then(|child| child.wait());
Command::new(&v.remove(0))
.args(&v)
.envs(env_vars.iter())
.spawn()?
.wait()
}