Merge branch 'futures_migration' of https://github.com/Johannesd3/librespot into tokio_migration

This commit is contained in:
ashthespy 2021-01-25 18:35:18 +01:00
commit 9546fb6e61
32 changed files with 1252 additions and 2882 deletions

1180
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -15,49 +15,35 @@ edition = "2018"
name = "librespot"
path = "src/lib.rs"
[[bin]]
name = "librespot"
path = "src/main.rs"
doc = false
# [[bin]]
# name = "librespot"
# path = "src/main.rs"
# doc = false
[dependencies.librespot-audio]
path = "audio"
version = "0.1.3"
[dependencies.librespot-connect]
path = "connect"
version = "0.1.3"
# [dependencies.librespot-connect]
# path = "connect"
# version = "0.1.3"
[dependencies.librespot-core]
path = "core"
version = "0.1.3"
[dependencies.librespot-metadata]
path = "metadata"
version = "0.1.3"
[dependencies.librespot-playback]
path = "playback"
version = "0.1.3"
[dependencies.librespot-protocol]
path = "protocol"
version = "0.1.3"
[dependencies]
base64 = "0.13"
env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]}
futures = "0.1"
getopts = "0.2"
log = "0.4"
num-bigint = "0.3"
protobuf = "~2.14.0"
rand = "0.7"
rpassword = "5.0"
# tokio = "0.1"
tokio = { version = "0.2", features = ["rt-core"] }
tokio-io = "0.1"
tokio-process = "0.2"
tokio-signal = "0.2"
url = "1.7"
sha-1 = "0.8"
hex = "0.4"
[features]
alsa-backend = ["librespot-playback/alsa-backend"]
portaudio-backend = ["librespot-playback/portaudio-backend"]
@ -70,7 +56,7 @@ gstreamer-backend = ["librespot-playback/gstreamer-backend"]
with-tremor = ["librespot-audio/with-tremor"]
with-vorbis = ["librespot-audio/with-vorbis"]
with-dns-sd = ["librespot-connect/with-dns-sd"]
# with-dns-sd = ["librespot-connect/with-dns-sd"]
default = ["librespot-playback/rodio-backend"]

View file

@ -11,17 +11,17 @@ path = "../core"
version = "0.1.3"
[dependencies]
aes-ctr = "0.6"
bit-set = "0.5"
byteorder = "1.3"
bytes = "0.4"
byteorder = "1.4"
bytes = "1.0"
futures = "0.3"
tokio = { version = "0.2", features = ["full"] } # Temp "rt-core", "sync"
lewton = "0.9"
lewton = "0.10"
log = "0.4"
num-bigint = "0.3"
num-traits = "0.2"
pin-project-lite = "0.2.4"
tempfile = "3.1"
aes-ctr = "0.3"
librespot-tremor = { version = "0.1.0", optional = true }
vorbis = { version ="0.0.14", optional = true }

View file

@ -1,7 +1,7 @@
use std::io;
use aes_ctr::stream_cipher::generic_array::GenericArray;
use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek};
use aes_ctr::cipher::generic_array::GenericArray;
use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek};
use aes_ctr::Aes128Ctr;
use librespot_core::audio_key::AudioKey;

View file

@ -1,30 +1,31 @@
use crate::range_set::{Range, RangeSet};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes;
use std::cmp::{max, min};
use futures::{
channel::{mpsc, oneshot},
future,
};
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
use std::time::{Duration, Instant};
use std::{
cmp::{max, min},
pin::Pin,
task::Context,
};
use tempfile::NamedTempFile;
use futures::channel::mpsc::unbounded;
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session;
use librespot_core::spotify_id::FileId;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;
use futures::{
channel::{mpsc, mpsc::unbounded, oneshot},
ready, Future, Stream,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::task;
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
@ -95,22 +96,6 @@ pub enum AudioFile {
Streaming(AudioFileStreaming),
}
pub enum AudioFileOpen {
Cached(Option<fs::File>),
Streaming(AudioFileOpenStreaming),
}
pub struct AudioFileOpenStreaming {
session: Session,
initial_data_rx: Option<ChannelData>,
initial_data_length: Option<usize>,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
streaming_data_rate: usize,
}
enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access
@ -127,45 +112,36 @@ pub struct StreamLoaderController {
impl StreamLoaderController {
pub fn len(&self) -> usize {
return self.file_size;
self.file_size
}
pub fn is_empty(&self) -> bool {
self.file_size == 0
}
pub fn range_available(&self, range: Range) -> bool {
if let Some(ref shared) = self.stream_shared {
let download_status = shared.download_status.lock().unwrap();
if range.length
range.length
<= download_status
.downloaded
.contained_length_from_value(range.start)
{
return true;
} else {
return false;
}
} else {
if range.length <= self.len() - range.start {
return true;
} else {
return false;
}
range.length <= self.len() - range.start
}
}
pub fn range_to_end_available(&self) -> bool {
if let Some(ref shared) = self.stream_shared {
self.stream_shared.as_ref().map_or(true, |shared| {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
self.range_available(Range::new(read_position, self.len() - read_position))
} else {
true
}
})
}
pub fn ping_time_ms(&self) -> usize {
if let Some(ref shared) = self.stream_shared {
return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
} else {
return 0;
}
self.stream_shared.as_ref().map_or(0, |shared| {
shared.ping_time_ms.load(atomic::Ordering::Relaxed)
})
}
fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
@ -223,27 +199,23 @@ impl StreamLoaderController {
}
pub fn fetch_next(&mut self, length: usize) {
let range: Range = if let Some(ref shared) = self.stream_shared {
Range {
if let Some(ref shared) = self.stream_shared {
let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed),
length: length,
}
} else {
return;
};
self.fetch(range);
};
self.fetch(range)
}
}
pub fn fetch_next_blocking(&mut self, length: usize) {
let range: Range = if let Some(ref shared) = self.stream_shared {
Range {
if let Some(ref shared) = self.stream_shared {
let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed),
length: length,
}
} else {
return;
};
self.fetch_blocking(range);
};
self.fetch_blocking(range);
}
}
pub fn set_random_access_mode(&mut self) {
@ -295,110 +267,16 @@ struct AudioFileShared {
read_position: AtomicUsize,
}
impl AudioFileOpenStreaming {
fn finish(&mut self, size: usize) -> AudioFileStreaming {
let shared = Arc::new(AudioFileShared {
file_id: self.file_id,
file_size: size,
stream_data_rate: self.streaming_data_rate,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});
let mut write_file = NamedTempFile::new().unwrap();
write_file.as_file().set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap();
let initial_data_rx = self.initial_data_rx.take().unwrap();
let initial_data_length = self.initial_data_length.take().unwrap();
let complete_tx = self.complete_tx.take().unwrap();
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new(
self.session.clone(),
shared.clone(),
initial_data_rx,
self.initial_request_sent_time,
initial_data_length,
write_file,
stream_loader_command_rx,
complete_tx,
);
self.session.spawn(fetcher);
// tokio::spawn(move |_| fetcher);
AudioFileStreaming {
read_file: read_file,
position: 0,
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
shared: shared,
}
}
}
impl Future for AudioFileOpen {
type Output = Result<AudioFile, ChannelError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<AudioFile, ChannelError>> {
match *self {
AudioFileOpen::Streaming(ref mut open) => {
let file = ready!(open.poll());
Poll::Ready(Ok(AudioFile::Streaming(file)))
}
AudioFileOpen::Cached(ref mut file) => {
let file = file.take().unwrap();
Poll::Ready(Ok(AudioFile::Cached(file)))
}
}
}
}
impl Future for AudioFileOpenStreaming {
type Output = Result<AudioFileStreaming, ChannelError>;
fn poll(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<AudioFileStreaming, ChannelError>> {
loop {
let (id, data) = ready!(self.headers.poll()).unwrap();
if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size);
return Poll::Ready(Ok(file));
}
}
}
}
impl AudioFile {
pub fn open(
pub async fn open(
session: &Session,
file_id: FileId,
bytes_per_second: usize,
play_from_beginning: bool,
) -> AudioFileOpen {
let cache = session.cache().cloned();
if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
) -> Result<AudioFile, ChannelError> {
if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
debug!("File {} already in cache", file_id);
return AudioFileOpen::Cached(Some(file));
return Ok(AudioFile::Cached(file));
}
debug!("Downloading file {}", file_id);
@ -420,56 +298,112 @@ impl AudioFile {
}
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
let open = AudioFileOpenStreaming {
session: session.clone(),
file_id: file_id,
headers: headers,
initial_data_rx: Some(data),
initial_data_length: Some(initial_data_length),
initial_request_sent_time: Instant::now(),
complete_tx: Some(complete_tx),
streaming_data_rate: bytes_per_second,
};
let session_ = session.clone();
session.spawn(
complete_rx
.map(move |mut file| {
if let Some(cache) = session_.cache() {
cache.save_file(file_id, &mut file);
debug!("File {} complete, saving to cache", file_id);
} else {
debug!("File {} complete", file_id);
}
})
.or_else(|oneshot::Canceled| Ok(())),
let streaming = AudioFileStreaming::open(
session.clone(),
data,
initial_data_length,
Instant::now(),
headers,
file_id,
complete_tx,
bytes_per_second,
);
return AudioFileOpen::Streaming(open);
let session_ = session.clone();
session.spawn(complete_rx.map_ok(move |mut file| {
if let Some(cache) = session_.cache() {
cache.save_file(file_id, &mut file);
debug!("File {} complete, saving to cache", file_id);
} else {
debug!("File {} complete", file_id);
}
}));
Ok(AudioFile::Streaming(streaming.await?))
}
pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
match self {
AudioFile::Streaming(ref stream) => {
return StreamLoaderController {
channel_tx: Some(stream.stream_loader_command_tx.clone()),
stream_shared: Some(stream.shared.clone()),
file_size: stream.shared.file_size,
};
}
AudioFile::Cached(ref file) => {
return StreamLoaderController {
channel_tx: None,
stream_shared: None,
file_size: file.metadata().unwrap().len() as usize,
};
}
AudioFile::Streaming(ref stream) => StreamLoaderController {
channel_tx: Some(stream.stream_loader_command_tx.clone()),
stream_shared: Some(stream.shared.clone()),
file_size: stream.shared.file_size,
},
AudioFile::Cached(ref file) => StreamLoaderController {
channel_tx: None,
stream_shared: None,
file_size: file.metadata().unwrap().len() as usize,
},
}
}
}
impl AudioFileStreaming {
pub async fn open(
session: Session,
initial_data_rx: ChannelData,
initial_data_length: usize,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: oneshot::Sender<NamedTempFile>,
streaming_data_rate: usize,
) -> Result<AudioFileStreaming, ChannelError> {
let (_, data) = headers
.try_filter(|(id, _)| future::ready(*id == 0x3))
.next()
.await
.unwrap()?;
let size = BigEndian::read_u32(&data) as usize * 4;
let shared = Arc::new(AudioFileShared {
file_id: file_id,
file_size: size,
stream_data_rate: streaming_data_rate,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});
let mut write_file = NamedTempFile::new().unwrap();
write_file.as_file().set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap();
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new(
session.clone(),
shared.clone(),
initial_data_rx,
initial_request_sent_time,
initial_data_length,
write_file,
stream_loader_command_rx,
complete_tx,
);
session.spawn(fetcher);
Ok(AudioFileStreaming {
read_file: read_file,
position: 0,
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
shared: shared,
})
}
}
fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
assert!(
offset % 4 == 0,
@ -511,143 +445,267 @@ enum ReceivedData {
Data(PartialFileData),
}
struct AudioFileFetchDataReceiver {
async fn audio_file_fetch_receive_data(
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
initial_data_offset: usize,
initial_request_length: usize,
data_offset: usize,
request_length: usize,
request_sent_time: Option<Instant>,
measure_ping_time: bool,
}
request_sent_time: Instant,
) {
let mut data_offset = initial_data_offset;
let mut request_length = initial_request_length;
let mut measure_ping_time = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst)
== 0;
impl AudioFileFetchDataReceiver {
fn new(
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
data_offset: usize,
request_length: usize,
request_sent_time: Instant,
) -> AudioFileFetchDataReceiver {
let measure_ping_time = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst)
== 0;
shared
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
shared
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
enum TryFoldErr {
ChannelError,
FinishEarly,
}
AudioFileFetchDataReceiver {
shared: shared,
data_rx: data_rx,
file_data_tx: file_data_tx,
initial_data_offset: data_offset,
initial_request_length: request_length,
data_offset: data_offset,
request_length: request_length,
request_sent_time: Some(request_sent_time),
measure_ping_time: measure_ping_time,
}
let result = data_rx
.map_err(|_| TryFoldErr::ChannelError)
.try_for_each(|data| {
if measure_ping_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64)
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data: data,
}));
data_offset += data_size;
if request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length);
request_length = 0;
} else {
request_length -= data_size;
}
future::ready(if request_length == 0 {
Err(TryFoldErr::FinishEarly)
} else {
Ok(())
})
})
.await;
if request_length > 0 {
let missing_range = Range::new(data_offset, request_length);
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.subtract_range(&missing_range);
shared.cond.notify_all();
}
shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
if let Err(TryFoldErr::ChannelError) = result {
warn!(
"Error from channel for data receiver for range {} (+{}).",
initial_data_offset, initial_request_length
);
} else if request_length > 0 {
warn!(
"Data receiver for range {} (+{}) received less data from server than requested.",
initial_data_offset, initial_request_length
);
}
}
/*
async fn audio_file_fetch(
session: Session,
shared: Arc<AudioFileShared>,
initial_data_rx: ChannelData,
initial_request_sent_time: Instant,
initial_data_length: usize,
impl AudioFileFetchDataReceiver {
fn finish(&mut self) {
if self.request_length > 0 {
let missing_range = Range::new(self.data_offset, self.request_length);
output: NamedTempFile,
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: oneshot::Sender<NamedTempFile>,
) {
let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
let mut download_status = self.shared.download_status.lock().unwrap();
download_status.requested.subtract_range(&missing_range);
self.shared.cond.notify_all();
}
let requested_range = Range::new(0, initial_data_length);
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.add_range(&requested_range);
self.shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
}
}
session.spawn(audio_file_fetch_receive_data(
shared.clone(),
file_data_tx.clone(),
initial_data_rx,
0,
initial_data_length,
initial_request_sent_time,
));
impl Future for AudioFileFetchDataReceiver {
type Output = ();
let mut network_response_times_ms: Vec::new();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
loop {
match self.data_rx.poll() {
Poll::Ready(Some(data)) => {
if self.measure_ping_time {
if let Some(request_sent_time) = self.request_sent_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64)
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = self
.file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
self.measure_ping_time = false;
}
}
let data_size = data.len();
let _ = self
.file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: self.data_offset,
data: data,
}));
self.data_offset += data_size;
if self.request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
self.request_length = 0;
} else {
self.request_length -= data_size;
}
if self.request_length == 0 {
self.finish();
return Poll::Ready(());
}
let f1 = file_data_rx.map(|x| Ok::<_, ()>(x)).try_for_each(|x| {
match x {
ReceivedData::ResponseTimeMs(response_time_ms) => {
trace!("Ping time estimated as: {} ms.", response_time_ms);
// record the response time
network_response_times_ms.push(response_time_ms);
// prune old response times. Keep at most three.
while network_response_times_ms.len() > 3 {
network_response_times_ms.remove(0);
}
Poll::Ready(None) => {
if self.request_length > 0 {
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms: usize = match network_response_times_ms.len() {
1 => network_response_times_ms[0] as usize,
2 => {
((network_response_times_ms[0] + network_response_times_ms[1]) / 2) as usize
}
self.finish();
return Poll::Ready(());
3 => {
let mut times = network_response_times_ms.clone();
times.sort();
times[1]
}
_ => unreachable!(),
};
// store our new estimate for everyone to see
shared
.ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed);
}
ReceivedData::Data(data) => {
output
.as_mut()
.unwrap()
.seek(SeekFrom::Start(data.offset as u64))
.unwrap();
output
.as_mut()
.unwrap()
.write_all(data.data.as_ref())
.unwrap();
let mut full = false;
{
let mut download_status = shared.download_status.lock().unwrap();
let received_range = Range::new(data.offset, data.data.len());
download_status.downloaded.add_range(&received_range);
shared.cond.notify_all();
if download_status.downloaded.contained_length_from_value(0)
>= shared.file_size
{
full = true;
}
drop(download_status);
}
Poll::Pending => return Poll::Pending,
Err(ChannelError) => {
warn!(
"Error from channel for data receiver for range {} (+{}).",
self.initial_data_offset, self.initial_request_length
);
if full {
self.finish();
return Poll::Ready(());
return future::ready(Err(()));
}
}
}
future::ready(Ok(()))
});
let f2 = stream_loader_command_rx.map(Ok::<_, ()>).try_for_each(|x| {
match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length);
}
StreamLoaderCommand::RandomAccessMode() => {
*(shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return future::ready(Err(())),
}
Ok(())
});
let f3 = future::poll_fn(|_| {
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
let number_of_open_requests = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst);
let max_requests_to_send =
MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
if max_requests_to_send > 0 {
let bytes_pending: usize = {
let download_status = shared.download_status.lock().unwrap();
download_status
.requested
.minus(&download_status.downloaded)
.len()
};
let ping_time_seconds =
0.001 * shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let download_rate = session.channel().get_download_rate_estimate();
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared.stream_data_rate as f64)
as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
as usize,
);
if bytes_pending < desired_pending_bytes {
self.pre_fetch_more_data(
desired_pending_bytes - bytes_pending,
max_requests_to_send,
);
}
}
}
Poll::Pending
});
future::select_all(vec![f1, f2, f3]).await
}*/
pin_project! {
struct AudioFileFetch {
session: Session,
shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
#[pin]
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
#[pin]
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>,
}
}
struct AudioFileFetch {
session: Session,
shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>,
}
impl AudioFileFetch {
fn new(
session: Session,
@ -668,17 +726,14 @@ impl AudioFileFetch {
download_status.requested.add_range(&requested_range);
}
let initial_data_receiver = AudioFileFetchDataReceiver::new(
session.spawn(audio_file_fetch_receive_data(
shared.clone(),
file_data_tx.clone(),
initial_data_rx,
0,
initial_data_length,
initial_request_sent_time,
);
session.spawn(initial_data_receiver);
// tokio::spawn(move |_| initial_data_receiver);
));
AudioFileFetch {
session: session,
@ -708,7 +763,7 @@ impl AudioFileFetch {
return;
}
if length <= 0 {
if length == 0 {
return;
}
@ -744,17 +799,14 @@ impl AudioFileFetch {
download_status.requested.add_range(range);
let receiver = AudioFileFetchDataReceiver::new(
self.session.spawn(audio_file_fetch_receive_data(
self.shared.clone(),
self.file_data_tx.clone(),
data,
range.start,
range.length,
Instant::now(),
);
self.session.spawn(receiver);
// tokio::spawn(move |_| receiver);
));
}
}
@ -802,9 +854,9 @@ impl AudioFileFetch {
}
}
fn poll_file_data_rx(&mut self) -> Poll<()> {
fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.file_data_rx.poll() {
match Pin::new(&mut self.file_data_rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => {
trace!("Ping time estimated as: {} ms.", response_time_ms);
@ -827,7 +879,7 @@ impl AudioFileFetch {
}
3 => {
let mut times = self.network_response_times_ms.clone();
times.sort();
times.sort_unstable();
times[1]
}
_ => unreachable!(),
@ -874,30 +926,29 @@ impl AudioFileFetch {
}
}
Poll::Pending => return Poll::Pending,
// Err(()) => unreachable!(),
}
}
}
fn poll_stream_loader_command_rx(&mut self) -> Poll<()> {
fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.stream_loader_command_rx.poll() {
match Pin::new(&mut self.stream_loader_command_rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(StreamLoaderCommand::Fetch(request))) => {
self.download_range(request.start, request.length);
}
Poll::Ready(Some(StreamLoaderCommand::RandomAccessMode())) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
Poll::Ready(Some(StreamLoaderCommand::StreamMode())) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
Poll::Ready(Some(StreamLoaderCommand::Close())) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length);
}
StreamLoaderCommand::RandomAccessMode() => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return Poll::Ready(()),
},
Poll::Pending => return Poll::Pending,
// Err(()) => unreachable!(),
}
}
}
@ -910,21 +961,16 @@ impl AudioFileFetch {
let _ = complete_tx.send(output);
}
}
impl Future for AudioFileFetch {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
match self.poll_stream_loader_command_rx() {
Poll::Pending => (),
Poll::Ready(_) => return Poll::Ready(()),
// Err(()) => unreachable!(),
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(()) = self.poll_stream_loader_command_rx(cx) {
return Poll::Ready(());
}
match self.poll_file_data_rx() {
Poll::Pending => (),
Poll::Ready(_) => return Poll::Ready(()),
// Err(()) => unreachable!(),
if let Poll::Ready(()) = self.poll_file_data_rx(cx) {
return Poll::Ready(());
}
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
@ -964,8 +1010,7 @@ impl Future for AudioFileFetch {
}
}
}
return Poll::Pending;
Poll::Pending
}
}
@ -1005,9 +1050,9 @@ impl Read for AudioFileStreaming {
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
for range in ranges_to_request.iter() {
for &range in ranges_to_request.iter() {
self.stream_loader_command_tx
.unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
.unbounded_send(StreamLoaderCommand::Fetch(range))
.unwrap();
}
@ -1054,7 +1099,7 @@ impl Read for AudioFileStreaming {
.read_position
.store(self.position as usize, atomic::Ordering::Relaxed);
return Ok(read_len);
Ok(read_len)
}
}

View file

@ -1,12 +1,15 @@
#[macro_use]
extern crate futures;
#![allow(clippy::unused_io_amount)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate pin_project_lite;
extern crate aes_ctr;
extern crate bit_set;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate num_bigint;
extern crate num_traits;
extern crate tempfile;
@ -24,7 +27,7 @@ mod libvorbis_decoder;
mod range_set;
pub use decrypt::AudioDecrypt;
pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController};
pub use fetch::{AudioFile, StreamLoaderController};
pub use fetch::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,

View file

@ -54,11 +54,7 @@ impl RangeSet {
}
pub fn len(&self) -> usize {
let mut result = 0;
for range in self.ranges.iter() {
result += range.length;
}
return result;
self.ranges.iter().map(|r| r.length).fold(0, std::ops::Add::add)
}
pub fn get_range(&self, index: usize) -> Range {
@ -98,12 +94,12 @@ impl RangeSet {
return false;
}
}
return true;
true
}
pub fn add_range(&mut self, range: &Range) {
if range.length <= 0 {
// the interval is empty or invalid -> nothing to do.
if range.length == 0 {
// the interval is empty -> nothing to do.
return;
}
@ -111,7 +107,7 @@ impl RangeSet {
// the new range is clear of any ranges we already iterated over.
if range.end() < self.ranges[index].start {
// the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it.
self.ranges.insert(index, range.clone());
self.ranges.insert(index, *range);
return;
} else if range.start <= self.ranges[index].end()
&& self.ranges[index].start <= range.end()
@ -119,7 +115,7 @@ impl RangeSet {
// the new range overlaps (or touches) the first range. They are to be merged.
// In addition we might have to merge further ranges in as well.
let mut new_range = range.clone();
let mut new_range = *range;
while index < self.ranges.len() && self.ranges[index].start <= new_range.end() {
let new_end = max(new_range.end(), self.ranges[index].end());
@ -134,7 +130,7 @@ impl RangeSet {
}
// the new range is after everything else -> just add it
self.ranges.push(range.clone());
self.ranges.push(*range);
}
#[allow(dead_code)]
@ -152,7 +148,7 @@ impl RangeSet {
}
pub fn subtract_range(&mut self, range: &Range) {
if range.length <= 0 {
if range.length == 0 {
return;
}

View file

@ -13,37 +13,36 @@ path = "../protocol"
version = "0.1.3"
[dependencies]
aes = "0.6"
base64 = "0.13"
thiserror = "1.0"
byteorder = "1.3"
bytes = "0.5"
error-chain = { version = "0.12", default_features = false }
futures = {version = "0.3",features =["unstable","bilock"]}
byteorder = "1.4"
bytes = "1.0"
futures = { version = "0.3", features = ["bilock", "unstable"] }
hmac = "0.7"
httparse = "1.3"
hyper = "0.13"
hyper-proxy = { version = "0.6", default_features = false }
lazy_static = "1.3"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] }
log = "0.4"
num-bigint = "0.3"
num-integer = "0.1"
num-traits = "0.2"
once_cell = "1.5.2"
pbkdf2 = "0.3"
pin-project-lite = "0.2.4"
protobuf = "~2.14.0"
rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
sha-1 = "~0.8"
shannon = "0.2.0"
tokio = {version = "0.2", features = ["full","io-util","tcp"]} # io-util
tokio-util = {version = "0.3", features = ["compat","codec"]}
# tokio-codec = "0.1"
# tokio-io = "0.1"
tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] }
tokio-util = { version = "0.6", features = ["codec"] }
url = "1.7"
uuid = { version = "0.8", features = ["v4"] }
sha-1 = "0.8"
hmac = "0.7"
pbkdf2 = "0.3"
aes = "0.3"
[build-dependencies]
rand = "0.7"
vergen = "3.0.4"
[dev-dependencies]
tokio = {version = "1.0", features = ["macros"] }

View file

@ -1,83 +1,69 @@
const AP_FALLBACK: &'static str = "ap.spotify.com:443";
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/";
use hyper::client::HttpConnector;
use hyper::{self, Body, Client, Request, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use serde_json;
use std::error;
use std::str::FromStr;
use hyper::{Body, Client, Method, Request, Uri};
use std::error::Error;
use url::Url;
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData {
ap_list: Vec<String>,
}
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String> {
let url = Uri::from_str(APRESOLVE_ENDPOINT)?; //.expect("invalid AP resolve URL");
let use_proxy = proxy.is_some();
async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> {
let port = ap_port.unwrap_or(443);
let mut req = Request::get(&url).body(Body::empty())?;
let response = match *proxy {
Some(ref val) => {
let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy");
let proxy = Proxy::new(Intercept::All, proxy_url);
let req = Request::builder()
.method(Method::GET)
.uri(
APRESOLVE_ENDPOINT
.parse::<Uri>()
.expect("invalid AP resolve URL"),
)
.body(Body::empty())?;
let client = if proxy.is_some() {
todo!("proxies not yet supported")
/*let proxy = {
let proxy_url = val.as_str().parse().expect("invalid http proxy");
let mut proxy = Proxy::new(Intercept::All, proxy_url);
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
if let Some(headers) = proxy_connector.http_headers(&url) {
req.headers_mut().extend(headers.clone().into_iter());
}
let client = Client::builder().build(proxy_connector);
client.request(req)
}
_ => {
let client = Client::new();
client.request(req)
}
}
.await?;
proxy_connector
};
let body = hyper::body::to_bytes(response.into_body()).await?;
let body = String::from_utf8(body.to_vec())?;
let data = serde_json::from_str::<APResolveData>(&body)?;
let ap = {
let mut aps = data.ap_list.iter().filter(|ap| {
if let Some(p) = ap_port {
Uri::from_str(ap)
.ok()
.map_or(false, |uri| uri.port_u16().map_or(false, |port| &port == p))
} else if use_proxy {
// It is unlikely that the proxy will accept CONNECT on anything other than 443.
Uri::from_str(ap).ok().map_or(false, |uri| {
uri.port_u16().map_or(false, |port| port == 443)
})
} else {
true
}
});
let ap = aps.next().ok_or("empty AP List")?;
Ok(ap.clone())
if let Some(headers) = proxy.http_headers(&APRESOLVE_ENDPOINT.parse().unwrap()) {
req.headers_mut().extend(headers.clone());
};
Client::builder().build(proxy)*/
} else {
Client::new()
};
ap
let response = client.request(req).await?;
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: APResolveData = serde_json::from_slice(body.as_ref())?;
let ap = if ap_port.is_some() || proxy.is_some() {
data.ap_list.into_iter().find_map(|ap| {
if ap.parse::<Uri>().ok()?.port()? == port {
Some(ap)
} else {
None
}
})
} else {
data.ap_list.into_iter().next()
}
.ok_or("empty AP List")?;
Ok(ap)
}
pub(crate) async fn apresolve_or_fallback<E>(
proxy: &Option<Url>,
ap_port: &Option<u16>,
) -> Result<String> {
// match apresolve.await {
// Ok(ap)
// }
let ap = apresolve(proxy, ap_port).await.or_else(|e| {
warn!("Failed to resolve Access Point: {:?}", e);
pub async fn apresolve_or_fallback(proxy: &Option<Url>, ap_port: &Option<u16>) -> String {
apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
warn!("Failed to resolve Access Point: {}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK);
Ok(AP_FALLBACK.into())
});
ap
AP_FALLBACK.into()
})
}

View file

@ -3,7 +3,6 @@ use bytes::Bytes;
use futures::channel::oneshot;
use std::collections::HashMap;
use std::io::Write;
use thiserror::Error;
use crate::spotify_id::{FileId, SpotifyId};
use crate::util::SeqGenerator;
@ -11,13 +10,8 @@ use crate::util::SeqGenerator;
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
pub struct AudioKey(pub [u8; 16]);
#[derive(Error, Debug)]
pub enum AudioKeyError {
#[error("AudioKey sender disconnected")]
Cancelled(#[from] oneshot::Canceled),
#[error("Unknown server response: `{0:?}`")]
UnknownResponse(Vec<u8>),
}
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
pub struct AudioKeyError;
component! {
AudioKeyManager : AudioKeyManagerInner {
@ -45,11 +39,9 @@ impl AudioKeyManager {
data.as_ref()[0],
data.as_ref()[1]
);
let _ = sender.send(Err(AudioKeyError::UnknownResponse(
data.as_ref()[..1].to_vec(),
)));
let _ = sender.send(Err(AudioKeyError));
}
_ => warn!("Unexpected audioKey response: 0x{:x?} {:?}", cmd, data),
_ => (),
}
}
}
@ -64,7 +56,7 @@ impl AudioKeyManager {
});
self.send_key_request(seq, track, file);
rx.await?
rx.await.map_err(|_| AudioKeyError)?
}
fn send_key_request(&self, seq: u32, track: SpotifyId, file: FileId) {

View file

@ -1,11 +1,9 @@
use aes::Aes192;
use base64;
use aes::NewBlockCipher;
use byteorder::{BigEndian, ByteOrder};
use hmac::Hmac;
use pbkdf2::pbkdf2;
use protobuf::ProtobufEnum;
use serde;
use serde_json;
use sha1::{Digest, Sha1};
use std::fs::File;
use std::io::{self, Read, Write};
@ -76,9 +74,9 @@ impl Credentials {
// decrypt data using ECB mode without padding
let blob = {
use aes::block_cipher_trait::generic_array::typenum::Unsigned;
use aes::block_cipher_trait::generic_array::GenericArray;
use aes::block_cipher_trait::BlockCipher;
use aes::cipher::generic_array::typenum::Unsigned;
use aes::cipher::generic_array::GenericArray;
use aes::cipher::BlockCipher;
let mut data = base64::decode(encrypted_blob).unwrap();
let cipher = Aes192::new(GenericArray::from_slice(&key));

View file

@ -1,16 +1,15 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use std::collections::HashMap;
use std::time::Instant;
use crate::util::SeqGenerator;
use futures::{channel::mpsc, lock::BiLock, Stream};
use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt};
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Instant,
};
use crate::util::SeqGenerator;
component! {
ChannelManager : ChannelManagerInner {
sequence: SeqGenerator<u16> = SeqGenerator::new(0),
@ -105,12 +104,10 @@ impl ChannelManager {
}
impl Channel {
fn recv_packet(&mut self) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll() {
Poll::Ready(Ok(Some(t))) => t,
Poll::Ready(Ok(t)) => return Err(ChannelError), // The channel has been closed.
fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Err(()) => unreachable!(),
Poll::Ready(o) => o.ok_or(ChannelError)?,
};
if cmd == 0xa {
@ -119,7 +116,7 @@ impl Channel {
self.state = ChannelState::Closed;
Err(ChannelError)
Poll::Ready(Err(ChannelError))
} else {
Poll::Ready(Ok(packet))
}
@ -133,15 +130,19 @@ impl Channel {
}
impl Stream for Channel {
type Item = Result<Option<ChannelEvent>, ChannelError>;
type Item = Result<ChannelEvent, ChannelError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.state.clone() {
ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => {
if data.len() == 0 {
data = ready!(self.recv_packet());
data = match self.recv_packet(cx) {
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
}
let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
@ -155,19 +156,23 @@ impl Stream for Channel {
self.state = ChannelState::Header(data);
let event = ChannelEvent::Header(header_id, header_data);
return Poll::Ready(Ok(Some(event)));
return Poll::Ready(Some(Ok(event)));
}
}
ChannelState::Data => {
let data = ready!(self.recv_packet());
let data = match self.recv_packet(cx) {
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
if data.len() == 0 {
self.receiver.close();
self.state = ChannelState::Closed;
return Poll::Ready(Ok(None));
return Poll::Ready(None);
} else {
let event = ChannelEvent::Data(data);
return Poll::Ready(Ok(Some(event)));
return Poll::Ready(Some(Ok(event)));
}
}
}
@ -176,36 +181,45 @@ impl Stream for Channel {
}
impl Stream for ChannelData {
type Item = Result<Option<Bytes>, ChannelError>;
type Item = Result<Bytes, ChannelError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock() {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock(cx) {
Poll::Ready(c) => c,
Poll::Pending => return Poll::Pending,
};
loop {
match ready!(channel.poll()) {
let x = match channel.poll_next_unpin(cx) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match x {
Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Poll::Ready(Ok(Some(data))),
None => return Poll::Ready(Ok(None)),
Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))),
None => return Poll::Ready(None),
}
}
}
}
impl Stream for ChannelHeaders {
type Item = Result<Option<(u8, Vec<u8>)>, ChannelError>;
type Item = Result<(u8, Vec<u8>), ChannelError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock() {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock(cx) {
Poll::Ready(c) => c,
Poll::Pending => return Poll::Pending,
};
match ready!(channel.poll()) {
Some(ChannelEvent::Header(id, data)) => Poll::Ready(Ok(Some((id, data)))),
Some(ChannelEvent::Data(..)) | None => Poll::Ready(Ok(None)),
let x = match channel.poll_next_unpin(cx) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match x {
Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))),
Some(ChannelEvent::Data(..)) | None => Poll::Ready(None),
}
}
}

View file

@ -35,29 +35,3 @@ macro_rules! component {
}
}
}
use std::cell::UnsafeCell;
use std::sync::Mutex;
pub(crate) struct Lazy<T>(Mutex<bool>, UnsafeCell<Option<T>>);
unsafe impl<T: Sync> Sync for Lazy<T> {}
unsafe impl<T: Send> Send for Lazy<T> {}
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
impl<T> Lazy<T> {
pub(crate) fn new() -> Lazy<T> {
Lazy(Mutex::new(false), UnsafeCell::new(None))
}
pub(crate) fn get<F: FnOnce() -> T>(&self, f: F) -> &T {
let mut inner = self.0.lock().unwrap();
if !*inner {
unsafe {
*self.1.get() = Some(f());
}
*inner = true;
}
unsafe { &*self.1.get() }.as_ref().unwrap()
}
}

View file

@ -35,11 +35,10 @@ impl APCodec {
}
}
type APCodecItem = (u8, Vec<u8>);
impl Encoder<APCodecItem> for APCodec {
impl Encoder<(u8, Vec<u8>)> for APCodec {
type Error = io::Error;
fn encode(&mut self, item: APCodecItem, buf: &mut BytesMut) -> io::Result<()> {
fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> {
let (cmd, payload) = item;
let offset = buf.len();

View file

@ -1,42 +1,25 @@
use super::codec::APCodec;
use crate::{
diffie_hellman::DHLocalKeys,
protocol,
protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext},
util,
};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use hmac::{Hmac, Mac};
use protobuf::{self, Message};
use rand::thread_rng;
use sha1::Sha1;
use std::{io, marker::Unpin};
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Framed};
// struct handshake {
// keys: DHLocalKeys,
// connection: T,
// accumulator: Vec<u8>,
// }
use super::codec::APCodec;
use crate::diffie_hellman::DHLocalKeys;
use crate::protocol;
use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext};
use crate::util;
pub async fn handshake<T: AsyncRead + AsyncWrite + Unpin>(
mut connection: T,
) -> Result<Framed<T, APCodec>, io::Error> {
) -> io::Result<Framed<T, APCodec>> {
let local_keys = DHLocalKeys::random(&mut thread_rng());
// Send ClientHello
let client_hello: Vec<u8> = client_hello(local_keys.public_key()).await?;
connection.write_all(&client_hello).await?;
// Receive APResponseMessage
let size = connection.read_u32().await?;
let mut buffer = Vec::with_capacity(size as usize - 4);
let bytes = connection.read_buf(&mut buffer).await?;
let message = protobuf::parse_from_bytes::<APResponseMessage>(&buffer[..bytes])?;
let mut accumulator = client_hello.clone();
accumulator.extend_from_slice(&size.to_be_bytes());
accumulator.extend_from_slice(&buffer);
let gc = local_keys.public_key();
let mut accumulator = client_hello(&mut connection, gc).await?;
let message: APResponseMessage = recv_packet(&mut connection, &mut accumulator).await?;
let remote_key = message
.get_challenge()
.get_login_crypto_challenge()
@ -44,28 +27,19 @@ pub async fn handshake<T: AsyncRead + AsyncWrite + Unpin>(
.get_gs()
.to_owned();
// Solve the challenge
let shared_secret = local_keys.shared_secret(&remote_key);
let (challenge, send_key, recv_key) = compute_keys(&shared_secret, &accumulator);
let codec = APCodec::new(&send_key, &recv_key);
let buffer: Vec<u8> = client_response(challenge).await?;
connection.write_all(&buffer).await?;
let framed = codec.framed(connection);
Ok(framed)
client_response(&mut connection, challenge).await?;
Ok(codec.framed(connection))
}
// async fn recv_packet<T: AsyncRead + Unpin, Message: protobuf::Message>(
// mut connection: T,
// ) -> Result<(Message, &Vec<u8>), io::Error> {
// let size = connection.read_u32().await?;
// let mut buffer = Vec::with_capacity(size as usize - 4);
// let bytes = connection.read_buf(&mut buffer).await?;
// let proto = protobuf::parse_from_bytes(&buffer[..bytes])?;
// Ok(proto)
// }
async fn client_hello(gc: Vec<u8>) -> Result<Vec<u8>, io::Error> {
async fn client_hello<T>(connection: &mut T, gc: Vec<u8>) -> io::Result<Vec<u8>>
where
T: AsyncWrite + Unpin,
{
let mut packet = ClientHello::new();
packet
.mut_build_info()
@ -73,7 +47,7 @@ async fn client_hello(gc: Vec<u8>) -> Result<Vec<u8>, io::Error> {
packet
.mut_build_info()
.set_platform(protocol::keyexchange::Platform::PLATFORM_LINUX_X86);
packet.mut_build_info().set_version(109_800_078);
packet.mut_build_info().set_version(109800078);
packet
.mut_cryptosuites_supported()
.push(protocol::keyexchange::Cryptosuite::CRYPTO_SUITE_SHANNON);
@ -88,15 +62,19 @@ async fn client_hello(gc: Vec<u8>) -> Result<Vec<u8>, io::Error> {
packet.set_client_nonce(util::rand_vec(&mut thread_rng(), 0x10));
packet.set_padding(vec![0x1e]);
let mut buffer = vec![0, 4];
let size = 2 + 4 + packet.compute_size();
let mut buffer = Vec::with_capacity(size as usize);
buffer.extend(&[0, 4]);
buffer.write_u32(size).await?;
buffer.extend(packet.write_to_bytes()?);
<Vec<u8> as WriteBytesExt>::write_u32::<BigEndian>(&mut buffer, size).unwrap();
packet.write_to_vec(&mut buffer).unwrap();
connection.write_all(&buffer[..]).await?;
Ok(buffer)
}
async fn client_response(challenge: Vec<u8>) -> Result<Vec<u8>, io::Error> {
async fn client_response<T>(connection: &mut T, challenge: Vec<u8>) -> io::Result<()>
where
T: AsyncWrite + Unpin,
{
let mut packet = ClientResponsePlaintext::new();
packet
.mut_login_crypto_response()
@ -105,14 +83,37 @@ async fn client_response(challenge: Vec<u8>) -> Result<Vec<u8>, io::Error> {
packet.mut_pow_response();
packet.mut_crypto_response();
// let mut buffer = vec![];
let mut buffer = vec![];
let size = 4 + packet.compute_size();
let mut buffer = Vec::with_capacity(size as usize);
buffer.write_u32(size).await?;
// This seems to reallocate
// packet.write_to_vec(&mut buffer)?;
buffer.extend(packet.write_to_bytes()?);
Ok(buffer)
<Vec<u8> as WriteBytesExt>::write_u32::<BigEndian>(&mut buffer, size).unwrap();
packet.write_to_vec(&mut buffer).unwrap();
connection.write_all(&buffer[..]).await?;
Ok(())
}
async fn recv_packet<T, M>(connection: &mut T, acc: &mut Vec<u8>) -> io::Result<M>
where
T: AsyncRead + Unpin,
M: Message,
{
let header = read_into_accumulator(connection, 4, acc).await?;
let size = BigEndian::read_u32(header) as usize;
let data = read_into_accumulator(connection, size - 4, acc).await?;
let message = protobuf::parse_from_bytes(data).unwrap();
Ok(message)
}
async fn read_into_accumulator<'a, T: AsyncRead + Unpin>(
connection: &mut T,
size: usize,
acc: &'a mut Vec<u8>,
) -> io::Result<&'a mut [u8]> {
let offset = acc.len();
acc.resize(offset + size, 0);
connection.read_exact(&mut acc[offset..]).await?;
Ok(&mut acc[offset..])
}
fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) {

View file

@ -1,63 +1,56 @@
mod codec;
mod handshake;
pub use self::{codec::APCodec, handshake::handshake};
use crate::{authentication::Credentials, version};
pub use self::codec::APCodec;
pub use self::handshake::handshake;
use futures::{SinkExt, StreamExt};
use protobuf::{self, Message};
use std::{io, net::ToSocketAddrs};
use std::io;
use std::net::ToSocketAddrs;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use url::Url;
// use crate::proxytunnel;
use crate::authentication::Credentials;
use crate::version;
use crate::proxytunnel;
pub type Transport = Framed<TcpStream, APCodec>;
pub async fn connect(addr: String, proxy: &Option<Url>) -> Result<Transport, io::Error> {
let (addr, connect_url): (_, Option<String>) = match *proxy {
Some(ref url) => {
info!("Using proxy \"{}\"", url);
let mut iter = url.to_socket_addrs()?;
let socket_addr = iter.next().ok_or_else(|| {
pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> {
let socket = if let Some(proxy) = proxy {
info!("Using proxy \"{}\"", proxy);
let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
"Can't resolve proxy server address",
)
})?;
(socket_addr, Some(addr))
}
None => {
let mut iter = addr.to_socket_addrs()?;
let socket_addr = iter.next().ok_or_else(|| {
})
})?;
let socket = TcpStream::connect(&socket_addr).await?;
proxytunnel::connect(socket, &addr).await?
} else {
let socket_addr = addr.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| {
io::Error::new(io::ErrorKind::NotFound, "Can't resolve server address")
})?;
(socket_addr, None)
}
})
})?;
TcpStream::connect(&socket_addr).await?
};
let connection = TcpStream::connect(&addr).await?;
if let Some(connect_url) = connect_url {
unimplemented!()
// let connection = proxytunnel::connect(connection, &connect_url).await?;
// let connection = handshake(connection).await?;
// Ok(connection)
} else {
handshake(connection).await
}
handshake(socket).await
}
pub async fn authenticate(
mut transport: Transport,
transport: &mut Transport,
credentials: Credentials,
device_id: String,
) -> Result<(Transport, Credentials), io::Error> {
use crate::protocol::{
authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os},
keyexchange::APLoginFailed,
};
device_id: &str,
) -> io::Result<Credentials> {
use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
use crate::protocol::keyexchange::APLoginFailed;
let mut packet = ClientResponseEncrypted::new();
packet
@ -80,19 +73,18 @@ pub async fn authenticate(
version::short_sha(),
version::build_id()
));
packet.mut_system_info().set_device_id(device_id);
packet
.mut_system_info()
.set_device_id(device_id.to_string());
packet.set_version_string(version::version_string());
let cmd: u8 = 0xab;
let cmd = 0xab;
let data = packet.write_to_bytes().unwrap();
transport.send((cmd, data)).await?;
let packet = transport.next().await;
// TODO: Don't panic?
match packet {
Some(Ok((0xac, data))) => {
let (cmd, data) = transport.next().await.expect("EOF")?;
match cmd {
0xac => {
let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap();
let reusable_credentials = Credentials {
@ -101,10 +93,10 @@ pub async fn authenticate(
auth_data: welcome_data.get_reusable_auth_credentials().to_owned(),
};
Ok((transport, reusable_credentials))
Ok(reusable_credentials)
}
Some(Ok((0xad, data))) => {
0xad => {
let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap();
panic!(
"Authentication failed with reason: {:?}",
@ -112,8 +104,6 @@ pub async fn authenticate(
)
}
Some(Ok((cmd, _))) => panic!("Unexpected packet {:?}", cmd),
Some(err @ Err(_)) => panic!("Packet error: {:?}", err),
None => panic!("EOF"),
_ => panic!("Unexpected packet {:?}", cmd),
}
}

View file

@ -1,12 +1,12 @@
use num_bigint::BigUint;
use num_traits::FromPrimitive;
use once_cell::sync::Lazy;
use rand::Rng;
use crate::util;
lazy_static! {
pub static ref DH_GENERATOR: BigUint = BigUint::from_u64(0x2).unwrap();
pub static ref DH_PRIME: BigUint = BigUint::from_bytes_be(&[
pub static DH_GENERATOR: Lazy<BigUint> = Lazy::new(|| BigUint::from_bytes_be(&[0x02]));
pub static DH_PRIME: Lazy<BigUint> = Lazy::new(|| {
BigUint::from_bytes_be(&[
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc9, 0x0f, 0xda, 0xa2, 0x21, 0x68, 0xc2,
0x34, 0xc4, 0xc6, 0x62, 0x8b, 0x80, 0xdc, 0x1c, 0xd1, 0x29, 0x02, 0x4e, 0x08, 0x8a, 0x67,
0xcc, 0x74, 0x02, 0x0b, 0xbe, 0xa6, 0x3b, 0x13, 0x9b, 0x22, 0x51, 0x4a, 0x08, 0x79, 0x8e,
@ -14,8 +14,8 @@ lazy_static! {
0xf2, 0x5f, 0x14, 0x37, 0x4f, 0xe1, 0x35, 0x6d, 0x6d, 0x51, 0xc2, 0x45, 0xe4, 0x85, 0xb5,
0x76, 0x62, 0x5e, 0x7e, 0xc6, 0xf4, 0x4c, 0x42, 0xe9, 0xa6, 0x3a, 0x36, 0x20, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
]);
}
])
});
pub struct DHLocalKeys {
private_key: BigUint,

View file

@ -1,8 +1,4 @@
// use futures::Future;
use serde_json;
use crate::mercury::MercuryError;
use crate::session::Session;
use crate::{mercury::MercuryError, session::Session};
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
@ -22,13 +18,7 @@ pub async fn get_token(
"hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes
);
// Box::new(session.mercury().get(url).map(move |response| {
session.mercury().get(url).await.map(move |response| {
let data = response.payload.first().expect("Empty payload");
let data = String::from_utf8(data.clone()).unwrap();
let token: Token = serde_json::from_str(&data).unwrap();
token
})
let response = session.mercury().get(url).await?;
let data = response.payload.first().expect("Empty payload");
serde_json::from_slice(data.as_ref()).map_err(|_| MercuryError)
}

View file

@ -1,27 +1,23 @@
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
#![allow(clippy::unused_io_amount)]
// #[macro_use]
// extern crate error_chain;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate pin_project_lite;
extern crate aes;
extern crate base64;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate hmac;
extern crate httparse;
extern crate hyper;
extern crate hyper_proxy;
extern crate num_bigint;
extern crate num_integer;
extern crate num_traits;
extern crate once_cell;
extern crate pbkdf2;
extern crate protobuf;
extern crate rand;
@ -29,9 +25,8 @@ extern crate serde;
extern crate serde_json;
extern crate sha1;
extern crate shannon;
extern crate tokio;
// extern crate tokio_codec;
// extern crate tokio_io;
pub extern crate tokio;
extern crate tokio_util;
extern crate url;
extern crate uuid;
@ -39,7 +34,8 @@ extern crate librespot_protocol as protocol;
#[macro_use]
mod component;
mod apresolve;
pub mod apresolve;
pub mod audio_key;
pub mod authentication;
pub mod cache;
@ -49,7 +45,7 @@ pub mod connection;
pub mod diffie_hellman;
pub mod keymaster;
pub mod mercury;
pub mod proxytunnel;
mod proxytunnel;
pub mod session;
pub mod spotify_id;
pub mod util;

View file

@ -1,18 +1,14 @@
use crate::protocol;
use crate::util::url_encode;
use crate::util::SeqGenerator;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use protobuf;
use std::collections::HashMap;
use std::mem;
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt,
Future,
};
use std::task::Poll;
use crate::util::SeqGenerator;
use std::{collections::HashMap, task::Poll};
use std::{mem, pin::Pin, task::Context};
mod types;
pub use self::types::*;
@ -35,16 +31,21 @@ pub struct MercuryPending {
callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
}
pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
pin_project! {
pub struct MercuryFuture<T> {
#[pin]
receiver: oneshot::Receiver<Result<T, MercuryError>>
}
}
impl<T> Future for MercuryFuture<T> {
type Output = Result<T, MercuryError>;
fn poll(&mut self) -> Poll<Self::Output> {
match self.0.poll() {
Poll::Ready(Ok(Ok(value))) => Poll::Ready(Ok(value)),
Poll::Ready(Ok(Err(err))) => Err(err),
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().receiver.poll(cx) {
Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)),
Poll::Pending => Poll::Pending,
Err(oneshot::Canceled) => Err(MercuryError),
}
}
}
@ -76,7 +77,7 @@ impl MercuryManager {
let data = req.encode(&seq);
self.session().send_packet(cmd, data);
MercuryFuture(rx)
MercuryFuture { receiver: rx }
}
pub fn get<T: Into<String>>(&self, uri: T) -> MercuryFuture<MercuryResponse> {
@ -106,40 +107,41 @@ impl MercuryManager {
uri: T,
) -> Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> {
let uri = uri.into();
let request = self.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
});
let response = self
.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
})
.await?;
let (tx, rx) = mpsc::unbounded();
let manager = self.clone();
request.await.map(move |response| {
let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if response.payload.len() > 0 {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
});
}
});
rx
})
Ok(rx)
}
pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
@ -195,7 +197,7 @@ impl MercuryManager {
let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
let response = MercuryResponse {
uri: url_encode(header.get_uri()).to_owned(),
uri: url_encode(header.get_uri()),
status_code: header.get_status_code(),
payload: pending.parts,
};

View file

@ -1,11 +1,7 @@
use futures::{Future, Sink};
use std::collections::VecDeque;
use futures::Sink;
use std::{collections::VecDeque, pin::Pin, task::Context};
use super::*;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub struct MercurySender {
mercury: MercuryManager,
@ -34,25 +30,38 @@ impl Clone for MercurySender {
}
}
type SinkItem = Vec<u8>;
impl Sink<SinkItem> for MercurySender {
impl Sink<Vec<u8>> for MercurySender {
type Error = MercuryError;
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match self.pending.front_mut() {
Some(task) => {
ready!(task.poll());
match Pin::new(task).poll(cx) {
Poll::Ready(Err(x)) => return Poll::Ready(Err(x)),
Poll::Pending => return Poll::Pending,
_ => (),
};
}
None => {
return Poll::Ready(Ok(()));
}
None => return Poll::Ready(Ok(())),
}
self.pending.pop_front();
}
}
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
Ok(())
}
}

View file

@ -1,124 +1,45 @@
use std::io;
use std::str::FromStr;
use httparse;
use hyper::Uri;
// use tokio_io::io::{read, write_all, Read, Window, WriteAll};
// use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use futures::{
io::{Read, Window, WriteAll},
AsyncRead, AsyncWrite, Future,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub struct ProxyTunnel<'a, T> {
state: ProxyState<'a, T>,
}
enum ProxyState<'a, T> {
ProxyConnect(WriteAll<'a, T>),
ProxyResponse(Read<'a, T>),
}
pub fn connect<'a, T: AsyncRead + AsyncWrite>(
connection: T,
pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
mut connection: T,
connect_url: &str,
) -> ProxyTunnel<'a, T> {
let proxy = proxy_connect(connection, connect_url);
ProxyTunnel {
state: ProxyState::ProxyConnect(proxy),
}
}
impl<'a, T: AsyncRead + AsyncWrite> Future for ProxyTunnel<'a, T> {
type Output = Result<T, io::Error>;
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use self::ProxyState::*;
loop {
self.state = match self.state {
ProxyConnect(ref mut write) => {
let (connection, mut accumulator) = ready!(write.poll());
let capacity = accumulator.capacity();
accumulator.resize(capacity, 0);
let window = Window::new(accumulator);
// let read = read(connection, window);
// ProxyResponse(read)
ProxyResponse(connection.read(window))
}
ProxyResponse(ref mut read_f) => {
let (connection, mut window, bytes_read) = ready!(read_f.poll());
if bytes_read == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
}
let data_end = window.start() + bytes_read;
let buf = window.get_ref()[0..data_end].to_vec();
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut response = httparse::Response::new(&mut headers);
let status = match response.parse(&buf) {
Ok(status) => status,
Err(err) => {
return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
}
};
if status.is_complete() {
if let Some(code) = response.code {
if code == 200 {
// Proxy says all is well
return Poll::Ready(connection);
} else {
let reason = response.reason.unwrap_or("no reason");
let msg = format!("Proxy responded with {}: {}", code, reason);
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"Malformed response from proxy",
));
}
} else {
if data_end >= window.end() {
// Allocate some more buffer space
let newsize = data_end + 100;
window.get_mut().resize(newsize, 0);
window.set_end(newsize);
}
// We did not get a full header
window.set_start(data_end);
// let read = read(connection, window);
// ProxyResponse(read)
ProxyResponse(connection.read(window))
}
}
}
}
}
}
fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T> {
let uri = Uri::from_str(connect_url).unwrap();
let buffer = format!(
) -> io::Result<T> {
let uri = connect_url.parse::<Uri>().unwrap();
let mut buffer = format!(
"CONNECT {0}:{1} HTTP/1.1\r\n\
\r\n",
uri.host().expect(&format!("No host in {}", uri)),
uri.port_u16().expect(&format!("No port in {}", uri))
uri.host().unwrap_or_else(|| panic!("No host in {}", uri)),
uri.port().unwrap_or_else(|| panic!("No port in {}", uri))
)
.into_bytes();
connection.write_all(buffer.as_ref()).await?;
// write_all(connection, buffer)
connection.write_all(buffer)
buffer.clear();
connection.read_to_end(&mut buffer).await?;
if buffer.is_empty() {
return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
}
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut response = httparse::Response::new(&mut headers);
response
.parse(&buffer[..])
.map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
match response.code {
Some(200) => Ok(connection), // Proxy says all is well
Some(code) => {
let reason = response.reason.unwrap_or("no reason");
let msg = format!("Proxy responded with {}: {}", code, reason);
Err(io::Error::new(io::ErrorKind::Other, msg))
}
None => Err(io::Error::new(
io::ErrorKind::Other,
"Malformed response from proxy",
)),
}
}

View file

@ -1,32 +1,23 @@
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::sync::{Arc, RwLock, Weak};
use std::task::Poll;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{io, pin::Pin, task::Context};
use once_cell::sync::OnceCell;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
// use futures::sync::mpsc;
// use futures::{Async, Future, IntoFuture, Poll, Stream};
// use tokio::runtime::{current_thread, current_thread::Handle};
// use futures::future::{IntoFuture, Remote};
use futures::{channel::mpsc, future, Future, Stream, StreamExt, TryFutureExt};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt};
use crate::apresolve::apresolve_or_fallback;
// use crate::audio_key::AudioKeyManager;
use crate::audio_key::AudioKeyManager;
use crate::authentication::Credentials;
use crate::cache::Cache;
// use crate::channel::ChannelManager;
// use crate::component::Lazy;
use crate::channel::ChannelManager;
use crate::config::SessionConfig;
use crate::connection;
// use crate::mercury::MercuryManager;
use crate::mercury::MercuryManager;
struct SessionData {
country: String,
@ -39,13 +30,13 @@ struct SessionInternal {
config: SessionConfig,
data: RwLock<SessionData>,
tx_connection: mpsc::UnboundedSender<io::Result<(u8, Vec<u8>)>>,
tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
// audio_key: Lazy<AudioKeyManager>,
// channel: Lazy<ChannelManager>,
// mercury: Lazy<MercuryManager>,
audio_key: OnceCell<AudioKeyManager>,
channel: OnceCell<ChannelManager>,
mercury: OnceCell<MercuryManager>,
cache: Option<Arc<Cache>>,
handle: Mutex<Handle>,
session_id: usize,
}
@ -54,63 +45,35 @@ static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)]
pub struct Session(Arc<SessionInternal>);
// TODO: Define better errors!
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
impl Session {
pub async fn connect(
config: SessionConfig,
credentials: Credentials,
cache: Option<Cache>,
handle: Handle,
) -> Result<Session> {
let access_point_addr =
apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port).await?;
) -> io::Result<Session> {
let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await;
let proxy = config.proxy.clone();
info!("Connecting to AP \"{}\"", access_point_addr);
let connection = connection::connect(access_point_addr, &proxy);
info!("Connecting to AP \"{}\"", ap);
let mut conn = connection::connect(ap, &config.proxy).await?;
let device_id = config.device_id.clone();
let authentication = connection.and_then(move |connection| {
connection::authenticate(connection, credentials, device_id)
});
let reusable_credentials =
connection::authenticate(&mut conn, credentials, &config.device_id).await?;
info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(cache) = &cache {
cache.save_credentials(&reusable_credentials);
}
let result = match authentication.await {
Ok((transport, reusable_credentials)) => {
info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(ref cache) = cache {
cache.save_credentials(&reusable_credentials);
}
let session = Session::create(conn, config, cache, reusable_credentials.username);
let (session, tasks) = Session::create(
&handle,
transport,
config,
cache,
reusable_credentials.username.clone(),
);
tokio::task::spawn_local(async move { tasks });
Ok(session)
}
Err(e) => {
error!("Unable to Connect");
Err(e.into())
}
};
result
Ok(session)
}
fn create(
handle: &Handle,
transport: connection::Transport,
config: SessionConfig,
cache: Option<Cache>,
username: String,
) -> (Session, Box<dyn Future<Output = (Result<()>, Result<()>)>>) {
) -> Session {
let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded();
@ -119,7 +82,7 @@ impl Session {
debug!("new Session[{}]", session_id);
let session = Session(Arc::new(SessionInternal {
config,
config: config,
data: RwLock::new(SessionData {
country: String::new(),
canonical_username: username,
@ -131,73 +94,51 @@ impl Session {
cache: cache.map(Arc::new),
// audio_key: Lazy::new(),
// channel: Lazy::new(),
// mercury: Lazy::new(),
handle: Mutex::new(handle.clone()),
session_id,
audio_key: OnceCell::new(),
channel: OnceCell::new(),
mercury: OnceCell::new(),
session_id: session_id,
}));
let sender_task = sender_rx
.forward(sink)
.map_err(|e| -> Box<dyn std::error::Error> { Box::new(e) });
let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink);
let receiver_task = DispatchTask(stream, session.weak());
let task = Box::new(future::join(receiver_task, sender_task));
(session, task)
let task =
futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(()));
tokio::spawn(task);
session
}
// pub fn audio_key(&self) -> &AudioKeyManager {
// self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
// }
pub fn audio_key(&self) -> &AudioKeyManager {
self.0
.audio_key
.get_or_init(|| AudioKeyManager::new(self.weak()))
}
// pub fn channel(&self) -> &ChannelManager {
// self.0.channel.get(|| ChannelManager::new(self.weak()))
// }
pub fn channel(&self) -> &ChannelManager {
self.0
.channel
.get_or_init(|| ChannelManager::new(self.weak()))
}
// pub fn mercury(&self) -> &MercuryManager {
// self.0.mercury.get(|| MercuryManager::new(self.weak()))
// }
pub fn mercury(&self) -> &MercuryManager {
self.0
.mercury
.get_or_init(|| MercuryManager::new(self.weak()))
}
pub fn time_delta(&self) -> i64 {
self.0.data.read().unwrap().time_delta
}
// Spawn a future directly
// pub fn spawn<F>(&self, f: F)
// where
// F: Future<Output = ()> + Send + 'static,
// {
// let handle = self.0.handle.lock().unwrap();
// let spawn_res = handle.spawn(f);
// match spawn_res {
// Ok(_) => (),
// Err(e) => error!("Session SpawnErr {:?}", e),
// }
// }
// pub fn spawn<F, R>(&self, f: F)
// where
// F: FnOnce() -> R + Send + 'static,
// R: Future<Item = (), Error = ()> + Send + 'static,
// {
// // This fails when called from a different thread
// // current_thread::spawn(future::lazy(|| f()));
//
// // These fail when the Future doesn't implement Send
// let handle = self.0.handle.lock().unwrap();
// let spawn_res = handle.spawn(lazy(|| f()));
//
// // let mut te = current_thread::TaskExecutor::current();
// // let spawn_res = te.spawn_local(Box::new(future::lazy(|| f())));
//
// match spawn_res {
// Ok(_) => (),
// Err(e) => error!("Session SpawnErr {:?}", e),
// }
// }
pub fn spawn<T>(&self, task: T)
where
T: Future + Send + 'static,
T::Output: Send + 'static,
{
tokio::spawn(task);
}
fn debug_info(&self) {
debug!(
@ -208,7 +149,7 @@ impl Session {
);
}
// #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
#[allow(clippy::match_same_arms)]
fn dispatch(&self, cmd: u8, data: Bytes) {
match cmd {
0x4 => {
@ -231,18 +172,15 @@ impl Session {
self.0.data.write().unwrap().country = country;
}
// 0x9 | 0xa => self.channel().dispatch(cmd, data),
// 0xd | 0xe => self.audio_key().dispatch(cmd, data),
// 0xb2..=0xb6 => self.mercury().dispatch(cmd, data),
_ => trace!("Unknown dispatch cmd :{:?} {:?}", cmd, data),
0x9 | 0xa => self.channel().dispatch(cmd, data),
0xd | 0xe => self.audio_key().dispatch(cmd, data),
0xb2..=0xb6 => self.mercury().dispatch(cmd, data),
_ => (),
}
}
pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
self.0
.tx_connection
.unbounded_send(Ok((cmd, data)))
.unwrap();
self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
}
pub fn cache(&self) -> Option<&Arc<Cache>> {
@ -276,8 +214,8 @@ impl Session {
pub fn shutdown(&self) {
debug!("Invalidating session[{}]", self.0.session_id);
self.0.data.write().unwrap().invalid = true;
// self.mercury().shutdown();
// self.channel().shutdown();
self.mercury().shutdown();
self.channel().shutdown();
}
pub fn is_invalid(&self) -> bool {
@ -306,35 +244,36 @@ impl Drop for SessionInternal {
struct DispatchTask<S>(S, SessionWeak)
where
S: Stream<Item = io::Result<(u8, Bytes)>> + Unpin;
S: TryStream<Ok = (u8, Bytes)> + Unpin;
impl<S: Stream<Item = io::Result<(u8, Bytes)>>> Future for DispatchTask<S>
impl<S> Future for DispatchTask<S>
where
S: Stream<Item = io::Result<(u8, Bytes)>> + Unpin,
S: TryStream<Ok = (u8, Bytes)> + Unpin,
<S as TryStream>::Ok: std::fmt::Debug,
{
type Output = Result<()>;
type Output = Result<(), S::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let session = match self.1.try_upgrade() {
Some(session) => session,
None => return Poll::Ready(Ok(())),
};
loop {
let (cmd, data) = match Pin::new(&mut self.0).poll_next(cx) {
let (cmd, data) = match self.0.try_poll_next_unpin(cx) {
Poll::Ready(Some(Ok(t))) => t,
Poll::Ready(Some(Err(e))) => {
warn!("Server Connectioned errored");
session.shutdown();
return Poll::Ready(Err(Box::new(e)));
}
Poll::Ready(None) => {
warn!("Connection to server closed.");
session.shutdown();
return Poll::Ready(Ok(()));
}
Poll::Ready(Some(Err(e))) => {
session.shutdown();
return Poll::Ready(Err(e));
}
Poll::Pending => return Poll::Pending,
};
session.dispatch(cmd, data);
}
}
@ -342,7 +281,7 @@ where
impl<S> Drop for DispatchTask<S>
where
S: Stream<Item = io::Result<(u8, Bytes)>> + Unpin,
S: TryStream<Ok = (u8, Bytes)> + Unpin,
{
fn drop(&mut self) {
debug!("drop Dispatch");

View file

@ -1,45 +1,32 @@
use futures::future::TryFutureExt;
use librespot_core::*;
use tokio::runtime;
#[cfg(test)]
mod tests {
use super::*;
// Test AP Resolve
use apresolve::apresolve_or_fallback;
#[test]
fn test_ap_resolve() {
let mut rt = runtime::Runtime::new().unwrap();
let ap = rt.block_on(apresolve_or_fallback(&None, &Some(80)));
#[tokio::test]
async fn test_ap_resolve() {
let ap = apresolve_or_fallback(&None, &None).await;
println!("AP: {:?}", ap);
}
// Test connect
use authentication::Credentials;
use config::SessionConfig;
use connection;
#[test]
fn test_connection() {
#[tokio::test]
async fn test_connection() -> Result<(), Box<dyn std::error::Error>> {
println!("Running connection test");
let mut rt = runtime::Runtime::new().unwrap();
let access_point_addr = rt.block_on(apresolve_or_fallback(&None, &None)).unwrap();
let ap = apresolve_or_fallback(&None, &None).await;
let credentials = Credentials::with_password(String::from("test"), String::from("test"));
let session_config = SessionConfig::default();
let proxy = None;
println!("Connecting to AP \"{}\"", access_point_addr);
let connection = connection::connect(access_point_addr, &proxy);
let device_id = session_config.device_id.clone();
let authentication = connection.and_then(move |connection| {
connection::authenticate(connection, credentials, device_id)
});
match rt.block_on(authentication) {
Ok((_transport, reusable_credentials)) => {
println!("Authenticated as \"{}\" !", reusable_credentials.username)
}
// TODO assert that we get BadCredentials once we don't panic
Err(e) => println!("ConnectError: {:?}", e),
}
println!("Connecting to AP \"{}\"", ap);
let mut connection = connection::connect(ap, &proxy).await?;
let rc = connection::authenticate(&mut connection, credentials, &session_config.device_id)
.await?;
println!("Authenticated as \"{}\"", rc.username);
Ok(())
}
}

View file

@ -8,8 +8,9 @@ repository = "https://github.com/librespot-org/librespot"
edition = "2018"
[dependencies]
async-trait = "0.1"
byteorder = "1.3"
futures = "0.1"
futures = "0.3"
linear-map = "1.2"
protobuf = "~2.14.0"
log = "0.4"

View file

@ -1,6 +1,11 @@
#![allow(clippy::unused_io_amount)]
#![allow(clippy::redundant_field_names)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate async_trait;
extern crate byteorder;
extern crate futures;
extern crate linear_map;
@ -11,8 +16,6 @@ extern crate librespot_protocol as protocol;
pub mod cover;
use futures::future;
use futures::Future;
use linear_map::LinearMap;
use librespot_core::mercury::MercuryError;
@ -69,81 +72,67 @@ pub struct AudioItem {
}
impl AudioItem {
pub fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>> {
pub async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<Self, MercuryError> {
match id.audio_type {
SpotifyAudioType::Track => Track::get_audio_item(session, id),
SpotifyAudioType::Podcast => Episode::get_audio_item(session, id),
SpotifyAudioType::NonPlayable => {
Box::new(future::err::<AudioItem, MercuryError>(MercuryError))
}
SpotifyAudioType::Track => Track::get_audio_item(session, id).await,
SpotifyAudioType::Podcast => Episode::get_audio_item(session, id).await,
SpotifyAudioType::NonPlayable => Err(MercuryError),
}
}
}
#[async_trait]
trait AudioFiles {
fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>>;
async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<AudioItem, MercuryError>;
}
#[async_trait]
impl AudioFiles for Track {
fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>> {
Box::new(Self::get(session, id).and_then(move |item| {
Ok(AudioItem {
id: id,
uri: format!("spotify:track:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: Some(item.alternatives),
})
}))
async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<AudioItem, MercuryError> {
let item = Self::get(session, id).await?;
Ok(AudioItem {
id: id,
uri: format!("spotify:track:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: Some(item.alternatives),
})
}
}
#[async_trait]
impl AudioFiles for Episode {
fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>> {
Box::new(Self::get(session, id).and_then(move |item| {
Ok(AudioItem {
id: id,
uri: format!("spotify:episode:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: None,
})
}))
async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<AudioItem, MercuryError> {
let item = Self::get(session, id).await?;
Ok(AudioItem {
id: id,
uri: format!("spotify:episode:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: None,
})
}
}
#[async_trait]
pub trait Metadata: Send + Sized + 'static {
type Message: protobuf::Message;
fn request_url(id: SpotifyId) -> String;
fn parse(msg: &Self::Message, session: &Session) -> Self;
fn get(session: &Session, id: SpotifyId) -> Box<dyn Future<Item = Self, Error = MercuryError>> {
async fn get(session: &Session, id: SpotifyId) -> Result<Self, MercuryError> {
let uri = Self::request_url(id);
let request = session.mercury().get(uri);
let response = session.mercury().get(uri).await?;
let data = response.payload.first().expect("Empty payload");
let msg: Self::Message = protobuf::parse_from_bytes(data).unwrap();
let session = session.clone();
Box::new(request.and_then(move |response| {
let data = response.payload.first().expect("Empty payload");
let msg: Self::Message = protobuf::parse_from_bytes(data).unwrap();
Ok(Self::parse(&msg, &session))
}))
Ok(Self::parse(&msg, &session))
}
}

View file

@ -18,9 +18,9 @@ path = "../metadata"
version = "0.1.3"
[dependencies]
futures = "0.1"
futures = "0.3"
log = "0.4"
byteorder = "1.3"
byteorder = "1.4"
shell-words = "1.0.0"
alsa = { version = "0.2", optional = true }

View file

@ -10,7 +10,9 @@ pub trait Sink {
fn write(&mut self, data: &[i16]) -> io::Result<()>;
}
fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<dyn Sink> {
pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink + Send>;
fn mk_sink<S: Sink + Open + Send + 'static>(device: Option<String>) -> Box<dyn Sink + Send> {
Box::new(S::open(device))
}
@ -54,7 +56,7 @@ use self::pipe::StdoutSink;
mod subprocess;
use self::subprocess::SubprocessSink;
pub const BACKENDS: &'static [(&'static str, fn(Option<String>) -> Box<dyn Sink>)] = &[
pub const BACKENDS: &'static [(&'static str, SinkBuilder)] = &[
#[cfg(feature = "alsa-backend")]
("alsa", mk_sink::<AlsaSink>),
#[cfg(feature = "portaudio-backend")]
@ -73,7 +75,7 @@ pub const BACKENDS: &'static [(&'static str, fn(Option<String>) -> Box<dyn Sink>
("subprocess", mk_sink::<SubprocessSink>),
];
pub fn find(name: Option<String>) -> Option<fn(Option<String>) -> Box<dyn Sink>> {
pub fn find(name: Option<String>) -> Option<SinkBuilder> {
if let Some(name) = name {
BACKENDS
.iter()

View file

@ -4,7 +4,7 @@ use std::io::{self, Write};
use std::mem;
use std::slice;
pub struct StdoutSink(Box<dyn Write>);
pub struct StdoutSink(Box<dyn Write + Send>);
impl Open for StdoutSink {
fn open(path: Option<String>) -> StdoutSink {

View file

@ -1,20 +1,3 @@
use byteorder::{LittleEndian, ReadBytesExt};
use futures;
use futures::{future, Async, Future, Poll, Stream};
use std;
use std::borrow::Cow;
use std::cmp::max;
use std::io::{Read, Result, Seek, SeekFrom};
use std::mem;
use std::thread;
use std::time::{Duration, Instant};
use crate::config::{Bitrate, PlayerConfig};
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController};
use crate::audio::{VorbisDecoder, VorbisPacket};
use crate::audio::{
@ -22,14 +5,34 @@ use crate::audio::{
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
};
use crate::audio_backend::Sink;
use crate::config::{Bitrate, PlayerConfig};
use crate::librespot_core::tokio;
use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter;
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use byteorder::{LittleEndian, ReadBytesExt};
use futures::{
channel::{mpsc, oneshot},
future, Future, Stream, StreamExt,
};
use std::io::{Read, Seek, SeekFrom};
use std::mem;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io};
use std::{
cmp::max,
pin::Pin,
task::{Context, Poll},
};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
pub struct Player {
commands: Option<futures::sync::mpsc::UnboundedSender<PlayerCommand>>,
thread_handle: Option<thread::JoinHandle<()>>,
commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
task_handle: Option<tokio::task::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>,
}
@ -45,15 +48,15 @@ pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
struct PlayerInternal {
session: Session,
config: PlayerConfig,
commands: futures::sync::mpsc::UnboundedReceiver<PlayerCommand>,
commands: mpsc::UnboundedReceiver<PlayerCommand>,
state: PlayerState,
preload: PlayerPreload,
sink: Box<dyn Sink>,
sink: Box<dyn Sink + Send>,
sink_status: SinkStatus,
sink_event_callback: Option<SinkEventCallback>,
audio_filter: Option<Box<dyn AudioFilter + Send>>,
event_senders: Vec<futures::sync::mpsc::UnboundedSender<PlayerEvent>>,
event_senders: Vec<mpsc::UnboundedSender<PlayerEvent>>,
}
enum PlayerCommand {
@ -70,7 +73,7 @@ enum PlayerCommand {
Pause,
Stop,
Seek(u32),
AddEventSender(futures::sync::mpsc::UnboundedSender<PlayerEvent>),
AddEventSender(mpsc::UnboundedSender<PlayerEvent>),
SetSinkEventCallback(Option<SinkEventCallback>),
EmitVolumeSetEvent(u16),
}
@ -182,7 +185,7 @@ impl PlayerEvent {
}
}
pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver<PlayerEvent>;
pub type PlayerEventChannel = mpsc::UnboundedReceiver<PlayerEvent>;
#[derive(Clone, Copy, Debug)]
struct NormalisationData {
@ -193,7 +196,7 @@ struct NormalisationData {
}
impl NormalisationData {
fn parse_from_file<T: Read + Seek>(mut file: T) -> Result<NormalisationData> {
fn parse_from_file<T: Read + Seek>(mut file: T) -> io::Result<NormalisationData> {
const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144;
file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET))
.unwrap();
@ -239,38 +242,38 @@ impl Player {
sink_builder: F,
) -> (Player, PlayerEventChannel)
where
F: FnOnce() -> Box<dyn Sink> + Send + 'static,
F: FnOnce() -> Box<dyn Sink + Send> + Send + 'static,
{
let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded();
let (event_sender, event_receiver) = futures::sync::mpsc::unbounded();
let (cmd_tx, cmd_rx) = mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded();
let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id());
debug!("new Player[{}]", session.session_id());
let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,
let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,
state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
let _ = internal.wait();
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
let handle = tokio::spawn(async move {
internal.await;
debug!("PlayerInternal thread finished.");
});
(
Player {
commands: Some(cmd_tx),
thread_handle: Some(handle),
task_handle: Some(handle),
play_request_id_generator: SeqGenerator::new(0),
},
event_receiver,
@ -314,22 +317,21 @@ impl Player {
}
pub fn get_player_event_channel(&self) -> PlayerEventChannel {
let (event_sender, event_receiver) = futures::sync::mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded();
self.command(PlayerCommand::AddEventSender(event_sender));
event_receiver
}
pub fn get_end_of_track_future(&self) -> Box<dyn Future<Item = (), Error = ()>> {
let result = self
.get_player_event_channel()
.filter(|event| match event {
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true,
_ => false,
pub async fn get_end_of_track_future(&self) {
self.get_player_event_channel()
.filter(|event| {
future::ready(matches!(
event,
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
))
})
.into_future()
.map_err(|_| ())
.map(|_| ());
Box::new(result)
.for_each(|_| future::ready(()))
.await
}
pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
@ -345,11 +347,13 @@ impl Drop for Player {
fn drop(&mut self) {
debug!("Shutting down player thread ...");
self.commands = None;
if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
if let Some(handle) = self.task_handle.take() {
tokio::spawn(async {
match handle.await {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
});
}
}
}
@ -367,11 +371,11 @@ enum PlayerPreload {
None,
Loading {
track_id: SpotifyId,
loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
},
Ready {
track_id: SpotifyId,
loaded_track: PlayerLoadedTrackData,
loaded_track: Box<PlayerLoadedTrackData>,
},
}
@ -383,7 +387,7 @@ enum PlayerState {
track_id: SpotifyId,
play_request_id: u64,
start_playback: bool,
loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
},
Paused {
track_id: SpotifyId,
@ -428,23 +432,15 @@ impl PlayerState {
#[allow(dead_code)]
fn is_stopped(&self) -> bool {
use self::PlayerState::*;
match *self {
Stopped => true,
_ => false,
}
matches!(self, Self::Stopped)
}
fn is_loading(&self) -> bool {
use self::PlayerState::*;
match *self {
Loading { .. } => true,
_ => false,
}
matches!(self, Self::Loading { .. })
}
fn decoder(&mut self) -> Option<&mut Decoder> {
use self::PlayerState::*;
use PlayerState::*;
match *self {
Stopped | EndOfTrack { .. } | Loading { .. } => None,
Paused {
@ -573,22 +569,23 @@ struct PlayerTrackLoader {
}
impl PlayerTrackLoader {
fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option<Cow<'a, AudioItem>> {
async fn find_available_alternative<'a, 'b>(
&'a self,
audio: &'b AudioItem,
) -> Option<Cow<'b, AudioItem>> {
if audio.available {
Some(Cow::Borrowed(audio))
} else if let Some(alternatives) = &audio.alternatives {
let alternatives = alternatives
.iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id));
let alternatives = future::try_join_all(alternatives).await.unwrap();
alternatives
.into_iter()
.find(|alt| alt.available)
.map(Cow::Owned)
} else {
if let Some(alternatives) = &audio.alternatives {
let alternatives = alternatives
.iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id));
let alternatives = future::join_all(alternatives).wait().unwrap();
alternatives
.into_iter()
.find(|alt| alt.available)
.map(Cow::Owned)
} else {
None
}
None
}
}
@ -611,8 +608,12 @@ impl PlayerTrackLoader {
}
}
fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option<PlayerLoadedTrackData> {
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() {
async fn load_track(
&self,
spotify_id: SpotifyId,
position_ms: u32,
) -> Option<PlayerLoadedTrackData> {
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).await {
Ok(audio) => audio,
Err(_) => {
error!("Unable to load audio item.");
@ -622,7 +623,7 @@ impl PlayerTrackLoader {
info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri);
let audio = match self.find_available_alternative(&audio) {
let audio = match self.find_available_alternative(&audio).await {
Some(audio) => audio,
None => {
warn!("<{}> is not available", audio.uri);
@ -675,7 +676,7 @@ impl PlayerTrackLoader {
play_from_beginning,
);
let encrypted_file = match encrypted_file.wait() {
let encrypted_file = match encrypted_file.await {
Ok(encrypted_file) => encrypted_file,
Err(_) => {
error!("Unable to load encrypted file.");
@ -693,7 +694,7 @@ impl PlayerTrackLoader {
stream_loader_controller.set_random_access_mode();
}
let key = match key.wait() {
let key = match key.await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
@ -709,7 +710,7 @@ impl PlayerTrackLoader {
}
Err(_) => {
warn!("Unable to extract normalisation data, using default value.");
1.0 as f32
1.0_f32
}
};
@ -738,10 +739,9 @@ impl PlayerTrackLoader {
}
impl Future for PlayerInternal {
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<(), ()> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// While this is written as a future, it still contains blocking code.
// It must be run on its own thread.
@ -749,14 +749,13 @@ impl Future for PlayerInternal {
let mut all_futures_completed_or_not_ready = true;
// process commands that were sent to us
let cmd = match self.commands.poll() {
Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down.
Ok(Async::Ready(Some(cmd))) => {
let cmd = match Pin::new(&mut self.commands).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
Poll::Ready(Some(cmd)) => {
all_futures_completed_or_not_ready = false;
Some(cmd)
}
Ok(Async::NotReady) => None,
Err(_) => None,
_ => None,
};
if let Some(cmd) = cmd {
@ -771,8 +770,8 @@ impl Future for PlayerInternal {
play_request_id,
} = self.state
{
match loader.poll() {
Ok(Async::Ready(loaded_track)) => {
match loader.as_mut().poll(cx) {
Poll::Ready(Ok(loaded_track)) => {
self.start_playback(
track_id,
play_request_id,
@ -783,8 +782,7 @@ impl Future for PlayerInternal {
panic!("The state wasn't changed by start_playback()");
}
}
Ok(Async::NotReady) => (),
Err(_) => {
Poll::Ready(Err(_)) => {
warn!("Unable to load <{:?}>\nSkipping to next track", track_id);
assert!(self.state.is_loading());
self.send_event(PlayerEvent::EndOfTrack {
@ -792,6 +790,7 @@ impl Future for PlayerInternal {
play_request_id,
})
}
Poll::Pending => (),
}
}
@ -801,16 +800,15 @@ impl Future for PlayerInternal {
track_id,
} = self.preload
{
match loader.poll() {
Ok(Async::Ready(loaded_track)) => {
match loader.as_mut().poll(cx) {
Poll::Ready(Ok(loaded_track)) => {
self.send_event(PlayerEvent::Preloading { track_id });
self.preload = PlayerPreload::Ready {
track_id,
loaded_track,
loaded_track: Box::new(loaded_track),
};
}
Ok(Async::NotReady) => (),
Err(_) => {
Poll::Ready(Err(_)) => {
debug!("Unable to preload {:?}", track_id);
self.preload = PlayerPreload::None;
// Let Spirc know that the track was unavailable.
@ -827,6 +825,7 @@ impl Future for PlayerInternal {
});
}
}
Poll::Pending => (),
}
}
@ -847,8 +846,7 @@ impl Future for PlayerInternal {
let packet = decoder.next_packet().expect("Vorbis error");
if let Some(ref packet) = packet {
*stream_position_pcm =
*stream_position_pcm + (packet.data().len() / 2) as u64;
*stream_position_pcm += (packet.data().len() / 2) as u64;
let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm);
let notify_about_position = match *reported_nominal_start_time {
@ -858,11 +856,7 @@ impl Future for PlayerInternal {
let lag = (Instant::now() - reported_nominal_start_time).as_millis()
as i64
- stream_position_millis as i64;
if lag > 1000 {
true
} else {
false
}
lag > 1000
}
};
if notify_about_position {
@ -918,11 +912,11 @@ impl Future for PlayerInternal {
}
if self.session.is_invalid() {
return Ok(Async::Ready(()));
return Poll::Ready(());
}
if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
return Ok(Async::NotReady);
return Poll::Pending;
}
}
}
@ -1061,12 +1055,14 @@ impl PlayerInternal {
fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) {
match packet {
Some(mut packet) => {
if packet.data().len() > 0 {
if !packet.data().is_empty() {
if let Some(ref editor) = self.audio_filter {
editor.modify_stream(&mut packet.data_mut())
};
if self.config.normalisation && normalisation_factor != 1.0 {
if self.config.normalisation
&& (normalisation_factor - 1.0).abs() < f32::EPSILON
{
for x in packet.data_mut().iter_mut() {
*x = (*x as f32 * normalisation_factor) as i16;
}
@ -1214,10 +1210,9 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
// But most likely the track is fully
// loaded already because we played
// to the end of it.
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
@ -1250,7 +1245,7 @@ impl PlayerInternal {
// we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
stream_loader_controller.set_random_access_mode();
let _ = decoder.seek(position_ms as i64); // This may be blocking.
let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64));
stream_loader_controller.set_stream_mode();
*stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
@ -1318,10 +1313,12 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode();
}
self.start_playback(track_id, play_request_id, loaded_track, play);
self.start_playback(track_id, play_request_id, *loaded_track, play);
return;
} else {
unreachable!();
@ -1363,9 +1360,7 @@ impl PlayerInternal {
self.preload = PlayerPreload::None;
// If we don't have a loader yet, create one from scratch.
let loader = loader
.or_else(|| Some(self.load_track(track_id, position_ms)))
.unwrap();
let loader = loader.unwrap_or_else(|| Box::pin(self.load_track(track_id, position_ms)));
// Set ourselves to a loading state.
self.state = PlayerState::Loading {
@ -1420,7 +1415,10 @@ impl PlayerInternal {
// schedule the preload of the current track if desired.
if preload_track {
let loader = self.load_track(track_id, 0);
self.preload = PlayerPreload::Loading { track_id, loader }
self.preload = PlayerPreload::Loading {
track_id,
loader: Box::pin(loader),
}
}
}
@ -1532,34 +1530,33 @@ impl PlayerInternal {
}
}
fn load_track(
pub fn load_track(
&self,
spotify_id: SpotifyId,
position_ms: u32,
) -> Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>> {
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
// This method creates a future that returns the loaded stream and associated info.
// Ideally all work should be done using asynchronous code. However, seek() on the
// audio stream is implemented in a blocking fashion. Thus, we can't turn it into future
// easily. Instead we spawn a thread to do the work and return a one-shot channel as the
// future to work with.
let loader = PlayerTrackLoader {
session: self.session.clone(),
config: self.config.clone(),
};
let session = self.session.clone();
let config = self.config.clone();
let (result_tx, result_rx) = futures::sync::oneshot::channel();
async move {
let loader = PlayerTrackLoader { session, config };
std::thread::spawn(move || {
loader
.load_track(spotify_id, position_ms)
.and_then(move |data| {
let (result_tx, result_rx) = oneshot::channel();
tokio::spawn(async move {
if let Some(data) = loader.load_track(spotify_id, position_ms).await {
let _ = result_tx.send(data);
Some(())
});
});
}
});
Box::new(result_rx.map_err(|_| ()))
result_rx.await.map_err(|_| ())
}
}
fn preload_data_before_playback(&mut self) {
@ -1585,7 +1582,9 @@ impl PlayerInternal {
* bytes_per_second as f64) as usize,
(READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
);
stream_loader_controller.fetch_next_blocking(wait_for_data_length);
tokio::task::block_in_place(|| {
stream_loader_controller.fetch_next_blocking(wait_for_data_length)
});
}
}
}
@ -1689,13 +1688,13 @@ impl<T: Read + Seek> Subfile<T> {
}
impl<T: Read + Seek> Read for Subfile<T> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf)
}
}
impl<T: Read + Seek> Seek for Subfile<T> {
fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> {
fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
pos = match pos {
SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
x => x,

View file

@ -1,8 +1,7 @@
#![crate_name = "librespot"]
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
pub extern crate librespot_audio as audio;
pub extern crate librespot_connect as connect;
// pub extern crate librespot_connect as connect;
pub extern crate librespot_core as core;
pub extern crate librespot_metadata as metadata;
pub extern crate librespot_playback as playback;

View file

@ -1,623 +0,0 @@
use futures::sync::mpsc::UnboundedReceiver;
use futures::{Async, Future, Poll, Stream};
use log::{error, info, trace, warn};
use sha1::{Digest, Sha1};
use std::env;
use std::io::{self, stderr, Write};
use std::mem;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Instant;
use tokio_io::IoStream;
use url::Url;
use librespot::core::authentication::{get_credentials, Credentials};
use librespot::core::cache::Cache;
use librespot::core::config::{ConnectConfig, DeviceType, SessionConfig, VolumeCtrl};
use librespot::core::session::Session;
use librespot::core::version;
use librespot::connect::discovery::{discovery, DiscoveryStream};
use librespot::connect::spirc::{Spirc, SpircTask};
use librespot::playback::audio_backend::{self, Sink, BACKENDS};
use librespot::playback::config::{Bitrate, PlayerConfig};
use librespot::playback::mixer::{self, Mixer, MixerConfig};
use librespot::playback::player::{Player, PlayerEvent};
use tokio::runtime::{
current_thread,
current_thread::{Handle, Runtime},
};
mod player_event_handler;
use crate::player_event_handler::{emit_sink_event, run_program_on_events};
fn device_id(name: &str) -> String {
hex::encode(Sha1::digest(name.as_bytes()))
}
fn usage(program: &str, opts: &getopts::Options) -> String {
let brief = format!("Usage: {} [options]", program);
opts.usage(&brief)
}
fn setup_logging(verbose: bool) {
let mut builder = env_logger::Builder::new();
match env::var("RUST_LOG") {
Ok(config) => {
builder.parse_filters(&config);
builder.init();
if verbose {
warn!("`--verbose` flag overidden by `RUST_LOG` environment variable");
}
}
Err(_) => {
if verbose {
builder.parse_filters("libmdns=info,librespot=trace");
} else {
builder.parse_filters("libmdns=info,librespot=info");
}
builder.init();
}
}
}
fn list_backends() {
println!("Available Backends : ");
for (&(name, _), idx) in BACKENDS.iter().zip(0..) {
if idx == 0 {
println!("- {} (default)", name);
} else {
println!("- {}", name);
}
}
}
#[derive(Clone)]
struct Setup {
backend: fn(Option<String>) -> Box<dyn Sink>,
device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
cache: Option<Cache>,
player_config: PlayerConfig,
session_config: SessionConfig,
connect_config: ConnectConfig,
mixer_config: MixerConfig,
credentials: Option<Credentials>,
enable_discovery: bool,
zeroconf_port: u16,
player_event_program: Option<String>,
emit_sink_events: bool,
}
fn setup(args: &[String]) -> Setup {
let mut opts = getopts::Options::new();
opts.optopt(
"c",
"cache",
"Path to a directory where files will be cached.",
"CACHE",
).optopt(
"",
"system-cache",
"Path to a directory where system files (credentials, volume) will be cached. Can be different from cache option value",
"SYTEMCACHE",
).optflag("", "disable-audio-cache", "Disable caching of the audio data.")
.reqopt("n", "name", "Device name", "NAME")
.optopt("", "device-type", "Displayed device type", "DEVICE_TYPE")
.optopt(
"b",
"bitrate",
"Bitrate (96, 160 or 320). Defaults to 160",
"BITRATE",
)
.optopt(
"",
"onevent",
"Run PROGRAM when playback is about to begin.",
"PROGRAM",
)
.optflag("", "emit-sink-events", "Run program set by --onevent before sink is opened and after it is closed.")
.optflag("v", "verbose", "Enable verbose output")
.optopt("u", "username", "Username to sign in with", "USERNAME")
.optopt("p", "password", "Password", "PASSWORD")
.optopt("", "proxy", "HTTP proxy to use when connecting", "PROXY")
.optopt("", "ap-port", "Connect to AP with specified port. If no AP with that port are present fallback AP will be used. Available ports are usually 80, 443 and 4070", "AP_PORT")
.optflag("", "disable-discovery", "Disable discovery mode")
.optopt(
"",
"backend",
"Audio backend to use. Use '?' to list options",
"BACKEND",
)
.optopt(
"",
"device",
"Audio device to use. Use '?' to list options if using portaudio or alsa",
"DEVICE",
)
.optopt("", "mixer", "Mixer to use (alsa or softvol)", "MIXER")
.optopt(
"m",
"mixer-name",
"Alsa mixer name, e.g \"PCM\" or \"Master\". Defaults to 'PCM'",
"MIXER_NAME",
)
.optopt(
"",
"mixer-card",
"Alsa mixer card, e.g \"hw:0\" or similar from `aplay -l`. Defaults to 'default' ",
"MIXER_CARD",
)
.optopt(
"",
"mixer-index",
"Alsa mixer index, Index of the cards mixer. Defaults to 0",
"MIXER_INDEX",
)
.optflag(
"",
"mixer-linear-volume",
"Disable alsa's mapped volume scale (cubic). Default false",
)
.optopt(
"",
"initial-volume",
"Initial volume in %, once connected (must be from 0 to 100)",
"VOLUME",
)
.optopt(
"",
"zeroconf-port",
"The port the internal server advertised over zeroconf uses.",
"ZEROCONF_PORT",
)
.optflag(
"",
"enable-volume-normalisation",
"Play all tracks at the same volume",
)
.optopt(
"",
"normalisation-pregain",
"Pregain (dB) applied by volume normalisation",
"PREGAIN",
)
.optopt(
"",
"volume-ctrl",
"Volume control type - [linear, log, fixed]. Default is logarithmic",
"VOLUME_CTRL"
)
.optflag(
"",
"autoplay",
"autoplay similar songs when your music ends.",
)
.optflag(
"",
"disable-gapless",
"disable gapless playback.",
);
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(f) => {
writeln!(
stderr(),
"error: {}\n{}",
f.to_string(),
usage(&args[0], &opts)
)
.unwrap();
exit(1);
}
};
let verbose = matches.opt_present("verbose");
setup_logging(verbose);
info!(
"librespot {} ({}). Built on {}. Build ID: {}",
version::short_sha(),
version::commit_date(),
version::short_now(),
version::build_id()
);
let backend_name = matches.opt_str("backend");
if backend_name == Some("?".into()) {
list_backends();
exit(0);
}
let backend = audio_backend::find(backend_name).expect("Invalid backend");
let device = matches.opt_str("device");
if device == Some("?".into()) {
backend(device);
exit(0);
}
let mixer_name = matches.opt_str("mixer");
let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer");
let mixer_config = MixerConfig {
card: matches
.opt_str("mixer-card")
.unwrap_or(String::from("default")),
mixer: matches.opt_str("mixer-name").unwrap_or(String::from("PCM")),
index: matches
.opt_str("mixer-index")
.map(|index| index.parse::<u32>().unwrap())
.unwrap_or(0),
mapped_volume: !matches.opt_present("mixer-linear-volume"),
};
let cache = matches.opt_str("c").map(|cache_path| {
let use_audio_cache = !matches.opt_present("disable-audio-cache");
let system_cache_directory = matches
.opt_str("system-cache")
.unwrap_or(String::from(cache_path.clone()));
Cache::new(
PathBuf::from(cache_path),
PathBuf::from(system_cache_directory),
use_audio_cache,
)
});
let initial_volume = matches
.opt_str("initial-volume")
.map(|volume| {
let volume = volume.parse::<u16>().unwrap();
if volume > 100 {
panic!("Initial volume must be in the range 0-100");
}
(volume as i32 * 0xFFFF / 100) as u16
})
.or_else(|| cache.as_ref().and_then(Cache::volume))
.unwrap_or(0x8000);
let zeroconf_port = matches
.opt_str("zeroconf-port")
.map(|port| port.parse::<u16>().unwrap())
.unwrap_or(0);
let name = matches.opt_str("name").unwrap();
let credentials = {
let cached_credentials = cache.as_ref().and_then(Cache::credentials);
let password = |username: &String| -> String {
write!(stderr(), "Password for {}: ", username).unwrap();
stderr().flush().unwrap();
rpassword::read_password().unwrap()
};
get_credentials(
matches.opt_str("username"),
matches.opt_str("password"),
cached_credentials,
password,
)
};
let session_config = {
let device_id = device_id(&name);
SessionConfig {
user_agent: version::version_string(),
device_id: device_id,
proxy: matches.opt_str("proxy").or(std::env::var("http_proxy").ok()).map(
|s| {
match Url::parse(&s) {
Ok(url) => {
if url.host().is_none() || url.port_or_known_default().is_none() {
panic!("Invalid proxy url, only urls on the format \"http://host:port\" are allowed");
}
if url.scheme() != "http" {
panic!("Only unsecure http:// proxies are supported");
}
url
},
Err(err) => panic!("Invalid proxy url: {}, only urls on the format \"http://host:port\" are allowed", err)
}
},
),
ap_port: matches
.opt_str("ap-port")
.map(|port| port.parse::<u16>().expect("Invalid port")),
}
};
let player_config = {
let bitrate = matches
.opt_str("b")
.as_ref()
.map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate"))
.unwrap_or(Bitrate::default());
PlayerConfig {
bitrate: bitrate,
gapless: !matches.opt_present("disable-gapless"),
normalisation: matches.opt_present("enable-volume-normalisation"),
normalisation_pregain: matches
.opt_str("normalisation-pregain")
.map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value"))
.unwrap_or(PlayerConfig::default().normalisation_pregain),
}
};
let connect_config = {
let device_type = matches
.opt_str("device-type")
.as_ref()
.map(|device_type| DeviceType::from_str(device_type).expect("Invalid device type"))
.unwrap_or(DeviceType::default());
let volume_ctrl = matches
.opt_str("volume-ctrl")
.as_ref()
.map(|volume_ctrl| VolumeCtrl::from_str(volume_ctrl).expect("Invalid volume ctrl type"))
.unwrap_or(VolumeCtrl::default());
ConnectConfig {
name: name,
device_type: device_type,
volume: initial_volume,
volume_ctrl: volume_ctrl,
autoplay: matches.opt_present("autoplay"),
}
};
let enable_discovery = !matches.opt_present("disable-discovery");
Setup {
backend: backend,
cache: cache,
session_config: session_config,
player_config: player_config,
connect_config: connect_config,
credentials: credentials,
device: device,
enable_discovery: enable_discovery,
zeroconf_port: zeroconf_port,
mixer: mixer,
mixer_config: mixer_config,
player_event_program: matches.opt_str("onevent"),
emit_sink_events: matches.opt_present("emit-sink-events"),
}
}
struct Main {
cache: Option<Cache>,
player_config: PlayerConfig,
session_config: SessionConfig,
connect_config: ConnectConfig,
backend: fn(Option<String>) -> Box<dyn Sink>,
device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
mixer_config: MixerConfig,
handle: Handle,
discovery: Option<DiscoveryStream>,
signal: IoStream<()>,
spirc: Option<Spirc>,
spirc_task: Option<SpircTask>,
connect: Box<dyn Future<Item = Session, Error = io::Error>>,
shutdown: bool,
last_credentials: Option<Credentials>,
auto_connect_times: Vec<Instant>,
player_event_channel: Option<UnboundedReceiver<PlayerEvent>>,
player_event_program: Option<String>,
emit_sink_events: bool,
}
impl Main {
fn new(handle: Handle, setup: Setup) -> Main {
let mut task = Main {
handle: handle,
cache: setup.cache,
session_config: setup.session_config,
player_config: setup.player_config,
connect_config: setup.connect_config,
backend: setup.backend,
device: setup.device,
mixer: setup.mixer,
mixer_config: setup.mixer_config,
connect: Box::new(futures::future::empty()),
discovery: None,
spirc: None,
spirc_task: None,
shutdown: false,
last_credentials: None,
auto_connect_times: Vec::new(),
signal: Box::new(tokio_signal::ctrl_c().flatten_stream()),
player_event_channel: None,
player_event_program: setup.player_event_program,
emit_sink_events: setup.emit_sink_events,
};
// if setup.enable_discovery {
// let config = task.connect_config.clone();
// let device_id = task.session_config.device_id.clone();
//
// task.discovery = Some(discovery(config, device_id, setup.zeroconf_port).unwrap());
// }
if let Some(credentials) = setup.credentials {
task.credentials(credentials);
}
task
}
fn credentials(&mut self, credentials: Credentials) {
self.last_credentials = Some(credentials.clone());
let config = self.session_config.clone();
let handle = self.handle.clone();
let connection = Session::connect(config, credentials, self.cache.clone(), handle);
self.connect = connection;
self.spirc = None;
let task = mem::replace(&mut self.spirc_task, None);
if let Some(task) = task {
current_thread::spawn(Box::new(task));
}
}
}
impl Future for Main {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
let mut progress = false;
if let Some(Async::Ready(Some(creds))) =
self.discovery.as_mut().map(|d| d.poll().unwrap())
{
if let Some(ref spirc) = self.spirc {
spirc.shutdown();
}
self.auto_connect_times.clear();
self.credentials(creds);
progress = true;
}
match self.connect.poll() {
Ok(Async::Ready(session)) => {
self.connect = Box::new(futures::future::empty());
let mixer_config = self.mixer_config.clone();
let mixer = (self.mixer)(Some(mixer_config));
let player_config = self.player_config.clone();
let connect_config = self.connect_config.clone();
let audio_filter = mixer.get_audio_filter();
let backend = self.backend;
let device = self.device.clone();
let (player, event_channel) =
Player::new(player_config, session.clone(), audio_filter, move || {
(backend)(device)
});
if self.emit_sink_events {
if let Some(player_event_program) = &self.player_event_program {
let player_event_program = player_event_program.clone();
player.set_sink_event_callback(Some(Box::new(move |sink_status| {
emit_sink_event(sink_status, &player_event_program)
})));
}
}
let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer);
self.spirc = Some(spirc);
self.spirc_task = Some(spirc_task);
self.player_event_channel = Some(event_channel);
progress = true;
}
Ok(Async::NotReady) => (),
Err(error) => {
error!("Could not connect to server: {}", error);
self.connect = Box::new(futures::future::empty());
}
}
if let Async::Ready(Some(())) = self.signal.poll().unwrap() {
trace!("Ctrl-C received");
if !self.shutdown {
if let Some(ref spirc) = self.spirc {
spirc.shutdown();
} else {
return Ok(Async::Ready(()));
}
self.shutdown = true;
} else {
return Ok(Async::Ready(()));
}
progress = true;
}
let mut drop_spirc_and_try_to_reconnect = false;
if let Some(ref mut spirc_task) = self.spirc_task {
if let Async::Ready(()) = spirc_task.poll().unwrap() {
if self.shutdown {
return Ok(Async::Ready(()));
} else {
warn!("Spirc shut down unexpectedly");
drop_spirc_and_try_to_reconnect = true;
}
progress = true;
}
}
if drop_spirc_and_try_to_reconnect {
self.spirc_task = None;
while (!self.auto_connect_times.is_empty())
&& ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600)
{
let _ = self.auto_connect_times.remove(0);
}
if let Some(credentials) = self.last_credentials.clone() {
if self.auto_connect_times.len() >= 5 {
warn!("Spirc shut down too often. Not reconnecting automatically.");
} else {
self.auto_connect_times.push(Instant::now());
self.credentials(credentials);
}
}
}
if let Some(ref mut player_event_channel) = self.player_event_channel {
if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() {
progress = true;
if let Some(ref program) = self.player_event_program {
if let Some(child) = run_program_on_events(event, program) {
let child = child
.expect("program failed to start")
.map(|status| {
if !status.success() {
error!("child exited with status {:?}", status.code());
}
})
.map_err(|e| error!("failed to wait on child process: {}", e));
current_thread::spawn(child);
}
}
}
}
if !progress {
return Ok(Async::NotReady);
}
}
}
}
fn main() {
if env::var("RUST_BACKTRACE").is_err() {
env::set_var("RUST_BACKTRACE", "full")
}
let args: Vec<String> = std::env::args().collect();
let mut runtime = Runtime::new().unwrap();
let handle = runtime.handle();
runtime.block_on(Main::new(handle, setup(&args))).unwrap();
runtime.run().unwrap();
// current_thread::block_on_all(Main::new(setup(&args))).unwrap()
}