diff --git a/Cargo.lock b/Cargo.lock index a46d537e..1bee8116 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,7 @@ dependencies = [ "serde_derive 0.9.15 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-signal 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)", "url 1.6.0 (registry+https://github.com/rust-lang/crates.io-index)", "vergen 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -324,6 +325,7 @@ version = "0.1.0" dependencies = [ "base64 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", "byteorder 1.2.1 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "error-chain 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.1.17 (registry+https://github.com/rust-lang/crates.io-index)", "hyper 0.11.14 (registry+https://github.com/rust-lang/crates.io-index)", @@ -343,6 +345,7 @@ dependencies = [ "serde_json 0.9.10 (registry+https://github.com/rust-lang/crates.io-index)", "shannon 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-core 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)", + "tokio-io 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", "uuid 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", "vergen 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", ] diff --git a/Cargo.toml b/Cargo.toml index 82bef079..277e149f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,7 @@ serde = "0.9.6" serde_derive = "0.9.6" serde_json = "0.9.5" tokio-core = "0.1.2" +tokio-io = "0.1" tokio-signal = "0.1.2" url = "1.3" diff --git a/core/Cargo.toml b/core/Cargo.toml index bf34de0b..86722f07 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -10,6 +10,7 @@ path = "../protocol" [dependencies] base64 = "0.5.0" byteorder = "1.0" +bytes = "0.4" error-chain = { version = "0.9.0", default_features = false } futures = "0.1.8" hyper = "0.11.2" @@ -27,6 +28,7 @@ serde_derive = "0.9.6" serde_json = "0.9.5" shannon = "0.2.0" tokio-core = "0.1.2" +tokio-io = "0.1" uuid = { version = "0.4", features = ["v4"] } [build-dependencies] diff --git a/core/build.rs b/core/build.rs index 01fd14a1..5a2e5db8 100644 --- a/core/build.rs +++ b/core/build.rs @@ -39,5 +39,7 @@ pub fn build_id() -> &'static str {{ protobuf_macros::expand("src/lib.in.rs", &out.join("lib.rs")).unwrap(); println!("cargo:rerun-if-changed=src/lib.in.rs"); - println!("cargo:rerun-if-changed=src/connection"); + println!("cargo:rerun-if-changed=src/connection/mod.rs"); + println!("cargo:rerun-if-changed=src/connection/codec.rs"); + println!("cargo:rerun-if-changed=src/connection/handshake.rs"); } diff --git a/core/src/audio_key.rs b/core/src/audio_key.rs index e19c5b87..41424880 100644 --- a/core/src/audio_key.rs +++ b/core/src/audio_key.rs @@ -1,9 +1,9 @@ use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use bytes::Bytes; use futures::sync::oneshot; use futures::{Async, Future, Poll}; use std::collections::HashMap; use std::io::Write; -use tokio_core::io::EasyBuf; use util::SeqGenerator; use util::{SpotifyId, FileId}; @@ -22,8 +22,8 @@ component! { } impl AudioKeyManager { - pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) { - let seq = BigEndian::read_u32(data.drain_to(4).as_ref()); + pub fn dispatch(&self, cmd: u8, mut data: Bytes) { + let seq = BigEndian::read_u32(data.split_to(4).as_ref()); let sender = self.lock(|inner| inner.pending.remove(&seq)); diff --git a/core/src/channel.rs b/core/src/channel.rs index 8370da44..d7c74e50 100644 --- a/core/src/channel.rs +++ b/core/src/channel.rs @@ -1,15 +1,15 @@ use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; use futures::sync::{BiLock, mpsc}; use futures::{Poll, Async, Stream}; use std::collections::HashMap; -use tokio_core::io::EasyBuf; use util::SeqGenerator; component! { ChannelManager : ChannelManagerInner { sequence: SeqGenerator = SeqGenerator::new(0), - channels: HashMap> = HashMap::new(), + channels: HashMap> = HashMap::new(), } } @@ -17,7 +17,7 @@ component! { pub struct ChannelError; pub struct Channel { - receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>, + receiver: mpsc::UnboundedReceiver<(u8, Bytes)>, state: ChannelState, } @@ -26,12 +26,12 @@ pub struct ChannelData(BiLock); pub enum ChannelEvent { Header(u8, Vec), - Data(EasyBuf), + Data(Bytes), } #[derive(Clone)] enum ChannelState { - Header(EasyBuf), + Header(Bytes), Data, Closed, } @@ -48,16 +48,16 @@ impl ChannelManager { let channel = Channel { receiver: rx, - state: ChannelState::Header(EasyBuf::new()), + state: ChannelState::Header(Bytes::new()), }; (seq, channel) } - pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) { + pub fn dispatch(&self, cmd: u8, mut data: Bytes) { use std::collections::hash_map::Entry; - let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref()); + let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref()); self.lock(|inner| { if let Entry::Occupied(entry) = inner.channels.entry(id) { @@ -68,7 +68,7 @@ impl ChannelManager { } impl Channel { - fn recv_packet(&mut self) -> Poll { + fn recv_packet(&mut self) -> Poll { let (cmd, packet) = match self.receiver.poll() { Ok(Async::Ready(t)) => t.expect("channel closed"), Ok(Async::NotReady) => return Ok(Async::NotReady), @@ -107,13 +107,13 @@ impl Stream for Channel { data = try_ready!(self.recv_packet()); } - let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; + let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; if length == 0 { assert_eq!(data.len(), 0); self.state = ChannelState::Data; } else { - let header_id = data.drain_to(1).as_ref()[0]; - let header_data = data.drain_to(length - 1).as_ref().to_owned(); + let header_id = data.split_to(1).as_ref()[0]; + let header_data = data.split_to(length - 1).as_ref().to_owned(); self.state = ChannelState::Header(data); @@ -139,7 +139,7 @@ impl Stream for Channel { } impl Stream for ChannelData { - type Item = EasyBuf; + type Item = Bytes; type Error = ChannelError; fn poll(&mut self) -> Poll, Self::Error> { diff --git a/core/src/connection/codec.rs b/core/src/connection/codec.rs index 6529d3d9..6fbede13 100644 --- a/core/src/connection/codec.rs +++ b/core/src/connection/codec.rs @@ -1,7 +1,8 @@ -use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; +use byteorder::{BigEndian, ByteOrder}; +use bytes::{Bytes, BytesMut, BufMut}; use shannon::Shannon; use std::io; -use tokio_core::io::{Codec, EasyBuf}; +use tokio_io::codec::{Decoder, Encoder}; const HEADER_SIZE: usize = 3; const MAC_SIZE: usize = 4; @@ -34,16 +35,17 @@ impl APCodec { } } -impl Codec for APCodec { - type Out = (u8, Vec); - type In = (u8, EasyBuf); +impl Encoder for APCodec { + type Item = (u8, Vec); + type Error = io::Error; - fn encode(&mut self, item: (u8, Vec), buf: &mut Vec) -> io::Result<()> { + fn encode(&mut self, item: (u8, Vec), buf: &mut BytesMut) -> io::Result<()> { let (cmd, payload) = item; let offset = buf.len(); - buf.write_u8(cmd).unwrap(); - buf.write_u16::(payload.len() as u16).unwrap(); + buf.reserve(3 + payload.len()); + buf.put_u8(cmd); + buf.put_u16::(payload.len() as u16); buf.extend_from_slice(&payload); self.encode_cipher.nonce_u32(self.encode_nonce); @@ -57,12 +59,17 @@ impl Codec for APCodec { Ok(()) } +} - fn decode(&mut self, buf: &mut EasyBuf) -> io::Result> { +impl Decoder for APCodec { + type Item = (u8, Bytes); + type Error = io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> io::Result> { if let DecodeState::Header = self.decode_state { if buf.len() >= HEADER_SIZE { let mut header = [0u8; HEADER_SIZE]; - header.copy_from_slice(buf.drain_to(HEADER_SIZE).as_slice()); + header.copy_from_slice(buf.split_to(HEADER_SIZE).as_ref()); self.decode_cipher.nonce_u32(self.decode_nonce); self.decode_nonce += 1; @@ -79,13 +86,13 @@ impl Codec for APCodec { if buf.len() >= size + MAC_SIZE { self.decode_state = DecodeState::Header; - let mut payload = buf.drain_to(size + MAC_SIZE); + let mut payload = buf.split_to(size + MAC_SIZE); - self.decode_cipher.decrypt(&mut payload.get_mut()[..size]); + self.decode_cipher.decrypt(&mut payload.get_mut(..size).unwrap()); let mac = payload.split_off(size); - self.decode_cipher.check_mac(mac.as_slice())?; + self.decode_cipher.check_mac(mac.as_ref())?; - return Ok(Some((cmd, payload))); + return Ok(Some((cmd, payload.freeze()))); } } diff --git a/core/src/connection/handshake.rs b/core/src/connection/handshake.rs index 83b0b37a..5b94f709 100644 --- a/core/src/connection/handshake.rs +++ b/core/src/connection/handshake.rs @@ -3,9 +3,11 @@ use crypto::hmac::Hmac; use crypto::mac::Mac;use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use protobuf::{self, Message, MessageStatic}; use rand::thread_rng; -use std::io::{self, Read, Write}; +use std::io::{self, Read}; use std::marker::PhantomData; -use tokio_core::io::{Io, Framed, write_all, WriteAll, read_exact, ReadExact, Window}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::Framed; +use tokio_io::io::{write_all, WriteAll, read_exact, ReadExact, Window}; use futures::{Poll, Async, Future}; use diffie_hellman::DHLocalKeys; @@ -25,7 +27,7 @@ enum HandshakeState { ClientResponse(Option, WriteAll>), } -pub fn handshake(connection: T) -> Handshake { +pub fn handshake(connection: T) -> Handshake { let local_keys = DHLocalKeys::random(&mut thread_rng()); let client_hello = client_hello(connection, local_keys.public_key()); @@ -35,7 +37,7 @@ pub fn handshake(connection: T) -> Handshake { } } -impl Future for Handshake { +impl Future for Handshake { type Item = Framed; type Error = io::Error; @@ -78,7 +80,7 @@ impl Future for Handshake { } } -fn client_hello(connection: T, gc: Vec) -> WriteAll> { +fn client_hello(connection: T, gc: Vec) -> WriteAll> { let packet = protobuf_init!(ClientHello::new(), { build_info => { product: protocol::keyexchange::Product::PRODUCT_PARTNER, @@ -104,7 +106,7 @@ fn client_hello(connection: T, gc: Vec) -> WriteAll> { write_all(connection, buffer) } -fn client_response(connection: T, challenge: Vec) -> WriteAll> { +fn client_response(connection: T, challenge: Vec) -> WriteAll> { let packet = protobuf_init!(ClientResponsePlaintext::new(), { login_crypto_response.diffie_hellman => { hmac: challenge @@ -126,14 +128,14 @@ enum RecvPacket { Body(ReadExact>>, PhantomData), } -fn recv_packet(connection: T, acc: Vec) -> RecvPacket +fn recv_packet(connection: T, acc: Vec) -> RecvPacket where T: Read, M: MessageStatic { RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData) } -impl Future for RecvPacket +impl Future for RecvPacket where T: Read, M: MessageStatic { @@ -165,7 +167,7 @@ impl Future for RecvPacket } } -fn read_into_accumulator(connection: T, size: usize, mut acc: Vec) -> ReadExact>> { +fn read_into_accumulator(connection: T, size: usize, mut acc: Vec) -> ReadExact>> { let offset = acc.len(); acc.resize(offset + size, 0); diff --git a/core/src/connection/mod.rs b/core/src/connection/mod.rs index 5df5c097..91c220b6 100644 --- a/core/src/connection/mod.rs +++ b/core/src/connection/mod.rs @@ -9,7 +9,7 @@ use std::io; use std::net::ToSocketAddrs; use tokio_core::net::TcpStream; use tokio_core::reactor::Handle; -use tokio_core::io::Framed; +use tokio_io::codec::Framed; use protobuf::{self, Message}; use authentication::Credentials; diff --git a/core/src/lib.rs b/core/src/lib.rs index 052c1236..17515a49 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,8 +1,5 @@ #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))] -// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io -#![allow(deprecated)] - #[macro_use] extern crate error_chain; #[macro_use] extern crate futures; #[macro_use] extern crate lazy_static; @@ -11,6 +8,7 @@ extern crate base64; extern crate byteorder; +extern crate bytes; extern crate crypto; extern crate hyper; extern crate num_bigint; @@ -23,6 +21,7 @@ extern crate serde; extern crate serde_json; extern crate shannon; extern crate tokio_core; +extern crate tokio_io; extern crate uuid; extern crate librespot_protocol as protocol; diff --git a/core/src/mercury/mod.rs b/core/src/mercury/mod.rs index 37514876..b37ed92b 100644 --- a/core/src/mercury/mod.rs +++ b/core/src/mercury/mod.rs @@ -1,11 +1,11 @@ use byteorder::{BigEndian, ByteOrder}; +use bytes::Bytes; use futures::sync::{oneshot, mpsc}; use futures::{Async, Poll, Future}; use protobuf; use protocol; use std::collections::HashMap; use std::mem; -use tokio_core::io::EasyBuf; use util::SeqGenerator; @@ -136,12 +136,12 @@ impl MercuryManager { })) } - pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) { - let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; - let seq = data.drain_to(seq_len).as_ref().to_owned(); + pub fn dispatch(&self, cmd: u8, mut data: Bytes) { + let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; + let seq = data.split_to(seq_len).as_ref().to_owned(); - let flags = data.drain_to(1).as_ref()[0]; - let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; + let flags = data.split_to(1).as_ref()[0]; + let count = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; let pending = self.lock(|inner| inner.pending.remove(&seq)); @@ -181,9 +181,9 @@ impl MercuryManager { } } - fn parse_part(data: &mut EasyBuf) -> Vec { - let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize; - data.drain_to(size).as_ref().to_owned() + fn parse_part(data: &mut Bytes) -> Vec { + let size = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; + data.split_to(size).as_ref().to_owned() } fn complete_request(&self, cmd: u8, mut pending: MercuryPending) { diff --git a/core/src/session.rs b/core/src/session.rs index 1534d3ed..e3b474bf 100644 --- a/core/src/session.rs +++ b/core/src/session.rs @@ -1,10 +1,10 @@ +use bytes::Bytes; use crypto::digest::Digest; use crypto::sha1::Sha1; use futures::sync::mpsc; use futures::{Future, Stream, IntoFuture, Poll, Async}; use std::io; use std::sync::{RwLock, Arc, Weak}; -use tokio_core::io::EasyBuf; use tokio_core::reactor::{Handle, Remote}; use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering}; @@ -156,7 +156,7 @@ impl Session { } #[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))] - fn dispatch(&self, cmd: u8, data: EasyBuf) { + fn dispatch(&self, cmd: u8, data: Bytes) { match cmd { 0x4 => { self.debug_info(); @@ -229,10 +229,10 @@ impl Drop for SessionInternal { } struct DispatchTask(S, SessionWeak) - where S: Stream; + where S: Stream; impl Future for DispatchTask - where S: Stream + where S: Stream { type Item = (); type Error = S::Error; @@ -253,7 +253,7 @@ impl Future for DispatchTask } impl Drop for DispatchTask - where S: Stream + where S: Stream { fn drop(&mut self) { debug!("drop Dispatch"); diff --git a/src/main.rs b/src/main.rs index 2e05b607..85131ee9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,10 @@ -// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io -#![allow(deprecated)] - #[macro_use] extern crate log; extern crate env_logger; extern crate futures; extern crate getopts; extern crate librespot; extern crate tokio_core; +extern crate tokio_io; extern crate tokio_signal; use env_logger::LogBuilder; @@ -17,7 +15,7 @@ use std::path::PathBuf; use std::process::exit; use std::str::FromStr; use tokio_core::reactor::{Handle, Core}; -use tokio_core::io::IoStream; +use tokio_io::IoStream; use std::mem; use librespot::core::authentication::{get_credentials, Credentials};