librespot/core/src/channel.rs

177 lines
5.1 KiB
Rust
Raw Normal View History

2017-01-19 22:45:24 +00:00
use byteorder::{BigEndian, ByteOrder};
use futures::sync::{BiLock, mpsc};
use futures::{Poll, Async, Stream};
use std::collections::HashMap;
use tokio_core::io::EasyBuf;
use util::SeqGenerator;
component! {
ChannelManager : ChannelManagerInner {
sequence: SeqGenerator<u16> = SeqGenerator::new(0),
2017-01-29 17:54:32 +00:00
channels: HashMap<u16, mpsc::UnboundedSender<(u8, EasyBuf)>> = HashMap::new(),
2017-01-19 22:45:24 +00:00
}
}
#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)]
pub struct ChannelError;
pub struct Channel {
2017-01-29 17:54:32 +00:00
receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>,
2017-01-19 22:45:24 +00:00
state: ChannelState,
}
pub struct ChannelHeaders(BiLock<Channel>);
pub struct ChannelData(BiLock<Channel>);
pub enum ChannelEvent {
Header(u8, Vec<u8>),
2017-01-29 17:54:32 +00:00
Data(EasyBuf),
2017-01-19 22:45:24 +00:00
}
#[derive(Clone)]
enum ChannelState {
Header(EasyBuf),
Data,
Closed,
}
impl ChannelManager {
pub fn allocate(&self) -> (u16, Channel) {
let (tx, rx) = mpsc::unbounded();
let seq = self.lock(|inner| {
let seq = inner.sequence.get();
inner.channels.insert(seq, tx);
seq
});
let channel = Channel {
receiver: rx,
state: ChannelState::Header(EasyBuf::new()),
};
(seq, channel)
}
2017-01-29 17:54:32 +00:00
pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
2017-01-19 22:45:24 +00:00
use std::collections::hash_map::Entry;
2017-01-29 17:54:32 +00:00
let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref());
2017-01-19 22:45:24 +00:00
self.lock(|inner| {
if let Entry::Occupied(entry) = inner.channels.entry(id) {
2017-01-29 17:54:32 +00:00
let _ = entry.get().send((cmd, data));
2017-01-19 22:45:24 +00:00
}
});
}
}
impl Channel {
2017-01-29 17:54:32 +00:00
fn recv_packet(&mut self) -> Poll<EasyBuf, ChannelError> {
2017-01-19 22:45:24 +00:00
let (cmd, packet) = match self.receiver.poll() {
Ok(Async::Ready(t)) => t.expect("channel closed"),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
};
if cmd == 0xa {
2017-01-29 17:54:32 +00:00
let code = BigEndian::read_u16(&packet.as_ref()[..2]);
2017-01-19 22:45:24 +00:00
error!("channel error: {} {}", packet.len(), code);
self.state = ChannelState::Closed;
Err(ChannelError)
} else {
Ok(Async::Ready(packet))
}
}
pub fn split(self) -> (ChannelHeaders, ChannelData) {
let (headers, data) = BiLock::new(self);
(ChannelHeaders(headers), ChannelData(data))
}
}
impl Stream for Channel {
type Item = ChannelEvent;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.state.clone() {
ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => {
if data.len() == 0 {
2017-01-29 17:54:32 +00:00
data = try_ready!(self.recv_packet());
2017-01-19 22:45:24 +00:00
}
let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
if length == 0 {
assert_eq!(data.len(), 0);
self.state = ChannelState::Data;
} else {
let header_id = data.drain_to(1).as_ref()[0];
let header_data = data.drain_to(length - 1).as_ref().to_owned();
self.state = ChannelState::Header(data);
2017-01-29 17:54:32 +00:00
let event = ChannelEvent::Header(header_id, header_data);
return Ok(Async::Ready(Some(event)));
2017-01-19 22:45:24 +00:00
}
}
ChannelState::Data => {
let data = try_ready!(self.recv_packet());
2017-01-29 17:54:32 +00:00
if data.len() == 0 {
2017-01-19 22:45:24 +00:00
self.receiver.close();
self.state = ChannelState::Closed;
return Ok(Async::Ready(None));
} else {
2017-01-29 17:54:32 +00:00
let event = ChannelEvent::Data(data);
return Ok(Async::Ready(Some(event)));
2017-01-19 22:45:24 +00:00
}
}
}
}
}
}
impl Stream for ChannelData {
2017-01-29 17:54:32 +00:00
type Item = EasyBuf;
2017-01-19 22:45:24 +00:00
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady),
};
loop {
match try_ready!(channel.poll()) {
Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))),
None => return Ok(Async::Ready(None)),
}
}
}
}
impl Stream for ChannelHeaders {
type Item = (u8, Vec<u8>);
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady),
};
match try_ready!(channel.poll()) {
Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))),
Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)),
}
}
}