diff --git a/src/authentication/discovery.rs b/src/authentication/discovery.rs index 33f1b60c..0ac09308 100644 --- a/src/authentication/discovery.rs +++ b/src/authentication/discovery.rs @@ -18,7 +18,6 @@ use tokio_core::reactor::Handle; use std::net::SocketAddr; use authentication::Credentials; -use connection::adaptor::adapt_future; use util; #[derive(Clone)] @@ -207,6 +206,8 @@ impl NewService for Discovery { } } +use tokio_core::reactor::Core; + pub fn discovery_login(device_name: A, device_id: B) -> Result where A: Into, B: Into @@ -214,25 +215,25 @@ pub fn discovery_login(device_name: A, device_id: B) -> Result(mpsc::UnboundedSender); -pub struct StreamAdaptor(Option>>); - -impl SinkAdaptor { - pub fn send(&mut self, item: T) { - mpsc::UnboundedSender::send(&mut self.0, item).unwrap(); - } -} - -impl StreamAdaptor { - pub fn recv(&mut self) -> Result { - let receiver = self.0.take().unwrap(); - let receiving = receiver.into_future(); - - let (packet, receiver) = receiving.wait().map_err(|(e, _)| e).unwrap(); - - self.0 = Some(receiver); - - packet.unwrap() - } -} - -pub fn adapt(transport: S) -> (SinkAdaptor, - StreamAdaptor, - BoxFuture<(), E>) - where S: Sink + Stream + Send + 'static, - S::Item: Send + 'static, - S::SinkItem: Send + 'static, - E: Send + 'static, -{ - let (receiver_tx, receiver_rx) = mpsc::channel(0); - let (sender_tx, sender_rx) = mpsc::unbounded(); - - let (sink, stream) = transport.split(); - - let receiver_task = stream - .then(ok::<_, mpsc::SendError<_>>) - .forward(receiver_tx).map(|_| ()) - .map_err(|e| -> E { panic!(e) }); - - let sender_task = sender_rx - .map_err(|e| -> E { panic!(e) }) - .forward(sink).map(|_| ()); - - let task = (receiver_task, sender_task).into_future() - .map(|((), ())| ()).boxed(); - - (SinkAdaptor(sender_tx), - StreamAdaptor(Some(receiver_rx)), task) -} - -pub fn adapt_future(f: F) -> oneshot::Receiver> - where F: FnOnce(Handle) -> U + Send + 'static, - U: IntoFuture, - U::Item: Send + 'static, - U::Error: Send + 'static, -{ - let (tx, rx) = oneshot::channel(); - - thread::spawn(move || { - let mut core = Core::new().unwrap(); - let handle = core.handle(); - - let task = f(handle).into_future(); - let result = core.run(task); - - tx.complete(result); - }); - - rx -} diff --git a/src/connection/mod.rs b/src/connection/mod.rs index 3ceb6380..e88a66fc 100644 --- a/src/connection/mod.rs +++ b/src/connection/mod.rs @@ -1,6 +1,5 @@ mod codec; mod handshake; -pub mod adaptor; pub use self::codec::APCodec; pub use self::handshake::handshake; diff --git a/src/main.rs b/src/main.rs index 664507f1..c8883d58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -172,7 +172,6 @@ fn main() { }); thread::spawn(move || spirc.run()); - thread::spawn(move || loop { session.poll() }); task }); diff --git a/src/mercury/mod.rs b/src/mercury/mod.rs index 077bcb69..cf014692 100644 --- a/src/mercury/mod.rs +++ b/src/mercury/mod.rs @@ -94,11 +94,9 @@ impl MercuryManager { }) } - /* pub fn sender>(&self, uri: T) -> MercurySender { MercurySender::new(self.clone(), uri.into()) } - */ pub fn subscribe>(&self, uri: T) -> BoxFuture, MercuryError> diff --git a/src/session.rs b/src/session.rs index 45e9c1ac..3c50d1e7 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,18 +1,19 @@ use crypto::digest::Digest; use crypto::sha1::Sha1; +use futures::Future; +use futures::sync::mpsc; +use futures::{Stream, BoxFuture, IntoFuture}; use std::io; use std::result::Result; -use std::sync::{Mutex, RwLock, Arc, Weak}; use std::str::FromStr; -use futures::Future as Future_; -use futures::{Stream, BoxFuture, IntoFuture}; +use std::sync::{RwLock, Arc, Weak}; use tokio_core::reactor::{Handle, Remote}; use apresolve::apresolve_or_fallback; use authentication::Credentials; use cache::Cache; use component::Lazy; -use connection::{self, adaptor}; +use connection; use audio_key::AudioKeyManager; use channel::ChannelManager; @@ -58,8 +59,7 @@ pub struct SessionInternal { cache: Box, - rx_connection: Mutex), io::Error>>, - tx_connection: Mutex)>>, + tx_connection: mpsc::UnboundedSender<(u8, Vec)>, audio_key: Lazy, audio_file: Lazy, @@ -85,7 +85,7 @@ pub fn device_id(name: &str) -> String { impl Session { pub fn connect(config: Config, credentials: Credentials, cache: Box, handle: Handle) - -> Box), Error=io::Error>> + -> Box), Error=io::Error>> { let access_point = apresolve_or_fallback::(&handle); @@ -120,7 +120,13 @@ impl Session { -> (Session, BoxFuture<(), io::Error>) { let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())); - let (tx, rx, task) = adaptor::adapt(transport); + let (sink, stream) = transport.split(); + + let (sender_tx, sender_rx) = mpsc::unbounded(); + + let sender_task = sender_rx + .map_err(|e| -> io::Error { panic!(e) }) + .forward(sink).map(|_| ()); let session = Session(Arc::new(SessionInternal { config: config, @@ -129,8 +135,7 @@ impl Session { canonical_username: username, }), - rx_connection: Mutex::new(rx), - tx_connection: Mutex::new(tx), + tx_connection: sender_tx, cache: cache, @@ -143,6 +148,17 @@ impl Session { handle: handle.remote().clone(), })); + let receiver_task = { + let session = session.clone(); + stream.for_each(move |(cmd, data)| { + session.dispatch(cmd, data); + Ok(()) + }) + }; + + let task = (receiver_task, sender_task).into_future() + .map(|((), ())| ()).boxed(); + (session, task) } @@ -174,9 +190,7 @@ impl Session { self.0.handle.spawn(f) } - pub fn poll(&self) { - let (cmd, data) = self.recv(); - + fn dispatch(&self, cmd: u8, data: Vec) { match cmd { 0x4 => self.send_packet(0x49, data), 0x4a => (), @@ -191,12 +205,8 @@ impl Session { } } - pub fn recv(&self) -> (u8, Vec) { - self.0.rx_connection.lock().unwrap().recv().unwrap() - } - pub fn send_packet(&self, cmd: u8, data: Vec) { - self.0.tx_connection.lock().unwrap().send((cmd, data)) + self.0.tx_connection.send((cmd, data)).unwrap(); } pub fn cache(&self) -> &Cache { @@ -229,7 +239,3 @@ impl SessionWeak { Session(self.0.upgrade().expect("Session died")) } } - -pub trait PacketHandler { - fn handle(&mut self, cmd: u8, data: Vec, session: &Session); -}