mirror of
https://github.com/librespot-org/librespot.git
synced 2024-12-18 17:11:53 +00:00
Merge pull request #9 from brain0/fix_deprecated
Update dependencies and fix the use of deprecated functionality
This commit is contained in:
commit
0105c53af4
23 changed files with 462 additions and 432 deletions
|
@ -1,6 +1,6 @@
|
|||
language: rust
|
||||
rust:
|
||||
- 1.17.0
|
||||
- 1.18.0
|
||||
- stable
|
||||
- beta
|
||||
- nightly
|
||||
|
@ -24,13 +24,11 @@ before_script:
|
|||
script:
|
||||
- cargo build --no-default-features
|
||||
- cargo build --no-default-features --features "with-tremor"
|
||||
- cargo build --no-default-features --features "with-lewton";
|
||||
- cargo build --no-default-features --features "portaudio-backend"
|
||||
- cargo build --no-default-features --features "pulseaudio-backend"
|
||||
- cargo build --no-default-features --features "alsa-backend"
|
||||
- cargo build --no-default-features --target armv7-unknown-linux-gnueabihf
|
||||
- if [[ $TRAVIS_RUST_VERSION != *"1.17.0"* ]]; then
|
||||
cargo build --no-default-features --features "with-lewton";
|
||||
fi
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
|
|
584
Cargo.lock
generated
584
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -46,6 +46,7 @@ serde = "0.9.6"
|
|||
serde_derive = "0.9.6"
|
||||
serde_json = "0.9.5"
|
||||
tokio-core = "0.1.2"
|
||||
tokio-io = "0.1"
|
||||
tokio-signal = "0.1.2"
|
||||
url = "1.3"
|
||||
|
||||
|
|
|
@ -21,7 +21,7 @@ I've done noting more than make this pretty so big thanks to:
|
|||
[brain0](https://github.com/brain0/) for [making pluseaudio more robust against audio failures](https://github.com/ComlOnline/librespot/pull/6)
|
||||
|
||||
## Building
|
||||
Rust 1.17.0 or later is required to build librespot.
|
||||
Rust 1.18.0 or later is required to build librespot.
|
||||
|
||||
**If you are building librespot on macOS, the homebrew provided rust may fail due to the way in which homebrew installs rust. In this case, uninstall the homebrew version of rust and use [rustup](https://www.rustup.rs/), and librespot should then build.**
|
||||
|
||||
|
|
|
@ -340,7 +340,7 @@ impl Seek for AudioFileStreaming {
|
|||
// Notify the fetch thread to get the correct block
|
||||
// This can fail if fetch thread has completed, in which case the
|
||||
// block is ready. Just ignore the error.
|
||||
let _ = self.seek.send(self.position);
|
||||
let _ = self.seek.unbounded_send(self.position);
|
||||
Ok(self.position)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -10,6 +10,7 @@ path = "../protocol"
|
|||
[dependencies]
|
||||
base64 = "0.5.0"
|
||||
byteorder = "1.0"
|
||||
bytes = "0.4"
|
||||
error-chain = { version = "0.9.0", default_features = false }
|
||||
futures = "0.1.8"
|
||||
hyper = "0.11.2"
|
||||
|
@ -27,6 +28,7 @@ serde_derive = "0.9.6"
|
|||
serde_json = "0.9.5"
|
||||
shannon = "0.2.0"
|
||||
tokio-core = "0.1.2"
|
||||
tokio-io = "0.1"
|
||||
uuid = { version = "0.4", features = ["v4"] }
|
||||
|
||||
[build-dependencies]
|
||||
|
|
|
@ -39,5 +39,7 @@ pub fn build_id() -> &'static str {{
|
|||
protobuf_macros::expand("src/lib.in.rs", &out.join("lib.rs")).unwrap();
|
||||
|
||||
println!("cargo:rerun-if-changed=src/lib.in.rs");
|
||||
println!("cargo:rerun-if-changed=src/connection");
|
||||
println!("cargo:rerun-if-changed=src/connection/mod.rs");
|
||||
println!("cargo:rerun-if-changed=src/connection/codec.rs");
|
||||
println!("cargo:rerun-if-changed=src/connection/handshake.rs");
|
||||
}
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
|
||||
use bytes::Bytes;
|
||||
use futures::sync::oneshot;
|
||||
use futures::{Async, Future, Poll};
|
||||
use std::collections::HashMap;
|
||||
use std::io::Write;
|
||||
use tokio_core::io::EasyBuf;
|
||||
|
||||
use util::SeqGenerator;
|
||||
use util::{SpotifyId, FileId};
|
||||
|
@ -22,8 +22,8 @@ component! {
|
|||
}
|
||||
|
||||
impl AudioKeyManager {
|
||||
pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
|
||||
let seq = BigEndian::read_u32(data.drain_to(4).as_ref());
|
||||
pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
|
||||
let seq = BigEndian::read_u32(data.split_to(4).as_ref());
|
||||
|
||||
let sender = self.lock(|inner| inner.pending.remove(&seq));
|
||||
|
||||
|
@ -32,11 +32,11 @@ impl AudioKeyManager {
|
|||
0xd => {
|
||||
let mut key = [0u8; 16];
|
||||
key.copy_from_slice(data.as_ref());
|
||||
sender.complete(Ok(AudioKey(key)));
|
||||
let _ = sender.send(Ok(AudioKey(key)));
|
||||
}
|
||||
0xe => {
|
||||
warn!("error audio key {:x} {:x}", data.as_ref()[0], data.as_ref()[1]);
|
||||
sender.complete(Err(AudioKeyError));
|
||||
let _ = sender.send(Err(AudioKeyError));
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::Bytes;
|
||||
use futures::sync::{BiLock, mpsc};
|
||||
use futures::{Poll, Async, Stream};
|
||||
use std::collections::HashMap;
|
||||
use tokio_core::io::EasyBuf;
|
||||
|
||||
use util::SeqGenerator;
|
||||
|
||||
component! {
|
||||
ChannelManager : ChannelManagerInner {
|
||||
sequence: SeqGenerator<u16> = SeqGenerator::new(0),
|
||||
channels: HashMap<u16, mpsc::UnboundedSender<(u8, EasyBuf)>> = HashMap::new(),
|
||||
channels: HashMap<u16, mpsc::UnboundedSender<(u8, Bytes)>> = HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -17,7 +17,7 @@ component! {
|
|||
pub struct ChannelError;
|
||||
|
||||
pub struct Channel {
|
||||
receiver: mpsc::UnboundedReceiver<(u8, EasyBuf)>,
|
||||
receiver: mpsc::UnboundedReceiver<(u8, Bytes)>,
|
||||
state: ChannelState,
|
||||
}
|
||||
|
||||
|
@ -26,12 +26,12 @@ pub struct ChannelData(BiLock<Channel>);
|
|||
|
||||
pub enum ChannelEvent {
|
||||
Header(u8, Vec<u8>),
|
||||
Data(EasyBuf),
|
||||
Data(Bytes),
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
enum ChannelState {
|
||||
Header(EasyBuf),
|
||||
Header(Bytes),
|
||||
Data,
|
||||
Closed,
|
||||
}
|
||||
|
@ -48,27 +48,27 @@ impl ChannelManager {
|
|||
|
||||
let channel = Channel {
|
||||
receiver: rx,
|
||||
state: ChannelState::Header(EasyBuf::new()),
|
||||
state: ChannelState::Header(Bytes::new()),
|
||||
};
|
||||
|
||||
(seq, channel)
|
||||
}
|
||||
|
||||
pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
|
||||
pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
|
||||
use std::collections::hash_map::Entry;
|
||||
|
||||
let id: u16 = BigEndian::read_u16(data.drain_to(2).as_ref());
|
||||
let id: u16 = BigEndian::read_u16(data.split_to(2).as_ref());
|
||||
|
||||
self.lock(|inner| {
|
||||
if let Entry::Occupied(entry) = inner.channels.entry(id) {
|
||||
let _ = entry.get().send((cmd, data));
|
||||
let _ = entry.get().unbounded_send((cmd, data));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Channel {
|
||||
fn recv_packet(&mut self) -> Poll<EasyBuf, ChannelError> {
|
||||
fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> {
|
||||
let (cmd, packet) = match self.receiver.poll() {
|
||||
Ok(Async::Ready(t)) => t.expect("channel closed"),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
|
@ -107,13 +107,13 @@ impl Stream for Channel {
|
|||
data = try_ready!(self.recv_packet());
|
||||
}
|
||||
|
||||
let length = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
|
||||
let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
|
||||
if length == 0 {
|
||||
assert_eq!(data.len(), 0);
|
||||
self.state = ChannelState::Data;
|
||||
} else {
|
||||
let header_id = data.drain_to(1).as_ref()[0];
|
||||
let header_data = data.drain_to(length - 1).as_ref().to_owned();
|
||||
let header_id = data.split_to(1).as_ref()[0];
|
||||
let header_data = data.split_to(length - 1).as_ref().to_owned();
|
||||
|
||||
self.state = ChannelState::Header(data);
|
||||
|
||||
|
@ -139,7 +139,7 @@ impl Stream for Channel {
|
|||
}
|
||||
|
||||
impl Stream for ChannelData {
|
||||
type Item = EasyBuf;
|
||||
type Item = Bytes;
|
||||
type Error = ChannelError;
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::{Bytes, BytesMut, BufMut};
|
||||
use shannon::Shannon;
|
||||
use std::io;
|
||||
use tokio_core::io::{Codec, EasyBuf};
|
||||
use tokio_io::codec::{Decoder, Encoder};
|
||||
|
||||
const HEADER_SIZE: usize = 3;
|
||||
const MAC_SIZE: usize = 4;
|
||||
|
@ -34,16 +35,17 @@ impl APCodec {
|
|||
}
|
||||
}
|
||||
|
||||
impl Codec for APCodec {
|
||||
type Out = (u8, Vec<u8>);
|
||||
type In = (u8, EasyBuf);
|
||||
impl Encoder for APCodec {
|
||||
type Item = (u8, Vec<u8>);
|
||||
type Error = io::Error;
|
||||
|
||||
fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut Vec<u8>) -> io::Result<()> {
|
||||
fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> {
|
||||
let (cmd, payload) = item;
|
||||
let offset = buf.len();
|
||||
|
||||
buf.write_u8(cmd).unwrap();
|
||||
buf.write_u16::<BigEndian>(payload.len() as u16).unwrap();
|
||||
buf.reserve(3 + payload.len());
|
||||
buf.put_u8(cmd);
|
||||
buf.put_u16::<BigEndian>(payload.len() as u16);
|
||||
buf.extend_from_slice(&payload);
|
||||
|
||||
self.encode_cipher.nonce_u32(self.encode_nonce);
|
||||
|
@ -57,12 +59,17 @@ impl Codec for APCodec {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn decode(&mut self, buf: &mut EasyBuf) -> io::Result<Option<(u8, EasyBuf)>> {
|
||||
impl Decoder for APCodec {
|
||||
type Item = (u8, Bytes);
|
||||
type Error = io::Error;
|
||||
|
||||
fn decode(&mut self, buf: &mut BytesMut) -> io::Result<Option<(u8, Bytes)>> {
|
||||
if let DecodeState::Header = self.decode_state {
|
||||
if buf.len() >= HEADER_SIZE {
|
||||
let mut header = [0u8; HEADER_SIZE];
|
||||
header.copy_from_slice(buf.drain_to(HEADER_SIZE).as_slice());
|
||||
header.copy_from_slice(buf.split_to(HEADER_SIZE).as_ref());
|
||||
|
||||
self.decode_cipher.nonce_u32(self.decode_nonce);
|
||||
self.decode_nonce += 1;
|
||||
|
@ -79,13 +86,13 @@ impl Codec for APCodec {
|
|||
if buf.len() >= size + MAC_SIZE {
|
||||
self.decode_state = DecodeState::Header;
|
||||
|
||||
let mut payload = buf.drain_to(size + MAC_SIZE);
|
||||
let mut payload = buf.split_to(size + MAC_SIZE);
|
||||
|
||||
self.decode_cipher.decrypt(&mut payload.get_mut()[..size]);
|
||||
self.decode_cipher.decrypt(&mut payload.get_mut(..size).unwrap());
|
||||
let mac = payload.split_off(size);
|
||||
self.decode_cipher.check_mac(mac.as_slice())?;
|
||||
self.decode_cipher.check_mac(mac.as_ref())?;
|
||||
|
||||
return Ok(Some((cmd, payload)));
|
||||
return Ok(Some((cmd, payload.freeze())));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,9 +3,11 @@ use crypto::hmac::Hmac;
|
|||
use crypto::mac::Mac;use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
|
||||
use protobuf::{self, Message, MessageStatic};
|
||||
use rand::thread_rng;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::io::{self, Read};
|
||||
use std::marker::PhantomData;
|
||||
use tokio_core::io::{Io, Framed, write_all, WriteAll, read_exact, ReadExact, Window};
|
||||
use tokio_io::{AsyncRead, AsyncWrite};
|
||||
use tokio_io::codec::Framed;
|
||||
use tokio_io::io::{write_all, WriteAll, read_exact, ReadExact, Window};
|
||||
use futures::{Poll, Async, Future};
|
||||
|
||||
use diffie_hellman::DHLocalKeys;
|
||||
|
@ -25,7 +27,7 @@ enum HandshakeState<T> {
|
|||
ClientResponse(Option<APCodec>, WriteAll<T, Vec<u8>>),
|
||||
}
|
||||
|
||||
pub fn handshake<T: Io>(connection: T) -> Handshake<T> {
|
||||
pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> {
|
||||
let local_keys = DHLocalKeys::random(&mut thread_rng());
|
||||
let client_hello = client_hello(connection, local_keys.public_key());
|
||||
|
||||
|
@ -35,7 +37,7 @@ pub fn handshake<T: Io>(connection: T) -> Handshake<T> {
|
|||
}
|
||||
}
|
||||
|
||||
impl <T: Io> Future for Handshake<T> {
|
||||
impl <T: AsyncRead + AsyncWrite> Future for Handshake<T> {
|
||||
type Item = Framed<T, APCodec>;
|
||||
type Error = io::Error;
|
||||
|
||||
|
@ -78,7 +80,7 @@ impl <T: Io> Future for Handshake<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn client_hello<T: Write>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
|
||||
fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
|
||||
let packet = protobuf_init!(ClientHello::new(), {
|
||||
build_info => {
|
||||
product: protocol::keyexchange::Product::PRODUCT_PARTNER,
|
||||
|
@ -104,7 +106,7 @@ fn client_hello<T: Write>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> {
|
|||
write_all(connection, buffer)
|
||||
}
|
||||
|
||||
fn client_response<T: Write>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
|
||||
fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> {
|
||||
let packet = protobuf_init!(ClientResponsePlaintext::new(), {
|
||||
login_crypto_response.diffie_hellman => {
|
||||
hmac: challenge
|
||||
|
@ -126,14 +128,14 @@ enum RecvPacket<T, M: MessageStatic> {
|
|||
Body(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
|
||||
}
|
||||
|
||||
fn recv_packet<T, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
|
||||
fn recv_packet<T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
|
||||
where T: Read,
|
||||
M: MessageStatic
|
||||
{
|
||||
RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData)
|
||||
}
|
||||
|
||||
impl <T, M> Future for RecvPacket<T, M>
|
||||
impl <T: AsyncRead, M> Future for RecvPacket<T, M>
|
||||
where T: Read,
|
||||
M: MessageStatic
|
||||
{
|
||||
|
@ -165,7 +167,7 @@ impl <T, M> Future for RecvPacket<T, M>
|
|||
}
|
||||
}
|
||||
|
||||
fn read_into_accumulator<T: Read>(connection: T, size: usize, mut acc: Vec<u8>) -> ReadExact<T, Window<Vec<u8>>> {
|
||||
fn read_into_accumulator<T: AsyncRead>(connection: T, size: usize, mut acc: Vec<u8>) -> ReadExact<T, Window<Vec<u8>>> {
|
||||
let offset = acc.len();
|
||||
acc.resize(offset + size, 0);
|
||||
|
||||
|
|
|
@ -4,12 +4,12 @@ mod handshake;
|
|||
pub use self::codec::APCodec;
|
||||
pub use self::handshake::handshake;
|
||||
|
||||
use futures::{Future, Sink, Stream, BoxFuture};
|
||||
use futures::{Future, Sink, Stream};
|
||||
use std::io;
|
||||
use std::net::ToSocketAddrs;
|
||||
use tokio_core::net::TcpStream;
|
||||
use tokio_core::reactor::Handle;
|
||||
use tokio_core::io::Framed;
|
||||
use tokio_io::codec::Framed;
|
||||
use protobuf::{self, Message};
|
||||
|
||||
use authentication::Credentials;
|
||||
|
@ -17,18 +17,18 @@ use version;
|
|||
|
||||
pub type Transport = Framed<TcpStream, APCodec>;
|
||||
|
||||
pub fn connect<A: ToSocketAddrs>(addr: A, handle: &Handle) -> BoxFuture<Transport, io::Error> {
|
||||
pub fn connect<A: ToSocketAddrs>(addr: A, handle: &Handle) -> Box<Future<Item = Transport, Error = io::Error>> {
|
||||
let addr = addr.to_socket_addrs().unwrap().next().unwrap();
|
||||
let socket = TcpStream::connect(&addr, handle);
|
||||
let connection = socket.and_then(|socket| {
|
||||
handshake(socket)
|
||||
});
|
||||
|
||||
connection.boxed()
|
||||
Box::new(connection)
|
||||
}
|
||||
|
||||
pub fn authenticate(transport: Transport, credentials: Credentials, device_id: String)
|
||||
-> BoxFuture<(Transport, Credentials), io::Error>
|
||||
-> Box<Future<Item = (Transport, Credentials), Error = io::Error>>
|
||||
{
|
||||
use protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
|
||||
|
||||
|
@ -50,7 +50,7 @@ pub fn authenticate(transport: Transport, credentials: Credentials, device_id: S
|
|||
let cmd = 0xab;
|
||||
let data = packet.write_to_bytes().unwrap();
|
||||
|
||||
transport.send((cmd, data)).and_then(|transport| {
|
||||
Box::new(transport.send((cmd, data)).and_then(|transport| {
|
||||
transport.into_future().map_err(|(err, _stream)| err)
|
||||
}).and_then(|(packet, transport)| {
|
||||
match packet {
|
||||
|
@ -71,5 +71,5 @@ pub fn authenticate(transport: Transport, credentials: Credentials, device_id: S
|
|||
Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd),
|
||||
None => panic!("EOF"),
|
||||
}
|
||||
}).boxed()
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -1,8 +1,5 @@
|
|||
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
|
||||
|
||||
// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
|
||||
#![allow(deprecated)]
|
||||
|
||||
#[macro_use] extern crate error_chain;
|
||||
#[macro_use] extern crate futures;
|
||||
#[macro_use] extern crate lazy_static;
|
||||
|
@ -11,6 +8,7 @@
|
|||
|
||||
extern crate base64;
|
||||
extern crate byteorder;
|
||||
extern crate bytes;
|
||||
extern crate crypto;
|
||||
extern crate hyper;
|
||||
extern crate num_bigint;
|
||||
|
@ -23,6 +21,7 @@ extern crate serde;
|
|||
extern crate serde_json;
|
||||
extern crate shannon;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
extern crate uuid;
|
||||
|
||||
extern crate librespot_protocol as protocol;
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use byteorder::{BigEndian, ByteOrder};
|
||||
use bytes::Bytes;
|
||||
use futures::sync::{oneshot, mpsc};
|
||||
use futures::{Async, Poll, BoxFuture, Future};
|
||||
use futures::{Async, Poll, Future};
|
||||
use protobuf;
|
||||
use protocol;
|
||||
use std::collections::HashMap;
|
||||
use std::mem;
|
||||
use tokio_core::io::EasyBuf;
|
||||
|
||||
use util::SeqGenerator;
|
||||
|
||||
|
@ -99,7 +99,7 @@ impl MercuryManager {
|
|||
}
|
||||
|
||||
pub fn subscribe<T: Into<String>>(&self, uri: T)
|
||||
-> BoxFuture<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError>
|
||||
-> Box<Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>>
|
||||
{
|
||||
let uri = uri.into();
|
||||
let request = self.request(MercuryRequest {
|
||||
|
@ -110,7 +110,7 @@ impl MercuryManager {
|
|||
});
|
||||
|
||||
let manager = self.clone();
|
||||
request.map(move |response| {
|
||||
Box::new(request.map(move |response| {
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
||||
manager.lock(move |inner| {
|
||||
|
@ -133,15 +133,15 @@ impl MercuryManager {
|
|||
});
|
||||
|
||||
rx
|
||||
}).boxed()
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn dispatch(&self, cmd: u8, mut data: EasyBuf) {
|
||||
let seq_len = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
|
||||
let seq = data.drain_to(seq_len).as_ref().to_owned();
|
||||
pub fn dispatch(&self, cmd: u8, mut data: Bytes) {
|
||||
let seq_len = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
|
||||
let seq = data.split_to(seq_len).as_ref().to_owned();
|
||||
|
||||
let flags = data.drain_to(1).as_ref()[0];
|
||||
let count = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
|
||||
let flags = data.split_to(1).as_ref()[0];
|
||||
let count = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
|
||||
|
||||
let pending = self.lock(|inner| inner.pending.remove(&seq));
|
||||
|
||||
|
@ -181,9 +181,9 @@ impl MercuryManager {
|
|||
}
|
||||
}
|
||||
|
||||
fn parse_part(data: &mut EasyBuf) -> Vec<u8> {
|
||||
let size = BigEndian::read_u16(data.drain_to(2).as_ref()) as usize;
|
||||
data.drain_to(size).as_ref().to_owned()
|
||||
fn parse_part(data: &mut Bytes) -> Vec<u8> {
|
||||
let size = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
|
||||
data.split_to(size).as_ref().to_owned()
|
||||
}
|
||||
|
||||
fn complete_request(&self, cmd: u8, mut pending: MercuryPending) {
|
||||
|
@ -199,7 +199,7 @@ impl MercuryManager {
|
|||
if response.status_code >= 400 {
|
||||
warn!("error {} for uri {}", response.status_code, &response.uri);
|
||||
if let Some(cb) = pending.callback {
|
||||
cb.complete(Err(MercuryError));
|
||||
let _ = cb.send(Err(MercuryError));
|
||||
}
|
||||
} else {
|
||||
if cmd == 0xb5 {
|
||||
|
@ -211,7 +211,7 @@ impl MercuryManager {
|
|||
|
||||
// if send fails, remove from list of subs
|
||||
// TODO: send unsub message
|
||||
sub.send(response.clone()).is_ok()
|
||||
sub.unbounded_send(response.clone()).is_ok()
|
||||
} else {
|
||||
// URI doesn't match
|
||||
true
|
||||
|
@ -223,7 +223,7 @@ impl MercuryManager {
|
|||
}
|
||||
})
|
||||
} else if let Some(cb) = pending.callback {
|
||||
cb.complete(Ok(response));
|
||||
let _ = cb.send(Ok(response));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,10 +1,10 @@
|
|||
use bytes::Bytes;
|
||||
use crypto::digest::Digest;
|
||||
use crypto::sha1::Sha1;
|
||||
use futures::sync::mpsc;
|
||||
use futures::{Future, Stream, BoxFuture, IntoFuture, Poll, Async};
|
||||
use futures::{Future, Stream, IntoFuture, Poll, Async};
|
||||
use std::io;
|
||||
use std::sync::{RwLock, Arc, Weak};
|
||||
use tokio_core::io::EasyBuf;
|
||||
use tokio_core::reactor::{Handle, Remote};
|
||||
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT, Ordering};
|
||||
|
||||
|
@ -90,7 +90,7 @@ impl Session {
|
|||
|
||||
fn create(handle: &Handle, transport: connection::Transport,
|
||||
config: SessionConfig, cache: Option<Cache>, username: String)
|
||||
-> (Session, BoxFuture<(), io::Error>)
|
||||
-> (Session, Box<Future<Item = (), Error = io::Error>>)
|
||||
{
|
||||
let (sink, stream) = transport.split();
|
||||
|
||||
|
@ -124,8 +124,8 @@ impl Session {
|
|||
.forward(sink).map(|_| ());
|
||||
let receiver_task = DispatchTask(stream, session.weak());
|
||||
|
||||
let task = (receiver_task, sender_task).into_future()
|
||||
.map(|((), ())| ()).boxed();
|
||||
let task = Box::new((receiver_task, sender_task).into_future()
|
||||
.map(|((), ())| ()));
|
||||
|
||||
(session, task)
|
||||
}
|
||||
|
@ -156,7 +156,7 @@ impl Session {
|
|||
}
|
||||
|
||||
#[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))]
|
||||
fn dispatch(&self, cmd: u8, data: EasyBuf) {
|
||||
fn dispatch(&self, cmd: u8, data: Bytes) {
|
||||
match cmd {
|
||||
0x4 => {
|
||||
self.debug_info();
|
||||
|
@ -177,7 +177,7 @@ impl Session {
|
|||
}
|
||||
|
||||
pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
|
||||
self.0.tx_connection.send((cmd, data)).unwrap();
|
||||
self.0.tx_connection.unbounded_send((cmd, data)).unwrap();
|
||||
}
|
||||
|
||||
pub fn cache(&self) -> Option<&Arc<Cache>> {
|
||||
|
@ -229,10 +229,10 @@ impl Drop for SessionInternal {
|
|||
}
|
||||
|
||||
struct DispatchTask<S>(S, SessionWeak)
|
||||
where S: Stream<Item = (u8, EasyBuf)>;
|
||||
where S: Stream<Item = (u8, Bytes)>;
|
||||
|
||||
impl <S> Future for DispatchTask<S>
|
||||
where S: Stream<Item = (u8, EasyBuf)>
|
||||
where S: Stream<Item = (u8, Bytes)>
|
||||
{
|
||||
type Item = ();
|
||||
type Error = S::Error;
|
||||
|
@ -253,7 +253,7 @@ impl <S> Future for DispatchTask<S>
|
|||
}
|
||||
|
||||
impl <S> Drop for DispatchTask<S>
|
||||
where S: Stream<Item = (u8, EasyBuf)>
|
||||
where S: Stream<Item = (u8, Bytes)>
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
debug!("drop Dispatch");
|
||||
|
|
|
@ -2,6 +2,8 @@ use std;
|
|||
use std::fmt;
|
||||
use util::u128;
|
||||
use byteorder::{BigEndian, ByteOrder};
|
||||
// Unneeded since 1.21
|
||||
#[allow(unused_imports)]
|
||||
use std::ascii::AsciiExt;
|
||||
|
||||
#[derive(Debug,Copy,Clone,PartialEq,Eq,Hash)]
|
||||
|
|
|
@ -8,7 +8,7 @@ extern crate librespot_protocol as protocol;
|
|||
|
||||
pub mod cover;
|
||||
|
||||
use futures::{Future, BoxFuture};
|
||||
use futures::Future;
|
||||
use linear_map::LinearMap;
|
||||
|
||||
use core::mercury::MercuryError;
|
||||
|
@ -57,17 +57,17 @@ pub trait Metadata : Send + Sized + 'static {
|
|||
fn base_url() -> &'static str;
|
||||
fn parse(msg: &Self::Message, session: &Session) -> Self;
|
||||
|
||||
fn get(session: &Session, id: SpotifyId) -> BoxFuture<Self, MercuryError> {
|
||||
fn get(session: &Session, id: SpotifyId) -> Box<Future<Item = Self, Error = MercuryError>> {
|
||||
let uri = format!("{}/{}", Self::base_url(), id.to_base16());
|
||||
let request = session.mercury().get(uri);
|
||||
|
||||
let session = session.clone();
|
||||
request.and_then(move |response| {
|
||||
Box::new(request.and_then(move |response| {
|
||||
let data = response.payload.first().expect("Empty payload");
|
||||
let msg: Self::Message = protobuf::parse_from_bytes(data).unwrap();
|
||||
|
||||
Ok(Self::parse(&msg, &session))
|
||||
}).boxed()
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3,8 +3,8 @@ use crypto::digest::Digest;
|
|||
use crypto::mac::Mac;
|
||||
use crypto;
|
||||
use futures::sync::mpsc;
|
||||
use futures::{Future, Stream, BoxFuture, Poll, Async};
|
||||
use hyper::server::{Service, NewService, Request, Response, Http};
|
||||
use futures::{Future, Stream, Poll};
|
||||
use hyper::server::{Service, Request, Response, Http};
|
||||
use hyper::{self, Get, Post, StatusCode};
|
||||
use mdns;
|
||||
use num_bigint::BigUint;
|
||||
|
@ -12,7 +12,6 @@ use rand;
|
|||
use std::collections::BTreeMap;
|
||||
use std::io;
|
||||
use std::sync::Arc;
|
||||
use tokio_core::net::TcpListener;
|
||||
use tokio_core::reactor::Handle;
|
||||
use url;
|
||||
|
||||
|
@ -32,7 +31,7 @@ struct DiscoveryInner {
|
|||
}
|
||||
|
||||
impl Discovery {
|
||||
pub fn new(config: ConnectConfig, device_id: String)
|
||||
fn new(config: ConnectConfig, device_id: String)
|
||||
-> (Discovery, mpsc::UnboundedReceiver<Credentials>)
|
||||
{
|
||||
let (tx, rx) = mpsc::unbounded();
|
||||
|
@ -136,7 +135,7 @@ impl Discovery {
|
|||
|
||||
let credentials = Credentials::with_blob(username.to_owned(), &decrypted, &self.0.device_id);
|
||||
|
||||
self.0.tx.send(credentials).unwrap();
|
||||
self.0.tx.unbounded_send(credentials).unwrap();
|
||||
|
||||
let result = json!({
|
||||
"status": 101,
|
||||
|
@ -159,7 +158,7 @@ impl Service for Discovery {
|
|||
type Request = Request;
|
||||
type Response = Response;
|
||||
type Error = hyper::Error;
|
||||
type Future = BoxFuture<Response, hyper::Error>;
|
||||
type Future = Box<Future<Item = Response, Error = hyper::Error>>;
|
||||
|
||||
fn call(&self, request: Request) -> Self::Future {
|
||||
let mut params = BTreeMap::new();
|
||||
|
@ -174,7 +173,7 @@ impl Service for Discovery {
|
|||
}
|
||||
|
||||
let this = self.clone();
|
||||
body.fold(Vec::new(), |mut acc, chunk| {
|
||||
Box::new(body.fold(Vec::new(), |mut acc, chunk| {
|
||||
acc.extend_from_slice(chunk.as_ref());
|
||||
Ok::<_, hyper::Error>(acc)
|
||||
}).map(move |body| {
|
||||
|
@ -186,25 +185,13 @@ impl Service for Discovery {
|
|||
(Post, Some("addUser")) => this.handle_add_user(¶ms),
|
||||
_ => this.not_found(),
|
||||
}
|
||||
}).boxed()
|
||||
}
|
||||
}
|
||||
|
||||
impl NewService for Discovery {
|
||||
type Request = Request;
|
||||
type Response = Response;
|
||||
type Error = hyper::Error;
|
||||
type Instance = Self;
|
||||
|
||||
fn new_service(&self) -> io::Result<Self::Instance> {
|
||||
Ok(self.clone())
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DiscoveryStream {
|
||||
credentials: mpsc::UnboundedReceiver<Credentials>,
|
||||
_svc: mdns::Service,
|
||||
task: Box<Future<Item=(), Error=io::Error>>,
|
||||
}
|
||||
|
||||
pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String)
|
||||
|
@ -212,15 +199,20 @@ pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String)
|
|||
{
|
||||
let (discovery, creds_rx) = Discovery::new(config.clone(), device_id);
|
||||
|
||||
let listener = TcpListener::bind(&"0.0.0.0:0".parse().unwrap(), handle)?;
|
||||
let addr = listener.local_addr()?;
|
||||
|
||||
let http = Http::new();
|
||||
let handle_ = handle.clone();
|
||||
let task = Box::new(listener.incoming().for_each(move |(socket, addr)| {
|
||||
http.bind_connection(&handle_, socket, addr, discovery.clone());
|
||||
Ok(())
|
||||
}));
|
||||
let serve = {
|
||||
let http = Http::new();
|
||||
http.serve_addr_handle(&"0.0.0.0:0".parse().unwrap(), &handle, move || Ok(discovery.clone())).unwrap()
|
||||
};
|
||||
let addr = serve.incoming_ref().local_addr();
|
||||
let server_future = {
|
||||
let handle = handle.clone();
|
||||
serve.for_each(move |connection| {
|
||||
handle.spawn(connection.then(|_| Ok(())));
|
||||
Ok(())
|
||||
})
|
||||
.then(|_| Ok(()))
|
||||
};
|
||||
handle.spawn(server_future);
|
||||
|
||||
let responder = mdns::Responder::spawn(&handle)?;
|
||||
let svc = responder.register(
|
||||
|
@ -232,20 +224,14 @@ pub fn discovery(handle: &Handle, config: ConnectConfig, device_id: String)
|
|||
Ok(DiscoveryStream {
|
||||
credentials: creds_rx,
|
||||
_svc: svc,
|
||||
task: task,
|
||||
})
|
||||
}
|
||||
|
||||
impl Stream for DiscoveryStream {
|
||||
type Item = Credentials;
|
||||
type Error = io::Error;
|
||||
type Error = ();
|
||||
|
||||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
|
||||
match self.task.poll()? {
|
||||
Async::Ready(()) => unreachable!(),
|
||||
Async::NotReady => (),
|
||||
}
|
||||
|
||||
Ok(self.credentials.poll().unwrap())
|
||||
self.credentials.poll()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use futures::{Future, BoxFuture};
|
||||
use futures::Future;
|
||||
use serde_json;
|
||||
|
||||
use core::mercury::MercuryError;
|
||||
|
@ -13,14 +13,14 @@ pub struct Token {
|
|||
pub scope: Vec<String>,
|
||||
}
|
||||
|
||||
pub fn get_token(session: &Session, client_id: &str, scopes: &str) -> BoxFuture<Token, MercuryError> {
|
||||
pub fn get_token(session: &Session, client_id: &str, scopes: &str) -> Box<Future<Item = Token, Error = MercuryError>> {
|
||||
let url = format!("hm://keymaster/token/authenticated?client_id={}&scope={}",
|
||||
client_id, scopes);
|
||||
session.mercury().get(url).map(move |response| {
|
||||
Box::new(session.mercury().get(url).map(move |response| {
|
||||
let data = response.payload.first().expect("Empty payload");
|
||||
let data = String::from_utf8(data.clone()).unwrap();
|
||||
let token : Token = serde_json::from_str(&data).unwrap();
|
||||
|
||||
token
|
||||
}).boxed()
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -2,9 +2,6 @@
|
|||
|
||||
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
|
||||
|
||||
// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
|
||||
#![allow(deprecated)]
|
||||
|
||||
#[macro_use] extern crate log;
|
||||
#[macro_use] extern crate serde_json;
|
||||
#[macro_use] extern crate serde_derive;
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
// TODO: many items from tokio-core::io have been deprecated in favour of tokio-io
|
||||
#![allow(deprecated)]
|
||||
|
||||
#[macro_use] extern crate log;
|
||||
extern crate env_logger;
|
||||
extern crate futures;
|
||||
extern crate getopts;
|
||||
extern crate librespot;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
extern crate tokio_signal;
|
||||
|
||||
use env_logger::LogBuilder;
|
||||
|
@ -17,7 +15,7 @@ use std::path::PathBuf;
|
|||
use std::process::exit;
|
||||
use std::str::FromStr;
|
||||
use tokio_core::reactor::{Handle, Core};
|
||||
use tokio_core::io::IoStream;
|
||||
use tokio_io::IoStream;
|
||||
use std::mem;
|
||||
|
||||
use librespot::core::authentication::{get_credentials, Credentials};
|
||||
|
@ -264,7 +262,7 @@ impl Main {
|
|||
spirc: None,
|
||||
spirc_task: None,
|
||||
shutdown: false,
|
||||
signal: tokio_signal::ctrl_c(&handle).flatten_stream().boxed(),
|
||||
signal: Box::new(tokio_signal::ctrl_c(&handle).flatten_stream()),
|
||||
};
|
||||
|
||||
if setup.enable_discovery {
|
||||
|
|
|
@ -155,7 +155,7 @@ impl PlayerState {
|
|||
match self {
|
||||
Paused { end_of_track, .. } |
|
||||
Playing { end_of_track, .. } => {
|
||||
end_of_track.complete(())
|
||||
let _ = end_of_track.send(());
|
||||
}
|
||||
|
||||
Stopped => warn!("signal_end_of_track from stopped state"),
|
||||
|
@ -313,7 +313,7 @@ impl PlayerInternal {
|
|||
}
|
||||
|
||||
None => {
|
||||
end_of_track.complete(());
|
||||
let _ = end_of_track.send(());
|
||||
if self.state.is_playing() {
|
||||
self.run_onstop();
|
||||
}
|
||||
|
|
36
src/spirc.rs
36
src/spirc.rs
|
@ -1,8 +1,6 @@
|
|||
use futures::future;
|
||||
use futures::sink::BoxSink;
|
||||
use futures::stream::BoxStream;
|
||||
use futures::sync::{oneshot, mpsc};
|
||||
use futures::{Future, Stream, Sink, Async, Poll, BoxFuture};
|
||||
use futures::{Future, Stream, Sink, Async, Poll};
|
||||
use protobuf::{self, Message};
|
||||
|
||||
use core::config::ConnectConfig;
|
||||
|
@ -30,10 +28,10 @@ pub struct SpircTask {
|
|||
device: DeviceState,
|
||||
state: State,
|
||||
|
||||
subscription: BoxStream<Frame, MercuryError>,
|
||||
sender: BoxSink<Frame, MercuryError>,
|
||||
subscription: Box<Stream<Item = Frame, Error = MercuryError>>,
|
||||
sender: Box<Sink<SinkItem = Frame, SinkError = MercuryError>>,
|
||||
commands: mpsc::UnboundedReceiver<SpircCommand>,
|
||||
end_of_track: BoxFuture<(), oneshot::Canceled>,
|
||||
end_of_track: Box<Future<Item = (), Error = oneshot::Canceled>>,
|
||||
|
||||
shutdown: bool,
|
||||
session: Session,
|
||||
|
@ -134,10 +132,10 @@ impl Spirc {
|
|||
|
||||
let subscription = session.mercury().subscribe(&uri as &str);
|
||||
let subscription = subscription.map(|stream| stream.map_err(|_| MercuryError)).flatten_stream();
|
||||
let subscription = subscription.map(|response| -> Frame {
|
||||
let subscription = Box::new(subscription.map(|response| -> Frame {
|
||||
let data = response.payload.first().unwrap();
|
||||
protobuf::parse_from_bytes(data).unwrap()
|
||||
}).boxed();
|
||||
}));
|
||||
|
||||
let sender = Box::new(session.mercury().sender(uri).with(|frame: Frame| {
|
||||
Ok(frame.write_to_bytes().unwrap())
|
||||
|
@ -163,7 +161,7 @@ impl Spirc {
|
|||
subscription: subscription,
|
||||
sender: sender,
|
||||
commands: cmd_rx,
|
||||
end_of_track: future::empty().boxed(),
|
||||
end_of_track: Box::new(future::empty()),
|
||||
|
||||
shutdown: false,
|
||||
session: session.clone(),
|
||||
|
@ -179,28 +177,28 @@ impl Spirc {
|
|||
}
|
||||
|
||||
pub fn play(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Play);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::Play);
|
||||
}
|
||||
pub fn play_pause(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::PlayPause);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::PlayPause);
|
||||
}
|
||||
pub fn pause(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Pause);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::Pause);
|
||||
}
|
||||
pub fn prev(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Prev);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::Prev);
|
||||
}
|
||||
pub fn next(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Next);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::Next);
|
||||
}
|
||||
pub fn volume_up(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::VolumeUp);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::VolumeUp);
|
||||
}
|
||||
pub fn volume_down(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::VolumeDown);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::VolumeDown);
|
||||
}
|
||||
pub fn shutdown(&self) {
|
||||
let _ = mpsc::UnboundedSender::send(&self.commands, SpircCommand::Shutdown);
|
||||
let _ = self.commands.unbounded_send(SpircCommand::Shutdown);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -238,7 +236,7 @@ impl Future for SpircTask {
|
|||
}
|
||||
Ok(Async::NotReady) => (),
|
||||
Err(oneshot::Canceled) => {
|
||||
self.end_of_track = future::empty().boxed()
|
||||
self.end_of_track = Box::new(future::empty())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -587,7 +585,7 @@ impl SpircTask {
|
|||
self.state.set_status(PlayStatus::kPlayStatusPause);
|
||||
}
|
||||
|
||||
self.end_of_track = end_of_track.boxed();
|
||||
self.end_of_track = Box::new(end_of_track);
|
||||
}
|
||||
|
||||
fn hello(&mut self) {
|
||||
|
|
Loading…
Reference in a new issue