Migrate core to tokio 1.0

This commit is contained in:
johannesd3 2021-01-21 21:49:39 +01:00
parent efabb03631
commit 40e6355c34
16 changed files with 406 additions and 661 deletions

View file

@ -13,34 +13,32 @@ path = "../protocol"
version = "0.1.3" version = "0.1.3"
[dependencies] [dependencies]
aes = "0.6"
base64 = "0.13" base64 = "0.13"
byteorder = "1.3" byteorder = "1.4"
bytes = "0.4" bytes = "1.0"
error-chain = { version = "0.12", default_features = false } futures = { version = "0.3", features = ["bilock", "unstable"] }
futures = "0.1" hmac = "0.7"
httparse = "1.3" httparse = "1.3"
hyper = "0.11" hyper = { version = "0.14", features = ["client", "tcp", "http1", "http2", "stream"] }
hyper-proxy = { version = "0.4", default_features = false }
lazy_static = "1.3"
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
num-integer = "0.1" num-integer = "0.1"
num-traits = "0.2" num-traits = "0.2"
once_cell = "1.5.2"
pbkdf2 = "0.3"
pin-project = "1.0"
protobuf = "~2.14.0" protobuf = "~2.14.0"
rand = "0.7" rand = "0.7"
serde = "1.0" serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
sha-1 = "~0.8"
shannon = "0.2.0" shannon = "0.2.0"
tokio-codec = "0.1" tokio = { version = "1.0", features = ["io-util", "rt-multi-thread", "macros" ] }
tokio-core = "0.1" tokio-util = { version = "0.6", features = ["codec"] }
tokio-io = "0.1"
url = "1.7" url = "1.7"
uuid = { version = "0.8", features = ["v4"] } uuid = { version = "0.8", features = ["v4"] }
sha-1 = "0.8"
hmac = "0.7"
pbkdf2 = "0.3"
aes = "0.3"
[build-dependencies] [build-dependencies]
rand = "0.7" rand = "0.7"

View file

@ -1,101 +1,69 @@
const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const AP_FALLBACK: &'static str = "ap.spotify.com:443";
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/";
use futures::{Future, Stream}; use hyper::{Body, Client, Method, Request, Uri};
use hyper::client::HttpConnector; use std::error::Error;
use hyper::{self, Client, Method, Request, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use serde_json;
use std::str::FromStr;
use tokio_core::reactor::Handle;
use url::Url; use url::Url;
error_chain! {}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData { pub struct APResolveData {
ap_list: Vec<String>, ap_list: Vec<String>,
} }
fn apresolve( async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String, Box<dyn Error>> {
handle: &Handle, let port = ap_port.unwrap_or(443);
proxy: &Option<Url>,
ap_port: &Option<u16>,
) -> Box<dyn Future<Item = String, Error = Error>> {
let url = Uri::from_str(APRESOLVE_ENDPOINT).expect("invalid AP resolve URL");
let use_proxy = proxy.is_some();
let mut req = Request::new(Method::Get, url.clone()); let req = Request::builder()
let response = match *proxy { .method(Method::GET)
Some(ref val) => { .uri(
let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy"); APRESOLVE_ENDPOINT
let proxy = Proxy::new(Intercept::All, proxy_url); .parse::<Uri>()
let connector = HttpConnector::new(4, handle); .expect("invalid AP resolve URL"),
)
.body(Body::empty())?;
let client = if proxy.is_some() {
todo!("proxies not yet supported")
/*let proxy = {
let proxy_url = val.as_str().parse().expect("invalid http proxy");
let mut proxy = Proxy::new(Intercept::All, proxy_url);
let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy); let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
if let Some(headers) = proxy_connector.http_headers(&url) { proxy_connector
req.headers_mut().extend(headers.iter());
req.set_proxy(true);
}
let client = Client::configure().connector(proxy_connector).build(handle);
client.request(req)
}
_ => {
let client = Client::new(handle);
client.request(req)
}
}; };
let body = response.and_then(|response| { if let Some(headers) = proxy.http_headers(&APRESOLVE_ENDPOINT.parse().unwrap()) {
response.body().fold(Vec::new(), |mut acc, chunk| { req.headers_mut().extend(headers.clone());
acc.extend_from_slice(chunk.as_ref()); };
Ok::<_, hyper::Error>(acc) Client::builder().build(proxy)*/
})
});
let body = body.then(|result| result.chain_err(|| "HTTP error"));
let body =
body.and_then(|body| String::from_utf8(body).chain_err(|| "invalid UTF8 in response"));
let data = body
.and_then(|body| serde_json::from_str::<APResolveData>(&body).chain_err(|| "invalid JSON"));
let p = ap_port.clone();
let ap = data.and_then(move |data| {
let mut aps = data.ap_list.iter().filter(|ap| {
if p.is_some() {
Uri::from_str(ap).ok().map_or(false, |uri| {
uri.port().map_or(false, |port| port == p.unwrap())
})
} else if use_proxy {
// It is unlikely that the proxy will accept CONNECT on anything other than 443.
Uri::from_str(ap)
.ok()
.map_or(false, |uri| uri.port().map_or(false, |port| port == 443))
} else { } else {
true Client::new()
};
let response = client.request(req).await?;
let body = hyper::body::to_bytes(response.into_body()).await?;
let data: APResolveData = serde_json::from_slice(body.as_ref())?;
let ap = if ap_port.is_some() || proxy.is_some() {
data.ap_list.into_iter().find_map(|ap| {
if ap.parse::<Uri>().ok()?.port()? == port {
Some(ap)
} else {
None
} }
}); })
} else {
let ap = aps.next().ok_or("empty AP List")?; data.ap_list.into_iter().next()
Ok(ap.clone()) }
}); .ok_or("empty AP List")?;
Ok(ap)
Box::new(ap)
} }
pub(crate) fn apresolve_or_fallback<E>( pub async fn apresolve_or_fallback(proxy: &Option<Url>, ap_port: &Option<u16>) -> String {
handle: &Handle, apresolve(proxy, ap_port).await.unwrap_or_else(|e| {
proxy: &Option<Url>, warn!("Failed to resolve Access Point: {}", e);
ap_port: &Option<u16>,
) -> Box<dyn Future<Item = String, Error = E>>
where
E: 'static,
{
let ap = apresolve(handle, proxy, ap_port).or_else(|e| {
warn!("Failed to resolve Access Point: {}", e.description());
warn!("Using fallback \"{}\"", AP_FALLBACK); warn!("Using fallback \"{}\"", AP_FALLBACK);
Ok(AP_FALLBACK.into()) AP_FALLBACK.into()
}); })
Box::new(ap)
} }

View file

@ -1,7 +1,6 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::oneshot; use futures::channel::oneshot;
use futures::{Async, Future, Poll};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
@ -47,7 +46,7 @@ impl AudioKeyManager {
} }
} }
pub fn request(&self, track: SpotifyId, file: FileId) -> AudioKeyFuture<AudioKey> { pub async fn request(&self, track: SpotifyId, file: FileId) -> Result<AudioKey, AudioKeyError> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
let seq = self.lock(move |inner| { let seq = self.lock(move |inner| {
@ -57,7 +56,7 @@ impl AudioKeyManager {
}); });
self.send_key_request(seq, track, file); self.send_key_request(seq, track, file);
AudioKeyFuture(rx) rx.await.map_err(|_| AudioKeyError)?
} }
fn send_key_request(&self, seq: u32, track: SpotifyId, file: FileId) { fn send_key_request(&self, seq: u32, track: SpotifyId, file: FileId) {
@ -70,18 +69,3 @@ impl AudioKeyManager {
self.session().send_packet(0xc, data) self.session().send_packet(0xc, data)
} }
} }
pub struct AudioKeyFuture<T>(oneshot::Receiver<Result<T, AudioKeyError>>);
impl<T> Future for AudioKeyFuture<T> {
type Item = T;
type Error = AudioKeyError;
fn poll(&mut self) -> Poll<T, AudioKeyError> {
match self.0.poll() {
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
Ok(Async::Ready(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(oneshot::Canceled) => Err(AudioKeyError),
}
}
}

View file

@ -1,11 +1,9 @@
use aes::Aes192; use aes::Aes192;
use base64; use aes::NewBlockCipher;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use hmac::Hmac; use hmac::Hmac;
use pbkdf2::pbkdf2; use pbkdf2::pbkdf2;
use protobuf::ProtobufEnum; use protobuf::ProtobufEnum;
use serde;
use serde_json;
use sha1::{Digest, Sha1}; use sha1::{Digest, Sha1};
use std::fs::File; use std::fs::File;
use std::io::{self, Read, Write}; use std::io::{self, Read, Write};
@ -76,9 +74,9 @@ impl Credentials {
// decrypt data using ECB mode without padding // decrypt data using ECB mode without padding
let blob = { let blob = {
use aes::block_cipher_trait::generic_array::typenum::Unsigned; use aes::cipher::generic_array::typenum::Unsigned;
use aes::block_cipher_trait::generic_array::GenericArray; use aes::cipher::generic_array::GenericArray;
use aes::block_cipher_trait::BlockCipher; use aes::cipher::BlockCipher;
let mut data = base64::decode(encrypted_blob).unwrap(); let mut data = base64::decode(encrypted_blob).unwrap();
let cipher = Aes192::new(GenericArray::from_slice(&key)); let cipher = Aes192::new(GenericArray::from_slice(&key));

View file

@ -1,9 +1,12 @@
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::{mpsc, BiLock}; use futures::{channel::mpsc, lock::BiLock, Stream, StreamExt};
use futures::{Async, Poll, Stream}; use std::{
use std::collections::HashMap; collections::HashMap,
use std::time::Instant; pin::Pin,
task::{Context, Poll},
time::Instant,
};
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
@ -101,12 +104,10 @@ impl ChannelManager {
} }
impl Channel { impl Channel {
fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> { fn recv_packet(&mut self, cx: &mut Context<'_>) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll() { let (cmd, packet) = match self.receiver.poll_next_unpin(cx) {
Ok(Async::Ready(Some(t))) => t, Poll::Pending => return Poll::Pending,
Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed. Poll::Ready(o) => o.ok_or(ChannelError)?,
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
}; };
if cmd == 0xa { if cmd == 0xa {
@ -115,9 +116,9 @@ impl Channel {
self.state = ChannelState::Closed; self.state = ChannelState::Closed;
Err(ChannelError) Poll::Ready(Err(ChannelError))
} else { } else {
Ok(Async::Ready(packet)) Poll::Ready(Ok(packet))
} }
} }
@ -129,16 +130,19 @@ impl Channel {
} }
impl Stream for Channel { impl Stream for Channel {
type Item = ChannelEvent; type Item = Result<ChannelEvent, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop { loop {
match self.state.clone() { match self.state.clone() {
ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => { ChannelState::Header(mut data) => {
if data.len() == 0 { if data.len() == 0 {
data = try_ready!(self.recv_packet()); data = match self.recv_packet(cx) {
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
} }
let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
@ -152,19 +156,23 @@ impl Stream for Channel {
self.state = ChannelState::Header(data); self.state = ChannelState::Header(data);
let event = ChannelEvent::Header(header_id, header_data); let event = ChannelEvent::Header(header_id, header_data);
return Ok(Async::Ready(Some(event))); return Poll::Ready(Some(Ok(event)));
} }
} }
ChannelState::Data => { ChannelState::Data => {
let data = try_ready!(self.recv_packet()); let data = match self.recv_packet(cx) {
Poll::Ready(Ok(x)) => x,
Poll::Ready(Err(x)) => return Poll::Ready(Some(Err(x))),
Poll::Pending => return Poll::Pending,
};
if data.len() == 0 { if data.len() == 0 {
self.receiver.close(); self.receiver.close();
self.state = ChannelState::Closed; self.state = ChannelState::Closed;
return Ok(Async::Ready(None)); return Poll::Ready(None);
} else { } else {
let event = ChannelEvent::Data(data); let event = ChannelEvent::Data(data);
return Ok(Async::Ready(Some(event))); return Poll::Ready(Some(Ok(event)));
} }
} }
} }
@ -173,38 +181,45 @@ impl Stream for Channel {
} }
impl Stream for ChannelData { impl Stream for ChannelData {
type Item = Bytes; type Item = Result<Bytes, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock() { let mut channel = match self.0.poll_lock(cx) {
Async::Ready(c) => c, Poll::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
}; };
loop { loop {
match try_ready!(channel.poll()) { let x = match channel.poll_next_unpin(cx) {
Poll::Ready(x) => x.transpose()?,
Poll::Pending => return Poll::Pending,
};
match x {
Some(ChannelEvent::Header(..)) => (), Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))), Some(ChannelEvent::Data(data)) => return Poll::Ready(Some(Ok(data))),
None => return Ok(Async::Ready(None)), None => return Poll::Ready(None),
} }
} }
} }
} }
impl Stream for ChannelHeaders { impl Stream for ChannelHeaders {
type Item = (u8, Vec<u8>); type Item = Result<(u8, Vec<u8>), ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock() { let mut channel = match self.0.poll_lock(cx) {
Async::Ready(c) => c, Poll::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
}; };
match try_ready!(channel.poll()) { let x = match channel.poll_next_unpin(cx) {
Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))), Poll::Ready(x) => x.transpose()?,
Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)), Poll::Pending => return Poll::Pending,
};
match x {
Some(ChannelEvent::Header(id, data)) => Poll::Ready(Some(Ok((id, data)))),
Some(ChannelEvent::Data(..)) | None => Poll::Ready(None),
} }
} }
} }

View file

@ -35,29 +35,3 @@ macro_rules! component {
} }
} }
} }
use std::cell::UnsafeCell;
use std::sync::Mutex;
pub(crate) struct Lazy<T>(Mutex<bool>, UnsafeCell<Option<T>>);
unsafe impl<T: Sync> Sync for Lazy<T> {}
unsafe impl<T: Send> Send for Lazy<T> {}
#[cfg_attr(feature = "cargo-clippy", allow(mutex_atomic))]
impl<T> Lazy<T> {
pub(crate) fn new() -> Lazy<T> {
Lazy(Mutex::new(false), UnsafeCell::new(None))
}
pub(crate) fn get<F: FnOnce() -> T>(&self, f: F) -> &T {
let mut inner = self.0.lock().unwrap();
if !*inner {
unsafe {
*self.1.get() = Some(f());
}
*inner = true;
}
unsafe { &*self.1.get() }.as_ref().unwrap()
}
}

View file

@ -2,7 +2,7 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use shannon::Shannon; use shannon::Shannon;
use std::io; use std::io;
use tokio_io::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
const HEADER_SIZE: usize = 3; const HEADER_SIZE: usize = 3;
const MAC_SIZE: usize = 4; const MAC_SIZE: usize = 4;
@ -35,8 +35,7 @@ impl APCodec {
} }
} }
impl Encoder for APCodec { impl Encoder<(u8, Vec<u8>)> for APCodec {
type Item = (u8, Vec<u8>);
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> { fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> {
@ -45,7 +44,7 @@ impl Encoder for APCodec {
buf.reserve(3 + payload.len()); buf.reserve(3 + payload.len());
buf.put_u8(cmd); buf.put_u8(cmd);
buf.put_u16_be(payload.len() as u16); buf.put_u16(payload.len() as u16);
buf.extend_from_slice(&payload); buf.extend_from_slice(&payload);
self.encode_cipher.nonce_u32(self.encode_nonce); self.encode_cipher.nonce_u32(self.encode_nonce);

View file

@ -1,14 +1,11 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use futures::{Async, Future, Poll};
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use protobuf::{self, Message}; use protobuf::{self, Message};
use rand::thread_rng; use rand::thread_rng;
use sha1::Sha1; use sha1::Sha1;
use std::io::{self, Read}; use std::io;
use std::marker::PhantomData; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_codec::{Decoder, Framed}; use tokio_util::codec::{Decoder, Framed};
use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite};
use super::codec::APCodec; use super::codec::APCodec;
use crate::diffie_hellman::DHLocalKeys; use crate::diffie_hellman::DHLocalKeys;
@ -16,44 +13,13 @@ use crate::protocol;
use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext}; use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext};
use crate::util; use crate::util;
pub struct Handshake<T> { pub async fn handshake<T: AsyncRead + AsyncWrite + Unpin>(
keys: DHLocalKeys, mut connection: T,
state: HandshakeState<T>, ) -> io::Result<Framed<T, APCodec>> {
}
enum HandshakeState<T> {
ClientHello(WriteAll<T, Vec<u8>>),
APResponse(RecvPacket<T, APResponseMessage>),
ClientResponse(Option<APCodec>, WriteAll<T, Vec<u8>>),
}
pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> {
let local_keys = DHLocalKeys::random(&mut thread_rng()); let local_keys = DHLocalKeys::random(&mut thread_rng());
let client_hello = client_hello(connection, local_keys.public_key()); let gc = local_keys.public_key();
let mut accumulator = client_hello(&mut connection, gc).await?;
Handshake { let message: APResponseMessage = recv_packet(&mut connection, &mut accumulator).await?;
keys: local_keys,
state: HandshakeState::ClientHello(client_hello),
}
}
impl<T: AsyncRead + AsyncWrite> Future for Handshake<T> {
type Item = Framed<T, APCodec>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
use self::HandshakeState::*;
loop {
self.state = match self.state {
ClientHello(ref mut write) => {
let (connection, accumulator) = try_ready!(write.poll());
let read = recv_packet(connection, accumulator);
APResponse(read)
}
APResponse(ref mut read) => {
let (connection, message, accumulator) = try_ready!(read.poll());
let remote_key = message let remote_key = message
.get_challenge() .get_challenge()
.get_login_crypto_challenge() .get_login_crypto_challenge()
@ -61,27 +27,19 @@ impl<T: AsyncRead + AsyncWrite> Future for Handshake<T> {
.get_gs() .get_gs()
.to_owned(); .to_owned();
let shared_secret = self.keys.shared_secret(&remote_key); let shared_secret = local_keys.shared_secret(&remote_key);
let (challenge, send_key, recv_key) = let (challenge, send_key, recv_key) = compute_keys(&shared_secret, &accumulator);
compute_keys(&shared_secret, &accumulator);
let codec = APCodec::new(&send_key, &recv_key); let codec = APCodec::new(&send_key, &recv_key);
let write = client_response(connection, challenge); client_response(&mut connection, challenge).await?;
ClientResponse(Some(codec), write)
}
ClientResponse(ref mut codec, ref mut write) => { Ok(codec.framed(connection))
let (connection, _) = try_ready!(write.poll());
let codec = codec.take().unwrap();
let framed = codec.framed(connection);
return Ok(Async::Ready(framed));
}
}
}
}
} }
fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> { async fn client_hello<T>(connection: &mut T, gc: Vec<u8>) -> io::Result<Vec<u8>>
where
T: AsyncWrite + Unpin,
{
let mut packet = ClientHello::new(); let mut packet = ClientHello::new();
packet packet
.mut_build_info() .mut_build_info()
@ -106,13 +64,17 @@ fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8
let mut buffer = vec![0, 4]; let mut buffer = vec![0, 4];
let size = 2 + 4 + packet.compute_size(); let size = 2 + 4 + packet.compute_size();
buffer.write_u32::<BigEndian>(size).unwrap(); <Vec<u8> as WriteBytesExt>::write_u32::<BigEndian>(&mut buffer, size).unwrap();
packet.write_to_vec(&mut buffer).unwrap(); packet.write_to_vec(&mut buffer).unwrap();
write_all(connection, buffer) connection.write_all(&buffer[..]).await?;
Ok(buffer)
} }
fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> { async fn client_response<T>(connection: &mut T, challenge: Vec<u8>) -> io::Result<()>
where
T: AsyncWrite + Unpin,
{
let mut packet = ClientResponsePlaintext::new(); let mut packet = ClientResponsePlaintext::new();
packet packet
.mut_login_crypto_response() .mut_login_crypto_response()
@ -123,70 +85,35 @@ fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll
let mut buffer = vec![]; let mut buffer = vec![];
let size = 4 + packet.compute_size(); let size = 4 + packet.compute_size();
buffer.write_u32::<BigEndian>(size).unwrap(); <Vec<u8> as WriteBytesExt>::write_u32::<BigEndian>(&mut buffer, size).unwrap();
packet.write_to_vec(&mut buffer).unwrap(); packet.write_to_vec(&mut buffer).unwrap();
write_all(connection, buffer) connection.write_all(&buffer[..]).await?;
Ok(())
} }
enum RecvPacket<T, M: Message> { async fn recv_packet<T, M>(connection: &mut T, acc: &mut Vec<u8>) -> io::Result<M>
Header(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
Body(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>),
}
fn recv_packet<T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M>
where where
T: Read, T: AsyncRead + Unpin,
M: Message, M: Message,
{ {
RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData) let header = read_into_accumulator(connection, 4, acc).await?;
let size = BigEndian::read_u32(header) as usize;
let data = read_into_accumulator(connection, size - 4, acc).await?;
let message = protobuf::parse_from_bytes(data).unwrap();
Ok(message)
} }
impl<T: AsyncRead, M> Future for RecvPacket<T, M> async fn read_into_accumulator<'a, T: AsyncRead + Unpin>(
where connection: &mut T,
T: Read,
M: Message,
{
type Item = (T, M, Vec<u8>);
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
use self::RecvPacket::*;
loop {
*self = match *self {
Header(ref mut read, _) => {
let (connection, header) = try_ready!(read.poll());
let size = BigEndian::read_u32(header.as_ref()) as usize;
let acc = header.into_inner();
let read = read_into_accumulator(connection, size - 4, acc);
RecvPacket::Body(read, PhantomData)
}
Body(ref mut read, _) => {
let (connection, data) = try_ready!(read.poll());
let message = protobuf::parse_from_bytes(data.as_ref()).unwrap();
let acc = data.into_inner();
return Ok(Async::Ready((connection, message, acc)));
}
}
}
}
}
fn read_into_accumulator<T: AsyncRead>(
connection: T,
size: usize, size: usize,
mut acc: Vec<u8>, acc: &'a mut Vec<u8>,
) -> ReadExact<T, Window<Vec<u8>>> { ) -> io::Result<&'a mut [u8]> {
let offset = acc.len(); let offset = acc.len();
acc.resize(offset + size, 0); acc.resize(offset + size, 0);
let mut window = Window::new(acc); connection.read_exact(&mut acc[offset..]).await?;
window.set_start(offset); Ok(&mut acc[offset..])
read_exact(connection, window)
} }
fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) { fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) {

View file

@ -4,13 +4,12 @@ mod handshake;
pub use self::codec::APCodec; pub use self::codec::APCodec;
pub use self::handshake::handshake; pub use self::handshake::handshake;
use futures::{Future, Sink, Stream}; use futures::{SinkExt, StreamExt};
use protobuf::{self, Message}; use protobuf::{self, Message};
use std::io; use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use tokio_codec::Framed; use tokio::net::TcpStream;
use tokio_core::net::TcpStream; use tokio_util::codec::Framed;
use tokio_core::reactor::Handle;
use url::Url; use url::Url;
use crate::authentication::Credentials; use crate::authentication::Credentials;
@ -20,53 +19,36 @@ use crate::proxytunnel;
pub type Transport = Framed<TcpStream, APCodec>; pub type Transport = Framed<TcpStream, APCodec>;
pub fn connect( pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport> {
addr: String, let socket = if let Some(proxy) = proxy {
handle: &Handle, info!("Using proxy \"{}\"", proxy);
proxy: &Option<Url>, let socket_addr = proxy.to_socket_addrs().and_then(|mut iter| {
) -> Box<dyn Future<Item = Transport, Error = io::Error>> { iter.next().ok_or_else(|| {
let (addr, connect_url) = match *proxy { io::Error::new(
Some(ref url) => {
info!("Using proxy \"{}\"", url);
match url.to_socket_addrs().and_then(|mut iter| {
iter.next().ok_or(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
"Can't resolve proxy server address", "Can't resolve proxy server address",
)) )
}) { })
Ok(socket_addr) => (socket_addr, Some(addr)), })?;
Err(error) => return Box::new(futures::future::err(error)), let socket = TcpStream::connect(&socket_addr).await?;
} proxytunnel::connect(socket, &addr).await?
} } else {
None => { let socket_addr = addr.to_socket_addrs().and_then(|mut iter| {
match addr.to_socket_addrs().and_then(|mut iter| { iter.next().ok_or_else(|| {
iter.next().ok_or(io::Error::new( io::Error::new(io::ErrorKind::NotFound, "Can't resolve server address")
io::ErrorKind::NotFound, })
"Can't resolve server address", })?;
)) TcpStream::connect(&socket_addr).await?
}) {
Ok(socket_addr) => (socket_addr, None),
Err(error) => return Box::new(futures::future::err(error)),
}
}
}; };
let socket = TcpStream::connect(&addr, handle); handshake(socket).await
if let Some(connect_url) = connect_url {
let connection = socket
.and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake));
Box::new(connection)
} else {
let connection = socket.and_then(handshake);
Box::new(connection)
}
} }
pub fn authenticate( pub async fn authenticate(
transport: Transport, transport: &mut Transport,
credentials: Credentials, credentials: Credentials,
device_id: String, device_id: &str,
) -> Box<dyn Future<Item = (Transport, Credentials), Error = io::Error>> { ) -> io::Result<Credentials> {
use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os}; use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
use crate::protocol::keyexchange::APLoginFailed; use crate::protocol::keyexchange::APLoginFailed;
@ -91,20 +73,19 @@ pub fn authenticate(
version::short_sha(), version::short_sha(),
version::build_id() version::build_id()
)); ));
packet.mut_system_info().set_device_id(device_id); packet
.mut_system_info()
.set_device_id(device_id.to_string());
packet.set_version_string(version::version_string()); packet.set_version_string(version::version_string());
let cmd = 0xab; let cmd = 0xab;
let data = packet.write_to_bytes().unwrap(); let data = packet.write_to_bytes().unwrap();
Box::new( transport.send((cmd, data)).await?;
transport let (cmd, data) = transport.next().await.expect("EOF")?;
.send((cmd, data)) match cmd {
.and_then(|transport| transport.into_future().map_err(|(err, _stream)| err)) 0xac => {
.and_then(|(packet, transport)| match packet { let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap();
Some((0xac, data)) => {
let welcome_data: APWelcome =
protobuf::parse_from_bytes(data.as_ref()).unwrap();
let reusable_credentials = Credentials { let reusable_credentials = Credentials {
username: welcome_data.get_canonical_username().to_owned(), username: welcome_data.get_canonical_username().to_owned(),
@ -112,20 +93,17 @@ pub fn authenticate(
auth_data: welcome_data.get_reusable_auth_credentials().to_owned(), auth_data: welcome_data.get_reusable_auth_credentials().to_owned(),
}; };
Ok((transport, reusable_credentials)) Ok(reusable_credentials)
} }
Some((0xad, data)) => { 0xad => {
let error_data: APLoginFailed = let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap();
protobuf::parse_from_bytes(data.as_ref()).unwrap();
panic!( panic!(
"Authentication failed with reason: {:?}", "Authentication failed with reason: {:?}",
error_data.get_error_code() error_data.get_error_code()
) )
} }
Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd), _ => panic!("Unexpected packet {:?}", cmd),
None => panic!("EOF"), }
}),
)
} }

View file

@ -1,12 +1,12 @@
use num_bigint::BigUint; use num_bigint::BigUint;
use num_traits::FromPrimitive; use once_cell::sync::Lazy;
use rand::Rng; use rand::Rng;
use crate::util; use crate::util;
lazy_static! { pub static DH_GENERATOR: Lazy<BigUint> = Lazy::new(|| BigUint::from_bytes_be(&[0x02]));
pub static ref DH_GENERATOR: BigUint = BigUint::from_u64(0x2).unwrap(); pub static DH_PRIME: Lazy<BigUint> = Lazy::new(|| {
pub static ref DH_PRIME: BigUint = BigUint::from_bytes_be(&[ BigUint::from_bytes_be(&[
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc9, 0x0f, 0xda, 0xa2, 0x21, 0x68, 0xc2, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xc9, 0x0f, 0xda, 0xa2, 0x21, 0x68, 0xc2,
0x34, 0xc4, 0xc6, 0x62, 0x8b, 0x80, 0xdc, 0x1c, 0xd1, 0x29, 0x02, 0x4e, 0x08, 0x8a, 0x67, 0x34, 0xc4, 0xc6, 0x62, 0x8b, 0x80, 0xdc, 0x1c, 0xd1, 0x29, 0x02, 0x4e, 0x08, 0x8a, 0x67,
0xcc, 0x74, 0x02, 0x0b, 0xbe, 0xa6, 0x3b, 0x13, 0x9b, 0x22, 0x51, 0x4a, 0x08, 0x79, 0x8e, 0xcc, 0x74, 0x02, 0x0b, 0xbe, 0xa6, 0x3b, 0x13, 0x9b, 0x22, 0x51, 0x4a, 0x08, 0x79, 0x8e,
@ -14,8 +14,8 @@ lazy_static! {
0xf2, 0x5f, 0x14, 0x37, 0x4f, 0xe1, 0x35, 0x6d, 0x6d, 0x51, 0xc2, 0x45, 0xe4, 0x85, 0xb5, 0xf2, 0x5f, 0x14, 0x37, 0x4f, 0xe1, 0x35, 0x6d, 0x6d, 0x51, 0xc2, 0x45, 0xe4, 0x85, 0xb5,
0x76, 0x62, 0x5e, 0x7e, 0xc6, 0xf4, 0x4c, 0x42, 0xe9, 0xa6, 0x3a, 0x36, 0x20, 0xff, 0xff, 0x76, 0x62, 0x5e, 0x7e, 0xc6, 0xf4, 0x4c, 0x42, 0xe9, 0xa6, 0x3a, 0x36, 0x20, 0xff, 0xff,
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
]); ])
} });
pub struct DHLocalKeys { pub struct DHLocalKeys {
private_key: BigUint, private_key: BigUint,

View file

@ -1,8 +1,4 @@
use futures::Future; use crate::{mercury::MercuryError, session::Session};
use serde_json;
use crate::mercury::MercuryError;
use crate::session::Session;
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
@ -13,20 +9,16 @@ pub struct Token {
pub scope: Vec<String>, pub scope: Vec<String>,
} }
pub fn get_token( pub async fn get_token(
session: &Session, session: &Session,
client_id: &str, client_id: &str,
scopes: &str, scopes: &str,
) -> Box<dyn Future<Item = Token, Error = MercuryError>> { ) -> Result<Token, MercuryError> {
let url = format!( let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}", "hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes client_id, scopes
); );
Box::new(session.mercury().get(url).map(move |response| { let response = session.mercury().get(url).await?;
let data = response.payload.first().expect("Empty payload"); let data = response.payload.first().expect("Empty payload");
let data = String::from_utf8(data.clone()).unwrap(); serde_json::from_slice(data.as_ref()).map_err(|_| MercuryError)
let token: Token = serde_json::from_str(&data).unwrap();
token
}))
} }

View file

@ -1,27 +1,23 @@
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))] #![allow(clippy::unused_io_amount)]
#[macro_use]
extern crate error_chain;
#[macro_use]
extern crate futures;
#[macro_use]
extern crate lazy_static;
#[macro_use] #[macro_use]
extern crate log; extern crate log;
#[macro_use] #[macro_use]
extern crate serde_derive; extern crate serde_derive;
#[macro_use]
extern crate pin_project;
extern crate aes; extern crate aes;
extern crate base64; extern crate base64;
extern crate byteorder; extern crate byteorder;
extern crate bytes; extern crate bytes;
extern crate futures;
extern crate hmac; extern crate hmac;
extern crate httparse; extern crate httparse;
extern crate hyper; extern crate hyper;
extern crate hyper_proxy;
extern crate num_bigint; extern crate num_bigint;
extern crate num_integer; extern crate num_integer;
extern crate num_traits; extern crate num_traits;
extern crate once_cell;
extern crate pbkdf2; extern crate pbkdf2;
extern crate protobuf; extern crate protobuf;
extern crate rand; extern crate rand;
@ -29,9 +25,8 @@ extern crate serde;
extern crate serde_json; extern crate serde_json;
extern crate sha1; extern crate sha1;
extern crate shannon; extern crate shannon;
extern crate tokio_codec; extern crate tokio;
extern crate tokio_core; extern crate tokio_util;
extern crate tokio_io;
extern crate url; extern crate url;
extern crate uuid; extern crate uuid;
@ -39,13 +34,14 @@ extern crate librespot_protocol as protocol;
#[macro_use] #[macro_use]
mod component; mod component;
mod apresolve;
pub mod apresolve;
pub mod audio_key; pub mod audio_key;
pub mod authentication; pub mod authentication;
pub mod cache; pub mod cache;
pub mod channel; pub mod channel;
pub mod config; pub mod config;
mod connection; pub mod connection;
pub mod diffie_hellman; pub mod diffie_hellman;
pub mod keymaster; pub mod keymaster;
pub mod mercury; pub mod mercury;

View file

@ -1,14 +1,14 @@
use crate::protocol; use crate::protocol;
use crate::util::url_encode; use crate::util::url_encode;
use crate::util::SeqGenerator;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::{mpsc, oneshot}; use futures::{
use futures::{Async, Future, Poll}; channel::{mpsc, oneshot},
use protobuf; Future,
use std::collections::HashMap; };
use std::mem; use std::{collections::HashMap, task::Poll};
use std::{mem, pin::Pin, task::Context};
use crate::util::SeqGenerator;
mod types; mod types;
pub use self::types::*; pub use self::types::*;
@ -31,17 +31,17 @@ pub struct MercuryPending {
callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>, callback: Option<oneshot::Sender<Result<MercuryResponse, MercuryError>>>,
} }
pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>); #[pin_project]
impl<T> Future for MercuryFuture<T> { pub struct MercuryFuture<T>(#[pin] oneshot::Receiver<Result<T, MercuryError>>);
type Item = T;
type Error = MercuryError;
fn poll(&mut self) -> Poll<T, MercuryError> { impl<T> Future for MercuryFuture<T> {
match self.0.poll() { type Output = Result<T, MercuryError>;
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)),
Ok(Async::Ready(Err(err))) => Err(err), fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Ok(Async::NotReady) => Ok(Async::NotReady), match self.project().0.poll(cx) {
Err(oneshot::Canceled) => Err(MercuryError), Poll::Ready(Ok(x)) => Poll::Ready(x),
Poll::Ready(Err(_)) => Poll::Ready(Err(MercuryError)),
Poll::Pending => Poll::Pending,
} }
} }
} }
@ -98,27 +98,28 @@ impl MercuryManager {
MercurySender::new(self.clone(), uri.into()) MercurySender::new(self.clone(), uri.into())
} }
pub fn subscribe<T: Into<String>>( pub async fn subscribe<T: Into<String>>(
&self, &self,
uri: T, uri: T,
) -> Box<dyn Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>> ) -> Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> {
{
let uri = uri.into(); let uri = uri.into();
let request = self.request(MercuryRequest { let response = self
.request(MercuryRequest {
method: MercuryMethod::SUB, method: MercuryMethod::SUB,
uri: uri.clone(), uri: uri.clone(),
content_type: None, content_type: None,
payload: Vec::new(), payload: Vec::new(),
}); })
.await?;
let (tx, rx) = mpsc::unbounded();
let manager = self.clone(); let manager = self.clone();
Box::new(request.map(move |response| {
let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| { manager.lock(move |inner| {
if !inner.invalid { if !inner.invalid {
debug!("subscribed uri={} count={}", uri, response.payload.len()); debug!("subscribed uri={} count={}", uri, response.payload.len());
if response.payload.len() > 0 { if !response.payload.is_empty() {
// Old subscription protocol, watch the provided list of URIs // Old subscription protocol, watch the provided list of URIs
for sub in response.payload { for sub in response.payload {
let mut sub: protocol::pubsub::Subscription = let mut sub: protocol::pubsub::Subscription =
@ -136,8 +137,7 @@ impl MercuryManager {
} }
}); });
rx Ok(rx)
}))
} }
pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) { pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {
@ -193,7 +193,7 @@ impl MercuryManager {
let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap(); let header: protocol::mercury::Header = protobuf::parse_from_bytes(&header_data).unwrap();
let response = MercuryResponse { let response = MercuryResponse {
uri: url_encode(header.get_uri()).to_owned(), uri: url_encode(header.get_uri()),
status_code: header.get_status_code(), status_code: header.get_status_code(),
payload: pending.parts, payload: pending.parts,
}; };

View file

@ -1,5 +1,5 @@
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend}; use futures::Sink;
use std::collections::VecDeque; use std::{collections::VecDeque, pin::Pin, task::Context};
use super::*; use super::*;
@ -30,27 +30,38 @@ impl Clone for MercurySender {
} }
} }
impl Sink for MercurySender { impl Sink<Vec<u8>> for MercurySender {
type SinkItem = Vec<u8>; type Error = MercuryError;
type SinkError = MercuryError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let task = self.mercury.send(self.uri.clone(), item); Poll::Ready(Ok(()))
self.pending.push_back(task);
Ok(AsyncSink::Ready)
} }
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.poll_flush(cx)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop { loop {
match self.pending.front_mut() { match self.pending.front_mut() {
Some(task) => { Some(task) => {
try_ready!(task.poll()); match Pin::new(task).poll(cx) {
Poll::Ready(Err(x)) => return Poll::Ready(Err(x)),
Poll::Pending => return Poll::Pending,
_ => (),
};
} }
None => { None => {
return Ok(Async::Ready(())); return Poll::Ready(Ok(()));
} }
} }
self.pending.pop_front(); self.pending.pop_front();
} }
} }
fn start_send(mut self: Pin<&mut Self>, item: Vec<u8>) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task);
Ok(())
}
} }

View file

@ -1,110 +1,45 @@
use std::io; use std::io;
use std::str::FromStr;
use futures::{Async, Future, Poll};
use httparse;
use hyper::Uri; use hyper::Uri;
use tokio_io::io::{read, write_all, Read, Window, WriteAll}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio_io::{AsyncRead, AsyncWrite};
pub struct ProxyTunnel<T> { pub async fn connect<T: AsyncRead + AsyncWrite + Unpin>(
state: ProxyState<T>, mut connection: T,
} connect_url: &str,
) -> io::Result<T> {
let uri = connect_url.parse::<Uri>().unwrap();
let mut buffer = format!(
"CONNECT {0}:{1} HTTP/1.1\r\n\
\r\n",
uri.host().unwrap_or_else(|| panic!("No host in {}", uri)),
uri.port().unwrap_or_else(|| panic!("No port in {}", uri))
)
.into_bytes();
connection.write_all(buffer.as_ref()).await?;
enum ProxyState<T> { buffer.clear();
ProxyConnect(WriteAll<T, Vec<u8>>), connection.read_to_end(&mut buffer).await?;
ProxyResponse(Read<T, Window<Vec<u8>>>), if buffer.is_empty() {
}
pub fn connect<T: AsyncRead + AsyncWrite>(connection: T, connect_url: &str) -> ProxyTunnel<T> {
let proxy = proxy_connect(connection, connect_url);
ProxyTunnel {
state: ProxyState::ProxyConnect(proxy),
}
}
impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
type Item = T;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> {
use self::ProxyState::*;
loop {
self.state = match self.state {
ProxyConnect(ref mut write) => {
let (connection, mut accumulator) = try_ready!(write.poll());
let capacity = accumulator.capacity();
accumulator.resize(capacity, 0);
let window = Window::new(accumulator);
let read = read(connection, window);
ProxyResponse(read)
}
ProxyResponse(ref mut read_f) => {
let (connection, mut window, bytes_read) = try_ready!(read_f.poll());
if bytes_read == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
} }
let data_end = window.start() + bytes_read;
let buf = window.get_ref()[0..data_end].to_vec();
let mut headers = [httparse::EMPTY_HEADER; 16]; let mut headers = [httparse::EMPTY_HEADER; 16];
let mut response = httparse::Response::new(&mut headers); let mut response = httparse::Response::new(&mut headers);
let status = match response.parse(&buf) {
Ok(status) => status,
Err(err) => {
return Err(io::Error::new(io::ErrorKind::Other, err.to_string()));
}
};
if status.is_complete() { response
if let Some(code) = response.code { .parse(&buffer[..])
if code == 200 { .map_err(|err| io::Error::new(io::ErrorKind::Other, err.to_string()))?;
// Proxy says all is well
return Ok(Async::Ready(connection)); match response.code {
} else { Some(200) => Ok(connection), // Proxy says all is well
Some(code) => {
let reason = response.reason.unwrap_or("no reason"); let reason = response.reason.unwrap_or("no reason");
let msg = format!("Proxy responded with {}: {}", code, reason); let msg = format!("Proxy responded with {}: {}", code, reason);
Err(io::Error::new(io::ErrorKind::Other, msg))
return Err(io::Error::new(io::ErrorKind::Other, msg));
} }
} else { None => Err(io::Error::new(
return Err(io::Error::new(
io::ErrorKind::Other, io::ErrorKind::Other,
"Malformed response from proxy", "Malformed response from proxy",
)); )),
}
} else {
if data_end >= window.end() {
// Allocate some more buffer space
let newsize = data_end + 100;
window.get_mut().resize(newsize, 0);
window.set_end(newsize);
}
// We did not get a full header
window.set_start(data_end);
let read = read(connection, window);
ProxyResponse(read)
}
}
}
}
} }
} }
fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T, Vec<u8>> {
let uri = Uri::from_str(connect_url).unwrap();
let buffer = format!(
"CONNECT {0}:{1} HTTP/1.1\r\n\
\r\n",
uri.host().expect(&format!("No host in {}", uri)),
uri.port().expect(&format!("No port in {}", uri))
)
.into_bytes();
write_all(connection, buffer)
}

View file

@ -1,20 +1,20 @@
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock, Weak}; use std::sync::{Arc, RwLock, Weak};
use std::task::Poll;
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use std::{io, pin::Pin, task::Context};
use once_cell::sync::OnceCell;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::mpsc; use futures::{channel::mpsc, Future, FutureExt, StreamExt, TryStream, TryStreamExt};
use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio_core::reactor::{Handle, Remote};
use crate::apresolve::apresolve_or_fallback; use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
use crate::authentication::Credentials; use crate::authentication::Credentials;
use crate::cache::Cache; use crate::cache::Cache;
use crate::channel::ChannelManager; use crate::channel::ChannelManager;
use crate::component::Lazy;
use crate::config::SessionConfig; use crate::config::SessionConfig;
use crate::connection; use crate::connection;
use crate::mercury::MercuryManager; use crate::mercury::MercuryManager;
@ -32,13 +32,11 @@ struct SessionInternal {
tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>, tx_connection: mpsc::UnboundedSender<(u8, Vec<u8>)>,
audio_key: Lazy<AudioKeyManager>, audio_key: OnceCell<AudioKeyManager>,
channel: Lazy<ChannelManager>, channel: OnceCell<ChannelManager>,
mercury: Lazy<MercuryManager>, mercury: OnceCell<MercuryManager>,
cache: Option<Arc<Cache>>, cache: Option<Arc<Cache>>,
handle: Remote,
session_id: usize, session_id: usize,
} }
@ -48,58 +46,34 @@ static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
pub struct Session(Arc<SessionInternal>); pub struct Session(Arc<SessionInternal>);
impl Session { impl Session {
pub fn connect( pub async fn connect(
config: SessionConfig, config: SessionConfig,
credentials: Credentials, credentials: Credentials,
cache: Option<Cache>, cache: Option<Cache>,
handle: Handle, ) -> io::Result<Session> {
) -> Box<dyn Future<Item = Session, Error = io::Error>> { let ap = apresolve_or_fallback(&config.proxy, &config.ap_port).await;
let access_point =
apresolve_or_fallback::<io::Error>(&handle, &config.proxy, &config.ap_port);
let handle_ = handle.clone(); info!("Connecting to AP \"{}\"", ap);
let proxy = config.proxy.clone(); let mut conn = connection::connect(ap, &config.proxy).await?;
let connection = access_point.and_then(move |addr| {
info!("Connecting to AP \"{}\"", addr);
connection::connect(addr, &handle_, &proxy)
});
let device_id = config.device_id.clone(); let reusable_credentials =
let authentication = connection.and_then(move |connection| { connection::authenticate(&mut conn, credentials, &config.device_id).await?;
connection::authenticate(connection, credentials, device_id)
});
let result = authentication.map(move |(transport, reusable_credentials)| {
info!("Authenticated as \"{}\" !", reusable_credentials.username); info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(ref cache) = cache { if let Some(cache) = &cache {
cache.save_credentials(&reusable_credentials); cache.save_credentials(&reusable_credentials);
} }
let (session, task) = Session::create( let session = Session::create(conn, config, cache, reusable_credentials.username);
&handle,
transport,
config,
cache,
reusable_credentials.username.clone(),
);
handle.spawn(task.map_err(|e| { Ok(session)
error!("{:?}", e);
}));
session
});
Box::new(result)
} }
fn create( fn create(
handle: &Handle,
transport: connection::Transport, transport: connection::Transport,
config: SessionConfig, config: SessionConfig,
cache: Option<Cache>, cache: Option<Cache>,
username: String, username: String,
) -> (Session, Box<dyn Future<Item = (), Error = io::Error>>) { ) -> Session {
let (sink, stream) = transport.split(); let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded(); let (sender_tx, sender_rx) = mpsc::unbounded();
@ -120,53 +94,50 @@ impl Session {
cache: cache.map(Arc::new), cache: cache.map(Arc::new),
audio_key: Lazy::new(), audio_key: OnceCell::new(),
channel: Lazy::new(), channel: OnceCell::new(),
mercury: Lazy::new(), mercury: OnceCell::new(),
handle: handle.remote().clone(),
session_id: session_id, session_id: session_id,
})); }));
let sender_task = sender_rx let sender_task = sender_rx.map(Ok::<_, io::Error>).forward(sink);
.map_err(|e| -> io::Error { panic!(e) })
.forward(sink)
.map(|_| ());
let receiver_task = DispatchTask(stream, session.weak()); let receiver_task = DispatchTask(stream, session.weak());
let task = Box::new( let task =
(receiver_task, sender_task) futures::future::join(sender_task, receiver_task).map(|_| io::Result::<_>::Ok(()));
.into_future() tokio::spawn(task);
.map(|((), ())| ()), session
);
(session, task)
} }
pub fn audio_key(&self) -> &AudioKeyManager { pub fn audio_key(&self) -> &AudioKeyManager {
self.0.audio_key.get(|| AudioKeyManager::new(self.weak())) self.0
.audio_key
.get_or_init(|| AudioKeyManager::new(self.weak()))
} }
pub fn channel(&self) -> &ChannelManager { pub fn channel(&self) -> &ChannelManager {
self.0.channel.get(|| ChannelManager::new(self.weak())) self.0
.channel
.get_or_init(|| ChannelManager::new(self.weak()))
} }
pub fn mercury(&self) -> &MercuryManager { pub fn mercury(&self) -> &MercuryManager {
self.0.mercury.get(|| MercuryManager::new(self.weak())) self.0
.mercury
.get_or_init(|| MercuryManager::new(self.weak()))
} }
pub fn time_delta(&self) -> i64 { pub fn time_delta(&self) -> i64 {
self.0.data.read().unwrap().time_delta self.0.data.read().unwrap().time_delta
} }
pub fn spawn<F, R>(&self, f: F) pub fn spawn<T>(&self, task: T)
where where
F: FnOnce(&Handle) -> R + Send + 'static, T: Future + Send + 'static,
R: IntoFuture<Item = (), Error = ()>, T::Output: Send + 'static,
R::Future: 'static,
{ {
self.0.handle.spawn(f) tokio::spawn(task);
} }
fn debug_info(&self) { fn debug_info(&self) {
@ -178,7 +149,7 @@ impl Session {
); );
} }
#[cfg_attr(feature = "cargo-clippy", allow(match_same_arms))] #[allow(clippy::match_same_arms)]
fn dispatch(&self, cmd: u8, data: Bytes) { fn dispatch(&self, cmd: u8, data: Bytes) {
match cmd { match cmd {
0x4 => { 0x4 => {
@ -273,35 +244,34 @@ impl Drop for SessionInternal {
struct DispatchTask<S>(S, SessionWeak) struct DispatchTask<S>(S, SessionWeak)
where where
S: Stream<Item = (u8, Bytes)>; S: TryStream<Ok = (u8, Bytes)> + Unpin;
impl<S> Future for DispatchTask<S> impl<S> Future for DispatchTask<S>
where where
S: Stream<Item = (u8, Bytes)>, S: TryStream<Ok = (u8, Bytes)> + Unpin,
<S as Stream>::Error: ::std::fmt::Debug, <S as TryStream>::Ok: std::fmt::Debug,
{ {
type Item = (); type Output = Result<(), S::Error>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let session = match self.1.try_upgrade() { let session = match self.1.try_upgrade() {
Some(session) => session, Some(session) => session,
None => return Ok(Async::Ready(())), None => return Poll::Ready(Ok(())),
}; };
loop { loop {
let (cmd, data) = match self.0.poll() { let (cmd, data) = match self.0.try_poll_next_unpin(cx) {
Ok(Async::Ready(Some(t))) => t, Poll::Ready(Some(Ok(t))) => t,
Ok(Async::Ready(None)) => { Poll::Ready(None) => {
warn!("Connection to server closed."); warn!("Connection to server closed.");
session.shutdown(); session.shutdown();
return Ok(Async::Ready(())); return Poll::Ready(Ok(()));
} }
Ok(Async::NotReady) => return Ok(Async::NotReady), Poll::Ready(Some(Err(e))) => {
Err(e) => {
session.shutdown(); session.shutdown();
return Err(From::from(e)); return Poll::Ready(Err(e));
} }
Poll::Pending => return Poll::Pending,
}; };
session.dispatch(cmd, data); session.dispatch(cmd, data);
@ -311,7 +281,7 @@ where
impl<S> Drop for DispatchTask<S> impl<S> Drop for DispatchTask<S>
where where
S: Stream<Item = (u8, Bytes)>, S: TryStream<Ok = (u8, Bytes)> + Unpin,
{ {
fn drop(&mut self) { fn drop(&mut self) {
debug!("drop Dispatch"); debug!("drop Dispatch");