use std::time::Duration; use std::pin::Pin; use futures::future::Future; use log::{info, warn}; use async_std::sync::{Arc, RwLock}; use async_std::io::prelude::{ReadExt, WriteExt}; use std::collections::HashMap; use serde::Serialize; use serde::de::DeserializeOwned; use crate::interserver::{ServerId, InterserverActor}; use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use crate::serverstate::{ServerState, SendServerPacket, RecvServerPacket}; use entity::gateway::entitygateway::EntityGateway; use async_std::channel; use std::fmt::Debug; #[derive(Debug)] enum MessageReceiverError { //InvalidSize, InvalidPayload, //NetworkError(std::io::Error), Disconnected, } struct MessageReceiver { socket: async_std::net::TcpStream, } impl MessageReceiver { fn new(socket: async_std::net::TcpStream) -> MessageReceiver { MessageReceiver { socket, } } async fn recv(&mut self) -> Result { let mut size_buf = [0u8; 4]; self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?; let size = u32::from_le_bytes(size_buf) as usize; let mut payload = vec![0u8; size]; self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?; let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?; let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?; Ok(msg) } } async fn interserver_recv_loop(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc>>>) where STATE: InterserverActor + Send, S: serde::Serialize + Debug + Send, R: serde::de::DeserializeOwned + Debug + Send, E: Debug + Send, { let mut msg_receiver = MessageReceiver::new(socket); loop { match msg_receiver.recv::().await { Ok(msg) => { info!("[interserver recv {:?}] {:?}", server_id, msg); match state.on_action(server_id, msg).await { Ok(response) => { for resp in response { ships .read() .await .get(&resp.0) .unwrap() .send(resp.1) .await .unwrap(); } }, Err(err) => { warn!("[interserver recv {:?}] error {:?}", server_id, err); } } }, Err(err) => { if let MessageReceiverError::Disconnected = err { info!("[interserver recv {:?}] disconnected", server_id); for (_, _sender) in ships.read().await.iter() { for pkt in state.on_disconnect(server_id).await { ships .read() .await .get(&pkt.0) .unwrap() .send(pkt.1) .await .unwrap(); } } ships .write() .await .remove(&server_id); break; } info!("[interserver recv {:?}] error {:?}", server_id, err); } } } } async fn interserver_send_loop(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver) where S: serde::Serialize + std::fmt::Debug, { loop { let msg = to_send.recv().await.unwrap(); let payload = serde_json::to_string(&msg); if let Ok(payload) = payload { let len_bytes = u32::to_le_bytes(payload.len() as u32); if let Err(err) = socket.write_all(&len_bytes).await { warn!("[interserver send {:?}] failed: {:?}", server_id, err); break; } if let Err(err) = socket.write_all(payload.as_bytes()).await { warn!("[interserver send {:?}] failed: {:?}", server_id, err); break; } } } } pub async fn run_interserver_listen(mut state: STATE, port: u16) where STATE: InterserverActor + Send + 'static, S: serde::Serialize + Debug + Send + 'static, R: serde::de::DeserializeOwned + Debug + Send, E: Debug + Send, { let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); let mut id = 0; let ships = Arc::new(RwLock::new(HashMap::new())); loop { let (socket, addr) = listener.accept().await.unwrap(); info!("[interserver listen] new server: {:?} {:?}", socket, addr); id += 1; let server_id = crate::interserver::ServerId(id); let (client_tx, client_rx) = async_std::channel::unbounded(); state.set_sender(server_id, client_tx.clone()).await; ships .write() .await .insert(server_id, client_tx.clone()); for msg in state.on_connect(server_id).await { if let Some(ship_sender) = ships.read().await.get(&msg.0) { ship_sender.send(msg.1).await.unwrap(); } } let rstate = state.clone(); let rsocket = socket.clone(); let rships = ships.clone(); async_std::task::spawn(async move { interserver_recv_loop(rstate, server_id, rsocket, rships).await; }); async_std::task::spawn(async move { interserver_send_loop(server_id, socket, client_rx).await; }); } } pub async fn run_interserver_connect(mut state: STATE, ip: std::net::Ipv4Addr, port: u16) where STATE: InterserverActor + Send + 'static, S: serde::Serialize + Debug + Send + 'static, R: serde::de::DeserializeOwned + Debug + Send, E: Debug + Send, { let mut id = 0; loop { info!("[interserver connect] trying to connect to server"); let socket = match async_std::net::TcpStream::connect((ip, port)).await { Ok(socket) => socket, Err(err) => { info!("err trying to connect to loginserv {:?}", err); async_std::task::sleep(std::time::Duration::from_secs(10)).await; continue; } }; id += 1; let server_id = crate::interserver::ServerId(id); info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket); let (client_tx, client_rx) = async_std::channel::unbounded(); state.set_sender(server_id, client_tx.clone()).await; for msg in state.on_connect(server_id).await { client_tx.send(msg.1).await.unwrap(); } let other_server = vec![(server_id, client_tx.clone())].into_iter().collect(); let rstate = state.clone(); let rsocket = socket.clone(); async_std::task::spawn(async move { interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await; }); let ssocket = socket.clone(); async_std::task::spawn(async move { interserver_send_loop(server_id, ssocket, client_rx).await; }); let mut buf = [0u8; 1]; loop { let peek = socket.peek(&mut buf).await; if let Ok(0) = peek { break } } } }