diff --git a/src/audio_file.rs b/src/audio_file.rs index 395bf568..369d5ca4 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -132,9 +132,12 @@ impl AudioFileManager { let cache = self.session().cache().cloned(); if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { + debug!("File {} already in cache", file_id); return AudioFileOpen::Cached(future::ok(file)); } + debug!("Downloading file {}", file_id); + let (complete_tx, complete_rx) = oneshot::channel(); let (headers, data) = request_chunk(&self.session(), file_id, 0).split(); @@ -153,6 +156,9 @@ impl AudioFileManager { complete_rx.map(move |mut file| { if let Some(cache) = session.cache() { cache.save_file(file_id, &mut file); + debug!("File {} complete, saving to cache", file_id); + } else { + debug!("File {} complete", file_id); } }).or_else(|oneshot::Canceled| Ok(())) }); @@ -275,7 +281,7 @@ impl Future for AudioFileFetch { progress = true; self.output.as_mut().unwrap() - .write_all(&data).unwrap(); + .write_all(data.as_ref()).unwrap(); } Ok(Async::Ready(None)) => { progress = true; diff --git a/src/audio_key.rs b/src/audio_key.rs index 7875a259..2d6a21ee 100644 --- a/src/audio_key.rs +++ b/src/audio_key.rs @@ -3,6 +3,7 @@ use futures::sync::oneshot; use futures::{Async, Future, Poll}; use std::collections::HashMap; use std::io::Write; +use tokio_core::io::EasyBuf; use util::SeqGenerator; use util::{SpotifyId, FileId}; @@ -21,8 +22,8 @@ component! { } impl AudioKeyManager { - pub fn dispatch(&self, cmd: u8, data: Vec) { - let seq = BigEndian::read_u32(&data[..4]); + pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) { + let seq = BigEndian::read_u32(data.drain_to(4).as_ref()); let sender = self.lock(|inner| inner.pending.remove(&seq)); @@ -30,11 +31,11 @@ impl AudioKeyManager { match cmd { 0xd => { let mut key = [0u8; 16]; - key.copy_from_slice(&data[4..20]); + key.copy_from_slice(data.as_ref()); sender.complete(Ok(AudioKey(key))); } 0xe => { - warn!("error audio key {:x} {:x}", data[4], data[5]); + warn!("error audio key {:x} {:x}", data.as_ref()[0], data.as_ref()[1]); sender.complete(Err(AudioKeyError)); } _ => (), diff --git a/src/channel.rs b/src/channel.rs index d3913efa..30a9e00e 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -9,7 +9,7 @@ use util::SeqGenerator; component! { ChannelManager : ChannelManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), - channels: HashMap)>> = HashMap::new(), + channels: HashMap> = HashMap::new(), } } @@ -17,7 +17,7 @@ component! { pub struct ChannelError; pub struct Channel { - receiver: mpsc::UnboundedReceiver<(u8, Vec)>, + receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>, state: ChannelState, } @@ -26,7 +26,7 @@ pub struct ChannelData(BiLock); pub enum ChannelEvent { Header(u8, Vec), - Data(Vec), + Data(EasyBuf), } #[derive(Clone)] @@ -54,21 +54,21 @@ impl ChannelManager { (seq, channel) } - pub fn dispatch(&self, cmd: u8, data: Vec) { + pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) { use std::collections::hash_map::Entry; - let id: u16 = BigEndian::read_u16(&data[..2]); + let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref()); self.lock(|inner| { if let Entry::Occupied(entry) = inner.channels.entry(id) { - let _ = entry.get().send((cmd, data[2..].to_owned())); + let _ = entry.get().send((cmd, data)); } }); } } impl Channel { - fn recv_packet(&mut self) -> Poll, ChannelError> { + fn recv_packet(&mut self) -> Poll { let (cmd, packet) = match self.receiver.poll() { Ok(Async::Ready(t)) => t.expect("channel closed"), Ok(Async::NotReady) => return Ok(Async::NotReady), @@ -76,7 +76,7 @@ impl Channel { }; if cmd == 0xa { - let code = BigEndian::read_u16(&packet[..2]); + let code = BigEndian::read_u16(&packet.as_ref()[..2]); error!("channel error: {} {}", packet.len(), code); self.state = ChannelState::Closed; @@ -104,7 +104,7 @@ impl Stream for Channel { ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Header(mut data) => { if data.len() == 0 { - data = EasyBuf::from(try_ready!(self.recv_packet())); + data = try_ready!(self.recv_packet()); } let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; @@ -117,18 +117,20 @@ impl Stream for Channel { self.state = ChannelState::Header(data); - return Ok(Async::Ready(Some(ChannelEvent::Header(header_id, header_data)))); + let event = ChannelEvent::Header(header_id, header_data); + return Ok(Async::Ready(Some(event))); } } ChannelState::Data => { let data = try_ready!(self.recv_packet()); - if data.is_empty() { + if data.len() == 0 { self.receiver.close(); self.state = ChannelState::Closed; return Ok(Async::Ready(None)); } else { - return Ok(Async::Ready(Some(ChannelEvent::Data(data)))); + let event = ChannelEvent::Data(data); + return Ok(Async::Ready(Some(event))); } } } @@ -137,7 +139,7 @@ impl Stream for Channel { } impl Stream for ChannelData { - type Item = Vec; + type Item = EasyBuf; type Error = ChannelError; fn poll(&mut self) -> Poll, Self::Error> { diff --git a/src/connection/mod.rs b/src/connection/mod.rs index e88a66fc..8c28e0be 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -27,7 +27,9 @@ pub fn connect(addr: A, handle: &Handle) -> BoxFuture BoxFuture<(Transport, Credentials), io::Error> { +pub fn authenticate(transport: Transport, credentials: Credentials, device_id: String) + -> BoxFuture<(Transport, Credentials), io::Error> +{ use protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os}; let packet = protobuf_init!(ClientResponseEncrypted::new(), { diff --git a/src/mercury/mod.rs b/src/mercury/mod.rs index cf014692..5db7f925 100644 --- a/src/mercury/mod.rs +++ b/src/mercury/mod.rs @@ -1,11 +1,11 @@ -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; +use byteorder::{BigEndian, ByteOrder}; use futures::sync::{oneshot, mpsc}; use futures::{Async, Poll, BoxFuture, Future}; use protobuf; use protocol; use std::collections::HashMap; -use std::io::Read; use std::mem; +use tokio_core::io::EasyBuf; use util::SeqGenerator; @@ -125,16 +125,12 @@ impl MercuryManager { }).boxed() } - pub fn dispatch(&self, cmd: u8, data: Vec) { - let mut packet = ::std::io::Cursor::new(data); - let seq = { - let len = packet.read_u16::().unwrap() as usize; - let mut seq = vec![0; len]; - packet.read_exact(&mut seq).unwrap(); - seq - }; - let flags = packet.read_u8().unwrap(); - let count = packet.read_u16::().unwrap() as usize; + pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) { + let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; + let seq = data.drain_to(seq_len).as_ref().to_owned(); + + let flags = data.drain_to(1).as_ref()[0]; + let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; let pending = self.lock(|inner| inner.pending.remove(&seq)); @@ -154,10 +150,10 @@ impl MercuryManager { }; for i in 0..count { - let mut part = Self::parse_part(&mut packet); - if let Some(mut data) = mem::replace(&mut pending.partial, None) { - data.append(&mut part); - part = data; + let mut part = Self::parse_part(&mut data); + if let Some(mut partial) = mem::replace(&mut pending.partial, None) { + partial.extend_from_slice(&part); + part = partial; } if i == count - 1 && (flags == 2) { @@ -174,12 +170,9 @@ impl MercuryManager { } } - fn parse_part(s: &mut T) -> Vec { - let size = s.read_u16::().unwrap() as usize; - let mut buffer = vec![0; size]; - s.read_exact(&mut buffer).unwrap(); - - buffer + fn parse_part(data: &mut EasyBuf) -> Vec { + let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; + data.drain_to(size).as_ref().to_owned() } fn complete_request(&self, cmd: u8, mut pending: MercuryPending) { diff --git a/src/session.rs b/src/session.rs index 8994217c..e4ed375a 100644 --- a/src/session.rs +++ b/src/session.rs @@ -7,6 +7,7 @@ use std::io; use std::result::Result; use std::str::FromStr; use std::sync::{RwLock, Arc, Weak}; +use tokio_core::io::EasyBuf; use tokio_core::reactor::{Handle, Remote}; use apresolve::apresolve_or_fallback; @@ -121,7 +122,6 @@ impl Session { config: Config, cache: Option, username: String) -> (Session, BoxFuture<(), io::Error>) { - let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())); let (sink, stream) = transport.split(); let (sender_tx, sender_rx) = mpsc::unbounded(); @@ -193,12 +193,13 @@ impl Session { } #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))] - fn dispatch(&self, cmd: u8, data: Vec) { + fn dispatch(&self, cmd: u8, data: EasyBuf) { match cmd { - 0x4 => self.send_packet(0x49, data), + 0x4 => self.send_packet(0x49, data.as_ref().to_owned()), 0x4a => (), 0x1b => { - self.0.data.write().unwrap().country = String::from_utf8(data).unwrap(); + let country = String::from_utf8(data.as_ref().to_owned()).unwrap(); + self.0.data.write().unwrap().country = country; } 0x9 | 0xa => self.channel().dispatch(cmd, data), diff --git a/src/util/arcvec.rs b/src/util/arcvec.rs deleted file mode 100644 index 7313997d..00000000 --- a/src/util/arcvec.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::sync::Arc; -use std::fmt; -use std::ops::Deref; - -#[derive(Clone)] -pub struct ArcVec { - data: Arc>, - offset: usize, - length: usize, -} - -impl ArcVec { - pub fn new(data: Vec) -> ArcVec { - let length = data.len(); - ArcVec { - data: Arc::new(data), - offset: 0, - length: length, - } - } - - pub fn offset(mut self, offset: usize) -> ArcVec { - assert!(offset <= self.length); - - self.offset += offset; - self.length -= offset; - - self - } - - pub fn limit(mut self, length: usize) -> ArcVec { - assert!(length <= self.length); - self.length = length; - - self - } -} - -impl Deref for ArcVec { - type Target = [T]; - - fn deref(&self) -> &[T] { - &self.data[self.offset..self.offset + self.length] - } -} - -impl fmt::Debug for ArcVec { - fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> { - self.deref().fmt(formatter) - } -} diff --git a/src/util/mod.rs b/src/util/mod.rs index 4bf45e8c..01a3a506 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -12,12 +12,10 @@ use std::time::{UNIX_EPOCH, SystemTime}; mod int128; mod spotify_id; -mod arcvec; mod subfile; pub use util::int128::u128; pub use util::spotify_id::{SpotifyId, FileId}; -pub use util::arcvec::ArcVec; pub use util::subfile::Subfile; pub fn rand_vec(rng: &mut G, size: usize) -> Vec { diff --git a/src/util/spotify_id.rs b/src/util/spotify_id.rs index d45447a4..dc22a972 100644 --- a/src/util/spotify_id.rs +++ b/src/util/spotify_id.rs @@ -1,4 +1,5 @@ use std; +use std::fmt; use util::u128; use byteorder::{BigEndian, ByteOrder}; use std::ascii::AsciiExt; @@ -6,9 +7,6 @@ use std::ascii::AsciiExt; #[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)] pub struct SpotifyId(u128); -#[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)] -pub struct FileId(pub [u8; 20]); - const BASE62_DIGITS: &'static [u8] = b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"; const BASE16_DIGITS: &'static [u8] = b"0123456789abcdef"; @@ -79,6 +77,9 @@ impl SpotifyId { } } +#[derive(Copy,Clone,PartialEq,Eq,PartialOrd,Ord,Hash)] +pub struct FileId(pub [u8; 20]); + impl FileId { pub fn to_base16(&self) -> String { self.0 @@ -88,3 +89,15 @@ impl FileId { .concat() } } + +impl fmt::Debug for FileId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_tuple("FileId").field(&self.to_base16()).finish() + } +} + +impl fmt::Display for FileId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&self.to_base16()) + } +}