Move MetadataManager to use tokio

This commit is contained in:
Paul Lietar 2017-01-19 12:56:49 +00:00
parent bcbd7afb1a
commit 05118b40f8
7 changed files with 49 additions and 129 deletions

View file

@ -1,5 +1,6 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use futures::sync::oneshot; use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
@ -12,12 +13,10 @@ pub struct AudioKey(pub [u8; 16]);
#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] #[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)]
pub struct AudioKeyError; pub struct AudioKeyError;
pub type Result = ::std::result::Result<AudioKey, AudioKeyError>;
component! { component! {
AudioKeyManager : AudioKeyManagerInner { AudioKeyManager : AudioKeyManagerInner {
sequence: SeqGenerator<u32> = SeqGenerator::new(0), sequence: SeqGenerator<u32> = SeqGenerator::new(0),
pending: HashMap<u32, oneshot::Sender<Result>> = HashMap::new(), pending: HashMap<u32, oneshot::Sender<Result<AudioKey, AudioKeyError>>> = HashMap::new(),
} }
} }
@ -43,7 +42,7 @@ impl AudioKeyManager {
} }
} }
pub fn request<'a>(&self, track: SpotifyId, file: FileId) -> oneshot::Receiver<Result> { pub fn request<'a>(&self, track: SpotifyId, file: FileId) -> AudioKeyFuture<AudioKey> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let seq = self.lock(move |inner| { let seq = self.lock(move |inner| {
@ -53,7 +52,7 @@ impl AudioKeyManager {
}); });
self.send_key_request(seq, track, file); self.send_key_request(seq, track, file);
rx AudioKeyFuture(rx)
} }
fn send_key_request<'a>(&self, seq: u32, track: SpotifyId, file: FileId) { fn send_key_request<'a>(&self, seq: u32, track: SpotifyId, file: FileId) {
@ -66,3 +65,19 @@ impl AudioKeyManager {
self.session().send_packet(0xc, data) self.session().send_packet(0xc, data)
} }
} }
pub struct AudioKeyFuture<T>(oneshot::Receiver<Result<T, AudioKeyError>>);
impl <T> Future for AudioKeyFuture<T> {
type Item = T;
type Error = AudioKeyError;
fn poll(&mut self) -> Poll<T, AudioKeyError> {
match self.0.poll() {
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
Ok(Async::Ready(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(oneshot::Canceled) => Err(AudioKeyError),
}
}
}

View file

@ -58,7 +58,6 @@ pub mod audio_file;
pub mod audio_key; pub mod audio_key;
pub mod cache; pub mod cache;
pub mod diffie_hellman; pub mod diffie_hellman;
pub mod link;
pub mod mercury; pub mod mercury;
pub mod metadata; pub mod metadata;
pub mod player; pub mod player;

View file

@ -1,83 +0,0 @@
use util::SpotifyId;
use session::Session;
use metadata::{MetadataRef, Album, Artist, Track};
#[derive(Debug,Clone)]
pub enum Link {
Track {
id: SpotifyId,
offset: u32,
},
Album {
id: SpotifyId,
},
Artist {
id: SpotifyId,
},
/*
Search,
Playlist,
Profile,
Starred,
LocalTrack,
Image,
*/
}
impl Link {
pub fn from_str(uri: &str) -> Result<Link, ()> {
let mut parts = uri.split(':');
if parts.next() != Some("spotify") {
return Err(())
}
match parts.next() {
Some("track") => parts.next()
.map(SpotifyId::from_base62)
.map(|id| Link::Track {
id: id,
offset: 0,
})
.ok_or(()),
Some("album") => parts.next()
.map(SpotifyId::from_base62)
.map(|id| Link::Album {
id: id,
})
.ok_or(()),
Some("artist") => parts.next()
.map(SpotifyId::from_base62)
.map(|id| Link::Artist {
id: id,
})
.ok_or(()),
_ => Err(())
}
}
pub fn as_track(&self, session: &Session) -> Option<MetadataRef<Track>> {
match *self {
Link::Track { id, .. } => Some(session.metadata::<Track>(id)),
_ => None,
}
}
pub fn as_album(&self, session: &Session) -> Option<MetadataRef<Album>> {
match *self {
Link::Album { id, .. } => Some(session.metadata::<Album>(id)),
_ => None,
}
}
pub fn as_artist(&self, session: &Session) -> Option<MetadataRef<Artist>> {
match *self {
Link::Artist { id, .. } => Some(session.metadata::<Artist>(id)),
_ => None,
}
}
}

View file

@ -1,12 +1,11 @@
use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use futures::sync::{oneshot, mpsc}; use futures::sync::{oneshot, mpsc};
use futures::{BoxFuture, Future}; use futures::{Async, Poll, BoxFuture, Future};
use protobuf;
use protocol;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Read; use std::io::Read;
use std::mem; use std::mem;
use protocol;
use protobuf;
use futures::{Async, Poll};
use util::SeqGenerator; use util::SeqGenerator;

View file

@ -1,11 +1,11 @@
use eventual::Future; use futures::{Future, BoxFuture};
use linear_map::LinearMap; use linear_map::LinearMap;
use protobuf; use protobuf;
use futures::Future as Future_;
use mercury::MercuryError;
use protocol; use protocol;
use util::{SpotifyId, FileId, StrChunksExt};
use session::Session; use session::Session;
use util::{SpotifyId, FileId, StrChunksExt};
pub use protocol::metadata::AudioFile_Format as FileFormat; pub use protocol::metadata::AudioFile_Format as FileFormat;
@ -59,11 +59,6 @@ pub struct Artist {
pub top_tracks: Vec<SpotifyId>, pub top_tracks: Vec<SpotifyId>,
} }
pub type MetadataRef<T> = Future<T, ()>;
pub type TrackRef = MetadataRef<Track>;
pub type AlbumRef = MetadataRef<Album>;
pub type ArtistRef = MetadataRef<Artist>;
impl MetadataTrait for Track { impl MetadataTrait for Track {
type Message = protocol::metadata::Track; type Message = protocol::metadata::Track;
@ -180,26 +175,22 @@ impl MetadataTrait for Artist {
} }
} }
pub struct MetadataManager; component! {
MetadataManager : MetadataManagerInner { }
impl MetadataManager {
pub fn new() -> MetadataManager {
MetadataManager
} }
pub fn get<T: MetadataTrait>(&mut self, session: &Session, id: SpotifyId) -> MetadataRef<T> { impl MetadataManager {
let session = session.clone(); pub fn get<T: MetadataTrait>(&self, id: SpotifyId) -> BoxFuture<T, MercuryError> {
let session = self.session();
let uri = format!("{}/{}", T::base_url(), id.to_base16()); let uri = format!("{}/{}", T::base_url(), id.to_base16());
let request = session.mercury().get(uri); let request = session.mercury().get(uri);
let result = request.and_then(move |response| { request.and_then(move |response| {
let data = response.payload.first().expect("Empty payload"); let data = response.payload.first().expect("Empty payload");
let msg: T::Message = protobuf::parse_from_bytes(data).unwrap(); let msg: T::Message = protobuf::parse_from_bytes(data).unwrap();
Ok(T::parse(&msg, &session)) Ok(T::parse(&msg, &session))
}).wait(); }).boxed()
Future::of(result.unwrap())
} }
} }

View file

@ -1,14 +1,13 @@
use eventual::{self, Async};
use std::borrow::Cow; use std::borrow::Cow;
use std::sync::{mpsc, Mutex, Arc, MutexGuard}; use std::sync::{mpsc, Mutex, Arc, MutexGuard};
use std::thread; use std::thread;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use vorbis; use vorbis;
use futures::Future; use futures::{future, Future};
use audio_decrypt::AudioDecrypt; use audio_decrypt::AudioDecrypt;
use audio_backend::Sink; use audio_backend::Sink;
use metadata::{FileFormat, Track, TrackRef}; use metadata::{FileFormat, Track};
use session::{Bitrate, Session}; use session::{Bitrate, Session};
use util::{self, ReadSeek, SpotifyId, Subfile}; use util::{self, ReadSeek, SpotifyId, Subfile};
pub use spirc::PlayStatus; pub use spirc::PlayStatus;
@ -170,16 +169,16 @@ fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option
let alternatives = track.alternatives let alternatives = track.alternatives
.iter() .iter()
.map(|alt_id| { .map(|alt_id| {
session.metadata::<Track>(*alt_id) session.metadata().get::<Track>(*alt_id)
}) });
.collect::<Vec<TrackRef>>(); let alternatives = future::join_all(alternatives).wait().unwrap();
eventual::sequence(alternatives.into_iter()).iter().find(|alt| alt.available).map(Cow::Owned) alternatives.into_iter().find(|alt| alt.available).map(Cow::Owned)
} }
} }
fn load_track(session: &Session, track_id: SpotifyId) -> Option<vorbis::Decoder<Subfile<AudioDecrypt<Box<ReadSeek>>>>> { fn load_track(session: &Session, track_id: SpotifyId) -> Option<vorbis::Decoder<Subfile<AudioDecrypt<Box<ReadSeek>>>>> {
let track = session.metadata::<Track>(track_id).await().unwrap(); let track = session.metadata().get::<Track>(track_id).wait().unwrap();
info!("Loading track \"{}\"", track.name); info!("Loading track \"{}\"", track.name);
@ -207,7 +206,7 @@ fn load_track(session: &Session, track_id: SpotifyId) -> Option<vorbis::Decoder<
} }
}; };
let key = session.audio_key().request(track.id, file_id).wait().unwrap().unwrap(); let key = session.audio_key().request(track.id, file_id).wait().unwrap();
let audio_file = Subfile::new(AudioDecrypt::new(key, session.audio_file(file_id)), 0xa7); let audio_file = Subfile::new(AudioDecrypt::new(key, session.audio_file(file_id)), 0xa7);
let decoder = vorbis::Decoder::new(audio_file).unwrap(); let decoder = vorbis::Decoder::new(audio_file).unwrap();

View file

@ -17,12 +17,12 @@ use audio_file::AudioFile;
use authentication::Credentials; use authentication::Credentials;
use cache::Cache; use cache::Cache;
use connection::{self, adaptor}; use connection::{self, adaptor};
use metadata::{MetadataManager, MetadataRef, MetadataTrait};
use stream::StreamManager; use stream::StreamManager;
use util::{SpotifyId, FileId, ReadSeek, Lazy}; use util::{FileId, ReadSeek, Lazy};
use audio_key::AudioKeyManager; use audio_key::AudioKeyManager;
use mercury::MercuryManager; use mercury::MercuryManager;
use metadata::MetadataManager;
use stream; use stream;
@ -63,13 +63,13 @@ pub struct SessionInternal {
data: RwLock<SessionData>, data: RwLock<SessionData>,
cache: Box<Cache + Send + Sync>, cache: Box<Cache + Send + Sync>,
metadata: Mutex<MetadataManager>,
stream: Mutex<StreamManager>, stream: Mutex<StreamManager>,
rx_connection: Mutex<adaptor::StreamAdaptor<(u8, Vec<u8>), io::Error>>, rx_connection: Mutex<adaptor::StreamAdaptor<(u8, Vec<u8>), io::Error>>,
tx_connection: Mutex<adaptor::SinkAdaptor<(u8, Vec<u8>)>>, tx_connection: Mutex<adaptor::SinkAdaptor<(u8, Vec<u8>)>>,
audio_key: Lazy<AudioKeyManager>, audio_key: Lazy<AudioKeyManager>,
mercury: Lazy<MercuryManager>, mercury: Lazy<MercuryManager>,
metadata: Lazy<MetadataManager>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -130,11 +130,11 @@ impl Session {
tx_connection: Mutex::new(tx), tx_connection: Mutex::new(tx),
cache: cache, cache: cache,
metadata: Mutex::new(MetadataManager::new()),
stream: Mutex::new(StreamManager::new()), stream: Mutex::new(StreamManager::new()),
audio_key: Lazy::new(), audio_key: Lazy::new(),
mercury: Lazy::new(), mercury: Lazy::new(),
metadata: Lazy::new(),
})); }));
(session, task) (session, task)
@ -148,6 +148,10 @@ impl Session {
self.0.mercury.get(|| MercuryManager::new(self.weak())) self.0.mercury.get(|| MercuryManager::new(self.weak()))
} }
pub fn metadata(&self) -> &MetadataManager {
self.0.metadata.get(|| MetadataManager::new(self.weak()))
}
pub fn poll(&self) { pub fn poll(&self) {
let (cmd, data) = self.recv(); let (cmd, data) = self.recv();
@ -227,10 +231,6 @@ impl Session {
self.0.stream.lock().unwrap().create(handler, self) self.0.stream.lock().unwrap().create(handler, self)
} }
pub fn metadata<T: MetadataTrait>(&self, id: SpotifyId) -> MetadataRef<T> {
self.0.metadata.lock().unwrap().get(self, id)
}
pub fn cache(&self) -> &Cache { pub fn cache(&self) -> &Cache {
self.0.cache.as_ref() self.0.cache.as_ref()
} }