diff --git a/Cargo.lock b/Cargo.lock index 73f8800a..cef238a3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,7 +8,6 @@ dependencies = [ "ctrlc 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)", - "eventual 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)", "getopts 0.2.14 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.0-a.0 (git+https://github.com/hyperium/hyper)", @@ -204,16 +203,6 @@ dependencies = [ "backtrace 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "eventual" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "syncbox 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)", - "time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "futures" version = "0.1.8" @@ -805,15 +794,6 @@ name = "smallvec" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -[[package]] -name = "syncbox" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "log 0.3.6 (registry+https://github.com/rust-lang/crates.io-index)", - "time 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "syntex" version = "0.44.0" @@ -1216,7 +1196,6 @@ dependencies = [ "checksum dtoa 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "0dd841b58510c9618291ffa448da2e4e0f699d984d436122372f446dae62263d" "checksum env_logger 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "15abd780e45b3ea4f76b4e9a26ff4843258dd8a3eed2775a0e7368c2e7936c2f" "checksum error-chain 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "318cb3c71ee4cdea69fdc9e15c173b245ed6063e1709029e8fd32525a881120f" -"checksum eventual 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "b9bda6d089b434ca50f3d6feb5fca421309b8bac97b8be9af51cff879fa3f54b" "checksum futures 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)" = "3e0b237aed5d8b61bc7d6ee1b8ebd719d0a934a38d363c5e56daf34bb634d9b2" "checksum futures-cpupool 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "bb982bb25cd8fa5da6a8eb3a460354c984ff1113da82bcb4f0b0862b5795db82" "checksum gcc 0.3.41 (registry+https://github.com/rust-lang/crates.io-index)" = "3689e1982a563af74960ae3a4758aa632bb8fd984cfc3cc3b60ee6109477ab6e" @@ -1286,7 +1265,6 @@ dependencies = [ "checksum shannon-sys 0.1.0 (git+https://github.com/plietar/rust-shannon)" = "" "checksum slab 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "17b4fcaed89ab08ef143da37bc52adbcc04d4a69014f4c1208d6b51f0c47bc23" "checksum smallvec 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4c8cbcd6df1e117c2210e13ab5109635ad68a929fcbb8964dc965b76cb5ee013" -"checksum syncbox 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)" = "05bc2b72659ac27a2d0e7c4166c8596578197c4c41f767deab12c81f523b85c7" "checksum syntex 0.44.0 (registry+https://github.com/rust-lang/crates.io-index)" = "84f37b94d7ee762bcac58741f73a95465cf87188c3b93f10df9245aff821b2b4" "checksum syntex 0.50.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3bd253b0d7d787723a33384d426f0ebec7f8edccfaeb2022d0177162bb134da0" "checksum syntex 0.55.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5b186e277908427269816c542c912b45253ed11808a09780bd224679770ce351" diff --git a/Cargo.toml b/Cargo.toml index 122a93ef..a70fc464 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,6 @@ bit-set = "0.4.0" byteorder = "1.0" ctrlc = { version = "2.0", features = ["termination"] } env_logger = "0.3.2" -eventual = "0.1.6" getopts = "0.2.14" hyper = { git = "https://github.com/hyperium/hyper" } lazy_static = "0.2.0" diff --git a/src/apresolve.rs b/src/apresolve.rs index 087b60d9..80d1f4a5 100644 --- a/src/apresolve.rs +++ b/src/apresolve.rs @@ -7,6 +7,7 @@ use serde_json; use tokio_core::reactor::Handle; error_chain! { } + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct APResolveData { ap_list: Vec diff --git a/src/audio_file.rs b/src/audio_file.rs index 6354efb4..e8778650 100644 --- a/src/audio_file.rs +++ b/src/audio_file.rs @@ -1,167 +1,270 @@ use bit_set::BitSet; -use byteorder::{ByteOrder, BigEndian}; -use eventual; +use byteorder::{ByteOrder, BigEndian, WriteBytesExt}; +use futures::Stream; +use futures::sync::{oneshot, mpsc}; +use futures::{Poll, Async, Future}; use std::cmp::min; -use std::sync::{Arc, Condvar, Mutex}; -use std::sync::mpsc::{self, TryRecvError}; use std::fs; use std::io::{self, Read, Write, Seek, SeekFrom}; +use std::sync::{Arc, Condvar, Mutex}; use tempfile::NamedTempFile; -use util::{FileId, IgnoreExt}; +use channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use session::Session; -use audio_file2; +use util::FileId; const CHUNK_SIZE: usize = 0x20000; +component! { + AudioFileManager : AudioFileManagerInner { } +} + pub struct AudioFile { read_file: fs::File, position: u64, - seek: mpsc::Sender, + seek: mpsc::UnboundedSender, shared: Arc, } -struct AudioFileInternal { - partial_tx: Option>, - complete_tx: eventual::Complete, - write_file: NamedTempFile, - seek_rx: mpsc::Receiver, - shared: Arc, - chunk_count: usize, +pub struct AudioFileOpen { + session: Session, + data_rx: Option, + headers: ChannelHeaders, + file_id: FileId, + complete_tx: Option>, } struct AudioFileShared { + file_id: FileId, + chunk_count: usize, cond: Condvar, bitmap: Mutex, } -impl AudioFile { - pub fn new(session: &Session, file_id: FileId) - -> (eventual::Future, eventual::Future) { +impl AudioFileOpen { + fn finish(&mut self, size: usize) -> AudioFile { + let chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE; let shared = Arc::new(AudioFileShared { + file_id: self.file_id, + chunk_count: chunk_count, cond: Condvar::new(), - bitmap: Mutex::new(BitSet::new()), + bitmap: Mutex::new(BitSet::with_capacity(chunk_count)), }); - let (seek_tx, seek_rx) = mpsc::channel(); - let (partial_tx, partial_rx) = eventual::Future::pair(); - let (complete_tx, complete_rx) = eventual::Future::pair(); + let mut write_file = NamedTempFile::new().unwrap(); + write_file.set_len(size as u64).unwrap(); + write_file.seek(SeekFrom::Start(0)).unwrap(); - let internal = AudioFileInternal { - shared: shared.clone(), - write_file: NamedTempFile::new().unwrap(), - seek_rx: seek_rx, - partial_tx: Some(partial_tx), - complete_tx: complete_tx, - chunk_count: 0, - }; + let read_file = write_file.reopen().unwrap(); - audio_file2::AudioFile::new(file_id, 0, internal, session); + let data_rx = self.data_rx.take().unwrap(); + let complete_tx = self.complete_tx.take().unwrap(); + let (seek_tx, seek_rx) = mpsc::unbounded(); - let file_rx = partial_rx.map(|read_file| { - AudioFile { - read_file: read_file, + let fetcher = AudioFileFetch::new( + self.session.clone(), shared.clone(), data_rx, write_file, seek_rx, complete_tx + ); + self.session.spawn(move |_| fetcher); - position: 0, - seek: seek_tx, + AudioFile { + read_file: read_file, - shared: shared, - } - }); - - (file_rx, complete_rx) + position: 0, + seek: seek_tx, + + shared: shared, + } } } -impl audio_file2::Handler for AudioFileInternal { - fn on_header(mut self, header_id: u8, header_data: &[u8], _session: &Session) -> audio_file2::Response { - if header_id == 0x3 { - if let Some(tx) = self.partial_tx.take() { - let size = BigEndian::read_u32(header_data) as usize * 4; - self.write_file.set_len(size as u64).unwrap(); - let read_file = self.write_file.reopen().unwrap(); +impl Future for AudioFileOpen { + type Item = AudioFile; + type Error = ChannelError; - self.chunk_count = (size + CHUNK_SIZE - 1) / CHUNK_SIZE; - self.shared.bitmap.lock().unwrap().reserve_len(self.chunk_count); + fn poll(&mut self) -> Poll { + loop { + let (id, data) = try_ready!(self.headers.poll()).unwrap(); - tx.complete(read_file) + if id == 0x3 { + let size = BigEndian::read_u32(&data) as usize * 4; + let file = self.finish(size); + + return Ok(Async::Ready(file)); } } - - audio_file2::Response::Continue(self) } +} - fn on_data(mut self, offset: usize, data: &[u8], _session: &Session) -> audio_file2::Response { - self.write_file.seek(SeekFrom::Start(offset as u64)).unwrap(); - self.write_file.write_all(&data).unwrap(); +impl AudioFileManager { + pub fn open(&self, file_id: FileId) -> (AudioFileOpen, oneshot::Receiver) { + let (complete_tx, complete_rx) = oneshot::channel(); + let (headers, data) = request_chunk(&self.session(), file_id, 0).split(); - // We've crossed a chunk boundary - // Mark the previous one as complete in the bitmap and notify the reader - let seek = if (offset + data.len()) % CHUNK_SIZE < data.len() { - let mut index = offset / CHUNK_SIZE; - let mut bitmap = self.shared.bitmap.lock().unwrap(); - bitmap.insert(index); - self.shared.cond.notify_all(); + let open = AudioFileOpen { + session: self.session(), + file_id: file_id, - // If all blocks are complete when can stop - if bitmap.len() >= self.chunk_count { - drop(bitmap); - self.write_file.seek(SeekFrom::Start(0)).unwrap(); - self.complete_tx.complete(self.write_file); - return audio_file2::Response::Close; - } + headers: headers, + data_rx: Some(data), - // Find the next undownloaded block - index = (index + 1) % self.chunk_count; - while bitmap.contains(index) { - index = (index + 1) % self.chunk_count; - } - - Some(index) - } else { - None + complete_tx: Some(complete_tx), }; - match self.seek_rx.try_recv() { - Ok(seek_offset) => audio_file2::Response::Seek(self, seek_offset as usize / CHUNK_SIZE * CHUNK_SIZE), - Err(TryRecvError::Disconnected) => audio_file2::Response::Close, - Err(TryRecvError::Empty) => match seek { - Some(index) => audio_file2::Response::Seek(self, index * CHUNK_SIZE), - None => audio_file2::Response::Continue(self), - }, + (open, complete_rx) + } +} + +fn request_chunk(session: &Session, file: FileId, index: usize) -> Channel { + debug!("requesting chunk {}", index); + + let start = (index * CHUNK_SIZE / 4) as u32; + let end = ((index + 1) * CHUNK_SIZE / 4) as u32; + + let (id, channel) = session.channel().allocate(); + + let mut data: Vec = Vec::new(); + data.write_u16::(id).unwrap(); + data.write_u8(0).unwrap(); + data.write_u8(1).unwrap(); + data.write_u16::(0x0000).unwrap(); + data.write_u32::(0x00000000).unwrap(); + data.write_u32::(0x00009C40).unwrap(); + data.write_u32::(0x00020000).unwrap(); + data.write(&file.0).unwrap(); + data.write_u32::(start).unwrap(); + data.write_u32::(end).unwrap(); + + session.send_packet(0x8, data); + + channel +} + +struct AudioFileFetch { + session: Session, + shared: Arc, + output: Option, + + index: usize, + data_rx: ChannelData, + + seek_rx: mpsc::UnboundedReceiver, + complete_tx: Option>, +} + +impl AudioFileFetch { + fn new(session: Session, shared: Arc, + data_rx: ChannelData, output: NamedTempFile, + seek_rx: mpsc::UnboundedReceiver, + complete_tx: oneshot::Sender) -> AudioFileFetch + { + AudioFileFetch { + session: session, + shared: shared, + output: Some(output), + + index: 0, + data_rx: data_rx, + + seek_rx: seek_rx, + complete_tx: Some(complete_tx), } } - fn on_eof(mut self, _session: &Session) -> audio_file2::Response { - let index = { - let mut index = self.chunk_count - 1; - let mut bitmap = self.shared.bitmap.lock().unwrap(); - bitmap.insert(index); - self.shared.cond.notify_all(); + fn download(&mut self, mut new_index: usize) { + assert!(new_index < self.shared.chunk_count); - // If all blocks are complete when can stop - if bitmap.len() >= self.chunk_count { - drop(bitmap); - self.write_file.seek(SeekFrom::Start(0)).unwrap(); - self.complete_tx.complete(self.write_file); - return audio_file2::Response::Close; + { + let bitmap = self.shared.bitmap.lock().unwrap(); + while bitmap.contains(new_index) { + new_index = (new_index + 1) % self.shared.chunk_count; } + } - // Find the next undownloaded block - index = (index + 1) % self.chunk_count; - while bitmap.contains(index) { - index = (index + 1) % self.chunk_count; - } - index - }; + if self.index != new_index { + self.index = new_index; - audio_file2::Response::Seek(self, index * CHUNK_SIZE) + let offset = self.index * CHUNK_SIZE; + + self.output.as_mut().unwrap() + .seek(SeekFrom::Start(offset as u64)).unwrap(); + + let (_headers, data) = request_chunk(&self.session, self.shared.file_id, self.index).split(); + self.data_rx = data; + } } - fn on_error(self, _session: &Session) { + fn finish(&mut self) { + let mut output = self.output.take().unwrap(); + let complete_tx = self.complete_tx.take().unwrap(); + + output.seek(SeekFrom::Start(0)).unwrap(); + complete_tx.complete(output); + } +} + +impl Future for AudioFileFetch { + type Item = (); + type Error = (); + + fn poll(&mut self) -> Poll<(), ()> { + loop { + let mut progress = false; + + match self.seek_rx.poll() { + Ok(Async::Ready(None)) => { + return Ok(Async::Ready(())); + } + Ok(Async::Ready(Some(offset))) => { + progress = true; + let index = offset as usize / CHUNK_SIZE; + self.download(index); + } + Ok(Async::NotReady) => (), + Err(()) => unreachable!(), + } + + match self.data_rx.poll() { + Ok(Async::Ready(Some(data))) => { + progress = true; + + self.output.as_mut().unwrap() + .write_all(&data).unwrap(); + } + Ok(Async::Ready(None)) => { + progress = true; + + debug!("chunk {} / {} complete", self.index, self.shared.chunk_count); + + let full = { + let mut bitmap = self.shared.bitmap.lock().unwrap(); + bitmap.insert(self.index as usize); + self.shared.cond.notify_all(); + + bitmap.len() >= self.shared.chunk_count + }; + + if full { + self.finish(); + return Ok(Async::Ready(())); + } + + let new_index = (self.index + 1) % self.shared.chunk_count; + self.download(new_index); + } + Ok(Async::NotReady) => (), + Err(ChannelError) => { + warn!("error from channel"); + return Ok(Async::Ready(())); + }, + } + + if !progress { + return Ok(Async::NotReady); + } + } } } @@ -192,7 +295,7 @@ impl Seek for AudioFile { // Notify the fetch thread to get the correct block // This can fail if fetch thread has completed, in which case the // block is ready. Just ignore the error. - self.seek.send(self.position).ignore(); - Ok(self.position as u64) + let _ = self.seek.send(self.position); + Ok(self.position) } } diff --git a/src/audio_file2.rs b/src/audio_file2.rs deleted file mode 100644 index db0179e6..00000000 --- a/src/audio_file2.rs +++ /dev/null @@ -1,131 +0,0 @@ -use session::Session; -use stream; -use util::FileId; - -use byteorder::{BigEndian, WriteBytesExt}; -use std::io::Write; - -const CHUNK_SIZE: usize = 0x20000; - -pub enum Response { -// Wait(H), - Continue(H), - Seek(H, usize), - Close, -} - -pub trait Handler : Sized + Send + 'static { - fn on_header(self, header_id: u8, header_data: &[u8], session: &Session) -> Response; - fn on_data(self, offset: usize, data: &[u8], session: &Session) -> Response; - fn on_eof(self, session: &Session) -> Response; - fn on_error(self, session: &Session); -} - -pub struct AudioFile { - handler: H, - file_id: FileId, - offset: usize, -} - -impl AudioFile { - pub fn new(file_id: FileId, offset: usize, handler: H, session: &Session) { - let handler = AudioFile { - handler: handler, - file_id: file_id, - offset: offset, - }; - - session.stream(Box::new(handler)); - } -} - -impl stream::Handler for AudioFile { - fn on_create(self, channel_id: stream::ChannelId, session: &Session) -> stream::Response { - debug!("Got channel {}", channel_id); - - let mut data: Vec = Vec::new(); - data.write_u16::(channel_id).unwrap(); - data.write_u8(0).unwrap(); - data.write_u8(1).unwrap(); - data.write_u16::(0x0000).unwrap(); - data.write_u32::(0x00000000).unwrap(); - data.write_u32::(0x00009C40).unwrap(); - data.write_u32::(0x00020000).unwrap(); - data.write(&self.file_id.0).unwrap(); - data.write_u32::(self.offset as u32 / 4).unwrap(); - data.write_u32::((self.offset + CHUNK_SIZE) as u32 / 4).unwrap(); - - session.send_packet(0x8, data); - - stream::Response::Continue(self) - } - - fn on_header(mut self, header_id: u8, header_data: &[u8], session: &Session) -> stream::Response { - match self.handler.on_header(header_id, header_data, session) { - Response::Continue(handler) => { - self.handler = handler; - stream::Response::Continue(self) - } - Response::Seek(handler, offset) => { - self.handler = handler; - self.offset = offset; - stream::Response::Spawn(self) - } - Response::Close => stream::Response::Close, - } - } - - fn on_data(mut self, data: &[u8], session: &Session) -> stream::Response { - match self.handler.on_data(self.offset, data, session) { - Response::Continue(handler) => { - self.handler = handler; - self.offset += data.len(); - stream::Response::Continue(self) - } - Response::Seek(handler, offset) => { - self.handler = handler; - self.offset = offset; - stream::Response::Spawn(self) - } - Response::Close => stream::Response::Close, - } - } - - fn on_close(self, _session: &Session) -> stream::Response { - // End of chunk, request a new one - stream::Response::Spawn(self) - } - - fn on_error(mut self, session: &Session) -> stream::Response { - match self.handler.on_eof(session) { - Response::Continue(_) => stream::Response::Close, - Response::Seek(handler, offset) => { - self.handler = handler; - self.offset = offset; - stream::Response::Spawn(self) - } - Response::Close => stream::Response::Close, - } - } - - fn box_on_create(self: Box, channel_id: stream::ChannelId, session: &Session) -> stream::Response> { - self.on_create(channel_id, session).boxed() - } - - fn box_on_header(self: Box, header_id: u8, header_data: &[u8], session: &Session) -> stream::Response> { - self.on_header(header_id, header_data, session).boxed() - } - - fn box_on_data(self: Box, data: &[u8], session: &Session) -> stream::Response> { - self.on_data(data, session).boxed() - } - - fn box_on_error(self: Box, session: &Session) -> stream::Response> { - self.on_error(session).boxed() - } - - fn box_on_close(self: Box, session: &Session) -> stream::Response> { - self.on_close(session).boxed() - } -} - diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 00000000..d3913efa --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,174 @@ +use byteorder::{BigEndian, ByteOrder}; +use futures::sync::{BiLock, mpsc}; +use futures::{Poll, Async, Stream}; +use std::collections::HashMap; +use tokio_core::io::EasyBuf; + +use util::SeqGenerator; + +component! { + ChannelManager : ChannelManagerInner { + sequence: SeqGenerator = SeqGenerator::new(0), + channels: HashMap)>> = HashMap::new(), + } +} + +#[derive(Debug,Hash,PartialEq,Eq,Copy,Clone)] +pub struct ChannelError; + +pub struct Channel { + receiver: mpsc::UnboundedReceiver<(u8, Vec)>, + state: ChannelState, +} + +pub struct ChannelHeaders(BiLock); +pub struct ChannelData(BiLock); + +pub enum ChannelEvent { + Header(u8, Vec), + Data(Vec), +} + +#[derive(Clone)] +enum ChannelState { + Header(EasyBuf), + Data, + Closed, +} + +impl ChannelManager { + pub fn allocate(&self) -> (u16, Channel) { + let (tx, rx) = mpsc::unbounded(); + + let seq = self.lock(|inner| { + let seq = inner.sequence.get(); + inner.channels.insert(seq, tx); + seq + }); + + let channel = Channel { + receiver: rx, + state: ChannelState::Header(EasyBuf::new()), + }; + + (seq, channel) + } + + pub fn dispatch(&self, cmd: u8, data: Vec) { + use std::collections::hash_map::Entry; + + let id: u16 = BigEndian::read_u16(&data[..2]); + + self.lock(|inner| { + if let Entry::Occupied(entry) = inner.channels.entry(id) { + let _ = entry.get().send((cmd, data[2..].to_owned())); + } + }); + } +} + +impl Channel { + fn recv_packet(&mut self) -> Poll, ChannelError> { + let (cmd, packet) = match self.receiver.poll() { + Ok(Async::Ready(t)) => t.expect("channel closed"), + Ok(Async::NotReady) => return Ok(Async::NotReady), + Err(()) => unreachable!(), + }; + + if cmd == 0xa { + let code = BigEndian::read_u16(&packet[..2]); + error!("channel error: {} {}", packet.len(), code); + + self.state = ChannelState::Closed; + + Err(ChannelError) + } else { + Ok(Async::Ready(packet)) + } + } + + pub fn split(self) -> (ChannelHeaders, ChannelData) { + let (headers, data) = BiLock::new(self); + + (ChannelHeaders(headers), ChannelData(data)) + } +} + +impl Stream for Channel { + type Item = ChannelEvent; + type Error = ChannelError; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + match self.state.clone() { + ChannelState::Closed => panic!("Polling already terminated channel"), + ChannelState::Header(mut data) => { + if data.len() == 0 { + data = EasyBuf::from(try_ready!(self.recv_packet())); + } + + let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; + if length == 0 { + assert_eq!(data.len(), 0); + self.state = ChannelState::Data; + } else { + let header_id = data.drain_to(1).as_ref()[0]; + let header_data = data.drain_to(length - 1).as_ref().to_owned(); + + self.state = ChannelState::Header(data); + + return Ok(Async::Ready(Some(ChannelEvent::Header(header_id, header_data)))); + } + } + + ChannelState::Data => { + let data = try_ready!(self.recv_packet()); + if data.is_empty() { + self.receiver.close(); + self.state = ChannelState::Closed; + return Ok(Async::Ready(None)); + } else { + return Ok(Async::Ready(Some(ChannelEvent::Data(data)))); + } + } + } + } + } +} + +impl Stream for ChannelData { + type Item = Vec; + type Error = ChannelError; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut channel = match self.0.poll_lock() { + Async::Ready(c) => c, + Async::NotReady => return Ok(Async::NotReady), + }; + + loop { + match try_ready!(channel.poll()) { + Some(ChannelEvent::Header(..)) => (), + Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))), + None => return Ok(Async::Ready(None)), + } + } + } +} + +impl Stream for ChannelHeaders { + type Item = (u8, Vec); + type Error = ChannelError; + + fn poll(&mut self) -> Poll, Self::Error> { + let mut channel = match self.0.poll_lock() { + Async::Ready(c) => c, + Async::NotReady => return Ok(Async::NotReady), + }; + + match try_ready!(channel.poll()) { + Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))), + Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)), + } + } +} diff --git a/src/component.rs b/src/component.rs index f4058e43..59753fa5 100644 --- a/src/component.rs +++ b/src/component.rs @@ -27,3 +27,28 @@ macro_rules! component { } } } + +use std::sync::Mutex; +use std::cell::UnsafeCell; + +pub struct Lazy(Mutex, UnsafeCell>); +unsafe impl Sync for Lazy {} +unsafe impl Send for Lazy {} + +impl Lazy { + pub fn new() -> Lazy { + Lazy(Mutex::new(false), UnsafeCell::new(None)) + } + + pub fn get T>(&self, f: F) -> &T { + let mut inner = self.0.lock().unwrap(); + if !*inner { + unsafe { + *self.1.get() = Some(f()); + } + *inner = true; + } + + unsafe { &*self.1.get() }.as_ref().unwrap() + } +} diff --git a/src/connection/adaptor.rs b/src/connection/adaptor.rs index e91a5a0d..1229e63c 100644 --- a/src/connection/adaptor.rs +++ b/src/connection/adaptor.rs @@ -6,14 +6,12 @@ use std::thread; use tokio_core::reactor::Core; use tokio_core::reactor::Handle; -pub struct SinkAdaptor(pub Option>); -pub struct StreamAdaptor(pub Option>>); +pub struct SinkAdaptor(mpsc::UnboundedSender); +pub struct StreamAdaptor(Option>>); impl SinkAdaptor { pub fn send(&mut self, item: T) { - let sender = self.0.take().unwrap(); - let sending = sender.send(item); - self.0 = Some(sending.wait().unwrap()); + mpsc::UnboundedSender::send(&mut self.0, item).unwrap(); } } @@ -39,7 +37,7 @@ pub fn adapt(transport: S) -> (SinkAdaptor, E: Send + 'static, { let (receiver_tx, receiver_rx) = mpsc::channel(0); - let (sender_tx, sender_rx) = mpsc::channel(0); + let (sender_tx, sender_rx) = mpsc::unbounded(); let (sink, stream) = transport.split(); @@ -55,7 +53,7 @@ pub fn adapt(transport: S) -> (SinkAdaptor, let task = (receiver_task, sender_task).into_future() .map(|((), ())| ()).boxed(); - (SinkAdaptor(Some(sender_tx)), + (SinkAdaptor(sender_tx), StreamAdaptor(Some(receiver_rx)), task) } diff --git a/src/lib.rs b/src/lib.rs index 21e445f1..e4a173e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,6 @@ extern crate bit_set; extern crate byteorder; extern crate crypto; -extern crate eventual; extern crate getopts; extern crate hyper; extern crate linear_map; @@ -50,19 +49,18 @@ extern crate portaudio; extern crate libpulse_sys; #[macro_use] mod component; -pub mod album_cover; + pub mod audio_backend; pub mod audio_decrypt; -pub mod audio_file2; pub mod audio_file; pub mod audio_key; pub mod cache; +pub mod channel; pub mod diffie_hellman; pub mod mercury; pub mod metadata; pub mod player; pub mod session; -pub mod stream; pub mod util; pub mod version; diff --git a/src/player.rs b/src/player.rs index 0220b5a3..9ccc8b16 100644 --- a/src/player.rs +++ b/src/player.rs @@ -5,11 +5,12 @@ use std::io::{Read, Seek}; use vorbis; use futures::{future, Future}; +use audio_file::AudioFile; use audio_decrypt::AudioDecrypt; use audio_backend::Sink; use metadata::{FileFormat, Track}; use session::{Bitrate, Session}; -use util::{self, ReadSeek, SpotifyId, Subfile}; +use util::{self, SpotifyId, Subfile}; pub use spirc::PlayStatus; #[cfg(not(feature = "with-tremor"))] @@ -177,7 +178,9 @@ fn find_available_alternative<'a>(session: &Session, track: &'a Track) -> Option } } -fn load_track(session: &Session, track_id: SpotifyId) -> Option>>>> { +fn load_track(session: &Session, track_id: SpotifyId) + -> Option>>> +{ let track = session.metadata().get::(track_id).wait().unwrap(); info!("Loading track \"{}\"", track.name); @@ -208,7 +211,10 @@ fn load_track(session: &Session, track_id: SpotifyId) -> Option, cache: Box, - stream: Mutex, + rx_connection: Mutex), io::Error>>, tx_connection: Mutex)>>, audio_key: Lazy, + audio_file: Lazy, + channel: Lazy, mercury: Lazy, metadata: Lazy, + + handle: Remote, } #[derive(Clone)] @@ -91,9 +89,10 @@ impl Session { { let access_point = apresolve_or_fallback::(&handle); + let handle_ = handle.clone(); let connection = access_point.and_then(move |addr| { info!("Connecting to AP \"{}\"", addr); - connection::connect::<&str>(&addr, &handle) + connection::connect::<&str>(&addr, &handle_) }); let device_id = config.device_id.clone(); @@ -105,15 +104,19 @@ impl Session { info!("Authenticated !"); cache.put_credentials(&reusable_credentials); - let (session, task) = Session::create(transport, config, cache, reusable_credentials.username.clone()); + let (session, task) = Session::create( + &handle, transport, config, cache, reusable_credentials.username.clone() + ); + (session, task) }); Box::new(result) } - fn create(transport: connection::Transport, config: Config, - cache: Box, username: String) + fn create(handle: &Handle, transport: connection::Transport, + config: Config, cache: Box, + username: String) -> (Session, BoxFuture<(), io::Error>) { let transport = transport.map(|(cmd, data)| (cmd, data.as_ref().to_owned())); @@ -130,11 +133,14 @@ impl Session { tx_connection: Mutex::new(tx), cache: cache, - stream: Mutex::new(StreamManager::new()), audio_key: Lazy::new(), + audio_file: Lazy::new(), + channel: Lazy::new(), mercury: Lazy::new(), metadata: Lazy::new(), + + handle: handle.remote().clone(), })); (session, task) @@ -144,6 +150,14 @@ impl Session { self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) } + pub fn audio_file(&self) -> &AudioFileManager { + self.0.audio_file.get(|| AudioFileManager::new(self.weak())) + } + + pub fn channel(&self) -> &ChannelManager { + self.0.channel.get(|| ChannelManager::new(self.weak())) + } + pub fn mercury(&self) -> &MercuryManager { self.0.mercury.get(|| MercuryManager::new(self.weak())) } @@ -152,17 +166,25 @@ impl Session { self.0.metadata.get(|| MetadataManager::new(self.weak())) } + pub fn spawn(&self, f: F) + where F: FnOnce(&Handle) -> R + Send + 'static, + R: IntoFuture, + R::Future: 'static + { + self.0.handle.spawn(f) + } + pub fn poll(&self) { let (cmd, data) = self.recv(); match cmd { 0x4 => self.send_packet(0x49, data), 0x4a => (), - 0x9 | 0xa => self.0.stream.lock().unwrap().handle(cmd, data, self), 0x1b => { self.0.data.write().unwrap().country = String::from_utf8(data).unwrap(); } + 0x9 | 0xa => self.channel().dispatch(cmd, data), 0xd | 0xe => self.audio_key().dispatch(cmd, data), 0xb2...0xb6 => self.mercury().dispatch(cmd, data), _ => (), @@ -177,60 +199,6 @@ impl Session { self.0.tx_connection.lock().unwrap().send((cmd, data)) } - /* - pub fn audio_key(&self, track: SpotifyId, file_id: FileId) -> Future { - self.0.cache - .get_audio_key(track, file_id) - .map(Future::of) - .unwrap_or_else(|| { - let self_ = self.clone(); - self.0.audio_key.lock().unwrap() - .request(self, track, file_id) - .map(move |key| { - self_.0.cache.put_audio_key(track, file_id, key); - key - }) - }) - } - */ - - pub fn audio_file(&self, file_id: FileId) -> Box { - self.0.cache - .get_file(file_id) - .unwrap_or_else(|| { - let (audio_file, complete_rx) = AudioFile::new(self, file_id); - - let self_ = self.clone(); - complete_rx.map(move |mut complete_file| { - self_.0.cache.put_file(file_id, &mut complete_file) - }).fire(); - - Box::new(audio_file.await().unwrap()) - }) - } - - pub fn album_cover(&self, file_id: FileId) -> eventual::Future, ()> { - self.0.cache - .get_file(file_id) - .map(|mut f| { - let mut data = Vec::new(); - f.read_to_end(&mut data).unwrap(); - Future::of(data) - }) - .unwrap_or_else(|| { - let self_ = self.clone(); - AlbumCover::get(file_id, self) - .map(move |data| { - self_.0.cache.put_file(file_id, &mut Cursor::new(&data)); - data - }) - }) - } - - pub fn stream(&self, handler: Box) { - self.0.stream.lock().unwrap().create(handler, self) - } - pub fn cache(&self) -> &Cache { self.0.cache.as_ref() } diff --git a/src/stream.rs b/src/stream.rs deleted file mode 100644 index 4b802ffd..00000000 --- a/src/stream.rs +++ /dev/null @@ -1,169 +0,0 @@ -use byteorder::{BigEndian, ByteOrder, ReadBytesExt}; -use std::collections::HashMap; -use std::collections::hash_map::Entry; -use std::io::{Cursor, Seek, SeekFrom}; -use session::{Session, PacketHandler}; - -pub enum Response { - Continue(H), - Spawn(S), - Close, -} - -impl Response { - pub fn boxed(self) -> Response> { - match self { - Response::Continue(handler) => Response::Continue(Box::new(handler)), - Response::Spawn(handler) => Response::Spawn(Box::new(handler)), - Response::Close => Response::Close, - } - } -} - -pub trait Handler: Send { - fn on_create(self, channel_id: ChannelId, session: &Session) -> Response where Self: Sized; - fn on_header(self, header_id: u8, header_data: &[u8], session: &Session) -> Response where Self: Sized; - fn on_data(self, data: &[u8], session: &Session) -> Response where Self: Sized; - fn on_error(self, session: &Session) -> Response where Self: Sized; - fn on_close(self, session: &Session) -> Response where Self: Sized; - - fn box_on_create(self: Box, channel_id: ChannelId, session: &Session) -> Response>; - fn box_on_header(self: Box, header_id: u8, header_data: &[u8], session: &Session) -> Response>; - fn box_on_data(self: Box, data: &[u8], session: &Session) -> Response>; - fn box_on_error(self: Box, session: &Session) -> Response>; - fn box_on_close(self: Box, session: &Session) -> Response>; -} - -pub type ChannelId = u16; - -enum ChannelMode { - Header, - Data -} - -struct Channel(ChannelMode, Box); - -impl Channel { - fn handle_packet(self, cmd: u8, data: Vec, session: &Session) -> Response> { - let Channel(mode, mut handler) = self; - - let mut packet = Cursor::new(&data as &[u8]); - packet.read_u16::().unwrap(); // Skip channel id - - if cmd == 0xa { - trace!("error: {} {}", data.len(), packet.read_u16::().unwrap()); - return match handler.box_on_error(session) { - Response::Continue(_) => Response::Close, - Response::Spawn(f) => Response::Spawn(f), - Response::Close => Response::Close, - }; - } - - match mode { - ChannelMode::Header => { - let mut length = 0; - - while packet.position() < data.len() as u64 { - length = packet.read_u16::().unwrap(); - if length > 0 { - let header_id = packet.read_u8().unwrap(); - let header_data = &data[packet.position() as usize .. packet.position() as usize + length as usize - 1]; - - handler = match handler.box_on_header(header_id, header_data, session) { - Response::Continue(handler) => handler, - Response::Spawn(f) => return Response::Spawn(f), - Response::Close => return Response::Close, - }; - - packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap(); - } - } - - if length == 0 { - Response::Continue(Channel(ChannelMode::Data, handler)) - } else { - Response::Continue(Channel(ChannelMode::Header, handler)) - } - } - ChannelMode::Data => { - if packet.position() < data.len() as u64 { - let event_data = &data[packet.position() as usize..]; - match handler.box_on_data(event_data, session) { - Response::Continue(handler) => Response::Continue(Channel(ChannelMode::Data, handler)), - Response::Spawn(f) => Response::Spawn(f), - Response::Close => Response::Close, - } - } else { - match handler.box_on_close(session) { - Response::Continue(_) => Response::Close, - Response::Spawn(f) => Response::Spawn(f), - Response::Close => Response::Close, - } - } - } - } - } -} - -pub struct StreamManager { - next_id: ChannelId, - channels: HashMap>, -} - -impl StreamManager { - pub fn new() -> StreamManager { - StreamManager { - next_id: 0, - channels: HashMap::new(), - } - } - - pub fn create(&mut self, handler: Box, session: &Session) { - let channel_id = self.next_id; - self.next_id += 1; - - trace!("allocated stream {}", channel_id); - - match handler.box_on_create(channel_id, session) { - Response::Continue(handler) => { - self.channels.insert(channel_id, Some(Channel(ChannelMode::Header, handler))); - } - Response::Spawn(handler) => self.create(handler, session), - Response::Close => (), - } - } -} - -impl PacketHandler for StreamManager { - fn handle(&mut self, cmd: u8, data: Vec, session: &Session) { - let id: ChannelId = BigEndian::read_u16(&data[0..2]); - - let spawn = if let Entry::Occupied(mut entry) = self.channels.entry(id) { - if let Some(channel) = entry.get_mut().take() { - match channel.handle_packet(cmd, data, session) { - Response::Continue(channel) => { - entry.insert(Some(channel)); - None - } - Response::Spawn(f) => { - entry.remove(); - Some(f) - } - Response::Close => { - entry.remove(); - None - } - } - } else { - None - } - } else { - None - }; - - - if let Some(s) = spawn { - self.create(s, session); - } - } -} diff --git a/src/util/mod.rs b/src/util/mod.rs index b2afdcf0..8bd41199 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -22,19 +22,6 @@ pub fn rand_vec(rng: &mut G, size: usize) -> Vec { rng.gen_iter().take(size).collect() } -pub trait IgnoreExt { - fn ignore(self); -} - -impl IgnoreExt for Result { - fn ignore(self) { - match self { - Ok(_) => (), - Err(_) => (), - } - } -} - pub fn now_ms() -> i64 { let dur = match SystemTime::now().duration_since(UNIX_EPOCH) { Ok(dur) => dur, @@ -135,28 +122,3 @@ impl SeqGenerator { mem::replace(&mut self.0, value) } } - -use std::sync::Mutex; -use std::cell::UnsafeCell; - -pub struct Lazy(Mutex, UnsafeCell>); -unsafe impl Sync for Lazy {} -unsafe impl Send for Lazy {} - -impl Lazy { - pub fn new() -> Lazy { - Lazy(Mutex::new(false), UnsafeCell::new(None)) - } - - pub fn get T>(&self, f: F) -> &T { - let mut inner = self.0.lock().unwrap(); - if !*inner { - unsafe { - *self.1.get() = Some(f()); - } - *inner = true; - } - - unsafe { &*self.1.get() }.as_ref().unwrap() - } -}