Merge pull request #581 from ashthespy/tokio_migration

Tokio migration
Ash 2021-01-26 19:56:27 +01:00 committed by GitHub
commit aa90278ab6
36 changed files with 2574 additions and 3805 deletions
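
The changes below all follow the same shape: hand-written futures 0.1 state machines (manual Future impls built around poll(), Async::Ready and try_ready!) are replaced by async fns that are .awaited on a Tokio 1.0 runtime. As a minimal sketch of that pattern (not librespot code; the function name request_value is hypothetical, and it assumes the futures crate plus tokio with the "macros" and "rt-multi-thread" features, mirroring the rx.await.map_err(...)? rewrite in audio_key.rs further down):

use futures::channel::oneshot;

// Hypothetical stand-in for the removed poll-based AudioKeyFuture: instead of
// implementing Future by hand, the oneshot receiver is awaited directly and a
// cancelled sender is mapped to the error type.
async fn request_value(rx: oneshot::Receiver<Result<u32, ()>>) -> Result<u32, ()> {
    rx.await.map_err(|_| ())?
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    // Complete the request, then await the async fn on the Tokio runtime.
    tx.send(Ok(42u32)).unwrap();
    assert_eq!(request_value(rx).await.unwrap(), 42);
}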

Cargo.lock (generated): 3286 lines changed

File diff suppressed because it is too large


@ -15,49 +15,35 @@ edition = "2018"
name = "librespot"
path = "src/lib.rs"
[[bin]]
name = "librespot"
path = "src/main.rs"
doc = false
# [[bin]]
# name = "librespot"
# path = "src/main.rs"
# doc = false
[dependencies.librespot-audio]
path = "audio"
version = "0.1.3"
[dependencies.librespot-connect]
path = "connect"
version = "0.1.3"
# [dependencies.librespot-connect]
# path = "connect"
# version = "0.1.3"
[dependencies.librespot-core]
path = "core"
version = "0.1.3"
[dependencies.librespot-metadata]
path = "metadata"
version = "0.1.3"
[dependencies.librespot-playback]
path = "playback"
version = "0.1.3"
[dependencies.librespot-protocol]
path = "protocol"
version = "0.1.3"
[dependencies]
base64 = "0.13"
env_logger = {version = "0.8", default-features = false, features = ["termcolor","humantime","atty"]}
futures = "0.1"
getopts = "0.2"
hyper = "0.11"
log = "0.4"
num-bigint = "0.3"
protobuf = "~2.14.0"
rand = "0.7"
rpassword = "5.0"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-process = "0.2"
tokio-signal = "0.2"
url = "1.7"
sha-1 = "0.8"
hex = "0.4"
[features]
alsa-backend = ["librespot-playback/alsa-backend"]
portaudio-backend = ["librespot-playback/portaudio-backend"]
@ -70,7 +56,7 @@ gstreamer-backend = ["librespot-playback/gstreamer-backend"]
with-tremor = ["librespot-audio/with-tremor"]
with-vorbis = ["librespot-audio/with-vorbis"]
with-dns-sd = ["librespot-connect/with-dns-sd"]
# with-dns-sd = ["librespot-connect/with-dns-sd"]
default = ["librespot-playback/rodio-backend"]


@ -11,16 +11,17 @@ path = "../core"
version = "0.1.3"
[dependencies]
aes-ctr = "0.6"
bit-set = "0.5"
byteorder = "1.3"
bytes = "0.4"
futures = "0.1"
lewton = "0.9"
byteorder = "1.4"
bytes = "1.0"
futures = "0.3"
lewton = "0.10"
log = "0.4"
num-bigint = "0.3"
num-traits = "0.2"
pin-project-lite = "0.2.4"
tempfile = "3.1"
aes-ctr = "0.3"
librespot-tremor = { version = "0.1.0", optional = true }
vorbis = { version ="0.0.14", optional = true }


@ -1,7 +1,7 @@
use std::io;
use aes_ctr::stream_cipher::generic_array::GenericArray;
use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek};
use aes_ctr::cipher::generic_array::GenericArray;
use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek};
use aes_ctr::Aes128Ctr;
use librespot_core::audio_key::AudioKey;


@ -1,17 +1,25 @@
use crate::range_set::{Range, RangeSet};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::Stream;
use futures::{Async, Future, Poll};
use std::cmp::{max, min};
use futures::{
channel::{mpsc, oneshot},
future,
};
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
use std::time::{Duration, Instant};
use std::{
cmp::{max, min},
pin::Pin,
task::Context,
};
use tempfile::NamedTempFile;
use futures::sync::mpsc::unbounded;
use futures::channel::mpsc::unbounded;
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session;
use librespot_core::spotify_id::FileId;
@ -88,22 +96,6 @@ pub enum AudioFile {
Streaming(AudioFileStreaming),
}
pub enum AudioFileOpen {
Cached(Option<fs::File>),
Streaming(AudioFileOpenStreaming),
}
pub struct AudioFileOpenStreaming {
session: Session,
initial_data_rx: Option<ChannelData>,
initial_data_length: Option<usize>,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
streaming_data_rate: usize,
}
enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access
@ -120,45 +112,36 @@ pub struct StreamLoaderController {
impl StreamLoaderController {
pub fn len(&self) -> usize {
return self.file_size;
self.file_size
}
pub fn is_empty(&self) -> bool {
self.file_size == 0
}
pub fn range_available(&self, range: Range) -> bool {
if let Some(ref shared) = self.stream_shared {
let download_status = shared.download_status.lock().unwrap();
if range.length
range.length
<= download_status
.downloaded
.contained_length_from_value(range.start)
{
return true;
} else {
return false;
}
} else {
if range.length <= self.len() - range.start {
return true;
} else {
return false;
}
range.length <= self.len() - range.start
}
}
pub fn range_to_end_available(&self) -> bool {
if let Some(ref shared) = self.stream_shared {
self.stream_shared.as_ref().map_or(true, |shared| {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
self.range_available(Range::new(read_position, self.len() - read_position))
} else {
true
}
})
}
pub fn ping_time_ms(&self) -> usize {
if let Some(ref shared) = self.stream_shared {
return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
} else {
return 0;
}
self.stream_shared.as_ref().map_or(0, |shared| {
shared.ping_time_ms.load(atomic::Ordering::Relaxed)
})
}
fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
@ -216,27 +199,23 @@ impl StreamLoaderController {
}
pub fn fetch_next(&mut self, length: usize) {
let range: Range = if let Some(ref shared) = self.stream_shared {
Range {
if let Some(ref shared) = self.stream_shared {
let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed),
length: length,
}
} else {
return;
};
self.fetch(range);
};
self.fetch(range)
}
}
pub fn fetch_next_blocking(&mut self, length: usize) {
let range: Range = if let Some(ref shared) = self.stream_shared {
Range {
if let Some(ref shared) = self.stream_shared {
let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed),
length: length,
}
} else {
return;
};
self.fetch_blocking(range);
};
self.fetch_blocking(range);
}
}
pub fn set_random_access_mode(&mut self) {
@ -288,108 +267,16 @@ struct AudioFileShared {
read_position: AtomicUsize,
}
impl AudioFileOpenStreaming {
fn finish(&mut self, size: usize) -> AudioFileStreaming {
let shared = Arc::new(AudioFileShared {
file_id: self.file_id,
file_size: size,
stream_data_rate: self.streaming_data_rate,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});
let mut write_file = NamedTempFile::new().unwrap();
write_file.as_file().set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap();
let initial_data_rx = self.initial_data_rx.take().unwrap();
let initial_data_length = self.initial_data_length.take().unwrap();
let complete_tx = self.complete_tx.take().unwrap();
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new(
self.session.clone(),
shared.clone(),
initial_data_rx,
self.initial_request_sent_time,
initial_data_length,
write_file,
stream_loader_command_rx,
complete_tx,
);
self.session.spawn(move |_| fetcher);
AudioFileStreaming {
read_file: read_file,
position: 0,
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
shared: shared,
}
}
}
impl Future for AudioFileOpen {
type Item = AudioFile;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
match *self {
AudioFileOpen::Streaming(ref mut open) => {
let file = try_ready!(open.poll());
Ok(Async::Ready(AudioFile::Streaming(file)))
}
AudioFileOpen::Cached(ref mut file) => {
let file = file.take().unwrap();
Ok(Async::Ready(AudioFile::Cached(file)))
}
}
}
}
impl Future for AudioFileOpenStreaming {
type Item = AudioFileStreaming;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
loop {
let (id, data) = try_ready!(self.headers.poll()).unwrap();
if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size);
return Ok(Async::Ready(file));
}
}
}
}
impl AudioFile {
pub fn open(
pub async fn open(
session: &Session,
file_id: FileId,
bytes_per_second: usize,
play_from_beginning: bool,
) -> AudioFileOpen {
let cache = session.cache().cloned();
if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
) -> Result<AudioFile, ChannelError> {
if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
debug!("File {} already in cache", file_id);
return AudioFileOpen::Cached(Some(file));
return Ok(AudioFile::Cached(file));
}
debug!("Downloading file {}", file_id);
@ -411,56 +298,112 @@ impl AudioFile {
}
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
let open = AudioFileOpenStreaming {
session: session.clone(),
file_id: file_id,
headers: headers,
initial_data_rx: Some(data),
initial_data_length: Some(initial_data_length),
initial_request_sent_time: Instant::now(),
complete_tx: Some(complete_tx),
streaming_data_rate: bytes_per_second,
};
let streaming = AudioFileStreaming::open(
session.clone(),
data,
initial_data_length,
Instant::now(),
headers,
file_id,
complete_tx,
bytes_per_second,
);
let session_ = session.clone();
session.spawn(move |_| {
complete_rx
.map(move |mut file| {
if let Some(cache) = session_.cache() {
cache.save_file(file_id, &mut file);
debug!("File {} complete, saving to cache", file_id);
} else {
debug!("File {} complete", file_id);
}
})
.or_else(|oneshot::Canceled| Ok(()))
});
session.spawn(complete_rx.map_ok(move |mut file| {
if let Some(cache) = session_.cache() {
cache.save_file(file_id, &mut file);
debug!("File {} complete, saving to cache", file_id);
} else {
debug!("File {} complete", file_id);
}
}));
return AudioFileOpen::Streaming(open);
Ok(AudioFile::Streaming(streaming.await?))
}
pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
match self {
AudioFile::Streaming(ref stream) => {
return StreamLoaderController {
channel_tx: Some(stream.stream_loader_command_tx.clone()),
stream_shared: Some(stream.shared.clone()),
file_size: stream.shared.file_size,
};
}
AudioFile::Cached(ref file) => {
return StreamLoaderController {
channel_tx: None,
stream_shared: None,
file_size: file.metadata().unwrap().len() as usize,
};
}
AudioFile::Streaming(ref stream) => StreamLoaderController {
channel_tx: Some(stream.stream_loader_command_tx.clone()),
stream_shared: Some(stream.shared.clone()),
file_size: stream.shared.file_size,
},
AudioFile::Cached(ref file) => StreamLoaderController {
channel_tx: None,
stream_shared: None,
file_size: file.metadata().unwrap().len() as usize,
},
}
}
}
impl AudioFileStreaming {
pub async fn open(
session: Session,
initial_data_rx: ChannelData,
initial_data_length: usize,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: oneshot::Sender<NamedTempFile>,
streaming_data_rate: usize,
) -> Result<AudioFileStreaming, ChannelError> {
let (_, data) = headers
.try_filter(|(id, _)| future::ready(*id == 0x3))
.next()
.await
.unwrap()?;
let size = BigEndian::read_u32(&data) as usize * 4;
let shared = Arc::new(AudioFileShared {
file_id: file_id,
file_size: size,
stream_data_rate: streaming_data_rate,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});
let mut write_file = NamedTempFile::new().unwrap();
write_file.as_file().set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap();
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new(
session.clone(),
shared.clone(),
initial_data_rx,
initial_request_sent_time,
initial_data_length,
write_file,
stream_loader_command_rx,
complete_tx,
);
session.spawn(fetcher);
Ok(AudioFileStreaming {
read_file: read_file,
position: 0,
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
shared: shared,
})
}
}
fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
assert!(
offset % 4 == 0,
@ -502,146 +445,267 @@ enum ReceivedData {
Data(PartialFileData),
}
struct AudioFileFetchDataReceiver {
async fn audio_file_fetch_receive_data(
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
initial_data_offset: usize,
initial_request_length: usize,
data_offset: usize,
request_length: usize,
request_sent_time: Option<Instant>,
measure_ping_time: bool,
}
request_sent_time: Instant,
) {
let mut data_offset = initial_data_offset;
let mut request_length = initial_request_length;
let mut measure_ping_time = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst)
== 0;
impl AudioFileFetchDataReceiver {
fn new(
shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
data_offset: usize,
request_length: usize,
request_sent_time: Instant,
) -> AudioFileFetchDataReceiver {
let measure_ping_time = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst)
== 0;
shared
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
shared
.number_of_open_requests
.fetch_add(1, atomic::Ordering::SeqCst);
enum TryFoldErr {
ChannelError,
FinishEarly,
}
AudioFileFetchDataReceiver {
shared: shared,
data_rx: data_rx,
file_data_tx: file_data_tx,
initial_data_offset: data_offset,
initial_request_length: request_length,
data_offset: data_offset,
request_length: request_length,
request_sent_time: Some(request_sent_time),
measure_ping_time: measure_ping_time,
}
let result = data_rx
.map_err(|_| TryFoldErr::ChannelError)
.try_for_each(|data| {
if measure_ping_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64)
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data: data,
}));
data_offset += data_size;
if request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length);
request_length = 0;
} else {
request_length -= data_size;
}
future::ready(if request_length == 0 {
Err(TryFoldErr::FinishEarly)
} else {
Ok(())
})
})
.await;
if request_length > 0 {
let missing_range = Range::new(data_offset, request_length);
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.subtract_range(&missing_range);
shared.cond.notify_all();
}
shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
if let Err(TryFoldErr::ChannelError) = result {
warn!(
"Error from channel for data receiver for range {} (+{}).",
initial_data_offset, initial_request_length
);
} else if request_length > 0 {
warn!(
"Data receiver for range {} (+{}) received less data from server than requested.",
initial_data_offset, initial_request_length
);
}
}
/*
async fn audio_file_fetch(
session: Session,
shared: Arc<AudioFileShared>,
initial_data_rx: ChannelData,
initial_request_sent_time: Instant,
initial_data_length: usize,
impl AudioFileFetchDataReceiver {
fn finish(&mut self) {
if self.request_length > 0 {
let missing_range = Range::new(self.data_offset, self.request_length);
output: NamedTempFile,
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: oneshot::Sender<NamedTempFile>,
) {
let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
let mut download_status = self.shared.download_status.lock().unwrap();
download_status.requested.subtract_range(&missing_range);
self.shared.cond.notify_all();
}
let requested_range = Range::new(0, initial_data_length);
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.add_range(&requested_range);
self.shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
}
}
session.spawn(audio_file_fetch_receive_data(
shared.clone(),
file_data_tx.clone(),
initial_data_rx,
0,
initial_data_length,
initial_request_sent_time,
));
impl Future for AudioFileFetchDataReceiver {
type Item = ();
type Error = ();
let mut network_response_times_ms: Vec<usize> = Vec::new();
fn poll(&mut self) -> Poll<(), ()> {
loop {
match self.data_rx.poll() {
Ok(Async::Ready(Some(data))) => {
if self.measure_ping_time {
if let Some(request_sent_time) = self.request_sent_time {
let duration = Instant::now() - request_sent_time;
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64)
> MAXIMUM_ASSUMED_PING_TIME_SECONDS
{
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = self
.file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
self.measure_ping_time = false;
}
}
let data_size = data.len();
let _ = self
.file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: self.data_offset,
data: data,
}));
self.data_offset += data_size;
if self.request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
self.request_length = 0;
} else {
self.request_length -= data_size;
}
if self.request_length == 0 {
self.finish();
return Ok(Async::Ready(()));
}
let f1 = file_data_rx.map(|x| Ok::<_, ()>(x)).try_for_each(|x| {
match x {
ReceivedData::ResponseTimeMs(response_time_ms) => {
trace!("Ping time estimated as: {} ms.", response_time_ms);
// record the response time
network_response_times_ms.push(response_time_ms);
// prune old response times. Keep at most three.
while network_response_times_ms.len() > 3 {
network_response_times_ms.remove(0);
}
Ok(Async::Ready(None)) => {
if self.request_length > 0 {
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
// stats::median is experimental. So we calculate the median of up to three ourselves.
let ping_time_ms: usize = match network_response_times_ms.len() {
1 => network_response_times_ms[0] as usize,
2 => {
((network_response_times_ms[0] + network_response_times_ms[1]) / 2) as usize
}
3 => {
let mut times = network_response_times_ms.clone();
times.sort();
times[1]
}
_ => unreachable!(),
};
// store our new estimate for everyone to see
shared
.ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed);
}
ReceivedData::Data(data) => {
output
.as_mut()
.unwrap()
.seek(SeekFrom::Start(data.offset as u64))
.unwrap();
output
.as_mut()
.unwrap()
.write_all(data.data.as_ref())
.unwrap();
let mut full = false;
{
let mut download_status = shared.download_status.lock().unwrap();
let received_range = Range::new(data.offset, data.data.len());
download_status.downloaded.add_range(&received_range);
shared.cond.notify_all();
if download_status.downloaded.contained_length_from_value(0)
>= shared.file_size
{
full = true;
}
drop(download_status);
}
if full {
self.finish();
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(ChannelError) => {
warn!(
"Error from channel for data receiver for range {} (+{}).",
self.initial_data_offset, self.initial_request_length
);
self.finish();
return Ok(Async::Ready(()));
return future::ready(Err(()));
}
}
}
future::ready(Ok(()))
});
let f2 = stream_loader_command_rx.map(Ok::<_, ()>).try_for_each(|cmd| {
match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length);
}
StreamLoaderCommand::RandomAccessMode() => {
*(shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return future::ready(Err(())),
}
future::ready(Ok(()))
});
let f3 = future::poll_fn(|_| {
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
let number_of_open_requests = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst);
let max_requests_to_send =
MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
if max_requests_to_send > 0 {
let bytes_pending: usize = {
let download_status = shared.download_status.lock().unwrap();
download_status
.requested
.minus(&download_status.downloaded)
.len()
};
let ping_time_seconds =
0.001 * shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let download_rate = session.channel().get_download_rate_estimate();
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared.stream_data_rate as f64)
as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
as usize,
);
if bytes_pending < desired_pending_bytes {
self.pre_fetch_more_data(
desired_pending_bytes - bytes_pending,
max_requests_to_send,
);
}
}
}
Poll::Pending
});
future::select_all(vec![f1, f2, f3]).await
}*/
pin_project! {
struct AudioFileFetch {
session: Session,
shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
#[pin]
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
#[pin]
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>,
}
}
struct AudioFileFetch {
session: Session,
shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>,
}
impl AudioFileFetch {
fn new(
session: Session,
@ -662,16 +726,14 @@ impl AudioFileFetch {
download_status.requested.add_range(&requested_range);
}
let initial_data_receiver = AudioFileFetchDataReceiver::new(
session.spawn(audio_file_fetch_receive_data(
shared.clone(),
file_data_tx.clone(),
initial_data_rx,
0,
initial_data_length,
initial_request_sent_time,
);
session.spawn(move |_| initial_data_receiver);
));
AudioFileFetch {
session: session,
@ -701,7 +763,7 @@ impl AudioFileFetch {
return;
}
if length <= 0 {
if length == 0 {
return;
}
@ -737,16 +799,14 @@ impl AudioFileFetch {
download_status.requested.add_range(range);
let receiver = AudioFileFetchDataReceiver::new(
self.session.spawn(audio_file_fetch_receive_data(
self.shared.clone(),
self.file_data_tx.clone(),
data,
range.start,
range.length,
Instant::now(),
);
self.session.spawn(move |_| receiver);
));
}
}
@ -794,13 +854,11 @@ impl AudioFileFetch {
}
}
fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.file_data_rx.poll() {
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
match Pin::new(&mut self.file_data_rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => {
trace!("Ping time estimated as: {} ms.", response_time_ms);
// record the response time
@ -821,7 +879,7 @@ impl AudioFileFetch {
}
3 => {
let mut times = self.network_response_times_ms.clone();
times.sort();
times.sort_unstable();
times[1]
}
_ => unreachable!(),
@ -832,7 +890,7 @@ impl AudioFileFetch {
.ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed);
}
Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
Poll::Ready(Some(ReceivedData::Data(data))) => {
self.output
.as_mut()
.unwrap()
@ -864,39 +922,33 @@ impl AudioFileFetch {
if full {
self.finish();
return Ok(Async::Ready(()));
return Poll::Ready(());
}
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(()) => unreachable!(),
Poll::Pending => return Poll::Pending,
}
}
}
fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.stream_loader_command_rx.poll() {
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
self.download_range(request.start, request.length);
}
Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
match Pin::new(&mut self.stream_loader_command_rx).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()),
Poll::Ready(Some(cmd)) => match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length);
}
StreamLoaderCommand::RandomAccessMode() => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return Poll::Ready(()),
},
Poll::Pending => return Poll::Pending,
}
}
}
@ -909,26 +961,16 @@ impl AudioFileFetch {
let _ = complete_tx.send(output);
}
}
impl Future for AudioFileFetch {
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<(), ()> {
match self.poll_stream_loader_command_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Err(()) => unreachable!(),
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
if let Poll::Ready(()) = self.poll_stream_loader_command_rx(cx) {
return Poll::Ready(());
}
match self.poll_file_data_rx() {
Ok(Async::NotReady) => (),
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Err(()) => unreachable!(),
if let Poll::Ready(()) = self.poll_file_data_rx(cx) {
return Poll::Ready(());
}
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
@ -968,8 +1010,7 @@ impl Future for AudioFileFetch {
}
}
}
return Ok(Async::NotReady);
Poll::Pending
}
}
@ -1009,9 +1050,9 @@ impl Read for AudioFileStreaming {
ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested);
for range in ranges_to_request.iter() {
for &range in ranges_to_request.iter() {
self.stream_loader_command_tx
.unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
.unbounded_send(StreamLoaderCommand::Fetch(range))
.unwrap();
}
@ -1058,7 +1099,7 @@ impl Read for AudioFileStreaming {
.read_position
.store(self.position as usize, atomic::Ordering::Relaxed);
return Ok(read_len);
Ok(read_len)
}
}


@ -1,12 +1,15 @@
#[macro_use]
extern crate futures;
#![allow(clippy::unused_io_amount)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate pin_project_lite;
extern crate aes_ctr;
extern crate bit_set;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate num_bigint;
extern crate num_traits;
extern crate tempfile;
@ -24,7 +27,7 @@ mod libvorbis_decoder;
mod range_set;
pub use decrypt::AudioDecrypt;
pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController};
pub use fetch::{AudioFile, StreamLoaderController};
pub use fetch::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,


@ -54,11 +54,7 @@ impl RangeSet {
}
pub fn len(&self) -> usize {
let mut result = 0;
for range in self.ranges.iter() {
result += range.length;
}
return result;
self.ranges.iter().map(|r| r.length).fold(0, std::ops::Add::add)
}
pub fn get_range(&self, index: usize) -> Range {
@ -98,12 +94,12 @@ impl RangeSet {
return false;
}
}
return true;
true
}
pub fn add_range(&mut self, range: &Range) {
if range.length <= 0 {
// the interval is empty or invalid -> nothing to do.
if range.length == 0 {
// the interval is empty -> nothing to do.
return;
}
@ -111,7 +107,7 @@ impl RangeSet {
// the new range is clear of any ranges we already iterated over.
if range.end() < self.ranges[index].start {
// the new range starts after anything we already passed and ends before the next range starts (they don't touch) -> insert it.
self.ranges.insert(index, range.clone());
self.ranges.insert(index, *range);
return;
} else if range.start <= self.ranges[index].end()
&& self.ranges[index].start <= range.end()
@ -119,7 +115,7 @@ impl RangeSet {
// the new range overlaps (or touches) the first range. They are to be merged.
// In addition we might have to merge further ranges in as well.
let mut new_range = range.clone();
let mut new_range = *range;
while index < self.ranges.len() && self.ranges[index].start <= new_range.end() {
let new_end = max(new_range.end(), self.ranges[index].end());
@ -134,7 +130,7 @@ impl RangeSet {
}
// the new range is after everything else -> just add it
self.ranges.push(range.clone());
self.ranges.push(*range);
}
#[allow(dead_code)]
@ -152,7 +148,7 @@ impl RangeSet {
}
pub fn subtract_range(&mut self, range: &Range) {
if range.length <= 0 {
if range.length == 0 {
return;
}


@ -20,7 +20,7 @@ version = "0.1.3"
[dependencies]
base64 = "0.13"
futures = "0.1"
hyper = "0.11"
hyper = "0.12"
log = "0.4"
num-bigint = "0.3"
protobuf = "~2.14.0"
@ -28,7 +28,7 @@ rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
tokio-core = "0.1"
tokio = "0.1"
url = "1.7"
sha-1 = "0.8"
hmac = "0.7"
@ -38,6 +38,7 @@ block-modes = "0.3"
dns-sd = { version = "0.1.3", optional = true }
libmdns = { version = "0.2.7", optional = true }
[features]
default = ["libmdns"]
with-dns-sd = ["dns-sd"]


@ -5,8 +5,10 @@ use base64;
use futures::sync::mpsc;
use futures::{Future, Poll, Stream};
use hmac::{Hmac, Mac};
use hyper::server::{Http, Request, Response, Service};
use hyper::{self, Get, Post, StatusCode};
use hyper::{
self, server::conn::Http, service::Service, Body, Method, Request, Response, StatusCode,
};
use sha1::{Digest, Sha1};
#[cfg(feature = "with-dns-sd")]
@ -20,7 +22,7 @@ use rand;
use std::collections::BTreeMap;
use std::io;
use std::sync::Arc;
use tokio_core::reactor::Handle;
use tokio::runtime::current_thread::Handle;
use url;
use librespot_core::authentication::Credentials;
@ -67,7 +69,7 @@ impl Discovery {
fn handle_get_info(
&self,
_params: &BTreeMap<String, String>,
) -> ::futures::Finished<Response, hyper::Error> {
) -> ::futures::Finished<Response<hyper::Body>, hyper::Error> {
let public_key = self.0.public_key.to_bytes_be();
let public_key = base64::encode(&public_key);
@ -88,13 +90,13 @@ impl Discovery {
});
let body = result.to_string();
::futures::finished(Response::new().with_body(body))
::futures::finished(Response::new(Body::from(body)))
}
fn handle_add_user(
&self,
params: &BTreeMap<String, String>,
) -> ::futures::Finished<Response, hyper::Error> {
) -> ::futures::Finished<Response<hyper::Body>, hyper::Error> {
let username = params.get("userName").unwrap();
let encrypted_blob = params.get("blob").unwrap();
let client_key = params.get("clientKey").unwrap();
@ -136,7 +138,7 @@ impl Discovery {
});
let body = result.to_string();
return ::futures::finished(Response::new().with_body(body));
return ::futures::finished(Response::new(Body::from(body)));
}
let decrypted = {
@ -161,30 +163,33 @@ impl Discovery {
});
let body = result.to_string();
::futures::finished(Response::new().with_body(body))
return ::futures::finished(Response::new(Body::from(body)));
}
fn not_found(&self) -> ::futures::Finished<Response, hyper::Error> {
::futures::finished(Response::new().with_status(StatusCode::NotFound))
fn not_found(&self) -> ::futures::Finished<Response<hyper::Body>, hyper::Error> {
let mut res = Response::default();
*res.status_mut() = StatusCode::NOT_FOUND;
::futures::finished(res)
}
}
impl Service for Discovery {
type Request = Request;
type Response = Response;
type ReqBody = Body;
type ResBody = Body;
type Error = hyper::Error;
type Future = Box<dyn Future<Item = Response, Error = hyper::Error>>;
fn call(&self, request: Request) -> Self::Future {
type Future = Box<dyn Future<Item = Response<(Self::ResBody)>, Error = hyper::Error> + Send>;
fn call(&mut self, request: Request<(Self::ReqBody)>) -> Self::Future {
let mut params = BTreeMap::new();
let (method, uri, _, _, body) = request.deconstruct();
if let Some(query) = uri.query() {
let (parts, body) = request.into_parts();
if let Some(query) = parts.uri.query() {
params.extend(url::form_urlencoded::parse(query.as_bytes()).into_owned());
}
if method != Get {
debug!("{:?} {:?} {:?}", method, uri.path(), params);
if parts.method != Method::GET {
debug!("{:?} {:?} {:?}", parts.method, parts.uri.path(), params);
}
let this = self.clone();
@ -198,9 +203,9 @@ impl Service for Discovery {
params
})
.and_then(move |params| {
match (method, params.get("action").map(AsRef::as_ref)) {
(Get, Some("getInfo")) => this.handle_get_info(&params),
(Post, Some("addUser")) => this.handle_add_user(&params),
match (parts.method, params.get("action").map(AsRef::as_ref)) {
(Method::GET, Some("getInfo")) => this.handle_get_info(&params),
(Method::POST, Some("addUser")) => this.handle_add_user(&params),
_ => this.not_found(),
}
}),
@ -230,11 +235,9 @@ pub fn discovery(
let serve = {
let http = Http::new();
http.serve_addr_handle(
&format!("0.0.0.0:{}", port).parse().unwrap(),
&handle,
move || Ok(discovery.clone()),
)
http.serve_addr(&format!("0.0.0.0:{}", port).parse().unwrap(), move || {
Ok(discovery.clone())
})
.unwrap()
};
@ -244,13 +247,18 @@ pub fn discovery(
let server_future = {
let handle = handle.clone();
serve
.for_each(move |connection| {
handle.spawn(connection.then(|_| Ok(())));
Ok(())
})
.for_each(
move |connecting: hyper::server::conn::Connecting<
hyper::server::conn::AddrStream,
futures::Failed<_, hyper::Error>,
>| {
handle.spawn(connecting.flatten().then(|_| Ok(()))).unwrap();
Ok(())
},
)
.then(|_| Ok(()))
};
handle.spawn(server_future);
handle.spawn(server_future).unwrap();
#[cfg(feature = "with-dns-sd")]
let svc = DNSService::register(


@ -12,7 +12,7 @@ extern crate hyper;
extern crate num_bigint;
extern crate protobuf;
extern crate rand;
extern crate tokio_core;
extern crate tokio;
extern crate url;
extern crate aes_ctr;


@ -13,35 +13,36 @@ path = "../protocol"
version = "0.1.3"
[dependencies]
aes = "0.6"
base64 = "0.13"
byteorder = "1.3"
bytes = "0.4"
error-chain = { version = "0.12", default_features = false }
futures = "0.1"
byteorder = "1.4"
bytes = "1.0"
futures = { version = "0.3", features = ["bilock", "unstable"] }
hmac = "0.7"
httparse = "1.3"
hyper = "0.11"
hyper-proxy = { version = "0.4", default_features = false }
lazy_static = "1.3"
hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2"] }
log = "0.4"
num-bigint = "0.3"
num-integer = "0.1"
num-traits = "0.2"
once_cell = "1.5.2"
pbkdf2 = "0.3"
pin-project-lite = "0.2.4"
protobuf = "~2.14.0"
rand = "0.7"
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
sha-1 = "~0.8"
shannon = "0.2.0"
tokio-codec = "0.1"
tokio-core = "0.1"
tokio-io = "0.1"
tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] }
tokio-util = { version = "0.6", features = ["codec"] }
url = "1.7"
uuid = { version = "0.8", features = ["v4"] }
sha-1 = "0.8"
hmac = "0.7"
pbkdf2 = "0.3"
aes = "0.3"
[build-dependencies]
rand = "0.7"
vergen = "3.0.4"
[dev-dependencies]
tokio = {version = "1.0", features = ["macros"] }


@ -1,101 +1,69 @@
const AP_FALLBACK: &'static str = "ap.spotify.com:443";
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/";
use futures::{Future, Stream};
use hyper::client::HttpConnector;
use hyper::{self, Client, Method, Request, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use serde_json;
use std::str::FromStr;
use tokio_core::reactor::Handle;
use hyper::{Body, Client, Method, Request, Uri};
use std::error::Error;
use url::Url;
error_chain! {}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData {
ap_list: Vec<String>,
}
fn apresolve(
handle: &Handle,
proxy: &Option<Url>,
ap_port: &Option<u16>,
) -> Box<dyn Future<Item = String, Error = Error>> {
let url = Uri::from_str(APRESOLVE_ENDPOINT).expect("invalid AP resolve URL");
let use_proxy = proxy.is_some();
async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> {
let port = ap_port.unwrap_or(443);
let mut req = Request::new(Method::Get, url.clone());
let response = match *proxy {
Some(ref val) => {
let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy");
let proxy = Proxy::new(Intercept::All, proxy_url);
let connector = HttpConnector::new(4, handle);
let req = Request::builder()
.method(Method::GET)
.uri(
APRESOLVE_ENDPOINT
.parse::<Uri>()
.expect("invalid AP resolve URL"),
)
.body(Body::empty())?;
let client = if proxy.is_some() {
todo!("proxies not yet supported")
/*let proxy = {
let proxy_url = val.as_str().parse().expect("invalid http proxy");
let mut proxy = Proxy::new(Intercept::All, proxy_url);
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
if let Some(headers) = proxy_connector.http_headers(&url) {
req.headers_mut().extend(headers.iter());
req.set_proxy(true);
}
let client = Client::configure().connector(proxy_connector).build(handle);
client.request(req)
}
_ => {
let client = Client::new(handle);
client.request(req)
}
proxy_connector
};
if let Some(headers) = proxy.http_headers(&APRESOLVE_ENDPOINT.parse().unwrap()) {
req.headers_mut().extend(headers.clone());
};
Client::builder().build(proxy)*/
} else {
Client::new()
};
let body = response.and_then(|response| {
response.body().fold(Vec::new(), |mut acc, chunk| {
acc.extend_from_slice(chunk.as_ref());
Ok::<_, hyper::Error>(acc)
})
});
let body = body.then(|result| result.chain_err(|| "HTTP error"));
let body =
body.and_then(|body| String::from_utf8(body).chain_err(|| "invalid UTF8 in response"));
let response = client.request(req).await?;
let data = body
.and_then(|body| serde_json::from_str::<APResolveData>(&body).chain_err(|| "invalid JSON"));
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: APResolveData = serde_json::from_slice(body.as_ref())?;
let p = ap_port.clone();
let ap = data.and_then(move |data| {
let mut aps = data.ap_list.iter().filter(|ap| {
if p.is_some() {
Uri::from_str(ap).ok().map_or(false, |uri| {
uri.port().map_or(false, |port| port == p.unwrap())
})
} else if use_proxy {
// It is unlikely that the proxy will accept CONNECT on anything other than 443.
Uri::from_str(ap)
.ok()
.map_or(false, |uri| uri.port().map_or(false, |port| port == 443))
let ap = if ap_port.is_some() || proxy.is_some() {
data.ap_list.into_iter().find_map(|ap| {
if ap.parse::<Uri>().ok()?.port()? == port {
Some(ap)
} else {
true
None
}
});
let ap = aps.next().ok_or("empty AP List")?;
Ok(ap.clone())
});
Box::new(ap)
})
} else {
data.ap_list.into_iter().next()
}
.ok_or("empty AP List")?;
Ok(ap)
}
pub(crate) fn apresolve_or_fallback<E>(
handle: &Handle,
proxy: &Option<Url>,
ap_port: &Option<u16>,
) -> Box<dyn Future<Item = String, Error = E>>
where
E: 'static,
{
let ap = apresolve(handle, proxy, ap_port).or_else(|e| {
warn!("Failed to resolve Access Point: {}", e.description());
pub async fn apresolve_or_fallback(proxy: &Option<Url>, ap_port: &Option<u16>) -> String {
apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
warn!("Failed to resolve Access Point: {}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK);
Ok(AP_FALLBACK.into())
});
Box::new(ap)
AP_FALLBACK.into()
})
}


@ -1,7 +1,6 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use futures::channel::oneshot;
use std::collections::HashMap;
use std::io::Write;
@ -47,7 +46,7 @@ impl AudioKeyManager {
}
}
pub fn request(&self, track: SpotifyId, file: FileId) -> AudioKeyFuture<AudioKey> {
pub async fn request(&self, track: SpotifyId, file: FileId) -> Result<AudioKey, AudioKeyError> {
let (tx, rx) = oneshot::channel();
let seq = self.lock(move |inner| {
@ -57,7 +56,7 @@ impl AudioKeyManager {
});
self.send_key_request(seq, track, file);
AudioKeyFuture(rx)
rx.await.map_err(|_| AudioKeyError)?
}
fn send_key_request(&self, seq: u32, track: SpotifyId, file: FileId) {
@ -70,18 +69,3 @@ impl AudioKeyManager {
self.session().send_packet(0xc, data)
}
}
pub struct AudioKeyFuture<T>(oneshot::Receiver<Result<T, AudioKeyError>>);
impl<T> Future for AudioKeyFuture<T> {
type Item = T;
type Error = AudioKeyError;
fn poll(&mut self) -> Poll<T, AudioKeyError> {
match self.0.poll() {
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
Ok(Async::Ready(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(oneshot::Canceled) => Err(AudioKeyError),
}
}
}


@ -1,11 +1,9 @@
use aes::Aes192;
use base64;
use aes::NewBlockCipher;
use byteorder::{BigEndian, ByteOrder};
use hmac::Hmac;
use pbkdf2::pbkdf2;
use protobuf::ProtobufEnum;
use serde;
use serde_json;
use sha1::{Digest, Sha1};
use std::fs::File;
use std::io::{self, Read, Write};
@ -76,9 +74,9 @@ impl Credentials {
// decrypt data using ECB mode without padding
let blob = {
use aes::block_cipher_trait::generic_array::typenum::Unsigned;
use aes::block_cipher_trait::generic_array::GenericArray;
use aes::block_cipher_trait::BlockCipher;
use aes::cipher::generic_array::typenum::Unsigned;
use aes::cipher::generic_array::GenericArray;
use aes::cipher::BlockCipher;
let mut data = base64::decode(encrypted_blob).unwrap();
let cipher = Aes192::new(GenericArray::from_slice(&key));


@ -1,9 +1,12 @@
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use futures::sync::{mpsc, BiLock};
use futures::{Async, Poll, Stream};
use std::collections::HashMap;
use std::time::Instant;
use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt};
use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
time::Instant,
};
use crate::util::SeqGenerator;
@ -101,12 +104,10 @@ impl ChannelManager {
}
impl Channel {
fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
let (cmd, packet) = match self.receiver.poll() {
Ok(Async::Ready(Some(t))) => t,
Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed.
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll_next_unpin(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(o) => o.ok_or(ChannelError)?,
};
if cmd == 0xa {
@ -115,9 +116,9 @@ impl Channel {
self.state = ChannelState::Closed;
Err(ChannelError)
Poll::Ready(Err(ChannelError))
} else {
Ok(Async::Ready(packet))
Poll::Ready(Ok(packet))
}
}
@ -129,16 +130,19 @@ impl Channel {
}
impl Stream for Channel {
type Item = ChannelEvent;
type Error = ChannelError;
type Item = Result<ChannelEvent, ChannelError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match self.state.clone() {
ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => {
if data.len() == 0 {
data = try_ready!(self.recv_packet());
data = match self.recv_packet(cx) {
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
}
let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
@ -152,19 +156,23 @@ impl Stream for Channel {
self.state = ChannelState::Header(data);
let event = ChannelEvent::Header(header_id, header_data);
return Ok(Async::Ready(Some(event)));
return Poll::Ready(Some(Ok(event)));
}
}
ChannelState::Data => {
let data = try_ready!(self.recv_packet());
let data = match self.recv_packet(cx) {
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
if data.len() == 0 {
self.receiver.close();
self.state = ChannelState::Closed;
return Ok(Async::Ready(None));
return Poll::Ready(None);
} else {
let event = ChannelEvent::Data(data);
return Ok(Async::Ready(Some(event)));
return Poll::Ready(Some(Ok(event)));
}
}
}
@ -173,38 +181,45 @@ impl Stream for Channel {
}
impl Stream for ChannelData {
type Item = Bytes;
type Error = ChannelError;
type Item = Result<Bytes, ChannelError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady),
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock(cx) {
Poll::Ready(c) => c,
Poll::Pending => return Poll::Pending,
};
loop {
match try_ready!(channel.poll()) {
let x = match channel.poll_next_unpin(cx) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match x {
Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))),
None => return Ok(Async::Ready(None)),
Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))),
None => return Poll::Ready(None),
}
}
}
}
impl Stream for ChannelHeaders {
type Item = (u8, Vec<u8>);
type Error = ChannelError;
type Item = Result<(u8, Vec<u8>), ChannelError>;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady),
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock(cx) {
Poll::Ready(c) => c,
Poll::Pending => return Poll::Pending,
};
match try_ready!(channel.poll()) {
Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))),
Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)),
let x = match channel.poll_next_unpin(cx) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match x {
Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))),
Some(ChannelEvent::Data(..)) | None => Poll::Ready(None),
}
}
}


@ -35,29 +35,3 @@ macro_rules! component {
}
}
}
use std::cell::UnsafeCell;
use std::sync::Mutex;
pub(crate) struct Lazy<T>(Mutex<bool>, UnsafeCell<Option<T>>);
unsafe impl<T: Sync> Sync for Lazy<T> {}
unsafe impl<T: Send> Send for Lazy<T> {}
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
impl<T> Lazy<T> {
pub(crate) fn new() -> Lazy<T> {
Lazy(Mutex::new(false), UnsafeCell::new(None))
}
pub(crate) fn get<F: FnOnce() -> T>(&self, f: F) -> &T {
let mut inner = self.0.lock().unwrap();
if !*inner {
unsafe {
*self.1.get() = Some(f());
}
*inner = true;
}
unsafe { &*self.1.get() }.as_ref().unwrap()
}
}


@ -2,7 +2,7 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, Bytes, BytesMut};
use shannon::Shannon;
use std::io;
use tokio_io::codec::{Decoder, Encoder};
use tokio_util::codec::{Decoder, Encoder};
const HEADER_SIZE: usize = 3;
const MAC_SIZE: usize = 4;
@ -35,8 +35,7 @@ impl APCodec {
}
}
impl Encoder for APCodec {
type Item = (u8, Vec<u8>);
impl Encoder<(u8, Vec<u8>)> for APCodec {
type Error = io::Error;
fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> {
@ -45,7 +44,7 @@ impl Encoder for APCodec {
buf.reserve(3 + payload.len());
buf.put_u8(cmd);
buf.put_u16_be(payload.len() as u16);
buf.put_u16(payload.len() as u16);
buf.extend_from_slice(&payload);
self.encode_cipher.nonce_u32(self.encode_nonce);


@ -1,14 +1,11 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use futures::{Async, Future, Poll};
use hmac::{Hmac, Mac};
use protobuf::{self, Message};
use rand::thread_rng;
use sha1::Sha1;
use std::io::{self, Read};
use std::marker::PhantomData;
use tokio_codec::{Decoder, Framed};
use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite};
use std::io;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Framed};
use super::codec::APCodec;
use crate::diffie_hellman::DHLocalKeys;
@ -16,72 +13,33 @@ use crate::protocol;
use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext};
use crate::util;
pub struct Handshake<T> {
keys: DHLocalKeys,
state: HandshakeState<T>,
}
enum HandshakeState<T> {
ClientHello(WriteAll<T, Vec<u8>>),
APResponse(RecvPacket<T, APResponseMessage>),
ClientResponse(Option<APCodec>, WriteAll<T, Vec<u8>>),
}
pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> {
pub async fn handshake<T: AsyncRead + AsyncWrite + Unpin>(
mut connection: T,
) -> io::Result<Framed<T, APCodec>> {
let local_keys = DHLocalKeys::random(&mut thread_rng());
let client_hello = client_hello(connection, local_keys.public_key());
let gc = local_keys.public_key();
let mut accumulator = client_hello(&mut connection, gc).await?;
let message: APResponseMessage = recv_packet(&mut connection, &mut accumulator).await?;
let remote_key = message
.get_challenge()
.get_login_crypto_challenge()
.get_diffie_hellman()
.get_gs()
.to_owned();
Handshake {
keys: local_keys,
state: HandshakeState::ClientHello(client_hello),
}
let shared_secret = local_keys.shared_secret(&remote_key);
let (challenge, send_key, recv_key) = compute_keys(&shared_secret, &accumulator);
let codec = APCodec::new(&send_key, &recv_key);
client_response(&mut connection, challenge).await?;
Ok(codec.framed(connection))
}
impl<T: AsyncRead + AsyncWrite> Future for Handshake<T> {
type Item = Framed<T, APCodec>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
use self::HandshakeState::*;
loop {
self.state = match self.state {
ClientHello(ref mut write) => {
let (connection, accumulator) = try_ready!(write.poll());
let read = recv_packet(connection, accumulator);
APResponse(read)
}
APResponse(ref mut read) => {
let (connection, message, accumulator) = try_ready!(read.poll());
let remote_key = message
.get_challenge()
.get_login_crypto_challenge()
.get_diffie_hellman()
.get_gs()
.to_owned();
let shared_secret = self.keys.shared_secret(&remote_key);
let (challenge, send_key, recv_key) =
compute_keys(&shared_secret, &accumulator);
let codec = APCodec::new(&send_key, &recv_key);
let write = client_response(connection, challenge);
ClientResponse(Some(codec), write)
}
ClientResponse(ref mut codec, ref mut write) => {
let (connection, _) = try_ready!(write.poll());
let codec = codec.take().unwrap();
let framed = codec.framed(connection);
return Ok(Async::Ready(framed));
}
}
}
}
}
fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
async fn client_hello<T>(connection: &mut T, gc: Vec<u8>) -> io::Result<Vec<u8>>
where
T: AsyncWrite + Unpin,
{
let mut packet = ClientHello::new();
packet
.mut_build_info()
@ -106,13 +64,17 @@ fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8
let mut buffer = vec![0, 4];
let size = 2 + 4 + packet.compute_size();
buffer.write_u32::<BigEndian>(size).unwrap();
<Vec<u8> as WriteBytesExt>::write_u32::<BigEndian>(&mut buffer, size).unwrap();
packet.write_to_vec(&mut buffer).unwrap();
write_all(connection, buffer)
connection.write_all(&buffer[..]).await?;
Ok(buffer)
}
fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
async fn client_response<T>(connection: &mut T, challenge: Vec<u8>) -> io::Result<()>
where
T: AsyncWrite + Unpin,
{
let mut packet = ClientResponsePlaintext::new();
packet
.mut_login_crypto_response()
@ -123,70 +85,35 @@ fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll
let mut buffer = vec![];
let size = 4 + packet.compute_size();
buffer.write_u32::<BigEndian>(size).unwrap();
<Vec<u8> as WriteBytesExt>::write_u32::<BigEndian>(&mut buffer, size).unwrap();
packet.write_to_vec(&mut buffer).unwrap();
write_all(connection, buffer)
connection.write_all(&buffer[..]).await?;
Ok(())
}
enum RecvPacket<T, M: Message> {
Header(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
Body(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
}
fn recv_packet<T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
async fn recv_packet<T, M>(connection: &mut T, acc: &mut Vec<u8>) -> io::Result<M>
where
T: Read,
T: AsyncRead + Unpin,
M: Message,
{
RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData)
let header = read_into_accumulator(connection, 4, acc).await?;
let size = BigEndian::read_u32(header) as usize;
let data = read_into_accumulator(connection, size - 4, acc).await?;
let message = protobuf::parse_from_bytes(data).unwrap();
Ok(message)
}
impl<T: AsyncRead, M> Future for RecvPacket<T, M>
where
T: Read,
M: Message,
{
type Item = (T, M, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
use self::RecvPacket::*;
loop {
*self = match *self {
Header(ref mut read, _) => {
let (connection, header) = try_ready!(read.poll());
let size = BigEndian::read_u32(header.as_ref()) as usize;
let acc = header.into_inner();
let read = read_into_accumulator(connection, size - 4, acc);
RecvPacket::Body(read, PhantomData)
}
Body(ref mut read, _) => {
let (connection, data) = try_ready!(read.poll());
let message = protobuf::parse_from_bytes(data.as_ref()).unwrap();
let acc = data.into_inner();
return Ok(Async::Ready((connection, message, acc)));
}
}
}
}
}
fn read_into_accumulator<T: AsyncRead>(
connection: T,
async fn read_into_accumulator<'a, T: AsyncRead + Unpin>(
connection: &mut T,
size: usize,
mut acc: Vec<u8>,
) -> ReadExact<T, Window<Vec<u8>>> {
acc: &'a mut Vec<u8>,
) -> io::Result<&'a mut [u8]> {
let offset = acc.len();
acc.resize(offset + size, 0);
let mut window = Window::new(acc);
window.set_start(offset);
read_exact(connection, window)
connection.read_exact(&mut acc[offset..]).await?;
Ok(&mut acc[offset..])
}
fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) {


@ -4,13 +4,12 @@ mod handshake;
pub use self::codec::APCodec;
pub use self::handshake::handshake;
use futures::{Future, Sink, Stream};
use futures::{SinkExt, StreamExt};
use protobuf::{self, Message};
use std::io;
use std::net::ToSocketAddrs;
use tokio_codec::Framed;
use tokio_core::net::TcpStream;
use tokio_core::reactor::Handle;
use tokio::net::TcpStream;
use tokio_util::codec::Framed;
use url::Url;
use crate::authentication::Credentials;
@ -20,53 +19,36 @@ use crate::proxytunnel;
pub type Transport = Framed<TcpStream, APCodec>;
pub fn connect(
addr: String,
handle: &Handle,
proxy: &Option<Url>,
) -> Box<dyn Future<Item = Transport, Error = io::Error>> {
let (addr, connect_url) = match *proxy {
Some(ref url) => {
info!("Using proxy \"{}\"", url);
match url.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or(io::Error::new(
pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> {
let socket = if let Some(proxy) = proxy {
info!("Using proxy \"{}\"", proxy);
let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| {
io::Error::new(
io::ErrorKind::NotFound,
"Can't resolve proxy server address",
))
}) {
Ok(socket_addr) => (socket_addr, Some(addr)),
Err(error) => return Box::new(futures::future::err(error)),
}
}
None => {
match addr.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or(io::Error::new(
io::ErrorKind::NotFound,
"Can't resolve server address",
))
}) {
Ok(socket_addr) => (socket_addr, None),
Err(error) => return Box::new(futures::future::err(error)),
}
}
)
})
})?;
let socket = TcpStream::connect(&socket_addr).await?;
proxytunnel::connect(socket, &addr).await?
} else {
let socket_addr = addr.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or_else(|| {
io::Error::new(io::ErrorKind::NotFound, "Can't resolve server address")
})
})?;
TcpStream::connect(&socket_addr).await?
};
let socket = TcpStream::connect(&addr, handle);
if let Some(connect_url) = connect_url {
let connection = socket
.and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake));
Box::new(connection)
} else {
let connection = socket.and_then(handshake);
Box::new(connection)
}
handshake(socket).await
}
pub fn authenticate(
transport: Transport,
pub async fn authenticate(
transport: &mut Transport,
credentials: Credentials,
device_id: String,
) -> Box<dyn Future<Item = (Transport, Credentials), Error = io::Error>> {
device_id: &str,
) -> io::Result<Credentials> {
use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
use crate::protocol::keyexchange::APLoginFailed;
@ -91,41 +73,37 @@ pub fn authenticate(
version::short_sha(),
version::build_id()
));
packet.mut_system_info().set_device_id(device_id);
packet
.mut_system_info()
.set_device_id(device_id.to_string());
packet.set_version_string(version::version_string());
let cmd = 0xab;
let data = packet.write_to_bytes().unwrap();
Box::new(
transport
.send((cmd, data))
.and_then(|transport| transport.into_future().map_err(|(err, _stream)| err))
.and_then(|(packet, transport)| match packet {
Some((0xac, data)) => {
let welcome_data: APWelcome =
protobuf::parse_from_bytes(data.as_ref()).unwrap();
transport.send((cmd, data)).await?;
let (cmd, data) = transport.next().await.expect("EOF")?;
match cmd {
0xac => {
let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap();
let reusable_credentials = Credentials {
username: welcome_data.get_canonical_username().to_owned(),
auth_type: welcome_data.get_reusable_auth_credentials_type(),
auth_data: welcome_data.get_reusable_auth_credentials().to_owned(),
};
let reusable_credentials = Credentials {
username: welcome_data.get_canonical_username().to_owned(),
auth_type: welcome_data.get_reusable_auth_credentials_type(),
auth_data: welcome_data.get_reusable_auth_credentials().to_owned(),
};
Ok((transport, reusable_credentials))
}
Ok(reusable_credentials)
}
Some((0xad, data)) => {
let error_data: APLoginFailed =
protobuf::parse_from_bytes(data.as_ref()).unwrap();
panic!(
"Authentication failed with reason: {:?}",
error_data.get_error_code()
)
}
0xad => {
let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap();
panic!(
"Authentication failed with reason: {:?}",
error_data.get_error_code()
)
}
Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd),
None => panic!("EOF"),
}),
)
_ => panic!("Unexpected packet {:?}", cmd),
}
}

View file

@ -1,12 +1,12 @@
use num_bigint::BigUint;
use num_traits::FromPrimitive;
use once_cell::sync::Lazy;
use rand::Rng;
use crate::util;
lazy_static! {
pub static ref DH_GENERATOR: BigUint = BigUint::from_u64(0x2).unwrap();
pub static ref DH_PRIME: BigUint = BigUint::from_bytes_be(&[
pub static DH_GENERATOR: Lazy<BigUint> = Lazy::new(|| BigUint::from_bytes_be(&[0x02]));
pub static DH_PRIME: Lazy<BigUint> = Lazy::new(|| {
BigUint::from_bytes_be(&[
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc9, 0x0f, 0xda, 0xa2, 0x21, 0x68, 0xc2,
0x34, 0xc4, 0xc6, 0x62, 0x8b, 0x80, 0xdc, 0x1c, 0xd1, 0x29, 0x02, 0x4e, 0x08, 0x8a, 0x67,
0xcc, 0x74, 0x02, 0x0b, 0xbe, 0xa6, 0x3b, 0x13, 0x9b, 0x22, 0x51, 0x4a, 0x08, 0x79, 0x8e,
@ -14,8 +14,8 @@ lazy_static! {
0xf2, 0x5f, 0x14, 0x37, 0x4f, 0xe1, 0x35, 0x6d, 0x6d, 0x51, 0xc2, 0x45, 0xe4, 0x85, 0xb5,
0x76, 0x62, 0x5e, 0x7e, 0xc6, 0xf4, 0x4c, 0x42, 0xe9, 0xa6, 0x3a, 0x36, 0x20, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
]);
}
])
});
pub struct DHLocalKeys {
private_key: BigUint,

View file

@ -1,8 +1,4 @@
use futures::Future;
use serde_json;
use crate::mercury::MercuryError;
use crate::session::Session;
use crate::{mercury::MercuryError, session::Session};
#[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
@ -13,20 +9,16 @@ pub struct Token {
pub scope: Vec<String>,
}
pub fn get_token(
pub async fn get_token(
session: &Session,
client_id: &str,
scopes: &str,
) -> Box<dyn Future<Item = Token, Error = MercuryError>> {
) -> Result<Token, MercuryError> {
let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes
);
Box::new(session.mercury().get(url).map(move |response| {
let data = response.payload.first().expect("Empty payload");
let data = String::from_utf8(data.clone()).unwrap();
let token: Token = serde_json::from_str(&data).unwrap();
token
}))
let response = session.mercury().get(url).await?;
let data = response.payload.first().expect("Empty payload");
serde_json::from_slice(data.as_ref()).map_err(|_| MercuryError)
}
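
A usage sketch of the now-async `get_token`, assuming an already-connected `Session`; the client id and scope strings are placeholders, not values taken from this change:

```rust
use librespot_core::keymaster::{get_token, Token};
use librespot_core::mercury::MercuryError;
use librespot_core::session::Session;

async fn fetch_token(session: &Session) -> Result<Token, MercuryError> {
    // Placeholder client id and scope, for illustration only.
    get_token(session, "some-client-id", "streaming").await
}
```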

View file

@ -1,27 +1,23 @@
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
#![allow(clippy::unused_io_amount)]
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate log;
#[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate pin_project_lite;
extern crate aes;
extern crate base64;
extern crate byteorder;
extern crate bytes;
extern crate futures;
extern crate hmac;
extern crate httparse;
extern crate hyper;
extern crate hyper_proxy;
extern crate num_bigint;
extern crate num_integer;
extern crate num_traits;
extern crate once_cell;
extern crate pbkdf2;
extern crate protobuf;
extern crate rand;
@ -29,9 +25,8 @@ extern crate serde;
extern crate serde_json;
extern crate sha1;
extern crate shannon;
extern crate tokio_codec;
extern crate tokio_core;
extern crate tokio_io;
pub extern crate tokio;
extern crate tokio_util;
extern crate url;
extern crate uuid;
@ -39,13 +34,14 @@ extern crate librespot_protocol as protocol;
#[macro_use]
mod component;
mod apresolve;
pub mod apresolve;
pub mod audio_key;
pub mod authentication;
pub mod cache;
pub mod channel;
pub mod config;
mod connection;
pub mod connection;
pub mod diffie_hellman;
pub mod keymaster;
pub mod mercury;

View file

@ -1,14 +1,14 @@
use crate::protocol;
use crate::util::url_encode;
use crate::util::SeqGenerator;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll};
use protobuf;
use std::collections::HashMap;
use std::mem;
use crate::util::SeqGenerator;
use futures::{
channel::{mpsc, oneshot},
Future,
};
use std::{collections::HashMap, task::Poll};
use std::{mem, pin::Pin, task::Context};
mod types;
pub use self::types::*;
@ -31,17 +31,21 @@ pub struct MercuryPending {
callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
}
pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
impl<T> Future for MercuryFuture<T> {
type Item = T;
type Error = MercuryError;
pin_project! {
pub struct MercuryFuture<T> {
#[pin]
receiver: oneshot::Receiver<Result<T, MercuryError>>
}
}
fn poll(&mut self) -> Poll<T, MercuryError> {
match self.0.poll() {
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
Ok(Async::Ready(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(oneshot::Canceled) => Err(MercuryError),
impl<T> Future for MercuryFuture<T> {
type Output = Result<T, MercuryError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().receiver.poll(cx) {
Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)),
Poll::Pending => Poll::Pending,
}
}
}
@ -73,7 +77,7 @@ impl MercuryManager {
let data = req.encode(&seq);
self.session().send_packet(cmd, data);
MercuryFuture(rx)
MercuryFuture { receiver: rx }
}
pub fn get<T: Into<String>>(&self, uri: T) -> MercuryFuture<MercuryResponse> {
@ -98,46 +102,46 @@ impl MercuryManager {
MercurySender::new(self.clone(), uri.into())
}
pub fn subscribe<T: Into<String>>(
pub async fn subscribe<T: Into<String>>(
&self,
uri: T,
) -> Box<dyn Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>>
{
) -> Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> {
let uri = uri.into();
let request = self.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
});
let response = self
.request(MercuryRequest {
method: MercuryMethod::SUB,
uri: uri.clone(),
content_type: None,
payload: Vec::new(),
})
.await?;
let (tx, rx) = mpsc::unbounded();
let manager = self.clone();
Box::new(request.map(move |response| {
let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if response.payload.len() > 0 {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
manager.lock(move |inner| {
if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len());
if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs
for sub in response.payload {
let mut sub: protocol::pubsub::Subscription =
protobuf::parse_from_bytes(&sub).unwrap();
let sub_uri = sub.take_uri();
debug!("subscribed sub_uri={}", sub_uri);
debug!("subscribed sub_uri={}", sub_uri);
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
inner.subscriptions.push((sub_uri, tx.clone()));
}
} else {
// New subscription protocol, watch the requested URI
inner.subscriptions.push((uri, tx));
}
});
}
});
rx
}))
Ok(rx)
}
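
Because `subscribe` now resolves to an `mpsc::UnboundedReceiver`, callers drive the subscription as an ordinary `Stream`. A minimal sketch, assuming a live `Session`; the URI is illustrative:

```rust
use futures::StreamExt;
use librespot_core::mercury::MercuryError;
use librespot_core::session::Session;

async fn watch(session: &Session) -> Result<(), MercuryError> {
    // Example subscription URI; real callers pass the endpoint they care about.
    let mut updates = session.mercury().subscribe("hm://example/endpoint").await?;
    while let Some(response) = updates.next().await {
        log::debug!("pushed frame for {}", response.uri);
    }
    Ok(())
}
```
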
pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
@ -193,7 +197,7 @@ impl MercuryManager {
let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
let response = MercuryResponse {
uri: url_encode(header.get_uri()).to_owned(),
uri: url_encode(header.get_uri()),
status_code: header.get_status_code(),
payload: pending.parts,
};

View file

@ -1,5 +1,5 @@
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend};
use std::collections::VecDeque;
use futures::Sink;
use std::{collections::VecDeque, pin::Pin, task::Context};
use super::*;
@ -30,27 +30,38 @@ impl Clone for MercurySender {
}
}
impl Sink for MercurySender {
type SinkItem = Vec<u8>;
type SinkError = MercuryError;
impl Sink<Vec<u8>> for MercurySender {
type Error = MercuryError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
Ok(AsyncSink::Ready)
fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match self.pending.front_mut() {
Some(task) => {
try_ready!(task.poll());
match Pin::new(task).poll(cx) {
Poll::Ready(Err(x)) => return Poll::Ready(Err(x)),
Poll::Pending => return Poll::Pending,
_ => (),
};
}
None => {
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
}
self.pending.pop_front();
}
}
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
Ok(())
}
}
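
With the futures 0.3 `Sink` implementation above, a `MercurySender` is driven through `SinkExt::send`, which calls `start_send` and then flushes the queued requests. A hedged sketch, assuming the manager's usual `sender` constructor; the URI is illustrative:

```rust
use futures::SinkExt;
use librespot_core::mercury::MercuryError;
use librespot_core::session::Session;

async fn push_frame(session: &Session, frame: Vec<u8>) -> Result<(), MercuryError> {
    let mut sender = session.mercury().sender("hm://example/endpoint");
    sender.send(frame).await
}
```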

View file

@ -1,110 +1,45 @@
use std::io;
use std::str::FromStr;
use futures::{Async, Future, Poll};
use httparse;
use hyper::Uri;
use tokio_io::io::{read, write_all, Read, Window, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub struct ProxyTunnel<T> {
state: ProxyState<T>,
}
enum ProxyState<T> {
ProxyConnect(WriteAll<T, Vec<u8>>),
ProxyResponse(Read<T, Window<Vec<u8>>>),
}
pub fn connect<T: AsyncRead + AsyncWrite>(connection: T, connect_url: &str) -> ProxyTunnel<T> {
let proxy = proxy_connect(connection, connect_url);
ProxyTunnel {
state: ProxyState::ProxyConnect(proxy),
}
}
impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
type Item = T;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
use self::ProxyState::*;
loop {
self.state = match self.state {
ProxyConnect(ref mut write) => {
let (connection, mut accumulator) = try_ready!(write.poll());
let capacity = accumulator.capacity();
accumulator.resize(capacity, 0);
let window = Window::new(accumulator);
let read = read(connection, window);
ProxyResponse(read)
}
ProxyResponse(ref mut read_f) => {
let (connection, mut window, bytes_read) = try_ready!(read_f.poll());
if bytes_read == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
}
let data_end = window.start() + bytes_read;
let buf = window.get_ref()[0..data_end].to_vec();
let mut headers = [httparse::EMPTY_HEADER; 16];
let mut response = httparse::Response::new(&mut headers);
let status = match response.parse(&buf) {
Ok(status) => status,
Err(err) => {
return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
}
};
if status.is_complete() {
if let Some(code) = response.code {
if code == 200 {
// Proxy says all is well
return Ok(Async::Ready(connection));
} else {
let reason = response.reason.unwrap_or("no reason");
let msg = format!("Proxy responded with {}: {}", code, reason);
return Err(io::Error::new(io::ErrorKind::Other, msg));
}
} else {
return Err(io::Error::new(
io::ErrorKind::Other,
"Malformed response from proxy",
));
}
} else {
if data_end >= window.end() {
// Allocate some more buffer space
let newsize = data_end + 100;
window.get_mut().resize(newsize, 0);
window.set_end(newsize);
}
// We did not get a full header
window.set_start(data_end);
let read = read(connection, window);
ProxyResponse(read)
}
}
}
}
}
}
fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T, Vec<u8>> {
let uri = Uri::from_str(connect_url).unwrap();
let buffer = format!(
pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
mut connection: T,
connect_url: &str,
) -> io::Result<T> {
let uri = connect_url.parse::<Uri>().unwrap();
let mut buffer = format!(
"CONNECT {0}:{1} HTTP/1.1\r\n\
\r\n",
uri.host().expect(&format!("No host in {}", uri)),
uri.port().expect(&format!("No port in {}", uri))
uri.host().unwrap_or_else(|| panic!("No host in {}", uri)),
uri.port().unwrap_or_else(|| panic!("No port in {}", uri))
)
.into_bytes();
connection.write_all(buffer.as_ref()).await?;
write_all(connection, buffer)
    // The proxy keeps the connection open for tunnelling after it replies, so
    // reading to EOF would never return. Read and parse incrementally until the
    // response headers are complete instead.
    buffer.clear();
    buffer.resize(1024, 0);
    let mut offset = 0;
    loop {
        let bytes_read = connection.read(&mut buffer[offset..]).await?;
        if bytes_read == 0 {
            return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
        }
        offset += bytes_read;

        let mut headers = [httparse::EMPTY_HEADER; 16];
        let mut response = httparse::Response::new(&mut headers);
        let status = response
            .parse(&buffer[..offset])
            .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;

        if status.is_complete() {
            return match response.code {
                Some(200) => Ok(connection), // Proxy says all is well
                Some(code) => {
                    let reason = response.reason.unwrap_or("no reason");
                    let msg = format!("Proxy responded with {}: {}", code, reason);
                    Err(io::Error::new(io::ErrorKind::Other, msg))
                }
                None => Err(io::Error::new(
                    io::ErrorKind::Other,
                    "Malformed response from proxy",
                )),
            };
        }

        if offset == buffer.len() {
            // Make room for the rest of the headers if they did not fit yet.
            buffer.resize(buffer.len() + 1024, 0);
        }
    }
}

View file

@ -1,20 +1,20 @@
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak};
use std::task::Poll;
use std::time::{SystemTime, UNIX_EPOCH};
use std::{io, pin::Pin, task::Context};
use once_cell::sync::OnceCell;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use futures::sync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio_core::reactor::{Handle, Remote};
use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt};
use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager;
use crate::authentication::Credentials;
use crate::cache::Cache;
use crate::channel::ChannelManager;
use crate::component::Lazy;
use crate::config::SessionConfig;
use crate::connection;
use crate::mercury::MercuryManager;
@ -32,13 +32,11 @@ struct SessionInternal {
tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
audio_key: Lazy<AudioKeyManager>,
channel: Lazy<ChannelManager>,
mercury: Lazy<MercuryManager>,
audio_key: OnceCell<AudioKeyManager>,
channel: OnceCell<ChannelManager>,
mercury: OnceCell<MercuryManager>,
cache: Option<Arc<Cache>>,
handle: Remote,
session_id: usize,
}
@ -48,58 +46,34 @@ static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
pub struct Session(Arc<SessionInternal>);
impl Session {
pub fn connect(
pub async fn connect(
config: SessionConfig,
credentials: Credentials,
cache: Option<Cache>,
handle: Handle,
) -> Box<dyn Future<Item = Session, Error = io::Error>> {
let access_point =
apresolve_or_fallback::<io::Error>(&handle, &config.proxy, &config.ap_port);
) -> io::Result<Session> {
let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await;
let handle_ = handle.clone();
let proxy = config.proxy.clone();
let connection = access_point.and_then(move |addr| {
info!("Connecting to AP \"{}\"", addr);
connection::connect(addr, &handle_, &proxy)
});
info!("Connecting to AP \"{}\"", ap);
let mut conn = connection::connect(ap, &config.proxy).await?;
let device_id = config.device_id.clone();
let authentication = connection.and_then(move |connection| {
connection::authenticate(connection, credentials, device_id)
});
let reusable_credentials =
connection::authenticate(&mut conn, credentials, &config.device_id).await?;
info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(cache) = &cache {
cache.save_credentials(&reusable_credentials);
}
let result = authentication.map(move |(transport, reusable_credentials)| {
info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(ref cache) = cache {
cache.save_credentials(&reusable_credentials);
}
let session = Session::create(conn, config, cache, reusable_credentials.username);
let (session, task) = Session::create(
&handle,
transport,
config,
cache,
reusable_credentials.username.clone(),
);
handle.spawn(task.map_err(|e| {
error!("{:?}", e);
}));
session
});
Box::new(result)
Ok(session)
}
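
With `Session::connect` now an `async fn`, a caller drives it directly from a tokio runtime. A sketch using the default configuration and placeholder credentials:

```rust
use librespot_core::authentication::Credentials;
use librespot_core::config::SessionConfig;
use librespot_core::session::Session;

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Placeholder username/password, for illustration only.
    let credentials = Credentials::with_password("username".to_string(), "password".to_string());
    let session = Session::connect(SessionConfig::default(), credentials, None).await?;
    println!("connected, session id {}", session.session_id());
    Ok(())
}
```
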
fn create(
handle: &Handle,
transport: connection::Transport,
config: SessionConfig,
cache: Option<Cache>,
username: String,
) -> (Session, Box<dyn Future<Item = (), Error = io::Error>>) {
) -> Session {
let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded();
@ -120,53 +94,50 @@ impl Session {
cache: cache.map(Arc::new),
audio_key: Lazy::new(),
channel: Lazy::new(),
mercury: Lazy::new(),
handle: handle.remote().clone(),
audio_key: OnceCell::new(),
channel: OnceCell::new(),
mercury: OnceCell::new(),
session_id: session_id,
}));
let sender_task = sender_rx
.map_err(|e| -> io::Error { panic!(e) })
.forward(sink)
.map(|_| ());
let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink);
let receiver_task = DispatchTask(stream, session.weak());
let task = Box::new(
(receiver_task, sender_task)
.into_future()
.map(|((), ())| ()),
);
(session, task)
let task =
futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(()));
tokio::spawn(task);
session
}
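
The `sender_task` above forwards the unbounded command channel into the framed sink; `forward` wants a `TryStream`, hence the `map(Ok)`. A stand-alone sketch of that pattern with plain channel types (not the actual transport):

```rust
use futures::{channel::mpsc, StreamExt};

/// Pump every item from `rx` into `tx`, completing when `rx` closes.
async fn pump(
    rx: mpsc::UnboundedReceiver<Vec<u8>>,
    tx: mpsc::UnboundedSender<Vec<u8>>,
) -> Result<(), mpsc::SendError> {
    rx.map(Ok).forward(tx).await
}
```
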
pub fn audio_key(&self) -> &AudioKeyManager {
self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
self.0
.audio_key
.get_or_init(|| AudioKeyManager::new(self.weak()))
}
pub fn channel(&self) -> &ChannelManager {
self.0.channel.get(|| ChannelManager::new(self.weak()))
self.0
.channel
.get_or_init(|| ChannelManager::new(self.weak()))
}
pub fn mercury(&self) -> &MercuryManager {
self.0.mercury.get(|| MercuryManager::new(self.weak()))
self.0
.mercury
.get_or_init(|| MercuryManager::new(self.weak()))
}
pub fn time_delta(&self) -> i64 {
self.0.data.read().unwrap().time_delta
}
pub fn spawn<F, R>(&self, f: F)
pub fn spawn<T>(&self, task: T)
where
F: FnOnce(&Handle) -> R + Send + 'static,
R: IntoFuture<Item = (), Error = ()>,
R::Future: 'static,
T: Future + Send + 'static,
T::Output: Send + 'static,
{
self.0.handle.spawn(f)
tokio::spawn(task);
}
fn debug_info(&self) {
@ -178,7 +149,7 @@ impl Session {
);
}
#[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
#[allow(clippy::match_same_arms)]
fn dispatch(&self, cmd: u8, data: Bytes) {
match cmd {
0x4 => {
@ -273,35 +244,34 @@ impl Drop for SessionInternal {
struct DispatchTask<S>(S, SessionWeak)
where
S: Stream<Item = (u8, Bytes)>;
S: TryStream<Ok = (u8, Bytes)> + Unpin;
impl<S> Future for DispatchTask<S>
where
S: Stream<Item = (u8, Bytes)>,
<S as Stream>::Error: ::std::fmt::Debug,
S: TryStream<Ok = (u8, Bytes)> + Unpin,
<S as TryStream>::Ok: std::fmt::Debug,
{
type Item = ();
type Error = S::Error;
type Output = Result<(), S::Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let session = match self.1.try_upgrade() {
Some(session) => session,
None => return Ok(Async::Ready(())),
None => return Poll::Ready(Ok(())),
};
loop {
let (cmd, data) = match self.0.poll() {
Ok(Async::Ready(Some(t))) => t,
Ok(Async::Ready(None)) => {
let (cmd, data) = match self.0.try_poll_next_unpin(cx) {
Poll::Ready(Some(Ok(t))) => t,
Poll::Ready(None) => {
warn!("Connection to server closed.");
session.shutdown();
return Ok(Async::Ready(()));
return Poll::Ready(Ok(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => {
Poll::Ready(Some(Err(e))) => {
session.shutdown();
return Err(From::from(e));
return Poll::Ready(Err(e));
}
Poll::Pending => return Poll::Pending,
};
session.dispatch(cmd, data);
@ -311,7 +281,7 @@ where
impl<S> Drop for DispatchTask<S>
where
S: Stream<Item = (u8, Bytes)>,
S: TryStream<Ok = (u8, Bytes)> + Unpin,
{
fn drop(&mut self) {
debug!("drop Dispatch");

32
core/tests/connect.rs Normal file
View file

@ -0,0 +1,32 @@
use librespot_core::*;
#[cfg(test)]
mod tests {
use super::*;
// Test AP Resolve
use apresolve::apresolve_or_fallback;
#[tokio::test]
async fn test_ap_resolve() {
let ap = apresolve_or_fallback(&None, &None).await;
println!("AP: {:?}", ap);
}
// Test connect
use authentication::Credentials;
use config::SessionConfig;
#[tokio::test]
async fn test_connection() -> Result<(), Box<dyn std::error::Error>> {
println!("Running connection test");
let ap = apresolve_or_fallback(&None, &None).await;
let credentials = Credentials::with_password(String::from("test"), String::from("test"));
let session_config = SessionConfig::default();
let proxy = None;
println!("Connecting to AP \"{}\"", ap);
let mut connection = connection::connect(ap, &proxy).await?;
let rc = connection::authenticate(&mut connection, credentials, &session_config.device_id)
.await?;
println!("Authenticated as \"{}\"", rc.username);
Ok(())
}
}

View file

@ -8,8 +8,9 @@ repository = "https://github.com/librespot-org/librespot"
edition = "2018"
[dependencies]
async-trait = "0.1"
byteorder = "1.3"
futures = "0.1"
futures = "0.3"
linear-map = "1.2"
protobuf = "~2.14.0"
log = "0.4"

View file

@ -1,6 +1,11 @@
#![allow(clippy::unused_io_amount)]
#![allow(clippy::redundant_field_names)]
#[macro_use]
extern crate log;
#[macro_use]
extern crate async_trait;
extern crate byteorder;
extern crate futures;
extern crate linear_map;
@ -11,8 +16,6 @@ extern crate librespot_protocol as protocol;
pub mod cover;
use futures::future;
use futures::Future;
use linear_map::LinearMap;
use librespot_core::mercury::MercuryError;
@ -69,81 +72,67 @@ pub struct AudioItem {
}
impl AudioItem {
pub fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>> {
pub async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<Self, MercuryError> {
match id.audio_type {
SpotifyAudioType::Track => Track::get_audio_item(session, id),
SpotifyAudioType::Podcast => Episode::get_audio_item(session, id),
SpotifyAudioType::NonPlayable => {
Box::new(future::err::<AudioItem, MercuryError>(MercuryError))
}
SpotifyAudioType::Track => Track::get_audio_item(session, id).await,
SpotifyAudioType::Podcast => Episode::get_audio_item(session, id).await,
SpotifyAudioType::NonPlayable => Err(MercuryError),
}
}
}
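
A hedged usage sketch of the new async entry point, assuming a connected `Session` and a known `SpotifyId`:

```rust
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_metadata::AudioItem;

async fn describe(session: &Session, id: SpotifyId) {
    // MercuryError carries no detail, so only the success case is reported here.
    if let Ok(item) = AudioItem::get_audio_item(session, id).await {
        println!("{} ({} ms)", item.name, item.duration);
    }
}
```
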
#[async_trait]
trait AudioFiles {
fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>>;
async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<AudioItem, MercuryError>;
}
#[async_trait]
impl AudioFiles for Track {
fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>> {
Box::new(Self::get(session, id).and_then(move |item| {
Ok(AudioItem {
id: id,
uri: format!("spotify:track:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: Some(item.alternatives),
})
}))
async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<AudioItem, MercuryError> {
let item = Self::get(session, id).await?;
Ok(AudioItem {
id: id,
uri: format!("spotify:track:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: Some(item.alternatives),
})
}
}
#[async_trait]
impl AudioFiles for Episode {
fn get_audio_item(
session: &Session,
id: SpotifyId,
) -> Box<dyn Future<Item = AudioItem, Error = MercuryError>> {
Box::new(Self::get(session, id).and_then(move |item| {
Ok(AudioItem {
id: id,
uri: format!("spotify:episode:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: None,
})
}))
async fn get_audio_item(session: &Session, id: SpotifyId) -> Result<AudioItem, MercuryError> {
let item = Self::get(session, id).await?;
Ok(AudioItem {
id: id,
uri: format!("spotify:episode:{}", id.to_base62()),
files: item.files,
name: item.name,
duration: item.duration,
available: item.available,
alternatives: None,
})
}
}
#[async_trait]
pub trait Metadata: Send + Sized + 'static {
type Message: protobuf::Message;
fn request_url(id: SpotifyId) -> String;
fn parse(msg: &Self::Message, session: &Session) -> Self;
fn get(session: &Session, id: SpotifyId) -> Box<dyn Future<Item = Self, Error = MercuryError>> {
async fn get(session: &Session, id: SpotifyId) -> Result<Self, MercuryError> {
let uri = Self::request_url(id);
let request = session.mercury().get(uri);
let response = session.mercury().get(uri).await?;
let data = response.payload.first().expect("Empty payload");
let msg: Self::Message = protobuf::parse_from_bytes(data).unwrap();
let session = session.clone();
Box::new(request.and_then(move |response| {
let data = response.payload.first().expect("Empty payload");
let msg: Self::Message = protobuf::parse_from_bytes(data).unwrap();
Ok(Self::parse(&msg, &session))
}))
Ok(Self::parse(&msg, &session))
}
}
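
With `Metadata::get` now an `async fn` (via `async_trait`), fetching an object is a single await. A sketch assuming a connected `Session` and the `Track` implementor from this crate:

```rust
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_metadata::{Metadata, Track};

async fn track_name(session: &Session, id: SpotifyId) -> Option<String> {
    // MercuryError is unit-like, so a failed fetch simply maps to None.
    Track::get(session, id).await.ok().map(|track| track.name)
}
```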

View file

@ -18,9 +18,9 @@ path = "../metadata"
version = "0.1.3"
[dependencies]
futures = "0.1"
futures = "0.3"
log = "0.4"
byteorder = "1.3"
byteorder = "1.4"
shell-words = "1.0.0"
alsa = { version = "0.2", optional = true }

View file

@ -10,7 +10,9 @@ pub trait Sink {
fn write(&mut self, data: &[i16]) -> io::Result<()>;
}
fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<dyn Sink> {
pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink + Send>;
fn mk_sink<S: Sink + Open + Send + 'static>(device: Option<String>) -> Box<dyn Sink + Send> {
Box::new(S::open(device))
}
@ -54,7 +56,7 @@ use self::pipe::StdoutSink;
mod subprocess;
use self::subprocess::SubprocessSink;
pub const BACKENDS: &'static [(&'static str, fn(Option<String>) -> Box<dyn Sink>)] = &[
pub const BACKENDS: &'static [(&'static str, SinkBuilder)] = &[
#[cfg(feature = "alsa-backend")]
("alsa", mk_sink::<AlsaSink>),
#[cfg(feature = "portaudio-backend")]
@ -73,7 +75,7 @@ pub const BACKENDS: &'static [(&'static str, fn(Option<String>) -> Box<dyn Sink>
("subprocess", mk_sink::<SubprocessSink>),
];
pub fn find(name: Option<String>) -> Option<fn(Option<String>) -> Box<dyn Sink>> {
pub fn find(name: Option<String>) -> Option<SinkBuilder> {
if let Some(name) = name {
BACKENDS
.iter()

View file

@ -4,7 +4,7 @@ use std::io::{self, Write};
use std::mem;
use std::slice;
pub struct StdoutSink(Box<dyn Write>);
pub struct StdoutSink(Box<dyn Write + Send>);
impl Open for StdoutSink {
fn open(path: Option<String>) -> StdoutSink {

View file

@ -1,20 +1,3 @@
use byteorder::{LittleEndian, ReadBytesExt};
use futures;
use futures::{future, Async, Future, Poll, Stream};
use std;
use std::borrow::Cow;
use std::cmp::max;
use std::io::{Read, Result, Seek, SeekFrom};
use std::mem;
use std::thread;
use std::time::{Duration, Instant};
use crate::config::{Bitrate, PlayerConfig};
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use crate::audio::{AudioDecrypt, AudioFile, StreamLoaderController};
use crate::audio::{VorbisDecoder, VorbisPacket};
use crate::audio::{
@ -22,14 +5,34 @@ use crate::audio::{
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,
};
use crate::audio_backend::Sink;
use crate::config::{Bitrate, PlayerConfig};
use crate::librespot_core::tokio;
use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter;
use librespot_core::session::Session;
use librespot_core::spotify_id::SpotifyId;
use librespot_core::util::SeqGenerator;
use byteorder::{LittleEndian, ReadBytesExt};
use futures::{
channel::{mpsc, oneshot},
future, Future, Stream, StreamExt,
};
use std::io::{Read, Seek, SeekFrom};
use std::mem;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io};
use std::{
cmp::max,
pin::Pin,
task::{Context, Poll},
};
const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;
pub struct Player {
commands: Option<futures::sync::mpsc::UnboundedSender<PlayerCommand>>,
thread_handle: Option<thread::JoinHandle<()>>,
commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
task_handle: Option<tokio::task::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>,
}
@ -45,15 +48,15 @@ pub type SinkEventCallback = Box<dyn Fn(SinkStatus) + Send>;
struct PlayerInternal {
session: Session,
config: PlayerConfig,
commands: futures::sync::mpsc::UnboundedReceiver<PlayerCommand>,
commands: mpsc::UnboundedReceiver<PlayerCommand>,
state: PlayerState,
preload: PlayerPreload,
sink: Box<dyn Sink>,
sink: Box<dyn Sink + Send>,
sink_status: SinkStatus,
sink_event_callback: Option<SinkEventCallback>,
audio_filter: Option<Box<dyn AudioFilter + Send>>,
event_senders: Vec<futures::sync::mpsc::UnboundedSender<PlayerEvent>>,
event_senders: Vec<mpsc::UnboundedSender<PlayerEvent>>,
}
enum PlayerCommand {
@ -70,7 +73,7 @@ enum PlayerCommand {
Pause,
Stop,
Seek(u32),
AddEventSender(futures::sync::mpsc::UnboundedSender<PlayerEvent>),
AddEventSender(mpsc::UnboundedSender<PlayerEvent>),
SetSinkEventCallback(Option<SinkEventCallback>),
EmitVolumeSetEvent(u16),
}
@ -182,7 +185,7 @@ impl PlayerEvent {
}
}
pub type PlayerEventChannel = futures::sync::mpsc::UnboundedReceiver<PlayerEvent>;
pub type PlayerEventChannel = mpsc::UnboundedReceiver<PlayerEvent>;
#[derive(Clone, Copy, Debug)]
struct NormalisationData {
@ -193,7 +196,7 @@ struct NormalisationData {
}
impl NormalisationData {
fn parse_from_file<T: Read + Seek>(mut file: T) -> Result<NormalisationData> {
fn parse_from_file<T: Read + Seek>(mut file: T) -> io::Result<NormalisationData> {
const SPOTIFY_NORMALIZATION_HEADER_START_OFFSET: u64 = 144;
file.seek(SeekFrom::Start(SPOTIFY_NORMALIZATION_HEADER_START_OFFSET))
.unwrap();
@ -239,38 +242,38 @@ impl Player {
sink_builder: F,
) -> (Player, PlayerEventChannel)
where
F: FnOnce() -> Box<dyn Sink> + Send + 'static,
F: FnOnce() -> Box<dyn Sink + Send> + Send + 'static,
{
let (cmd_tx, cmd_rx) = futures::sync::mpsc::unbounded();
let (event_sender, event_receiver) = futures::sync::mpsc::unbounded();
let (cmd_tx, cmd_rx) = mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded();
let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id());
debug!("new Player[{}]", session.session_id());
let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,
let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,
state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
let _ = internal.wait();
// While PlayerInternal is written as a future, it still contains blocking code.
        // It is therefore spawned as its own task on the tokio runtime, and the
        // known blocking sections, such as seek, go through block_in_place.
let handle = tokio::spawn(async move {
internal.await;
debug!("PlayerInternal thread finished.");
});
(
Player {
commands: Some(cmd_tx),
thread_handle: Some(handle),
task_handle: Some(handle),
play_request_id_generator: SeqGenerator::new(0),
},
event_receiver,
@ -314,22 +317,21 @@ impl Player {
}
pub fn get_player_event_channel(&self) -> PlayerEventChannel {
let (event_sender, event_receiver) = futures::sync::mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded();
self.command(PlayerCommand::AddEventSender(event_sender));
event_receiver
}
pub fn get_end_of_track_future(&self) -> Box<dyn Future<Item = (), Error = ()>> {
let result = self
.get_player_event_channel()
.filter(|event| match event {
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. } => true,
_ => false,
pub async fn get_end_of_track_future(&self) {
self.get_player_event_channel()
.filter(|event| {
future::ready(matches!(
event,
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
))
})
.into_future()
.map_err(|_| ())
.map(|_| ());
Box::new(result)
.for_each(|_| future::ready(()))
.await
}
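
In futures 0.3, combinators like `StreamExt::filter` and `for_each` take closures that return futures, which is why the rewrite above wraps plain values in `future::ready`. A small stand-alone sketch of the same pattern, independent of the player types:

```rust
use futures::{future, stream, StreamExt};

async fn demo() {
    stream::iter(1..=6)
        // The predicate must return a future, so wrap the bool in `future::ready`.
        .filter(|n| future::ready(n % 2 == 0))
        .for_each(|n| {
            println!("kept {}", n);
            future::ready(())
        })
        .await;
}
```
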
pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
@ -345,11 +347,13 @@ impl Drop for Player {
fn drop(&mut self) {
debug!("Shutting down player thread ...");
self.commands = None;
if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
if let Some(handle) = self.task_handle.take() {
tokio::spawn(async {
match handle.await {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
});
}
}
}
@ -367,11 +371,11 @@ enum PlayerPreload {
None,
Loading {
track_id: SpotifyId,
loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
},
Ready {
track_id: SpotifyId,
loaded_track: PlayerLoadedTrackData,
loaded_track: Box<PlayerLoadedTrackData>,
},
}
@ -383,7 +387,7 @@ enum PlayerState {
track_id: SpotifyId,
play_request_id: u64,
start_playback: bool,
loader: Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>>,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
},
Paused {
track_id: SpotifyId,
@ -428,23 +432,15 @@ impl PlayerState {
#[allow(dead_code)]
fn is_stopped(&self) -> bool {
use self::PlayerState::*;
match *self {
Stopped => true,
_ => false,
}
matches!(self, Self::Stopped)
}
fn is_loading(&self) -> bool {
use self::PlayerState::*;
match *self {
Loading { .. } => true,
_ => false,
}
matches!(self, Self::Loading { .. })
}
fn decoder(&mut self) -> Option<&mut Decoder> {
use self::PlayerState::*;
use PlayerState::*;
match *self {
Stopped | EndOfTrack { .. } | Loading { .. } => None,
Paused {
@ -573,22 +569,23 @@ struct PlayerTrackLoader {
}
impl PlayerTrackLoader {
fn find_available_alternative<'a>(&self, audio: &'a AudioItem) -> Option<Cow<'a, AudioItem>> {
async fn find_available_alternative<'a, 'b>(
&'a self,
audio: &'b AudioItem,
) -> Option<Cow<'b, AudioItem>> {
if audio.available {
Some(Cow::Borrowed(audio))
} else if let Some(alternatives) = &audio.alternatives {
let alternatives = alternatives
.iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id));
let alternatives = future::try_join_all(alternatives).await.unwrap();
alternatives
.into_iter()
.find(|alt| alt.available)
.map(Cow::Owned)
} else {
if let Some(alternatives) = &audio.alternatives {
let alternatives = alternatives
.iter()
.map(|alt_id| AudioItem::get_audio_item(&self.session, *alt_id));
let alternatives = future::join_all(alternatives).wait().unwrap();
alternatives
.into_iter()
.find(|alt| alt.available)
.map(Cow::Owned)
} else {
None
}
None
}
}
@ -611,8 +608,12 @@ impl PlayerTrackLoader {
}
}
fn load_track(&self, spotify_id: SpotifyId, position_ms: u32) -> Option<PlayerLoadedTrackData> {
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).wait() {
async fn load_track(
&self,
spotify_id: SpotifyId,
position_ms: u32,
) -> Option<PlayerLoadedTrackData> {
let audio = match AudioItem::get_audio_item(&self.session, spotify_id).await {
Ok(audio) => audio,
Err(_) => {
error!("Unable to load audio item.");
@ -622,7 +623,7 @@ impl PlayerTrackLoader {
info!("Loading <{}> with Spotify URI <{}>", audio.name, audio.uri);
let audio = match self.find_available_alternative(&audio) {
let audio = match self.find_available_alternative(&audio).await {
Some(audio) => audio,
None => {
warn!("<{}> is not available", audio.uri);
@ -675,7 +676,7 @@ impl PlayerTrackLoader {
play_from_beginning,
);
let encrypted_file = match encrypted_file.wait() {
let encrypted_file = match encrypted_file.await {
Ok(encrypted_file) => encrypted_file,
Err(_) => {
error!("Unable to load encrypted file.");
@ -693,7 +694,7 @@ impl PlayerTrackLoader {
stream_loader_controller.set_random_access_mode();
}
let key = match key.wait() {
let key = match key.await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
@ -709,7 +710,7 @@ impl PlayerTrackLoader {
}
Err(_) => {
warn!("Unable to extract normalisation data, using default value.");
1.0 as f32
1.0_f32
}
};
@ -738,10 +739,9 @@ impl PlayerTrackLoader {
}
impl Future for PlayerInternal {
type Item = ();
type Error = ();
type Output = ();
fn poll(&mut self) -> Poll<(), ()> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
// While this is written as a future, it still contains blocking code.
// It must be run on its own thread.
@ -749,14 +749,13 @@ impl Future for PlayerInternal {
let mut all_futures_completed_or_not_ready = true;
// process commands that were sent to us
let cmd = match self.commands.poll() {
Ok(Async::Ready(None)) => return Ok(Async::Ready(())), // client has disconnected - shut down.
Ok(Async::Ready(Some(cmd))) => {
let cmd = match Pin::new(&mut self.commands).poll_next(cx) {
Poll::Ready(None) => return Poll::Ready(()), // client has disconnected - shut down.
Poll::Ready(Some(cmd)) => {
all_futures_completed_or_not_ready = false;
Some(cmd)
}
Ok(Async::NotReady) => None,
Err(_) => None,
_ => None,
};
if let Some(cmd) = cmd {
@ -771,8 +770,8 @@ impl Future for PlayerInternal {
play_request_id,
} = self.state
{
match loader.poll() {
Ok(Async::Ready(loaded_track)) => {
match loader.as_mut().poll(cx) {
Poll::Ready(Ok(loaded_track)) => {
self.start_playback(
track_id,
play_request_id,
@ -783,8 +782,7 @@ impl Future for PlayerInternal {
panic!("The state wasn't changed by start_playback()");
}
}
Ok(Async::NotReady) => (),
Err(_) => {
Poll::Ready(Err(_)) => {
warn!("Unable to load <{:?}>\nSkipping to next track", track_id);
assert!(self.state.is_loading());
self.send_event(PlayerEvent::EndOfTrack {
@ -792,6 +790,7 @@ impl Future for PlayerInternal {
play_request_id,
})
}
Poll::Pending => (),
}
}
@ -801,16 +800,15 @@ impl Future for PlayerInternal {
track_id,
} = self.preload
{
match loader.poll() {
Ok(Async::Ready(loaded_track)) => {
match loader.as_mut().poll(cx) {
Poll::Ready(Ok(loaded_track)) => {
self.send_event(PlayerEvent::Preloading { track_id });
self.preload = PlayerPreload::Ready {
track_id,
loaded_track,
loaded_track: Box::new(loaded_track),
};
}
Ok(Async::NotReady) => (),
Err(_) => {
Poll::Ready(Err(_)) => {
debug!("Unable to preload {:?}", track_id);
self.preload = PlayerPreload::None;
// Let Spirc know that the track was unavailable.
@ -827,6 +825,7 @@ impl Future for PlayerInternal {
});
}
}
Poll::Pending => (),
}
}
@ -847,8 +846,7 @@ impl Future for PlayerInternal {
let packet = decoder.next_packet().expect("Vorbis error");
if let Some(ref packet) = packet {
*stream_position_pcm =
*stream_position_pcm + (packet.data().len() / 2) as u64;
*stream_position_pcm += (packet.data().len() / 2) as u64;
let stream_position_millis = Self::position_pcm_to_ms(*stream_position_pcm);
let notify_about_position = match *reported_nominal_start_time {
@ -858,11 +856,7 @@ impl Future for PlayerInternal {
let lag = (Instant::now() - reported_nominal_start_time).as_millis()
as i64
- stream_position_millis as i64;
if lag > 1000 {
true
} else {
false
}
lag > 1000
}
};
if notify_about_position {
@ -918,11 +912,11 @@ impl Future for PlayerInternal {
}
if self.session.is_invalid() {
return Ok(Async::Ready(()));
return Poll::Ready(());
}
if (!self.state.is_playing()) && all_futures_completed_or_not_ready {
return Ok(Async::NotReady);
return Poll::Pending;
}
}
}
@ -1061,12 +1055,14 @@ impl PlayerInternal {
fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) {
match packet {
Some(mut packet) => {
if packet.data().len() > 0 {
if !packet.data().is_empty() {
if let Some(ref editor) = self.audio_filter {
editor.modify_stream(&mut packet.data_mut())
};
if self.config.normalisation && normalisation_factor != 1.0 {
if self.config.normalisation
                    && (normalisation_factor - 1.0).abs() > f32::EPSILON
{
for x in packet.data_mut().iter_mut() {
*x = (*x as f32 * normalisation_factor) as i16;
}
@ -1214,10 +1210,9 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
// But most likely the track is fully
// loaded already because we played
// to the end of it.
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
@ -1250,7 +1245,7 @@ impl PlayerInternal {
// we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
stream_loader_controller.set_random_access_mode();
let _ = decoder.seek(position_ms as i64); // This may be blocking.
let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64));
stream_loader_controller.set_stream_mode();
*stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
@ -1318,10 +1313,12 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode();
}
self.start_playback(track_id, play_request_id, loaded_track, play);
self.start_playback(track_id, play_request_id, *loaded_track, play);
return;
} else {
unreachable!();
@ -1363,9 +1360,7 @@ impl PlayerInternal {
self.preload = PlayerPreload::None;
// If we don't have a loader yet, create one from scratch.
let loader = loader
.or_else(|| Some(self.load_track(track_id, position_ms)))
.unwrap();
let loader = loader.unwrap_or_else(|| Box::pin(self.load_track(track_id, position_ms)));
// Set ourselves to a loading state.
self.state = PlayerState::Loading {
@ -1420,7 +1415,10 @@ impl PlayerInternal {
// schedule the preload of the current track if desired.
if preload_track {
let loader = self.load_track(track_id, 0);
self.preload = PlayerPreload::Loading { track_id, loader }
self.preload = PlayerPreload::Loading {
track_id,
loader: Box::pin(loader),
}
}
}
@ -1532,34 +1530,33 @@ impl PlayerInternal {
}
}
fn load_track(
pub fn load_track(
&self,
spotify_id: SpotifyId,
position_ms: u32,
) -> Box<dyn Future<Item = PlayerLoadedTrackData, Error = ()>> {
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
// This method creates a future that returns the loaded stream and associated info.
// Ideally all work should be done using asynchronous code. However, seek() on the
// audio stream is implemented in a blocking fashion. Thus, we can't turn it into future
// easily. Instead we spawn a thread to do the work and return a one-shot channel as the
// future to work with.
let loader = PlayerTrackLoader {
session: self.session.clone(),
config: self.config.clone(),
};
let session = self.session.clone();
let config = self.config.clone();
let (result_tx, result_rx) = futures::sync::oneshot::channel();
async move {
let loader = PlayerTrackLoader { session, config };
std::thread::spawn(move || {
loader
.load_track(spotify_id, position_ms)
.and_then(move |data| {
let (result_tx, result_rx) = oneshot::channel();
tokio::spawn(async move {
if let Some(data) = loader.load_track(spotify_id, position_ms).await {
let _ = result_tx.send(data);
Some(())
});
});
}
});
Box::new(result_rx.map_err(|_| ()))
result_rx.await.map_err(|_| ())
}
}
fn preload_data_before_playback(&mut self) {
@ -1585,7 +1582,9 @@ impl PlayerInternal {
* bytes_per_second as f64) as usize,
(READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
);
stream_loader_controller.fetch_next_blocking(wait_for_data_length);
tokio::task::block_in_place(|| {
stream_loader_controller.fetch_next_blocking(wait_for_data_length)
});
}
}
}
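
`tokio::task::block_in_place`, used above around the blocking `seek` and prefetch calls, tells the runtime to move other scheduled work off the current worker before running blocking code; as of tokio 1.x it panics on the current-thread runtime, so it assumes the multi-threaded flavour. A minimal sketch of the pattern, unrelated to the player types:

```rust
// Assumes the multi-threaded runtime; block_in_place panics on the
// current-thread flavour.
#[tokio::main(flavor = "multi_thread")]
async fn main() {
    let sum = tokio::task::block_in_place(|| {
        // Stand-in for blocking work such as a synchronous seek or read.
        (0u64..1_000_000).sum::<u64>()
    });
    println!("blocking result: {}", sum);
}
```
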
@ -1689,13 +1688,13 @@ impl<T: Read + Seek> Subfile<T> {
}
impl<T: Read + Seek> Read for Subfile<T> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.stream.read(buf)
}
}
impl<T: Read + Seek> Seek for Subfile<T> {
fn seek(&mut self, mut pos: SeekFrom) -> Result<u64> {
fn seek(&mut self, mut pos: SeekFrom) -> io::Result<u64> {
pos = match pos {
SeekFrom::Start(offset) => SeekFrom::Start(offset + self.offset),
x => x,

View file

@ -1,3 +1,4 @@
# max_width = 105
reorder_imports = true
reorder_modules = true
edition = "2018"

View file

@ -1,8 +1,7 @@
#![crate_name = "librespot"]
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
pub extern crate librespot_audio as audio;
pub extern crate librespot_connect as connect;
// pub extern crate librespot_connect as connect;
pub extern crate librespot_core as core;
pub extern crate librespot_metadata as metadata;
pub extern crate librespot_playback as playback;

View file

@ -1,620 +0,0 @@
use futures::sync::mpsc::UnboundedReceiver;
use futures::{Async, Future, Poll, Stream};
use log::{error, info, trace, warn};
use sha1::{Digest, Sha1};
use std::env;
use std::io::{self, stderr, Write};
use std::mem;
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
use std::time::Instant;
use tokio_core::reactor::{Core, Handle};
use tokio_io::IoStream;
use url::Url;
use librespot::core::authentication::{get_credentials, Credentials};
use librespot::core::cache::Cache;
use librespot::core::config::{ConnectConfig, DeviceType, SessionConfig, VolumeCtrl};
use librespot::core::session::Session;
use librespot::core::version;
use librespot::connect::discovery::{discovery, DiscoveryStream};
use librespot::connect::spirc::{Spirc, SpircTask};
use librespot::playback::audio_backend::{self, Sink, BACKENDS};
use librespot::playback::config::{Bitrate, PlayerConfig};
use librespot::playback::mixer::{self, Mixer, MixerConfig};
use librespot::playback::player::{Player, PlayerEvent};
mod player_event_handler;
use crate::player_event_handler::{emit_sink_event, run_program_on_events};
fn device_id(name: &str) -> String {
hex::encode(Sha1::digest(name.as_bytes()))
}
fn usage(program: &str, opts: &getopts::Options) -> String {
let brief = format!("Usage: {} [options]", program);
opts.usage(&brief)
}
fn setup_logging(verbose: bool) {
let mut builder = env_logger::Builder::new();
match env::var("RUST_LOG") {
Ok(config) => {
builder.parse_filters(&config);
builder.init();
if verbose {
                warn!("`--verbose` flag overridden by `RUST_LOG` environment variable");
}
}
Err(_) => {
if verbose {
builder.parse_filters("libmdns=info,librespot=trace");
} else {
builder.parse_filters("libmdns=info,librespot=info");
}
builder.init();
}
}
}
fn list_backends() {
println!("Available Backends : ");
for (&(name, _), idx) in BACKENDS.iter().zip(0..) {
if idx == 0 {
println!("- {} (default)", name);
} else {
println!("- {}", name);
}
}
}
#[derive(Clone)]
struct Setup {
backend: fn(Option<String>) -> Box<dyn Sink>,
device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
cache: Option<Cache>,
player_config: PlayerConfig,
session_config: SessionConfig,
connect_config: ConnectConfig,
mixer_config: MixerConfig,
credentials: Option<Credentials>,
enable_discovery: bool,
zeroconf_port: u16,
player_event_program: Option<String>,
emit_sink_events: bool,
}
fn setup(args: &[String]) -> Setup {
let mut opts = getopts::Options::new();
opts.optopt(
"c",
"cache",
"Path to a directory where files will be cached.",
"CACHE",
).optopt(
"",
"system-cache",
"Path to a directory where system files (credentials, volume) will be cached. Can be different from cache option value",
"SYTEMCACHE",
).optflag("", "disable-audio-cache", "Disable caching of the audio data.")
.reqopt("n", "name", "Device name", "NAME")
.optopt("", "device-type", "Displayed device type", "DEVICE_TYPE")
.optopt(
"b",
"bitrate",
"Bitrate (96, 160 or 320). Defaults to 160",
"BITRATE",
)
.optopt(
"",
"onevent",
"Run PROGRAM when playback is about to begin.",
"PROGRAM",
)
.optflag("", "emit-sink-events", "Run program set by --onevent before sink is opened and after it is closed.")
.optflag("v", "verbose", "Enable verbose output")
.optopt("u", "username", "Username to sign in with", "USERNAME")
.optopt("p", "password", "Password", "PASSWORD")
.optopt("", "proxy", "HTTP proxy to use when connecting", "PROXY")
.optopt("", "ap-port", "Connect to AP with specified port. If no AP with that port are present fallback AP will be used. Available ports are usually 80, 443 and 4070", "AP_PORT")
.optflag("", "disable-discovery", "Disable discovery mode")
.optopt(
"",
"backend",
"Audio backend to use. Use '?' to list options",
"BACKEND",
)
.optopt(
"",
"device",
"Audio device to use. Use '?' to list options if using portaudio or alsa",
"DEVICE",
)
.optopt("", "mixer", "Mixer to use (alsa or softvol)", "MIXER")
.optopt(
"m",
"mixer-name",
"Alsa mixer name, e.g \"PCM\" or \"Master\". Defaults to 'PCM'",
"MIXER_NAME",
)
.optopt(
"",
"mixer-card",
"Alsa mixer card, e.g \"hw:0\" or similar from `aplay -l`. Defaults to 'default' ",
"MIXER_CARD",
)
.optopt(
"",
"mixer-index",
"Alsa mixer index, Index of the cards mixer. Defaults to 0",
"MIXER_INDEX",
)
.optflag(
"",
"mixer-linear-volume",
"Disable alsa's mapped volume scale (cubic). Default false",
)
.optopt(
"",
"initial-volume",
"Initial volume in %, once connected (must be from 0 to 100)",
"VOLUME",
)
.optopt(
"",
"zeroconf-port",
"The port the internal server advertised over zeroconf uses.",
"ZEROCONF_PORT",
)
.optflag(
"",
"enable-volume-normalisation",
"Play all tracks at the same volume",
)
.optopt(
"",
"normalisation-pregain",
"Pregain (dB) applied by volume normalisation",
"PREGAIN",
)
.optopt(
"",
"volume-ctrl",
"Volume control type - [linear, log, fixed]. Default is logarithmic",
"VOLUME_CTRL"
)
.optflag(
"",
"autoplay",
"autoplay similar songs when your music ends.",
)
.optflag(
"",
"disable-gapless",
"disable gapless playback.",
);
let matches = match opts.parse(&args[1..]) {
Ok(m) => m,
Err(f) => {
writeln!(
stderr(),
"error: {}\n{}",
f.to_string(),
usage(&args[0], &opts)
)
.unwrap();
exit(1);
}
};
let verbose = matches.opt_present("verbose");
setup_logging(verbose);
info!(
"librespot {} ({}). Built on {}. Build ID: {}",
version::short_sha(),
version::commit_date(),
version::short_now(),
version::build_id()
);
let backend_name = matches.opt_str("backend");
if backend_name == Some("?".into()) {
list_backends();
exit(0);
}
let backend = audio_backend::find(backend_name).expect("Invalid backend");
let device = matches.opt_str("device");
if device == Some("?".into()) {
backend(device);
exit(0);
}
let mixer_name = matches.opt_str("mixer");
let mixer = mixer::find(mixer_name.as_ref()).expect("Invalid mixer");
let mixer_config = MixerConfig {
card: matches
.opt_str("mixer-card")
.unwrap_or(String::from("default")),
mixer: matches.opt_str("mixer-name").unwrap_or(String::from("PCM")),
index: matches
.opt_str("mixer-index")
.map(|index| index.parse::<u32>().unwrap())
.unwrap_or(0),
mapped_volume: !matches.opt_present("mixer-linear-volume"),
};
let cache = matches.opt_str("c").map(|cache_path| {
let use_audio_cache = !matches.opt_present("disable-audio-cache");
let system_cache_directory = matches
.opt_str("system-cache")
.unwrap_or(String::from(cache_path.clone()));
Cache::new(
PathBuf::from(cache_path),
PathBuf::from(system_cache_directory),
use_audio_cache,
)
});
let initial_volume = matches
.opt_str("initial-volume")
.map(|volume| {
let volume = volume.parse::<u16>().unwrap();
if volume > 100 {
panic!("Initial volume must be in the range 0-100");
}
(volume as i32 * 0xFFFF / 100) as u16
})
.or_else(|| cache.as_ref().and_then(Cache::volume))
.unwrap_or(0x8000);
let zeroconf_port = matches
.opt_str("zeroconf-port")
.map(|port| port.parse::<u16>().unwrap())
.unwrap_or(0);
let name = matches.opt_str("name").unwrap();
let credentials = {
let cached_credentials = cache.as_ref().and_then(Cache::credentials);
let password = |username: &String| -> String {
write!(stderr(), "Password for {}: ", username).unwrap();
stderr().flush().unwrap();
rpassword::read_password().unwrap()
};
get_credentials(
matches.opt_str("username"),
matches.opt_str("password"),
cached_credentials,
password,
)
};
let session_config = {
let device_id = device_id(&name);
SessionConfig {
user_agent: version::version_string(),
device_id: device_id,
proxy: matches.opt_str("proxy").or(std::env::var("http_proxy").ok()).map(
|s| {
match Url::parse(&s) {
Ok(url) => {
if url.host().is_none() || url.port_or_known_default().is_none() {
panic!("Invalid proxy url, only urls on the format \"http://host:port\" are allowed");
}
if url.scheme() != "http" {
panic!("Only unsecure http:// proxies are supported");
}
url
},
Err(err) => panic!("Invalid proxy url: {}, only urls on the format \"http://host:port\" are allowed", err)
}
},
),
ap_port: matches
.opt_str("ap-port")
.map(|port| port.parse::<u16>().expect("Invalid port")),
}
};
let player_config = {
let bitrate = matches
.opt_str("b")
.as_ref()
.map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate"))
.unwrap_or(Bitrate::default());
PlayerConfig {
bitrate: bitrate,
gapless: !matches.opt_present("disable-gapless"),
normalisation: matches.opt_present("enable-volume-normalisation"),
normalisation_pregain: matches
.opt_str("normalisation-pregain")
.map(|pregain| pregain.parse::<f32>().expect("Invalid pregain float value"))
.unwrap_or(PlayerConfig::default().normalisation_pregain),
}
};
let connect_config = {
let device_type = matches
.opt_str("device-type")
.as_ref()
.map(|device_type| DeviceType::from_str(device_type).expect("Invalid device type"))
.unwrap_or(DeviceType::default());
let volume_ctrl = matches
.opt_str("volume-ctrl")
.as_ref()
.map(|volume_ctrl| VolumeCtrl::from_str(volume_ctrl).expect("Invalid volume ctrl type"))
.unwrap_or(VolumeCtrl::default());
ConnectConfig {
name: name,
device_type: device_type,
volume: initial_volume,
volume_ctrl: volume_ctrl,
autoplay: matches.opt_present("autoplay"),
}
};
let enable_discovery = !matches.opt_present("disable-discovery");
Setup {
backend: backend,
cache: cache,
session_config: session_config,
player_config: player_config,
connect_config: connect_config,
credentials: credentials,
device: device,
enable_discovery: enable_discovery,
zeroconf_port: zeroconf_port,
mixer: mixer,
mixer_config: mixer_config,
player_event_program: matches.opt_str("onevent"),
emit_sink_events: matches.opt_present("emit-sink-events"),
}
}
struct Main {
cache: Option<Cache>,
player_config: PlayerConfig,
session_config: SessionConfig,
connect_config: ConnectConfig,
backend: fn(Option<String>) -> Box<dyn Sink>,
device: Option<String>,
mixer: fn(Option<MixerConfig>) -> Box<dyn Mixer>,
mixer_config: MixerConfig,
handle: Handle,
discovery: Option<DiscoveryStream>,
signal: IoStream<()>,
spirc: Option<Spirc>,
spirc_task: Option<SpircTask>,
connect: Box<dyn Future<Item = Session, Error = io::Error>>,
shutdown: bool,
last_credentials: Option<Credentials>,
auto_connect_times: Vec<Instant>,
player_event_channel: Option<UnboundedReceiver<PlayerEvent>>,
player_event_program: Option<String>,
emit_sink_events: bool,
}
impl Main {
fn new(handle: Handle, setup: Setup) -> Main {
let mut task = Main {
handle: handle.clone(),
cache: setup.cache,
session_config: setup.session_config,
player_config: setup.player_config,
connect_config: setup.connect_config,
backend: setup.backend,
device: setup.device,
mixer: setup.mixer,
mixer_config: setup.mixer_config,
connect: Box::new(futures::future::empty()),
discovery: None,
spirc: None,
spirc_task: None,
shutdown: false,
last_credentials: None,
auto_connect_times: Vec::new(),
signal: Box::new(tokio_signal::ctrl_c().flatten_stream()),
player_event_channel: None,
player_event_program: setup.player_event_program,
emit_sink_events: setup.emit_sink_events,
};
if setup.enable_discovery {
let config = task.connect_config.clone();
let device_id = task.session_config.device_id.clone();
task.discovery =
Some(discovery(&handle, config, device_id, setup.zeroconf_port).unwrap());
}
if let Some(credentials) = setup.credentials {
task.credentials(credentials);
}
task
}
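// Start (or restart) a session connection with the given credentials. Any previously running Spirc task is spawned onto the reactor so it can finish shutting down on its own.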
fn credentials(&mut self, credentials: Credentials) {
self.last_credentials = Some(credentials.clone());
let config = self.session_config.clone();
let handle = self.handle.clone();
let connection = Session::connect(config, credentials, self.cache.clone(), handle);
self.connect = connection;
self.spirc = None;
if let Some(task) = self.spirc_task.take() {
self.handle.spawn(task);
}
}
}
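// Poll every sub-future in a loop and only return NotReady once a full pass makes no progress, so that no wake-up is lost.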
impl Future for Main {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
let mut progress = false;
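// Credentials received via zeroconf discovery restart the session; shut down any existing Spirc first.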
if let Some(Async::Ready(Some(creds))) =
self.discovery.as_mut().map(|d| d.poll().unwrap())
{
if let Some(ref spirc) = self.spirc {
spirc.shutdown();
}
self.auto_connect_times.clear();
self.credentials(creds);
progress = true;
}
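// When the pending connection resolves, wire up the mixer, player and Spirc for the new session.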
match self.connect.poll() {
Ok(Async::Ready(session)) => {
self.connect = Box::new(futures::future::empty());
let mixer_config = self.mixer_config.clone();
let mixer = (self.mixer)(Some(mixer_config));
let player_config = self.player_config.clone();
let connect_config = self.connect_config.clone();
let audio_filter = mixer.get_audio_filter();
let backend = self.backend;
let device = self.device.clone();
let (player, event_channel) =
Player::new(player_config, session.clone(), audio_filter, move || {
(backend)(device)
});
if self.emit_sink_events {
if let Some(player_event_program) = &self.player_event_program {
let player_event_program = player_event_program.clone();
player.set_sink_event_callback(Some(Box::new(move |sink_status| {
emit_sink_event(sink_status, &player_event_program)
})));
}
}
let (spirc, spirc_task) = Spirc::new(connect_config, session, player, mixer);
self.spirc = Some(spirc);
self.spirc_task = Some(spirc_task);
self.player_event_channel = Some(event_channel);
progress = true;
}
Ok(Async::NotReady) => (),
Err(error) => {
error!("Could not connect to server: {}", error);
self.connect = Box::new(futures::future::empty());
}
}
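// The first Ctrl-C asks Spirc to shut down gracefully; a second Ctrl-C (or no active Spirc) exits immediately.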
if let Async::Ready(Some(())) = self.signal.poll().unwrap() {
trace!("Ctrl-C received");
if !self.shutdown {
if let Some(ref spirc) = self.spirc {
spirc.shutdown();
} else {
return Ok(Async::Ready(()));
}
self.shutdown = true;
} else {
return Ok(Async::Ready(()));
}
progress = true;
}
let mut drop_spirc_and_try_to_reconnect = false;
if let Some(ref mut spirc_task) = self.spirc_task {
if let Async::Ready(()) = spirc_task.poll().unwrap() {
if self.shutdown {
return Ok(Async::Ready(()));
} else {
warn!("Spirc shut down unexpectedly");
drop_spirc_and_try_to_reconnect = true;
}
progress = true;
}
}
if drop_spirc_and_try_to_reconnect {
self.spirc_task = None;
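// Keep only reconnection attempts from the last 10 minutes and allow at most 5 of them before giving up.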
while (!self.auto_connect_times.is_empty())
&& ((Instant::now() - self.auto_connect_times[0]).as_secs() > 600)
{
let _ = self.auto_connect_times.remove(0);
}
if let Some(credentials) = self.last_credentials.clone() {
if self.auto_connect_times.len() >= 5 {
warn!("Spirc shut down too often. Not reconnecting automatically.");
} else {
self.auto_connect_times.push(Instant::now());
self.credentials(credentials);
}
}
}
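// Forward player events to the optional --onevent program and log if the child process fails.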
if let Some(ref mut player_event_channel) = self.player_event_channel {
if let Async::Ready(Some(event)) = player_event_channel.poll().unwrap() {
progress = true;
if let Some(ref program) = self.player_event_program {
if let Some(child) = run_program_on_events(event, program) {
let child = child
.expect("program failed to start")
.map(|status| {
if !status.success() {
error!("child exited with status {:?}", status.code());
}
})
.map_err(|e| error!("failed to wait on child process: {}", e));
self.handle.spawn(child);
}
}
}
}
if !progress {
return Ok(Async::NotReady);
}
}
}
}
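// Enable full backtraces by default and run the Main state machine to completion on a tokio-core reactor.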
fn main() {
if env::var("RUST_BACKTRACE").is_err() {
env::set_var("RUST_BACKTRACE", "full")
}
let mut core = Core::new().unwrap();
let handle = core.handle();
let args: Vec<String> = std::env::args().collect();
core.run(Main::new(handle, setup(&args))).unwrap()
}