diff --git a/src/bin/main.rs b/src/bin/main.rs index eb7c687..d24a0ac 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -1,5 +1,9 @@ use std::net::Ipv4Addr; use log::{info}; +use async_std::channel; + +use futures::stream::StreamExt; +use futures::future::join_all; use elseware::common::interserver::AuthToken; use elseware::login::login::LoginServerState; @@ -338,20 +342,21 @@ fn main() { let (patch_file_tree, patch_file_lookup) = generate_patch_tree(patch_config.path.as_str()); let patch_state = PatchServerState::new(patch_file_tree, patch_file_lookup, patch_motd); let patch_loop = async_std::task::spawn(async move { - elseware::common::mainloop::run_server(patch_state, patch_config.port).await; + elseware::common::mainloop::run_server(patch_state, patch_config.port, None).await; }); info!("[auth] starting server"); let login_state = LoginServerState::new(entity_gateway.clone(), "127.0.0.1".parse().unwrap()); let login_loop = async_std::task::spawn(async move { - elseware::common::mainloop::run_server(login_state, elseware::login::login::LOGIN_PORT).await; + elseware::common::mainloop::run_server(login_state, elseware::login::login::LOGIN_PORT, None).await; }); info!("[character] starting server"); let char_state = CharacterServerState::new(entity_gateway.clone(), AuthToken("".into())); let sub_char_state = char_state.clone(); + let character_loop = async_std::task::spawn(async move { - elseware::common::mainloop::run_server(sub_char_state, elseware::login::character::CHARACTER_PORT).await; + elseware::common::mainloop::run_server(sub_char_state, elseware::login::character::CHARACTER_PORT, None).await; }); let sub_char_state = char_state.clone(); @@ -368,12 +373,13 @@ fn main() { .gateway(entity_gateway.clone()) .build(); let sub_ship_state = ship_state.clone(); + let (interserver_tx, interserver_rx) = channel::unbounded(); let ship_loop1 = async_std::task::spawn(async move { - elseware::common::mainloop::run_server(sub_ship_state, elseware::ship::ship::SHIP_PORT).await; + elseware::common::mainloop::run_server(sub_ship_state, elseware::ship::ship::SHIP_PORT, Some(interserver_rx)).await; }); let sub_ship_state = ship_state.clone(); let inter_ship_loop1 = async_std::task::spawn(async move { - elseware::common::mainloop::run_interserver_connect(sub_ship_state, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT).await; + elseware::common::mainloop::run_interserver_connect(sub_ship_state, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT, interserver_tx).await; }); let ship_state = ShipServerStateBuilder::default() @@ -384,12 +390,13 @@ fn main() { .gateway(entity_gateway.clone()) .build(); let sub_ship_state = ship_state.clone(); + let (interserver_tx, interserver_rx) = channel::unbounded(); let ship_loop2 = async_std::task::spawn(async move { - elseware::common::mainloop::run_server(sub_ship_state, elseware::ship::ship::SHIP_PORT+2000).await; + elseware::common::mainloop::run_server(sub_ship_state, elseware::ship::ship::SHIP_PORT+2000, Some(interserver_rx)).await; }); let sub_ship_state = ship_state.clone(); let inter_ship_loop2 = async_std::task::spawn(async move { - elseware::common::mainloop::run_interserver_connect(sub_ship_state, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT).await; + elseware::common::mainloop::run_interserver_connect(sub_ship_state, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT, interserver_tx).await; }); let ship_state = ShipServerStateBuilder::default() @@ -399,12 +406,13 @@ fn main() { .gateway(entity_gateway.clone()) .build(); let sub_ship_state = ship_state.clone(); + let (interserver_tx, interserver_rx) = channel::unbounded(); let ship_loop3 = async_std::task::spawn(async move { - elseware::common::mainloop::run_server(sub_ship_state, elseware::ship::ship::SHIP_PORT+3000).await; + elseware::common::mainloop::run_server(sub_ship_state, elseware::ship::ship::SHIP_PORT+3000, Some(interserver_rx)).await; }); let sub_ship_state = ship_state.clone(); let inter_ship_loop3 = async_std::task::spawn(async move { - elseware::common::mainloop::run_interserver_connect(sub_ship_state, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT).await; + elseware::common::mainloop::run_interserver_connect(sub_ship_state, std::net::Ipv4Addr::new(127, 0, 0, 1), elseware::login::login::COMMUNICATION_PORT, interserver_tx).await; }); futures::future::join_all(vec![patch_loop, login_loop, character_loop, inter_character_loop, diff --git a/src/common/interserver.rs b/src/common/interserver.rs index 1f6e0ff..cef0fd4 100644 --- a/src/common/interserver.rs +++ b/src/common/interserver.rs @@ -2,7 +2,9 @@ use std::net::Ipv4Addr; use async_std::channel; use serde::{Serialize, Deserialize}; use serde::de::DeserializeOwned; +use crate::common::serverstate::{ClientId, SendServerPacket}; use crate::entity::account::UserAccountId; +use crate::entity::team::TeamEntityId; use crate::entity::character::CharacterEntityId; #[derive(Debug, Copy, Clone, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -45,15 +47,22 @@ pub enum ShipMessage { RemoveUser(UserAccountId), } +pub enum InterserverMessage { + Server(ServerId, S), + Client(ClientId, C), +} + #[async_trait::async_trait] pub trait InterserverActor: Clone { - type SendMessage: Serialize; - type RecvMessage: DeserializeOwned; + type SendClientMessage: SendServerPacket; + type SendServerMessage: Serialize; + type RecvServerMessage: DeserializeOwned; type Error; - async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>; - async fn on_action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error>; - async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>; - async fn set_sender(&mut self, server_id: ServerId, tx: channel::Sender); + async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendServerMessage)>; + //async fn on_action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error>; + async fn on_action(&mut self, id: ServerId, msg: Self::RecvServerMessage) -> Result>, Self::Error>; + async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendServerMessage)>; + async fn set_sender(&mut self, server_id: ServerId, tx: channel::Sender); } diff --git a/src/common/mainloop/client.rs b/src/common/mainloop/client.rs index 1d24600..d187636 100644 --- a/src/common/mainloop/client.rs +++ b/src/common/mainloop/client.rs @@ -217,7 +217,7 @@ where } } -pub async fn run_server(mut state: STATE, port: u16) +pub async fn run_server(mut state: STATE, port: u16, interserver: Option>) where STATE: ServerState + Send + 'static, S: SendServerPacket + std::fmt::Debug + Send + 'static, @@ -228,7 +228,20 @@ where let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); let mut id = 0; - let clients = Arc::new(RwLock::new(HashMap::new())); + let clients = Arc::new(RwLock::new(HashMap::>::new())); + + if let Some(interserver) = interserver { + let clients = clients.clone(); + async_std::task::spawn(async move { + loop { + if let Ok((id, msg)) = interserver.recv().await { + if let Some(client) = clients.read().await.get(&id) { + client.send(msg).await; + } + } + } + }); + } loop { let (mut socket, addr) = listener.accept().await.unwrap(); diff --git a/src/common/mainloop/interserver.rs b/src/common/mainloop/interserver.rs index dbe940d..ebe74c3 100644 --- a/src/common/mainloop/interserver.rs +++ b/src/common/mainloop/interserver.rs @@ -8,7 +8,8 @@ use std::collections::HashMap; use serde::Serialize; use serde::de::DeserializeOwned; -use crate::common::interserver::{ServerId, InterserverActor}; +use crate::common::serverstate::ClientId; +use crate::common::interserver::{ServerId, InterserverActor, InterserverMessage}; use libpso::crypto::{PSOCipher, NullCipher, CipherError}; use crate::common::serverstate::{ServerState, SendServerPacket, RecvServerPacket}; @@ -53,11 +54,18 @@ impl MessageReceiver { } } -async fn interserver_recv_loop(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc>>>) +async fn interserver_recv_loop( + mut state: STATE, + server_id: ServerId, + socket: async_std::net::TcpStream, + ships: Arc>>>, + interserver_tx: Option>, +) where - STATE: InterserverActor + Send, + STATE: InterserverActor + Send, S: serde::Serialize + Debug + Send, R: serde::de::DeserializeOwned + Debug + Send, + C: Debug + Send, E: Debug + Send, { let mut msg_receiver = MessageReceiver::new(socket); @@ -69,14 +77,23 @@ where match state.on_action(server_id, msg).await { Ok(response) => { for resp in response { - ships - .read() - .await - .get(&resp.0) - .unwrap() - .send(resp.1) - .await - .unwrap(); + match resp { + InterserverMessage::Server(id, msg) => { + ships + .read() + .await + .get(&id) + .unwrap() + .send(msg) + .await + .unwrap(); + }, + InterserverMessage::Client(id, msg) => { + if let Some(interserver_tx) = &interserver_tx { + interserver_tx.send((id, msg)).await.unwrap(); + } + }, + } } }, Err(err) => { @@ -133,11 +150,12 @@ where } } -pub async fn run_interserver_listen(mut state: STATE, port: u16) +pub async fn run_interserver_listen(mut state: STATE, port: u16) where - STATE: InterserverActor + Send + 'static, + STATE: InterserverActor + Send + 'static, S: serde::Serialize + Debug + Send + 'static, R: serde::de::DeserializeOwned + Debug + Send, + C: Debug + Send, E: Debug + Send, { let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap(); @@ -168,7 +186,7 @@ where let rsocket = socket.clone(); let rships = ships.clone(); async_std::task::spawn(async move { - interserver_recv_loop(rstate, server_id, rsocket, rships).await; + interserver_recv_loop(rstate, server_id, rsocket, rships, None).await; }); async_std::task::spawn(async move { interserver_send_loop(server_id, socket, client_rx).await; @@ -176,11 +194,12 @@ where } } -pub async fn run_interserver_connect(mut state: STATE, ip: std::net::Ipv4Addr, port: u16) +pub async fn run_interserver_connect(mut state: STATE, ip: std::net::Ipv4Addr, port: u16, interserver_tx: channel::Sender<(ClientId, C)>) where - STATE: InterserverActor + Send + 'static, + STATE: InterserverActor + Send + 'static, S: serde::Serialize + Debug + Send + 'static, R: serde::de::DeserializeOwned + Debug + Send, + C: Debug + Send + 'static, E: Debug + Send, { let mut id = 0; @@ -209,8 +228,9 @@ where let other_server = vec![(server_id, client_tx.clone())].into_iter().collect(); let rstate = state.clone(); let rsocket = socket.clone(); + let interserver_tx = interserver_tx.clone(); async_std::task::spawn(async move { - interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await; + interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server)), Some(interserver_tx)).await; }); let ssocket = socket.clone(); async_std::task::spawn(async move { diff --git a/src/login/character.rs b/src/login/character.rs index b6e28ab..de00ae0 100644 --- a/src/login/character.rs +++ b/src/login/character.rs @@ -648,15 +648,16 @@ impl ServerState for CharacterServerState { #[async_trait::async_trait] impl InterserverActor for CharacterServerState { - type SendMessage = LoginMessage; - type RecvMessage = ShipMessage; + type SendClientMessage = SendCharacterPacket; + type SendServerMessage = LoginMessage; + type RecvServerMessage = ShipMessage; type Error = (); - async fn on_connect(&mut self, _id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + async fn on_connect(&mut self, _id: ServerId) -> Vec<(ServerId, Self::SendServerMessage)> { Vec::new() } - async fn on_action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error> { + async fn on_action(&mut self, id: ServerId, msg: Self::RecvServerMessage) -> Result>, Self::Error> { dbg!(&id, &msg); match msg { ShipMessage::Authenticate(auth_token) => { @@ -678,7 +679,7 @@ impl InterserverActor for CharacterServerState { .await .iter() .map(|(id, _)| { - (*id, LoginMessage::ShipList{ ships: ships.clone() }) + InterserverMessage::Server(*id, LoginMessage::ShipList{ ships: ships.clone() }) }) .collect()) }, @@ -700,17 +701,20 @@ impl InterserverActor for CharacterServerState { ShipMessage::RequestShipList => { dbg!("request ship list", &self.authenticated_ships); if self.authenticated_ships.read().await.contains(&id) { - Ok(vec![(id, LoginMessage::ShipList { - ships: self.ships - .read() - .await - .iter() - .map(|(_, ship)| { - ship - }) - .cloned() - .collect() - })]) + Ok(vec![ + InterserverMessage::Server( + id, + LoginMessage::ShipList { + ships: self.ships + .read() + .await + .iter() + .map(|(_, ship)| { + ship + }) + .cloned() + .collect() + })]) } else { Ok(Vec::new()) @@ -722,7 +726,7 @@ impl InterserverActor for CharacterServerState { } } - async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendServerMessage)> { self.ships.write().await.remove(&id); self.ship_sender.write().await.remove(&id); self.connected_clients diff --git a/src/ship/packet/handler/auth.rs b/src/ship/packet/handler/auth.rs index ddbfeeb..4e096bd 100644 --- a/src/ship/packet/handler/auth.rs +++ b/src/ship/packet/handler/auth.rs @@ -13,7 +13,7 @@ pub async fn validate_login(id: ClientId, entity_gateway: &mut EG, clients: &mut Clients, item_state: &mut ItemState, - shipgate_sender: &Option>, + shipgate_sender: &Arc>>>, ship_name: &str, num_blocks: usize) -> Result, ShipError> @@ -36,7 +36,7 @@ where item_state.load_character(entity_gateway, &character).await?; - if let Some(shipgate_sender) = shipgate_sender.as_ref() { + if let Some(shipgate_sender) = shipgate_sender.read().await.as_ref() { shipgate_sender.send(ShipMessage::AddUser(user.id)).await?; } clients.add(id, ClientState::new(user, settings, character, pkt.session)).await; diff --git a/src/ship/ship.rs b/src/ship/ship.rs index a71cc14..544c5c8 100644 --- a/src/ship/ship.rs +++ b/src/ship/ship.rs @@ -19,7 +19,7 @@ use libpso::packet::ship::{BLOCK_MENU_ID, ROOM_MENU_ID}; use crate::common::cipherkeys::{ELSEWHERE_PRIVATE_KEY, ELSEWHERE_PARRAY}; use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ServerState, OnConnect, ClientId}; -use crate::common::interserver::{AuthToken, Ship, ServerId, InterserverActor, LoginMessage, ShipMessage}; +use crate::common::interserver::{AuthToken, Ship, ServerId, InterserverActor, LoginMessage, ShipMessage, InterserverMessage}; use crate::login::character::SHIP_MENU_ID; @@ -450,7 +450,7 @@ impl ShipServerStateBuilder { auth_token: self.auth_token.unwrap_or_else(|| AuthToken("".into())), ship_list: Arc::new(RwLock::new(Vec::new())), - shipgate_sender: None, + shipgate_sender: Arc::new(RwLock::new(None)), trades: Default::default(), } } @@ -493,7 +493,7 @@ pub struct ShipServerState { auth_token: AuthToken, ship_list: Arc>>, - shipgate_sender: Option>, + shipgate_sender: Arc>>>, trades: TradeState, } @@ -828,7 +828,7 @@ impl ServerState for ShipServerState { if let Some(mut client) = self.clients.remove(&id).await { client.user.at_ship = false; self.entity_gateway.save_user(&client.user).await; - if let Some(shipgate_sender) = self.shipgate_sender.as_ref() { + if let Some(shipgate_sender) = self.shipgate_sender.read().await.as_ref() { shipgate_sender.send(ShipMessage::RemoveUser(client.user.id)).await; } self.item_state.remove_character_from_room(&client.character).await @@ -843,14 +843,15 @@ impl ServerState for ShipServerState { #[async_trait::async_trait] impl InterserverActor for ShipServerState { - type SendMessage = ShipMessage; - type RecvMessage = LoginMessage; - type Error = (); + type SendClientMessage = SendShipPacket; + type SendServerMessage = ShipMessage; + type RecvServerMessage = LoginMessage; + type Error = ShipError; - async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendServerMessage)> { vec![ (id, ShipMessage::Authenticate(self.auth_token.clone())), - (id, ShipMessage::NewShip(Ship { + (id, ShipMessage::NewShip(Ship { name: self.name.clone(), ip: self.ip, port: self.port, @@ -860,7 +861,7 @@ impl InterserverActor for ShipServerState { ] } - async fn on_action(&mut self, _id: ServerId, msg: Self::RecvMessage) -> Result, Self::Error> { + async fn on_action(&mut self, _id: ServerId, msg: Self::RecvServerMessage) -> Result>, Self::Error> { match msg { LoginMessage::SendMail{..} => { Ok(Vec::new()) @@ -887,11 +888,13 @@ impl InterserverActor for ShipServerState { } } - async fn on_disconnect(&mut self, _id: ServerId) -> Vec<(ServerId, Self::SendMessage)> { + async fn on_disconnect(&mut self, _id: ServerId) -> Vec<(ServerId, Self::SendServerMessage)> { Vec::new() } - async fn set_sender(&mut self, _server_id: ServerId, sender: channel::Sender) { - self.shipgate_sender = Some(sender); + async fn set_sender(&mut self, _server_id: ServerId, sender: channel::Sender) { + dbg!("setting sender!"); + //self.shipgate_sender = Arc::new(Some(sender)); + *self.shipgate_sender.write().await = Some(sender); } }