use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; use futures::sync::{oneshot, mpsc}; use futures::{BoxFuture, Future}; use std::collections::HashMap; use std::io::Read; use std::mem; use protocol; use protobuf; use futures::{Async, Poll}; use util::SeqGenerator; mod types; pub use self::types::*; mod sender; pub use self::sender::MercurySender; component! { MercuryManager : MercuryManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), pending: HashMap, MercuryPending> = HashMap::new(), subscriptions: HashMap> = HashMap::new(), } } pub struct MercuryPending { parts: Vec>, partial: Option>, callback: Option>>, } pub struct MercuryFuture(oneshot::Receiver>); impl Future for MercuryFuture { type Item = T; type Error = MercuryError; fn poll(&mut self) -> Poll { match self.0.poll() { Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), Ok(Async::Ready(Err(err))) => Err(err), Ok(Async::NotReady) => Ok(Async::NotReady), Err(oneshot::Canceled) => Err(MercuryError), } } } impl MercuryManager { fn next_seq(&self) -> Vec { let mut seq = vec![0u8; 8]; BigEndian::write_u64(&mut seq, self.lock(|inner| inner.sequence.get())); seq } pub fn request(&self, req: MercuryRequest) -> MercuryFuture { let (tx, rx) = oneshot::channel(); let pending = MercuryPending { parts: Vec::new(), partial: None, callback: Some(tx), }; let seq = self.next_seq(); self.lock(|inner| inner.pending.insert(seq.clone(), pending)); let cmd = req.method.command(); let data = req.encode(&seq); self.session().send_packet(cmd, data); MercuryFuture(rx) } pub fn get>(&self, uri: T) -> MercuryFuture { self.request(MercuryRequest { method: MercuryMethod::GET, uri: uri.into(), content_type: None, payload: Vec::new(), }) } pub fn send>(&self, uri: T, data: Vec) -> MercuryFuture { self.request(MercuryRequest { method: MercuryMethod::SEND, uri: uri.into(), content_type: None, payload: vec![data], }) } /* pub fn sender>(&self, uri: T) -> MercurySender { MercurySender::new(self.clone(), uri.into()) } */ pub fn subscribe>(&self, uri: T) -> BoxFuture, MercuryError> { let request = self.request(MercuryRequest { method: MercuryMethod::SUB, uri: uri.into(), content_type: None, payload: Vec::new(), }); let manager = self.clone(); request.map(move |response| { let (tx, rx) = mpsc::unbounded(); manager.lock(move |inner| { for sub in response.payload { let mut sub : protocol::pubsub::Subscription = protobuf::parse_from_bytes(&sub).unwrap(); let uri = sub.take_uri(); inner.subscriptions.insert(uri, tx.clone()); } }); rx }).boxed() } pub fn dispatch(&self, cmd: u8, data: Vec) { let mut packet = ::std::io::Cursor::new(data); let seq = { let len = packet.read_u16::().unwrap() as usize; let mut seq = vec![0; len]; packet.read_exact(&mut seq).unwrap(); seq }; let flags = packet.read_u8().unwrap(); let count = packet.read_u16::().unwrap() as usize; let pending = self.lock(|inner| inner.pending.remove(&seq)); let mut pending = match pending { Some(pending) => pending, None if cmd == 0xb5 => { MercuryPending { parts: Vec::new(), partial: None, callback: None, } } None => { warn!("Ignore seq {:?} cmd {:x}", seq, cmd); return; } }; for i in 0..count { let mut part = Self::parse_part(&mut packet); if let Some(mut data) = mem::replace(&mut pending.partial, None) { data.append(&mut part); part = data; } if i == count - 1 && (flags == 2) { pending.partial = Some(part) } else { pending.parts.push(part); } } if flags == 0x1 { self.complete_request(cmd, pending); } else { self.lock(move |inner| inner.pending.insert(seq, pending)); } } fn parse_part(s: &mut T) -> Vec { let size = s.read_u16::().unwrap() as usize; let mut buffer = vec![0; size]; s.read_exact(&mut buffer).unwrap(); buffer } fn complete_request(&self, cmd: u8, mut pending: MercuryPending) { let header_data = pending.parts.remove(0); let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap(); let response = MercuryResponse { uri: header.get_uri().to_owned(), payload: pending.parts, }; if cmd == 0xb5 { self.lock(|inner| { if let Some(cb) = inner.subscriptions.get(&response.uri) { cb.send(response).unwrap(); } }) } else if let Some(cb) = pending.callback { cb.complete(Ok(response)); } } }