Move audio_file and channels to tokio

This commit is contained in:
Paul Lietar 2017-01-19 22:45:24 +00:00
parent 05118b40f8
commit 379c90c0b2
13 changed files with 468 additions and 556 deletions

22
Cargo.lock generated
View file

@ -8,7 +8,6 @@ dependencies = [
"ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)",
"env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
"eventual 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
"getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.11.0-a.0 (git+https://github.com/hyperium/hyper)",
@ -204,16 +203,6 @@ dependencies = [
"backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "eventual"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"syncbox 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "futures"
version = "0.1.8"
@ -805,15 +794,6 @@ name = "smallvec"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "syncbox"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)",
"time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "syntex"
version = "0.44.0"
@ -1216,7 +1196,6 @@ dependencies = [
"checksum dtoa 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0dd841b58510c9618291ffa448da2e4e0f699d984d436122372f446dae62263d"
"checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f"
"checksum error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "318cb3c71ee4cdea69fdc9e15c173b245ed6063e1709029e8fd32525a881120f"
"checksum eventual 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b9bda6d089b434ca50f3d6feb5fca421309b8bac97b8be9af51cff879fa3f54b"
"checksum futures 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3e0b237aed5d8b61bc7d6ee1b8ebd719d0a934a38d363c5e56daf34bb634d9b2"
"checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82"
"checksum gcc 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "3689e1982a563af74960ae3a4758aa632bb8fd984cfc3cc3b60ee6109477ab6e"
@ -1286,7 +1265,6 @@ dependencies = [
"checksum shannon-sys 0.1.0 (git+https://github.com/plietar/rust-shannon)" = "<none>"
"checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23"
"checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013"
"checksum syncbox 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "05bc2b72659ac27a2d0e7c4166c8596578197c4c41f767deab12c81f523b85c7"
"checksum syntex 0.44.0 (registry+https://github.com/rust-lang/crates.io-index)" = "84f37b94d7ee762bcac58741f73a95465cf87188c3b93f10df9245aff821b2b4"
"checksum syntex 0.50.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd253b0d7d787723a33384d426f0ebec7f8edccfaeb2022d0177162bb134da0"
"checksum syntex 0.55.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b186e277908427269816c542c912b45253ed11808a09780bd224679770ce351"

View file

@ -25,7 +25,6 @@ bit-set = "0.4.0"
byteorder = "1.0"
ctrlc = { version = "2.0", features = ["termination"] }
env_logger = "0.3.2"
eventual = "0.1.6"
getopts = "0.2.14"
hyper = { git = "https://github.com/hyperium/hyper" }
lazy_static = "0.2.0"

View file

@ -7,6 +7,7 @@ use serde_json;
use tokio_core::reactor::Handle;
error_chain! { }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData {
ap_list: Vec<String>

View file

@ -1,167 +1,270 @@
use bit_set::BitSet;
use byteorder::{ByteOrder, BigEndian};
use eventual;
use byteorder::{ByteOrder, BigEndian, WriteBytesExt};
use futures::Stream;
use futures::sync::{oneshot, mpsc};
use futures::{Poll, Async, Future};
use std::cmp::min;
use std::sync::{Arc, Condvar, Mutex};
use std::sync::mpsc::{self, TryRecvError};
use std::fs;
use std::io::{self, Read, Write, Seek, SeekFrom};
use std::sync::{Arc, Condvar, Mutex};
use tempfile::NamedTempFile;
use util::{FileId, IgnoreExt};
use channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use session::Session;
use audio_file2;
use util::FileId;
const CHUNK_SIZE: usize = 0x20000;
component! {
AudioFileManager : AudioFileManagerInner { }
}
pub struct AudioFile {
read_file: fs::File,
position: u64,
seek: mpsc::Sender<u64>,
seek: mpsc::UnboundedSender<u64>,
shared: Arc<AudioFileShared>,
}
struct AudioFileInternal {
partial_tx: Option<eventual::Complete<fs::File, ()>>,
complete_tx: eventual::Complete<NamedTempFile, ()>,
write_file: NamedTempFile,
seek_rx: mpsc::Receiver<u64>,
shared: Arc<AudioFileShared>,
chunk_count: usize,
pub struct AudioFileOpen {
session: Session,
data_rx: Option<ChannelData>,
headers: ChannelHeaders,
file_id: FileId,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
}
struct AudioFileShared {
file_id: FileId,
chunk_count: usize,
cond: Condvar,
bitmap: Mutex<BitSet>,
}
impl AudioFile {
pub fn new(session: &Session, file_id: FileId)
-> (eventual::Future<AudioFile, ()>, eventual::Future<NamedTempFile, ()>) {
impl AudioFileOpen {
fn finish(&mut self, size: usize) -> AudioFile {
let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
let shared = Arc::new(AudioFileShared {
file_id: self.file_id,
chunk_count: chunk_count,
cond: Condvar::new(),
bitmap: Mutex::new(BitSet::new()),
bitmap: Mutex::new(BitSet::with_capacity(chunk_count)),
});
let (seek_tx, seek_rx) = mpsc::channel();
let (partial_tx, partial_rx) = eventual::Future::pair();
let (complete_tx, complete_rx) = eventual::Future::pair();
let mut write_file = NamedTempFile::new().unwrap();
write_file.set_len(size as u64).unwrap();
write_file.seek(SeekFrom::Start(0)).unwrap();
let internal = AudioFileInternal {
shared: shared.clone(),
write_file: NamedTempFile::new().unwrap(),
seek_rx: seek_rx,
partial_tx: Some(partial_tx),
complete_tx: complete_tx,
chunk_count: 0,
};
let read_file = write_file.reopen().unwrap();
audio_file2::AudioFile::new(file_id, 0, internal, session);
let data_rx = self.data_rx.take().unwrap();
let complete_tx = self.complete_tx.take().unwrap();
let (seek_tx, seek_rx) = mpsc::unbounded();
let file_rx = partial_rx.map(|read_file| {
AudioFile {
read_file: read_file,
let fetcher = AudioFileFetch::new(
self.session.clone(), shared.clone(), data_rx, write_file, seek_rx, complete_tx
);
self.session.spawn(move |_| fetcher);
position: 0,
seek: seek_tx,
AudioFile {
read_file: read_file,
shared: shared,
}
});
position: 0,
seek: seek_tx,
(file_rx, complete_rx)
shared: shared,
}
}
}
impl audio_file2::Handler for AudioFileInternal {
fn on_header(mut self, header_id: u8, header_data: &[u8], _session: &Session) -> audio_file2::Response<Self> {
if header_id == 0x3 {
if let Some(tx) = self.partial_tx.take() {
let size = BigEndian::read_u32(header_data) as usize * 4;
self.write_file.set_len(size as u64).unwrap();
let read_file = self.write_file.reopen().unwrap();
impl Future for AudioFileOpen {
type Item = AudioFile;
type Error = ChannelError;
self.chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE;
self.shared.bitmap.lock().unwrap().reserve_len(self.chunk_count);
fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
loop {
let (id, data) = try_ready!(self.headers.poll()).unwrap();
tx.complete(read_file)
if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size);
return Ok(Async::Ready(file));
}
}
audio_file2::Response::Continue(self)
}
}
fn on_data(mut self, offset: usize, data: &[u8], _session: &Session) -> audio_file2::Response<Self> {
self.write_file.seek(SeekFrom::Start(offset as u64)).unwrap();
self.write_file.write_all(&data).unwrap();
impl AudioFileManager {
pub fn open(&self, file_id: FileId) -> (AudioFileOpen, oneshot::Receiver<NamedTempFile>) {
let (complete_tx, complete_rx) = oneshot::channel();
let (headers, data) = request_chunk(&self.session(), file_id, 0).split();
// We've crossed a chunk boundary
// Mark the previous one as complete in the bitmap and notify the reader
let seek = if (offset + data.len()) % CHUNK_SIZE < data.len() {
let mut index = offset / CHUNK_SIZE;
let mut bitmap = self.shared.bitmap.lock().unwrap();
bitmap.insert(index);
self.shared.cond.notify_all();
let open = AudioFileOpen {
session: self.session(),
file_id: file_id,
// If all blocks are complete when can stop
if bitmap.len() >= self.chunk_count {
drop(bitmap);
self.write_file.seek(SeekFrom::Start(0)).unwrap();
self.complete_tx.complete(self.write_file);
return audio_file2::Response::Close;
}
headers: headers,
data_rx: Some(data),
// Find the next undownloaded block
index = (index + 1) % self.chunk_count;
while bitmap.contains(index) {
index = (index + 1) % self.chunk_count;
}
Some(index)
} else {
None
complete_tx: Some(complete_tx),
};
match self.seek_rx.try_recv() {
Ok(seek_offset) => audio_file2::Response::Seek(self, seek_offset as usize / CHUNK_SIZE * CHUNK_SIZE),
Err(TryRecvError::Disconnected) => audio_file2::Response::Close,
Err(TryRecvError::Empty) => match seek {
Some(index) => audio_file2::Response::Seek(self, index * CHUNK_SIZE),
None => audio_file2::Response::Continue(self),
},
(open, complete_rx)
}
}
fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel {
debug!("requesting chunk {}", index);
let start = (index * CHUNK_SIZE / 4) as u32;
let end = ((index + 1) * CHUNK_SIZE / 4) as u32;
let (id, channel) = session.channel().allocate();
let mut data: Vec<u8> = Vec::new();
data.write_u16::<BigEndian>(id).unwrap();
data.write_u8(0).unwrap();
data.write_u8(1).unwrap();
data.write_u16::<BigEndian>(0x0000).unwrap();
data.write_u32::<BigEndian>(0x00000000).unwrap();
data.write_u32::<BigEndian>(0x00009C40).unwrap();
data.write_u32::<BigEndian>(0x00020000).unwrap();
data.write(&file.0).unwrap();
data.write_u32::<BigEndian>(start).unwrap();
data.write_u32::<BigEndian>(end).unwrap();
session.send_packet(0x8, data);
channel
}
struct AudioFileFetch {
session: Session,
shared: Arc<AudioFileShared>,
output: Option<NamedTempFile>,
index: usize,
data_rx: ChannelData,
seek_rx: mpsc::UnboundedReceiver<u64>,
complete_tx: Option<oneshot::Sender<NamedTempFile>>,
}
impl AudioFileFetch {
fn new(session: Session, shared: Arc<AudioFileShared>,
data_rx: ChannelData, output: NamedTempFile,
seek_rx: mpsc::UnboundedReceiver<u64>,
complete_tx: oneshot::Sender<NamedTempFile>) -> AudioFileFetch
{
AudioFileFetch {
session: session,
shared: shared,
output: Some(output),
index: 0,
data_rx: data_rx,
seek_rx: seek_rx,
complete_tx: Some(complete_tx),
}
}
fn on_eof(mut self, _session: &Session) -> audio_file2::Response<Self> {
let index = {
let mut index = self.chunk_count - 1;
let mut bitmap = self.shared.bitmap.lock().unwrap();
bitmap.insert(index);
self.shared.cond.notify_all();
fn download(&mut self, mut new_index: usize) {
assert!(new_index < self.shared.chunk_count);
// If all blocks are complete when can stop
if bitmap.len() >= self.chunk_count {
drop(bitmap);
self.write_file.seek(SeekFrom::Start(0)).unwrap();
self.complete_tx.complete(self.write_file);
return audio_file2::Response::Close;
{
let bitmap = self.shared.bitmap.lock().unwrap();
while bitmap.contains(new_index) {
new_index = (new_index + 1) % self.shared.chunk_count;
}
}
// Find the next undownloaded block
index = (index + 1) % self.chunk_count;
while bitmap.contains(index) {
index = (index + 1) % self.chunk_count;
}
index
};
if self.index != new_index {
self.index = new_index;
audio_file2::Response::Seek(self, index * CHUNK_SIZE)
let offset = self.index * CHUNK_SIZE;
self.output.as_mut().unwrap()
.seek(SeekFrom::Start(offset as u64)).unwrap();
let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split();
self.data_rx = data;
}
}
fn on_error(self, _session: &Session) {
fn finish(&mut self) {
let mut output = self.output.take().unwrap();
let complete_tx = self.complete_tx.take().unwrap();
output.seek(SeekFrom::Start(0)).unwrap();
complete_tx.complete(output);
}
}
impl Future for AudioFileFetch {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
loop {
let mut progress = false;
match self.seek_rx.poll() {
Ok(Async::Ready(None)) => {
return Ok(Async::Ready(()));
}
Ok(Async::Ready(Some(offset))) => {
progress = true;
let index = offset as usize / CHUNK_SIZE;
self.download(index);
}
Ok(Async::NotReady) => (),
Err(()) => unreachable!(),
}
match self.data_rx.poll() {
Ok(Async::Ready(Some(data))) => {
progress = true;
self.output.as_mut().unwrap()
.write_all(&data).unwrap();
}
Ok(Async::Ready(None)) => {
progress = true;
debug!("chunk {} / {} complete", self.index, self.shared.chunk_count);
let full = {
let mut bitmap = self.shared.bitmap.lock().unwrap();
bitmap.insert(self.index as usize);
self.shared.cond.notify_all();
bitmap.len() >= self.shared.chunk_count
};
if full {
self.finish();
return Ok(Async::Ready(()));
}
let new_index = (self.index + 1) % self.shared.chunk_count;
self.download(new_index);
}
Ok(Async::NotReady) => (),
Err(ChannelError) => {
warn!("error from channel");
return Ok(Async::Ready(()));
},
}
if !progress {
return Ok(Async::NotReady);
}
}
}
}
@ -192,7 +295,7 @@ impl Seek for AudioFile {
// Notify the fetch thread to get the correct block
// This can fail if fetch thread has completed, in which case the
// block is ready. Just ignore the error.
self.seek.send(self.position).ignore();
Ok(self.position as u64)
let _ = self.seek.send(self.position);
Ok(self.position)
}
}

View file

@ -1,131 +0,0 @@
use session::Session;
use stream;
use util::FileId;
use byteorder::{BigEndian, WriteBytesExt};
use std::io::Write;
const CHUNK_SIZE: usize = 0x20000;
pub enum Response<H> {
// Wait(H),
Continue(H),
Seek(H, usize),
Close,
}
pub trait Handler : Sized + Send + 'static {
fn on_header(self, header_id: u8, header_data: &[u8], session: &Session) -> Response<Self>;
fn on_data(self, offset: usize, data: &[u8], session: &Session) -> Response<Self>;
fn on_eof(self, session: &Session) -> Response<Self>;
fn on_error(self, session: &Session);
}
pub struct AudioFile<H: Handler> {
handler: H,
file_id: FileId,
offset: usize,
}
impl <H: Handler> AudioFile<H> {
pub fn new(file_id: FileId, offset: usize, handler: H, session: &Session) {
let handler = AudioFile {
handler: handler,
file_id: file_id,
offset: offset,
};
session.stream(Box::new(handler));
}
}
impl <H: Handler> stream::Handler for AudioFile<H> {
fn on_create(self, channel_id: stream::ChannelId, session: &Session) -> stream::Response<Self> {
debug!("Got channel {}", channel_id);
let mut data: Vec<u8> = Vec::new();
data.write_u16::<BigEndian>(channel_id).unwrap();
data.write_u8(0).unwrap();
data.write_u8(1).unwrap();
data.write_u16::<BigEndian>(0x0000).unwrap();
data.write_u32::<BigEndian>(0x00000000).unwrap();
data.write_u32::<BigEndian>(0x00009C40).unwrap();
data.write_u32::<BigEndian>(0x00020000).unwrap();
data.write(&self.file_id.0).unwrap();
data.write_u32::<BigEndian>(self.offset as u32 / 4).unwrap();
data.write_u32::<BigEndian>((self.offset + CHUNK_SIZE) as u32 / 4).unwrap();
session.send_packet(0x8, data);
stream::Response::Continue(self)
}
fn on_header(mut self, header_id: u8, header_data: &[u8], session: &Session) -> stream::Response<Self> {
match self.handler.on_header(header_id, header_data, session) {
Response::Continue(handler) => {
self.handler = handler;
stream::Response::Continue(self)
}
Response::Seek(handler, offset) => {
self.handler = handler;
self.offset = offset;
stream::Response::Spawn(self)
}
Response::Close => stream::Response::Close,
}
}
fn on_data(mut self, data: &[u8], session: &Session) -> stream::Response<Self> {
match self.handler.on_data(self.offset, data, session) {
Response::Continue(handler) => {
self.handler = handler;
self.offset += data.len();
stream::Response::Continue(self)
}
Response::Seek(handler, offset) => {
self.handler = handler;
self.offset = offset;
stream::Response::Spawn(self)
}
Response::Close => stream::Response::Close,
}
}
fn on_close(self, _session: &Session) -> stream::Response<Self> {
// End of chunk, request a new one
stream::Response::Spawn(self)
}
fn on_error(mut self, session: &Session) -> stream::Response<Self> {
match self.handler.on_eof(session) {
Response::Continue(_) => stream::Response::Close,
Response::Seek(handler, offset) => {
self.handler = handler;
self.offset = offset;
stream::Response::Spawn(self)
}
Response::Close => stream::Response::Close,
}
}
fn box_on_create(self: Box<Self>, channel_id: stream::ChannelId, session: &Session) -> stream::Response<Box<stream::Handler>> {
self.on_create(channel_id, session).boxed()
}
fn box_on_header(self: Box<Self>, header_id: u8, header_data: &[u8], session: &Session) -> stream::Response<Box<stream::Handler>> {
self.on_header(header_id, header_data, session).boxed()
}
fn box_on_data(self: Box<Self>, data: &[u8], session: &Session) -> stream::Response<Box<stream::Handler>> {
self.on_data(data, session).boxed()
}
fn box_on_error(self: Box<Self>, session: &Session) -> stream::Response<Box<stream::Handler>> {
self.on_error(session).boxed()
}
fn box_on_close(self: Box<Self>, session: &Session) -> stream::Response<Box<stream::Handler>> {
self.on_close(session).boxed()
}
}

174
src/channel.rs Normal file
View file

@ -0,0 +1,174 @@
use byteorder::{BigEndian, ByteOrder};
use futures::sync::{BiLock, mpsc};
use futures::{Poll, Async, Stream};
use std::collections::HashMap;
use tokio_core::io::EasyBuf;
use util::SeqGenerator;
component! {
ChannelManager : ChannelManagerInner {
sequence: SeqGenerator<u16> = SeqGenerator::new(0),
channels: HashMap<u16, mpsc::UnboundedSender<(u8, Vec<u8>)>> = HashMap::new(),
}
}
#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)]
pub struct ChannelError;
pub struct Channel {
receiver: mpsc::UnboundedReceiver<(u8, Vec<u8>)>,
state: ChannelState,
}
pub struct ChannelHeaders(BiLock<Channel>);
pub struct ChannelData(BiLock<Channel>);
pub enum ChannelEvent {
Header(u8, Vec<u8>),
Data(Vec<u8>),
}
#[derive(Clone)]
enum ChannelState {
Header(EasyBuf),
Data,
Closed,
}
impl ChannelManager {
pub fn allocate(&self) -> (u16, Channel) {
let (tx, rx) = mpsc::unbounded();
let seq = self.lock(|inner| {
let seq = inner.sequence.get();
inner.channels.insert(seq, tx);
seq
});
let channel = Channel {
receiver: rx,
state: ChannelState::Header(EasyBuf::new()),
};
(seq, channel)
}
pub fn dispatch(&self, cmd: u8, data: Vec<u8>) {
use std::collections::hash_map::Entry;
let id: u16 = BigEndian::read_u16(&data[..2]);
self.lock(|inner| {
if let Entry::Occupied(entry) = inner.channels.entry(id) {
let _ = entry.get().send((cmd, data[2..].to_owned()));
}
});
}
}
impl Channel {
fn recv_packet(&mut self) -> Poll<Vec<u8>, ChannelError> {
let (cmd, packet) = match self.receiver.poll() {
Ok(Async::Ready(t)) => t.expect("channel closed"),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
};
if cmd == 0xa {
let code = BigEndian::read_u16(&packet[..2]);
error!("channel error: {} {}", packet.len(), code);
self.state = ChannelState::Closed;
Err(ChannelError)
} else {
Ok(Async::Ready(packet))
}
}
pub fn split(self) -> (ChannelHeaders, ChannelData) {
let (headers, data) = BiLock::new(self);
(ChannelHeaders(headers), ChannelData(data))
}
}
impl Stream for Channel {
type Item = ChannelEvent;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
loop {
match self.state.clone() {
ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => {
if data.len() == 0 {
data = EasyBuf::from(try_ready!(self.recv_packet()));
}
let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
if length == 0 {
assert_eq!(data.len(), 0);
self.state = ChannelState::Data;
} else {
let header_id = data.drain_to(1).as_ref()[0];
let header_data = data.drain_to(length - 1).as_ref().to_owned();
self.state = ChannelState::Header(data);
return Ok(Async::Ready(Some(ChannelEvent::Header(header_id, header_data))));
}
}
ChannelState::Data => {
let data = try_ready!(self.recv_packet());
if data.is_empty() {
self.receiver.close();
self.state = ChannelState::Closed;
return Ok(Async::Ready(None));
} else {
return Ok(Async::Ready(Some(ChannelEvent::Data(data))));
}
}
}
}
}
}
impl Stream for ChannelData {
type Item = Vec<u8>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady),
};
loop {
match try_ready!(channel.poll()) {
Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))),
None => return Ok(Async::Ready(None)),
}
}
}
}
impl Stream for ChannelHeaders {
type Item = (u8, Vec<u8>);
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady),
};
match try_ready!(channel.poll()) {
Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))),
Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)),
}
}
}

View file

@ -27,3 +27,28 @@ macro_rules! component {
}
}
}
use std::sync::Mutex;
use std::cell::UnsafeCell;
pub struct Lazy<T>(Mutex<bool>, UnsafeCell<Option<T>>);
unsafe impl <T: Sync> Sync for Lazy<T> {}
unsafe impl <T: Send> Send for Lazy<T> {}
impl <T> Lazy<T> {
pub fn new() -> Lazy<T> {
Lazy(Mutex::new(false), UnsafeCell::new(None))
}
pub fn get<F: FnOnce() -> T>(&self, f: F) -> &T {
let mut inner = self.0.lock().unwrap();
if !*inner {
unsafe {
*self.1.get() = Some(f());
}
*inner = true;
}
unsafe { &*self.1.get() }.as_ref().unwrap()
}
}

View file

@ -6,14 +6,12 @@ use std::thread;
use tokio_core::reactor::Core;
use tokio_core::reactor::Handle;
pub struct SinkAdaptor<T>(pub Option<mpsc::Sender<T>>);
pub struct StreamAdaptor<T, E>(pub Option<mpsc::Receiver<Result<T, E>>>);
pub struct SinkAdaptor<T>(mpsc::UnboundedSender<T>);
pub struct StreamAdaptor<T, E>(Option<mpsc::Receiver<Result<T, E>>>);
impl <T> SinkAdaptor<T> {
pub fn send(&mut self, item: T) {
let sender = self.0.take().unwrap();
let sending = sender.send(item);
self.0 = Some(sending.wait().unwrap());
mpsc::UnboundedSender::send(&mut self.0, item).unwrap();
}
}
@ -39,7 +37,7 @@ pub fn adapt<S, E>(transport: S) -> (SinkAdaptor<S::SinkItem>,
E: Send + 'static,
{
let (receiver_tx, receiver_rx) = mpsc::channel(0);
let (sender_tx, sender_rx) = mpsc::channel(0);
let (sender_tx, sender_rx) = mpsc::unbounded();
let (sink, stream) = transport.split();
@ -55,7 +53,7 @@ pub fn adapt<S, E>(transport: S) -> (SinkAdaptor<S::SinkItem>,
let task = (receiver_task, sender_task).into_future()
.map(|((), ())| ()).boxed();
(SinkAdaptor(Some(sender_tx)),
(SinkAdaptor(sender_tx),
StreamAdaptor(Some(receiver_rx)), task)
}

View file

@ -16,7 +16,6 @@
extern crate bit_set;
extern crate byteorder;
extern crate crypto;
extern crate eventual;
extern crate getopts;
extern crate hyper;
extern crate linear_map;
@ -50,19 +49,18 @@ extern crate portaudio;
extern crate libpulse_sys;
#[macro_use] mod component;
pub mod album_cover;
pub mod audio_backend;
pub mod audio_decrypt;
pub mod audio_file2;
pub mod audio_file;
pub mod audio_key;
pub mod cache;
pub mod channel;
pub mod diffie_hellman;
pub mod mercury;
pub mod metadata;
pub mod player;
pub mod session;
pub mod stream;
pub mod util;
pub mod version;

View file

@ -5,11 +5,12 @@ use std::io::{Read, Seek};
use vorbis;
use futures::{future, Future};
use audio_file::AudioFile;
use audio_decrypt::AudioDecrypt;
use audio_backend::Sink;
use metadata::{FileFormat, Track};
use session::{Bitrate, Session};
use util::{self, ReadSeek, SpotifyId, Subfile};
use util::{self, SpotifyId, Subfile};
pub use spirc::PlayStatus;
#[cfg(not(feature = "with-tremor"))]
@ -177,7 +178,9 @@ fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option
}
}
fn load_track(session: &Session, track_id: SpotifyId) -> Option<vorbis::Decoder<Subfile<AudioDecrypt<Box<ReadSeek>>>>> {
fn load_track(session: &Session, track_id: SpotifyId)
-> Option<vorbis::Decoder<Subfile<AudioDecrypt<AudioFile>>>>
{
let track = session.metadata().get::<Track>(track_id).wait().unwrap();
info!("Loading track \"{}\"", track.name);
@ -208,7 +211,10 @@ fn load_track(session: &Session, track_id: SpotifyId) -> Option<vorbis::Decoder<
let key = session.audio_key().request(track.id, file_id).wait().unwrap();
let audio_file = Subfile::new(AudioDecrypt::new(key, session.audio_file(file_id)), 0xa7);
let (open, _) = session.audio_file().open(file_id);
let encrypted_file = open.wait().unwrap();
let audio_file = Subfile::new(AudioDecrypt::new(key, encrypted_file), 0xa7);
let decoder = vorbis::Decoder::new(audio_file).unwrap();
Some(decoder)

View file

@ -1,30 +1,24 @@
use crypto::digest::Digest;
use crypto::sha1::Sha1;
use eventual;
use eventual::Future;
use eventual::Async;
use std::io::{self, Read, Cursor};
use std::io;
use std::result::Result;
use std::sync::{Mutex, RwLock, Arc, Weak};
use std::str::FromStr;
use futures::Future as Future_;
use futures::{Stream, BoxFuture};
use tokio_core::reactor::Handle;
use futures::{Stream, BoxFuture, IntoFuture};
use tokio_core::reactor::{Handle, Remote};
use album_cover::AlbumCover;
use apresolve::apresolve_or_fallback;
use audio_file::AudioFile;
use authentication::Credentials;
use cache::Cache;
use component::Lazy;
use connection::{self, adaptor};
use stream::StreamManager;
use util::{FileId, ReadSeek, Lazy};
use audio_key::AudioKeyManager;
use channel::ChannelManager;
use mercury::MercuryManager;
use metadata::MetadataManager;
use stream;
use audio_file::AudioFileManager;
#[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq)]
pub enum Bitrate {
@ -63,13 +57,17 @@ pub struct SessionInternal {
data: RwLock<SessionData>,
cache: Box<Cache + Send + Sync>,
stream: Mutex<StreamManager>,
rx_connection: Mutex<adaptor::StreamAdaptor<(u8, Vec<u8>), io::Error>>,
tx_connection: Mutex<adaptor::SinkAdaptor<(u8, Vec<u8>)>>,
audio_key: Lazy<AudioKeyManager>,
audio_file: Lazy<AudioFileManager>,
channel: Lazy<ChannelManager>,
mercury: Lazy<MercuryManager>,
metadata: Lazy<MetadataManager>,
handle: Remote,
}
#[derive(Clone)]
@ -91,9 +89,10 @@ impl Session {
{
let access_point = apresolve_or_fallback::<io::Error>(&handle);
let handle_ = handle.clone();
let connection = access_point.and_then(move |addr| {
info!("Connecting to AP \"{}\"", addr);
connection::connect::<&str>(&addr, &handle)
connection::connect::<&str>(&addr, &handle_)
});
let device_id = config.device_id.clone();
@ -105,15 +104,19 @@ impl Session {
info!("Authenticated !");
cache.put_credentials(&reusable_credentials);
let (session, task) = Session::create(transport, config, cache, reusable_credentials.username.clone());
let (session, task) = Session::create(
&handle, transport, config, cache, reusable_credentials.username.clone()
);
(session, task)
});
Box::new(result)
}
fn create(transport: connection::Transport, config: Config,
cache: Box<Cache + Send + Sync>, username: String)
fn create(handle: &Handle, transport: connection::Transport,
config: Config, cache: Box<Cache + Send + Sync>,
username: String)
-> (Session, BoxFuture<(), io::Error>)
{
let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned()));
@ -130,11 +133,14 @@ impl Session {
tx_connection: Mutex::new(tx),
cache: cache,
stream: Mutex::new(StreamManager::new()),
audio_key: Lazy::new(),
audio_file: Lazy::new(),
channel: Lazy::new(),
mercury: Lazy::new(),
metadata: Lazy::new(),
handle: handle.remote().clone(),
}));
(session, task)
@ -144,6 +150,14 @@ impl Session {
self.0.audio_key.get(|| AudioKeyManager::new(self.weak()))
}
pub fn audio_file(&self) -> &AudioFileManager {
self.0.audio_file.get(|| AudioFileManager::new(self.weak()))
}
pub fn channel(&self) -> &ChannelManager {
self.0.channel.get(|| ChannelManager::new(self.weak()))
}
pub fn mercury(&self) -> &MercuryManager {
self.0.mercury.get(|| MercuryManager::new(self.weak()))
}
@ -152,17 +166,25 @@ impl Session {
self.0.metadata.get(|| MetadataManager::new(self.weak()))
}
pub fn spawn<F, R>(&self, f: F)
where F: FnOnce(&Handle) -> R + Send + 'static,
R: IntoFuture<Item=(), Error=()>,
R::Future: 'static
{
self.0.handle.spawn(f)
}
pub fn poll(&self) {
let (cmd, data) = self.recv();
match cmd {
0x4 => self.send_packet(0x49, data),
0x4a => (),
0x9 | 0xa => self.0.stream.lock().unwrap().handle(cmd, data, self),
0x1b => {
self.0.data.write().unwrap().country = String::from_utf8(data).unwrap();
}
0x9 | 0xa => self.channel().dispatch(cmd, data),
0xd | 0xe => self.audio_key().dispatch(cmd, data),
0xb2...0xb6 => self.mercury().dispatch(cmd, data),
_ => (),
@ -177,60 +199,6 @@ impl Session {
self.0.tx_connection.lock().unwrap().send((cmd, data))
}
/*
pub fn audio_key(&self, track: SpotifyId, file_id: FileId) -> Future<AudioKey, AudioKeyError> {
self.0.cache
.get_audio_key(track, file_id)
.map(Future::of)
.unwrap_or_else(|| {
let self_ = self.clone();
self.0.audio_key.lock().unwrap()
.request(self, track, file_id)
.map(move |key| {
self_.0.cache.put_audio_key(track, file_id, key);
key
})
})
}
*/
pub fn audio_file(&self, file_id: FileId) -> Box<ReadSeek> {
self.0.cache
.get_file(file_id)
.unwrap_or_else(|| {
let (audio_file, complete_rx) = AudioFile::new(self, file_id);
let self_ = self.clone();
complete_rx.map(move |mut complete_file| {
self_.0.cache.put_file(file_id, &mut complete_file)
}).fire();
Box::new(audio_file.await().unwrap())
})
}
pub fn album_cover(&self, file_id: FileId) -> eventual::Future<Vec<u8>, ()> {
self.0.cache
.get_file(file_id)
.map(|mut f| {
let mut data = Vec::new();
f.read_to_end(&mut data).unwrap();
Future::of(data)
})
.unwrap_or_else(|| {
let self_ = self.clone();
AlbumCover::get(file_id, self)
.map(move |data| {
self_.0.cache.put_file(file_id, &mut Cursor::new(&data));
data
})
})
}
pub fn stream(&self, handler: Box<stream::Handler>) {
self.0.stream.lock().unwrap().create(handler, self)
}
pub fn cache(&self) -> &Cache {
self.0.cache.as_ref()
}

View file

@ -1,169 +0,0 @@
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::io::{Cursor, Seek, SeekFrom};
use session::{Session, PacketHandler};
pub enum Response<H, S = H> {
Continue(H),
Spawn(S),
Close,
}
impl <H: Handler + 'static> Response<H> {
pub fn boxed(self) -> Response<Box<Handler>> {
match self {
Response::Continue(handler) => Response::Continue(Box::new(handler)),
Response::Spawn(handler) => Response::Spawn(Box::new(handler)),
Response::Close => Response::Close,
}
}
}
pub trait Handler: Send {
fn on_create(self, channel_id: ChannelId, session: &Session) -> Response<Self> where Self: Sized;
fn on_header(self, header_id: u8, header_data: &[u8], session: &Session) -> Response<Self> where Self: Sized;
fn on_data(self, data: &[u8], session: &Session) -> Response<Self> where Self: Sized;
fn on_error(self, session: &Session) -> Response<Self> where Self: Sized;
fn on_close(self, session: &Session) -> Response<Self> where Self: Sized;
fn box_on_create(self: Box<Self>, channel_id: ChannelId, session: &Session) -> Response<Box<Handler>>;
fn box_on_header(self: Box<Self>, header_id: u8, header_data: &[u8], session: &Session) -> Response<Box<Handler>>;
fn box_on_data(self: Box<Self>, data: &[u8], session: &Session) -> Response<Box<Handler>>;
fn box_on_error(self: Box<Self>, session: &Session) -> Response<Box<Handler>>;
fn box_on_close(self: Box<Self>, session: &Session) -> Response<Box<Handler>>;
}
pub type ChannelId = u16;
enum ChannelMode {
Header,
Data
}
struct Channel(ChannelMode, Box<Handler>);
impl Channel {
fn handle_packet(self, cmd: u8, data: Vec<u8>, session: &Session) -> Response<Self, Box<Handler>> {
let Channel(mode, mut handler) = self;
let mut packet = Cursor::new(&data as &[u8]);
packet.read_u16::<BigEndian>().unwrap(); // Skip channel id
if cmd == 0xa {
trace!("error: {} {}", data.len(), packet.read_u16::<BigEndian>().unwrap());
return match handler.box_on_error(session) {
Response::Continue(_) => Response::Close,
Response::Spawn(f) => Response::Spawn(f),
Response::Close => Response::Close,
};
}
match mode {
ChannelMode::Header => {
let mut length = 0;
while packet.position() < data.len() as u64 {
length = packet.read_u16::<BigEndian>().unwrap();
if length > 0 {
let header_id = packet.read_u8().unwrap();
let header_data = &data[packet.position() as usize .. packet.position() as usize + length as usize - 1];
handler = match handler.box_on_header(header_id, header_data, session) {
Response::Continue(handler) => handler,
Response::Spawn(f) => return Response::Spawn(f),
Response::Close => return Response::Close,
};
packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap();
}
}
if length == 0 {
Response::Continue(Channel(ChannelMode::Data, handler))
} else {
Response::Continue(Channel(ChannelMode::Header, handler))
}
}
ChannelMode::Data => {
if packet.position() < data.len() as u64 {
let event_data = &data[packet.position() as usize..];
match handler.box_on_data(event_data, session) {
Response::Continue(handler) => Response::Continue(Channel(ChannelMode::Data, handler)),
Response::Spawn(f) => Response::Spawn(f),
Response::Close => Response::Close,
}
} else {
match handler.box_on_close(session) {
Response::Continue(_) => Response::Close,
Response::Spawn(f) => Response::Spawn(f),
Response::Close => Response::Close,
}
}
}
}
}
}
pub struct StreamManager {
next_id: ChannelId,
channels: HashMap<ChannelId, Option<Channel>>,
}
impl StreamManager {
pub fn new() -> StreamManager {
StreamManager {
next_id: 0,
channels: HashMap::new(),
}
}
pub fn create(&mut self, handler: Box<Handler>, session: &Session) {
let channel_id = self.next_id;
self.next_id += 1;
trace!("allocated stream {}", channel_id);
match handler.box_on_create(channel_id, session) {
Response::Continue(handler) => {
self.channels.insert(channel_id, Some(Channel(ChannelMode::Header, handler)));
}
Response::Spawn(handler) => self.create(handler, session),
Response::Close => (),
}
}
}
impl PacketHandler for StreamManager {
fn handle(&mut self, cmd: u8, data: Vec<u8>, session: &Session) {
let id: ChannelId = BigEndian::read_u16(&data[0..2]);
let spawn = if let Entry::Occupied(mut entry) = self.channels.entry(id) {
if let Some(channel) = entry.get_mut().take() {
match channel.handle_packet(cmd, data, session) {
Response::Continue(channel) => {
entry.insert(Some(channel));
None
}
Response::Spawn(f) => {
entry.remove();
Some(f)
}
Response::Close => {
entry.remove();
None
}
}
} else {
None
}
} else {
None
};
if let Some(s) = spawn {
self.create(s, session);
}
}
}

View file

@ -22,19 +22,6 @@ pub fn rand_vec<G: Rng, R: Rand>(rng: &mut G, size: usize) -> Vec<R> {
rng.gen_iter().take(size).collect()
}
pub trait IgnoreExt {
fn ignore(self);
}
impl<T, E> IgnoreExt for Result<T, E> {
fn ignore(self) {
match self {
Ok(_) => (),
Err(_) => (),
}
}
}
pub fn now_ms() -> i64 {
let dur = match SystemTime::now().duration_since(UNIX_EPOCH) {
Ok(dur) => dur,
@ -135,28 +122,3 @@ impl <T: Seq> SeqGenerator<T> {
mem::replace(&mut self.0, value)
}
}
use std::sync::Mutex;
use std::cell::UnsafeCell;
pub struct Lazy<T>(Mutex<bool>, UnsafeCell<Option<T>>);
unsafe impl <T: Sync> Sync for Lazy<T> {}
unsafe impl <T: Send> Send for Lazy<T> {}
impl <T> Lazy<T> {
pub fn new() -> Lazy<T> {
Lazy(Mutex::new(false), UnsafeCell::new(None))
}
pub fn get<F: FnOnce() -> T>(&self, f: F) -> &T {
let mut inner = self.0.lock().unwrap();
if !*inner {
unsafe {
*self.1.get() = Some(f());
}
*inner = true;
}
unsafe { &*self.1.get() }.as_ref().unwrap()
}
}