Clean up dependencies of librespot-core

* Use sub-crates of future
* Remove unnecessary pin-project
* Removed unused crates and features
* Replace futures channels by tokio channels
* Use serde's "derive" feature flag instead of serde_derive
This commit is contained in:
johannesd3 2021-02-10 22:54:35 +01:00 committed by Johannesd3
parent 8cff10e983
commit 10827bd6a8
12 changed files with 81 additions and 57 deletions

20
Cargo.lock generated
View file

@ -1453,7 +1453,9 @@ dependencies = [
"cfg-if 1.0.0", "cfg-if 1.0.0",
"env_logger", "env_logger",
"error-chain", "error-chain",
"futures", "futures-core",
"futures-sink",
"futures-util",
"hmac", "hmac",
"httparse", "httparse",
"hyper", "hyper",
@ -1464,15 +1466,14 @@ dependencies = [
"num-traits", "num-traits",
"once_cell", "once_cell",
"pbkdf2", "pbkdf2",
"pin-project-lite",
"protobuf", "protobuf",
"rand 0.7.3", "rand 0.8.3",
"serde", "serde",
"serde_derive",
"serde_json", "serde_json",
"sha-1 0.9.4", "sha-1 0.9.4",
"shannon", "shannon",
"tokio", "tokio",
"tokio-stream",
"tokio-util", "tokio-util",
"url 1.7.2", "url 1.7.2",
"vergen", "vergen",
@ -2819,6 +2820,17 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "tokio-stream"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1981ad97df782ab506a1f43bf82c967326960d278acf3bf8279809648c3ff3ea"
dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.6.3" version = "0.6.3"

View file

@ -19,7 +19,9 @@ byteorder = "1.4"
bytes = "1.0" bytes = "1.0"
cfg-if = "1" cfg-if = "1"
error-chain = { version = "0.12", default-features = false } error-chain = { version = "0.12", default-features = false }
futures = { version = "0.3", features = ["bilock", "unstable"] } futures-core = { version = "0.3", default-features = false }
futures-sink = { version = "0.3", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "bilock", "unstable", "sink"] }
hmac = "0.10" hmac = "0.10"
httparse = "1.3" httparse = "1.3"
hyper = { version = "0.14", optional = true, features = ["client", "tcp", "http1"] } hyper = { version = "0.14", optional = true, features = ["client", "tcp", "http1"] }
@ -28,21 +30,20 @@ num-bigint = "0.3"
num-integer = "0.1" num-integer = "0.1"
num-traits = "0.2" num-traits = "0.2"
once_cell = "1.5.2" once_cell = "1.5.2"
pbkdf2 = { version = "0.7", default_features = false, features = ["hmac"] } pbkdf2 = { version = "0.7", default-features = false, features = ["hmac"] }
pin-project-lite = "0.2.4"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.8"
serde = "1.0" serde = { version = "1.0", features = ["derive"] }
serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
sha-1 = "0.9" sha-1 = "0.9"
shannon = "0.2.0" shannon = "0.2.0"
tokio = { version = "1.0", features = ["io-util", "rt-multi-thread"] } tokio = { version = "1.0", features = ["io-util", "net", "rt", "sync"] }
tokio-stream = "0.1"
tokio-util = { version = "0.6", features = ["codec"] } tokio-util = { version = "0.6", features = ["codec"] }
url = "1.7" url = "1.7"
[build-dependencies] [build-dependencies]
rand = "0.7" rand = "0.8"
vergen = "3.0.4" vergen = "3.0.4"
[dev-dependencies] [dev-dependencies]

View file

@ -7,10 +7,10 @@ fn main() {
flags.toggle(ConstantsFlags::REBUILD_ON_HEAD_CHANGE); flags.toggle(ConstantsFlags::REBUILD_ON_HEAD_CHANGE);
generate_cargo_keys(ConstantsFlags::all()).expect("Unable to generate the cargo keys!"); generate_cargo_keys(ConstantsFlags::all()).expect("Unable to generate the cargo keys!");
let mut rng = rand::thread_rng(); let build_id: String = rand::thread_rng()
let build_id: String = ::std::iter::repeat(()) .sample_iter(Alphanumeric)
.map(|()| rng.sample(Alphanumeric))
.take(8) .take(8)
.map(char::from)
.collect(); .collect();
println!("cargo:rustc-env=VERGEN_BUILD_ID={}", build_id); println!("cargo:rustc-env=VERGEN_BUILD_ID={}", build_id);
} }

View file

@ -9,6 +9,7 @@ cfg_if! {
use std::error::Error; use std::error::Error;
use hyper::{Body, Client, Method, Request, Uri}; use hyper::{Body, Client, Method, Request, Uri};
use serde::{Serialize, Deserialize};
use crate::proxytunnel::ProxyTunnel; use crate::proxytunnel::ProxyTunnel;

View file

@ -1,8 +1,8 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::channel::oneshot;
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use tokio::sync::oneshot;
use crate::spotify_id::{FileId, SpotifyId}; use crate::spotify_id::{FileId, SpotifyId};
use crate::util::SeqGenerator; use crate::util::SeqGenerator;

View file

@ -1,10 +1,13 @@
use std::io::{self, Read};
use std::ops::FnOnce;
use aes::Aes192; use aes::Aes192;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use hmac::Hmac; use hmac::Hmac;
use pbkdf2::pbkdf2; use pbkdf2::pbkdf2;
use protobuf::ProtobufEnum; use protobuf::ProtobufEnum;
use serde::{Deserialize, Serialize};
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use std::io::{self, Read};
use crate::protocol::authentication::AuthenticationType; use crate::protocol::authentication::AuthenticationType;
use crate::protocol::keyexchange::{APLoginFailed, ErrorCode}; use crate::protocol::keyexchange::{APLoginFailed, ErrorCode};

View file

@ -1,12 +1,14 @@
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt}; use futures_core::Stream;
use std::{ use futures_util::lock::BiLock;
collections::HashMap, use futures_util::StreamExt;
pin::Pin, use tokio::sync::mpsc;
task::{Context, Poll},
time::Instant,
};
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
@ -46,7 +48,7 @@ enum ChannelState {
impl ChannelManager { impl ChannelManager {
pub fn allocate(&self) -> (u16, Channel) { pub fn allocate(&self) -> (u16, Channel) {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded_channel();
let seq = self.lock(|inner| { let seq = self.lock(|inner| {
let seq = inner.sequence.get(); let seq = inner.sequence.get();
@ -85,7 +87,7 @@ impl ChannelManager {
inner.download_measurement_bytes += data.len(); inner.download_measurement_bytes += data.len();
if let Entry::Occupied(entry) = inner.channels.entry(id) { if let Entry::Occupied(entry) = inner.channels.entry(id) {
let _ = entry.get().unbounded_send((cmd, data)); let _ = entry.get().send((cmd, data));
} }
}); });
} }
@ -105,7 +107,7 @@ impl ChannelManager {
impl Channel { impl Channel {
fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> { fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll_next_unpin(cx) { let (cmd, packet) = match self.receiver.poll_recv(cx) {
Poll::Pending => return Poll::Pending, Poll::Pending => return Poll::Pending,
Poll::Ready(o) => o.ok_or(ChannelError)?, Poll::Ready(o) => o.ok_or(ChannelError)?,
}; };

View file

@ -4,7 +4,7 @@ mod handshake;
pub use self::codec::APCodec; pub use self::codec::APCodec;
pub use self::handshake::handshake; pub use self::handshake::handshake;
use futures::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use protobuf::{self, Message}; use protobuf::{self, Message};
use std::io; use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;

View file

@ -1,3 +1,5 @@
use serde::Deserialize;
use crate::{mercury::MercuryError, session::Session}; use crate::{mercury::MercuryError, session::Session};
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]

View file

@ -5,10 +5,6 @@ extern crate log;
#[macro_use] #[macro_use]
extern crate cfg_if; extern crate cfg_if;
#[macro_use] #[macro_use]
extern crate serde_derive;
#[macro_use]
extern crate pin_project_lite;
#[macro_use]
extern crate error_chain; extern crate error_chain;
use librespot_protocol as protocol; use librespot_protocol as protocol;

View file

@ -1,14 +1,17 @@
use std::collections::HashMap;
use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use tokio::sync::{mpsc, oneshot};
use crate::protocol; use crate::protocol;
use crate::util::url_encode; use crate::util::url_encode;
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes;
use futures::{
channel::{mpsc, oneshot},
Future,
};
use std::{collections::HashMap, task::Poll};
use std::{mem, pin::Pin, task::Context};
mod types; mod types;
pub use self::types::*; pub use self::types::*;
@ -31,18 +34,15 @@ pub struct MercuryPending {
callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>, callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
} }
pin_project! {
pub struct MercuryFuture<T> { pub struct MercuryFuture<T> {
#[pin] receiver: oneshot::Receiver<Result<T, MercuryError>>,
receiver: oneshot::Receiver<Result<T, MercuryError>>
}
} }
impl<T> Future for MercuryFuture<T> { impl<T> Future for MercuryFuture<T> {
type Output = Result<T, MercuryError>; type Output = Result<T, MercuryError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.project().receiver.poll(cx) { match Pin::new(&mut self.receiver).poll(cx) {
Poll::Ready(Ok(x)) => Poll::Ready(x), Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)), Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)),
Poll::Pending => Poll::Pending, Poll::Pending => Poll::Pending,
@ -119,7 +119,7 @@ impl MercuryManager {
async move { async move {
let response = request.await?; let response = request.await?;
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded_channel();
manager.lock(move |inner| { manager.lock(move |inner| {
if !inner.invalid { if !inner.invalid {
@ -221,7 +221,7 @@ impl MercuryManager {
// if send fails, remove from list of subs // if send fails, remove from list of subs
// TODO: send unsub message // TODO: send unsub message
sub.unbounded_send(response.clone()).is_ok() sub.send(response.clone()).is_ok()
} else { } else {
// URI doesn't match // URI doesn't match
true true

View file

@ -1,14 +1,19 @@
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, RwLock, Weak};
use std::task::Context;
use std::task::Poll; use std::task::Poll;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use std::{io, pin::Pin, task::Context};
use once_cell::sync::OnceCell;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt}; use futures_core::TryStream;
use futures_util::{FutureExt, StreamExt, TryStreamExt};
use once_cell::sync::OnceCell;
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;
use crate::apresolve::apresolve_or_fallback; use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
@ -87,7 +92,7 @@ impl Session {
) -> Session { ) -> Session {
let (sink, stream) = transport.split(); let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded(); let (sender_tx, sender_rx) = mpsc::unbounded_channel();
let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed); let session_id = SESSION_COUNTER.fetch_add(1, Ordering::Relaxed);
debug!("new Session[{}]", session_id); debug!("new Session[{}]", session_id);
@ -114,11 +119,13 @@ impl Session {
session_id: session_id, session_id: session_id,
})); }));
let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink); let sender_task = UnboundedReceiverStream::new(sender_rx)
.map(Ok)
.forward(sink);
let receiver_task = DispatchTask(stream, session.weak()); let receiver_task = DispatchTask(stream, session.weak());
let task = let task =
futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(())); futures_util::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(()));
tokio::spawn(task); tokio::spawn(task);
session session
} }
@ -193,7 +200,7 @@ impl Session {
} }
pub fn send_packet(&self, cmd: u8, data: Vec<u8>) { pub fn send_packet(&self, cmd: u8, data: Vec<u8>) {
self.0.tx_connection.unbounded_send((cmd, data)).unwrap(); self.0.tx_connection.send((cmd, data)).unwrap();
} }
pub fn cache(&self) -> Option<&Arc<Cache>> { pub fn cache(&self) -> Option<&Arc<Cache>> {