From d27063d5da9170b944a19cc1e0508563c9c52421 Mon Sep 17 00:00:00 2001 From: Paul Lietar Date: Wed, 18 Jan 2017 18:41:22 +0000 Subject: [PATCH] Create event loop in main --- src/audio_backend/alsa.rs | 4 +- src/audio_backend/mod.rs | 9 ++- src/audio_backend/pipe.rs | 2 +- src/audio_backend/portaudio.rs | 4 +- src/audio_backend/pulseaudio.rs | 2 +- src/connection/adaptor.rs | 64 +++++++--------------- src/main.rs | 77 +++++++++++++++++--------- src/session.rs | 97 ++++++++++++++++----------------- src/spirc.rs | 2 +- 9 files changed, 129 insertions(+), 132 deletions(-) diff --git a/src/audio_backend/alsa.rs b/src/audio_backend/alsa.rs index 95a4162f..ce459380 100644 --- a/src/audio_backend/alsa.rs +++ b/src/audio_backend/alsa.rs @@ -5,10 +5,10 @@ use alsa::{PCM, Stream, Mode, Format, Access}; pub struct AlsaSink(Option, String); impl Open for AlsaSink { - fn open(device: Option<&str>) -> AlsaSink { + fn open(device: Option) -> AlsaSink { info!("Using alsa sink"); - let name = device.unwrap_or("default").to_string(); + let name = device.unwrap_or("default".to_string()); AlsaSink(None, name) } diff --git a/src/audio_backend/mod.rs b/src/audio_backend/mod.rs index 4b4f6bfe..a7034055 100644 --- a/src/audio_backend/mod.rs +++ b/src/audio_backend/mod.rs @@ -1,7 +1,7 @@ use std::io; pub trait Open { - fn open(Option<&str>) -> Self; + fn open(Option) -> Self; } pub trait Sink { @@ -49,8 +49,7 @@ macro_rules! _declare_backends { ) } -#[allow(dead_code)] -fn mk_sink(device: Option<&str>) -> Box { +fn mk_sink(device: Option) -> Box { Box::new(S::open(device)) } @@ -75,7 +74,7 @@ use self::pipe::StdoutSink; declare_backends! { pub const BACKENDS : &'static [ (&'static str, - &'static (Fn(Option<&str>) -> Box + Sync + Send + 'static)) + &'static (Fn(Option) -> Box + Sync + Send + 'static)) ] = &[ #[cfg(feature = "alsa-backend")] ("alsa", &mk_sink::), @@ -87,7 +86,7 @@ declare_backends! { ]; } -pub fn find>(name: Option) -> Option<&'static (Fn(Option<&str>) -> Box + Send + Sync)> { +pub fn find>(name: Option) -> Option<&'static (Fn(Option) -> Box + Send + Sync)> { if let Some(name) = name.as_ref().map(AsRef::as_ref) { BACKENDS.iter().find(|backend| name == backend.0).map(|backend| backend.1) } else { diff --git a/src/audio_backend/pipe.rs b/src/audio_backend/pipe.rs index 9761c5cb..10461cfb 100644 --- a/src/audio_backend/pipe.rs +++ b/src/audio_backend/pipe.rs @@ -7,7 +7,7 @@ use std::slice; pub struct StdoutSink(Box); impl Open for StdoutSink { - fn open(path: Option<&str>) -> StdoutSink { + fn open(path: Option) -> StdoutSink { if let Some(path) = path { let file = OpenOptions::new().write(true).open(path).unwrap(); StdoutSink(Box::new(file)) diff --git a/src/audio_backend/portaudio.rs b/src/audio_backend/portaudio.rs index 44a34338..2460c22e 100644 --- a/src/audio_backend/portaudio.rs +++ b/src/audio_backend/portaudio.rs @@ -39,13 +39,13 @@ fn find_output(device: &str) -> Option { } impl <'a> Open for PortAudioSink<'a> { - fn open(device: Option<&str>) -> PortAudioSink<'a> { + fn open(device: Option) -> PortAudioSink<'a> { debug!("Using PortAudio sink"); portaudio::initialize().unwrap(); - let device_idx = match device { + let device_idx = match device.as_ref().map(AsRef::as_ref) { Some("?") => { list_outputs(); exit(0) diff --git a/src/audio_backend/pulseaudio.rs b/src/audio_backend/pulseaudio.rs index 05876fe6..3b9a09b3 100644 --- a/src/audio_backend/pulseaudio.rs +++ b/src/audio_backend/pulseaudio.rs @@ -8,7 +8,7 @@ use std::ffi::CString; pub struct PulseAudioSink(*mut pa_simple); impl Open for PulseAudioSink { - fn open(device: Option<&str>) -> PulseAudioSink { + fn open(device: Option) -> PulseAudioSink { debug!("Using PulseAudio sink"); if device.is_some() { diff --git a/src/connection/adaptor.rs b/src/connection/adaptor.rs index aab88399..e91a5a0d 100644 --- a/src/connection/adaptor.rs +++ b/src/connection/adaptor.rs @@ -6,8 +6,8 @@ use std::thread; use tokio_core::reactor::Core; use tokio_core::reactor::Handle; -pub struct SinkAdaptor(Option>); -pub struct StreamAdaptor(Option>>); +pub struct SinkAdaptor(pub Option>); +pub struct StreamAdaptor(pub Option>>); impl SinkAdaptor { pub fn send(&mut self, item: T) { @@ -30,59 +30,33 @@ impl StreamAdaptor { } } -fn adapt_sink(sink: S, rx: mpsc::Receiver) -> BoxFuture<(), ()> - where S: Sink + Send + 'static, - S::SinkItem: Send, - S::SinkError: Send, -{ - rx.map_err(|_| -> S::SinkError { panic!("") }) - .forward(sink) - .map(|_| ()).map_err(|_| ()) - .boxed() -} - -fn adapt_stream(stream: S, tx: mpsc::Sender>) -> BoxFuture<(), ()> - where S: Stream + Send + 'static, - S::Item: Send, - S::Error: Send, -{ - stream.then(ok::<_, mpsc::SendError<_>>) - .forward(tx) - .map(|_| ()).map_err(|_| ()) - .boxed() -} - -pub fn adapt(f: F) -> (SinkAdaptor, StreamAdaptor) - where F: FnOnce(Handle) -> U + Send + 'static, - U: IntoFuture, - S: Sink + Stream + Send + 'static, +pub fn adapt(transport: S) -> (SinkAdaptor, + StreamAdaptor, + BoxFuture<(), E>) + where S: Sink + Stream + Send + 'static, S::Item: Send + 'static, - S::Error: Send + 'static, S::SinkItem: Send + 'static, - S::SinkError: Send + 'static, + E: Send + 'static, { - let (receiver_tx, receiver_rx) = mpsc::channel(0); let (sender_tx, sender_rx) = mpsc::channel(0); + let (sink, stream) = transport.split(); - thread::spawn(move || { - let mut core = Core::new().unwrap(); - let handle = core.handle(); - let task = - f(handle).into_future() - .map(|connection| connection.split()) - .map_err(|_| ()) - .and_then(|(sink, stream)| { - (adapt_sink(sink, sender_rx), - adapt_stream(stream, receiver_tx)) - }); + let receiver_task = stream + .then(ok::<_, mpsc::SendError<_>>) + .forward(receiver_tx).map(|_| ()) + .map_err(|e| -> E { panic!(e) }); - core.run(task).unwrap(); - }); + let sender_task = sender_rx + .map_err(|e| -> E { panic!(e) }) + .forward(sink).map(|_| ()); + + let task = (receiver_task, sender_task).into_future() + .map(|((), ())| ()).boxed(); (SinkAdaptor(Some(sender_tx)), - StreamAdaptor(Some(receiver_rx))) + StreamAdaptor(Some(receiver_rx)), task) } pub fn adapt_future(f: F) -> oneshot::Receiver> diff --git a/src/main.rs b/src/main.rs index d0ccf007..664507f1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,8 @@ extern crate getopts; extern crate librespot; extern crate ctrlc; extern crate env_logger; +extern crate futures; +extern crate tokio_core; use env_logger::LogBuilder; use std::io::{stderr, Write}; @@ -11,10 +13,12 @@ use std::thread; use std::env; use std::path::PathBuf; use std::str::FromStr; +use futures::Future; +use tokio_core::reactor::Core; use librespot::spirc::SpircManager; -use librespot::authentication::get_credentials; -use librespot::audio_backend::{self, BACKENDS}; +use librespot::authentication::{get_credentials, Credentials}; +use librespot::audio_backend::{self, Sink, BACKENDS}; use librespot::cache::{Cache, DefaultCache, NoCache}; use librespot::player::Player; use librespot::session::{Bitrate, Config, Session}; @@ -59,7 +63,15 @@ fn list_backends() { } } -fn setup(args: &[String]) -> (Session, Player) { +struct Setup { + backend: &'static (Fn(Option) -> Box + Send + Sync), + cache: Box, + config: Config, + credentials: Credentials, + device: Option, +} + +fn setup(args: &[String]) -> Setup { let mut opts = getopts::Options::new(); opts.optopt("c", "cache", "Path to a directory where files will be cached.", "CACHE") .reqopt("n", "name", "Device name", "NAME") @@ -101,8 +113,8 @@ fn setup(args: &[String]) -> (Session, Player) { .map(|bitrate| Bitrate::from_str(bitrate).expect("Invalid bitrate")) .unwrap_or(Bitrate::Bitrate160); - let device_name = matches.opt_str("name").unwrap(); - let device_id = librespot::session::device_id(&device_name); + let name = matches.opt_str("name").unwrap(); + let device_id = librespot::session::device_id(&name); let cache = matches.opt_str("c").map(|cache_location| { Box::new(DefaultCache::new(PathBuf::from(cache_location)).unwrap()) @@ -111,46 +123,59 @@ fn setup(args: &[String]) -> (Session, Player) { let cached_credentials = cache.get_credentials(); - let credentials = get_credentials(&device_name, &device_id, + let credentials = get_credentials(&name, &device_id, matches.opt_str("username"), matches.opt_str("password"), cached_credentials); let config = Config { user_agent: version::version_string(), - device_name: device_name, + name: name, device_id: device_id, bitrate: bitrate, onstart: matches.opt_str("onstart"), onstop: matches.opt_str("onstop"), }; - let session = Session::new(config, cache); + let device = matches.opt_str("device"); - session.login(credentials).unwrap(); - - let device_name = matches.opt_str("device"); - let player = Player::new(session.clone(), move || { - (backend)(device_name.as_ref().map(AsRef::as_ref)) - }); - - (session, player) + Setup { + backend: backend, + cache: cache, + config: config, + credentials: credentials, + device: device, + } } fn main() { + let mut core = Core::new().unwrap(); + let handle = core.handle(); + let args: Vec = std::env::args().collect(); - let (session, player) = setup(&args); - let spirc = SpircManager::new(session.clone(), player); - let spirc_signal = spirc.clone(); - thread::spawn(move || spirc.run()); + let Setup { backend, cache, config, credentials, device } = setup(&args); - ctrlc::set_handler(move || { - spirc_signal.send_goodbye(); - exit(0); + let connection = Session::connect(config, credentials, cache, handle); + + let task = connection.and_then(move |(session, task)| { + let player = Player::new(session.clone(), move || { + (backend)(device) + }); + + let spirc = SpircManager::new(session.clone(), player); + let spirc_signal = spirc.clone(); + + ctrlc::set_handler(move || { + spirc_signal.send_goodbye(); + exit(0); + }); + + thread::spawn(move || spirc.run()); + thread::spawn(move || loop { session.poll() }); + + task }); - loop { - session.poll(); - } + core.run(task).unwrap() } diff --git a/src/session.rs b/src/session.rs index 2d3f838e..417f43de 100644 --- a/src/session.rs +++ b/src/session.rs @@ -9,7 +9,7 @@ use std::sync::{Mutex, RwLock, Arc, mpsc}; use std::str::FromStr; use futures::Future as Future_; use futures::Stream; -use futures::sync::oneshot; +use tokio_core::reactor::Handle; use album_cover::AlbumCover; use apresolve::apresolve_or_fallback; @@ -45,7 +45,7 @@ impl FromStr for Bitrate { pub struct Config { pub user_agent: String, - pub device_name: String, + pub name: String, pub device_id: String, pub bitrate: Bitrate, pub onstart: Option, @@ -66,74 +66,73 @@ pub struct SessionInternal { metadata: Mutex, stream: Mutex, audio_key: Mutex, - rx_connection: Mutex), io::Error>>>, - tx_connection: Mutex)>>>, + rx_connection: Mutex), io::Error>>, + tx_connection: Mutex)>>, } #[derive(Clone)] pub struct Session(pub Arc); -pub fn device_id(device_name: &str) -> String { +pub fn device_id(name: &str) -> String { let mut h = Sha1::new(); - h.input_str(&device_name); + h.input_str(&name); h.result_str() } impl Session { - pub fn new(config: Config, cache: Box) -> Session { - Session(Arc::new(SessionInternal { + pub fn connect(config: Config, credentials: Credentials, + cache: Box, handle: Handle) + -> Box>), Error=io::Error>> + { + let access_point = apresolve_or_fallback::(&handle); + + let connection = access_point.and_then(move |addr| { + info!("Connecting to AP \"{}\"", addr); + connection::connect::<&str>(&addr, &handle) + }); + + let device_id = config.device_id.clone(); + let authentication = connection.and_then(move |connection| { + connection::authenticate(connection, credentials, device_id) + }); + + let result = authentication.map(move |(transport, reusable_credentials)| { + info!("Authenticated !"); + cache.put_credentials(&reusable_credentials); + + let (session, task) = Session::create(transport, config, cache, reusable_credentials.username.clone()); + (session, task) + }); + + Box::new(result) + } + + fn create(transport: connection::Transport, config: Config, + cache: Box, username: String) -> (Session, Box>) + { + let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())); + let (tx, rx, task) = adaptor::adapt(transport); + + let session = Session(Arc::new(SessionInternal { config: config, data: RwLock::new(SessionData { country: String::new(), - canonical_username: String::new(), + canonical_username: username, }), - rx_connection: Mutex::new(None), - tx_connection: Mutex::new(None), + rx_connection: Mutex::new(rx), + tx_connection: Mutex::new(tx), cache: cache, mercury: Mutex::new(MercuryManager::new()), metadata: Mutex::new(MetadataManager::new()), stream: Mutex::new(StreamManager::new()), audio_key: Mutex::new(AudioKeyManager::new()), - })) + })); + + (session, task) } - pub fn login(&self, credentials: Credentials) -> Result { - let device_id = self.device_id().to_owned(); - - let (creds_tx, creds_rx) = oneshot::channel(); - - let (tx, rx) = adaptor::adapt(move |handle| { - let access_point = apresolve_or_fallback::(&handle); - - let connection = access_point.and_then(move |addr| { - info!("Connecting to AP \"{}\"", addr); - connection::connect::<&str>(&addr, &handle) - }); - - let authentication = connection.and_then(move |connection| { - connection::authenticate(connection, credentials, device_id) - }); - - authentication.map(|(transport, creds)| { - creds_tx.complete(creds); - transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())) - }) - }); - - let reusable_credentials: Credentials = creds_rx.wait().unwrap(); - - self.0.data.write().unwrap().canonical_username = reusable_credentials.username.clone(); - *self.0.rx_connection.lock().unwrap() = Some(rx); - *self.0.tx_connection.lock().unwrap() = Some(tx); - - info!("Authenticated !"); - - self.0.cache.put_credentials(&reusable_credentials); - - Ok(reusable_credentials) - } pub fn poll(&self) { let (cmd, data) = self.recv(); @@ -152,11 +151,11 @@ impl Session { } pub fn recv(&self) -> (u8, Vec) { - self.0.rx_connection.lock().unwrap().as_mut().unwrap().recv().unwrap() + self.0.rx_connection.lock().unwrap().recv().unwrap() } pub fn send_packet(&self, cmd: u8, data: Vec) { - self.0.tx_connection.lock().unwrap().as_mut().unwrap().send((cmd, data)) + self.0.tx_connection.lock().unwrap().send((cmd, data)) } pub fn audio_key(&self, track: SpotifyId, file_id: FileId) -> Future { diff --git a/src/spirc.rs b/src/spirc.rs index 6ee463e4..cbff27ae 100644 --- a/src/spirc.rs +++ b/src/spirc.rs @@ -46,7 +46,7 @@ struct SpircInternal { impl SpircManager { pub fn new(session: Session, player: Player) -> SpircManager { let ident = session.device_id().to_owned(); - let name = session.config().device_name.clone(); + let name = session.config().name.clone(); SpircManager(Arc::new(Mutex::new(SpircInternal { player: player,