use byteorder::{BigEndian, ByteOrder, ReadBytesExt, WriteBytesExt}; use std::collections::HashMap; use std::io::{Cursor, Seek, SeekFrom, Write}; use std::sync::mpsc; use util::{ArcVec, FileId}; use connection::PacketHandler; use session::Session; #[derive(Debug)] pub enum StreamEvent { Header(u8, ArcVec), Data(ArcVec), } type ChannelId = u16; enum ChannelMode { Header, Data } struct Channel { mode: ChannelMode, callback: mpsc::Sender } pub struct StreamManager { next_id: ChannelId, channels: HashMap, } impl StreamManager { pub fn new() -> StreamManager { StreamManager { next_id: 0, channels: HashMap::new(), } } pub fn request(&mut self, session: &Session, file: FileId, offset: u32, size: u32) -> mpsc::Receiver { let (tx, rx) = mpsc::channel(); let channel_id = self.next_id; self.next_id += 1; let mut data : Vec = Vec::new(); data.write_u16::(channel_id).unwrap(); data.write_u8(0).unwrap(); data.write_u8(1).unwrap(); data.write_u16::(0x0000).unwrap(); data.write_u32::(0x00000000).unwrap(); data.write_u32::(0x00009C40).unwrap(); data.write_u32::(0x00020000).unwrap(); data.write(&file.0).unwrap(); data.write_u32::(offset).unwrap(); data.write_u32::(offset + size).unwrap(); session.send_packet(0x8, &data).unwrap(); self.channels.insert(channel_id, Channel { mode: ChannelMode::Header, callback: tx }); rx } } impl PacketHandler for StreamManager { fn handle(&mut self, _cmd: u8, data: Vec) { let data = ArcVec::new(data); let mut packet = Cursor::new(&data as &[u8]); let id : ChannelId = packet.read_u16::().unwrap(); let mut close = false; { let channel = match self.channels.get_mut(&id) { Some(ch) => ch, None => { return; } }; match channel.mode { ChannelMode::Header => { let mut length = 0; while packet.position() < data.len() as u64 && !close { length = packet.read_u16::().unwrap(); if length > 0 { let header_id = packet.read_u8().unwrap(); channel.callback .send(StreamEvent::Header( header_id, data.clone() .offset(packet.position() as usize) .limit(length as usize - 1) )) .unwrap_or_else(|_| { close = true; }); packet.seek(SeekFrom::Current(length as i64 - 1)).unwrap(); } } if length == 0 { channel.mode = ChannelMode::Data; } } ChannelMode::Data => { if packet.position() < data.len() as u64 { channel.callback .send(StreamEvent::Data(data.clone().offset(packet.position() as usize))) .unwrap_or_else(|_| { close = true; }); } else { close = true; } } } } if close { self.channels.remove(&id); } } }