Merge pull request #649 from Johannesd3/tokio-migration-refactor-deps

[Tokio migration] Merge dev and refactor
This commit is contained in:
Ash 2021-02-26 10:49:10 +01:00 committed by GitHub
commit 9d77fef008
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
53 changed files with 1268 additions and 1215 deletions

View file

@ -78,6 +78,7 @@ jobs:
- run: cargo build --locked --no-default-features --features "portaudio-backend" - run: cargo build --locked --no-default-features --features "portaudio-backend"
- run: cargo build --locked --no-default-features --features "pulseaudio-backend" - run: cargo build --locked --no-default-features --features "pulseaudio-backend"
- run: cargo build --locked --no-default-features --features "jackaudio-backend" - run: cargo build --locked --no-default-features --features "jackaudio-backend"
- run: cargo build --locked --no-default-features --features "rodiojack-backend"
- run: cargo build --locked --no-default-features --features "rodio-backend" - run: cargo build --locked --no-default-features --features "rodio-backend"
- run: cargo build --locked --no-default-features --features "sdl-backend" - run: cargo build --locked --no-default-features --features "sdl-backend"
- run: cargo build --locked --no-default-features --features "gstreamer-backend" - run: cargo build --locked --no-default-features --features "gstreamer-backend"

5
.gitignore vendored
View file

@ -3,4 +3,7 @@ target
spotify_appkey.key spotify_appkey.key
.vagrant/ .vagrant/
.project .project
.history .history
*.save

View file

@ -46,6 +46,7 @@ Depending on the chosen backend, specific development libraries are required.
|PortAudio | `portaudio19-dev` | `portaudio-devel` | `portaudio` | |PortAudio | `portaudio19-dev` | `portaudio-devel` | `portaudio` |
|PulseAudio | `libpulse-dev` | `pulseaudio-libs-devel` | | |PulseAudio | `libpulse-dev` | `pulseaudio-libs-devel` | |
|JACK | `libjack-dev` | `jack-audio-connection-kit-devel` | | |JACK | `libjack-dev` | `jack-audio-connection-kit-devel` | |
|JACK over Rodio | `libjack-dev` | `jack-audio-connection-kit-devel` | - |
|SDL | `libsdl2-dev` | `SDL2-devel` | | |SDL | `libsdl2-dev` | `SDL2-devel` | |
|Pipe | - | - | - | |Pipe | - | - | - |

564
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librespot" name = "librespot"
version = "0.1.3" version = "0.1.6"
authors = ["Librespot Org"] authors = ["Librespot Org"]
license = "MIT" license = "MIT"
description = "An open source client library for Spotify, with support for Spotify Connect" description = "An open source client library for Spotify, with support for Spotify Connect"
@ -22,59 +22,61 @@ doc = false
[dependencies.librespot-audio] [dependencies.librespot-audio]
path = "audio" path = "audio"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-connect] [dependencies.librespot-connect]
path = "connect" path = "connect"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-core] [dependencies.librespot-core]
path = "core" path = "core"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-metadata] [dependencies.librespot-metadata]
path = "metadata" path = "metadata"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-playback] [dependencies.librespot-playback]
path = "playback" path = "playback"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-protocol] [dependencies.librespot-protocol]
path = "protocol" path = "protocol"
version = "0.1.3" version = "0.1.6"
[dependencies] [dependencies]
base64 = "0.13" base64 = "0.13"
env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]} env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]}
futures = "0.3" futures-util = { version = "0.3", default_features = false }
getopts = "0.2" getopts = "0.2"
hex = "0.4"
hyper = "0.14" hyper = "0.14"
log = "0.4" log = "0.4"
num-bigint = "0.3"
protobuf = "~2.14.0"
rand = "0.7"
rpassword = "5.0" rpassword = "5.0"
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "process"] } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "signal", "sync", "process"] }
url = "1.7" url = "2.1"
sha-1 = "0.8" sha-1 = "0.9"
hex = "0.4"
[features] [features]
apresolve = ["librespot-core/apresolve"]
apresolve-http2 = ["librespot-core/apresolve-http2"]
alsa-backend = ["librespot-playback/alsa-backend"] alsa-backend = ["librespot-playback/alsa-backend"]
portaudio-backend = ["librespot-playback/portaudio-backend"] portaudio-backend = ["librespot-playback/portaudio-backend"]
pulseaudio-backend = ["librespot-playback/pulseaudio-backend"] pulseaudio-backend = ["librespot-playback/pulseaudio-backend"]
jackaudio-backend = ["librespot-playback/jackaudio-backend"] jackaudio-backend = ["librespot-playback/jackaudio-backend"]
rodio-backend = ["librespot-playback/rodio-backend"] rodio-backend = ["librespot-playback/rodio-backend"]
rodiojack-backend = ["librespot-playback/rodiojack-backend"]
sdl-backend = ["librespot-playback/sdl-backend"] sdl-backend = ["librespot-playback/sdl-backend"]
gstreamer-backend = ["librespot-playback/gstreamer-backend"] gstreamer-backend = ["librespot-playback/gstreamer-backend"]
with-tremor = ["librespot-audio/with-tremor"] with-tremor = ["librespot-audio/with-tremor"]
with-vorbis = ["librespot-audio/with-vorbis"] with-vorbis = ["librespot-audio/with-vorbis"]
with-lewton = ["librespot-audio/with-lewton"]
# with-dns-sd = ["librespot-connect/with-dns-sd"] # with-dns-sd = ["librespot-connect/with-dns-sd"]
default = ["librespot-playback/rodio-backend"] default = ["rodio-backend", "apresolve", "with-lewton"]
[package.metadata.deb] [package.metadata.deb]
maintainer = "librespot-org" maintainer = "librespot-org"

View file

@ -27,7 +27,9 @@ There is some brief documentation on how the protocol works in the [docs](https:
If you wish to learn more about how librespot works overall, the best way is to simply read the code, and ask any questions you have in our [Gitter Room](https://gitter.im/librespot-org/spotify-connect-resources). If you wish to learn more about how librespot works overall, the best way is to simply read the code, and ask any questions you have in our [Gitter Room](https://gitter.im/librespot-org/spotify-connect-resources).
# Issues # Issues & Discussions
**We have recently started using Github discussions for general questions and feature requests, as they are a more natural medium for such cases, and allow for upvoting to prioritize feature development. Check them out [here](https://github.com/librespot-org/librespot/discussions). Bugs and issues with the underlying library should still be reported as issues.**
If you run into a bug when using librespot, please search the existing issues before opening a new one. Chances are, we've encountered it before, and have provided a resolution. If not, please open a new one, and where possible, include the backtrace librespot generates on crashing, along with anything we can use to reproduce the issue, eg. the Spotify URI of the song that caused the crash. If you run into a bug when using librespot, please search the existing issues before opening a new one. Chances are, we've encountered it before, and have provided a resolution. If not, please open a new one, and where possible, include the backtrace librespot generates on crashing, along with anything we can use to reproduce the issue, eg. the Spotify URI of the song that caused the crash.
# Building # Building
@ -54,6 +56,7 @@ ALSA
PortAudio PortAudio
PulseAudio PulseAudio
JACK JACK
JACK over Rodio
SDL SDL
Pipe Pipe
``` ```

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librespot-audio" name = "librespot-audio"
version = "0.1.3" version = "0.1.6"
authors = ["Paul Lietar <paul@lietar.net>"] authors = ["Paul Lietar <paul@lietar.net>"]
description="The audio fetching and processing logic for librespot" description="The audio fetching and processing logic for librespot"
license="MIT" license="MIT"
@ -8,24 +8,24 @@ edition = "2018"
[dependencies.librespot-core] [dependencies.librespot-core]
path = "../core" path = "../core"
version = "0.1.3" version = "0.1.6"
[dependencies] [dependencies]
aes-ctr = "0.6" aes-ctr = "0.6"
bit-set = "0.5"
byteorder = "1.4" byteorder = "1.4"
bytes = "1.0" bytes = "1.0"
futures = "0.3" cfg-if = "1"
lewton = "0.10"
log = "0.4" log = "0.4"
num-bigint = "0.3" futures-util = { version = "0.3", default_features = false }
num-traits = "0.2" ogg = "0.8"
pin-project-lite = "0.2.4"
tempfile = "3.1" tempfile = "3.1"
tokio = { version = "1", features = ["sync"] }
lewton = { version = "0.10", optional = true }
librespot-tremor = { version = "0.2.0", optional = true } librespot-tremor = { version = "0.2.0", optional = true }
vorbis = { version ="0.0.14", optional = true } vorbis = { version ="0.0.14", optional = true }
[features] [features]
with-lewton = ["lewton"]
with-tremor = ["librespot-tremor"] with-tremor = ["librespot-tremor"]
with-vorbis = ["vorbis"] with-vorbis = ["vorbis"]

View file

@ -1,30 +1,23 @@
use crate::range_set::{Range, RangeSet}; use std::cmp::{max, min};
use std::fs;
use std::future::Future;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::pin::Pin;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Condvar, Mutex};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::{ use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt};
channel::{mpsc, oneshot},
future,
};
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
use std::time::{Duration, Instant};
use std::{
cmp::{max, min},
pin::Pin,
task::Context,
};
use tempfile::NamedTempFile;
use futures::channel::mpsc::unbounded;
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::FileId; use librespot_core::spotify_id::FileId;
use std::sync::atomic; use tempfile::NamedTempFile;
use std::sync::atomic::AtomicUsize; use tokio::sync::{mpsc, oneshot};
use crate::range_set::{Range, RangeSet};
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request. // The minimum size of a block that is requested from the Spotify servers in one request.
@ -96,6 +89,7 @@ pub enum AudioFile {
Streaming(AudioFileStreaming), Streaming(AudioFileStreaming),
} }
#[derive(Debug)]
enum StreamLoaderCommand { enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access RandomAccessMode(), // optimise download strategy for random access
@ -147,7 +141,7 @@ impl StreamLoaderController {
fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
if let Some(ref mut channel) = self.channel_tx { if let Some(ref mut channel) = self.channel_tx {
// ignore the error in case the channel has been closed already. // ignore the error in case the channel has been closed already.
let _ = channel.unbounded_send(command); let _ = channel.send(command);
} }
} }
@ -191,7 +185,7 @@ impl StreamLoaderController {
// We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
if let Some(ref mut channel) = self.channel_tx { if let Some(ref mut channel) = self.channel_tx {
// ignore the error in case the channel has been closed already. // ignore the error in case the channel has been closed already.
let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); let _ = channel.send(StreamLoaderCommand::Fetch(range));
} }
} }
} }
@ -387,7 +381,7 @@ impl AudioFileStreaming {
//let (seek_tx, seek_rx) = mpsc::unbounded(); //let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) = let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>(); mpsc::unbounded_channel::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new( let fetcher = AudioFileFetch::new(
session.clone(), session.clone(),
@ -455,7 +449,7 @@ enum ReceivedData {
async fn audio_file_fetch_receive_data( async fn audio_file_fetch_receive_data(
shared: Arc<AudioFileShared>, shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData, mut data_rx: ChannelData,
initial_data_offset: usize, initial_data_offset: usize,
initial_request_length: usize, initial_request_length: usize,
request_sent_time: Instant, request_sent_time: Instant,
@ -471,49 +465,44 @@ async fn audio_file_fetch_receive_data(
.number_of_open_requests .number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst); .fetch_add(1, atomic::Ordering::SeqCst);
enum TryFoldErr { let result = loop {
ChannelError, let data = match data_rx.next().await {
FinishEarly, Some(Ok(data)) => data,
} Some(Err(e)) => break Err(e),
None => break Ok(()),
};
let result = data_rx if measure_ping_time {
.map_err(|_| TryFoldErr::ChannelError) let duration = Instant::now() - request_sent_time;
.try_for_each(|data| { let duration_ms: u64;
if measure_ping_time { if 0.001 * (duration.as_millis() as f64) > MAXIMUM_ASSUMED_PING_TIME_SECONDS {
let duration = Instant::now() - request_sent_time; duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64)
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data: data,
}));
data_offset += data_size;
if request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length);
request_length = 0;
} else { } else {
request_length -= data_size; duration_ms = duration.as_millis() as u64;
} }
let _ = file_data_tx.send(ReceivedData::ResponseTimeMs(duration_ms as usize));
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx.send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data: data,
}));
data_offset += data_size;
if request_length < data_size {
warn!(
"Data receiver for range {} (+{}) received more data from server than requested.",
initial_data_offset, initial_request_length
);
request_length = 0;
} else {
request_length -= data_size;
}
future::ready(if request_length == 0 { if request_length == 0 {
Err(TryFoldErr::FinishEarly) break Ok(());
} else { }
Ok(()) };
})
})
.await;
if request_length > 0 { if request_length > 0 {
let missing_range = Range::new(data_offset, request_length); let missing_range = Range::new(data_offset, request_length);
@ -527,7 +516,7 @@ async fn audio_file_fetch_receive_data(
.number_of_open_requests .number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst); .fetch_sub(1, atomic::Ordering::SeqCst);
if let Err(TryFoldErr::ChannelError) = result { if result.is_err() {
warn!( warn!(
"Error from channel for data receiver for range {} (+{}).", "Error from channel for data receiver for range {} (+{}).",
initial_data_offset, initial_request_length initial_data_offset, initial_request_length
@ -696,21 +685,17 @@ async fn audio_file_fetch(
future::select_all(vec![f1, f2, f3]).await future::select_all(vec![f1, f2, f3]).await
}*/ }*/
pin_project! { struct AudioFileFetch {
struct AudioFileFetch { session: Session,
session: Session, shared: Arc<AudioFileShared>,
shared: Arc<AudioFileShared>, output: Option<NamedTempFile>,
output: Option<NamedTempFile>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
#[pin] file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
#[pin] stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, complete_tx: Option<oneshot::Sender<NamedTempFile>>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>, network_response_times_ms: Vec<usize>,
network_response_times_ms: Vec<usize>,
}
} }
impl AudioFileFetch { impl AudioFileFetch {
@ -725,7 +710,7 @@ impl AudioFileFetch {
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: oneshot::Sender<NamedTempFile>, complete_tx: oneshot::Sender<NamedTempFile>,
) -> AudioFileFetch { ) -> AudioFileFetch {
let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>(); let (file_data_tx, file_data_rx) = mpsc::unbounded_channel::<ReceivedData>();
{ {
let requested_range = Range::new(0, initial_data_length); let requested_range = Range::new(0, initial_data_length);
@ -863,7 +848,7 @@ impl AudioFileFetch {
fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop { loop {
match Pin::new(&mut self.file_data_rx).poll_next(cx) { match self.file_data_rx.poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => { Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => {
trace!("Ping time estimated as: {} ms.", response_time_ms); trace!("Ping time estimated as: {} ms.", response_time_ms);
@ -939,7 +924,7 @@ impl AudioFileFetch {
fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop { loop {
match Pin::new(&mut self.stream_loader_command_rx).poll_next(cx) { match self.stream_loader_command_rx.poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd { Poll::Ready(Some(cmd)) => match cmd {
StreamLoaderCommand::Fetch(request) => { StreamLoaderCommand::Fetch(request) => {
@ -1059,7 +1044,7 @@ impl Read for AudioFileStreaming {
for &range in ranges_to_request.iter() { for &range in ranges_to_request.iter() {
self.stream_loader_command_tx self.stream_loader_command_tx
.unbounded_send(StreamLoaderCommand::Fetch(range)) .send(StreamLoaderCommand::Fetch(range))
.unwrap(); .unwrap();
} }

View file

@ -1,13 +1,11 @@
extern crate lewton; use lewton::inside_ogg::OggStreamReader;
use self::lewton::inside_ogg::OggStreamReader;
use super::{AudioDecoder, AudioError, AudioPacket};
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
pub struct VorbisDecoder<R: Read + Seek>(OggStreamReader<R>); pub struct VorbisDecoder<R: Read + Seek>(OggStreamReader<R>);
pub struct VorbisPacket(Vec<i16>);
pub struct VorbisError(lewton::VorbisError); pub struct VorbisError(lewton::VorbisError);
impl<R> VorbisDecoder<R> impl<R> VorbisDecoder<R>
@ -17,41 +15,38 @@ where
pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> { pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> {
Ok(VorbisDecoder(OggStreamReader::new(input)?)) Ok(VorbisDecoder(OggStreamReader::new(input)?))
} }
}
pub fn seek(&mut self, ms: i64) -> Result<(), VorbisError> { impl<R> AudioDecoder for VorbisDecoder<R>
where
R: Read + Seek,
{
fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
let absgp = ms * 44100 / 1000; let absgp = ms * 44100 / 1000;
self.0.seek_absgp_pg(absgp as u64)?; match self.0.seek_absgp_pg(absgp as u64) {
Ok(()) Ok(_) => return Ok(()),
Err(err) => return Err(AudioError::VorbisError(err.into())),
}
} }
pub fn next_packet(&mut self) -> Result<Option<VorbisPacket>, VorbisError> { fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
use self::lewton::audio::AudioReadError::AudioIsHeader; use lewton::audio::AudioReadError::AudioIsHeader;
use self::lewton::OggReadError::NoCapturePatternFound; use lewton::OggReadError::NoCapturePatternFound;
use self::lewton::VorbisError::BadAudio; use lewton::VorbisError::BadAudio;
use self::lewton::VorbisError::OggError; use lewton::VorbisError::OggError;
loop { loop {
match self.0.read_dec_packet_itl() { match self.0.read_dec_packet_itl() {
Ok(Some(packet)) => return Ok(Some(VorbisPacket(packet))), Ok(Some(packet)) => return Ok(Some(AudioPacket::Samples(packet))),
Ok(None) => return Ok(None), Ok(None) => return Ok(None),
Err(BadAudio(AudioIsHeader)) => (), Err(BadAudio(AudioIsHeader)) => (),
Err(OggError(NoCapturePatternFound)) => (), Err(OggError(NoCapturePatternFound)) => (),
Err(err) => return Err(err.into()), Err(err) => return Err(AudioError::VorbisError(err.into())),
} }
} }
} }
} }
impl VorbisPacket {
pub fn data(&self) -> &[i16] {
&self.0
}
pub fn data_mut(&mut self) -> &mut [i16] {
&mut self.0
}
}
impl From<lewton::VorbisError> for VorbisError { impl From<lewton::VorbisError> for VorbisError {
fn from(err: lewton::VorbisError) -> VorbisError { fn from(err: lewton::VorbisError) -> VorbisError {
VorbisError(err) VorbisError(err)

View file

@ -2,27 +2,33 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate pin_project_lite;
extern crate aes_ctr;
extern crate bit_set;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate num_bigint;
extern crate num_traits;
extern crate tempfile;
extern crate librespot_core;
mod decrypt; mod decrypt;
mod fetch; mod fetch;
#[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] use cfg_if::cfg_if;
mod lewton_decoder;
#[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] #[cfg(any(
mod libvorbis_decoder; all(feature = "with-lewton", feature = "with-tremor"),
all(feature = "with-vorbis", feature = "with-tremor"),
all(feature = "with-lewton", feature = "with-vorbis")
))]
compile_error!("Cannot use two decoders at the same time.");
cfg_if! {
if #[cfg(feature = "with-lewton")] {
mod lewton_decoder;
pub use lewton_decoder::{VorbisDecoder, VorbisError};
} else if #[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] {
mod libvorbis_decoder;
pub use crate::libvorbis_decoder::{VorbisDecoder, VorbisError};
} else {
compile_error!("Must choose a vorbis decoder.");
}
}
mod passthrough_decoder;
pub use passthrough_decoder::{PassthroughDecoder, PassthroughError};
mod range_set; mod range_set;
@ -32,8 +38,64 @@ pub use fetch::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
}; };
use std::fmt;
#[cfg(not(any(feature = "with-tremor", feature = "with-vorbis")))] pub enum AudioPacket {
pub use crate::lewton_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; Samples(Vec<i16>),
#[cfg(any(feature = "with-tremor", feature = "with-vorbis"))] OggData(Vec<u8>),
pub use libvorbis_decoder::{VorbisDecoder, VorbisError, VorbisPacket}; }
impl AudioPacket {
pub fn samples(&self) -> &[i16] {
match self {
AudioPacket::Samples(s) => s,
AudioPacket::OggData(_) => panic!("can't return OggData on samples"),
}
}
pub fn oggdata(&self) -> &[u8] {
match self {
AudioPacket::Samples(_) => panic!("can't return samples on OggData"),
AudioPacket::OggData(d) => d,
}
}
pub fn is_empty(&self) -> bool {
match self {
AudioPacket::Samples(s) => s.is_empty(),
AudioPacket::OggData(d) => d.is_empty(),
}
}
}
#[derive(Debug)]
pub enum AudioError {
PassthroughError(PassthroughError),
VorbisError(VorbisError),
}
impl fmt::Display for AudioError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
AudioError::PassthroughError(err) => write!(f, "PassthroughError({})", err),
AudioError::VorbisError(err) => write!(f, "VorbisError({})", err),
}
}
}
impl From<VorbisError> for AudioError {
fn from(err: VorbisError) -> AudioError {
AudioError::VorbisError(VorbisError::from(err))
}
}
impl From<PassthroughError> for AudioError {
fn from(err: PassthroughError) -> AudioError {
AudioError::PassthroughError(PassthroughError::from(err))
}
}
pub trait AudioDecoder {
fn seek(&mut self, ms: i64) -> Result<(), AudioError>;
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError>;
}

View file

@ -1,14 +1,12 @@
#[cfg(feature = "with-tremor")] #[cfg(feature = "with-tremor")]
extern crate librespot_tremor as vorbis; use librespot_tremor as vorbis;
#[cfg(not(feature = "with-tremor"))]
extern crate vorbis;
use super::{AudioDecoder, AudioError, AudioPacket};
use std::error; use std::error;
use std::fmt; use std::fmt;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
pub struct VorbisDecoder<R: Read + Seek>(vorbis::Decoder<R>); pub struct VorbisDecoder<R: Read + Seek>(vorbis::Decoder<R>);
pub struct VorbisPacket(vorbis::Packet);
pub struct VorbisError(vorbis::VorbisError); pub struct VorbisError(vorbis::VorbisError);
impl<R> VorbisDecoder<R> impl<R> VorbisDecoder<R>
@ -18,23 +16,28 @@ where
pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> { pub fn new(input: R) -> Result<VorbisDecoder<R>, VorbisError> {
Ok(VorbisDecoder(vorbis::Decoder::new(input)?)) Ok(VorbisDecoder(vorbis::Decoder::new(input)?))
} }
}
impl<R> AudioDecoder for VorbisDecoder<R>
where
R: Read + Seek,
{
#[cfg(not(feature = "with-tremor"))] #[cfg(not(feature = "with-tremor"))]
pub fn seek(&mut self, ms: i64) -> Result<(), VorbisError> { fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
self.0.time_seek(ms as f64 / 1000f64)?; self.0.time_seek(ms as f64 / 1000f64)?;
Ok(()) Ok(())
} }
#[cfg(feature = "with-tremor")] #[cfg(feature = "with-tremor")]
pub fn seek(&mut self, ms: i64) -> Result<(), VorbisError> { fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
self.0.time_seek(ms)?; self.0.time_seek(ms)?;
Ok(()) Ok(())
} }
pub fn next_packet(&mut self) -> Result<Option<VorbisPacket>, VorbisError> { fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
loop { loop {
match self.0.packets().next() { match self.0.packets().next() {
Some(Ok(packet)) => return Ok(Some(VorbisPacket(packet))), Some(Ok(packet)) => return Ok(Some(AudioPacket::Samples(packet.data))),
None => return Ok(None), None => return Ok(None),
Some(Err(vorbis::VorbisError::Hole)) => (), Some(Err(vorbis::VorbisError::Hole)) => (),
@ -44,16 +47,6 @@ where
} }
} }
impl VorbisPacket {
pub fn data(&self) -> &[i16] {
&self.0.data
}
pub fn data_mut(&mut self) -> &mut [i16] {
&mut self.0.data
}
}
impl From<vorbis::VorbisError> for VorbisError { impl From<vorbis::VorbisError> for VorbisError {
fn from(err: vorbis::VorbisError) -> VorbisError { fn from(err: vorbis::VorbisError) -> VorbisError {
VorbisError(err) VorbisError(err)
@ -77,3 +70,9 @@ impl error::Error for VorbisError {
error::Error::source(&self.0) error::Error::source(&self.0)
} }
} }
impl From<vorbis::VorbisError> for AudioError {
fn from(err: vorbis::VorbisError) -> AudioError {
AudioError::VorbisError(VorbisError(err))
}
}

View file

@ -0,0 +1,191 @@
// Passthrough decoder for librespot
use super::{AudioDecoder, AudioError, AudioPacket};
use ogg::{OggReadError, Packet, PacketReader, PacketWriteEndInfo, PacketWriter};
use std::fmt;
use std::io::{Read, Seek};
use std::time::{SystemTime, UNIX_EPOCH};
fn write_headers<T: Read + Seek>(
rdr: &mut PacketReader<T>,
wtr: &mut PacketWriter<Vec<u8>>,
) -> Result<u32, PassthroughError> {
let mut stream_serial: u32 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u32;
// search for ident, comment, setup
get_header(1, rdr, wtr, &mut stream_serial, PacketWriteEndInfo::EndPage)?;
get_header(
3,
rdr,
wtr,
&mut stream_serial,
PacketWriteEndInfo::NormalPacket,
)?;
get_header(5, rdr, wtr, &mut stream_serial, PacketWriteEndInfo::EndPage)?;
// remove un-needed packets
rdr.delete_unread_packets();
return Ok(stream_serial);
}
fn get_header<T>(
code: u8,
rdr: &mut PacketReader<T>,
wtr: &mut PacketWriter<Vec<u8>>,
stream_serial: &mut u32,
info: PacketWriteEndInfo,
) -> Result<u32, PassthroughError>
where
T: Read + Seek,
{
let pck: Packet = rdr.read_packet_expected()?;
// set a unique serial number
if pck.stream_serial() != 0 {
*stream_serial = pck.stream_serial();
}
let pkt_type = pck.data[0];
debug!("Vorbis header type{}", &pkt_type);
// all headers are mandatory
if pkt_type != code {
return Err(PassthroughError(OggReadError::InvalidData));
}
// headers keep original granule number
let absgp_page = pck.absgp_page();
wtr.write_packet(
pck.data.into_boxed_slice(),
*stream_serial,
info,
absgp_page,
)
.unwrap();
return Ok(*stream_serial);
}
pub struct PassthroughDecoder<R: Read + Seek> {
rdr: PacketReader<R>,
wtr: PacketWriter<Vec<u8>>,
lastgp_page: Option<u64>,
absgp_page: u64,
stream_serial: u32,
}
pub struct PassthroughError(ogg::OggReadError);
impl<R: Read + Seek> PassthroughDecoder<R> {
/// Constructs a new Decoder from a given implementation of `Read + Seek`.
pub fn new(rdr: R) -> Result<Self, PassthroughError> {
let mut rdr = PacketReader::new(rdr);
let mut wtr = PacketWriter::new(Vec::new());
let stream_serial = write_headers(&mut rdr, &mut wtr)?;
info!("Starting passthrough track with serial {}", stream_serial);
return Ok(PassthroughDecoder {
rdr,
wtr,
lastgp_page: Some(0),
absgp_page: 0,
stream_serial,
});
}
}
impl<R: Read + Seek> AudioDecoder for PassthroughDecoder<R> {
fn seek(&mut self, ms: i64) -> Result<(), AudioError> {
info!("Seeking to {}", ms);
self.lastgp_page = match ms {
0 => Some(0),
_ => None,
};
// hard-coded to 44.1 kHz
match self.rdr.seek_absgp(None, (ms * 44100 / 1000) as u64) {
Ok(_) => return Ok(()),
Err(err) => return Err(AudioError::PassthroughError(err.into())),
}
}
fn next_packet(&mut self) -> Result<Option<AudioPacket>, AudioError> {
let mut skip = self.lastgp_page.is_none();
loop {
let pck = match self.rdr.read_packet() {
Ok(Some(pck)) => pck,
Ok(None) | Err(OggReadError::NoCapturePatternFound) => {
info!("end of streaming");
return Ok(None);
}
Err(err) => return Err(AudioError::PassthroughError(err.into())),
};
let pckgp_page = pck.absgp_page();
let lastgp_page = self.lastgp_page.get_or_insert(pckgp_page);
// consume packets till next page to get a granule reference
if skip {
if *lastgp_page == pckgp_page {
debug!("skipping packet");
continue;
}
skip = false;
info!("skipped at {}", pckgp_page);
}
// now we can calculate absolute granule
self.absgp_page += pckgp_page - *lastgp_page;
self.lastgp_page = Some(pckgp_page);
// set packet type
let inf = if pck.last_in_stream() {
self.lastgp_page = Some(0);
PacketWriteEndInfo::EndStream
} else if pck.last_in_page() {
PacketWriteEndInfo::EndPage
} else {
PacketWriteEndInfo::NormalPacket
};
self.wtr
.write_packet(
pck.data.into_boxed_slice(),
self.stream_serial,
inf,
self.absgp_page,
)
.unwrap();
let data = self.wtr.inner_mut();
if data.len() > 0 {
let result = AudioPacket::OggData(std::mem::take(data));
return Ok(Some(result));
}
}
}
}
impl fmt::Debug for PassthroughError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Debug::fmt(&self.0, f)
}
}
impl From<ogg::OggReadError> for PassthroughError {
fn from(err: OggReadError) -> PassthroughError {
PassthroughError(err)
}
}
impl fmt::Display for PassthroughError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
}
}

View file

@ -2,7 +2,7 @@ use std::cmp::{max, min};
use std::fmt; use std::fmt;
use std::slice::Iter; use std::slice::Iter;
#[derive(Copy, Clone)] #[derive(Copy, Clone, Debug)]
pub struct Range { pub struct Range {
pub start: usize, pub start: usize,
pub length: usize, pub length: usize,

View file

@ -1,44 +1,48 @@
[package] [package]
name = "librespot-connect" name = "librespot-connect"
version = "0.1.3" version = "0.1.6"
authors = ["Paul Lietar <paul@lietar.net>"] authors = ["Paul Lietar <paul@lietar.net>"]
description = "The discovery and Spotify Connect logic for librespot" description = "The discovery and Spotify Connect logic for librespot"
license = "MIT" license = "MIT"
repository = "https://github.com/librespot-org/librespot" repository = "https://github.com/librespot-org/librespot"
edition = "2018" edition = "2018"
[dependencies.librespot-core]
path = "../core"
version = "0.1.3"
[dependencies.librespot-playback]
path = "../playback"
version = "0.1.3"
[dependencies.librespot-protocol]
path = "../protocol"
version = "0.1.3"
[dependencies] [dependencies]
aes-ctr = "0.6"
base64 = "0.13" base64 = "0.13"
futures = "0.3" block-modes = "0.7"
hyper = { version = "0.14", features = ["server", "http1"] } futures-core = "0.3"
futures-util = { version = "0.3", default_features = false }
hmac = "0.10"
hyper = { version = "0.14", features = ["server", "http1", "tcp"] }
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.8"
serde = "1.0" serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1.0", features = ["macros"] } sha-1 = "0.9"
url = "1.7" tokio = { version = "1.0", features = ["macros", "rt", "sync"] }
sha-1 = "0.8" tokio-stream = { version = "0.1" }
hmac = "0.7" url = "2.1"
aes-ctr = "0.3"
block-modes = "0.3"
dns-sd = { version = "0.1.3", optional = true } dns-sd = { version = "0.1.3", optional = true }
libmdns = { version = "0.6", optional = true } libmdns = { version = "0.6", optional = true }
[dependencies.librespot-core]
path = "../core"
version = "0.1.6"
[dependencies.librespot-playback]
path = "../playback"
version = "0.1.6"
[dependencies.librespot-protocol]
path = "../protocol"
version = "0.1.6"
[features] [features]
default = ["libmdns"] with-libmdns = ["libmdns"]
with-dns-sd = ["dns-sd"] with-dns-sd = ["dns-sd"]
default = ["with-libmdns"]

View file

@ -1,7 +1,7 @@
use crate::core::spotify_id::SpotifyId;
use crate::protocol::spirc::TrackRef; use crate::protocol::spirc::TrackRef;
use librespot_core::spotify_id::SpotifyId;
use serde; use serde::Deserialize;
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
pub struct StationContext { pub struct StationContext {

View file

@ -1,13 +1,13 @@
use aes_ctr::stream_cipher::generic_array::GenericArray; use aes_ctr::cipher::generic_array::GenericArray;
use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher}; use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher};
use aes_ctr::Aes128Ctr; use aes_ctr::Aes128Ctr;
use base64; use futures_core::Stream;
use futures::channel::{mpsc, oneshot}; use hmac::{Hmac, Mac, NewMac};
use futures::{Stream, StreamExt};
use hmac::{Hmac, Mac};
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, StatusCode}; use hyper::{Body, Method, Request, Response, StatusCode};
use serde_json::json;
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use tokio::sync::{mpsc, oneshot};
use std::borrow::Cow; use std::borrow::Cow;
use std::convert::Infallible; use std::convert::Infallible;
@ -50,7 +50,7 @@ impl Discovery {
config: ConnectConfig, config: ConnectConfig,
device_id: String, device_id: String,
) -> (Discovery, mpsc::UnboundedReceiver<Credentials>) { ) -> (Discovery, mpsc::UnboundedReceiver<Credentials>) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded_channel();
let key_data = util::rand_vec(&mut rand::thread_rng(), 95); let key_data = util::rand_vec(&mut rand::thread_rng(), 95);
let private_key = BigUint::from_bytes_be(&key_data); let private_key = BigUint::from_bytes_be(&key_data);
@ -118,18 +118,18 @@ impl Discovery {
let checksum_key = { let checksum_key = {
let mut h = HmacSha1::new_varkey(base_key).expect("HMAC can take key of any size"); let mut h = HmacSha1::new_varkey(base_key).expect("HMAC can take key of any size");
h.input(b"checksum"); h.update(b"checksum");
h.result().code() h.finalize().into_bytes()
}; };
let encryption_key = { let encryption_key = {
let mut h = HmacSha1::new_varkey(&base_key).expect("HMAC can take key of any size"); let mut h = HmacSha1::new_varkey(&base_key).expect("HMAC can take key of any size");
h.input(b"encryption"); h.update(b"encryption");
h.result().code() h.finalize().into_bytes()
}; };
let mut h = HmacSha1::new_varkey(&checksum_key).expect("HMAC can take key of any size"); let mut h = HmacSha1::new_varkey(&checksum_key).expect("HMAC can take key of any size");
h.input(encrypted); h.update(encrypted);
if let Err(_) = h.verify(cksum) { if let Err(_) = h.verify(cksum) {
warn!("Login error for user {:?}: MAC mismatch", username); warn!("Login error for user {:?}: MAC mismatch", username);
let result = json!({ let result = json!({
@ -155,7 +155,7 @@ impl Discovery {
let credentials = let credentials =
Credentials::with_blob(username.to_string(), &decrypted, &self.0.device_id); Credentials::with_blob(username.to_string(), &decrypted, &self.0.device_id);
self.0.tx.unbounded_send(credentials).unwrap(); self.0.tx.send(credentials).unwrap();
let result = json!({ let result = json!({
"status": 101, "status": 101,
@ -273,6 +273,6 @@ impl Stream for DiscoveryStream {
type Item = Credentials; type Item = Credentials;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.credentials.poll_next_unpin(cx) self.credentials.poll_recv(cx)
} }
} }

View file

@ -1,34 +1,9 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate serde_json;
#[macro_use]
extern crate serde_derive;
extern crate serde;
extern crate base64; use librespot_core as core;
extern crate futures; use librespot_playback as playback;
extern crate hyper; use librespot_protocol as protocol;
extern crate num_bigint;
extern crate protobuf;
extern crate rand;
extern crate tokio;
extern crate url;
extern crate aes_ctr;
extern crate block_modes;
extern crate hmac;
extern crate sha1;
#[cfg(feature = "with-dns-sd")]
extern crate dns_sd;
#[cfg(not(feature = "with-dns-sd"))]
extern crate libmdns;
extern crate librespot_core;
extern crate librespot_playback as playback;
extern crate librespot_protocol as protocol;
pub mod context; pub mod context;
pub mod discovery; pub mod discovery;

View file

@ -1,26 +1,27 @@
use std::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use crate::context::StationContext; use crate::context::StationContext;
use crate::core::config::{ConnectConfig, VolumeCtrl};
use crate::core::mercury::{MercuryError, MercurySender};
use crate::core::session::Session;
use crate::core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError};
use crate::core::util::url_encode;
use crate::core::util::SeqGenerator;
use crate::core::version;
use crate::playback::mixer::Mixer; use crate::playback::mixer::Mixer;
use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel}; use crate::playback::player::{Player, PlayerEvent, PlayerEventChannel};
use crate::protocol; use crate::protocol;
use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef}; use crate::protocol::spirc::{DeviceState, Frame, MessageType, PlayStatus, State, TrackRef};
use futures::channel::mpsc;
use futures::future::{self, FusedFuture}; use futures_util::future::{self, FusedFuture};
use futures::stream::FusedStream; use futures_util::stream::FusedStream;
use futures::{Future, FutureExt, StreamExt}; use futures_util::{FutureExt, StreamExt};
use librespot_core::config::{ConnectConfig, VolumeCtrl};
use librespot_core::mercury::{MercuryError, MercurySender};
use librespot_core::session::Session;
use librespot_core::spotify_id::{SpotifyAudioType, SpotifyId, SpotifyIdError};
use librespot_core::util::url_encode;
use librespot_core::util::SeqGenerator;
use librespot_core::version;
use protobuf::{self, Message}; use protobuf::{self, Message};
use rand;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use serde_json; use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
enum SpircPlayStatus { enum SpircPlayStatus {
Stopped, Stopped,
@ -59,8 +60,8 @@ struct SpircTask {
subscription: BoxedStream<Frame>, subscription: BoxedStream<Frame>,
sender: MercurySender, sender: MercurySender,
commands: mpsc::UnboundedReceiver<SpircCommand>, commands: Option<mpsc::UnboundedReceiver<SpircCommand>>,
player_events: PlayerEventChannel, player_events: Option<PlayerEventChannel>,
shutdown: bool, shutdown: bool,
session: Session, session: Session,
@ -263,6 +264,7 @@ impl Spirc {
.mercury() .mercury()
.subscribe(uri.clone()) .subscribe(uri.clone())
.map(Result::unwrap) .map(Result::unwrap)
.map(UnboundedReceiverStream::new)
.flatten_stream() .flatten_stream()
.map(|response| -> Frame { .map(|response| -> Frame {
let data = response.payload.first().unwrap(); let data = response.payload.first().unwrap();
@ -272,7 +274,7 @@ impl Spirc {
let sender = session.mercury().sender(uri); let sender = session.mercury().sender(uri);
let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let volume = config.volume; let volume = config.volume;
let task_config = SpircTaskConfig { let task_config = SpircTaskConfig {
@ -301,8 +303,8 @@ impl Spirc {
subscription: subscription, subscription: subscription,
sender: sender, sender: sender,
commands: cmd_rx, commands: Some(cmd_rx),
player_events: player_events, player_events: Some(player_events),
shutdown: false, shutdown: false,
session: session, session: session,
@ -322,34 +324,36 @@ impl Spirc {
} }
pub fn play(&self) { pub fn play(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Play); let _ = self.commands.send(SpircCommand::Play);
} }
pub fn play_pause(&self) { pub fn play_pause(&self) {
let _ = self.commands.unbounded_send(SpircCommand::PlayPause); let _ = self.commands.send(SpircCommand::PlayPause);
} }
pub fn pause(&self) { pub fn pause(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Pause); let _ = self.commands.send(SpircCommand::Pause);
} }
pub fn prev(&self) { pub fn prev(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Prev); let _ = self.commands.send(SpircCommand::Prev);
} }
pub fn next(&self) { pub fn next(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Next); let _ = self.commands.send(SpircCommand::Next);
} }
pub fn volume_up(&self) { pub fn volume_up(&self) {
let _ = self.commands.unbounded_send(SpircCommand::VolumeUp); let _ = self.commands.send(SpircCommand::VolumeUp);
} }
pub fn volume_down(&self) { pub fn volume_down(&self) {
let _ = self.commands.unbounded_send(SpircCommand::VolumeDown); let _ = self.commands.send(SpircCommand::VolumeDown);
} }
pub fn shutdown(&self) { pub fn shutdown(&self) {
let _ = self.commands.unbounded_send(SpircCommand::Shutdown); let _ = self.commands.send(SpircCommand::Shutdown);
} }
} }
impl SpircTask { impl SpircTask {
async fn run(mut self) { async fn run(mut self) {
while !self.session.is_invalid() && !self.shutdown { while !self.session.is_invalid() && !self.shutdown {
let commands = self.commands.as_mut();
let player_events = self.player_events.as_mut();
tokio::select! { tokio::select! {
frame = self.subscription.next() => match frame { frame = self.subscription.next() => match frame {
Some(frame) => self.handle_frame(frame), Some(frame) => self.handle_frame(frame),
@ -358,10 +362,10 @@ impl SpircTask {
break; break;
} }
}, },
cmd = self.commands.next(), if !self.commands.is_terminated() => if let Some(cmd) = cmd { cmd = async { commands.unwrap().recv().await }, if commands.is_some() => if let Some(cmd) = cmd {
self.handle_command(cmd); self.handle_command(cmd);
}, },
event = self.player_events.next(), if !self.player_events.is_terminated() => if let Some(event) = event { event = async { player_events.unwrap().recv().await }, if player_events.is_some() => if let Some(event) = event {
self.handle_player_event(event) self.handle_player_event(event)
}, },
result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() { result = self.sender.flush(), if !self.sender.is_flushed() => if result.is_err() {
@ -508,7 +512,7 @@ impl SpircTask {
SpircCommand::Shutdown => { SpircCommand::Shutdown => {
CommandSender::new(self, MessageType::kMessageTypeGoodbye).send(); CommandSender::new(self, MessageType::kMessageTypeGoodbye).send();
self.shutdown = true; self.shutdown = true;
self.commands.close(); self.commands.as_mut().map(|rx| rx.close());
} }
} }
} }
@ -790,7 +794,7 @@ impl SpircTask {
self.handle_play() self.handle_play()
} }
SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => { SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } => {
self.handle_play() self.handle_pause()
} }
_ => (), _ => (),
} }

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librespot-core" name = "librespot-core"
version = "0.1.3" version = "0.1.6"
authors = ["Paul Lietar <paul@lietar.net>"] authors = ["Paul Lietar <paul@lietar.net>"]
build = "build.rs" build = "build.rs"
description = "The core functionality provided by librespot" description = "The core functionality provided by librespot"
@ -10,42 +10,45 @@ edition = "2018"
[dependencies.librespot-protocol] [dependencies.librespot-protocol]
path = "../protocol" path = "../protocol"
version = "0.1.3" version = "0.1.6"
[dependencies] [dependencies]
aes = "0.6" aes = "0.6"
base64 = "0.13" base64 = "0.13"
byteorder = "1.4" byteorder = "1.4"
bytes = "1.0" bytes = "1.0"
error-chain = { version = "0.12", default-features = false } cfg-if = "1"
futures = { version = "0.3", features = ["bilock", "unstable"] } futures-core = { version = "0.3", default-features = false }
hmac = "0.7" futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] }
hmac = "0.10"
httparse = "1.3" httparse = "1.3"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] } hyper = { version = "0.14", optional = true, features = ["client", "tcp", "http1"] }
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
num-integer = "0.1" num-integer = "0.1"
num-traits = "0.2" num-traits = "0.2"
once_cell = "1.5.2" once_cell = "1.5.2"
pbkdf2 = "0.3" pbkdf2 = { version = "0.7", default-features = false, features = ["hmac"] }
pin-project-lite = "0.2.4"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.8"
serde = "1.0" serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
sha-1 = "~0.8" sha-1 = "0.9"
shannon = "0.2.0" shannon = "0.2.0"
tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] } thiserror = "1"
tokio = { version = "1.0", features = ["io-util", "net", "rt", "sync"] }
tokio-stream = "0.1"
tokio-util = { version = "0.6", features = ["codec"] } tokio-util = { version = "0.6", features = ["codec"] }
tower-service = "0.3" url = "2.1"
url = "1.7"
uuid = { version = "0.8", features = ["v4"] }
[build-dependencies] [build-dependencies]
rand = "0.7" rand = "0.8"
vergen = "3.0.4" vergen = "3.0.4"
[dev-dependencies] [dev-dependencies]
env_logger = "*" env_logger = "*"
tokio = {version = "1.0", features = ["macros"] } tokio = {version = "1.0", features = ["macros"] }
[features]
apresolve = ["hyper"]
apresolve-http2 = ["apresolve", "hyper/http2"]

View file

@ -1,6 +1,3 @@
extern crate rand;
extern crate vergen;
use rand::distributions::Alphanumeric; use rand::distributions::Alphanumeric;
use rand::Rng; use rand::Rng;
use vergen::{generate_cargo_keys, ConstantsFlags}; use vergen::{generate_cargo_keys, ConstantsFlags};
@ -10,10 +7,10 @@ fn main() {
flags.toggle(ConstantsFlags::REBUILD_ON_HEAD_CHANGE); flags.toggle(ConstantsFlags::REBUILD_ON_HEAD_CHANGE);
generate_cargo_keys(ConstantsFlags::all()).expect("Unable to generate the cargo keys!"); generate_cargo_keys(ConstantsFlags::all()).expect("Unable to generate the cargo keys!");
let mut rng = rand::thread_rng(); let build_id: String = rand::thread_rng()
let build_id: String = ::std::iter::repeat(()) .sample_iter(Alphanumeric)
.map(|()| rng.sample(Alphanumeric))
.take(8) .take(8)
.map(char::from)
.collect(); .collect();
println!("cargo:rustc-env=VERGEN_BUILD_ID={}", build_id); println!("cargo:rustc-env=VERGEN_BUILD_ID={}", build_id);
} }

View file

@ -1,61 +1,73 @@
const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const AP_FALLBACK: &'static str = "ap.spotify.com:443";
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com:80";
use hyper::{Body, Client, Method, Request, Uri};
use std::error::Error;
use url::Url; use url::Url;
use crate::proxytunnel::ProxyTunnel; cfg_if! {
if #[cfg(feature = "apresolve")] {
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com:80";
#[derive(Clone, Debug, Serialize, Deserialize)] use std::error::Error;
pub struct APResolveData {
ap_list: Vec<String>,
}
async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> { use hyper::{Body, Client, Method, Request, Uri};
let port = ap_port.unwrap_or(443); use serde::{Serialize, Deserialize};
let req = Request::builder() use crate::proxytunnel::ProxyTunnel;
.method(Method::GET)
.uri(
APRESOLVE_ENDPOINT
.parse::<Uri>()
.expect("invalid AP resolve URL"),
)
.body(Body::empty())?;
let response = if let Some(url) = proxy { #[derive(Clone, Debug, Serialize, Deserialize)]
Client::builder() pub struct APResolveData {
.build(ProxyTunnel::new(url)?) ap_list: Vec<String>,
.request(req) }
.await?
} else {
Client::new().request(req).await?
};
let body = hyper::body::to_bytes(response.into_body()).await?; async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> {
let data: APResolveData = serde_json::from_slice(body.as_ref())?; let port = ap_port.unwrap_or(443);
let ap = if ap_port.is_some() || proxy.is_some() { let req = Request::builder()
data.ap_list.into_iter().find_map(|ap| { .method(Method::GET)
if ap.parse::<Uri>().ok()?.port()? == port { .uri(
Some(ap) APRESOLVE_ENDPOINT
.parse::<Uri>()
.expect("invalid AP resolve URL"),
)
.body(Body::empty())?;
let response = if let Some(url) = proxy {
Client::builder()
.build(ProxyTunnel::new(&url.socket_addrs(|| None)?[..])?)
.request(req)
.await?
} else { } else {
None Client::new().request(req).await?
};
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: APResolveData = serde_json::from_slice(body.as_ref())?;
let ap = if ap_port.is_some() || proxy.is_some() {
data.ap_list.into_iter().find_map(|ap| {
if ap.parse::<Uri>().ok()?.port()? == port {
Some(ap)
} else {
None
}
})
} else {
data.ap_list.into_iter().next()
} }
}) .ok_or("empty AP List")?;
Ok(ap)
}
pub async fn apresolve_or_fallback(proxy: &Option<Url>, ap_port: &Option<u16>) -> String {
apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
warn!("Failed to resolve Access Point: {}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK);
AP_FALLBACK.into()
})
}
} else { } else {
data.ap_list.into_iter().next() pub async fn apresolve_or_fallback(_: &Option<Url>, _: &Option<u16>) -> String {
AP_FALLBACK.to_string()
}
} }
.ok_or("empty AP List")?;
Ok(ap)
}
pub async fn apresolve_or_fallback(proxy: &Option<Url>, ap_port: &Option<u16>) -> String {
apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
warn!("Failed to resolve Access Point: {}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK);
AP_FALLBACK.into()
})
} }

View file

@ -1,8 +1,8 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::channel::oneshot;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use tokio::sync::oneshot;
use crate::spotify_id::{FileId, SpotifyId}; use crate::spotify_id::{FileId, SpotifyId};
use crate::util::SeqGenerator; use crate::util::SeqGenerator;

View file

@ -1,14 +1,14 @@
use std::io::{self, Read};
use aes::Aes192; use aes::Aes192;
use aes::NewBlockCipher;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use hmac::Hmac; use hmac::Hmac;
use pbkdf2::pbkdf2; use pbkdf2::pbkdf2;
use protobuf::ProtobufEnum; use protobuf::ProtobufEnum;
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use std::io::{self, Read};
use crate::protocol::authentication::AuthenticationType; use crate::protocol::authentication::AuthenticationType;
use crate::protocol::keyexchange::{APLoginFailed, ErrorCode};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Credentials { pub struct Credentials {
@ -74,7 +74,7 @@ impl Credentials {
let blob = { let blob = {
use aes::cipher::generic_array::typenum::Unsigned; use aes::cipher::generic_array::typenum::Unsigned;
use aes::cipher::generic_array::GenericArray; use aes::cipher::generic_array::GenericArray;
use aes::cipher::BlockCipher; use aes::cipher::{BlockCipher, NewBlockCipher};
let mut data = base64::decode(encrypted_blob).unwrap(); let mut data = base64::decode(encrypted_blob).unwrap();
let cipher = Aes192::new(GenericArray::from_slice(&key)); let cipher = Aes192::new(GenericArray::from_slice(&key));
@ -142,61 +142,3 @@ where
let v: String = serde::Deserialize::deserialize(de)?; let v: String = serde::Deserialize::deserialize(de)?;
base64::decode(&v).map_err(|e| serde::de::Error::custom(e.to_string())) base64::decode(&v).map_err(|e| serde::de::Error::custom(e.to_string()))
} }
pub fn get_credentials<F: FnOnce(&String) -> String>(
username: Option<String>,
password: Option<String>,
cached_credentials: Option<Credentials>,
prompt: F,
) -> Option<Credentials> {
match (username, password, cached_credentials) {
(Some(username), Some(password), _) => Some(Credentials::with_password(username, password)),
(Some(ref username), _, Some(ref credentials)) if *username == credentials.username => {
Some(credentials.clone())
}
(Some(username), None, _) => Some(Credentials::with_password(
username.clone(),
prompt(&username),
)),
(None, _, Some(credentials)) => Some(credentials),
(None, _, None) => None,
}
}
error_chain! {
types {
AuthenticationError, AuthenticationErrorKind, AuthenticationResultExt, AuthenticationResult;
}
foreign_links {
Io(::std::io::Error);
}
errors {
BadCredentials {
description("Bad credentials")
display("Authentication failed with error: Bad credentials")
}
PremiumAccountRequired {
description("Premium account required")
display("Authentication failed with error: Premium account required")
}
}
}
impl From<APLoginFailed> for AuthenticationError {
fn from(login_failure: APLoginFailed) -> Self {
let error_code = login_failure.get_error_code();
match error_code {
ErrorCode::BadCredentials => Self::from_kind(AuthenticationErrorKind::BadCredentials),
ErrorCode::PremiumAccountRequired => {
Self::from_kind(AuthenticationErrorKind::PremiumAccountRequired)
}
_ => format!("Authentication failed with error: {:?}", error_code).into(),
}
}
}

View file

@ -1,12 +1,14 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt}; use futures_core::Stream;
use std::{ use futures_util::lock::BiLock;
collections::HashMap, use futures_util::StreamExt;
pin::Pin, use tokio::sync::mpsc;
task::{Context, Poll},
time::Instant,
};
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
@ -46,7 +48,7 @@ enum ChannelState {
impl ChannelManager { impl ChannelManager {
pub fn allocate(&self) -> (u16, Channel) { pub fn allocate(&self) -> (u16, Channel) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded_channel();
let seq = self.lock(|inner| { let seq = self.lock(|inner| {
let seq = inner.sequence.get(); let seq = inner.sequence.get();
@ -85,7 +87,7 @@ impl ChannelManager {
inner.download_measurement_bytes += data.len(); inner.download_measurement_bytes += data.len();
if let Entry::Occupied(entry) = inner.channels.entry(id) { if let Entry::Occupied(entry) = inner.channels.entry(id) {
let _ = entry.get().unbounded_send((cmd, data)); let _ = entry.get().send((cmd, data));
} }
}); });
} }
@ -105,7 +107,7 @@ impl ChannelManager {
impl Channel { impl Channel {
fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> { fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll_next_unpin(cx) { let (cmd, packet) = match self.receiver.poll_recv(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(o) => o.ok_or(ChannelError)?, Poll::Ready(o) => o.ok_or(ChannelError)?,
}; };

View file

@ -1,9 +1,6 @@
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;
use url::Url; use url::Url;
use uuid::Uuid;
use crate::version;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SessionConfig { pub struct SessionConfig {
@ -13,18 +10,6 @@ pub struct SessionConfig {
pub ap_port: Option<u16>, pub ap_port: Option<u16>,
} }
impl Default for SessionConfig {
fn default() -> SessionConfig {
let device_id = Uuid::new_v4().to_hyphenated().to_string();
SessionConfig {
user_agent: version::version_string(),
device_id: device_id,
proxy: None,
ap_port: None,
}
}
}
#[derive(Clone, Copy, Debug, Hash, PartialOrd, Ord, PartialEq, Eq)] #[derive(Clone, Copy, Debug, Hash, PartialOrd, Ord, PartialEq, Eq)]
pub enum DeviceType { pub enum DeviceType {
Unknown = 0, Unknown = 0,

View file

@ -1,5 +1,5 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac, NewMac};
use protobuf::{self, Message}; use protobuf::{self, Message};
use rand::thread_rng; use rand::thread_rng;
use sha1::Sha1; use sha1::Sha1;
@ -122,16 +122,16 @@ fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<
let mut data = Vec::with_capacity(0x64); let mut data = Vec::with_capacity(0x64);
for i in 1..6 { for i in 1..6 {
let mut mac = HmacSha1::new_varkey(&shared_secret).expect("HMAC can take key of any size"); let mut mac = HmacSha1::new_varkey(&shared_secret).expect("HMAC can take key of any size");
mac.input(packets); mac.update(packets);
mac.input(&[i]); mac.update(&[i]);
data.extend_from_slice(&mac.result().code()); data.extend_from_slice(&mac.finalize().into_bytes());
} }
let mut mac = HmacSha1::new_varkey(&data[..0x14]).expect("HMAC can take key of any size"); let mut mac = HmacSha1::new_varkey(&data[..0x14]).expect("HMAC can take key of any size");
mac.input(packets); mac.update(packets);
( (
mac.result().code().to_vec(), mac.finalize().into_bytes().to_vec(),
data[0x14..0x34].to_vec(), data[0x14..0x34].to_vec(),
data[0x34..0x54].to_vec(), data[0x34..0x54].to_vec(),
) )

View file

@ -4,21 +4,60 @@ mod handshake;
pub use self::codec::APCodec; pub use self::codec::APCodec;
pub use self::handshake::handshake; pub use self::handshake::handshake;
use futures::{SinkExt, StreamExt}; use std::io::{self, ErrorKind};
use protobuf::{self, Message};
use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use futures_util::{SinkExt, StreamExt};
use protobuf::{self, Message, ProtobufError};
use thiserror::Error;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
use url::Url; use url::Url;
use crate::authentication::{AuthenticationError, Credentials}; use crate::authentication::Credentials;
use crate::protocol::keyexchange::{APLoginFailed, ErrorCode};
use crate::proxytunnel;
use crate::version; use crate::version;
use crate::proxytunnel;
pub type Transport = Framed<TcpStream, APCodec>; pub type Transport = Framed<TcpStream, APCodec>;
fn login_error_message(code: &ErrorCode) -> &'static str {
pub use ErrorCode::*;
match code {
ProtocolError => "Protocol error",
TryAnotherAP => "Try another AP",
BadConnectionId => "Bad connection id",
TravelRestriction => "Travel restriction",
PremiumAccountRequired => "Premium account required",
BadCredentials => "Bad credentials",
CouldNotValidateCredentials => "Could not validate credentials",
AccountExists => "Account exists",
ExtraVerificationRequired => "Extra verification required",
InvalidAppKey => "Invalid app key",
ApplicationBanned => "Application banned",
}
}
#[derive(Debug, Error)]
pub enum AuthenticationError {
#[error("Login failed with reason: {}", login_error_message(.0))]
LoginFailed(ErrorCode),
#[error("Authentication failed: {0}")]
IoError(#[from] io::Error),
}
impl From<ProtobufError> for AuthenticationError {
fn from(e: ProtobufError) -> Self {
io::Error::new(ErrorKind::InvalidData, e).into()
}
}
impl From<APLoginFailed> for AuthenticationError {
fn from(login_failure: APLoginFailed) -> Self {
Self::LoginFailed(login_failure.get_error_code())
}
}
pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> { pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> {
let socket = if let Some(proxy) = proxy { let socket = if let Some(proxy) = proxy {
info!("Using proxy \"{}\"", proxy); info!("Using proxy \"{}\"", proxy);
@ -37,8 +76,8 @@ pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport>
.next() .next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?; .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?;
let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| { let socket_addr = proxy.socket_addrs(|| None).and_then(|addrs| {
iter.next().ok_or_else(|| { addrs.into_iter().next().ok_or_else(|| {
io::Error::new( io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
"Can't resolve proxy server address", "Can't resolve proxy server address",
@ -66,7 +105,6 @@ pub async fn authenticate(
device_id: &str, device_id: &str,
) -> Result<Credentials, AuthenticationError> { ) -> Result<Credentials, AuthenticationError> {
use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os}; use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
use crate::protocol::keyexchange::APLoginFailed;
let mut packet = ClientResponseEncrypted::new(); let mut packet = ClientResponseEncrypted::new();
packet packet
@ -101,7 +139,7 @@ pub async fn authenticate(
let (cmd, data) = transport.next().await.expect("EOF")?; let (cmd, data) = transport.next().await.expect("EOF")?;
match cmd { match cmd {
0xac => { 0xac => {
let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap(); let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref())?;
let reusable_credentials = Credentials { let reusable_credentials = Credentials {
username: welcome_data.get_canonical_username().to_owned(), username: welcome_data.get_canonical_username().to_owned(),
@ -111,12 +149,13 @@ pub async fn authenticate(
Ok(reusable_credentials) Ok(reusable_credentials)
} }
0xad => { 0xad => {
let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap(); let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref())?;
Err(error_data.into()) Err(error_data.into())
} }
_ => {
_ => panic!("Unexpected packet {:?}", cmd), let msg = format!("Received invalid packet: {}", cmd);
Err(io::Error::new(ErrorKind::InvalidData, msg).into())
}
} }
} }

View file

@ -1,3 +1,5 @@
use serde::Deserialize;
use crate::{mercury::MercuryError, session::Session}; use crate::{mercury::MercuryError, session::Session};
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]

View file

@ -3,48 +3,20 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate cfg_if;
#[macro_use]
extern crate pin_project_lite;
#[macro_use]
extern crate error_chain;
extern crate aes;
extern crate base64;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate hmac;
extern crate httparse;
extern crate hyper;
extern crate num_bigint;
extern crate num_integer;
extern crate num_traits;
extern crate once_cell;
extern crate pbkdf2;
extern crate protobuf;
extern crate rand;
extern crate serde;
extern crate serde_json;
extern crate sha1;
extern crate shannon;
pub extern crate tokio;
extern crate tokio_util;
extern crate tower_service;
extern crate url;
extern crate uuid;
extern crate librespot_protocol as protocol; use librespot_protocol as protocol;
#[macro_use] #[macro_use]
mod component; mod component;
pub mod apresolve; mod apresolve;
pub mod audio_key; pub mod audio_key;
pub mod authentication; pub mod authentication;
pub mod cache; pub mod cache;
pub mod channel; pub mod channel;
pub mod config; pub mod config;
pub mod connection; mod connection;
pub mod diffie_hellman; pub mod diffie_hellman;
pub mod keymaster; pub mod keymaster;
pub mod mercury; pub mod mercury;

View file

@ -1,14 +1,17 @@
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
use crate::protocol; use crate::protocol;
use crate::util::url_encode; use crate::util::url_encode;
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use futures::{
channel::{mpsc, oneshot},
Future,
};
use std::{collections::HashMap, task::Poll};
use std::{mem, pin::Pin, task::Context};
mod types; mod types;
pub use self::types::*; pub use self::types::*;
@ -31,18 +34,15 @@ pub struct MercuryPending {
callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>, callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
} }
pin_project! { pub struct MercuryFuture<T> {
pub struct MercuryFuture<T> { receiver: oneshot::Receiver<Result<T, MercuryError>>,
#[pin]
receiver: oneshot::Receiver<Result<T, MercuryError>>
}
} }
impl<T> Future for MercuryFuture<T> { impl<T> Future for MercuryFuture<T> {
type Output = Result<T, MercuryError>; type Output = Result<T, MercuryError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().receiver.poll(cx) { match Pin::new(&mut self.receiver).poll(cx) {
Poll::Ready(Ok(x)) => Poll::Ready(x), Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)), Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
@ -119,7 +119,7 @@ impl MercuryManager {
async move { async move {
let response = request.await?; let response = request.await?;
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded_channel();
manager.lock(move |inner| { manager.lock(move |inner| {
if !inner.invalid { if !inner.invalid {
@ -221,7 +221,7 @@ impl MercuryManager {
// if send fails, remove from list of subs // if send fails, remove from list of subs
// TODO: send unsub message // TODO: send unsub message
sub.unbounded_send(response.clone()).is_ok() sub.send(response.clone()).is_ok()
} else { } else {
// URI doesn't match // URI doesn't match
true true

View file

@ -1,16 +1,6 @@
use futures::Future; use std::io;
use hyper::Uri;
use std::{ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
io,
net::{SocketAddr, ToSocketAddrs},
pin::Pin,
task::Poll,
};
use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::TcpStream,
};
use tower_service::Service;
pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>( pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
mut proxy_connection: T, mut proxy_connection: T,
@ -64,43 +54,56 @@ pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
} }
} }
#[derive(Clone)] cfg_if! {
pub struct ProxyTunnel { if #[cfg(feature = "apresolve")] {
proxy_addr: SocketAddr, use std::future::Future;
} use std::net::{SocketAddr, ToSocketAddrs};
use std::pin::Pin;
use std::task::Poll;
impl ProxyTunnel { use hyper::service::Service;
pub fn new<T: ToSocketAddrs>(addr: T) -> io::Result<Self> { use hyper::Uri;
let addr = addr.to_socket_addrs()?.next().ok_or_else(|| { use tokio::net::TcpStream;
io::Error::new(io::ErrorKind::InvalidInput, "No socket address given")
})?; #[derive(Clone)]
Ok(Self { proxy_addr: addr }) pub struct ProxyTunnel {
} proxy_addr: SocketAddr,
} }
impl Service<Uri> for ProxyTunnel { impl ProxyTunnel {
type Response = TcpStream; pub fn new<T: ToSocketAddrs>(addr: T) -> io::Result<Self> {
type Error = io::Error; let addr = addr.to_socket_addrs()?.next().ok_or_else(|| {
type Future = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>; io::Error::new(io::ErrorKind::InvalidInput, "No socket address given")
})?;
fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<io::Result<()>> { Ok(Self { proxy_addr: addr })
Poll::Ready(Ok(())) }
} }
fn call(&mut self, url: Uri) -> Self::Future { impl Service<Uri> for ProxyTunnel {
let proxy_addr = self.proxy_addr; type Response = TcpStream;
let fut = async move { type Error = io::Error;
let host = url type Future = Pin<Box<dyn Future<Output = io::Result<TcpStream>> + Send>>;
.host()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?; fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<io::Result<()>> {
let port = url Poll::Ready(Ok(()))
.port() }
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?;
fn call(&mut self, url: Uri) -> Self::Future {
let conn = TcpStream::connect(proxy_addr).await?; let proxy_addr = self.proxy_addr;
connect(conn, host, port.as_u16()).await let fut = async move {
}; let host = url
.host()
Box::pin(fut) .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Host is missing"))?;
let port = url
.port()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Port is missing"))?;
let conn = TcpStream::connect(proxy_addr).await?;
connect(conn, host, port.as_u16()).await
};
Box::pin(fut)
}
}
} }
} }

View file

@ -1,14 +1,20 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, RwLock, Weak};
use std::task::Context;
use std::task::Poll; use std::task::Poll;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use std::{io, pin::Pin, task::Context};
use once_cell::sync::OnceCell;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt}; use futures_core::TryStream;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
use thiserror::Error;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::apresolve::apresolve_or_fallback; use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
@ -16,10 +22,16 @@ use crate::authentication::Credentials;
use crate::cache::Cache; use crate::cache::Cache;
use crate::channel::ChannelManager; use crate::channel::ChannelManager;
use crate::config::SessionConfig; use crate::config::SessionConfig;
use crate::connection; use crate::connection::{self, AuthenticationError};
use crate::mercury::MercuryManager; use crate::mercury::MercuryManager;
pub use crate::authentication::{AuthenticationError, AuthenticationErrorKind}; #[derive(Debug, Error)]
pub enum SessionError {
#[error(transparent)]
AuthenticationError(#[from] AuthenticationError),
#[error("Cannot create session: {0}")]
IoError(#[from] io::Error),
}
struct SessionData { struct SessionData {
country: String, country: String,
@ -54,7 +66,7 @@ impl Session {
config: SessionConfig, config: SessionConfig,
credentials: Credentials, credentials: Credentials,
cache: Option<Cache>, cache: Option<Cache>,
) -> Result<Session, AuthenticationError> { ) -> Result<Session, SessionError> {
let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await; let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await;
info!("Connecting to AP \"{}\"", ap); info!("Connecting to AP \"{}\"", ap);
@ -87,7 +99,7 @@ impl Session {
) -> Session { ) -> Session {
let (sink, stream) = transport.split(); let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded(); let (sender_tx, sender_rx) = mpsc::unbounded_channel();
let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
debug!("new Session[{}]", session_id); debug!("new Session[{}]", session_id);
@ -114,11 +126,13 @@ impl Session {
session_id: session_id, session_id: session_id,
})); }));
let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink); let sender_task = UnboundedReceiverStream::new(sender_rx)
.map(Ok)
.forward(sink);
let receiver_task = DispatchTask(stream, session.weak()); let receiver_task = DispatchTask(stream, session.weak());
let task = let task =
futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(())); futures_util::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(()));
tokio::spawn(task); tokio::spawn(task);
session session
} }
@ -193,7 +207,7 @@ impl Session {
} }
pub fn send_packet(&self, cmd: u8, data: Vec<u8>) { pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
self.0.tx_connection.unbounded_send((cmd, data)).unwrap(); self.0.tx_connection.send((cmd, data)).unwrap();
} }
pub fn cache(&self) -> Option<&Arc<Cache>> { pub fn cache(&self) -> Option<&Arc<Cache>> {

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librespot-metadata" name = "librespot-metadata"
version = "0.1.3" version = "0.1.6"
authors = ["Paul Lietar <paul@lietar.net>"] authors = ["Paul Lietar <paul@lietar.net>"]
description = "The metadata logic for librespot" description = "The metadata logic for librespot"
license = "MIT" license = "MIT"
@ -10,14 +10,12 @@ edition = "2018"
[dependencies] [dependencies]
async-trait = "0.1" async-trait = "0.1"
byteorder = "1.3" byteorder = "1.3"
futures = "0.3"
linear-map = "1.2"
protobuf = "~2.14.0" protobuf = "~2.14.0"
log = "0.4" log = "0.4"
[dependencies.librespot-core] [dependencies.librespot-core]
path = "../core" path = "../core"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-protocol] [dependencies.librespot-protocol]
path = "../protocol" path = "../protocol"
version = "0.1.3" version = "0.1.6"

View file

@ -1,26 +1,20 @@
#![allow(clippy::unused_io_amount)] #![allow(clippy::unused_io_amount)]
#![allow(clippy::redundant_field_names)] #![allow(clippy::redundant_field_names)]
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate async_trait; extern crate async_trait;
extern crate byteorder;
extern crate futures;
extern crate linear_map;
extern crate protobuf;
extern crate librespot_core;
extern crate librespot_protocol as protocol;
pub mod cover; pub mod cover;
use linear_map::LinearMap; use std::collections::HashMap;
use librespot_core::mercury::MercuryError; use librespot_core::mercury::MercuryError;
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::{FileId, SpotifyAudioType, SpotifyId}; use librespot_core::spotify_id::{FileId, SpotifyAudioType, SpotifyId};
use librespot_protocol as protocol;
pub use crate::protocol::metadata::AudioFile_Format as FileFormat; pub use crate::protocol::metadata::AudioFile_Format as FileFormat;
@ -64,7 +58,7 @@ where
pub struct AudioItem { pub struct AudioItem {
pub id: SpotifyId, pub id: SpotifyId,
pub uri: String, pub uri: String,
pub files: LinearMap<FileFormat, FileId>, pub files: HashMap<FileFormat, FileId>,
pub name: String, pub name: String,
pub duration: i32, pub duration: i32,
pub available: bool, pub available: bool,
@ -143,7 +137,7 @@ pub struct Track {
pub duration: i32, pub duration: i32,
pub album: SpotifyId, pub album: SpotifyId,
pub artists: Vec<SpotifyId>, pub artists: Vec<SpotifyId>,
pub files: LinearMap<FileFormat, FileId>, pub files: HashMap<FileFormat, FileId>,
pub alternatives: Vec<SpotifyId>, pub alternatives: Vec<SpotifyId>,
pub available: bool, pub available: bool,
} }
@ -165,7 +159,7 @@ pub struct Episode {
pub duration: i32, pub duration: i32,
pub language: String, pub language: String,
pub show: SpotifyId, pub show: SpotifyId,
pub files: LinearMap<FileFormat, FileId>, pub files: HashMap<FileFormat, FileId>,
pub covers: Vec<FileId>, pub covers: Vec<FileId>,
pub available: bool, pub available: bool,
pub explicit: bool, pub explicit: bool,

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librespot-playback" name = "librespot-playback"
version = "0.1.3" version = "0.1.6"
authors = ["Sasha Hilton <sashahilton00@gmail.com>"] authors = ["Sasha Hilton <sashahilton00@gmail.com>"]
description = "The audio playback logic for librespot" description = "The audio playback logic for librespot"
license = "MIT" license = "MIT"
@ -9,19 +9,21 @@ edition = "2018"
[dependencies.librespot-audio] [dependencies.librespot-audio]
path = "../audio" path = "../audio"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-core] [dependencies.librespot-core]
path = "../core" path = "../core"
version = "0.1.3" version = "0.1.6"
[dependencies.librespot-metadata] [dependencies.librespot-metadata]
path = "../metadata" path = "../metadata"
version = "0.1.3" version = "0.1.6"
[dependencies] [dependencies]
futures = "0.3" futures-executor = "0.3"
futures-util = { version = "0.3", default_features = false, features = ["alloc"] }
log = "0.4" log = "0.4"
byteorder = "1.4" byteorder = "1.4"
shell-words = "1.0.0" shell-words = "1.0.0"
tokio = { version = "1", features = ["sync"] }
alsa = { version = "0.4", optional = true } alsa = { version = "0.4", optional = true }
portaudio-rs = { version = "0.3", optional = true } portaudio-rs = { version = "0.3", optional = true }
@ -46,5 +48,6 @@ portaudio-backend = ["portaudio-rs"]
pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding"] pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding"]
jackaudio-backend = ["jack"] jackaudio-backend = ["jack"]
rodio-backend = ["rodio", "cpal", "thiserror"] rodio-backend = ["rodio", "cpal", "thiserror"]
rodiojack-backend = ["rodio", "cpal/jack", "thiserror"]
sdl-backend = ["sdl2"] sdl-backend = ["sdl2"]
gstreamer-backend = ["gstreamer", "gstreamer-app", "glib", "zerocopy"] gstreamer-backend = ["gstreamer", "gstreamer-app", "glib", "zerocopy"]

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use alsa::device_name::HintIter; use alsa::device_name::HintIter;
use alsa::pcm::{Access, Format, Frames, HwParams, PCM}; use alsa::pcm::{Access, Format, Frames, HwParams, PCM};
use alsa::{Direction, Error, ValueOr}; use alsa::{Direction, Error, ValueOr};
@ -124,8 +125,9 @@ impl Sink for AlsaSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let mut processed_data = 0; let mut processed_data = 0;
let data = packet.samples();
while processed_data < data.len() { while processed_data < data.len() {
let data_to_buffer = min( let data_to_buffer = min(
self.buffer.capacity() - self.buffer.len(), self.buffer.capacity() - self.buffer.len(),

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use gst::prelude::*; use gst::prelude::*;
use gst::*; use gst::*;
use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::mpsc::{sync_channel, SyncSender};
@ -104,9 +105,9 @@ impl Sink for GstreamerSink {
fn stop(&mut self) -> io::Result<()> { fn stop(&mut self) -> io::Result<()> {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
// Copy expensively (in to_vec()) to avoid thread synchronization // Copy expensively (in to_vec()) to avoid thread synchronization
let deighta: &[u8] = data.as_bytes(); let deighta: &[u8] = packet.samples().as_bytes();
self.tx self.tx
.send(deighta.to_vec()) .send(deighta.to_vec())
.expect("tx send failed in write function"); .expect("tx send failed in write function");

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use jack::{ use jack::{
AsyncClient, AudioOut, Client, ClientOptions, Control, Port, ProcessHandler, ProcessScope, AsyncClient, AudioOut, Client, ClientOptions, Control, Port, ProcessHandler, ProcessScope,
}; };
@ -73,8 +74,8 @@ impl Sink for JackSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
for s in data.iter() { for s in packet.samples().iter() {
let res = self.send.send(*s); let res = self.send.send(*s);
if res.is_err() { if res.is_err() {
error!("jackaudio: cannot write to channel"); error!("jackaudio: cannot write to channel");

View file

@ -1,3 +1,4 @@
use crate::audio::AudioPacket;
use std::io; use std::io;
pub trait Open { pub trait Open {
@ -7,7 +8,7 @@ pub trait Open {
pub trait Sink { pub trait Sink {
fn start(&mut self) -> io::Result<()>; fn start(&mut self) -> io::Result<()>;
fn stop(&mut self) -> io::Result<()>; fn stop(&mut self) -> io::Result<()>;
fn write(&mut self, data: &[i16]) -> io::Result<()>; fn write(&mut self, packet: &AudioPacket) -> io::Result<()>;
} }
pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink + Send>; pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink + Send>;
@ -41,10 +42,9 @@ mod gstreamer;
#[cfg(feature = "gstreamer-backend")] #[cfg(feature = "gstreamer-backend")]
use self::gstreamer::GstreamerSink; use self::gstreamer::GstreamerSink;
#[cfg(feature = "rodio-backend")] #[cfg(any(feature = "rodio-backend", feature = "rodiojack-backend"))]
mod rodio; mod rodio;
#[cfg(feature = "rodio-backend")]
use self::rodio::RodioSink;
#[cfg(feature = "sdl-backend")] #[cfg(feature = "sdl-backend")]
mod sdl; mod sdl;
#[cfg(feature = "sdl-backend")] #[cfg(feature = "sdl-backend")]
@ -68,7 +68,9 @@ pub const BACKENDS: &'static [(&'static str, SinkBuilder)] = &[
#[cfg(feature = "gstreamer-backend")] #[cfg(feature = "gstreamer-backend")]
("gstreamer", mk_sink::<GstreamerSink>), ("gstreamer", mk_sink::<GstreamerSink>),
#[cfg(feature = "rodio-backend")] #[cfg(feature = "rodio-backend")]
("rodio", mk_sink::<RodioSink>), ("rodio", rodio::mk_rodio),
#[cfg(feature = "rodiojack-backend")]
("rodiojack", rodio::mk_rodiojack),
#[cfg(feature = "sdl-backend")] #[cfg(feature = "sdl-backend")]
("sdl", mk_sink::<SdlSink>), ("sdl", mk_sink::<SdlSink>),
("pipe", mk_sink::<StdoutSink>), ("pipe", mk_sink::<StdoutSink>),

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use std::fs::OpenOptions; use std::fs::OpenOptions;
use std::io::{self, Write}; use std::io::{self, Write};
use std::mem; use std::mem;
@ -26,12 +27,15 @@ impl Sink for StdoutSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let data: &[u8] = unsafe { let data: &[u8] = match packet {
slice::from_raw_parts( AudioPacket::Samples(data) => unsafe {
data.as_ptr() as *const u8, slice::from_raw_parts(
data.len() * mem::size_of::<i16>(), data.as_ptr() as *const u8,
) data.len() * mem::size_of::<i16>(),
)
},
AudioPacket::OggData(data) => data,
}; };
self.0.write_all(data)?; self.0.write_all(data)?;

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use portaudio_rs; use portaudio_rs;
use portaudio_rs::device::{get_default_output_index, DeviceIndex, DeviceInfo}; use portaudio_rs::device::{get_default_output_index, DeviceIndex, DeviceInfo};
use portaudio_rs::stream::*; use portaudio_rs::stream::*;
@ -95,8 +96,8 @@ impl<'a> Sink for PortAudioSink<'a> {
self.0 = None; self.0 = None;
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
match self.0.as_mut().unwrap().write(data) { match self.0.as_mut().unwrap().write(packet.samples()) {
Ok(_) => (), Ok(_) => (),
Err(portaudio_rs::PaError::OutputUnderflowed) => error!("PortAudio write underflow"), Err(portaudio_rs::PaError::OutputUnderflowed) => error!("PortAudio write underflow"),
Err(e) => panic!("PA Error {}", e), Err(e) => panic!("PA Error {}", e),

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use libpulse_binding::{self as pulse, stream::Direction}; use libpulse_binding::{self as pulse, stream::Direction};
use libpulse_simple_binding::Simple; use libpulse_simple_binding::Simple;
use std::io; use std::io;
@ -65,13 +66,17 @@ impl Sink for PulseAudioSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
if let Some(s) = &self.s { if let Some(s) = &self.s {
// SAFETY: An i16 consists of two bytes, so that the given slice can be interpreted // SAFETY: An i16 consists of two bytes, so that the given slice can be interpreted
// as a byte array of double length. Each byte pointer is validly aligned, and so // as a byte array of double length. Each byte pointer is validly aligned, and so
// is the newly created slice. // is the newly created slice.
let d: &[u8] = let d: &[u8] = unsafe {
unsafe { std::slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 2) }; std::slice::from_raw_parts(
packet.samples().as_ptr() as *const u8,
packet.samples().len() * 2,
)
};
match s.write(d) { match s.write(d) {
Ok(_) => Ok(()), Ok(_) => Ok(()),

View file

@ -5,7 +5,27 @@ use std::{io, thread, time};
use cpal::traits::{DeviceTrait, HostTrait}; use cpal::traits::{DeviceTrait, HostTrait};
use thiserror::Error; use thiserror::Error;
use super::{Open, Sink}; use super::Sink;
use crate::audio::AudioPacket;
#[cfg(all(
feature = "rodiojack-backend",
not(any(target_os = "linux", target_os = "dragonfly", target_os = "freebsd"))
))]
compile_error!("Rodio JACK backend is currently only supported on linux.");
#[cfg(feature = "rodio-backend")]
pub fn mk_rodio(device: Option<String>) -> Box<dyn Sink + Send> {
Box::new(open(cpal::default_host(), device))
}
#[cfg(feature = "rodiojack-backend")]
pub fn mk_rodiojack(device: Option<String>) -> Box<dyn Sink + Send> {
Box::new(open(
cpal::host_from_id(cpal::HostId::Jack).unwrap(),
device,
))
}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RodioError { pub enum RodioError {
@ -59,10 +79,10 @@ fn list_formats(device: &rodio::Device) {
} }
} }
fn list_outputs() -> Result<(), cpal::DevicesError> { fn list_outputs(host: &cpal::Host) -> Result<(), cpal::DevicesError> {
let mut default_device_name = None; let mut default_device_name = None;
if let Some(default_device) = get_default_device() { if let Some(default_device) = host.default_output_device() {
default_device_name = default_device.name().ok(); default_device_name = default_device.name().ok();
println!( println!(
"Default Audio Device:\n {}", "Default Audio Device:\n {}",
@ -76,7 +96,7 @@ fn list_outputs() -> Result<(), cpal::DevicesError> {
warn!("No default device was found"); warn!("No default device was found");
} }
for device in cpal::default_host().output_devices()? { for device in host.output_devices()? {
match device.name() { match device.name() {
Ok(name) if Some(&name) == default_device_name.as_ref() => (), Ok(name) if Some(&name) == default_device_name.as_ref() => (),
Ok(name) => { Ok(name) => {
@ -94,14 +114,13 @@ fn list_outputs() -> Result<(), cpal::DevicesError> {
Ok(()) Ok(())
} }
fn get_default_device() -> Option<rodio::Device> { fn create_sink(
cpal::default_host().default_output_device() host: &cpal::Host,
} device: Option<String>,
) -> Result<(rodio::Sink, rodio::OutputStream), RodioError> {
fn create_sink(device: Option<String>) -> Result<(rodio::Sink, rodio::OutputStream), RodioError> {
let rodio_device = match device { let rodio_device = match device {
Some(ask) if &ask == "?" => { Some(ask) if &ask == "?" => {
let exit_code = match list_outputs() { let exit_code = match list_outputs(host) {
Ok(()) => 0, Ok(()) => 0,
Err(e) => { Err(e) => {
error!("{}", e); error!("{}", e);
@ -111,12 +130,13 @@ fn create_sink(device: Option<String>) -> Result<(rodio::Sink, rodio::OutputStre
exit(exit_code) exit(exit_code)
} }
Some(device_name) => { Some(device_name) => {
cpal::default_host() host.output_devices()?
.output_devices()?
.find(|d| d.name().ok().map_or(false, |name| name == device_name)) // Ignore devices for which getting name fails .find(|d| d.name().ok().map_or(false, |name| name == device_name)) // Ignore devices for which getting name fails
.ok_or(RodioError::DeviceNotAvailable(device_name))? .ok_or(RodioError::DeviceNotAvailable(device_name))?
} }
None => get_default_device().ok_or(RodioError::NoDeviceAvailable)?, None => host
.default_output_device()
.ok_or(RodioError::NoDeviceAvailable)?,
}; };
let name = rodio_device.name().ok(); let name = rodio_device.name().ok();
@ -130,37 +150,32 @@ fn create_sink(device: Option<String>) -> Result<(rodio::Sink, rodio::OutputStre
Ok((sink, stream)) Ok((sink, stream))
} }
impl Open for RodioSink { pub fn open(host: cpal::Host, device: Option<String>) -> RodioSink {
fn open(device: Option<String>) -> RodioSink { debug!("Using rodio sink with cpal host: {}", host.id().name());
debug!(
"Using rodio sink with cpal host: {:?}",
cpal::default_host().id().name()
);
let (sink_tx, sink_rx) = mpsc::sync_channel(1); let (sink_tx, sink_rx) = mpsc::sync_channel(1);
let (close_tx, close_rx) = mpsc::sync_channel(1); let (close_tx, close_rx) = mpsc::sync_channel(1);
std::thread::spawn(move || match create_sink(device) { std::thread::spawn(move || match create_sink(&host, device) {
Ok((sink, stream)) => { Ok((sink, stream)) => {
sink_tx.send(Ok(sink)).unwrap(); sink_tx.send(Ok(sink)).unwrap();
close_rx.recv().unwrap_err(); // This will fail as soon as the sender is dropped close_rx.recv().unwrap_err(); // This will fail as soon as the sender is dropped
debug!("drop rodio::OutputStream"); debug!("drop rodio::OutputStream");
drop(stream); drop(stream);
}
Err(e) => {
sink_tx.send(Err(e)).unwrap();
}
});
// Instead of the second `unwrap`, better error handling could be introduced
let sink = sink_rx.recv().unwrap().unwrap();
debug!("Rodio sink was created");
RodioSink {
rodio_sink: sink,
_close_tx: close_tx,
} }
Err(e) => {
sink_tx.send(Err(e)).unwrap();
}
});
// Instead of the second `unwrap`, better error handling could be introduced
let sink = sink_rx.recv().unwrap().unwrap();
debug!("Rodio sink was created");
RodioSink {
rodio_sink: sink,
_close_tx: close_tx,
} }
} }
@ -178,8 +193,8 @@ impl Sink for RodioSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let source = rodio::buffer::SamplesBuffer::new(2, 44100, data); let source = rodio::buffer::SamplesBuffer::new(2, 44100, packet.samples());
self.rodio_sink.append(source); self.rodio_sink.append(source);
// Chunk sizes seem to be about 256 to 3000 ish items long. // Chunk sizes seem to be about 256 to 3000 ish items long.

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use sdl2::audio::{AudioQueue, AudioSpecDesired}; use sdl2::audio::{AudioQueue, AudioSpecDesired};
use std::{io, thread, time}; use std::{io, thread, time};
@ -45,12 +46,12 @@ impl Sink for SdlSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
while self.queue.size() > (2 * 2 * 44_100) { while self.queue.size() > (2 * 2 * 44_100) {
// sleep and wait for sdl thread to drain the queue a bit // sleep and wait for sdl thread to drain the queue a bit
thread::sleep(time::Duration::from_millis(10)); thread::sleep(time::Duration::from_millis(10));
} }
self.queue.queue(data); self.queue.queue(packet.samples());
Ok(()) Ok(())
} }
} }

View file

@ -1,4 +1,5 @@
use super::{Open, Sink}; use super::{Open, Sink};
use crate::audio::AudioPacket;
use shell_words::split; use shell_words::split;
use std::io::{self, Write}; use std::io::{self, Write};
use std::mem; use std::mem;
@ -43,11 +44,11 @@ impl Sink for SubprocessSink {
Ok(()) Ok(())
} }
fn write(&mut self, data: &[i16]) -> io::Result<()> { fn write(&mut self, packet: &AudioPacket) -> io::Result<()> {
let data: &[u8] = unsafe { let data: &[u8] = unsafe {
slice::from_raw_parts( slice::from_raw_parts(
data.as_ptr() as *const u8, packet.samples().as_ptr() as *const u8,
data.len() * mem::size_of::<i16>(), packet.samples().len() * mem::size_of::<i16>(),
) )
}; };
if let Some(child) = &mut self.child { if let Some(child) = &mut self.child {

View file

@ -55,6 +55,7 @@ pub struct PlayerConfig {
pub normalisation_type: NormalisationType, pub normalisation_type: NormalisationType,
pub normalisation_pregain: f32, pub normalisation_pregain: f32,
pub gapless: bool, pub gapless: bool,
pub passthrough: bool,
} }
impl Default for PlayerConfig { impl Default for PlayerConfig {
@ -65,6 +66,7 @@ impl Default for PlayerConfig {
normalisation_type: NormalisationType::default(), normalisation_type: NormalisationType::default(),
normalisation_pregain: 0.0, normalisation_pregain: 0.0,
gapless: true, gapless: true,
passthrough: false,
} }
} }
} }

View file

@ -1,39 +1,9 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
extern crate byteorder; use librespot_audio as audio;
extern crate futures; use librespot_core as core;
extern crate shell_words; use librespot_metadata as metadata;
#[cfg(feature = "alsa-backend")]
extern crate alsa;
#[cfg(feature = "portaudio-backend")]
extern crate portaudio_rs;
#[cfg(feature = "pulseaudio-backend")]
extern crate libpulse_binding;
#[cfg(feature = "pulseaudio-backend")]
extern crate libpulse_simple_binding;
#[cfg(feature = "jackaudio-backend")]
extern crate jack;
#[cfg(feature = "gstreamer-backend")]
extern crate glib;
#[cfg(feature = "gstreamer-backend")]
extern crate gstreamer as gst;
#[cfg(feature = "gstreamer-backend")]
extern crate gstreamer_app as gst_app;
#[cfg(feature = "gstreamer-backend")]
extern crate zerocopy;
#[cfg(feature = "sdl-backend")]
extern crate sdl2;
extern crate librespot_audio as audio;
extern crate librespot_core;
extern crate librespot_metadata as metadata;
pub mod audio_backend; pub mod audio_backend;
pub mod config; pub mod config;

View file

@ -1,5 +1,18 @@
use std::cmp::max;
use std::future::Future;
use std::io::{self, Read, Seek, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{mem, thread};
use byteorder::{LittleEndian, ReadBytesExt};
use futures_util::stream::futures_unordered::FuturesUnordered;
use futures_util::{future, StreamExt, TryFutureExt};
use tokio::sync::{mpsc, oneshot};
use crate::audio::{AudioDecoder, AudioError, AudioPacket, PassthroughDecoder, VorbisDecoder};
use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController}; use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController};
use crate::audio::{VorbisDecoder, VorbisPacket};
use crate::audio::{ use crate::audio::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
@ -7,23 +20,11 @@ use crate::audio::{
use crate::audio_backend::Sink; use crate::audio_backend::Sink;
use crate::config::NormalisationType; use crate::config::NormalisationType;
use crate::config::{Bitrate, PlayerConfig}; use crate::config::{Bitrate, PlayerConfig};
use crate::core::session::Session;
use crate::core::spotify_id::SpotifyId;
use crate::core::util::SeqGenerator;
use crate::metadata::{AudioItem, FileFormat}; use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter; use crate::mixer::AudioFilter;
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use byteorder::{LittleEndian, ReadBytesExt};
use futures::channel::{mpsc, oneshot};
use futures::{future, Future, Stream, StreamExt, TryFutureExt};
use std::borrow::Cow;
use std::cmp::max;
use std::io::{self, Read, Seek, SeekFrom};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use std::{mem, thread};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000; const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
@ -244,8 +245,8 @@ impl Player {
where where
F: FnOnce() -> Box<dyn Sink + Send> + Send + 'static, F: FnOnce() -> Box<dyn Sink + Send> + Send + 'static,
{ {
let (cmd_tx, cmd_rx) = mpsc::unbounded(); let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let (event_sender, event_receiver) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded_channel();
let handle = thread::spawn(move || { let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id()); debug!("new Player[{}]", session.session_id());
@ -265,8 +266,8 @@ impl Player {
}; };
// While PlayerInternal is written as a future, it still contains blocking code. // While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread. // It must be run by using block_on() in a dedicated thread.
futures::executor::block_on(internal); futures_executor::block_on(internal);
debug!("PlayerInternal thread finished."); debug!("PlayerInternal thread finished.");
}); });
@ -281,7 +282,7 @@ impl Player {
} }
fn command(&self, cmd: PlayerCommand) { fn command(&self, cmd: PlayerCommand) {
self.commands.as_ref().unwrap().unbounded_send(cmd).unwrap(); self.commands.as_ref().unwrap().send(cmd).unwrap();
} }
pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 { pub fn load(&mut self, track_id: SpotifyId, start_playing: bool, position_ms: u32) -> u64 {
@ -317,14 +318,14 @@ impl Player {
} }
pub fn get_player_event_channel(&self) -> PlayerEventChannel { pub fn get_player_event_channel(&self) -> PlayerEventChannel {
let (event_sender, event_receiver) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded_channel();
self.command(PlayerCommand::AddEventSender(event_sender)); self.command(PlayerCommand::AddEventSender(event_sender));
event_receiver event_receiver
} }
pub async fn get_end_of_track_future(&self) { pub async fn await_end_of_track(&self) {
let mut channel = self.get_player_event_channel(); let mut channel = self.get_player_event_channel();
while let Some(event) = channel.next().await { while let Some(event) = channel.recv().await {
if matches!( if matches!(
event, event,
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
@ -377,7 +378,7 @@ enum PlayerPreload {
}, },
} }
type Decoder = VorbisDecoder<Subfile<AudioDecrypt<AudioFile>>>; type Decoder = Box<dyn AudioDecoder + Send>;
enum PlayerState { enum PlayerState {
Stopped, Stopped,
@ -575,21 +576,20 @@ struct PlayerTrackLoader {
} }
impl PlayerTrackLoader { impl PlayerTrackLoader {
async fn find_available_alternative<'a, 'b>( async fn find_available_alternative(&self, audio: AudioItem) -> Option<AudioItem> {
&'a self,
audio: &'b AudioItem,
) -> Option<Cow<'b, AudioItem>> {
if audio.available { if audio.available {
Some(Cow::Borrowed(audio)) Some(audio)
} else if let Some(alternatives) = &audio.alternatives { } else if let Some(alternatives) = &audio.alternatives {
let alternatives = alternatives let alternatives: FuturesUnordered<_> = alternatives
.iter() .iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id)); .map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id))
let alternatives = future::try_join_all(alternatives).await.unwrap(); .collect();
alternatives alternatives
.into_iter() .filter_map(|x| future::ready(x.ok()))
.find(|alt| alt.available) .filter(|x| future::ready(x.available))
.map(Cow::Owned) .next()
.await
} else { } else {
None None
} }
@ -629,10 +629,10 @@ impl PlayerTrackLoader {
info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri); info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri);
let audio = match self.find_available_alternative(&audio).await { let audio = match self.find_available_alternative(audio).await {
Some(audio) => audio, Some(audio) => audio,
None => { None => {
warn!("<{}> is not available", audio.uri); warn!("<{}> is not available", spotify_id.to_uri());
return None; return None;
} }
}; };
@ -730,7 +730,19 @@ impl PlayerTrackLoader {
let audio_file = Subfile::new(decrypted_file, 0xa7); let audio_file = Subfile::new(decrypted_file, 0xa7);
let mut decoder = match VorbisDecoder::new(audio_file) { let result = if self.config.passthrough {
match PassthroughDecoder::new(audio_file) {
Ok(result) => Ok(Box::new(result) as Decoder),
Err(e) => Err(AudioError::PassthroughError(e)),
}
} else {
match VorbisDecoder::new(audio_file) {
Ok(result) => Ok(Box::new(result) as Decoder),
Err(e) => Err(AudioError::VorbisError(e)),
}
};
let mut decoder = match result {
Ok(decoder) => decoder, Ok(decoder) => decoder,
Err(e) if is_cached => { Err(e) if is_cached => {
warn!( warn!(
@ -779,12 +791,13 @@ impl Future for PlayerInternal {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// While this is written as a future, it still contains blocking code. // While this is written as a future, it still contains blocking code.
// It must be run on its own thread. // It must be run on its own thread.
let passthrough = self.config.passthrough;
loop { loop {
let mut all_futures_completed_or_not_ready = true; let mut all_futures_completed_or_not_ready = true;
// process commands that were sent to us // process commands that were sent to us
let cmd = match Pin::new(&mut self.commands).poll_next(cx) { let cmd = match self.commands.poll_recv(cx) {
Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down. Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
Poll::Ready(Some(cmd)) => { Poll::Ready(Some(cmd)) => {
all_futures_completed_or_not_ready = false; all_futures_completed_or_not_ready = false;
@ -880,32 +893,44 @@ impl Future for PlayerInternal {
{ {
let packet = decoder.next_packet().expect("Vorbis error"); let packet = decoder.next_packet().expect("Vorbis error");
if let Some(ref packet) = packet { if !passthrough {
*stream_position_pcm += (packet.data().len() / 2) as u64; if let Some(ref packet) = packet {
let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm); *stream_position_pcm =
*stream_position_pcm + (packet.samples().len() / 2) as u64;
let stream_position_millis =
Self::position_pcm_to_ms(*stream_position_pcm);
let notify_about_position = match *reported_nominal_start_time { let notify_about_position = match *reported_nominal_start_time {
None => true, None => true,
Some(reported_nominal_start_time) => { Some(reported_nominal_start_time) => {
// only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time. // only notify if we're behind. If we're ahead it's probably due to a buffer of the backend and we;re actually in time.
let lag = (Instant::now() - reported_nominal_start_time).as_millis() let lag = (Instant::now() - reported_nominal_start_time)
as i64 .as_millis()
- stream_position_millis as i64; as i64
lag > 1000 - stream_position_millis as i64;
if lag > 1000 {
true
} else {
false
}
}
};
if notify_about_position {
*reported_nominal_start_time = Some(
Instant::now()
- Duration::from_millis(stream_position_millis as u64),
);
self.send_event(PlayerEvent::Playing {
track_id,
play_request_id,
position_ms: stream_position_millis as u32,
duration_ms,
});
} }
};
if notify_about_position {
*reported_nominal_start_time = Some(
Instant::now()
- Duration::from_millis(stream_position_millis as u64),
);
self.send_event(PlayerEvent::Playing {
track_id,
play_request_id,
position_ms: stream_position_millis as u32,
duration_ms,
});
} }
} else {
// position, even if irrelevant, must be set so that seek() is called
*stream_position_pcm = duration_ms.into();
} }
self.handle_packet(packet, normalisation_factor); self.handle_packet(packet, normalisation_factor);
@ -1087,23 +1112,23 @@ impl PlayerInternal {
} }
} }
fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) { fn handle_packet(&mut self, packet: Option<AudioPacket>, normalisation_factor: f32) {
match packet { match packet {
Some(mut packet) => { Some(mut packet) => {
if !packet.data().is_empty() { if !packet.is_empty() {
if let Some(ref editor) = self.audio_filter { if let AudioPacket::Samples(ref mut data) = packet {
editor.modify_stream(&mut packet.data_mut()) if let Some(ref editor) = self.audio_filter {
}; editor.modify_stream(data)
}
if self.config.normalisation if self.config.normalisation && normalisation_factor != 1.0 {
&& (normalisation_factor - 1.0).abs() < f32::EPSILON for x in data.iter_mut() {
{ *x = (*x as f32 * normalisation_factor) as i16;
for x in packet.data_mut().iter_mut() { }
*x = (*x as f32 * normalisation_factor) as i16;
} }
} }
if let Err(err) = self.sink.write(&packet.data()) { if let Err(err) = self.sink.write(&packet) {
error!("Could not write audio: {}", err); error!("Could not write audio: {}", err);
self.ensure_sink_stopped(false); self.ensure_sink_stopped(false);
} }
@ -1555,7 +1580,7 @@ impl PlayerInternal {
fn send_event(&mut self, event: PlayerEvent) { fn send_event(&mut self, event: PlayerEvent) {
let mut index = 0; let mut index = 0;
while index < self.event_senders.len() { while index < self.event_senders.len() {
match self.event_senders[index].unbounded_send(event.clone()) { match self.event_senders[index].send(event.clone()) {
Ok(_) => index += 1, Ok(_) => index += 1,
Err(_) => { Err(_) => {
self.event_senders.remove(index); self.event_senders.remove(index);
@ -1583,7 +1608,7 @@ impl PlayerInternal {
let (result_tx, result_rx) = oneshot::channel(); let (result_tx, result_rx) = oneshot::channel();
std::thread::spawn(move || { std::thread::spawn(move || {
futures::executor::block_on(loader.load_track(spotify_id, position_ms)).and_then( futures_executor::block_on(loader.load_track(spotify_id, position_ms)).and_then(
move |data| { move |data| {
let _ = result_tx.send(data); let _ = result_tx.send(data);
Some(()) Some(())

View file

@ -1,6 +1,6 @@
[package] [package]
name = "librespot-protocol" name = "librespot-protocol"
version = "0.1.3" version = "0.1.6"
authors = ["Paul Liétar <paul@lietar.net>"] authors = ["Paul Liétar <paul@lietar.net>"]
build = "build.rs" build = "build.rs"
description = "The protobuf logic for communicating with Spotify servers" description = "The protobuf logic for communicating with Spotify servers"

View file

@ -1,16 +1,25 @@
#!/bin/bash #!/bin/bash
SKIP_MERGE='false'
DRY_RUN='false'
WORKINGDIR="$( cd "$(dirname "$0")" ; pwd -P )" WORKINGDIR="$( cd "$(dirname "$0")" ; pwd -P )"
cd $WORKINGDIR cd $WORKINGDIR
crates=( "protocol" "core" "audio" "metadata" "playback" "connect" "librespot" ) crates=( "protocol" "core" "audio" "metadata" "playback" "connect" "librespot" )
function switchBranch { function switchBranch {
# You are expected to have committed/stashed your changes before running this. if [ "$SKIP_MERGE" = 'false' ] ; then
echo "Switching to master branch and merging development." # You are expected to have committed/stashed your changes before running this.
git checkout master echo "Switching to master branch and merging development."
git pull git checkout master
git merge dev git pull
if [ "$DRY_RUN" = 'true' ] ; then
git merge --no-commit --no-ff dev
else
git merge dev
fi
fi
} }
function updateVersion { function updateVersion {
@ -26,15 +35,25 @@ function updateVersion {
echo "Path is $crate_path" echo "Path is $crate_path"
if [ "$CRATE" = "librespot" ] if [ "$CRATE" = "librespot" ]
then then
cargo update if [ "$DRY_RUN" = 'true' ] ; then
git add . && git commit -a -m "Update Cargo.lock" cargo update --dry-run
git add . && git commit --dry-run -a -m "Update Cargo.lock"
else
cargo update
git add . && git commit -a -m "Update Cargo.lock"
fi
fi fi
done done
} }
function commitAndTag { function commitAndTag {
git commit -a -m "Update version numbers to $1" if [ "$DRY_RUN" = 'true' ] ; then
git tag "v$1" -a -m "Update to version $1" # Skip tagging on dry run.
git commit --dry-run -a -m "Update version numbers to $1"
else
git commit -a -m "Update version numbers to $1"
git tag "v$1" -a -m "Update to version $1"
fi
} }
function get_crate_name { function get_crate_name {
@ -72,9 +91,17 @@ function publishCrates {
if [ "$CRATE" == "protocol" ] if [ "$CRATE" == "protocol" ]
then then
# Protocol crate needs --no-verify option due to build.rs modification. # Protocol crate needs --no-verify option due to build.rs modification.
cargo publish --no-verify if [ "$DRY_RUN" = 'true' ] ; then
cargo publish --no-verify --dry-run
else
cargo publish --no-verify
fi
else else
cargo publish if [ "$DRY_RUN" = 'true' ] ; then
cargo publish --dry-run
else
cargo publish
fi
fi fi
echo "Successfully published $crate_name to crates.io" echo "Successfully published $crate_name to crates.io"
remoteWait 30 $crate_name remoteWait 30 $crate_name
@ -83,10 +110,32 @@ function publishCrates {
function updateRepo { function updateRepo {
cd $WORKINGDIR cd $WORKINGDIR
echo "Pushing to master branch of repo." if [ "$DRY_RUN" = 'true' ] ; then
git push origin master echo "Pushing to master branch of repo. [DRY RUN]"
echo "Pushing v$1 tag to master branch of repo." git push --dry-run origin master
git push origin v$1 echo "Pushing v$1 tag to master branch of repo. [DRY RUN]"
git push --dry-run origin v$1
# Cancels any merges in progress
git merge --abort
git checkout dev
git merge --no-commit --no-ff master
# Cancels above merge
git merge --abort
git push --dry-run
else
echo "Pushing to master branch of repo."
git push origin master
echo "Pushing v$1 tag to master branch of repo."
git push origin v$1
# Update the dev repo with latest version commit
git checkout dev
git merge master
git push
fi
} }
function rebaseDev { function rebaseDev {
@ -105,5 +154,47 @@ function run {
echo "Successfully published v$1 to crates.io and uploaded changes to repo." echo "Successfully published v$1 to crates.io and uploaded changes to repo."
} }
#Set Script Name variable
SCRIPT=`basename ${BASH_SOURCE[0]}`
print_usage () {
local l_MSG=$1
if [ ! -z "${l_MSG}" ]; then
echo "Usage Error: $l_MSG"
fi
echo "Usage: $SCRIPT <args> <version>"
echo " where <version> specifies the version number in semver format, eg. 1.0.1"
echo "Recognized optional command line arguments"
echo "--dry-run -- Test the script before making live changes"
echo "--skip-merge -- Skip merging dev into master before publishing"
exit 1
}
### check number of command line arguments
NUMARGS=$#
if [ $NUMARGS -eq 0 ]; then
print_usage 'No command line arguments specified'
fi
while test $# -gt 0; do
case "$1" in
-h|--help)
print_usage
exit 0
;;
--dry-run)
DRY_RUN='true'
shift
;;
--skip-merge)
SKIP_MERGE='true'
shift
;;
*)
break
;;
esac
done
# First argument is new version number. # First argument is new version number.
run $1 run $1

View file

@ -1,8 +1,8 @@
#![crate_name = "librespot"] #![crate_name = "librespot"]
pub extern crate librespot_audio as audio; pub use librespot_audio as audio;
pub extern crate librespot_connect as connect; pub use librespot_connect as connect;
pub extern crate librespot_core as core; pub use librespot_core as core;
pub extern crate librespot_metadata as metadata; pub use librespot_metadata as metadata;
pub extern crate librespot_playback as playback; pub use librespot_playback as playback;
pub extern crate librespot_protocol as protocol; pub use librespot_protocol as protocol;

View file

@ -1,4 +1,4 @@
use futures::{channel::mpsc::UnboundedReceiver, future::FusedFuture, FutureExt, StreamExt}; use futures_util::{future, FutureExt, StreamExt};
use librespot_playback::player::PlayerEvent; use librespot_playback::player::PlayerEvent;
use log::{error, info, warn}; use log::{error, info, warn};
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
@ -10,9 +10,10 @@ use std::{
io::{stderr, Write}, io::{stderr, Write},
pin::Pin, pin::Pin,
}; };
use tokio::sync::mpsc::UnboundedReceiver;
use url::Url; use url::Url;
use librespot::core::authentication::{get_credentials, Credentials}; use librespot::core::authentication::Credentials;
use librespot::core::cache::Cache; use librespot::core::cache::Cache;
use librespot::core::config::{ConnectConfig, DeviceType, SessionConfig, VolumeCtrl}; use librespot::core::config::{ConnectConfig, DeviceType, SessionConfig, VolumeCtrl};
use librespot::core::session::Session; use librespot::core::session::Session;
@ -70,6 +71,29 @@ fn list_backends() {
} }
} }
pub fn get_credentials<F: FnOnce(&String) -> Option<String>>(
username: Option<String>,
password: Option<String>,
cached_credentials: Option<Credentials>,
prompt: F,
) -> Option<Credentials> {
if let Some(username) = username {
if let Some(password) = password {
return Some(Credentials::with_password(username, password));
}
match cached_credentials {
Some(credentials) if username == credentials.username => Some(credentials),
_ => {
let password = prompt(&username)?;
Some(Credentials::with_password(username, password))
}
}
} else {
cached_credentials
}
}
#[derive(Clone)] #[derive(Clone)]
struct Setup { struct Setup {
backend: fn(Option<String>) -> Box<dyn Sink + Send + 'static>, backend: fn(Option<String>) -> Box<dyn Sink + Send + 'static>,
@ -203,6 +227,11 @@ fn setup(args: &[String]) -> Setup {
"", "",
"disable-gapless", "disable-gapless",
"disable gapless playback.", "disable gapless playback.",
)
.optflag(
"",
"passthrough",
"Pass raw stream to output, only works for \"pipe\"."
); );
let matches = match opts.parse(&args[1..]) { let matches = match opts.parse(&args[1..]) {
@ -312,10 +341,10 @@ fn setup(args: &[String]) -> Setup {
let credentials = { let credentials = {
let cached_credentials = cache.as_ref().and_then(Cache::credentials); let cached_credentials = cache.as_ref().and_then(Cache::credentials);
let password = |username: &String| -> String { let password = |username: &String| -> Option<String> {
write!(stderr(), "Password for {}: ", username).unwrap(); write!(stderr(), "Password for {}: ", username).ok()?;
stderr().flush().unwrap(); stderr().flush().ok()?;
rpassword::read_password().unwrap() rpassword::read_password().ok()
}; };
get_credentials( get_credentials(
@ -355,6 +384,8 @@ fn setup(args: &[String]) -> Setup {
} }
}; };
let passthrough = matches.opt_present("passthrough");
let player_config = { let player_config = {
let bitrate = matches let bitrate = matches
.opt_str("b") .opt_str("b")
@ -377,6 +408,7 @@ fn setup(args: &[String]) -> Setup {
.opt_str("normalisation-pregain") .opt_str("normalisation-pregain")
.map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value")) .map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value"))
.unwrap_or(PlayerConfig::default().normalisation_pregain), .unwrap_or(PlayerConfig::default().normalisation_pregain),
passthrough,
} }
}; };
@ -436,8 +468,7 @@ async fn main() {
let mut player_event_channel: Option<UnboundedReceiver<PlayerEvent>> = None; let mut player_event_channel: Option<UnboundedReceiver<PlayerEvent>> = None;
let mut auto_connect_times: Vec<Instant> = vec![]; let mut auto_connect_times: Vec<Instant> = vec![];
let mut discovery = None; let mut discovery = None;
let mut connecting: Pin<Box<dyn FusedFuture<Output = _>>> = let mut connecting: Pin<Box<dyn future::FusedFuture<Output = _>>> = Box::pin(future::pending());
Box::pin(futures::future::pending());
if setupp.enable_discovery { if setupp.enable_discovery {
let config = setupp.connect_config.clone(); let config = setupp.connect_config.clone();
@ -558,7 +589,7 @@ async fn main() {
} }
} }
}, },
event = async { player_event_channel.as_mut().unwrap().next().await }, if player_event_channel.is_some() => match event { event = async { player_event_channel.as_mut().unwrap().recv().await }, if player_event_channel.is_some() => match event {
Some(event) => { Some(event) => {
if let Some(program) = &setupp.player_event_program { if let Some(program) = &setupp.player_event_program {
if let Some(child) = run_program_on_events(event, program) { if let Some(child) = run_program_on_events(event, program) {