Move session to tokio over fully

This commit is contained in:
Paul Lietar 2017-01-20 02:37:02 +00:00
parent bf6be73caa
commit d62a154786
6 changed files with 45 additions and 121 deletions

View file

@ -18,7 +18,6 @@ use tokio_core::reactor::Handle;
use std::net::SocketAddr; use std::net::SocketAddr;
use authentication::Credentials; use authentication::Credentials;
use connection::adaptor::adapt_future;
use util; use util;
#[derive(Clone)] #[derive(Clone)]
@ -207,6 +206,8 @@ impl NewService for Discovery {
} }
} }
use tokio_core::reactor::Core;
pub fn discovery_login<A,B>(device_name: A, device_id: B) -> Result<Credentials, ()> pub fn discovery_login<A,B>(device_name: A, device_id: B) -> Result<Credentials, ()>
where A: Into<String>, where A: Into<String>,
B: Into<String> B: Into<String>
@ -214,25 +215,25 @@ pub fn discovery_login<A,B>(device_name: A, device_id: B) -> Result<Credentials,
let device_name = device_name.into(); let device_name = device_name.into();
let device_id = device_id.into(); let device_id = device_id.into();
let (discovery, rx) = Discovery::new(device_name.clone(), device_id); let (discovery, creds_rx) = Discovery::new(device_name.clone(), device_id);
let creds_rx = creds_rx.into_future()
.map(move |(creds, _)| creds.unwrap()).map_err(|(e, _)| e);
let addr = "0.0.0.0:0".parse().unwrap(); let addr = "0.0.0.0:0".parse().unwrap();
let cred = adapt_future(move |handle| {
let addr = discovery.serve(&addr, &handle).unwrap(); let mut core = Core::new().unwrap();
let handle = core.handle();
let listening_addr = discovery.serve(&addr, &handle).unwrap();
let responder = mdns::Responder::spawn(&handle).unwrap(); let responder = mdns::Responder::spawn(&handle).unwrap();
let svc = responder.register( let _svc = responder.register(
"_spotify-connect._tcp".to_owned(), "_spotify-connect._tcp".to_owned(),
device_name, device_name,
addr.port(), listening_addr.port(),
&["VERSION=1.0", "CPath=/"]); &["VERSION=1.0", "CPath=/"]);
rx.into_future() let creds = core.run(creds_rx).unwrap();
.map(move |(creds, _)| (creds, svc))
.map_err(|(e, _)| e)
});
Ok(creds)
let (creds, _svc) = cred.wait().unwrap().unwrap();
Ok(creds.unwrap())
} }

View file

@ -1,79 +0,0 @@
use futures::future::ok;
use futures::sync::mpsc;
use futures::sync::oneshot;
use futures::{Future, Sink, Stream, BoxFuture, IntoFuture};
use std::thread;
use tokio_core::reactor::Core;
use tokio_core::reactor::Handle;
pub struct SinkAdaptor<T>(mpsc::UnboundedSender<T>);
pub struct StreamAdaptor<T, E>(Option<mpsc::Receiver<Result<T, E>>>);
impl <T> SinkAdaptor<T> {
pub fn send(&mut self, item: T) {
mpsc::UnboundedSender::send(&mut self.0, item).unwrap();
}
}
impl <T, E> StreamAdaptor<T, E> {
pub fn recv(&mut self) -> Result<T, E> {
let receiver = self.0.take().unwrap();
let receiving = receiver.into_future();
let (packet, receiver) = receiving.wait().map_err(|(e, _)| e).unwrap();
self.0 = Some(receiver);
packet.unwrap()
}
}
pub fn adapt<S, E>(transport: S) -> (SinkAdaptor<S::SinkItem>,
StreamAdaptor<S::Item, E>,
BoxFuture<(), E>)
where S: Sink<SinkError=E> + Stream<Error=E> + Send + 'static,
S::Item: Send + 'static,
S::SinkItem: Send + 'static,
E: Send + 'static,
{
let (receiver_tx, receiver_rx) = mpsc::channel(0);
let (sender_tx, sender_rx) = mpsc::unbounded();
let (sink, stream) = transport.split();
let receiver_task = stream
.then(ok::<_, mpsc::SendError<_>>)
.forward(receiver_tx).map(|_| ())
.map_err(|e| -> E { panic!(e) });
let sender_task = sender_rx
.map_err(|e| -> E { panic!(e) })
.forward(sink).map(|_| ());
let task = (receiver_task, sender_task).into_future()
.map(|((), ())| ()).boxed();
(SinkAdaptor(sender_tx),
StreamAdaptor(Some(receiver_rx)), task)
}
pub fn adapt_future<F, U>(f: F) -> oneshot::Receiver<Result<U::Item, U::Error>>
where F: FnOnce(Handle) -> U + Send + 'static,
U: IntoFuture,
U::Item: Send + 'static,
U::Error: Send + 'static,
{
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
let mut core = Core::new().unwrap();
let handle = core.handle();
let task = f(handle).into_future();
let result = core.run(task);
tx.complete(result);
});
rx
}

View file

@ -1,6 +1,5 @@
mod codec; mod codec;
mod handshake; mod handshake;
pub mod adaptor;
pub use self::codec::APCodec; pub use self::codec::APCodec;
pub use self::handshake::handshake; pub use self::handshake::handshake;

View file

@ -172,7 +172,6 @@ fn main() {
}); });
thread::spawn(move || spirc.run()); thread::spawn(move || spirc.run());
thread::spawn(move || loop { session.poll() });
task task
}); });

View file

@ -94,11 +94,9 @@ impl MercuryManager {
}) })
} }
/*
pub fn sender<T: Into<String>>(&self, uri: T) -> MercurySender { pub fn sender<T: Into<String>>(&self, uri: T) -> MercurySender {
MercurySender::new(self.clone(), uri.into()) MercurySender::new(self.clone(), uri.into())
} }
*/
pub fn subscribe<T: Into<String>>(&self, uri: T) pub fn subscribe<T: Into<String>>(&self, uri: T)
-> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> -> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>

View file

@ -1,18 +1,19 @@
use crypto::digest::Digest; use crypto::digest::Digest;
use crypto::sha1::Sha1; use crypto::sha1::Sha1;
use futures::Future;
use futures::sync::mpsc;
use futures::{Stream, BoxFuture, IntoFuture};
use std::io; use std::io;
use std::result::Result; use std::result::Result;
use std::sync::{Mutex, RwLock, Arc, Weak};
use std::str::FromStr; use std::str::FromStr;
use futures::Future as Future_; use std::sync::{RwLock, Arc, Weak};
use futures::{Stream, BoxFuture, IntoFuture};
use tokio_core::reactor::{Handle, Remote}; use tokio_core::reactor::{Handle, Remote};
use apresolve::apresolve_or_fallback; use apresolve::apresolve_or_fallback;
use authentication::Credentials; use authentication::Credentials;
use cache::Cache; use cache::Cache;
use component::Lazy; use component::Lazy;
use connection::{self, adaptor}; use connection;
use audio_key::AudioKeyManager; use audio_key::AudioKeyManager;
use channel::ChannelManager; use channel::ChannelManager;
@ -58,8 +59,7 @@ pub struct SessionInternal {
cache: Box<Cache + Send + Sync>, cache: Box<Cache + Send + Sync>,
rx_connection: Mutex<adaptor::StreamAdaptor<(u8, Vec<u8>), io::Error>>, tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
tx_connection: Mutex<adaptor::SinkAdaptor<(u8, Vec<u8>)>>,
audio_key: Lazy<AudioKeyManager>, audio_key: Lazy<AudioKeyManager>,
audio_file: Lazy<AudioFileManager>, audio_file: Lazy<AudioFileManager>,
@ -85,7 +85,7 @@ pub fn device_id(name: &str) -> String {
impl Session { impl Session {
pub fn connect(config: Config, credentials: Credentials, pub fn connect(config: Config, credentials: Credentials,
cache: Box<Cache + Send + Sync>, handle: Handle) cache: Box<Cache + Send + Sync>, handle: Handle)
-> Box<Future_<Item=(Session, BoxFuture<(), io::Error>), Error=io::Error>> -> Box<Future<Item=(Session, BoxFuture<(), io::Error>), Error=io::Error>>
{ {
let access_point = apresolve_or_fallback::<io::Error>(&handle); let access_point = apresolve_or_fallback::<io::Error>(&handle);
@ -120,7 +120,13 @@ impl Session {
-> (Session, BoxFuture<(), io::Error>) -> (Session, BoxFuture<(), io::Error>)
{ {
let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())); let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned()));
let (tx, rx, task) = adaptor::adapt(transport); let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded();
let sender_task = sender_rx
.map_err(|e| -> io::Error { panic!(e) })
.forward(sink).map(|_| ());
let session = Session(Arc::new(SessionInternal { let session = Session(Arc::new(SessionInternal {
config: config, config: config,
@ -129,8 +135,7 @@ impl Session {
canonical_username: username, canonical_username: username,
}), }),
rx_connection: Mutex::new(rx), tx_connection: sender_tx,
tx_connection: Mutex::new(tx),
cache: cache, cache: cache,
@ -143,6 +148,17 @@ impl Session {
handle: handle.remote().clone(), handle: handle.remote().clone(),
})); }));
let receiver_task = {
let session = session.clone();
stream.for_each(move |(cmd, data)| {
session.dispatch(cmd, data);
Ok(())
})
};
let task = (receiver_task, sender_task).into_future()
.map(|((), ())| ()).boxed();
(session, task) (session, task)
} }
@ -174,9 +190,7 @@ impl Session {
self.0.handle.spawn(f) self.0.handle.spawn(f)
} }
pub fn poll(&self) { fn dispatch(&self, cmd: u8, data: Vec<u8>) {
let (cmd, data) = self.recv();
match cmd { match cmd {
0x4 => self.send_packet(0x49, data), 0x4 => self.send_packet(0x49, data),
0x4a => (), 0x4a => (),
@ -191,12 +205,8 @@ impl Session {
} }
} }
pub fn recv(&self) -> (u8, Vec<u8>) {
self.0.rx_connection.lock().unwrap().recv().unwrap()
}
pub fn send_packet(&self, cmd: u8, data: Vec<u8>) { pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
self.0.tx_connection.lock().unwrap().send((cmd, data)) self.0.tx_connection.send((cmd, data)).unwrap();
} }
pub fn cache(&self) -> &Cache { pub fn cache(&self) -> &Cache {
@ -229,7 +239,3 @@ impl SessionWeak {
Session(self.0.upgrade().expect("Session died")) Session(self.0.upgrade().expect("Session died"))
} }
} }
pub trait PacketHandler {
fn handle(&mut self, cmd: u8, data: Vec<u8>, session: &Session);
}