From 4d712efb48e531a469f5eaeca60280ea0319a004 Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Sun, 13 Mar 2016 20:03:40 +0000 Subject: [PATCH] Support downloading Album Covers. --- src/album_cover.rs | 27 ++++++++++++ src/audio_file.rs | 2 +- src/lib.in.rs | 5 ++- src/session.rs | 12 ++++-- src/stream.rs | 105 ++++++++++++++++++++++++--------------------- 5 files changed, 97 insertions(+), 54 deletions(-) create mode 100644 src/album_cover.rs diff --git a/src/album_cover.rs b/src/album_cover.rs new file mode 100644 index 00000000..9b5c1b9f --- /dev/null +++ b/src/album_cover.rs @@ -0,0 +1,27 @@ +use eventual; +use std::io::Write; +use byteorder::{WriteBytesExt, BigEndian}; + +use session::Session; +use util::FileId; +use stream::StreamEvent; + +pub fn get_album_cover(session: &Session, file_id: FileId) + -> eventual::Future, ()> { + + let (channel_id, rx) = session.allocate_stream(); + + let mut req: Vec = Vec::new(); + req.write_u16::(channel_id).unwrap(); + req.write_u16::(0).unwrap(); + req.write(&file_id.0).unwrap(); + session.send_packet(0x19, &req).unwrap(); + + rx.map_err(|_| ()) + .reduce(Vec::new(), |mut current, event| { + if let StreamEvent::Data(data) = event { + current.extend_from_slice(&data) + } + current + }) +} diff --git a/src/audio_file.rs b/src/audio_file.rs index 622af077..bd2ed395 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -40,7 +40,7 @@ struct AudioFileShared { impl AudioFileLoading { fn new(session: &Session, file_id: FileId) -> AudioFileLoading { let size = session.stream(file_id, 0, 1) - .into_iter() + .iter() .filter_map(|event| { match event { StreamEvent::Header(id, ref data) if id == 0x3 => { diff --git a/src/lib.in.rs b/src/lib.in.rs index 853bbf0e..c1fa32b4 100644 --- a/src/lib.in.rs +++ b/src/lib.in.rs @@ -1,4 +1,5 @@ #[macro_use] pub mod util; +mod album_cover; mod audio_decrypt; mod audio_file; mod audio_key; @@ -13,6 +14,8 @@ pub mod player; pub mod session; pub mod spirc; pub mod link; -mod stream; +pub mod stream; pub mod apresolve; mod zeroconf; + +pub use album_cover::get_album_cover; diff --git a/src/session.rs b/src/session.rs index 7c8a4667..0f07b48d 100644 --- a/src/session.rs +++ b/src/session.rs @@ -2,6 +2,7 @@ use crypto::digest::Digest; use crypto::sha1::Sha1; use crypto::hmac::Hmac; use crypto::mac::Mac; +use eventual; use eventual::Future; use protobuf::{self, Message}; use rand::thread_rng; @@ -20,10 +21,9 @@ use diffie_hellman::DHLocalKeys; use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use protocol; -use stream::{StreamManager, StreamEvent}; +use stream::{ChannelId, StreamManager, StreamEvent, StreamError}; use util::{self, SpotifyId, FileId, mkdir_existing}; - pub enum Bitrate { Bitrate96, Bitrate160, @@ -249,7 +249,7 @@ impl Session { match cmd { 0x4 => self.send_packet(0x49, &data).unwrap(), 0x4a => (), - 0x9 => self.0.stream.lock().unwrap().handle(cmd, data), + 0x9 | 0xa => self.0.stream.lock().unwrap().handle(cmd, data), 0xd | 0xe => self.0.audio_key.lock().unwrap().handle(cmd, data), 0x1b => { self.0.data.write().unwrap().country = String::from_utf8(data).unwrap(); @@ -275,10 +275,14 @@ impl Session { self.0.audio_file.lock().unwrap().request(self, file) } - pub fn stream(&self, file: FileId, offset: u32, size: u32) -> mpsc::Receiver { + pub fn stream(&self, file: FileId, offset: u32, size: u32) -> eventual::Stream { self.0.stream.lock().unwrap().request(self, file, offset, size) } + pub fn allocate_stream(&self) -> (ChannelId, eventual::Stream) { + self.0.stream.lock().unwrap().allocate_stream() + } + pub fn metadata(&self, id: SpotifyId) -> MetadataRef { self.0.metadata.lock().unwrap().get(self, id) } diff --git a/src/stream.rs b/src/stream.rs index 0e45965a..946aa3f8 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,7 +1,8 @@ use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use std::collections::HashMap; +use std::collections::hash_map::Entry; use std::io::{Cursor, Seek, SeekFrom, Write}; -use std::sync::mpsc; +use eventual::{self, Async}; use util::{ArcVec, FileId}; use connection::PacketHandler; @@ -13,7 +14,10 @@ pub enum StreamEvent { Data(ArcVec), } -type ChannelId = u16; +#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] +pub struct StreamError; + +pub type ChannelId = u16; enum ChannelMode { Header, @@ -22,7 +26,7 @@ enum ChannelMode { struct Channel { mode: ChannelMode, - callback: mpsc::Sender, + callback: Option>, } pub struct StreamManager { @@ -38,17 +42,29 @@ impl StreamManager { } } + pub fn allocate_stream(&mut self) -> (ChannelId, eventual::Stream) { + let (tx, rx) = eventual::Stream::pair(); + + let channel_id = self.next_id; + self.next_id += 1; + + self.channels.insert(channel_id, + Channel { + mode: ChannelMode::Header, + callback: Some(tx), + }); + + (channel_id, rx) + } + pub fn request(&mut self, session: &Session, file: FileId, offset: u32, size: u32) - -> mpsc::Receiver { + -> eventual::Stream { - let (tx, rx) = mpsc::channel(); - - let channel_id = self.next_id; - self.next_id += 1; + let (channel_id, rx) = self.allocate_stream(); let mut data: Vec = Vec::new(); data.write_u16::(channel_id).unwrap(); @@ -64,76 +80,69 @@ impl StreamManager { session.send_packet(0x8, &data).unwrap(); - self.channels.insert(channel_id, - Channel { - mode: ChannelMode::Header, - callback: tx, - }); - rx } } -impl PacketHandler for StreamManager { - fn handle(&mut self, _cmd: u8, data: Vec) { +impl Channel { + fn handle_packet(&mut self, cmd: u8, data: Vec) { let data = ArcVec::new(data); let mut packet = Cursor::new(&data as &[u8]); + packet.read_u16::().unwrap(); // Skip channel id - let id: ChannelId = packet.read_u16::().unwrap(); - let mut close = false; - { - let channel = match self.channels.get_mut(&id) { - Some(ch) => ch, - None => { - return; - } - }; - - match channel.mode { + if cmd == 0xa { + self.callback.take().map(|c| c.fail(StreamError)); + } else { + match self.mode { ChannelMode::Header => { let mut length = 0; - while packet.position() < data.len() as u64 && !close { + while packet.position() < data.len() as u64 { length = packet.read_u16::().unwrap(); if length > 0 { let header_id = packet.read_u8().unwrap(); - channel.callback - .send(StreamEvent::Header( - header_id, - data.clone() - .offset(packet.position() as usize) - .limit(length as usize - 1) - )) - .unwrap_or_else(|_| { - close = true; - }); + let header_data = data.clone() + .offset(packet.position() as usize) + .limit(length as usize - 1); + + let header = StreamEvent::Header(header_id, header_data); + + self.callback = self.callback.take().and_then(|c| c.send(header).await().ok()); packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap(); } } if length == 0 { - channel.mode = ChannelMode::Data; + self.mode = ChannelMode::Data; } } ChannelMode::Data => { if packet.position() < data.len() as u64 { - channel.callback - .send(StreamEvent::Data(data.clone() - .offset(packet.position() as usize))) - .unwrap_or_else(|_| { - close = true; - }); + let event_data = data.clone().offset(packet.position() as usize); + let event = StreamEvent::Data(event_data); + + self.callback = self.callback.take().and_then(|c| c.send(event).await().ok()); } else { - close = true; + self.callback = None; } } } } + } +} - if close { - self.channels.remove(&id); +impl PacketHandler for StreamManager { + fn handle(&mut self, cmd: u8, data: Vec) { + let id: ChannelId = BigEndian::read_u16(&data[0..2]); + + if let Entry::Occupied(mut entry) = self.channels.entry(id) { + entry.get_mut().handle_packet(cmd, data); + + if entry.get().callback.is_none() { + entry.remove(); + } } } }