pub mod debug;
pub mod hidusb;
use crate::api::Endpoint;
use crate::protocol::hidio::*;
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use std::io::{Read, Write};
/// Transport over which HID-IO packets are exchanged.
///
/// Marker trait with no methods of its own: an implementor must provide both
/// `Read` and `Write`, which lets `HIDIOEndpoint` hold any such transport
/// behind a `Box<dyn HIDIOTransport>`.
pub trait HIDIOTransport: Read + Write {}
/// Size, in bytes, of the scratch buffer handed to the transport for a single
/// read in `HIDIOEndpoint::recv_chunk`.
const MAX_RECV_SIZE: usize = 1024;
/// A single device connection: a transport plus the packet length used on it.
pub struct HIDIOEndpoint {
/// Transport that every read and write goes through.
socket: Box<dyn HIDIOTransport>,
/// Chunk size used when writing serialized packets, and the `max_len`
/// assigned to buffers produced by `create_buffer`.
max_packet_len: u32,
}
impl HIDIOEndpoint {
/// Builds an endpoint that talks over `socket` and uses `max_packet_len`
/// as its packet length.
pub fn new(socket: Box<dyn HIDIOTransport>, max_packet_len: u32) -> HIDIOEndpoint {
    HIDIOEndpoint {
        max_packet_len,
        socket,
    }
}
pub fn recv_chunk(&mut self, buffer: &mut HIDIOPacketBuffer) -> Result<usize, std::io::Error> {
let mut rbuf = [0; MAX_RECV_SIZE];
match self.socket.read(&mut rbuf) {
Ok(len) => {
if len > 0 {
let slice = &rbuf[0..len];
let ret = buffer.decode_packet(&mut slice.to_vec());
if let Err(e) = ret {
error!("recv_chunk({}) {:?}", len, e);
println!("received: {:?}", slice);
println!("current state: {:?}", buffer);
std::process::exit(2);
} else {
info!("R{} {:x?}", buffer.data.len(), buffer);
}
}
Ok(len)
}
Err(e) => Err(e),
}
}
pub fn create_buffer(&self) -> HIDIOPacketBuffer {
let mut buffer = HIDIOPacketBuffer::new();
buffer.max_len = self.max_packet_len;
buffer
}
/// Keeps reading chunks until a packet is complete (or a NAK arrives) and
/// returns the buffer.
///
/// * `Sync` discards whatever was collected and starts a new buffer.
/// * `ACK` needs no reaction.
/// * `NAK` prints a notice and stops reading; the buffer is returned as-is.
/// * `Continued` / `Data` chunks are acknowledged with an empty ACK.
///
/// Read errors and empty reads are ignored and the loop simply tries again.
pub fn recv_packet(&mut self) -> HIDIOPacketBuffer {
    let mut packet = self.create_buffer();
    while !packet.done {
        let received = match self.recv_chunk(&mut packet) {
            Ok(count) => count,
            Err(_) => continue,
        };
        if received == 0 {
            continue;
        }
        match &packet.ptype {
            HIDIOPacketType::Sync => {
                packet = self.create_buffer();
            }
            HIDIOPacketType::ACK => {}
            HIDIOPacketType::NAK => {
                println!("NACK");
                break;
            }
            HIDIOPacketType::Continued | HIDIOPacketType::Data => {
                self.send_ack(packet.id, vec![]);
            }
        }
    }
    packet
}
/// Serializes `packet` and writes it to the transport in chunks of
/// `max_packet_len` bytes.
///
/// # Errors
/// * `ErrorKind::InvalidData` if the packet cannot be serialized (the
///   serializer's error is included in debug form).
/// * Any I/O error reported by the transport's `write`.
///
/// NOTE(review): the byte count returned by each `write` is ignored, so a
/// short write is not detected here.
pub fn send_packet(&mut self, mut packet: HIDIOPacketBuffer) -> Result<(), std::io::Error> {
    info!("Sending {:x?}", packet);
    // Report serialization failures through the existing io::Result instead of panicking.
    let buf: Vec<u8> = packet.serialize_buffer().map_err(|e| {
        std::io::Error::new(std::io::ErrorKind::InvalidData, format!("{:?}", e))
    })?;
    for chunk in buf.chunks(self.max_packet_len as usize) {
        let _written = self.socket.write(chunk)?;
    }
    Ok(())
}
/// Sends an empty `Sync` packet (id 0) to the device.
///
/// The packet is serialized with this endpoint's `max_packet_len`, the same
/// length `send_packet` uses to split the serialized bytes into chunks.
///
/// # Panics
/// Panics if `send_packet` returns an error.
pub fn send_sync(&mut self) {
    let max_len = self.max_packet_len;
    self.send_packet(HIDIOPacketBuffer {
        ptype: HIDIOPacketType::Sync,
        id: 0,
        max_len,
        data: vec![],
        done: true,
    })
    .unwrap();
}
/// Sends an `ACK` packet carrying `data` to the device.
///
/// The packet is serialized with this endpoint's `max_packet_len` so that the
/// serializer's packet boundaries agree with the chunk size `send_packet`
/// writes; a fixed length would mis-frame payloads on transports whose
/// packet length differs from it.
///
/// NOTE(review): `_id` is ignored and the ACK always carries id 0 — confirm
/// this is intended.
///
/// # Panics
/// Panics if `send_packet` returns an error.
pub fn send_ack(&mut self, _id: u32, data: Vec<u8>) {
    let max_len = self.max_packet_len;
    self.send_packet(HIDIOPacketBuffer {
        ptype: HIDIOPacketType::ACK,
        id: 0,
        max_len,
        data,
        done: true,
    })
    .unwrap();
}
}
/// Drives one `HIDIOEndpoint`: assembles incoming packets, forwards completed
/// ones over a channel, and transmits packets queued for the device.
pub struct HIDIOController {
/// Identifier supplied at construction; not read by the methods shown here.
id: String,
/// Endpoint used for all device I/O.
device: HIDIOEndpoint,
/// Packet currently being assembled from received chunks.
received: HIDIOPacketBuffer,
/// Time of the last received data or the last sync sent by `process`.
last_sync: Instant,
/// Channel on which completed packets are forwarded.
message_queue: std::sync::mpsc::Sender<HIDIOPacketBuffer>,
/// Channel from which packets to transmit to the device are taken.
response_queue: std::sync::mpsc::Receiver<HIDIOPacketBuffer>,
}
impl HIDIOController {
/// Creates a controller for `device`.
///
/// The receive buffer is obtained from the device's `create_buffer`, and the
/// sync timer starts at the moment of construction.
pub fn new(
    id: String,
    device: HIDIOEndpoint,
    message_queue: std::sync::mpsc::Sender<HIDIOPacketBuffer>,
    response_queue: std::sync::mpsc::Receiver<HIDIOPacketBuffer>,
) -> HIDIOController {
    HIDIOController {
        id,
        // Fields are evaluated in written order: borrow `device` for the
        // buffer first, then move it into the struct.
        received: device.create_buffer(),
        last_sync: Instant::now(),
        device,
        message_queue,
        response_queue,
    }
}
/// Runs one iteration of the controller loop.
///
/// 1. Reads one chunk from the device. `Sync` and `NAK` reset the receive
///    buffer; any chunk that leaves the buffer incomplete is acknowledged.
/// 2. Forwards a completed packet on the message channel and starts a new
///    buffer.
/// 3. If nothing has been received for 5 seconds, sends a `Sync`, resets the
///    buffer and returns early.
/// 4. Otherwise transmits at most one packet waiting on the response channel,
///    after stamping it with the device's `max_packet_len`.
///
/// # Errors
/// * Any I/O error from reading or writing the device.
/// * `ErrorKind::BrokenPipe` when either channel's peer has been dropped.
pub fn process(&mut self) -> Result<(), std::io::Error> {
    let recv = self.device.recv_chunk(&mut self.received)?;
    if recv > 0 {
        self.last_sync = Instant::now();
        match &self.received.ptype {
            HIDIOPacketType::Sync => {
                self.received = self.device.create_buffer();
            }
            HIDIOPacketType::ACK => {}
            HIDIOPacketType::NAK => {
                println!("NACK. Resetting buffer");
                self.received = self.device.create_buffer();
            }
            HIDIOPacketType::Continued | HIDIOPacketType::Data => {}
        }
        if !self.received.done {
            self.device.send_ack(self.received.id, vec![]);
        }
    }

    if self.received.done {
        // Swap in a fresh buffer and move the finished packet out; no clone needed.
        let complete = std::mem::replace(&mut self.received, self.device.create_buffer());
        // A dropped receiver is reported like a dropped response channel below,
        // rather than panicking.
        self.message_queue.send(complete).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::BrokenPipe,
                "message queue receiver disconnected",
            )
        })?;
    }

    if self.last_sync.elapsed().as_secs() >= 5 {
        self.device.send_sync();
        self.received = self.device.create_buffer();
        self.last_sync = Instant::now();
        return Ok(());
    }

    match self.response_queue.try_recv() {
        Ok(mut p) => {
            p.max_len = self.device.max_packet_len;
            // Record the type before the packet is moved into send_packet.
            let is_sync = p.ptype == HIDIOPacketType::Sync;
            self.device.send_packet(p)?;
            if is_sync {
                self.received = self.device.create_buffer();
            }
        }
        Err(std::sync::mpsc::TryRecvError::Empty) => {}
        Err(std::sync::mpsc::TryRecvError::Disconnected) => {
            return Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, ""));
        }
    }
    Ok(())
}
}
/// Channel pair for exchanging packets with one device, plus that device's
/// endpoint description.
pub struct HIDIOQueue {
/// Description of the device this queue belongs to.
pub info: Endpoint,
/// Packets arriving from the device side.
message_queue: std::sync::mpsc::Receiver<HIDIOPacketBuffer>,
/// Packets to be handed to the device side.
response_queue: std::sync::mpsc::Sender<HIDIOPacketBuffer>,
}
impl HIDIOQueue {
/// Bundles a device description with the two channel ends used to talk to it.
pub fn new(
    info: Endpoint,
    message_queue: std::sync::mpsc::Receiver<HIDIOPacketBuffer>,
    response_queue: std::sync::mpsc::Sender<HIDIOPacketBuffer>,
) -> HIDIOQueue {
    HIDIOQueue {
        response_queue,
        message_queue,
        info,
    }
}
/// Queues `packet` on the response channel.
///
/// # Errors
/// Returns the channel's `SendError`, which hands the packet back, when the
/// receiving side has been dropped.
pub fn send_packet(
&self,
packet: HIDIOPacketBuffer,
) -> Result<(), mpsc::SendError<HIDIOPacketBuffer>> {
self.response_queue.send(packet)
}
/// Blocks until a packet arrives on the message channel and returns it.
///
/// # Panics
/// Panics if the channel's `recv` fails, which happens once every sender has
/// been dropped.
pub fn recv_packet(&mut self) -> HIDIOPacketBuffer {
self.message_queue.recv().unwrap()
}
/// Returns a non-blocking iterator over the packets currently waiting on the
/// message channel; it ends as soon as no packet is immediately available.
pub fn messages(&mut self) -> mpsc::TryIter<HIDIOPacketBuffer> {
self.message_queue.try_iter()
}
}
/// A packet tagged with the device it belongs to.
#[derive(Debug, Clone)]
pub struct HIDIOMessage {
/// Key of the device the packet came from or is addressed to.
pub device: String,
/// The packet itself.
pub message: HIDIOPacketBuffer,
}
/// Routes packets between registered devices and registered listeners.
pub struct HIDIOMailer {
/// Queues of the registered devices, keyed by device id.
devices: HashMap<String, HIDIOQueue>,
/// Shared list of endpoint descriptions for the registered devices.
connected: Arc<RwLock<Vec<Endpoint>>>,
/// Messages waiting to be delivered to a device.
incoming: std::sync::mpsc::Receiver<HIDIOMessage>,
/// Listener sinks; each receives a copy of every message from a device.
outgoing: Vec<std::sync::mpsc::Sender<HIDIOMessage>>,
}
impl HIDIOMailer {
/// Creates a mailer with no devices and no listeners that takes
/// device-bound messages from `incoming`.
pub fn new(incoming: std::sync::mpsc::Receiver<HIDIOMessage>) -> HIDIOMailer {
    HIDIOMailer {
        devices: HashMap::new(),
        connected: Arc::new(RwLock::new(Vec::new())),
        incoming,
        outgoing: Vec::new(),
    }
}
/// Adds `device` under `id` and appends its endpoint description to the
/// shared list of connected devices.
///
/// # Panics
/// Panics if the connected-devices lock is poisoned.
pub fn register_device(&mut self, id: String, device: HIDIOQueue) {
    info!("Registering device: {}", id);
    let endpoint = device.info.clone();
    self.connected.write().unwrap().push(endpoint);
    self.devices.insert(id, device);
}
/// Removes the device registered under `id`, both from the device map and
/// from the shared list of connected endpoints.
///
/// Endpoints are matched by comparing the string form of their `id` with
/// `id`. Unknown ids are a no-op apart from the log line.
///
/// # Panics
/// Panics if the connected-devices lock is poisoned.
pub fn unregister_device(&mut self, id: &str) {
    info!("Unregistering device: {}", id);
    // Keep every endpoint except the one being removed; `retain` does this in
    // place on stable Rust.
    self.connected
        .write()
        .unwrap()
        .retain(|dev| dev.id.to_string() != id);
    self.devices.remove(id);
}
/// Returns another handle to the shared list of connected endpoints.
pub fn devices(&self) -> Arc<RwLock<Vec<Endpoint>>> {
    Arc::clone(&self.connected)
}
/// Adds `sink` to the listeners that `process` sends every device message to.
pub fn register_listener(&mut self, sink: std::sync::mpsc::Sender<HIDIOMessage>) {
self.outgoing.push(sink);
}
/// Performs one routing pass in both directions.
///
/// * Every packet currently waiting on a device queue is wrapped in a
///   `HIDIOMessage` and a copy is sent to each registered listener.
/// * Every message currently waiting on `incoming` is handed to the queue of
///   the device it names. Messages naming a device that is not registered
///   are logged and dropped. A device whose queue has disconnected is
///   removed from both the device map and the shared connected list.
///
/// # Panics
/// Panics if a listener's receiving end has been dropped, or if the
/// connected-devices lock is poisoned.
pub fn process(&mut self) {
    for (device, queue) in self.devices.iter_mut() {
        for message in queue.messages() {
            let m = HIDIOMessage {
                device: device.to_string(),
                message,
            };
            for sink in self.outgoing.iter() {
                sink.send(m.clone()).unwrap();
            }
        }
    }

    for message in self.incoming.try_iter() {
        let HIDIOMessage {
            device,
            message: packet,
        } = message;
        // Look the device up instead of indexing: the sender may name a device
        // that was never registered or has already been removed.
        let delivered = match self.devices.get(&device) {
            Some(queue) => queue.send_packet(packet).is_ok(),
            None => {
                info!("Dropping message for unknown device: {}", device);
                continue;
            }
        };
        if !delivered {
            info!("Device queue disconnected. Unregistering.");
            self.devices.remove(&device);
            // Keep the shared endpoint list in step with the device map, using
            // the same id comparison as unregister_device.
            self.connected
                .write()
                .unwrap()
                .retain(|dev| dev.id.to_string() != device);
        }
    }
}
}
/// Client-side handle for exchanging `HIDIOMessage`s over a channel pair.
pub struct HIDIOMailbox {
/// Shared list of endpoints; presumably the list handed out by
/// `HIDIOMailer::devices` — confirm against callers.
pub nodes: Arc<RwLock<Vec<Endpoint>>>,
/// Messages delivered to this mailbox.
incoming: std::sync::mpsc::Receiver<HIDIOMessage>,
/// Messages sent from this mailbox.
outgoing: std::sync::mpsc::Sender<HIDIOMessage>,
}
impl HIDIOMailbox {
/// Assembles a mailbox from a shared endpoint list and the two channel ends
/// it reads from and writes to.
pub fn new(
    nodes: Arc<RwLock<Vec<Endpoint>>>,
    incoming: std::sync::mpsc::Receiver<HIDIOMessage>,
    outgoing: std::sync::mpsc::Sender<HIDIOMessage>,
) -> HIDIOMailbox {
    HIDIOMailbox {
        outgoing,
        incoming,
        nodes,
    }
}
/// Creates a mailbox that sends to `dest` and receives from a newly created
/// channel.
///
/// Returns the sending half of that new channel together with the mailbox;
/// whatever is sent on the returned sender shows up in the mailbox's
/// `recv`/`iter`.
pub fn from_sender(
    dest: mpsc::Sender<HIDIOMessage>,
    nodes: Arc<RwLock<Vec<Endpoint>>>,
) -> (mpsc::Sender<HIDIOMessage>, HIDIOMailbox) {
    let (feed, inbox) = channel::<HIDIOMessage>();
    (feed, HIDIOMailbox::new(nodes, inbox, dest))
}
/// Sends `packet`, addressed to `device`, on the outgoing channel.
///
/// A failed send is logged and otherwise ignored.
pub fn send_packet(&self, device: String, packet: HIDIOPacketBuffer) {
    let envelope = HIDIOMessage {
        device,
        message: packet,
    };
    if let Err(e) = self.outgoing.send(envelope) {
        error!("send_packet {}", e);
    }
}
/// Blocks until a message arrives and returns it.
///
/// # Panics
/// Panics if the channel's `recv` fails, which happens once every sender has
/// been dropped.
pub fn recv(&self) -> HIDIOMessage {
self.incoming.recv().unwrap()
}
/// Waits up to one millisecond for a message.
///
/// Returns `Some(message)` if one arrived in time and `None` on timeout.
/// If the channel is disconnected, logs a warning and terminates the
/// process with exit code 1.
pub fn recv_psuedoblocking(&self) -> Option<HIDIOMessage> {
    let outcome = self.incoming.recv_timeout(Duration::from_millis(1));
    if let Err(mpsc::RecvTimeoutError::Disconnected) = outcome {
        warn!("Lost socket");
        std::process::exit(1);
    }
    // Only Ok(message) or a timeout can remain here.
    outcome.ok()
}
/// Returns a blocking iterator over incoming messages; it ends once the
/// channel is disconnected.
pub fn iter(&self) -> mpsc::Iter<HIDIOMessage> {
self.incoming.iter()
}
/// Sends an empty `Sync` packet (id 0, `max_len` 64) addressed to `device`.
pub fn send_sync(&self, device: String) {
    let packet = HIDIOPacketBuffer {
        ptype: HIDIOPacketType::Sync,
        id: 0,
        max_len: 64,
        data: vec![],
        done: true,
    };
    self.send_packet(device, packet);
}
/// Sends an `ACK` packet carrying `data` (`max_len` 64) addressed to `device`.
///
/// NOTE(review): `_id` is ignored and the ACK always carries id 0 — confirm
/// this is intended.
pub fn send_ack(&self, device: String, _id: u32, data: Vec<u8>) {
    let packet = HIDIOPacketBuffer {
        ptype: HIDIOPacketType::ACK,
        id: 0,
        max_len: 64,
        data,
        done: true,
    };
    self.send_packet(device, packet);
}
/// Sends a `NAK` packet with the given `id` and `data` (`max_len` 64)
/// addressed to `device`.
pub fn send_nack(&self, device: String, id: u32, data: Vec<u8>) {
    let packet = HIDIOPacketBuffer {
        ptype: HIDIOPacketType::NAK,
        id,
        max_len: 64,
        data,
        done: true,
    };
    self.send_packet(device, packet);
}
/// Sends a `Data` packet for command `id` with payload `data` (`max_len` 64)
/// addressed to `device`.
///
/// The command id is converted to its `u32` value with an `as` cast.
pub fn send_command(&self, device: String, id: HIDIOCommandID, data: Vec<u8>) {
    let packet = HIDIOPacketBuffer {
        ptype: HIDIOPacketType::Data,
        id: id as u32,
        max_len: 64,
        data,
        done: true,
    };
    self.send_packet(device, packet);
}
}
/// Entry point for device setup: logs a notice, hands `mailer` to
/// `hidusb::initialize`, then calls `debug::initialize`.
pub fn initialize(mailer: HIDIOMailer) {
info!("Initializing devices...");
// The mailer is moved into the hidusb module here.
hidusb::initialize(mailer);
debug::initialize();
}