Migrated audio crate to futures 0.3

This commit is contained in:
johannesd3 2021-01-21 22:12:35 +01:00
parent 424ba3ae25
commit 80d384e001
4 changed files with 418 additions and 367 deletions

View file

@ -11,16 +11,17 @@ path = "../core"
version = "0.1.3" version = "0.1.3"
[dependencies] [dependencies]
aes-ctr = "0.6"
bit-set = "0.5" bit-set = "0.5"
byteorder = "1.3" byteorder = "1.4"
bytes = "0.4" bytes = "1.0"
futures = "0.1" futures = "0.3"
lewton = "0.9" lewton = "0.9"
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
num-traits = "0.2" num-traits = "0.2"
pin-project = "1.0"
tempfile = "3.1" tempfile = "3.1"
aes-ctr = "0.3"
librespot-tremor = { version = "0.1.0", optional = true } librespot-tremor = { version = "0.1.0", optional = true }
vorbis = { version ="0.0.14", optional = true } vorbis = { version ="0.0.14", optional = true }

View file

@ -1,7 +1,7 @@
use std::io; use std::io;
use aes_ctr::stream_cipher::generic_array::GenericArray; use aes_ctr::cipher::generic_array::GenericArray;
use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek}; use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek};
use aes_ctr::Aes128Ctr; use aes_ctr::Aes128Ctr;
use librespot_core::audio_key::AudioKey; use librespot_core::audio_key::AudioKey;

View file

@ -1,17 +1,25 @@
use crate::range_set::{Range, RangeSet}; use crate::range_set::{Range, RangeSet};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::{mpsc, oneshot}; use futures::{
use futures::Stream; channel::{mpsc, oneshot},
use futures::{Async, Future, Poll}; future,
use std::cmp::{max, min}; };
use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt};
use std::fs; use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write}; use std::io::{self, Read, Seek, SeekFrom, Write};
use std::sync::{Arc, Condvar, Mutex}; use std::sync::{Arc, Condvar, Mutex};
use std::task::Poll;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use std::{
cmp::{max, min},
pin::Pin,
task::Context,
};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use futures::sync::mpsc::unbounded; use futures::channel::mpsc::unbounded;
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::FileId; use librespot_core::spotify_id::FileId;
@ -88,22 +96,6 @@ pub enum AudioFile {
Streaming(AudioFileStreaming), Streaming(AudioFileStreaming),
} }
pub enum AudioFileOpen {
Cached(Option<fs::File>),
Streaming(AudioFileOpenStreaming),
}
pub struct AudioFileOpenStreaming {
session: Session,
initial_data_rx: Option<ChannelData>,
initial_data_length: Option<usize>,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
streaming_data_rate: usize,
}
enum StreamLoaderCommand { enum StreamLoaderCommand {
Fetch(Range), // signal the stream loader to fetch a range of the file Fetch(Range), // signal the stream loader to fetch a range of the file
RandomAccessMode(), // optimise download strategy for random access RandomAccessMode(), // optimise download strategy for random access
@ -120,45 +112,36 @@ pub struct StreamLoaderController {
impl StreamLoaderController { impl StreamLoaderController {
pub fn len(&self) -> usize { pub fn len(&self) -> usize {
return self.file_size; self.file_size
}
pub fn is_empty(&self) -> bool {
self.file_size == 0
} }
pub fn range_available(&self, range: Range) -> bool { pub fn range_available(&self, range: Range) -> bool {
if let Some(ref shared) = self.stream_shared { if let Some(ref shared) = self.stream_shared {
let download_status = shared.download_status.lock().unwrap(); let download_status = shared.download_status.lock().unwrap();
if range.length range.length
<= download_status <= download_status
.downloaded .downloaded
.contained_length_from_value(range.start) .contained_length_from_value(range.start)
{
return true;
} else {
return false;
}
} else { } else {
if range.length <= self.len() - range.start { range.length <= self.len() - range.start
return true;
} else {
return false;
}
} }
} }
pub fn range_to_end_available(&self) -> bool { pub fn range_to_end_available(&self) -> bool {
if let Some(ref shared) = self.stream_shared { self.stream_shared.as_ref().map_or(true, |shared| {
let read_position = shared.read_position.load(atomic::Ordering::Relaxed); let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
self.range_available(Range::new(read_position, self.len() - read_position)) self.range_available(Range::new(read_position, self.len() - read_position))
} else { })
true
}
} }
pub fn ping_time_ms(&self) -> usize { pub fn ping_time_ms(&self) -> usize {
if let Some(ref shared) = self.stream_shared { self.stream_shared.as_ref().map_or(0, |shared| {
return shared.ping_time_ms.load(atomic::Ordering::Relaxed); shared.ping_time_ms.load(atomic::Ordering::Relaxed)
} else { })
return 0;
}
} }
fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
@ -216,27 +199,23 @@ impl StreamLoaderController {
} }
pub fn fetch_next(&mut self, length: usize) { pub fn fetch_next(&mut self, length: usize) {
let range: Range = if let Some(ref shared) = self.stream_shared { if let Some(ref shared) = self.stream_shared {
Range { let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed), start: shared.read_position.load(atomic::Ordering::Relaxed),
length: length, length: length,
} };
} else { self.fetch(range)
return; }
};
self.fetch(range);
} }
pub fn fetch_next_blocking(&mut self, length: usize) { pub fn fetch_next_blocking(&mut self, length: usize) {
let range: Range = if let Some(ref shared) = self.stream_shared { if let Some(ref shared) = self.stream_shared {
Range { let range = Range {
start: shared.read_position.load(atomic::Ordering::Relaxed), start: shared.read_position.load(atomic::Ordering::Relaxed),
length: length, length: length,
} };
} else { self.fetch_blocking(range);
return; }
};
self.fetch_blocking(range);
} }
pub fn set_random_access_mode(&mut self) { pub fn set_random_access_mode(&mut self) {
@ -288,108 +267,16 @@ struct AudioFileShared {
read_position: AtomicUsize, read_position: AtomicUsize,
} }
impl AudioFileOpenStreaming {
fn finish(&mut self, size: usize) -> AudioFileStreaming {
let shared = Arc::new(AudioFileShared {
file_id: self.file_id,
file_size: size,
stream_data_rate: self.streaming_data_rate,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});
let mut write_file = NamedTempFile::new().unwrap();
write_file.as_file().set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap();
let initial_data_rx = self.initial_data_rx.take().unwrap();
let initial_data_length = self.initial_data_length.take().unwrap();
let complete_tx = self.complete_tx.take().unwrap();
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new(
self.session.clone(),
shared.clone(),
initial_data_rx,
self.initial_request_sent_time,
initial_data_length,
write_file,
stream_loader_command_rx,
complete_tx,
);
self.session.spawn(move |_| fetcher);
AudioFileStreaming {
read_file: read_file,
position: 0,
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
shared: shared,
}
}
}
impl Future for AudioFileOpen {
type Item = AudioFile;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
match *self {
AudioFileOpen::Streaming(ref mut open) => {
let file = try_ready!(open.poll());
Ok(Async::Ready(AudioFile::Streaming(file)))
}
AudioFileOpen::Cached(ref mut file) => {
let file = file.take().unwrap();
Ok(Async::Ready(AudioFile::Cached(file)))
}
}
}
}
impl Future for AudioFileOpenStreaming {
type Item = AudioFileStreaming;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
loop {
let (id, data) = try_ready!(self.headers.poll()).unwrap();
if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size);
return Ok(Async::Ready(file));
}
}
}
}
impl AudioFile { impl AudioFile {
pub fn open( pub async fn open(
session: &Session, session: &Session,
file_id: FileId, file_id: FileId,
bytes_per_second: usize, bytes_per_second: usize,
play_from_beginning: bool, play_from_beginning: bool,
) -> AudioFileOpen { ) -> Result<AudioFile, ChannelError> {
let cache = session.cache().cloned(); if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) {
if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
debug!("File {} already in cache", file_id); debug!("File {} already in cache", file_id);
return AudioFileOpen::Cached(Some(file)); return Ok(AudioFile::Cached(file));
} }
debug!("Downloading file {}", file_id); debug!("Downloading file {}", file_id);
@ -411,56 +298,112 @@ impl AudioFile {
} }
let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
let open = AudioFileOpenStreaming { let streaming = AudioFileStreaming::open(
session: session.clone(), session.clone(),
file_id: file_id, data,
initial_data_length,
headers: headers, Instant::now(),
initial_data_rx: Some(data), headers,
initial_data_length: Some(initial_data_length), file_id,
initial_request_sent_time: Instant::now(), complete_tx,
bytes_per_second,
complete_tx: Some(complete_tx), );
streaming_data_rate: bytes_per_second,
};
let session_ = session.clone(); let session_ = session.clone();
session.spawn(move |_| { session.spawn(complete_rx.map_ok(move |mut file| {
complete_rx if let Some(cache) = session_.cache() {
.map(move |mut file| { cache.save_file(file_id, &mut file);
if let Some(cache) = session_.cache() { debug!("File {} complete, saving to cache", file_id);
cache.save_file(file_id, &mut file); } else {
debug!("File {} complete, saving to cache", file_id); debug!("File {} complete", file_id);
} else { }
debug!("File {} complete", file_id); }));
}
})
.or_else(|oneshot::Canceled| Ok(()))
});
return AudioFileOpen::Streaming(open); Ok(AudioFile::Streaming(streaming.await?))
} }
pub fn get_stream_loader_controller(&self) -> StreamLoaderController { pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
match self { match self {
AudioFile::Streaming(ref stream) => { AudioFile::Streaming(ref stream) => StreamLoaderController {
return StreamLoaderController { channel_tx: Some(stream.stream_loader_command_tx.clone()),
channel_tx: Some(stream.stream_loader_command_tx.clone()), stream_shared: Some(stream.shared.clone()),
stream_shared: Some(stream.shared.clone()), file_size: stream.shared.file_size,
file_size: stream.shared.file_size, },
}; AudioFile::Cached(ref file) => StreamLoaderController {
} channel_tx: None,
AudioFile::Cached(ref file) => { stream_shared: None,
return StreamLoaderController { file_size: file.metadata().unwrap().len() as usize,
channel_tx: None, },
stream_shared: None,
file_size: file.metadata().unwrap().len() as usize,
};
}
} }
} }
} }
impl AudioFileStreaming {
pub async fn open(
session: Session,
initial_data_rx: ChannelData,
initial_data_length: usize,
initial_request_sent_time: Instant,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: oneshot::Sender<NamedTempFile>,
streaming_data_rate: usize,
) -> Result<AudioFileStreaming, ChannelError> {
let (_, data) = headers
.try_filter(|(id, _)| future::ready(*id == 0x3))
.next()
.await
.unwrap()?;
let size = BigEndian::read_u32(&data) as usize * 4;
let shared = Arc::new(AudioFileShared {
file_id: file_id,
file_size: size,
stream_data_rate: streaming_data_rate,
cond: Condvar::new(),
download_status: Mutex::new(AudioFileDownloadStatus {
requested: RangeSet::new(),
downloaded: RangeSet::new(),
}),
download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
number_of_open_requests: AtomicUsize::new(0),
ping_time_ms: AtomicUsize::new(0),
read_position: AtomicUsize::new(0),
});
let mut write_file = NamedTempFile::new().unwrap();
write_file.as_file().set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let read_file = write_file.reopen().unwrap();
//let (seek_tx, seek_rx) = mpsc::unbounded();
let (stream_loader_command_tx, stream_loader_command_rx) =
mpsc::unbounded::<StreamLoaderCommand>();
let fetcher = AudioFileFetch::new(
session.clone(),
shared.clone(),
initial_data_rx,
initial_request_sent_time,
initial_data_length,
write_file,
stream_loader_command_rx,
complete_tx,
);
session.spawn(fetcher);
Ok(AudioFileStreaming {
read_file: read_file,
position: 0,
//seek: seek_tx,
stream_loader_command_tx: stream_loader_command_tx,
shared: shared,
})
}
}
fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel { fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
assert!( assert!(
offset % 4 == 0, offset % 4 == 0,
@ -502,141 +445,261 @@ enum ReceivedData {
Data(PartialFileData), Data(PartialFileData),
} }
struct AudioFileFetchDataReceiver { async fn audio_file_fetch_receive_data(
shared: Arc<AudioFileShared>, shared: Arc<AudioFileShared>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData, data_rx: ChannelData,
initial_data_offset: usize, initial_data_offset: usize,
initial_request_length: usize, initial_request_length: usize,
data_offset: usize, request_sent_time: Instant,
request_length: usize, ) {
request_sent_time: Option<Instant>, let mut data_offset = initial_data_offset;
measure_ping_time: bool, let mut request_length = initial_request_length;
} let mut measure_ping_time = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst)
== 0;
impl AudioFileFetchDataReceiver { shared
fn new( .number_of_open_requests
shared: Arc<AudioFileShared>, .fetch_add(1, atomic::Ordering::SeqCst);
file_data_tx: mpsc::UnboundedSender<ReceivedData>,
data_rx: ChannelData,
data_offset: usize,
request_length: usize,
request_sent_time: Instant,
) -> AudioFileFetchDataReceiver {
let measure_ping_time = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst)
== 0;
shared enum TryFoldErr {
.number_of_open_requests ChannelError,
.fetch_add(1, atomic::Ordering::SeqCst); FinishEarly,
}
AudioFileFetchDataReceiver { let result = data_rx
shared: shared, .map_err(|_| TryFoldErr::ChannelError)
data_rx: data_rx, .try_for_each(|data| {
file_data_tx: file_data_tx, if measure_ping_time {
initial_data_offset: data_offset, let duration = Instant::now() - request_sent_time;
initial_request_length: request_length, let duration_ms: u64;
data_offset: data_offset, if 0.001 * (duration.as_millis() as f64)
request_length: request_length, > MAXIMUM_ASSUMED_PING_TIME_SECONDS
request_sent_time: Some(request_sent_time), {
measure_ping_time: measure_ping_time, duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} } else {
duration_ms = duration.as_millis() as u64;
}
let _ = file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
measure_ping_time = false;
}
let data_size = data.len();
let _ = file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: data_offset,
data: data,
}));
data_offset += data_size;
if request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length);
request_length = 0;
} else {
request_length -= data_size;
}
future::ready(if request_length == 0 {
Err(TryFoldErr::FinishEarly)
} else {
Ok(())
})
})
.await;
if request_length > 0 {
let missing_range = Range::new(data_offset, request_length);
let mut download_status = shared.download_status.lock().unwrap();
download_status.requested.subtract_range(&missing_range);
shared.cond.notify_all();
}
shared
.number_of_open_requests
.fetch_sub(1, atomic::Ordering::SeqCst);
if let Err(TryFoldErr::ChannelError) = result {
warn!(
"Error from channel for data receiver for range {} (+{}).",
initial_data_offset, initial_request_length
);
} else if request_length > 0 {
warn!(
"Data receiver for range {} (+{}) received less data from server than requested.",
initial_data_offset, initial_request_length
);
} }
} }
/*
async fn audio_file_fetch(
session: Session,
shared: Arc<AudioFileShared>,
initial_data_rx: ChannelData,
initial_request_sent_time: Instant,
initial_data_length: usize,
impl AudioFileFetchDataReceiver { output: NamedTempFile,
fn finish(&mut self) { stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
if self.request_length > 0 { complete_tx: oneshot::Sender<NamedTempFile>,
let missing_range = Range::new(self.data_offset, self.request_length); ) {
let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
let mut download_status = self.shared.download_status.lock().unwrap(); let requested_range = Range::new(0, initial_data_length);
download_status.requested.subtract_range(&missing_range); let mut download_status = shared.download_status.lock().unwrap();
self.shared.cond.notify_all(); download_status.requested.add_range(&requested_range);
}
self.shared session.spawn(audio_file_fetch_receive_data(
.number_of_open_requests shared.clone(),
.fetch_sub(1, atomic::Ordering::SeqCst); file_data_tx.clone(),
} initial_data_rx,
} 0,
initial_data_length,
initial_request_sent_time,
));
impl Future for AudioFileFetchDataReceiver { let mut network_response_times_ms: Vec::new();
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> { let f1 = file_data_rx.map(|x| Ok::<_, ()>(x)).try_for_each(|x| {
loop { match x {
match self.data_rx.poll() { ReceivedData::ResponseTimeMs(response_time_ms) => {
Ok(Async::Ready(Some(data))) => { trace!("Ping time estimated as: {} ms.", response_time_ms);
if self.measure_ping_time {
if let Some(request_sent_time) = self.request_sent_time { // record the response time
let duration = Instant::now() - request_sent_time; network_response_times_ms.push(response_time_ms);
let duration_ms: u64;
if 0.001 * (duration.as_millis() as f64) // prune old response times. Keep at most three.
> MAXIMUM_ASSUMED_PING_TIME_SECONDS while network_response_times_ms.len() > 3 {
{ network_response_times_ms.remove(0);
duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
} else {
duration_ms = duration.as_millis() as u64;
}
let _ = self
.file_data_tx
.unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
self.measure_ping_time = false;
}
}
let data_size = data.len();
let _ = self
.file_data_tx
.unbounded_send(ReceivedData::Data(PartialFileData {
offset: self.data_offset,
data: data,
}));
self.data_offset += data_size;
if self.request_length < data_size {
warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
self.request_length = 0;
} else {
self.request_length -= data_size;
}
if self.request_length == 0 {
self.finish();
return Ok(Async::Ready(()));
}
} }
Ok(Async::Ready(None)) => {
if self.request_length > 0 { // stats::median is experimental. So we calculate the median of up to three ourselves.
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); let ping_time_ms: usize = match network_response_times_ms.len() {
1 => network_response_times_ms[0] as usize,
2 => {
((network_response_times_ms[0] + network_response_times_ms[1]) / 2) as usize
} }
3 => {
let mut times = network_response_times_ms.clone();
times.sort();
times[1]
}
_ => unreachable!(),
};
// store our new estimate for everyone to see
shared
.ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed);
}
ReceivedData::Data(data) => {
output
.as_mut()
.unwrap()
.seek(SeekFrom::Start(data.offset as u64))
.unwrap();
output
.as_mut()
.unwrap()
.write_all(data.data.as_ref())
.unwrap();
let mut full = false;
{
let mut download_status = shared.download_status.lock().unwrap();
let received_range = Range::new(data.offset, data.data.len());
download_status.downloaded.add_range(&received_range);
shared.cond.notify_all();
if download_status.downloaded.contained_length_from_value(0)
>= shared.file_size
{
full = true;
}
drop(download_status);
}
if full {
self.finish(); self.finish();
return Ok(Async::Ready(())); return future::ready(Err(()));
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
}
Err(ChannelError) => {
warn!(
"Error from channel for data receiver for range {} (+{}).",
self.initial_data_offset, self.initial_request_length
);
self.finish();
return Ok(Async::Ready(()));
} }
} }
} }
} future::ready(Ok(()))
} });
let f2 = stream_loader_command_rx.map(Ok::<_, ()>).try_for_each(|x| {
match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length);
}
StreamLoaderCommand::RandomAccessMode() => {
*(shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return future::ready(Err(())),
}
Ok(())
});
let f3 = future::poll_fn(|_| {
if let DownloadStrategy::Streaming() = self.get_download_strategy() {
let number_of_open_requests = shared
.number_of_open_requests
.load(atomic::Ordering::SeqCst);
let max_requests_to_send =
MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
if max_requests_to_send > 0 {
let bytes_pending: usize = {
let download_status = shared.download_status.lock().unwrap();
download_status
.requested
.minus(&download_status.downloaded)
.len()
};
let ping_time_seconds =
0.001 * shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
let download_rate = session.channel().get_download_rate_estimate();
let desired_pending_bytes = max(
(PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared.stream_data_rate as f64)
as usize,
(FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
as usize,
);
if bytes_pending < desired_pending_bytes {
self.pre_fetch_more_data(
desired_pending_bytes - bytes_pending,
max_requests_to_send,
);
}
}
}
Poll::Pending
});
future::select_all(vec![f1, f2, f3]).await
}*/
#[pin_project]
struct AudioFileFetch { struct AudioFileFetch {
session: Session, session: Session,
shared: Arc<AudioFileShared>, shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>, output: Option<NamedTempFile>,
file_data_tx: mpsc::UnboundedSender<ReceivedData>, file_data_tx: mpsc::UnboundedSender<ReceivedData>,
#[pin]
file_data_rx: mpsc::UnboundedReceiver<ReceivedData>, file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
#[pin]
stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>, complete_tx: Option<oneshot::Sender<NamedTempFile>>,
network_response_times_ms: Vec<usize>, network_response_times_ms: Vec<usize>,
@ -662,16 +725,14 @@ impl AudioFileFetch {
download_status.requested.add_range(&requested_range); download_status.requested.add_range(&requested_range);
} }
let initial_data_receiver = AudioFileFetchDataReceiver::new( session.spawn(audio_file_fetch_receive_data(
shared.clone(), shared.clone(),
file_data_tx.clone(), file_data_tx.clone(),
initial_data_rx, initial_data_rx,
0, 0,
initial_data_length, initial_data_length,
initial_request_sent_time, initial_request_sent_time,
); ));
session.spawn(move |_| initial_data_receiver);
AudioFileFetch { AudioFileFetch {
session: session, session: session,
@ -701,7 +762,7 @@ impl AudioFileFetch {
return; return;
} }
if length <= 0 { if length == 0 {
return; return;
} }
@ -737,16 +798,14 @@ impl AudioFileFetch {
download_status.requested.add_range(range); download_status.requested.add_range(range);
let receiver = AudioFileFetchDataReceiver::new( self.session.spawn(audio_file_fetch_receive_data(
self.shared.clone(), self.shared.clone(),
self.file_data_tx.clone(), self.file_data_tx.clone(),
data, data,
range.start, range.start,
range.length, range.length,
Instant::now(), Instant::now(),
); ));
self.session.spawn(move |_| receiver);
} }
} }
@ -794,13 +853,13 @@ impl AudioFileFetch {
} }
} }
fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop { loop {
match self.file_data_rx.poll() { match Pin::new(&mut self.file_data_rx).poll_next(cx) {
Ok(Async::Ready(None)) => { Poll::Ready(None) => return Poll::Ready(()),
return Ok(Async::Ready(())); Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => {
}
Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
trace!("Ping time estimated as: {} ms.", response_time_ms); trace!("Ping time estimated as: {} ms.", response_time_ms);
// record the response time // record the response time
@ -821,7 +880,7 @@ impl AudioFileFetch {
} }
3 => { 3 => {
let mut times = self.network_response_times_ms.clone(); let mut times = self.network_response_times_ms.clone();
times.sort(); times.sort_unstable();
times[1] times[1]
} }
_ => unreachable!(), _ => unreachable!(),
@ -832,7 +891,7 @@ impl AudioFileFetch {
.ping_time_ms .ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed); .store(ping_time_ms, atomic::Ordering::Relaxed);
} }
Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { Poll::Ready(Some(ReceivedData::Data(data))) => {
self.output self.output
.as_mut() .as_mut()
.unwrap() .unwrap()
@ -864,39 +923,40 @@ impl AudioFileFetch {
if full { if full {
self.finish(); self.finish();
return Ok(Async::Ready(())); return Poll::Ready(())
} }
} }
Ok(Async::NotReady) => { Poll::Pending => {
return Ok(Async::NotReady); return Poll::Pending
} }
Err(()) => unreachable!(),
} }
} }
} }
fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> {
loop { loop {
match self.stream_loader_command_rx.poll() { match Pin::new(&mut self.stream_loader_command_rx).poll_next(cx) {
Ok(Async::Ready(None)) => { Poll::Ready(None) =>
return Ok(Async::Ready(())); return Poll::Ready(()),
Poll::Ready(Some(cmd)) => {
match cmd {
StreamLoaderCommand::Fetch(request) => {
self.download_range(request.start, request.length);
}
StreamLoaderCommand::RandomAccessMode() => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
StreamLoaderCommand::StreamMode() => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
StreamLoaderCommand::Close() => return Poll::Ready(())
}
} }
Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { Poll::Pending => return Poll::Pending
self.download_range(request.start, request.length);
}
Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess();
}
Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
*(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming();
}
Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
return Ok(Async::Ready(()));
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
} }
} }
} }
@ -909,26 +969,16 @@ impl AudioFileFetch {
let _ = complete_tx.send(output); let _ = complete_tx.send(output);
} }
} }
impl Future for AudioFileFetch { impl Future for AudioFileFetch {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match self.poll_stream_loader_command_rx() { if let Poll::Ready(()) = self.poll_stream_loader_command_rx(cx) {
Ok(Async::NotReady) => (), return Poll::Ready(())
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Err(()) => unreachable!(),
} }
match self.poll_file_data_rx() { if let Poll::Ready(()) = self.poll_file_data_rx(cx) {
Ok(Async::NotReady) => (), return Poll::Ready(())
Ok(Async::Ready(_)) => {
return Ok(Async::Ready(()));
}
Err(()) => unreachable!(),
} }
if let DownloadStrategy::Streaming() = self.get_download_strategy() { if let DownloadStrategy::Streaming() = self.get_download_strategy() {
@ -968,8 +1018,7 @@ impl Future for AudioFileFetch {
} }
} }
} }
Poll::Pending
return Ok(Async::NotReady);
} }
} }
@ -1009,9 +1058,9 @@ impl Read for AudioFileStreaming {
ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.downloaded);
ranges_to_request.subtract_range_set(&download_status.requested); ranges_to_request.subtract_range_set(&download_status.requested);
for range in ranges_to_request.iter() { for &range in ranges_to_request.iter() {
self.stream_loader_command_tx self.stream_loader_command_tx
.unbounded_send(StreamLoaderCommand::Fetch(range.clone())) .unbounded_send(StreamLoaderCommand::Fetch(range))
.unwrap(); .unwrap();
} }
@ -1058,7 +1107,7 @@ impl Read for AudioFileStreaming {
.read_position .read_position
.store(self.position as usize, atomic::Ordering::Relaxed); .store(self.position as usize, atomic::Ordering::Relaxed);
return Ok(read_len); Ok(read_len)
} }
} }

View file

@ -1,12 +1,13 @@
#[macro_use] #[macro_use]
extern crate futures;
#[macro_use]
extern crate log; extern crate log;
#[macro_use]
extern crate pin_project;
extern crate aes_ctr; extern crate aes_ctr;
extern crate bit_set; extern crate bit_set;
extern crate byteorder; extern crate byteorder;
extern crate bytes; extern crate bytes;
extern crate futures;
extern crate num_bigint; extern crate num_bigint;
extern crate num_traits; extern crate num_traits;
extern crate tempfile; extern crate tempfile;
@ -24,7 +25,7 @@ mod libvorbis_decoder;
mod range_set; mod range_set;
pub use decrypt::AudioDecrypt; pub use decrypt::AudioDecrypt;
pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; pub use fetch::{AudioFile, StreamLoaderController};
pub use fetch::{ pub use fetch::{
READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS,
READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,