From 93426dc52a690ae36cb509a35b98ff06f2d6e0dc Mon Sep 17 00:00:00 2001 From: jake Date: Thu, 23 Jan 2020 18:00:34 -0800 Subject: [PATCH] async! --- Cargo.toml | 2 + src/common/client.rs | 128 ---------------- src/common/clientpool.rs | 184 ----------------------- src/common/mainloop.rs | 307 +++++++++++++++++++++++++++++--------- src/common/mod.rs | 2 - src/common/network.rs | 41 ----- src/common/serverstate.rs | 4 +- src/login/character.rs | 2 +- src/login/login.rs | 2 +- src/main.rs | 66 ++++---- src/patch/patch.rs | 4 +- src/ship/ship.rs | 12 +- 12 files changed, 284 insertions(+), 470 deletions(-) delete mode 100644 src/common/client.rs delete mode 100644 src/common/clientpool.rs diff --git a/Cargo.toml b/Cargo.toml index e22fdf8..409ba3a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,8 @@ path = "src/login_main.rs" [dependencies] libpso = { git = "http://git.sharnoth.com/jake/libpso" } +async-std = { version = "1.4.0", features = ["unstable"] } +futures = "0.3.1" rand = "0.6.5" mio = "0.6" mio-extras = "2.0.5" diff --git a/src/common/client.rs b/src/common/client.rs deleted file mode 100644 index b079e82..0000000 --- a/src/common/client.rs +++ /dev/null @@ -1,128 +0,0 @@ -use log::{info, trace, warn}; - -use libpso::crypto::{PSOCipher, NullCipher}; - -use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ClientId}; -use crate::common::network::PacketNetworkError; - -use std::io::{Read, Write}; - -pub struct Client { - pub id: ClientId, - running: bool, - pub socket: mio::tcp::TcpStream, - cipher_in: Box, - cipher_out: Box, - recv_buffer: Vec, - incoming_data: Vec, - send_buffer: Vec, - _s: std::marker::PhantomData, - _r: std::marker::PhantomData, -} - -impl Client where - S: SendServerPacket + std::fmt::Debug, - R: RecvServerPacket + std::fmt::Debug, -{ - pub fn new(id: ClientId, socket: mio::tcp::TcpStream) -> Client { - Client { - id: id, - running: true, - socket: socket, - cipher_in: Box::new(NullCipher {}), - cipher_out: Box::new(NullCipher {}), - recv_buffer: Vec::with_capacity(32), - incoming_data: Vec::new(), - send_buffer: Vec::new(), - _s: std::marker::PhantomData, - _r: std::marker::PhantomData, - } - } - - pub fn set_cipher(&mut self, cin: Box, out: Box) { - self.cipher_in = cin; - self.cipher_out = out; - } - - pub fn send_data(&mut self) { - if self.send_buffer.len() == 0 { - return; - } - match self.socket.write(&self.send_buffer) { - Ok(len) => { - if len == 0 { - self.running = false; - } - self.send_buffer.drain(..len); - }, - Err(err) => { - warn!("[client] error sending data to {:?}: {:?}", self.socket, err); - } - } - } - - fn read_data_into_buffer(&mut self) -> Result<(), PacketNetworkError> { - let mut new_data = [0u8; 0x8000]; - let len = self.socket.read(&mut new_data)?; - if len == 0 { - return Err(PacketNetworkError::ClientDisconnected); - } - - self.recv_buffer.extend_from_slice(&mut new_data[..len]); - - let block_chunk_len = self.recv_buffer.len() / self.cipher_in.block_size() * self.cipher_in.block_size(); - let buf = self.recv_buffer.drain(..block_chunk_len).collect(); - let mut dec_buf = self.cipher_in.decrypt(&buf)?; - self.incoming_data.append(&mut dec_buf); - Ok(()) - } - - pub fn read_pkts(&mut self) -> Result, PacketNetworkError> { - self.read_data_into_buffer()?; - let mut result = Vec::new(); - - loop { - if self.incoming_data.len() < 2 { - break; - } - let pkt_size = u16::from_le_bytes([self.incoming_data[0], self.incoming_data[1]]) as usize; - let mut pkt_len = pkt_size; - while pkt_len % self.cipher_in.block_size() != 0 { - pkt_len += 1; - } - - if pkt_len > self.incoming_data.len() { - break; - } - - let pkt_data = self.incoming_data.drain(..pkt_len).collect::>(); - - trace!("[recv buf from {:?}] {:?}", self.id, pkt_data); - let pkt = match R::from_bytes(&pkt_data[..pkt_size]) { - Ok(p) => p, - Err(err) => { - warn!("error RecvServerPacket::from_bytes: {:?}", err); - continue - }, - }; - - trace!("[recv from {:?}] {:?}", self.id, pkt); - result.push(pkt); - } - Ok(result) - } - - pub fn send_pkt(&mut self, pkt: S) { - trace!("[send to {:?}] {:?}", self.id, pkt); - let buf = pkt.as_bytes(); - if buf.len() < 1024*2 { - trace!("[send: buf] {:?}", buf); - } - else { - trace!("[send: buf] [...large buffer...]"); - } - let mut cbuf = self.cipher_out.encrypt(&buf).unwrap(); - self.send_buffer.append(&mut cbuf); - self.send_data(); - } -} diff --git a/src/common/clientpool.rs b/src/common/clientpool.rs deleted file mode 100644 index 3725565..0000000 --- a/src/common/clientpool.rs +++ /dev/null @@ -1,184 +0,0 @@ -//use std::thread; -use std::collections::HashMap; - -use std::net::{SocketAddr, Ipv4Addr}; - -use std::sync::mpsc::TryRecvError; -use mio::tcp::TcpListener; -use mio::{Events, Poll, Token, Ready, PollOpt}; -use log::{info, warn}; - -use crate::common::client::Client; -use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ClientId}; -use libpso::crypto::PSOCipher; - -use mio_extras::channel::{Sender, Receiver}; - -use crate::common::network::PacketNetworkError; - -//use threadpool::ThreadPool; - -//const THREAD_COUNT: usize = 4; - -fn client_read(sender: &Sender>, client: &mut Client) -> Result<(), PacketNetworkError> where - S: SendServerPacket + std::fmt::Debug, - R: RecvServerPacket + std::fmt::Debug, -{ - let pkts = client.read_pkts(); - - for pkt in pkts? { - sender.send(ClientPoolAction::Packet(client.id, pkt)).unwrap(); - } - Ok(()) -} - -fn client_write(client: &mut Client) where - S: SendServerPacket + std::fmt::Debug, - R: RecvServerPacket + std::fmt::Debug, -{ - client.send_data(); -} - -pub enum ClientAction { - EncryptionKeys(ClientId, Box, Box), - Packet(ClientId, S) -} - -#[derive(Debug)] -pub enum ClientPoolAction { - NewClient(ClientId), - Packet(ClientId, R), - Disconnect(ClientId), -} - - -pub struct ClientPool{ - poll: Poll, - receiver: Receiver>, - sender: Sender>, - client_ids: HashMap, - clients: HashMap>, - listener: TcpListener, - client_id_incr: usize, -} - - - - -impl ClientPool where - S: SendServerPacket + std::fmt::Debug, - R: RecvServerPacket + std::fmt::Debug, -{ - pub fn new( - receiver: Receiver>, - sender: Sender>, - port: u16 - ) -> ClientPool { - ClientPool { - poll: Poll::new().unwrap(), - receiver: receiver, - sender: sender, - client_ids: HashMap::new(), - clients: HashMap::new(), - listener: TcpListener::bind(&SocketAddr::from((Ipv4Addr::new(0,0,0,0), port))).unwrap(), - client_id_incr: 3, - } - } - - fn new_client(&mut self) { - let (socket, _addr) = self.listener.accept().unwrap(); - - let client_id = ClientId(self.client_id_incr); - self.client_id_incr += 1; - - self.poll.register(&socket, Token(client_id.0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap(); - let client = Client::new(client_id, socket); - - self.client_ids.insert(Token(client_id.0), client_id); - self.clients.insert(client_id, client); - self.sender.send(ClientPoolAction::NewClient(client_id)).unwrap(); - } - - fn packet_to_send(&mut self) { - loop { - match self.receiver.try_recv() { - Ok(action) => { - match action { - ClientAction::EncryptionKeys(client_id, cipher_in, cipher_out) => { - self.clients.get_mut(&client_id) - .map(|client| { - client.set_cipher(cipher_in, cipher_out); - }); - } - ClientAction::Packet(client_id, pkt) => { - self.clients.get_mut(&client_id) - .map(|client| { - client.send_pkt(pkt); - }); - } - } - }, - Err(err) => { - match err { - TryRecvError::Empty => break, - TryRecvError::Disconnected => { - break; - // TODO! - } - } - } - } - } - } - - - pub fn io_loop(mut self) { - self.poll.register(&self.listener, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); - self.poll.register(&self.receiver, Token(1), Ready::readable(), PollOpt::edge()).unwrap(); - - let mut events = Events::with_capacity(1024); - - loop { - self.poll.poll(&mut events, None).unwrap(); - - for event in &events { - match event.token() { - Token(0) => self.new_client(), - Token(1) => self.packet_to_send(), - _ => { - let client_id = match self.client_ids.get(&event.token()) { - Some(client_id) => client_id, - None => continue, - }; - - let client = match self.clients.get_mut(&client_id) { - Some(client) => client, - None => continue, - }; - - if event.readiness().is_writable() { - client_write(client); - } - if event.readiness().is_readable() { - match client_read(&self.sender, client) { - Ok(()) =>{}, - Err(err) => { - match err { - PacketNetworkError::ClientDisconnected => { - info!("client {:?} disconnected", client_id); - self.poll.deregister(&client.socket).unwrap(); - self.sender.send(ClientPoolAction::Disconnect(*client_id)).unwrap(); - }, - _ => { - warn!("pkt err: {:?}", err); - }, - } - } - } - } - } - } - } - } - } -} diff --git a/src/common/mainloop.rs b/src/common/mainloop.rs index 62b7a59..00a1a1f 100644 --- a/src/common/mainloop.rs +++ b/src/common/mainloop.rs @@ -1,99 +1,260 @@ -use std::thread; -use log::warn; -use mio::{Events, Poll, Token, Ready, PollOpt}; -use mio_extras::channel::{channel, Sender, Receiver}; +use log::{trace, info, warn}; +use async_std::sync::{Arc, Mutex}; +use async_std::io::{Read, Write}; +use async_std::io::prelude::{ReadExt, WriteExt}; +use async_std::prelude::{StreamExt}; +use std::collections::HashMap; -use crate::common::clientpool::{ClientPool, ClientAction, ClientPoolAction}; +use libpso::crypto::{PSOCipher, NullCipher}; +use crate::common::serverstate::ClientId; use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect}; -fn recv_from_clientpool(state: &mut STATE, - pool_recv: &Receiver>, - pool_send: &Sender>) where - STATE: ServerState, - S: SendServerPacket, - R: RecvServerPacket, - E: std::fmt::Debug, + +enum PacketReceiverError { + ClientDisconnect, +} + +struct PacketReceiver { + socket: Arc, + cipher: Arc>>, + recv_buffer: Vec, + incoming_data: Vec, +} + +impl PacketReceiver { + fn new(socket: Arc, cipher: Arc>>) -> PacketReceiver { + PacketReceiver { + socket: socket, + cipher: cipher, + recv_buffer: Vec::new(), + incoming_data: Vec::new(), + } + } + + async fn fill_recv_buffer(&mut self) -> Result<(), PacketReceiverError>{ + let mut data = [0u8; 0x8000]; + + let mut socket = &*self.socket; + let len = socket.read(&mut data).await.unwrap(); + if len == 0 { + return Err(PacketReceiverError::ClientDisconnect); + } + + self.recv_buffer.extend_from_slice(&mut data[..len]); + + let mut dec_buf = { + let mut cipher = self.cipher.lock().await; + let block_chunk_len = self.recv_buffer.len() / cipher.block_size() * cipher.block_size(); + let buf = self.recv_buffer.drain(..block_chunk_len).collect(); + cipher.decrypt(&buf).unwrap() + }; + self.incoming_data.append(&mut dec_buf); + + Ok(()) + } + + async fn recv_pkts(&mut self) -> Result, PacketReceiverError> { + self.fill_recv_buffer().await?; + + let mut result = Vec::new(); + loop { + if self.incoming_data.len() < 2 { + break; + } + let pkt_size = u16::from_le_bytes([self.incoming_data[0], self.incoming_data[1]]) as usize; + let mut pkt_len = pkt_size; + while pkt_len % self.cipher.lock().await.block_size() != 0 { + pkt_len += 1; + } + + if pkt_len > self.incoming_data.len() { + break; + } + + let pkt_data = self.incoming_data.drain(..pkt_len).collect::>(); + + trace!("[recv buf] {:?}", pkt_data); + let pkt = match R::from_bytes(&pkt_data[..pkt_size]) { + Ok(p) => p, + Err(err) => { + warn!("error RecvServerPacket::from_bytes: {:?}", err); + continue + }, + }; + + result.push(pkt); + } + + Ok(result) + } +} + +async fn send_pkt(socket: Arc, cipher: Arc>>, pkt: S) { + let buf = pkt.as_bytes(); + let cbuf = cipher.lock().await.encrypt(&buf).unwrap(); + let mut ssock = &*socket; + ssock.write_all(&cbuf).await; + +} + + +enum ClientAction { + NewClient(ClientId, async_std::sync::Sender), + Packet(ClientId, R), + Disconnect(ClientId), +} + +enum ServerStateAction { + Cipher(Box, Box), + Packet(S), + Disconnect, +} + +async fn server_state_loop(mut state: STATE, + server_state_receiver: async_std::sync::Receiver, R>>) where + STATE: ServerState + Send + 'static, + S: SendServerPacket + std::fmt::Debug + Send + 'static, + R: RecvServerPacket + std::fmt::Debug + Send + 'static, + E: std::fmt::Debug + Send, { - loop { - match pool_recv.try_recv() { - Ok(incoming) => { - match incoming { - ClientPoolAction::NewClient(client_id) => { - for action in state.on_connect(client_id).into_iter() { - match action { - OnConnect::Cipher((in_cipher, out_cipher)) => { - pool_send.send(ClientAction::EncryptionKeys(client_id, in_cipher, out_cipher)).unwrap(); - } - OnConnect::Packet(pkt) => { - pool_send.send(ClientAction::Packet(client_id, pkt)).unwrap(); - } - } - } - }, - ClientPoolAction::Packet(client_id, pkt) => { - let to_send = state.handle(client_id, &pkt); - match to_send { - Ok(pkts) => { - for pkt in pkts { - pool_send.send(ClientAction::Packet(pkt.0, pkt.1)).unwrap(); - } + async_std::task::spawn(async move { + let mut clients = HashMap::new(); + + { + let action = server_state_receiver.recv().await.unwrap(); + + match action { + ClientAction::NewClient(client_id, sender) => { + clients.insert(client_id, sender.clone()); + for action in state.on_connect(client_id) { + match action { + OnConnect::Cipher((inc, outc)) => { + sender.send(ServerStateAction::Cipher(inc, outc)).await; }, - Err(err) => { - // TODO: break? - warn!("[handler error]: {:?} {:?}", client_id, err); + OnConnect::Packet(pkt) => { + sender.send(ServerStateAction::Packet(pkt)).await; } } - }, - ClientPoolAction::Disconnect(client_id) => { - for pkt in state.on_disconnect(client_id) { - pool_send.send(ClientAction::Packet(pkt.0, pkt.1)).unwrap(); + } + }, + ClientAction::Packet(client_id, pkt) => { + let k = state.handle(client_id, &pkt); + let pkts = k.unwrap().collect::>(); + for (client_id, pkt) in pkts { + let client = clients.get_mut(&client_id).unwrap(); + client.send(ServerStateAction::Packet(pkt)).await; + } + }, + ClientAction::Disconnect(client_id) => { + let pkts = state.on_disconnect(client_id); + for (client_id, pkt) in pkts { + let client = clients.get_mut(&client_id).unwrap(); + client.send(ServerStateAction::Packet(pkt)).await; + } + + let client = clients.remove(&client_id).unwrap(); + client.send(ServerStateAction::Disconnect).await; + } + } + } + }); +} + +async fn client_recv_loop(client_id: ClientId, + socket: Arc, + cipher: Arc>>, + server_sender: async_std::sync::Sender, R>>, + client_sender: async_std::sync::Sender>) where + S: SendServerPacket + std::fmt::Debug + Send + 'static, + R: RecvServerPacket + std::fmt::Debug + Send + 'static, +{ + async_std::task::spawn(async move { + server_sender.send(ClientAction::NewClient(client_id, client_sender)).await; + let mut pkt_receiver = PacketReceiver::new(socket, cipher); + + loop { + match pkt_receiver.recv_pkts().await { + Ok(pkts) => { + for pkt in pkts { + trace!("[recv from {:?}] {:?}", client_id, pkt); + server_sender.send(ClientAction::Packet(client_id, pkt)).await; + } + }, + Err(err) => { + match err { + PacketReceiverError::ClientDisconnect => { + trace!("[client disconnected] {:?}", client_id); + server_sender.send(ClientAction::Disconnect(client_id)); + break; } } } - }, - Err(_err) => { - break; } } - } + }); } +async fn client_send_loop(client_id: ClientId, + socket: Arc, + cipher_in: Arc>>, + cipher_out: Arc>>, + client_receiver: async_std::sync::Receiver>, -pub fn mainloop(mut state: STATE, port: u16) where - STATE: ServerState + Send + 'static, +) where S: SendServerPacket + std::fmt::Debug + Send + 'static, - R: RecvServerPacket + std::fmt::Debug + Send + 'static, - E: std::fmt::Debug, { - let (pool_send, pool_recv) = channel(); - //let (patch_handler_send, patch_handler_recv) = channel::>(); - let (handler_send, handler_recv) = channel(); - - //let sender_clone = patch_handler_send.clone(); - let client_thread = thread::spawn(move || { - let clientpool = ClientPool::new(pool_recv, handler_send, port); - clientpool.io_loop(); + async_std::task::spawn(async move { + loop { + let action = client_receiver.recv().await.unwrap(); + match action { + ServerStateAction::Cipher(inc, outc) => { + *cipher_in.lock().await = inc; + *cipher_out.lock().await = outc; + } + ServerStateAction::Packet(pkt) => { + trace!("[send to {:?}] {:?}", client_id, pkt); + send_pkt(socket.clone(), cipher_out.clone(), pkt).await + }, + ServerStateAction::Disconnect => { + break; + } + }; + } }); +} - //let handler_threadpool = threadpool::ThreadPool::new(4); - let handler_thread = thread::spawn(move || { - let poll = Poll::new().unwrap(); - poll.register(&handler_recv, Token(0), Ready::readable(), PollOpt::edge()).unwrap(); - let mut events = Events::with_capacity(1024); +pub async fn mainloop_async(mut state: STATE, port: u16) where + STATE: ServerState + Send + 'static, + S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static, + R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static, + E: std::fmt::Debug + Send, +{ + + let listener = async_std::task::spawn(async move { + let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); + let mut id = 1; + + let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024); + + server_state_loop(state, server_state_receiver).await; loop { - poll.poll(&mut events, None).unwrap(); - - for event in &events { - match event.token() { - Token(0) => recv_from_clientpool(&mut state, &handler_recv, &pool_send), - _ => panic!() - } - } + let (sock, addr) = listener.accept().await.unwrap(); + let client_id = crate::common::serverstate::ClientId(id); + id += 1; + + info!("new client {:?} {:?} {:?}", client_id, sock, addr); + + let (client_sender, client_receiver) = async_std::sync::channel(64); + let socket = Arc::new(sock); + let cipher_in: Arc>> = Arc::new(Mutex::new(Box::new(NullCipher {}))); + let cipher_out: Arc>> = Arc::new(Mutex::new(Box::new(NullCipher {}))); + + client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await; + client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await; } }); - client_thread.join().unwrap(); - handler_thread.join().unwrap(); + listener.await } diff --git a/src/common/mod.rs b/src/common/mod.rs index dbee73e..55a50a1 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -1,8 +1,6 @@ pub mod cipherkeys; pub mod network; pub mod serverstate; -pub mod client; -pub mod clientpool; pub mod mainloop; pub mod leveltable; diff --git a/src/common/network.rs b/src/common/network.rs index bd91b1d..713cf05 100644 --- a/src/common/network.rs +++ b/src/common/network.rs @@ -31,44 +31,3 @@ impl From for PacketNetworkError { PacketNetworkError::PacketParseError(err) } } - -pub fn recv_packet(socket: &mut T, cipher: &mut dyn PSOCipher) -> Result, PacketNetworkError> { - let mut size_buf = vec![0u8; cipher.header_size()]; - let mut offset = 0; - while offset < cipher.header_size() { - let diff = socket.read(&mut size_buf[offset..])?; - if diff == 0 { - return Err(PacketNetworkError::ClientDisconnected); - } - offset += diff; - } - - let mut dec_size_buf = cipher.decrypt(&size_buf)?; - let pkt_size = u16::from_le_bytes([dec_size_buf[0], dec_size_buf[1]]) as usize; - - let mut buf_size = pkt_size; - while buf_size % cipher.block_size() != 0 { - buf_size += 1; - } - buf_size -= cipher.header_size(); - - let mut data_buf = vec![0u8; buf_size]; - let mut offset = 0; - while offset < buf_size { - let diff = socket.read(&mut data_buf[offset..])?; - if diff == 0 { - return Err(PacketNetworkError::ClientDisconnected); - } - offset += diff; - } - - let mut dec_data_buf = cipher.decrypt(&data_buf.to_vec())?; - - let mut full_buf = Vec::new(); - full_buf.append(&mut dec_size_buf); - full_buf.append(&mut dec_data_buf); - full_buf = full_buf[..pkt_size].to_vec(); - - trace!("[recv: buf]: {:X?}", full_buf); - Ok(full_buf) -} diff --git a/src/common/serverstate.rs b/src/common/serverstate.rs index 3ed7fa7..2bf0dad 100644 --- a/src/common/serverstate.rs +++ b/src/common/serverstate.rs @@ -6,7 +6,7 @@ pub struct ClientId(pub usize); pub enum OnConnect { Packet(S), - Cipher((Box, Box)), + Cipher((Box, Box)), } pub trait RecvServerPacket: Sized { @@ -24,7 +24,7 @@ pub trait ServerState { fn on_connect(&mut self, id: ClientId) -> Vec>; fn handle(&mut self, id: ClientId, pkt: &Self::RecvPacket) - -> Result>, Self::PacketError>; + -> Result + Send>, Self::PacketError>; fn on_disconnect(&mut self, id: ClientId) -> Vec<(ClientId, Self::SendPacket)>; } diff --git a/src/login/character.rs b/src/login/character.rs index ace3e1e..d1de8c3 100644 --- a/src/login/character.rs +++ b/src/login/character.rs @@ -477,7 +477,7 @@ impl ServerState for CharacterServerState { } fn handle(&mut self, id: ClientId, pkt: &RecvCharacterPacket) - -> Result>, CharacterError> { + -> Result + Send>, CharacterError> { Ok(match pkt { RecvCharacterPacket::Login(login) => { if login.session.action == SessionAction::SelectCharacter { diff --git a/src/login/login.rs b/src/login/login.rs index 25dcb02..07af784 100644 --- a/src/login/login.rs +++ b/src/login/login.rs @@ -120,7 +120,7 @@ impl ServerState for LoginServerState { } fn handle(&mut self, id: ClientId, pkt: &Self::RecvPacket) - -> Result>, LoginError> { + -> Result + Send>, LoginError> { Ok(match pkt { RecvLoginPacket::Login(login) => { Box::new(self.validate_login(login) diff --git a/src/main.rs b/src/main.rs index 00535f2..fa534e3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -145,35 +145,41 @@ fn main() { character.character.name = utf8_to_utf16_array!("Test Char 8", 0x10); entity_gateway.set_character(&character); - let patch_thread = thread::spawn(|| { - info!("[patch] starting server"); - let patch_config = load_config(); - let patch_motd = load_motd(); - let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str()); - let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd); - common::mainloop::mainloop(patch_state, patch_config.port); - }); - let thread_entity_gateway = entity_gateway.clone(); - let auth_thread = thread::spawn(|| { - info!("[auth] starting server"); - let auth_state = LoginServerState::new(thread_entity_gateway); - common::mainloop::mainloop(auth_state, login::login::LOGIN_PORT); - }); - let thread_entity_gateway = entity_gateway.clone(); - let char_thread = thread::spawn(|| { - info!("[character] starting server"); - let char_state = CharacterServerState::new(thread_entity_gateway); - common::mainloop::mainloop(char_state, login::character::CHARACTER_PORT); - }); - let thread_entity_gateway = entity_gateway.clone(); - let ship_thread = thread::spawn(|| { - info!("[ship] starting server"); - let ship_state = ShipServerState::new(thread_entity_gateway); - common::mainloop::mainloop(ship_state, ship::ship::SHIP_PORT); - }); + async_std::task::block_on(async move { + let thread_entity_gateway = entity_gateway.clone(); + let patch = async_std::task::spawn(async { + info!("[patch] starting server"); + let patch_config = load_config(); + let patch_motd = load_motd(); + let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str()); + let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd); + + crate::common::mainloop::mainloop_async(patch_state, patch_config.port).await; + }); + + let thread_entity_gateway = entity_gateway.clone(); + let auth = async_std::task::spawn(async { + info!("[auth] starting server"); + let auth_state = LoginServerState::new(thread_entity_gateway); - patch_thread.join().unwrap(); - auth_thread.join().unwrap(); - char_thread.join().unwrap(); - ship_thread.join().unwrap(); + common::mainloop::mainloop_async(auth_state, login::login::LOGIN_PORT).await; + }); + + let thread_entity_gateway = entity_gateway.clone(); + let character = async_std::task::spawn(async { + info!("[character] starting server"); + let char_state = CharacterServerState::new(thread_entity_gateway); + + common::mainloop::mainloop_async(char_state, login::character::CHARACTER_PORT).await; + }); + + let thread_entity_gateway = entity_gateway.clone(); + let ship = async_std::task::spawn(async { + info!("[ship] starting server"); + let ship_state = ShipServerState::new(thread_entity_gateway); + common::mainloop::mainloop_async(ship_state, ship::ship::SHIP_PORT).await; + }); + + futures::join!(patch, auth, character, ship); + }); } diff --git a/src/patch/patch.rs b/src/patch/patch.rs index a9ff1b3..eafa771 100644 --- a/src/patch/patch.rs +++ b/src/patch/patch.rs @@ -170,7 +170,7 @@ impl ServerState for PatchServerState { } fn handle(&mut self, id: ClientId, pkt: &RecvPatchPacket) - -> Result>, PatchError> { + -> Result + Send>, PatchError> { Ok(match pkt { RecvPatchPacket::PatchWelcomeReply(_pkt) => { Box::new(vec![SendPatchPacket::RequestLogin(RequestLogin {})].into_iter().map(move |pkt| (id, pkt))) @@ -287,7 +287,7 @@ fn does_file_need_updating(file_info: &FileInfoReply, patch_file_lookup: &HashMa struct SendFileIterator { done: bool, - file_iter: Box>, + file_iter: Box + Send>, patch_file_lookup: HashMap, current_file: Option>, chunk_num: u32, diff --git a/src/ship/ship.rs b/src/ship/ship.rs index a6196b0..ba22e50 100644 --- a/src/ship/ship.rs +++ b/src/ship/ship.rs @@ -314,7 +314,7 @@ impl ShipServerState { Ok(v) } - fn message(&mut self, id: ClientId, msg: &Message) -> Box> { + fn message(&mut self, id: ClientId, msg: &Message) -> Box + Send> { let cmsg = msg.clone(); Box::new(self.client_location.get_area_by_user(id).clients().iter() .filter(|client| client.client_id != id) @@ -323,7 +323,7 @@ impl ShipServerState { }).collect::>().into_iter()) } - fn direct_message(&mut self, id: ClientId, msg: &DirectMessage) -> Box> { + fn direct_message(&mut self, id: ClientId, msg: &DirectMessage) -> Box + Send> { let cmsg = msg.clone(); Box::new(self.client_location.get_area_by_user(id).clients().iter() .filter(|client| client.index == cmsg.flag as usize) @@ -332,7 +332,7 @@ impl ShipServerState { }).collect::>().into_iter()) } - fn player_chat(&mut self, id: ClientId, msg: &PlayerChat) -> Result>, ShipError> { + fn player_chat(&mut self, id: ClientId, msg: &PlayerChat) -> Result + Send>, ShipError> { let client = self.clients.get_mut(&id).ok_or(ShipError::ClientNotFound(id))?; let cmsg = PlayerChat::new(client.user.guildcard.unwrap(), msg.message.clone()); @@ -342,7 +342,7 @@ impl ShipServerState { }).collect::>().into_iter())) } - fn create_room(&mut self, id: ClientId, create_room: &CreateRoom) -> Box> { + fn create_room(&mut self, id: ClientId, create_room: &CreateRoom) -> Box + Send> { let area = self.client_location.get_area_by_user(id); let area_client = area.clients().into_iter().filter(|client| { client.client_id == id @@ -394,7 +394,7 @@ impl ShipServerState { }))) } - fn room_name_request(&mut self, id: ClientId) -> Box> { + fn room_name_request(&mut self, id: ClientId) -> Box + Send> { let area = self.client_location.get_area_by_user(id); let room_state = self.rooms[area.id()].as_ref().unwrap(); Box::new(vec![(id, SendShipPacket::RoomNameResponse(RoomNameResponse {name: room_state.name.clone()}))].into_iter()) @@ -422,7 +422,7 @@ impl ServerState for ShipServerState { } fn handle(&mut self, id: ClientId, pkt: &RecvShipPacket) - -> Result>, ShipError> { + -> Result + Send>, ShipError> { Ok(match pkt { RecvShipPacket::Login(login) => { Box::new(self.validate_login(id, login)?.into_iter().map(move |pkt| (id, pkt)))