diff --git a/src/audio_key.rs b/src/audio_key.rs index 7fe4b779..98f52243 100644 --- a/src/audio_key.rs +++ b/src/audio_key.rs @@ -12,7 +12,7 @@ pub struct AudioKey(pub [u8; 16]); #[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] pub struct AudioKeyError; -type Result = ::std::result::Result; +pub type Result = ::std::result::Result; component! { AudioKeyManager : AudioKeyManagerInner { diff --git a/src/lib.in.rs b/src/lib.in.rs index 57686bb6..1799a257 100644 --- a/src/lib.in.rs +++ b/src/lib.in.rs @@ -1,5 +1,4 @@ pub mod apresolve; pub mod authentication; pub mod connection; -pub mod mercury; pub mod spirc; diff --git a/src/lib.rs b/src/lib.rs index aa9821cc..73f9123f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,7 @@ pub mod audio_key; pub mod cache; pub mod diffie_hellman; pub mod link; +pub mod mercury; pub mod metadata; pub mod player; pub mod session; diff --git a/src/mercury.rs b/src/mercury.rs deleted file mode 100644 index 9923fd6c..00000000 --- a/src/mercury.rs +++ /dev/null @@ -1,233 +0,0 @@ -use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; -use eventual; -use protobuf::{self, Message}; -use std::collections::HashMap; -use std::io::{Cursor, Read, Write}; -use std::mem::replace; -use std::sync::mpsc; - -use protocol; -use session::{Session, PacketHandler}; - -#[derive(Debug, PartialEq, Eq)] -pub enum MercuryMethod { - GET, - SUB, - UNSUB, - SEND, -} - -pub struct MercuryRequest { - pub method: MercuryMethod, - pub uri: String, - pub content_type: Option, - pub payload: Vec>, -} - -#[derive(Debug)] -pub struct MercuryResponse { - pub uri: String, - pub payload: Vec>, -} - -enum MercuryCallback { - Future(eventual::Complete), - Subscription(mpsc::Sender), - Channel, -} - -pub struct MercuryPending { - parts: Vec>, - partial: Option>, - callback: MercuryCallback, -} - -pub struct MercuryManager { - next_seq: u32, - pending: HashMap, MercuryPending>, - subscriptions: HashMap>, -} - -impl ToString for MercuryMethod { - fn to_string(&self) -> String { - match *self { - MercuryMethod::GET => "GET", - MercuryMethod::SUB => "SUB", - MercuryMethod::UNSUB => "UNSUB", - MercuryMethod::SEND => "SEND", - } - .to_owned() - } -} - -impl MercuryManager { - pub fn new() -> MercuryManager { - MercuryManager { - next_seq: 0, - pending: HashMap::new(), - subscriptions: HashMap::new(), - } - } - - fn request_with_callback(&mut self, - session: &Session, - req: MercuryRequest, - cb: MercuryCallback) { - let mut seq = [0u8; 4]; - BigEndian::write_u32(&mut seq, self.next_seq); - self.next_seq += 1; - let data = self.encode_request(&seq, &req); - - let cmd = match req.method { - MercuryMethod::SUB => 0xb3, - MercuryMethod::UNSUB => 0xb4, - _ => 0xb2, - }; - - session.send_packet(cmd, data); - - self.pending.insert(seq.to_vec(), - MercuryPending { - parts: Vec::new(), - partial: None, - callback: cb, - }); - } - - pub fn request(&mut self, - session: &Session, - req: MercuryRequest) - -> eventual::Future { - let (tx, rx) = eventual::Future::pair(); - self.request_with_callback(session, req, MercuryCallback::Future(tx)); - rx - } - - pub fn subscribe(&mut self, session: &Session, uri: String) -> mpsc::Receiver { - let (tx, rx) = mpsc::channel(); - - self.request_with_callback(session, - MercuryRequest { - method: MercuryMethod::SUB, - uri: uri, - content_type: None, - payload: Vec::new(), - }, - MercuryCallback::Subscription(tx)); - - rx - } - - fn parse_part(mut s: &mut Read) -> Vec { - let size = s.read_u16::().unwrap() as usize; - let mut buffer = vec![0; size]; - s.read_exact(&mut buffer).unwrap(); - - buffer - } - - fn complete_subscription(&mut self, - response: MercuryResponse, - tx: mpsc::Sender) { - for sub_data in response.payload { - if let Ok(mut sub) = - protobuf::parse_from_bytes::(&sub_data) { - self.subscriptions.insert(sub.take_uri(), tx.clone()); - } - } - } - - fn complete_request(&mut self, mut pending: MercuryPending) { - let header_data = pending.parts.remove(0); - let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap(); - - let response = MercuryResponse { - uri: header.get_uri().to_owned(), - payload: pending.parts, - }; - - match pending.callback { - MercuryCallback::Future(tx) => tx.complete(response), - MercuryCallback::Subscription(tx) => self.complete_subscription(response, tx), - MercuryCallback::Channel => { - self.subscriptions - .get(header.get_uri()) - .map(|tx| tx.send(response).unwrap()); - } - } - } - - fn encode_request(&self, seq: &[u8], req: &MercuryRequest) -> Vec { - let mut packet = Vec::new(); - packet.write_u16::(seq.len() as u16).unwrap(); - packet.write_all(seq).unwrap(); - packet.write_u8(1).unwrap(); // Flags: FINAL - packet.write_u16::(1 + req.payload.len() as u16).unwrap(); // Part count - - let mut header = protobuf_init!(protocol::mercury::Header::new(), { - uri: req.uri.clone(), - method: req.method.to_string(), - }); - if let Some(ref content_type) = req.content_type { - header.set_content_type(content_type.clone()); - } - - packet.write_u16::(header.compute_size() as u16).unwrap(); - header.write_to_writer(&mut packet).unwrap(); - - for p in &req.payload { - packet.write_u16::(p.len() as u16).unwrap(); - packet.write(&p).unwrap(); - } - - packet - } -} - -impl PacketHandler for MercuryManager { - fn handle(&mut self, cmd: u8, data: Vec, _session: &Session) { - let mut packet = Cursor::new(data); - - let seq = { - let seq_length = packet.read_u16::().unwrap() as usize; - let mut seq = vec![0; seq_length]; - packet.read_exact(&mut seq).unwrap(); - seq - }; - let flags = packet.read_u8().unwrap(); - let count = packet.read_u16::().unwrap() as usize; - - let mut pending = if let Some(pending) = self.pending.remove(&seq) { - pending - } else if cmd == 0xb5 { - MercuryPending { - parts: Vec::new(), - partial: None, - callback: MercuryCallback::Channel, - } - } else { - warn!("Ignore seq {:?} cmd {}", seq, cmd); - return; - }; - - for i in 0..count { - let mut part = Self::parse_part(&mut packet); - if let Some(mut data) = replace(&mut pending.partial, None) { - data.append(&mut part); - part = data; - } - - if i == count - 1 && (flags == 2) { - pending.partial = Some(part) - } else { - pending.parts.push(part); - } - } - - if flags == 0x1 { - self.complete_request(pending); - } else { - self.pending.insert(seq, pending); - } - } -} diff --git a/src/mercury/mod.rs b/src/mercury/mod.rs new file mode 100644 index 00000000..d2ce23c0 --- /dev/null +++ b/src/mercury/mod.rs @@ -0,0 +1,207 @@ +use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; +use futures::sync::{oneshot, mpsc}; +use futures::{BoxFuture, Future}; +use std::collections::HashMap; +use std::io::Read; +use std::mem; +use protocol; +use protobuf; +use futures::{Async, Poll}; + +use util::SeqGenerator; + +mod types; +pub use self::types::*; + +mod sender; +pub use self::sender::MercurySender; + +component! { + MercuryManager : MercuryManagerInner { + sequence: SeqGenerator = SeqGenerator::new(0), + pending: HashMap, MercuryPending> = HashMap::new(), + subscriptions: HashMap> = HashMap::new(), + } +} + +pub struct MercuryPending { + parts: Vec>, + partial: Option>, + callback: Option>>, +} + +pub struct MercuryFuture(oneshot::Receiver>); +impl Future for MercuryFuture { + type Item = T; + type Error = MercuryError; + + fn poll(&mut self) -> Poll { + match self.0.poll() { + Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), + Ok(Async::Ready(Err(err))) => Err(err), + Ok(Async::NotReady) => Ok(Async::NotReady), + Err(oneshot::Canceled) => Err(MercuryError), + } + } +} + +impl MercuryManager { + fn next_seq(&self) -> Vec { + let mut seq = vec![0u8; 8]; + BigEndian::write_u64(&mut seq, self.lock(|inner| inner.sequence.get())); + seq + } + + pub fn request(&self, req: MercuryRequest) + -> MercuryFuture + { + let (tx, rx) = oneshot::channel(); + + let pending = MercuryPending { + parts: Vec::new(), + partial: None, + callback: Some(tx), + }; + + let seq = self.next_seq(); + self.lock(|inner| inner.pending.insert(seq.clone(), pending)); + + let cmd = req.method.command(); + let data = req.encode(&seq); + + self.session().send_packet(cmd, data); + MercuryFuture(rx) + } + + pub fn get>(&self, uri: T) + -> MercuryFuture + { + self.request(MercuryRequest { + method: MercuryMethod::GET, + uri: uri.into(), + content_type: None, + payload: Vec::new(), + }) + } + + pub fn send>(&self, uri: T, data: Vec) + -> MercuryFuture + { + self.request(MercuryRequest { + method: MercuryMethod::SEND, + uri: uri.into(), + content_type: None, + payload: vec![data], + }) + } + + /* + pub fn sender>(&self, uri: T) -> MercurySender { + MercurySender::new(self.clone(), uri.into()) + } + */ + + pub fn subscribe>(&self, uri: T) + -> BoxFuture, MercuryError> + { + let request = self.request(MercuryRequest { + method: MercuryMethod::SUB, + uri: uri.into(), + content_type: None, + payload: Vec::new(), + }); + + let manager = self.clone(); + request.map(move |response| { + let (tx, rx) = mpsc::unbounded(); + + manager.lock(move |inner| { + for sub in response.payload { + let mut sub : protocol::pubsub::Subscription + = protobuf::parse_from_bytes(&sub).unwrap(); + let uri = sub.take_uri(); + inner.subscriptions.insert(uri, tx.clone()); + } + }); + + rx + }).boxed() + } + + pub fn dispatch(&self, cmd: u8, data: Vec) { + let mut packet = ::std::io::Cursor::new(data); + let seq = { + let len = packet.read_u16::().unwrap() as usize; + let mut seq = vec![0; len]; + packet.read_exact(&mut seq).unwrap(); + seq + }; + let flags = packet.read_u8().unwrap(); + let count = packet.read_u16::().unwrap() as usize; + + let pending = self.lock(|inner| inner.pending.remove(&seq)); + + let mut pending = match pending { + Some(pending) => pending, + None if cmd == 0xb5 => { + MercuryPending { + parts: Vec::new(), + partial: None, + callback: None, + } + } + None => { + warn!("Ignore seq {:?} cmd {:x}", seq, cmd); + return; + } + }; + + for i in 0..count { + let mut part = Self::parse_part(&mut packet); + if let Some(mut data) = mem::replace(&mut pending.partial, None) { + data.append(&mut part); + part = data; + } + + if i == count - 1 && (flags == 2) { + pending.partial = Some(part) + } else { + pending.parts.push(part); + } + } + + if flags == 0x1 { + self.complete_request(cmd, pending); + } else { + self.lock(move |inner| inner.pending.insert(seq, pending)); + } + } + + fn parse_part(s: &mut T) -> Vec { + let size = s.read_u16::().unwrap() as usize; + let mut buffer = vec![0; size]; + s.read_exact(&mut buffer).unwrap(); + + buffer + } + + fn complete_request(&self, cmd: u8, mut pending: MercuryPending) { + let header_data = pending.parts.remove(0); + let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap(); + + let response = MercuryResponse { + uri: header.get_uri().to_owned(), + payload: pending.parts, + }; + + if cmd == 0xb5 { + self.lock(|inner| { + if let Some(cb) = inner.subscriptions.get(&response.uri) { + cb.send(response).unwrap(); + } + }) + } else if let Some(cb) = pending.callback { + cb.complete(Ok(response)); + } + } +} diff --git a/src/mercury/sender.rs b/src/mercury/sender.rs new file mode 100644 index 00000000..be660db3 --- /dev/null +++ b/src/mercury/sender.rs @@ -0,0 +1,46 @@ +use std::collections::VecDeque; +use futures::{Async, Poll, Future, Sink, StartSend, AsyncSink}; + +use super::*; + +pub struct MercurySender { + mercury: MercuryManager, + uri: String, + pending: VecDeque>, +} + +impl MercurySender { + // TODO: pub(super) when stable + pub fn new(mercury: MercuryManager, uri: String) -> MercurySender { + MercurySender { + mercury: mercury, + uri: uri, + pending: VecDeque::new(), + } + } +} + +impl Sink for MercurySender { + type SinkItem = Vec; + type SinkError = MercuryError; + + fn start_send(&mut self, item: Self::SinkItem) -> StartSend { + let task = self.mercury.send(self.uri.clone(), item); + self.pending.push_back(task); + Ok(AsyncSink::Ready) + } + + fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { + loop { + match self.pending.front_mut() { + Some(task) => { + try_ready!(task.poll()); + } + None => { + return Ok(Async::Ready(())); + } + } + self.pending.pop_front(); + } + } +} diff --git a/src/mercury/types.rs b/src/mercury/types.rs new file mode 100644 index 00000000..60acb7a1 --- /dev/null +++ b/src/mercury/types.rs @@ -0,0 +1,80 @@ +use byteorder::{BigEndian, WriteBytesExt}; +use protobuf::Message; +use std::io::Write; + +use protocol; + +#[derive(Debug, PartialEq, Eq)] +pub enum MercuryMethod { + GET, + SUB, + UNSUB, + SEND, +} + +#[derive(Debug)] +pub struct MercuryRequest { + pub method: MercuryMethod, + pub uri: String, + pub content_type: Option, + pub payload: Vec>, +} + +#[derive(Debug)] +pub struct MercuryResponse { + pub uri: String, + pub payload: Vec>, +} + +#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] +pub struct MercuryError; + +impl ToString for MercuryMethod { + fn to_string(&self) -> String { + match *self { + MercuryMethod::GET => "GET", + MercuryMethod::SUB => "SUB", + MercuryMethod::UNSUB => "UNSUB", + MercuryMethod::SEND => "SEND", + } + .to_owned() + } +} + +impl MercuryMethod { + pub fn command(&self) -> u8 { + match *self { + MercuryMethod::GET | MercuryMethod::SEND => 0xb2, + MercuryMethod::SUB => 0xb3, + MercuryMethod::UNSUB => 0xb4, + } + } +} + +impl MercuryRequest { + pub fn encode(&self, seq: &[u8]) -> Vec { + let mut packet = Vec::new(); + packet.write_u16::(seq.len() as u16).unwrap(); + packet.write_all(seq).unwrap(); + packet.write_u8(1).unwrap(); // Flags: FINAL + packet.write_u16::(1 + self.payload.len() as u16).unwrap(); // Part count + + let mut header = protocol::mercury::Header::new(); + header.set_uri(self.uri.clone()); + header.set_method(self.method.to_string()); + + if let Some(ref content_type) = self.content_type { + header.set_content_type(content_type.clone()); + } + + packet.write_u16::(header.compute_size() as u16).unwrap(); + header.write_to_writer(&mut packet).unwrap(); + + for p in &self.payload { + packet.write_u16::(p.len() as u16).unwrap(); + packet.write(&p).unwrap(); + } + + packet + } +} diff --git a/src/metadata.rs b/src/metadata.rs index 6ab1386f..c59a4063 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -1,9 +1,9 @@ -use eventual::{Async, Future}; +use eventual::Future; use linear_map::LinearMap; use protobuf; +use futures::Future as Future_; use protocol; -use mercury::{MercuryRequest, MercuryMethod}; use util::{SpotifyId, FileId, StrChunksExt}; use session::Session; @@ -189,17 +189,17 @@ impl MetadataManager { pub fn get(&mut self, session: &Session, id: SpotifyId) -> MetadataRef { let session = session.clone(); - session.mercury(MercuryRequest { - method: MercuryMethod::GET, - uri: format!("{}/{}", T::base_url(), id.to_base16()), - content_type: None, - payload: Vec::new(), - }) - .and_then(move |response| { - let data = response.payload.first().expect("Empty payload"); - let msg: T::Message = protobuf::parse_from_bytes(data).unwrap(); - Ok(T::parse(&msg, &session)) - }) + let uri = format!("{}/{}", T::base_url(), id.to_base16()); + let request = session.mercury().get(uri); + + let result = request.and_then(move |response| { + let data = response.payload.first().expect("Empty payload"); + let msg: T::Message = protobuf::parse_from_bytes(data).unwrap(); + + Ok(T::parse(&msg, &session)) + }).wait(); + + Future::of(result.unwrap()) } } diff --git a/src/session.rs b/src/session.rs index e61076f7..7255fd9f 100644 --- a/src/session.rs +++ b/src/session.rs @@ -5,7 +5,7 @@ use eventual::Future; use eventual::Async; use std::io::{self, Read, Cursor}; use std::result::Result; -use std::sync::{Mutex, RwLock, Arc, mpsc, Weak}; +use std::sync::{Mutex, RwLock, Arc, Weak}; use std::str::FromStr; use futures::Future as Future_; use futures::{Stream, BoxFuture}; @@ -14,15 +14,16 @@ use tokio_core::reactor::Handle; use album_cover::AlbumCover; use apresolve::apresolve_or_fallback; use audio_file::AudioFile; -use audio_key::AudioKeyManager; use authentication::Credentials; use cache::Cache; use connection::{self, adaptor}; -use mercury::{MercuryManager, MercuryRequest, MercuryResponse}; use metadata::{MetadataManager, MetadataRef, MetadataTrait}; use stream::StreamManager; use util::{SpotifyId, FileId, ReadSeek, Lazy}; +use audio_key::AudioKeyManager; +use mercury::MercuryManager; + use stream; #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq)] @@ -62,13 +63,13 @@ pub struct SessionInternal { data: RwLock, cache: Box, - mercury: Mutex, metadata: Mutex, stream: Mutex, rx_connection: Mutex), io::Error>>, tx_connection: Mutex)>>, audio_key: Lazy, + mercury: Lazy, } #[derive(Clone)] @@ -129,11 +130,11 @@ impl Session { tx_connection: Mutex::new(tx), cache: cache, - mercury: Mutex::new(MercuryManager::new()), metadata: Mutex::new(MetadataManager::new()), stream: Mutex::new(StreamManager::new()), audio_key: Lazy::new(), + mercury: Lazy::new(), })); (session, task) @@ -143,6 +144,10 @@ impl Session { self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) } + pub fn mercury(&self) -> &MercuryManager { + self.0.mercury.get(|| MercuryManager::new(self.weak())) + } + pub fn poll(&self) { let (cmd, data) = self.recv(); @@ -150,11 +155,12 @@ impl Session { 0x4 => self.send_packet(0x49, data), 0x4a => (), 0x9 | 0xa => self.0.stream.lock().unwrap().handle(cmd, data, self), - 0xd | 0xe => self.audio_key().dispatch(cmd, data), 0x1b => { self.0.data.write().unwrap().country = String::from_utf8(data).unwrap(); } - 0xb2...0xb6 => self.0.mercury.lock().unwrap().handle(cmd, data, self), + + 0xd | 0xe => self.audio_key().dispatch(cmd, data), + 0xb2...0xb6 => self.mercury().dispatch(cmd, data), _ => (), } } @@ -225,14 +231,6 @@ impl Session { self.0.metadata.lock().unwrap().get(self, id) } - pub fn mercury(&self, req: MercuryRequest) -> Future { - self.0.mercury.lock().unwrap().request(self, req) - } - - pub fn mercury_sub(&self, uri: String) -> mpsc::Receiver { - self.0.mercury.lock().unwrap().subscribe(self, uri) - } - pub fn cache(&self) -> &Cache { self.0.cache.as_ref() } diff --git a/src/spirc.rs b/src/spirc.rs index cbff27ae..f226c029 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -1,15 +1,14 @@ -use eventual::Async; use protobuf::{self, Message, RepeatedField}; use std::borrow::Cow; use std::sync::{Mutex, Arc}; use std::collections::HashMap; -use mercury::{MercuryRequest, MercuryMethod}; use player::{Player, PlayerState}; use session::Session; use util; use util::SpotifyId; use version; +use futures::{Future, Stream}; use protocol; pub use protocol::spirc::{PlayStatus, MessageType}; @@ -79,7 +78,8 @@ impl SpircManager { let rx = { let mut internal = self.0.lock().unwrap(); - let rx = internal.session.mercury_sub(internal.uri()); + let rx = internal.session.mercury().subscribe(internal.uri()); + let rx = rx.map_err(|_| ()).flatten_stream().wait(); internal.notify(true, None); @@ -97,7 +97,7 @@ impl SpircManager { }; for pkt in rx { - let data = pkt.payload.first().unwrap(); + let data = pkt.as_ref().unwrap().payload.first().unwrap(); let frame = protobuf::parse_from_bytes::(data).unwrap(); debug!("{:?} {:?} {} {} {}", @@ -452,15 +452,10 @@ impl<'a> CommandSender<'a> { pkt.set_state(self.spirc_internal.spirc_state(&state)); } - self.spirc_internal - .session - .mercury(MercuryRequest { - method: MercuryMethod::SEND, - uri: self.spirc_internal.uri(), - content_type: None, - payload: vec![pkt.write_to_bytes().unwrap()], - }) - .fire(); + let payload = pkt.write_to_bytes().unwrap(); + let uri = self.spirc_internal.uri(); + self.spirc_internal.session.mercury() + .send(uri, payload).wait().unwrap(); } }