WIP Futures

Fix apresolve

WIP session

[Core] More migration

Playing with `ReadExact` and `WriteAll`

Add some simple checks

Take little steps
This commit is contained in:
ashthespy 2021-01-23 22:21:42 +00:00
parent c69ccf77e9
commit 47a1575c00
16 changed files with 431 additions and 373 deletions

View file

@ -14,7 +14,8 @@ version = "0.1.3"
bit-set = "0.5" bit-set = "0.5"
byteorder = "1.3" byteorder = "1.3"
bytes = "0.4" bytes = "0.4"
futures = "0.1" futures = "0.3"
tokio = { version = "0.2", features = ["full"] } # Temp "rt-core", "sync"
lewton = "0.9" lewton = "0.9"
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"

View file

@ -1,9 +1,6 @@
use crate::range_set::{Range, RangeSet}; use crate::range_set::{Range, RangeSet};
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::Stream;
use futures::{Async, Future, Poll};
use std::cmp::{max, min}; use std::cmp::{max, min};
use std::fs; use std::fs;
use std::io::{self, Read, Seek, SeekFrom, Write}; use std::io::{self, Read, Seek, SeekFrom, Write};
@ -11,13 +8,23 @@ use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use futures::sync::mpsc::unbounded;
use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
use librespot_core::session::Session; use librespot_core::session::Session;
use librespot_core::spotify_id::FileId; use librespot_core::spotify_id::FileId;
use std::sync::atomic; use std::sync::atomic;
use std::sync::atomic::AtomicUsize; use std::sync::atomic::AtomicUsize;
use futures::{
channel::{mpsc, mpsc::unbounded, oneshot},
ready, Future, Stream,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::task;
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16; const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request. // The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file. // This is the block size that is typically requested while doing a seek() on a file.
@ -329,6 +336,7 @@ impl AudioFileOpenStreaming {
complete_tx, complete_tx,
); );
self.session.spawn(fetcher); self.session.spawn(fetcher);
// tokio::spawn(move |_| fetcher);
AudioFileStreaming { AudioFileStreaming {
read_file: read_file, read_file: read_file,
@ -343,36 +351,37 @@ impl AudioFileOpenStreaming {
} }
impl Future for AudioFileOpen { impl Future for AudioFileOpen {
type Item = AudioFile; type Output = Result<AudioFile, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFile, ChannelError> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<AudioFile, ChannelError>> {
match *self { match *self {
AudioFileOpen::Streaming(ref mut open) => { AudioFileOpen::Streaming(ref mut open) => {
let file = try_ready!(open.poll()); let file = ready!(open.poll());
Ok(Async::Ready(AudioFile::Streaming(file))) Poll::Ready(Ok(AudioFile::Streaming(file)))
} }
AudioFileOpen::Cached(ref mut file) => { AudioFileOpen::Cached(ref mut file) => {
let file = file.take().unwrap(); let file = file.take().unwrap();
Ok(Async::Ready(AudioFile::Cached(file))) Poll::Ready(Ok(AudioFile::Cached(file)))
} }
} }
} }
} }
impl Future for AudioFileOpenStreaming { impl Future for AudioFileOpenStreaming {
type Item = AudioFileStreaming; type Output = Result<AudioFileStreaming, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> { fn poll(
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Result<AudioFileStreaming, ChannelError>> {
loop { loop {
let (id, data) = try_ready!(self.headers.poll()).unwrap(); let (id, data) = ready!(self.headers.poll()).unwrap();
if id == 0x3 { if id == 0x3 {
let size = BigEndian::read_u32(&data) as usize * 4; let size = BigEndian::read_u32(&data) as usize * 4;
let file = self.finish(size); let file = self.finish(size);
return Ok(Async::Ready(file)); return Poll::Ready(Ok(file));
} }
} }
} }
@ -563,13 +572,12 @@ impl AudioFileFetchDataReceiver {
} }
impl Future for AudioFileFetchDataReceiver { impl Future for AudioFileFetchDataReceiver {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
loop { loop {
match self.data_rx.poll() { match self.data_rx.poll() {
Ok(Async::Ready(Some(data))) => { Poll::Ready(Some(data)) => {
if self.measure_ping_time { if self.measure_ping_time {
if let Some(request_sent_time) = self.request_sent_time { if let Some(request_sent_time) = self.request_sent_time {
let duration = Instant::now() - request_sent_time; let duration = Instant::now() - request_sent_time;
@ -603,26 +611,24 @@ impl Future for AudioFileFetchDataReceiver {
} }
if self.request_length == 0 { if self.request_length == 0 {
self.finish(); self.finish();
return Ok(Async::Ready(())); return Poll::Ready(());
} }
} }
Ok(Async::Ready(None)) => { Poll::Ready(None) => {
if self.request_length > 0 { if self.request_length > 0 {
warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
} }
self.finish(); self.finish();
return Ok(Async::Ready(())); return Poll::Ready(());
}
Ok(Async::NotReady) => {
return Ok(Async::NotReady);
} }
Poll::Pending => return Poll::Pending,
Err(ChannelError) => { Err(ChannelError) => {
warn!( warn!(
"Error from channel for data receiver for range {} (+{}).", "Error from channel for data receiver for range {} (+{}).",
self.initial_data_offset, self.initial_request_length self.initial_data_offset, self.initial_request_length
); );
self.finish(); self.finish();
return Ok(Async::Ready(())); return Poll::Ready(());
} }
} }
} }
@ -672,6 +678,7 @@ impl AudioFileFetch {
); );
session.spawn(initial_data_receiver); session.spawn(initial_data_receiver);
// tokio::spawn(move |_| initial_data_receiver);
AudioFileFetch { AudioFileFetch {
session: session, session: session,
@ -747,6 +754,7 @@ impl AudioFileFetch {
); );
self.session.spawn(receiver); self.session.spawn(receiver);
// tokio::spawn(move |_| receiver);
} }
} }
@ -794,13 +802,11 @@ impl AudioFileFetch {
} }
} }
fn poll_file_data_rx(&mut self) -> Poll<(), ()> { fn poll_file_data_rx(&mut self) -> Poll<()> {
loop { loop {
match self.file_data_rx.poll() { match self.file_data_rx.poll() {
Ok(Async::Ready(None)) => { Poll::Ready(None) => return Poll::Ready(()),
return Ok(Async::Ready(())); Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => {
}
Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
trace!("Ping time estimated as: {} ms.", response_time_ms); trace!("Ping time estimated as: {} ms.", response_time_ms);
// record the response time // record the response time
@ -832,7 +838,7 @@ impl AudioFileFetch {
.ping_time_ms .ping_time_ms
.store(ping_time_ms, atomic::Ordering::Relaxed); .store(ping_time_ms, atomic::Ordering::Relaxed);
} }
Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { Poll::Ready(Some(ReceivedData::Data(data))) => {
self.output self.output
.as_mut() .as_mut()
.unwrap() .unwrap()
@ -864,39 +870,34 @@ impl AudioFileFetch {
if full { if full {
self.finish(); self.finish();
return Ok(Async::Ready(())); return Poll::Ready(());
} }
} }
Ok(Async::NotReady) => { Poll::Pending => return Poll::Pending,
return Ok(Async::NotReady); // Err(()) => unreachable!(),
}
Err(()) => unreachable!(),
} }
} }
} }
fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { fn poll_stream_loader_command_rx(&mut self) -> Poll<()> {
loop { loop {
match self.stream_loader_command_rx.poll() { match self.stream_loader_command_rx.poll() {
Ok(Async::Ready(None)) => { Poll::Ready(None) => return Poll::Ready(()),
return Ok(Async::Ready(()));
} Poll::Ready(Some(StreamLoaderCommand::Fetch(request))) => {
Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
self.download_range(request.start, request.length); self.download_range(request.start, request.length);
} }
Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { Poll::Ready(Some(StreamLoaderCommand::RandomAccessMode())) => {
*(self.shared.download_strategy.lock().unwrap()) = *(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::RandomAccess(); DownloadStrategy::RandomAccess();
} }
Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { Poll::Ready(Some(StreamLoaderCommand::StreamMode())) => {
*(self.shared.download_strategy.lock().unwrap()) = *(self.shared.download_strategy.lock().unwrap()) =
DownloadStrategy::Streaming(); DownloadStrategy::Streaming();
} }
Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { Poll::Ready(Some(StreamLoaderCommand::Close())) => return Poll::Ready(()),
return Ok(Async::Ready(())); Poll::Pending => return Poll::Pending,
} // Err(()) => unreachable!(),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(()) => unreachable!(),
} }
} }
} }
@ -911,24 +912,19 @@ impl AudioFileFetch {
} }
impl Future for AudioFileFetch { impl Future for AudioFileFetch {
type Item = (); type Output = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> {
match self.poll_stream_loader_command_rx() { match self.poll_stream_loader_command_rx() {
Ok(Async::NotReady) => (), Poll::Pending => (),
Ok(Async::Ready(_)) => { Poll::Ready(_) => return Poll::Ready(()),
return Ok(Async::Ready(())); // Err(()) => unreachable!(),
}
Err(()) => unreachable!(),
} }
match self.poll_file_data_rx() { match self.poll_file_data_rx() {
Ok(Async::NotReady) => (), Poll::Pending => (),
Ok(Async::Ready(_)) => { Poll::Ready(_) => return Poll::Ready(()),
return Ok(Async::Ready(())); // Err(()) => unreachable!(),
}
Err(()) => unreachable!(),
} }
if let DownloadStrategy::Streaming() = self.get_download_strategy() { if let DownloadStrategy::Streaming() = self.get_download_strategy() {
@ -969,7 +965,7 @@ impl Future for AudioFileFetch {
} }
} }
return Ok(Async::NotReady); return Poll::Pending;
} }
} }

View file

@ -17,10 +17,10 @@ base64 = "0.13"
byteorder = "1.3" byteorder = "1.3"
bytes = "0.4" bytes = "0.4"
error-chain = { version = "0.12", default_features = false } error-chain = { version = "0.12", default_features = false }
futures = "0.1" futures = {version = "0.3",features =["unstable","bilock"]}
httparse = "1.3" httparse = "1.3"
hyper = "0.12" hyper = "0.13"
hyper-proxy = { version = "0.5", default_features = false } hyper-proxy = { version = "0.6", default_features = false }
lazy_static = "1.3" lazy_static = "1.3"
log = "0.4" log = "0.4"
num-bigint = "0.3" num-bigint = "0.3"
@ -32,9 +32,10 @@ serde = "1.0"
serde_derive = "1.0" serde_derive = "1.0"
serde_json = "1.0" serde_json = "1.0"
shannon = "0.2.0" shannon = "0.2.0"
tokio-codec = "0.1" tokio = {version = "0.2", features = ["full","io-util","tcp"]} # io-util
tokio = "0.1" tokio-util = {version = "0.3", features = ["compat","codec"]}
tokio-io = "0.1" # tokio-codec = "0.1"
# tokio-io = "0.1"
url = "1.7" url = "1.7"
uuid = { version = "0.8", features = ["v4"] } uuid = { version = "0.8", features = ["v4"] }
sha-1 = "0.8" sha-1 = "0.8"

View file

@ -1,41 +1,33 @@
const AP_FALLBACK: &'static str = "ap.spotify.com:443"; const AP_FALLBACK: &'static str = "ap.spotify.com:443";
const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/"; const APRESOLVE_ENDPOINT: &'static str = "http://apresolve.spotify.com/";
use futures::{Future, Stream};
use hyper::client::HttpConnector; use hyper::client::HttpConnector;
use hyper::{self, Client, Request, Uri}; use hyper::{self, Body, Client, Request, Uri};
use hyper_proxy::{Intercept, Proxy, ProxyConnector}; use hyper_proxy::{Intercept, Proxy, ProxyConnector};
use serde_json; use serde_json;
use std::error;
use std::str::FromStr; use std::str::FromStr;
use url::Url; use url::Url;
error_chain! {}
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct APResolveData { pub struct APResolveData {
ap_list: Vec<String>, ap_list: Vec<String>,
} }
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
fn apresolve( async fn apresolve(proxy: &Option<Url>, ap_port: &Option<u16>) -> Result<String> {
proxy: &Option<Url>, let url = Uri::from_str(APRESOLVE_ENDPOINT)?; //.expect("invalid AP resolve URL");
ap_port: &Option<u16>,
) -> Box<dyn Future<Item = String, Error = Error>> {
let url = Uri::from_str(APRESOLVE_ENDPOINT).expect("invalid AP resolve URL");
let use_proxy = proxy.is_some(); let use_proxy = proxy.is_some();
// let mut req = Request::new(url.clone()); let mut req = Request::get(&url).body(Body::empty())?;
let mut req = Request::get(url.clone())
.body(hyper::Body::from(vec![]))
.unwrap();
let response = match *proxy { let response = match *proxy {
Some(ref val) => { Some(ref val) => {
let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy"); let proxy_url = Uri::from_str(val.as_str()).expect("invalid http proxy");
let proxy = Proxy::new(Intercept::All, proxy_url); let proxy = Proxy::new(Intercept::All, proxy_url);
let connector = HttpConnector::new(4); let connector = HttpConnector::new();
let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy); let proxy_connector = ProxyConnector::from_proxy_unsecured(connector, proxy);
if let Some(headers) = proxy_connector.http_headers(&url) { if let Some(headers) = proxy_connector.http_headers(&url) {
req.headers_mut().extend(headers.clone().into_iter()); req.headers_mut().extend(headers.clone().into_iter());
// req.set_proxy(true);
} }
let client = Client::builder().build(proxy_connector); let client = Client::builder().build(proxy_connector);
client.request(req) client.request(req)
@ -44,29 +36,19 @@ fn apresolve(
let client = Client::new(); let client = Client::new();
client.request(req) client.request(req)
} }
}; }
.await?;
let body = response.and_then(|response| { let body = hyper::body::to_bytes(response.into_body()).await?;
response.into_body().fold(Vec::new(), |mut acc, chunk| { let body = String::from_utf8(body.to_vec())?;
acc.extend_from_slice(chunk.as_ref()); let data = serde_json::from_str::<APResolveData>(&body)?;
Ok::<_, hyper::Error>(acc)
})
});
let body = body.then(|result| result.chain_err(|| "HTTP error"));
let body =
body.and_then(|body| String::from_utf8(body).chain_err(|| "invalid UTF8 in response"));
let data = body let ap = {
.and_then(|body| serde_json::from_str::<APResolveData>(&body).chain_err(|| "invalid JSON"));
let p = ap_port.clone();
let ap = data.and_then(move |data| {
let mut aps = data.ap_list.iter().filter(|ap| { let mut aps = data.ap_list.iter().filter(|ap| {
if p.is_some() { if let Some(p) = ap_port {
Uri::from_str(ap).ok().map_or(false, |uri| { Uri::from_str(ap)
uri.port_u16().map_or(false, |port| port == p.unwrap()) .ok()
}) .map_or(false, |uri| uri.port_u16().map_or(false, |port| &port == p))
} else if use_proxy { } else if use_proxy {
// It is unlikely that the proxy will accept CONNECT on anything other than 443. // It is unlikely that the proxy will accept CONNECT on anything other than 443.
Uri::from_str(ap).ok().map_or(false, |uri| { Uri::from_str(ap).ok().map_or(false, |uri| {
@ -79,23 +61,23 @@ fn apresolve(
let ap = aps.next().ok_or("empty AP List")?; let ap = aps.next().ok_or("empty AP List")?;
Ok(ap.clone()) Ok(ap.clone())
}); };
Box::new(ap) ap
} }
pub(crate) fn apresolve_or_fallback<E>( pub(crate) async fn apresolve_or_fallback<E>(
proxy: &Option<Url>, proxy: &Option<Url>,
ap_port: &Option<u16>, ap_port: &Option<u16>,
) -> Box<dyn Future<Item = String, Error = E>> ) -> Result<String> {
where // match apresolve.await {
E: 'static, // Ok(ap)
{ // }
let ap = apresolve(proxy, ap_port).or_else(|e| { let ap = apresolve(proxy, ap_port).await.or_else(|e| {
warn!("Failed to resolve Access Point: {}", e.description()); warn!("Failed to resolve Access Point: {:?}", e);
warn!("Using fallback \"{}\"", AP_FALLBACK); warn!("Using fallback \"{}\"", AP_FALLBACK);
Ok(AP_FALLBACK.into()) Ok(AP_FALLBACK.into())
}); });
Box::new(ap) ap
} }

View file

@ -1,10 +1,14 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::oneshot;
use futures::{Async, Future, Poll};
use std::collections::HashMap; use std::collections::HashMap;
use std::io::Write; use std::io::Write;
use futures::{channel::oneshot, Future};
use std::{
pin::Pin,
task::{Context, Poll},
};
use crate::spotify_id::{FileId, SpotifyId}; use crate::spotify_id::{FileId, SpotifyId};
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
@ -73,14 +77,13 @@ impl AudioKeyManager {
pub struct AudioKeyFuture<T>(oneshot::Receiver<Result<T, AudioKeyError>>); pub struct AudioKeyFuture<T>(oneshot::Receiver<Result<T, AudioKeyError>>);
impl<T> Future for AudioKeyFuture<T> { impl<T> Future for AudioKeyFuture<T> {
type Item = T; type Output = Result<T, AudioKeyError>;
type Error = AudioKeyError;
fn poll(&mut self) -> Poll<T, AudioKeyError> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
match self.0.poll() { match self.0.poll() {
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), Poll::Ready(Ok(Ok(value))) => Poll::Ready(Ok(value)),
Ok(Async::Ready(Err(err))) => Err(err), Poll::Ready(Ok(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady), Poll::Pending => Poll::Pending,
Err(oneshot::Canceled) => Err(AudioKeyError), Err(oneshot::Canceled) => Err(AudioKeyError),
} }
} }

View file

@ -1,12 +1,16 @@
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::{mpsc, BiLock};
use futures::{Async, Poll, Stream};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Instant; use std::time::Instant;
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
use futures::{channel::mpsc, lock::BiLock, Stream};
use std::{
pin::Pin,
task::{Context, Poll},
};
component! { component! {
ChannelManager : ChannelManagerInner { ChannelManager : ChannelManagerInner {
sequence: SeqGenerator<u16> = SeqGenerator::new(0), sequence: SeqGenerator<u16> = SeqGenerator::new(0),
@ -101,11 +105,11 @@ impl ChannelManager {
} }
impl Channel { impl Channel {
fn recv_packet(&mut self) -> Poll<Bytes, ChannelError> { fn recv_packet(&mut self) -> Poll<Result<Bytes, ChannelError>> {
let (cmd, packet) = match self.receiver.poll() { let (cmd, packet) = match self.receiver.poll() {
Ok(Async::Ready(Some(t))) => t, Poll::Ready(Ok(Some(t))) => t,
Ok(Async::Ready(None)) => return Err(ChannelError), // The channel has been closed. Poll::Ready(Ok(t)) => return Err(ChannelError), // The channel has been closed.
Ok(Async::NotReady) => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
Err(()) => unreachable!(), Err(()) => unreachable!(),
}; };
@ -117,7 +121,7 @@ impl Channel {
Err(ChannelError) Err(ChannelError)
} else { } else {
Ok(Async::Ready(packet)) Poll::Ready(Ok(packet))
} }
} }
@ -129,16 +133,15 @@ impl Channel {
} }
impl Stream for Channel { impl Stream for Channel {
type Item = ChannelEvent; type Item = Result<Option<ChannelEvent>, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop { loop {
match self.state.clone() { match self.state.clone() {
ChannelState::Closed => panic!("Polling already terminated channel"), ChannelState::Closed => panic!("Polling already terminated channel"),
ChannelState::Header(mut data) => { ChannelState::Header(mut data) => {
if data.len() == 0 { if data.len() == 0 {
data = try_ready!(self.recv_packet()); data = ready!(self.recv_packet());
} }
let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize; let length = BigEndian::read_u16(data.split_to(2).as_ref()) as usize;
@ -152,19 +155,19 @@ impl Stream for Channel {
self.state = ChannelState::Header(data); self.state = ChannelState::Header(data);
let event = ChannelEvent::Header(header_id, header_data); let event = ChannelEvent::Header(header_id, header_data);
return Ok(Async::Ready(Some(event))); return Poll::Ready(Ok(Some(event)));
} }
} }
ChannelState::Data => { ChannelState::Data => {
let data = try_ready!(self.recv_packet()); let data = ready!(self.recv_packet());
if data.len() == 0 { if data.len() == 0 {
self.receiver.close(); self.receiver.close();
self.state = ChannelState::Closed; self.state = ChannelState::Closed;
return Ok(Async::Ready(None)); return Poll::Ready(Ok(None));
} else { } else {
let event = ChannelEvent::Data(data); let event = ChannelEvent::Data(data);
return Ok(Async::Ready(Some(event))); return Poll::Ready(Ok(Some(event)));
} }
} }
} }
@ -173,38 +176,36 @@ impl Stream for Channel {
} }
impl Stream for ChannelData { impl Stream for ChannelData {
type Item = Bytes; type Item = Result<Option<Bytes>, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock() { let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c, Poll::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
}; };
loop { loop {
match try_ready!(channel.poll()) { match ready!(channel.poll()) {
Some(ChannelEvent::Header(..)) => (), Some(ChannelEvent::Header(..)) => (),
Some(ChannelEvent::Data(data)) => return Ok(Async::Ready(Some(data))), Some(ChannelEvent::Data(data)) => return Poll::Ready(Ok(Some(data))),
None => return Ok(Async::Ready(None)), None => return Poll::Ready(Ok(None)),
} }
} }
} }
} }
impl Stream for ChannelHeaders { impl Stream for ChannelHeaders {
type Item = (u8, Vec<u8>); type Item = Result<Option<(u8, Vec<u8>)>, ChannelError>;
type Error = ChannelError;
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let mut channel = match self.0.poll_lock() { let mut channel = match self.0.poll_lock() {
Async::Ready(c) => c, Poll::Ready(c) => c,
Async::NotReady => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
}; };
match try_ready!(channel.poll()) { match ready!(channel.poll()) {
Some(ChannelEvent::Header(id, data)) => Ok(Async::Ready(Some((id, data)))), Some(ChannelEvent::Header(id, data)) => Poll::Ready(Ok(Some((id, data)))),
Some(ChannelEvent::Data(..)) | None => Ok(Async::Ready(None)), Some(ChannelEvent::Data(..)) | None => Poll::Ready(Ok(None)),
} }
} }
} }

View file

@ -2,7 +2,7 @@ use byteorder::{BigEndian, ByteOrder};
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use shannon::Shannon; use shannon::Shannon;
use std::io; use std::io;
use tokio_io::codec::{Decoder, Encoder}; use tokio_util::codec::{Decoder, Encoder};
const HEADER_SIZE: usize = 3; const HEADER_SIZE: usize = 3;
const MAC_SIZE: usize = 4; const MAC_SIZE: usize = 4;
@ -35,11 +35,11 @@ impl APCodec {
} }
} }
impl Encoder for APCodec { type APCodecItem = (u8, Vec<u8>);
type Item = (u8, Vec<u8>); impl Encoder<APCodecItem> for APCodec {
type Error = io::Error; type Error = io::Error;
fn encode(&mut self, item: (u8, Vec<u8>), buf: &mut BytesMut) -> io::Result<()> { fn encode(&mut self, item: APCodecItem, buf: &mut BytesMut) -> io::Result<()> {
let (cmd, payload) = item; let (cmd, payload) = item;
let offset = buf.len(); let offset = buf.len();

View file

@ -1,14 +1,13 @@
use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use futures::{Async, Future, Poll};
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use protobuf::{self, Message}; use protobuf::{self, Message};
use rand::thread_rng; use rand::thread_rng;
use sha1::Sha1; use sha1::Sha1;
use std::io::{self, Read}; use std::io::{self, Read};
use std::marker::PhantomData; use std::marker::PhantomData;
use tokio_codec::{Decoder, Framed}; // use tokio_codec::{Decoder, Framed};
use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll}; // use tokio_io::io::{read_exact, write_all, ReadExact, Window, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite}; // use tokio_io::{AsyncRead, AsyncWrite};
use super::codec::APCodec; use super::codec::APCodec;
use crate::diffie_hellman::DHLocalKeys; use crate::diffie_hellman::DHLocalKeys;
@ -16,18 +15,30 @@ use crate::protocol;
use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext}; use crate::protocol::keyexchange::{APResponseMessage, ClientHello, ClientResponsePlaintext};
use crate::util; use crate::util;
pub struct Handshake<T> { use futures::{
io::{ReadExact, Window, WriteAll},
Future,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
use tokio_util::codec::{Decoder, Framed};
pub struct Handshake<'a, T> {
keys: DHLocalKeys, keys: DHLocalKeys,
state: HandshakeState<T>, state: HandshakeState<'a, T>,
} }
enum HandshakeState<T> { enum HandshakeState<'a, T> {
ClientHello(WriteAll<T, Vec<u8>>), ClientHello(WriteAll<'a, T>),
APResponse(RecvPacket<T, APResponseMessage>), APResponse(RecvPacket<'a, T, APResponseMessage>),
ClientResponse(Option<APCodec>, WriteAll<T, Vec<u8>>), ClientResponse(Option<APCodec>, WriteAll<'a, T>),
} }
pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> { pub fn handshake<'a, T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<'a, T> {
let local_keys = DHLocalKeys::random(&mut thread_rng()); let local_keys = DHLocalKeys::random(&mut thread_rng());
let client_hello = client_hello(connection, local_keys.public_key()); let client_hello = client_hello(connection, local_keys.public_key());
@ -37,23 +48,22 @@ pub fn handshake<T: AsyncRead + AsyncWrite>(connection: T) -> Handshake<T> {
} }
} }
impl<T: AsyncRead + AsyncWrite> Future for Handshake<T> { impl<'a, T: AsyncRead + AsyncWrite> Future for Handshake<'a, T> {
type Item = Framed<T, APCodec>; type Output = Result<Framed<T, APCodec>, io::Error>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use self::HandshakeState::*; use self::HandshakeState::*;
loop { loop {
self.state = match self.state { self.state = match self.state {
ClientHello(ref mut write) => { ClientHello(ref mut write) => {
let (connection, accumulator) = try_ready!(write.poll()); let (connection, accumulator) = ready!(write.poll());
let read = recv_packet(connection, accumulator); let read = recv_packet(connection, accumulator);
APResponse(read) APResponse(read)
} }
APResponse(ref mut read) => { APResponse(ref mut read) => {
let (connection, message, accumulator) = try_ready!(read.poll()); let (connection, message, accumulator) = ready!(read.poll());
let remote_key = message let remote_key = message
.get_challenge() .get_challenge()
.get_login_crypto_challenge() .get_login_crypto_challenge()
@ -71,17 +81,17 @@ impl<T: AsyncRead + AsyncWrite> Future for Handshake<T> {
} }
ClientResponse(ref mut codec, ref mut write) => { ClientResponse(ref mut codec, ref mut write) => {
let (connection, _) = try_ready!(write.poll()); let (connection, _) = ready!(write.poll());
let codec = codec.take().unwrap(); let codec = codec.take().unwrap();
let framed = codec.framed(connection); let framed = codec.framed(connection);
return Ok(Async::Ready(framed)); return Poll::Ready(Ok(framed));
} }
} }
} }
} }
} }
fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8>> { fn client_hello<'a, T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<'a, T> {
let mut packet = ClientHello::new(); let mut packet = ClientHello::new();
packet packet
.mut_build_info() .mut_build_info()
@ -109,10 +119,11 @@ fn client_hello<T: AsyncWrite>(connection: T, gc: Vec<u8>) -> WriteAll<T, Vec<u8
buffer.write_u32::<BigEndian>(size).unwrap(); buffer.write_u32::<BigEndian>(size).unwrap();
packet.write_to_vec(&mut buffer).unwrap(); packet.write_to_vec(&mut buffer).unwrap();
write_all(connection, buffer) // write_all(connection, buffer)
connection.write_all(&buffer)
} }
fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<T, Vec<u8>> { fn client_response<'a, T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll<'a, T> {
let mut packet = ClientResponsePlaintext::new(); let mut packet = ClientResponsePlaintext::new();
packet packet
.mut_login_crypto_response() .mut_login_crypto_response()
@ -126,15 +137,16 @@ fn client_response<T: AsyncWrite>(connection: T, challenge: Vec<u8>) -> WriteAll
buffer.write_u32::<BigEndian>(size).unwrap(); buffer.write_u32::<BigEndian>(size).unwrap();
packet.write_to_vec(&mut buffer).unwrap(); packet.write_to_vec(&mut buffer).unwrap();
write_all(connection, buffer) // write_all(connection, buffer)
connection.write_all(&buffer)
} }
enum RecvPacket<T, M: Message> { enum RecvPacket<'a, T, M: Message> {
Header(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>), Header(ReadExact<'a, T>, PhantomData<M>),
Body(ReadExact<T, Window<Vec<u8>>>, PhantomData<M>), Body(ReadExact<'a, T>, PhantomData<M>),
} }
fn recv_packet<T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<T, M> fn recv_packet<'a, T: AsyncRead, M>(connection: T, acc: Vec<u8>) -> RecvPacket<'a, T, M>
where where
T: Read, T: Read,
M: Message, M: Message,
@ -142,20 +154,19 @@ where
RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData) RecvPacket::Header(read_into_accumulator(connection, 4, acc), PhantomData)
} }
impl<T: AsyncRead, M> Future for RecvPacket<T, M> impl<'a, T: AsyncRead, M> Future for RecvPacket<'a, T, M>
where where
T: Read, T: Read,
M: Message, M: Message,
{ {
type Item = (T, M, Vec<u8>); type Output = Result<(T, M, Vec<u8>), io::Error>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use self::RecvPacket::*; use self::RecvPacket::*;
loop { loop {
*self = match *self { *self = match *self {
Header(ref mut read, _) => { Header(ref mut read, _) => {
let (connection, header) = try_ready!(read.poll()); let (connection, header) = ready!(read.poll());
let size = BigEndian::read_u32(header.as_ref()) as usize; let size = BigEndian::read_u32(header.as_ref()) as usize;
let acc = header.into_inner(); let acc = header.into_inner();
@ -164,29 +175,30 @@ where
} }
Body(ref mut read, _) => { Body(ref mut read, _) => {
let (connection, data) = try_ready!(read.poll()); let (connection, data) = ready!(read.poll());
let message = protobuf::parse_from_bytes(data.as_ref()).unwrap(); let message = protobuf::parse_from_bytes(data.as_ref()).unwrap();
let acc = data.into_inner(); let acc = data.into_inner();
return Ok(Async::Ready((connection, message, acc))); return Poll::Ready(Ok((connection, message, acc)));
} }
} }
} }
} }
} }
fn read_into_accumulator<T: AsyncRead>( fn read_into_accumulator<'a, T: AsyncRead>(
connection: T, connection: T,
size: usize, size: usize,
mut acc: Vec<u8>, mut acc: Vec<u8>,
) -> ReadExact<T, Window<Vec<u8>>> { ) -> ReadExact<'a, T> {
let offset = acc.len(); let offset = acc.len();
acc.resize(offset + size, 0); acc.resize(offset + size, 0);
let mut window = Window::new(acc); let mut window = Window::new(acc);
window.set_start(offset); window.set_start(offset);
read_exact(connection, window) // read_exact(connection, window)
connection.read_exact(window)
} }
fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) { fn compute_keys(shared_secret: &[u8], packets: &[u8]) -> (Vec<u8>, Vec<u8>, Vec<u8>) {

View file

@ -3,13 +3,18 @@ mod handshake;
pub use self::codec::APCodec; pub use self::codec::APCodec;
pub use self::handshake::handshake; pub use self::handshake::handshake;
use tokio::net::TcpStream;
use futures::{Future, Sink, Stream}; use futures::{AsyncRead, AsyncWrite, Future, Sink, SinkExt, Stream, StreamExt};
use protobuf::{self, Message}; use protobuf::{self, Message};
use std::io; use std::io;
use std::net::ToSocketAddrs; use std::net::ToSocketAddrs;
use tokio::net::TcpStream; use tokio_util::codec::Framed;
use tokio_codec::Framed; // use futures::compat::{AsyncWrite01CompatExt, AsyncRead01CompatExt};
// use tokio_util::compat::{self, Tokio02AsyncReadCompatExt, Tokio02AsyncWriteCompatExt};
// use tokio_codec::Framed;
// use tokio_core::net::TcpStream;
// use tokio_core::reactor::Handle;
use url::Url; use url::Url;
use crate::authentication::Credentials; use crate::authentication::Credentials;
@ -19,52 +24,46 @@ use crate::proxytunnel;
pub type Transport = Framed<TcpStream, APCodec>; pub type Transport = Framed<TcpStream, APCodec>;
pub fn connect( pub async fn connect(addr: String, proxy: &Option<Url>) -> Result<Transport, io::Error> {
addr: String, let (addr, connect_url): (_, Option<String>) = match *proxy {
proxy: &Option<Url>,
) -> Box<dyn Future<Item = Transport, Error = io::Error>> {
let (addr, connect_url) = match *proxy {
Some(ref url) => { Some(ref url) => {
info!("Using proxy \"{}\"", url); unimplemented!()
match url.to_socket_addrs().and_then(|mut iter| { // info!("Using proxy \"{}\"", url);
iter.next().ok_or(io::Error::new( //
io::ErrorKind::NotFound, // let mut iter = url.to_socket_addrs()?;
"Can't resolve proxy server address", // let socket_addr = iter.next().ok_or(io::Error::new(
)) // io::ErrorKind::NotFound,
}) { // "Can't resolve proxy server address",
Ok(socket_addr) => (socket_addr, Some(addr)), // ))?;
Err(error) => return Box::new(futures::future::err(error)), // (socket_addr, Some(addr))
}
} }
None => { None => {
match addr.to_socket_addrs().and_then(|mut iter| { let mut iter = addr.to_socket_addrs()?;
iter.next().ok_or(io::Error::new( let socket_addr = iter.next().ok_or(io::Error::new(
io::ErrorKind::NotFound, io::ErrorKind::NotFound,
"Can't resolve server address", "Can't resolve server address",
)) ))?;
}) { (socket_addr, None)
Ok(socket_addr) => (socket_addr, None),
Err(error) => return Box::new(futures::future::err(error)),
}
} }
}; };
let socket = TcpStream::connect(&addr); let connection = TcpStream::connect(&addr).await?;
if let Some(connect_url) = connect_url { if let Some(connect_url) = connect_url {
let connection = socket unimplemented!()
.and_then(move |socket| proxytunnel::connect(socket, &connect_url).and_then(handshake)); // let connection = proxytunnel::connect(connection, &connect_url).await?;
Box::new(connection) // let connection = handshake(connection).await?;
// Ok(connection)
} else { } else {
let connection = socket.and_then(handshake); let connection = handshake(connection).await?;
Box::new(connection) Ok(connection)
} }
} }
pub fn authenticate( pub async fn authenticate(
transport: Transport, mut transport: Transport,
credentials: Credentials, credentials: Credentials,
device_id: String, device_id: String,
) -> Box<dyn Future<Item = (Transport, Credentials), Error = io::Error>> { ) -> Result<(Transport, Credentials), io::Error> {
use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os}; use crate::protocol::authentication::{APWelcome, ClientResponseEncrypted, CpuFamily, Os};
use crate::protocol::keyexchange::APLoginFailed; use crate::protocol::keyexchange::APLoginFailed;
@ -92,38 +91,39 @@ pub fn authenticate(
packet.mut_system_info().set_device_id(device_id); packet.mut_system_info().set_device_id(device_id);
packet.set_version_string(version::version_string()); packet.set_version_string(version::version_string());
let cmd = 0xab; let cmd: u8 = 0xab;
let data = packet.write_to_bytes().unwrap(); let data = packet.write_to_bytes().unwrap();
Box::new( transport.send((cmd, data)).await;
transport
.send((cmd, data))
.and_then(|transport| transport.into_future().map_err(|(err, _stream)| err))
.and_then(|(packet, transport)| match packet {
Some((0xac, data)) => {
let welcome_data: APWelcome =
protobuf::parse_from_bytes(data.as_ref()).unwrap();
let reusable_credentials = Credentials { let packet = transport.next().await;
username: welcome_data.get_canonical_username().to_owned(), // let (packet, transport) = transport
auth_type: welcome_data.get_reusable_auth_credentials_type(), // .into_future()
auth_data: welcome_data.get_reusable_auth_credentials().to_owned(), // .map_err(|(err, _stream)| err)
}; // .await?;
match packet {
Some(Ok((0xac, data))) => {
let welcome_data: APWelcome = protobuf::parse_from_bytes(data.as_ref()).unwrap();
Ok((transport, reusable_credentials)) let reusable_credentials = Credentials {
} username: welcome_data.get_canonical_username().to_owned(),
auth_type: welcome_data.get_reusable_auth_credentials_type(),
auth_data: welcome_data.get_reusable_auth_credentials().to_owned(),
};
Some((0xad, data)) => { Ok((transport, reusable_credentials))
let error_data: APLoginFailed = }
protobuf::parse_from_bytes(data.as_ref()).unwrap();
panic!(
"Authentication failed with reason: {:?}",
error_data.get_error_code()
)
}
Some((cmd, _)) => panic!("Unexpected packet {:?}", cmd), Some(Ok((0xad, data))) => {
None => panic!("EOF"), let error_data: APLoginFailed = protobuf::parse_from_bytes(data.as_ref()).unwrap();
}), panic!(
) "Authentication failed with reason: {:?}",
error_data.get_error_code()
)
}
Some(Ok((cmd, _))) => panic!("Unexpected packet {:?}", cmd),
Some(err @ Err(_)) => panic!("Packet error: {:?}", err),
None => panic!("EOF"),
}
} }

View file

@ -1,4 +1,4 @@
use futures::Future; // use futures::Future;
use serde_json; use serde_json;
use crate::mercury::MercuryError; use crate::mercury::MercuryError;
@ -13,20 +13,22 @@ pub struct Token {
pub scope: Vec<String>, pub scope: Vec<String>,
} }
pub fn get_token( pub async fn get_token(
session: &Session, session: &Session,
client_id: &str, client_id: &str,
scopes: &str, scopes: &str,
) -> Box<dyn Future<Item = Token, Error = MercuryError>> { ) -> Result<Token, MercuryError> {
let url = format!( let url = format!(
"hm://keymaster/token/authenticated?client_id={}&scope={}", "hm://keymaster/token/authenticated?client_id={}&scope={}",
client_id, scopes client_id, scopes
); );
Box::new(session.mercury().get(url).map(move |response| {
// Box::new(session.mercury().get(url).map(move |response| {
session.mercury().get(url).await.map(move |response| {
let data = response.payload.first().expect("Empty payload"); let data = response.payload.first().expect("Empty payload");
let data = String::from_utf8(data.clone()).unwrap(); let data = String::from_utf8(data.clone()).unwrap();
let token: Token = serde_json::from_str(&data).unwrap(); let token: Token = serde_json::from_str(&data).unwrap();
token token
})) })
} }

View file

@ -1,7 +1,7 @@
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))] #![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]
#[macro_use] // #[macro_use]
extern crate error_chain; // extern crate error_chain;
#[macro_use] #[macro_use]
extern crate futures; extern crate futures;
#[macro_use] #[macro_use]
@ -30,8 +30,8 @@ extern crate serde_json;
extern crate sha1; extern crate sha1;
extern crate shannon; extern crate shannon;
extern crate tokio; extern crate tokio;
extern crate tokio_codec; // extern crate tokio_codec;
extern crate tokio_io; // extern crate tokio_io;
extern crate url; extern crate url;
extern crate uuid; extern crate uuid;
@ -45,11 +45,11 @@ pub mod authentication;
pub mod cache; pub mod cache;
pub mod channel; pub mod channel;
pub mod config; pub mod config;
mod connection; pub mod connection;
pub mod diffie_hellman; pub mod diffie_hellman;
pub mod keymaster; pub mod keymaster;
pub mod mercury; pub mod mercury;
mod proxytunnel; pub mod proxytunnel;
pub mod session; pub mod session;
pub mod spotify_id; pub mod spotify_id;
pub mod util; pub mod util;

View file

@ -2,12 +2,16 @@ use crate::protocol;
use crate::util::url_encode; use crate::util::url_encode;
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::{mpsc, oneshot};
use futures::{Async, Future, Poll};
use protobuf; use protobuf;
use std::collections::HashMap; use std::collections::HashMap;
use std::mem; use std::mem;
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt,
};
use std::task::Poll;
use crate::util::SeqGenerator; use crate::util::SeqGenerator;
mod types; mod types;
@ -33,14 +37,13 @@ pub struct MercuryPending {
pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>); pub struct MercuryFuture<T>(oneshot::Receiver<Result<T, MercuryError>>);
impl<T> Future for MercuryFuture<T> { impl<T> Future for MercuryFuture<T> {
type Item = T; type Output = Result<T, MercuryError>;
type Error = MercuryError;
fn poll(&mut self) -> Poll<T, MercuryError> { fn poll(&mut self) -> Poll<Self::Output> {
match self.0.poll() { match self.0.poll() {
Ok(Async::Ready(Ok(value))) => Ok(Async::Ready(value)), Poll::Ready(Ok(Ok(value))) => Poll::Ready(Ok(value)),
Ok(Async::Ready(Err(err))) => Err(err), Poll::Ready(Ok(Err(err))) => Err(err),
Ok(Async::NotReady) => Ok(Async::NotReady), Poll::Pending => Poll::Pending,
Err(oneshot::Canceled) => Err(MercuryError), Err(oneshot::Canceled) => Err(MercuryError),
} }
} }
@ -98,11 +101,10 @@ impl MercuryManager {
MercurySender::new(self.clone(), uri.into()) MercurySender::new(self.clone(), uri.into())
} }
pub fn subscribe<T: Into<String>>( pub async fn subscribe<T: Into<String>>(
&self, &self,
uri: T, uri: T,
) -> Box<dyn Future<Item = mpsc::UnboundedReceiver<MercuryResponse>, Error = MercuryError>> ) -> Result<mpsc::UnboundedReceiver<MercuryResponse>, MercuryError> {
{
let uri = uri.into(); let uri = uri.into();
let request = self.request(MercuryRequest { let request = self.request(MercuryRequest {
method: MercuryMethod::SUB, method: MercuryMethod::SUB,
@ -112,7 +114,7 @@ impl MercuryManager {
}); });
let manager = self.clone(); let manager = self.clone();
Box::new(request.map(move |response| { request.await.map(move |response| {
let (tx, rx) = mpsc::unbounded(); let (tx, rx) = mpsc::unbounded();
manager.lock(move |inner| { manager.lock(move |inner| {
@ -137,7 +139,7 @@ impl MercuryManager {
}); });
rx rx
})) })
} }
pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) { pub(crate) fn dispatch(&self, cmd: u8, mut data: Bytes) {

View file

@ -1,7 +1,11 @@
use futures::{Async, AsyncSink, Future, Poll, Sink, StartSend}; use futures::{Future, Sink};
use std::collections::VecDeque; use std::collections::VecDeque;
use super::*; use super::*;
use std::{
pin::Pin,
task::{Context, Poll},
};
pub struct MercurySender { pub struct MercurySender {
mercury: MercuryManager, mercury: MercuryManager,
@ -30,25 +34,23 @@ impl Clone for MercurySender {
} }
} }
impl Sink for MercurySender { type SinkItem = Vec<u8>;
type SinkItem = Vec<u8>; impl Sink<SinkItem> for MercurySender {
type SinkError = MercuryError; type Error = MercuryError;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> { fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
let task = self.mercury.send(self.uri.clone(), item); let task = self.mercury.send(self.uri.clone(), item);
self.pending.push_back(task); self.pending.push_back(task);
Ok(AsyncSink::Ready) Poll::Ready(Ok(()))
} }
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> { fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
loop { loop {
match self.pending.front_mut() { match self.pending.front_mut() {
Some(task) => { Some(task) => {
try_ready!(task.poll()); ready!(task.poll());
}
None => {
return Ok(Async::Ready(()));
} }
None => return Poll::Ready(Ok(())),
} }
self.pending.pop_front(); self.pending.pop_front();
} }

View file

@ -1,49 +1,61 @@
use std::io; use std::io;
use std::str::FromStr; use std::str::FromStr;
use futures::{Async, Future, Poll};
use httparse; use httparse;
use hyper::Uri; use hyper::Uri;
use tokio_io::io::{read, write_all, Read, Window, WriteAll}; // use tokio_io::io::{read, write_all, Read, Window, WriteAll};
use tokio_io::{AsyncRead, AsyncWrite}; // use tokio_io::{AsyncRead, AsyncWrite};
pub struct ProxyTunnel<T> { use futures::{
state: ProxyState<T>, io::{Read, Window, WriteAll},
AsyncRead, AsyncWrite, Future,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
// use tokio::io::{AsyncReadExt, AsyncWriteExt};
pub struct ProxyTunnel<'a, T> {
state: ProxyState<'a, T>,
} }
enum ProxyState<T> { enum ProxyState<'a, T> {
ProxyConnect(WriteAll<T, Vec<u8>>), ProxyConnect(WriteAll<'a, T>),
ProxyResponse(Read<T, Window<Vec<u8>>>), ProxyResponse(Read<'a, T>),
} }
pub fn connect<T: AsyncRead + AsyncWrite>(connection: T, connect_url: &str) -> ProxyTunnel<T> { pub fn connect<'a, T: AsyncRead + AsyncWrite>(
connection: T,
connect_url: &str,
) -> ProxyTunnel<'a, T> {
let proxy = proxy_connect(connection, connect_url); let proxy = proxy_connect(connection, connect_url);
ProxyTunnel { ProxyTunnel {
state: ProxyState::ProxyConnect(proxy), state: ProxyState::ProxyConnect(proxy),
} }
} }
impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> { impl<'a, T: AsyncRead + AsyncWrite> Future for ProxyTunnel<'a, T> {
type Item = T; type Output = Result<T, io::Error>;
type Error = io::Error;
fn poll(&mut self) -> Poll<Self::Item, io::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
use self::ProxyState::*; use self::ProxyState::*;
loop { loop {
self.state = match self.state { self.state = match self.state {
ProxyConnect(ref mut write) => { ProxyConnect(ref mut write) => {
let (connection, mut accumulator) = try_ready!(write.poll()); let (connection, mut accumulator) = ready!(write.poll());
let capacity = accumulator.capacity(); let capacity = accumulator.capacity();
accumulator.resize(capacity, 0); accumulator.resize(capacity, 0);
let window = Window::new(accumulator); let window = Window::new(accumulator);
let read = read(connection, window); // let read = read(connection, window);
ProxyResponse(read) // ProxyResponse(read)
ProxyResponse(connection.read(window))
} }
ProxyResponse(ref mut read_f) => { ProxyResponse(ref mut read_f) => {
let (connection, mut window, bytes_read) = try_ready!(read_f.poll()); let (connection, mut window, bytes_read) = ready!(read_f.poll());
if bytes_read == 0 { if bytes_read == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy")); return Err(io::Error::new(io::ErrorKind::Other, "Early EOF from proxy"));
@ -65,7 +77,7 @@ impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
if let Some(code) = response.code { if let Some(code) = response.code {
if code == 200 { if code == 200 {
// Proxy says all is well // Proxy says all is well
return Ok(Async::Ready(connection)); return Poll::Ready(connection);
} else { } else {
let reason = response.reason.unwrap_or("no reason"); let reason = response.reason.unwrap_or("no reason");
let msg = format!("Proxy responded with {}: {}", code, reason); let msg = format!("Proxy responded with {}: {}", code, reason);
@ -87,8 +99,9 @@ impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
} }
// We did not get a full header // We did not get a full header
window.set_start(data_end); window.set_start(data_end);
let read = read(connection, window); // let read = read(connection, window);
ProxyResponse(read) // ProxyResponse(read)
ProxyResponse(connection.read(window))
} }
} }
} }
@ -96,7 +109,7 @@ impl<T: AsyncRead + AsyncWrite> Future for ProxyTunnel<T> {
} }
} }
fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T, Vec<u8>> { fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T> {
let uri = Uri::from_str(connect_url).unwrap(); let uri = Uri::from_str(connect_url).unwrap();
let buffer = format!( let buffer = format!(
"CONNECT {0}:{1} HTTP/1.1\r\n\ "CONNECT {0}:{1} HTTP/1.1\r\n\
@ -106,5 +119,6 @@ fn proxy_connect<T: AsyncWrite>(connection: T, connect_url: &str) -> WriteAll<T,
) )
.into_bytes(); .into_bytes();
write_all(connection, buffer) // write_all(connection, buffer)
connection.write_all(buffer)
} }

View file

@ -5,9 +5,24 @@ use std::time::{SystemTime, UNIX_EPOCH};
use byteorder::{BigEndian, ByteOrder}; use byteorder::{BigEndian, ByteOrder};
use bytes::Bytes; use bytes::Bytes;
use futures::sync::mpsc; // use futures::sync::mpsc;
use futures::{Async, Future, IntoFuture, Poll, Stream}; // use futures::{Async, Future, IntoFuture, Poll, Stream};
use tokio::runtime::{current_thread, current_thread::Handle}; // use tokio::runtime::{current_thread, current_thread::Handle};
// use futures::future::{IntoFuture, Remote};
use futures::{
channel::mpsc,
// future::{IntoFuture, Remote},
Future,
Stream,
TryFutureExt,
};
use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use crate::apresolve::apresolve_or_fallback; use crate::apresolve::apresolve_or_fallback;
use crate::audio_key::AudioKeyManager; use crate::audio_key::AudioKeyManager;
@ -46,49 +61,52 @@ static SESSION_COUNTER: AtomicUsize = AtomicUsize::new(0);
#[derive(Clone)] #[derive(Clone)]
pub struct Session(Arc<SessionInternal>); pub struct Session(Arc<SessionInternal>);
// TODO: Define better errors!
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
impl Session { impl Session {
pub fn connect( pub async fn connect(
config: SessionConfig, config: SessionConfig,
credentials: Credentials, credentials: Credentials,
cache: Option<Cache>, cache: Option<Cache>,
handle: Handle, handle: Handle,
) -> Box<dyn Future<Item = Session, Error = io::Error>> { ) -> Result<Session> {
let access_point = apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port); unimplemented!()
// let access_point_addr =
let proxy = config.proxy.clone(); // apresolve_or_fallback::<io::Error>(&config.proxy, &config.ap_port).await?;
let connection = access_point.and_then(move |addr| { //
info!("Connecting to AP \"{}\"", addr); // let proxy = config.proxy.clone();
connection::connect(addr, &proxy) // info!("Connecting to AP \"{}\"", access_point_addr);
}); // let connection = connection::connect(access_point_addr, &proxy);
//
let device_id = config.device_id.clone(); // let device_id = config.device_id.clone();
let authentication = connection.and_then(move |connection| { // let authentication = connection.and_then(move |connection| {
connection::authenticate(connection, credentials, device_id) // connection::authenticate(connection, credentials, device_id)
}); // });
//
let result = authentication.map(move |(transport, reusable_credentials)| { // let result = authentication.map(move |(transport, reusable_credentials)| {
info!("Authenticated as \"{}\" !", reusable_credentials.username); // info!("Authenticated as \"{}\" !", reusable_credentials.username);
if let Some(ref cache) = cache { // if let Some(ref cache) = cache {
cache.save_credentials(&reusable_credentials); // cache.save_credentials(&reusable_credentials);
} // }
//
let (session, task) = Session::create( // let (session, task) = Session::create(
&handle, // &handle,
transport, // transport,
config, // config,
cache, // cache,
reusable_credentials.username.clone(), // reusable_credentials.username.clone(),
); // );
//
current_thread::spawn(task.map_err(|e| { // tokio::spawn(task.map_err(|e| {
error!("SessionError: {}", e.to_string()); // error!("SessionError: {}", e.to_string());
std::process::exit(0); // std::process::exit(0);
})); // }));
//
session // session
}); // });
//
Box::new(result) // result
} }
fn create( fn create(
@ -97,7 +115,7 @@ impl Session {
config: SessionConfig, config: SessionConfig,
cache: Option<Cache>, cache: Option<Cache>,
username: String, username: String,
) -> (Session, Box<dyn Future<Item = (), Error = io::Error>>) { ) -> (Session, Box<dyn Future<Output = Result<()>>>) {
let (sink, stream) = transport.split(); let (sink, stream) = transport.split();
let (sender_tx, sender_rx) = mpsc::unbounded(); let (sender_tx, sender_rx) = mpsc::unbounded();
@ -160,7 +178,7 @@ impl Session {
// Spawn a future directly // Spawn a future directly
pub fn spawn<F>(&self, f: F) pub fn spawn<F>(&self, f: F)
where where
F: Future<Item = (), Error = ()> + Send + 'static, F: Future<Output = ()> + Send + 'static,
{ {
let handle = self.0.handle.lock().unwrap(); let handle = self.0.handle.lock().unwrap();
let spawn_res = handle.spawn(f); let spawn_res = handle.spawn(f);
@ -293,34 +311,35 @@ impl Drop for SessionInternal {
} }
} }
// type SErr = ::std::fmt::Debug;
struct DispatchTask<S>(S, SessionWeak) struct DispatchTask<S>(S, SessionWeak)
where where
S: Stream<Item = (u8, Bytes)>; S: Stream<Item = Result<((u8, Bytes), ())>>;
impl<S> Future for DispatchTask<S> impl<S> Future for DispatchTask<S>
where where
S: Stream<Item = (u8, Bytes)>, // SErr: ::std::fmt::Debug,
<S as Stream>::Error: ::std::fmt::Debug, S: Stream<Item = Result<((u8, Bytes), ())>>,
{ {
type Item = (); type Output = Result<((), ())>;
type Error = S::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let session = match self.1.try_upgrade() { let session = match self.1.try_upgrade() {
Some(session) => session, Some(session) => session,
None => return Ok(Async::Ready(())), None => return Poll::Ready(()),
}; };
loop { loop {
let (cmd, data) = match self.0.poll() { let (cmd, data) = match self.unwrap().0.poll() {
Ok(Async::Ready(Some(t))) => t, Poll::Ready(Ok(Some(t))) => t,
Ok(Async::Ready(None)) => { Poll::Ready(Ok(None)) => {
warn!("Connection to server closed."); warn!("Connection to server closed.");
session.shutdown(); session.shutdown();
return Ok(Async::Ready(())); return Ok(Poll::Ready(()));
} }
Ok(Async::NotReady) => return Ok(Async::NotReady), Poll::Pending => return Poll::Pending,
Err(e) => { Poll::Ready(Err(e)) => {
session.shutdown(); session.shutdown();
return Err(From::from(e)); return Err(From::from(e));
} }
@ -333,7 +352,7 @@ where
impl<S> Drop for DispatchTask<S> impl<S> Drop for DispatchTask<S>
where where
S: Stream<Item = (u8, Bytes)>, S: Stream<Item = Result<((u8, Bytes), ())>>,
{ {
fn drop(&mut self) { fn drop(&mut self) {
debug!("drop Dispatch"); debug!("drop Dispatch");

23
core/tests/connect.rs Normal file
View file

@ -0,0 +1,23 @@
use env_logger;
use std::env;
use tokio::runtime::Runtime;
use librespot_core::{apresolve::apresolve_or_fallback, connection};
// TODO: Rewrite this into an actual test instead of this wonder
fn main() {
env_logger::init();
let mut rt = Runtime::new().unwrap();
let args: Vec<_> = env::args().collect();
if args.len() != 4 {
println!("Usage: {} USERNAME PASSWORD PLAYLIST", args[0]);
}
// let username = args[1].to_owned();
// let password = args[2].to_owned();
let ap = rt.block_on(apresolve_or_fallback(&None, &Some(80)));
println!("AP: {:?}", ap);
let connection = rt.block_on(connection::connect(&None));
}