From 3d298768b3d1933443c115db1d2bdf836418c0fb Mon Sep 17 00:00:00 2001 From: Sean McNamara Date: Sat, 21 May 2022 14:55:55 -0400 Subject: [PATCH] Backport #964 GStreamer backend cleanup (#979) --- CHANGELOG.md | 3 +- Cargo.lock | 342 ++++++++++++++++++++---- playback/Cargo.toml | 10 +- playback/src/audio_backend/gstreamer.rs | 238 ++++++++++------- 4 files changed, 445 insertions(+), 148 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1e37fae3..13e7594d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - [playback] More robust dynamic limiter for very wide dynamic range (breaking) - [core] `Session`: `connect()` now returns the long-term credentials. - [core] `Session`: `connect()` now accespt a flag if the credentails should be stored via the cache. -- [build] The MSRV is now 1.53. +- [chore] The MSRV is now 1.53. +- [playback] `gstreamer`: create own context, set correct states and use sync handler ### Added - [cache] Add `disable-credential-cache` flag (breaking). diff --git a/Cargo.lock b/Cargo.lock index 07f1e23d..10738efd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9ecd88a8c8378ca913a680cd98f0f13ac67383d35993f86c90a70e3f137816b" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aes" version = "0.6.0" @@ -82,6 +97,12 @@ version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61604a8f862e1d5c3229fdd78f8b02c68dcf73a4c4b05fd636d12240aaa242c1" +[[package]] +name = "array-init" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6945cc5422176fc5e602e590c2878d2c2acd9a4fe20a4baa7c28022521698ec6" + [[package]] name = "async-trait" version = "0.1.51" @@ -110,6 +131,21 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "backtrace" +version = "0.3.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f" +dependencies = [ + "addr2line", + "cc", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.13.0" @@ -192,6 +228,15 @@ dependencies = [ "nom", ] +[[package]] +name = "cfg-expr" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b412e83326147c2bb881f8b40edfbf9905b9b8abaebd0e47ca190ba62fda8f0e" +dependencies = [ + "smallvec", +] + [[package]] name = "cfg-if" version = "0.1.10" @@ -421,6 +466,12 @@ dependencies = [ "termcolor", ] +[[package]] +name = "fixedbitset" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37ab347416e802de484e4d03c7316c48f1ecb56574dfd4a46a80f173ce1de04d" + [[package]] name = "fnv" version = "1.0.7" @@ -561,6 +612,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.26.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" + [[package]] name = "glib" version = "0.10.3" @@ -573,13 +630,32 @@ dependencies = [ "futures-executor", "futures-task", "futures-util", - "glib-macros", - "glib-sys", - "gobject-sys", + "glib-macros 0.10.1", + "glib-sys 0.10.1", + "gobject-sys 0.10.0", "libc", "once_cell", ] +[[package]] +name = "glib" +version = "0.14.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c515f1e62bf151ef6635f528d05b02c11506de986e43b34a5c920ef0b3796a4" +dependencies = [ + "bitflags", + "futures-channel", + "futures-core", + "futures-executor", + "futures-task", + "glib-macros 0.14.1", + "glib-sys 0.14.0", + "gobject-sys 0.14.0", + "libc", + "once_cell", + "smallvec", +] + [[package]] name = "glib-macros" version = "0.10.1" @@ -588,7 +664,7 @@ checksum = "41486a26d1366a8032b160b59065a59fb528530a46a49f627e7048fb8c064039" dependencies = [ "anyhow", "heck", - "itertools", + "itertools 0.9.0", "proc-macro-crate 0.1.5", "proc-macro-error", "proc-macro2", @@ -596,6 +672,21 @@ dependencies = [ "syn", ] +[[package]] +name = "glib-macros" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2aad66361f66796bfc73f530c51ef123970eb895ffba991a234fcf7bea89e518" +dependencies = [ + "anyhow", + "heck", + "proc-macro-crate 1.1.0", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "glib-sys" version = "0.10.1" @@ -603,7 +694,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7e9b997a66e9a23d073f2b1abb4dbfc3925e0b8952f67efd8d9b6e168e4cdc1" dependencies = [ "libc", - "system-deps", + "system-deps 1.3.2", +] + +[[package]] +name = "glib-sys" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c1d60554a212445e2a858e42a0e48cece1bd57b311a19a9468f70376cf554ae" +dependencies = [ + "libc", + "system-deps 3.2.0", ] [[package]] @@ -618,28 +719,38 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "952133b60c318a62bf82ee75b93acc7e84028a093e06b9e27981c2b6fe68218c" dependencies = [ - "glib-sys", + "glib-sys 0.10.1", "libc", - "system-deps", + "system-deps 1.3.2", +] + +[[package]] +name = "gobject-sys" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa92cae29759dae34ab5921d73fff5ad54b3d794ab842c117e36cafc7994c3f5" +dependencies = [ + "glib-sys 0.14.0", + "libc", + "system-deps 3.2.0", ] [[package]] name = "gstreamer" -version = "0.16.7" +version = "0.17.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ff5d0f7ff308ae37e6eb47b6ded17785bdea06e438a708cd09e0288c1862f33" +checksum = "c6a255f142048ba2c4a4dce39106db1965abe355d23f4b5335edea43a553faa4" dependencies = [ "bitflags", "cfg-if 1.0.0", "futures-channel", "futures-core", "futures-util", - "glib", - "glib-sys", - "gobject-sys", + "glib 0.14.8", "gstreamer-sys", "libc", "muldiv", + "num-integer", "num-rational", "once_cell", "paste", @@ -649,76 +760,102 @@ dependencies = [ [[package]] name = "gstreamer-app" -version = "0.16.5" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc80888271338c3ede875d8cafc452eb207476ff5539dcbe0018a8f5b827af0e" +checksum = "f73b8d33b1bbe9f22d0cf56661a1d2a2c9a0e099ea10e5f1f347be5038f5c043" dependencies = [ "bitflags", "futures-core", "futures-sink", - "glib", - "glib-sys", - "gobject-sys", + "glib 0.14.8", "gstreamer", "gstreamer-app-sys", "gstreamer-base", - "gstreamer-sys", "libc", "once_cell", ] [[package]] name = "gstreamer-app-sys" -version = "0.9.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "813f64275c9e7b33b828b9efcf9dfa64b95996766d4de996e84363ac65b87e3d" +checksum = "41865cfb8a5ddfa1161734a0d068dcd4689da852be0910b40484206408cfeafa" dependencies = [ - "glib-sys", + "glib-sys 0.14.0", "gstreamer-base-sys", "gstreamer-sys", "libc", - "system-deps", + "system-deps 3.2.0", +] + +[[package]] +name = "gstreamer-audio" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "420b6bcb1759231f01172751da094e7afa5cd9edf40bee7475f5bc86df433c57" +dependencies = [ + "array-init", + "bitflags", + "cfg-if 1.0.0", + "glib 0.14.8", + "gstreamer", + "gstreamer-audio-sys", + "gstreamer-base", + "libc", + "once_cell", +] + +[[package]] +name = "gstreamer-audio-sys" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d066ddfd05f63836f35ac4a5830d5bb2f7f3d6c33c870e9b15c667d20f65d7f6" +dependencies = [ + "glib-sys 0.14.0", + "gobject-sys 0.14.0", + "gstreamer-base-sys", + "gstreamer-sys", + "libc", + "system-deps 3.2.0", ] [[package]] name = "gstreamer-base" -version = "0.16.5" +version = "0.17.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bafd01c56f59cb10f4b5a10f97bb4bdf8c2b2784ae5b04da7e2d400cf6e6afcf" +checksum = "2c0c1d8c62eb5d08fb80173609f2eea71d385393363146e4e78107facbd67715" dependencies = [ "bitflags", - "glib", - "glib-sys", - "gobject-sys", + "cfg-if 1.0.0", + "glib 0.14.8", "gstreamer", "gstreamer-base-sys", - "gstreamer-sys", "libc", ] [[package]] name = "gstreamer-base-sys" -version = "0.9.1" +version = "0.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4b7b6dc2d6e160a1ae28612f602bd500b3fa474ce90bf6bb2f08072682beef5" +checksum = "28169a7b58edb93ad8ac766f0fa12dcd36a2af4257a97ee10194c7103baf3e27" dependencies = [ - "glib-sys", - "gobject-sys", + "glib-sys 0.14.0", + "gobject-sys 0.14.0", "gstreamer-sys", "libc", - "system-deps", + "system-deps 3.2.0", ] [[package]] name = "gstreamer-sys" -version = "0.9.1" +version = "0.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc1f154082d01af5718c5f8a8eb4f565a4ea5586ad8833a8fc2c2aa6844b601d" +checksum = "a81704feeb3e8599913bdd1e738455c2991a01ff4a1780cb62200993e454cc3e" dependencies = [ - "glib-sys", - "gobject-sys", + "glib-sys 0.14.0", + "gobject-sys 0.14.0", "libc", - "system-deps", + "system-deps 3.2.0", ] [[package]] @@ -941,6 +1078,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "0.4.8" @@ -1273,9 +1419,10 @@ dependencies = [ "cpal", "futures-executor", "futures-util", - "glib", + "glib 0.10.3", "gstreamer", "gstreamer-app", + "gstreamer-audio", "jack", "lewton", "libpulse-binding", @@ -1285,6 +1432,7 @@ dependencies = [ "librespot-metadata", "log", "ogg", + "parking_lot", "portaudio-rs", "rand", "rand_distr", @@ -1356,6 +1504,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "miniz_oxide" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b" +dependencies = [ + "adler", + "autocfg", +] + [[package]] name = "mio" version = "0.7.14" @@ -1380,9 +1538,9 @@ dependencies = [ [[package]] name = "muldiv" -version = "0.2.1" +version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0419348c027fa7be448d2ae7ea0e4e04c2334c31dc4e74ab29f00a2a7ca69204" +checksum = "b5136edda114182728ccdedb9f5eda882781f35fa6e80cc360af12a8932507f3" [[package]] name = "multimap" @@ -1531,9 +1689,9 @@ dependencies = [ [[package]] name = "num-rational" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12ac428b1cb17fce6f731001d307d351ec70a6d202fc2e60f7d4c5e42d8f4f07" +checksum = "d41702bd167c2df5520b384281bc111a4b5efcf7fbc4c9c222c815b07e0a6a6a" dependencies = [ "autocfg", "num-integer", @@ -1582,6 +1740,15 @@ dependencies = [ "syn", ] +[[package]] +name = "object" +version = "0.27.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67ac1d3f9a1d3616fd9a60c8d74296f22406a238b6a72f5cc1e6f314df4ffbf9" +dependencies = [ + "memchr", +] + [[package]] name = "oboe" version = "0.4.4" @@ -1643,11 +1810,14 @@ version = "0.8.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d76e8e1493bcac0d2766c42737f34458f1c8c50c0d23bcb24ea953affb273216" dependencies = [ + "backtrace", "cfg-if 1.0.0", "instant", "libc", + "petgraph", "redox_syscall", "smallvec", + "thread-id", "winapi", ] @@ -1679,6 +1849,16 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +[[package]] +name = "petgraph" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "467d164a6de56270bd7c4d070df81d07beace25012d5103ced4e9ff08d6afdb7" +dependencies = [ + "fixedbitset", + "indexmap", +] + [[package]] name = "pin-project-lite" version = "0.2.7" @@ -1942,6 +2122,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "rustc-demangle" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ef03e0a2b150c7a90d01faf6254c9c48a41e95fb2a8c2ac1c6f0d2b9aefc342" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -1998,7 +2184,7 @@ checksum = "41a29aa21f175b5a41a6e26da572d5e5d1ee5660d35f9f9d0913e8a802098f74" dependencies = [ "cfg-if 0.1.10", "libc", - "version-compare", + "version-compare 0.0.10", ] [[package]] @@ -2102,9 +2288,9 @@ checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" [[package]] name = "smallvec" -version = "1.7.0" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309" +checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" @@ -2134,6 +2320,12 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "57bd81eb48f4c437cadc685403cad539345bf703d78e63707418431cecd4522b" +[[package]] +name = "strum" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf86bbcfd1fa9670b7a129f64fc0c9fcbbfe4f1bc4210e9e98fe71ffc12cde2" + [[package]] name = "strum_macros" version = "0.18.0" @@ -2146,6 +2338,18 @@ dependencies = [ "syn", ] +[[package]] +name = "strum_macros" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d06aaeeee809dbc59eb4556183dd927df67db1540de5be8d3ec0b6636358a5ec" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "subtle" version = "2.4.1" @@ -2183,11 +2387,29 @@ checksum = "0f3ecc17269a19353b3558b313bba738b25d82993e30d62a18406a24aba4649b" dependencies = [ "heck", "pkg-config", - "strum", - "strum_macros", + "strum 0.18.0", + "strum_macros 0.18.0", "thiserror", "toml", - "version-compare", + "version-compare 0.0.10", +] + +[[package]] +name = "system-deps" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "480c269f870722b3b08d2f13053ce0c2ab722839f472863c3e2d61ff3a1c2fa6" +dependencies = [ + "anyhow", + "cfg-expr", + "heck", + "itertools 0.10.3", + "pkg-config", + "strum 0.21.0", + "strum_macros 0.21.1", + "thiserror", + "toml", + "version-compare 0.0.11", ] [[package]] @@ -2233,6 +2455,17 @@ dependencies = [ "syn", ] +[[package]] +name = "thread-id" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fdfe0627923f7411a43ec9ec9c39c3a9b4151be313e0922042581fb6c9b717f" +dependencies = [ + "libc", + "redox_syscall", + "winapi", +] + [[package]] name = "time" version = "0.1.43" @@ -2271,6 +2504,7 @@ dependencies = [ "mio", "num_cpus", "once_cell", + "parking_lot", "pin-project-lite", "signal-hook-registry", "tokio-macros", @@ -2377,9 +2611,9 @@ dependencies = [ [[package]] name = "unicode-segmentation" -version = "1.8.0" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8895849a949e7845e06bd6dc1aa51731a103c42707010a5b591c0038fb73385b" +checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99" [[package]] name = "unicode-width" @@ -2431,6 +2665,12 @@ version = "0.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d63556a25bae6ea31b52e640d7c41d1ab27faba4ccb600013837a3d0b3994ca1" +[[package]] +name = "version-compare" +version = "0.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c18c859eead79d8b95d09e4678566e8d70105c4e7b251f707a03df32442661b" + [[package]] name = "version_check" version = "0.9.3" diff --git a/playback/Cargo.toml b/playback/Cargo.toml index 4e8d19c6..7126e3da 100644 --- a/playback/Cargo.toml +++ b/playback/Cargo.toml @@ -21,9 +21,10 @@ version = "0.3.1" futures-executor = "0.3" futures-util = { version = "0.3", default_features = false, features = ["alloc"] } log = "0.4" +parking_lot = { version = "0.11", features = ["deadlock_detection"] } byteorder = "1.4" shell-words = "1.0.0" -tokio = { version = "1", features = ["sync"] } +tokio = { version = "1", features = ["sync", "parking_lot"] } zerocopy = { version = "0.3" } thiserror = { version = "1" } @@ -34,8 +35,9 @@ libpulse-binding = { version = "2", optional = true, default-features = f libpulse-simple-binding = { version = "2", optional = true, default-features = false } jack = { version = "0.7", optional = true } sdl2 = { version = "0.34.3", optional = true } -gstreamer = { version = "0.16", optional = true } -gstreamer-app = { version = "0.16", optional = true } +gstreamer = { version = "0.17", optional = true } +gstreamer-app = { version = "0.17", optional = true } +gstreamer-audio = { version = "0.17", optional = true } glib = { version = "0.10", optional = true } # Rodio dependencies @@ -58,4 +60,4 @@ jackaudio-backend = ["jack"] rodio-backend = ["rodio", "cpal"] rodiojack-backend = ["rodio", "cpal/jack"] sdl-backend = ["sdl2"] -gstreamer-backend = ["gstreamer", "gstreamer-app", "glib"] +gstreamer-backend = ["gstreamer", "gstreamer-app", "gstreamer-audio", "glib"] diff --git a/playback/src/audio_backend/gstreamer.rs b/playback/src/audio_backend/gstreamer.rs index 8b957577..0a98846e 100644 --- a/playback/src/audio_backend/gstreamer.rs +++ b/playback/src/audio_backend/gstreamer.rs @@ -1,23 +1,28 @@ -use super::{Open, Sink, SinkAsBytes, SinkResult}; -use crate::config::AudioFormat; -use crate::convert::Converter; -use crate::decoder::AudioPacket; -use crate::{NUM_CHANNELS, SAMPLE_RATE}; +use gstreamer::{ + event::{FlushStart, FlushStop}, + prelude::*, + State, +}; use gstreamer as gst; use gstreamer_app as gst_app; +use gstreamer_audio as gst_audio; -use gst::prelude::*; -use zerocopy::AsBytes; +use parking_lot::Mutex; +use std::sync::Arc; -use std::sync::mpsc::{sync_channel, SyncSender}; -use std::thread; +use super::{Open, Sink, SinkAsBytes, SinkError, SinkResult}; + +use crate::{ + config::AudioFormat, convert::Converter, decoder::AudioPacket, NUM_CHANNELS, SAMPLE_RATE, +}; -#[allow(dead_code)] pub struct GstreamerSink { - tx: SyncSender>, + appsrc: gst_app::AppSrc, + bufferpool: gst::BufferPool, pipeline: gst::Pipeline, format: AudioFormat, + async_error: Arc>>, } impl Open for GstreamerSink { @@ -25,117 +30,166 @@ impl Open for GstreamerSink { info!("Using GStreamer sink with format: {:?}", format); gst::init().expect("failed to init GStreamer!"); - // GStreamer calls S24 and S24_3 different from the rest of the world let gst_format = match format { - AudioFormat::S24 => "S24_32".to_string(), - AudioFormat::S24_3 => "S24".to_string(), - _ => format!("{:?}", format), + AudioFormat::F64 => gst_audio::AUDIO_FORMAT_F64, + AudioFormat::F32 => gst_audio::AUDIO_FORMAT_F32, + AudioFormat::S32 => gst_audio::AUDIO_FORMAT_S32, + AudioFormat::S24 => gst_audio::AUDIO_FORMAT_S2432, + AudioFormat::S24_3 => gst_audio::AUDIO_FORMAT_S24, + AudioFormat::S16 => gst_audio::AUDIO_FORMAT_S16, }; + + let gst_info = gst_audio::AudioInfo::builder(gst_format, SAMPLE_RATE, NUM_CHANNELS as u32) + .build() + .expect("Failed to create GStreamer audio format"); + let gst_caps = gst_info.to_caps().expect("Failed to create GStreamer caps"); + let sample_size = format.size(); - let gst_bytes = 2048 * sample_size; + let gst_bytes = NUM_CHANNELS as usize * 2048 * sample_size; - #[cfg(target_endian = "little")] - const ENDIANNESS: &str = "LE"; - #[cfg(target_endian = "big")] - const ENDIANNESS: &str = "BE"; - - let pipeline_str_preamble = format!( - "appsrc caps=\"audio/x-raw,format={}{},layout=interleaved,channels={},rate={}\" block=true max-bytes={} name=appsrc0 ", - gst_format, ENDIANNESS, NUM_CHANNELS, SAMPLE_RATE, gst_bytes - ); - // no need to dither twice; use librespot dithering instead - let pipeline_str_rest = r#" ! audioconvert dithering=none ! autoaudiosink"#; - let pipeline_str: String = match device { - Some(x) => format!("{}{}", pipeline_str_preamble, x), - None => format!("{}{}", pipeline_str_preamble, pipeline_str_rest), - }; - info!("Pipeline: {}", pipeline_str); - - gst::init().unwrap(); - let pipelinee = gst::parse_launch(&*pipeline_str).expect("Couldn't launch pipeline; likely a GStreamer issue or an error in the pipeline string you specified in the 'device' argument to librespot."); - let pipeline = pipelinee - .dynamic_cast::() - .expect("couldn't cast pipeline element at runtime!"); - let bus = pipeline.get_bus().expect("couldn't get bus from pipeline"); - let mainloop = glib::MainLoop::new(None, false); - let appsrce: gst::Element = pipeline - .get_by_name("appsrc0") - .expect("couldn't get appsrc from pipeline"); - let appsrc: gst_app::AppSrc = appsrce - .dynamic_cast::() + let pipeline = gst::Pipeline::new(None); + let appsrc = gst::ElementFactory::make("appsrc", None) + .expect("Failed to create GStreamer appsrc element") + .downcast::() .expect("couldn't cast AppSrc element at runtime!"); + appsrc.set_caps(Some(&gst_caps)); + appsrc.set_max_bytes(gst_bytes as u64); + appsrc.set_block(true); + + let sink = match device { + None => { + // no need to dither twice; use librespot dithering instead + gst::parse_bin_from_description( + "audioconvert dithering=none ! audioresample ! autoaudiosink", + true, + ) + .expect("Failed to create default GStreamer sink") + } + Some(ref x) => gst::parse_bin_from_description(x, true) + .expect("Failed to create custom GStreamer sink"), + }; + pipeline + .add(&appsrc) + .expect("Failed to add GStreamer appsrc to pipeline"); + pipeline + .add(&sink) + .expect("Failed to add GStreamer sink to pipeline"); + appsrc + .link(&sink) + .expect("Failed to link GStreamer source to sink"); + + let bus = pipeline.bus().expect("couldn't get bus from pipeline"); + let bufferpool = gst::BufferPool::new(); - let appsrc_caps = appsrc.get_caps().expect("couldn't get appsrc caps"); - let mut conf = bufferpool.get_config(); - conf.set_params(Some(&appsrc_caps), 4096 * sample_size as u32, 0, 0); + + let mut conf = bufferpool.config(); + conf.set_params(Some(&gst_caps), gst_bytes as u32, 0, 0); bufferpool .set_config(conf) .expect("couldn't configure the buffer pool"); - bufferpool - .set_active(true) - .expect("couldn't activate buffer pool"); - let (tx, rx) = sync_channel::>(64 * sample_size); - thread::spawn(move || { - for data in rx { - let buffer = bufferpool.acquire_buffer(None); - if let Ok(mut buffer) = buffer { - let mutbuf = buffer.make_mut(); - mutbuf.set_size(data.len()); - mutbuf - .copy_from_slice(0, data.as_bytes()) - .expect("Failed to copy from slice"); - let _eat = appsrc.push_buffer(buffer); + let async_error = Arc::new(Mutex::new(None)); + let async_error_clone = async_error.clone(); + + bus.set_sync_handler(move |_bus, msg| { + match msg.view() { + gst::MessageView::Eos(_) => { + println!("gst signaled end of stream"); + + let mut async_error_storage = async_error_clone.lock(); + *async_error_storage = Some(String::from("gst signaled end of stream")); } + gst::MessageView::Error(err) => { + println!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + ); + + let mut async_error_storage = async_error_clone.lock(); + *async_error_storage = Some(format!( + "Error from {:?}: {} ({:?})", + err.src().map(|s| s.path_string()), + err.error(), + err.debug() + )); + } + _ => (), } - }); - thread::spawn(move || { - let thread_mainloop = mainloop; - let watch_mainloop = thread_mainloop.clone(); - bus.add_watch(move |_, msg| { - match msg.view() { - gst::MessageView::Eos(..) => watch_mainloop.quit(), - gst::MessageView::Error(err) => { - println!( - "Error from {:?}: {} ({:?})", - err.get_src().map(|s| s.get_path_string()), - err.get_error(), - err.get_debug() - ); - watch_mainloop.quit(); - } - _ => (), - }; - - glib::Continue(true) - }) - .expect("failed to add bus watch"); - thread_mainloop.run(); + gst::BusSyncReply::Drop }); pipeline - .set_state(gst::State::Playing) - .expect("unable to set the pipeline to the `Playing` state"); + .set_state(State::Ready) + .expect("unable to set the pipeline to the `Ready` state"); Self { - tx, + appsrc, + bufferpool, pipeline, format, + async_error, } } } impl Sink for GstreamerSink { + fn start(&mut self) -> SinkResult<()> { + *self.async_error.lock() = None; + self.appsrc.send_event(FlushStop::new(true)); + self.bufferpool + .set_active(true) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + self.pipeline + .set_state(State::Playing) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + Ok(()) + } + + fn stop(&mut self) -> SinkResult<()> { + *self.async_error.lock() = None; + self.appsrc.send_event(FlushStart::new()); + self.pipeline + .set_state(State::Paused) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + self.bufferpool + .set_active(false) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + Ok(()) + } + sink_as_bytes!(); } +impl Drop for GstreamerSink { + fn drop(&mut self) { + let _ = self.pipeline.set_state(State::Null); + } +} + impl SinkAsBytes for GstreamerSink { fn write_bytes(&mut self, data: &[u8]) -> SinkResult<()> { - // Copy expensively (in to_vec()) to avoid thread synchronization - self.tx - .send(data.to_vec()) - .expect("tx send failed in write function"); + if let Some(async_error) = &*self.async_error.lock() { + return Err(SinkError::OnWrite(async_error.to_string())); + } + + let mut buffer = self + .bufferpool + .acquire_buffer(None) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + + let mutbuf = buffer.make_mut(); + mutbuf.set_size(data.len()); + mutbuf + .copy_from_slice(0, data) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + + self.appsrc + .push_buffer(buffer) + .map_err(|e| SinkError::OnWrite(e.to_string()))?; + Ok(()) } }