diff --git a/Cargo.lock b/Cargo.lock index 55bc1f7e..a8577d19 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,21 +180,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "bit-set" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e11e16035ea35e4e5997b393eacbf6f63983188f7a2ad25bfb13465f5ad59de" -dependencies = [ - "bit-vec", -] - -[[package]] -name = "bit-vec" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "349f9b6a179ed607305526ca489b34ad0a41aed5f7980fa90eb03160b69598fb" - [[package]] name = "bitflags" version = "1.2.1" @@ -1399,20 +1384,17 @@ name = "librespot-audio" version = "0.1.6" dependencies = [ "aes-ctr", - "bit-set", "byteorder", "bytes", "cfg-if 1.0.0", - "futures", + "futures-util", "lewton", "librespot-core", "librespot-tremor", "log", - "num-bigint", - "num-traits", "ogg", - "pin-project-lite", "tempfile", + "tokio", "vorbis", ] diff --git a/audio/Cargo.toml b/audio/Cargo.toml index 01d81f04..d8c0eea2 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -12,17 +12,14 @@ version = "0.1.6" [dependencies] aes-ctr = "0.6" -bit-set = "0.5" byteorder = "1.4" bytes = "1.0" cfg-if = "1" -futures = "0.3" log = "0.4" -num-bigint = "0.3" -num-traits = "0.2" +futures-util = { version = "0.3", default_features = false } ogg = "0.8" -pin-project-lite = "0.2.4" tempfile = "3.1" +tokio = { version = "1", features = ["sync"] } lewton = { version = "0.10", optional = true } librespot-tremor = { version = "0.2.0", optional = true } diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index 286a2b88..0ec9b01d 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,30 +1,23 @@ -use crate::range_set::{Range, RangeSet}; +use std::cmp::{max, min}; +use std::fs; +use std::future::Future; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::pin::Pin; +use std::sync::atomic::{self, AtomicUsize}; +use std::sync::{Arc, Condvar, Mutex}; +use std::task::{Context, Poll}; +use std::time::{Duration, Instant}; + use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; -use futures::{ - channel::{mpsc, oneshot}, - future, -}; -use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt}; - -use std::fs; -use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::sync::{Arc, Condvar, Mutex}; -use std::task::Poll; -use std::time::{Duration, Instant}; -use std::{ - cmp::{max, min}, - pin::Pin, - task::Context, -}; -use tempfile::NamedTempFile; - -use futures::channel::mpsc::unbounded; +use futures_util::{future, StreamExt, TryFutureExt, TryStreamExt}; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; -use std::sync::atomic; -use std::sync::atomic::AtomicUsize; +use tempfile::NamedTempFile; +use tokio::sync::{mpsc, oneshot}; + +use crate::range_set::{Range, RangeSet}; const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; // The minimum size of a block that is requested from the Spotify servers in one request. @@ -96,6 +89,7 @@ pub enum AudioFile { Streaming(AudioFileStreaming), } +#[derive(Debug)] enum StreamLoaderCommand { Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access @@ -147,7 +141,7 @@ impl StreamLoaderController { fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { if let Some(ref mut channel) = self.channel_tx { // ignore the error in case the channel has been closed already. - let _ = channel.unbounded_send(command); + let _ = channel.send(command); } } @@ -191,7 +185,7 @@ impl StreamLoaderController { // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly. if let Some(ref mut channel) = self.channel_tx { // ignore the error in case the channel has been closed already. - let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range)); + let _ = channel.send(StreamLoaderCommand::Fetch(range)); } } } @@ -387,7 +381,7 @@ impl AudioFileStreaming { //let (seek_tx, seek_rx) = mpsc::unbounded(); let (stream_loader_command_tx, stream_loader_command_rx) = - mpsc::unbounded::(); + mpsc::unbounded_channel::(); let fetcher = AudioFileFetch::new( session.clone(), @@ -490,12 +484,12 @@ async fn audio_file_fetch_receive_data( duration_ms = duration.as_millis() as u64; } let _ = file_data_tx - .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + .send(ReceivedData::ResponseTimeMs(duration_ms as usize)); measure_ping_time = false; } let data_size = data.len(); let _ = file_data_tx - .unbounded_send(ReceivedData::Data(PartialFileData { + .send(ReceivedData::Data(PartialFileData { offset: data_offset, data: data, })); @@ -696,21 +690,17 @@ async fn audio_file_fetch( future::select_all(vec![f1, f2, f3]).await }*/ -pin_project! { - struct AudioFileFetch { - session: Session, - shared: Arc, - output: Option, +struct AudioFileFetch { + session: Session, + shared: Arc, + output: Option, - file_data_tx: mpsc::UnboundedSender, - #[pin] - file_data_rx: mpsc::UnboundedReceiver, + file_data_tx: mpsc::UnboundedSender, + file_data_rx: mpsc::UnboundedReceiver, - #[pin] - stream_loader_command_rx: mpsc::UnboundedReceiver, - complete_tx: Option>, - network_response_times_ms: Vec, - } + stream_loader_command_rx: mpsc::UnboundedReceiver, + complete_tx: Option>, + network_response_times_ms: Vec, } impl AudioFileFetch { @@ -725,7 +715,7 @@ impl AudioFileFetch { stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: oneshot::Sender, ) -> AudioFileFetch { - let (file_data_tx, file_data_rx) = unbounded::(); + let (file_data_tx, file_data_rx) = mpsc::unbounded_channel::(); { let requested_range = Range::new(0, initial_data_length); @@ -863,7 +853,7 @@ impl AudioFileFetch { fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { loop { - match Pin::new(&mut self.file_data_rx).poll_next(cx) { + match self.file_data_rx.poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => { trace!("Ping time estimated as: {} ms.", response_time_ms); @@ -939,7 +929,7 @@ impl AudioFileFetch { fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { loop { - match Pin::new(&mut self.stream_loader_command_rx).poll_next(cx) { + match self.stream_loader_command_rx.poll_recv(cx) { Poll::Ready(None) => return Poll::Ready(()), Poll::Ready(Some(cmd)) => match cmd { StreamLoaderCommand::Fetch(request) => { @@ -1059,7 +1049,7 @@ impl Read for AudioFileStreaming { for &range in ranges_to_request.iter() { self.stream_loader_command_tx - .unbounded_send(StreamLoaderCommand::Fetch(range)) + .send(StreamLoaderCommand::Fetch(range)) .unwrap(); } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 9bb6f8e4..099fb4a8 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -2,8 +2,6 @@ #[macro_use] extern crate log; -#[macro_use] -extern crate pin_project_lite; mod decrypt; mod fetch; diff --git a/audio/src/range_set.rs b/audio/src/range_set.rs index d01d888e..31ce6500 100644 --- a/audio/src/range_set.rs +++ b/audio/src/range_set.rs @@ -2,7 +2,7 @@ use std::cmp::{max, min}; use std::fmt; use std::slice::Iter; -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug)] pub struct Range { pub start: usize, pub length: usize,