Browse Source
Merge pull request 'interserver communication' (#202) from interserver_communication into master
pbs
Merge pull request 'interserver communication' (#202) from interserver_communication into master
pbs
jake
4 years ago
15 changed files with 696 additions and 133 deletions
-
65src/bin/main.rs
-
54src/common/interserver.rs
-
98src/common/mainloop/client.rs
-
220src/common/mainloop/interserver.rs
-
45src/common/mainloop/mod.rs
-
1src/common/mod.rs
-
1src/common/serverstate.rs
-
3src/entity/character.rs
-
66src/login/character.rs
-
2src/login/login.rs
-
94src/ship/ship.rs
-
104tests/test_bank.rs
-
16tests/test_exp_gain.rs
-
40tests/test_item_pickup.rs
-
20tests/test_item_use.rs
@ -0,0 +1,54 @@ |
|||||
|
use std::net::Ipv4Addr;
|
||||
|
use serde::{Serialize, Deserialize};
|
||||
|
use serde::de::DeserializeOwned;
|
||||
|
use crate::entity::character::CharacterEntityId;
|
||||
|
|
||||
|
#[derive(Debug, Copy, Clone, Serialize, Deserialize, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
|
pub struct ServerId(pub usize);
|
||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
pub struct AuthToken(pub String);
|
||||
|
|
||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
pub struct Ship {
|
||||
|
pub name: String,
|
||||
|
pub ip: Ipv4Addr,
|
||||
|
pub port: u16,
|
||||
|
pub block_count: u32,
|
||||
|
}
|
||||
|
|
||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
pub enum LoginMessage {
|
||||
|
SendMail {
|
||||
|
character_id: CharacterEntityId,
|
||||
|
title: String,
|
||||
|
message: String,
|
||||
|
},
|
||||
|
ShipList {
|
||||
|
ships: Vec<Ship>,
|
||||
|
}
|
||||
|
}
|
||||
|
|
||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||
|
pub enum ShipMessage {
|
||||
|
Authenticate(AuthToken),
|
||||
|
NewShip(Ship),
|
||||
|
SendMail {
|
||||
|
character_id: CharacterEntityId,
|
||||
|
title: String,
|
||||
|
message: String,
|
||||
|
},
|
||||
|
RequestShipList,
|
||||
|
|
||||
|
}
|
||||
|
|
||||
|
|
||||
|
#[async_trait::async_trait]
|
||||
|
pub trait InterserverActor {
|
||||
|
type SendMessage: Serialize;
|
||||
|
type RecvMessage: DeserializeOwned;
|
||||
|
type Error;
|
||||
|
|
||||
|
async fn on_connect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
|
||||
|
async fn action(&mut self, id: ServerId, msg: Self::RecvMessage) -> Result<Vec<(ServerId, Self::SendMessage)>, Self::Error>;
|
||||
|
async fn on_disconnect(&mut self, id: ServerId) -> Vec<(ServerId, Self::SendMessage)>;
|
||||
|
}
|
@ -0,0 +1,220 @@ |
|||||
|
use std::time::Duration;
|
||||
|
use std::pin::Pin;
|
||||
|
use futures::future::Future;
|
||||
|
use log::{info, warn};
|
||||
|
use async_std::sync::{Arc, Mutex};
|
||||
|
use async_std::io::prelude::{ReadExt, WriteExt};
|
||||
|
use std::collections::HashMap;
|
||||
|
use serde::Serialize;
|
||||
|
use serde::de::DeserializeOwned;
|
||||
|
|
||||
|
use crate::common::interserver::{ServerId, InterserverActor};
|
||||
|
|
||||
|
use crate::login::character::CharacterServerState;
|
||||
|
use crate::ship::ship::ShipServerState;
|
||||
|
use crate::entity::gateway::entitygateway::EntityGateway;
|
||||
|
|
||||
|
#[derive(Debug)]
|
||||
|
enum MessageReceiverError {
|
||||
|
//InvalidSize,
|
||||
|
InvalidPayload,
|
||||
|
//NetworkError(std::io::Error),
|
||||
|
Disconnected,
|
||||
|
}
|
||||
|
|
||||
|
struct MessageReceiver {
|
||||
|
socket: async_std::net::TcpStream,
|
||||
|
}
|
||||
|
|
||||
|
impl MessageReceiver {
|
||||
|
fn new(socket: async_std::net::TcpStream) -> MessageReceiver {
|
||||
|
MessageReceiver {
|
||||
|
socket: socket,
|
||||
|
}
|
||||
|
}
|
||||
|
|
||||
|
async fn recv<R: DeserializeOwned + std::fmt::Debug + Send>(&mut self) -> Result<R, MessageReceiverError> {
|
||||
|
let mut size_buf = [0u8; 4];
|
||||
|
self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
||||
|
let size = u32::from_le_bytes(size_buf) as usize;
|
||||
|
|
||||
|
let mut payload = vec![0u8; size];
|
||||
|
self.socket.read_exact(&mut payload).await.map_err(|err| MessageReceiverError::Disconnected)?;
|
||||
|
let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
|
||||
|
|
||||
|
let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
|
||||
|
Ok(msg)
|
||||
|
}
|
||||
|
}
|
||||
|
|
||||
|
#[derive(Debug)]
|
||||
|
enum InterserverInputAction<S, R> {
|
||||
|
NewConnection(ServerId, async_std::sync::Sender<S>),
|
||||
|
Message(ServerId, R),
|
||||
|
Disconnect(ServerId),
|
||||
|
}
|
||||
|
|
||||
|
async fn interserver_state_loop<A, S, R>(state: Arc<Mutex<A>>, action_receiver: async_std::sync::Receiver<InterserverInputAction<S, R>>)
|
||||
|
where
|
||||
|
A: InterserverActor<SendMessage=S, RecvMessage=R, Error=()> + Send + 'static,
|
||||
|
S: Serialize + Send + 'static,
|
||||
|
R: DeserializeOwned + Send + 'static,
|
||||
|
{
|
||||
|
async_std::task::spawn(async move {
|
||||
|
let mut ships = HashMap::new();
|
||||
|
|
||||
|
loop {
|
||||
|
info!("interserver loop");
|
||||
|
let action = action_receiver.recv().await.unwrap();
|
||||
|
let mut state = state.lock().await;
|
||||
|
|
||||
|
match action {
|
||||
|
InterserverInputAction::NewConnection(server_id, ship_action_sender) => {
|
||||
|
ships.insert(server_id, ship_action_sender);
|
||||
|
for (server, action) in state.on_connect(server_id).await {
|
||||
|
if let Some(sender) = ships.get_mut(&server) {
|
||||
|
sender.send(action).await;
|
||||
|
}
|
||||
|
}
|
||||
|
},
|
||||
|
InterserverInputAction::Message(server_id, message) => {
|
||||
|
let actions = state.action(server_id, message).await;
|
||||
|
match actions {
|
||||
|
Ok(actions) => {
|
||||
|
for (server, action) in actions{
|
||||
|
if let Some(sender) = ships.get_mut(&server) {
|
||||
|
sender.send(action).await;
|
||||
|
}
|
||||
|
}
|
||||
|
},
|
||||
|
Err(err) => {
|
||||
|
warn!("[server {:?} state handler error] {:?}", server_id, err);
|
||||
|
}
|
||||
|
}
|
||||
|
},
|
||||
|
InterserverInputAction::Disconnect(server_id) => {
|
||||
|
let actions = state.on_disconnect(server_id).await;
|
||||
|
for (server, action) in actions {
|
||||
|
if let Some(sender) = ships.get_mut(&server) {
|
||||
|
sender.send(action).await;
|
||||
|
}
|
||||
|
}
|
||||
|
break;
|
||||
|
}
|
||||
|
}
|
||||
|
}
|
||||
|
});
|
||||
|
}
|
||||
|
|
||||
|
async fn login_recv_loop<S, R>(server_id: ServerId,
|
||||
|
socket: async_std::net::TcpStream,
|
||||
|
state_loop_sender: async_std::sync::Sender<InterserverInputAction<S, R>>,
|
||||
|
output_loop_sender: async_std::sync::Sender<S>)
|
||||
|
where
|
||||
|
S: Serialize + std::fmt::Debug + Send + 'static,
|
||||
|
R: DeserializeOwned + std::fmt::Debug + Send + 'static,
|
||||
|
{
|
||||
|
async_std::task::spawn(async move {
|
||||
|
state_loop_sender.send(InterserverInputAction::NewConnection(server_id, output_loop_sender)).await;
|
||||
|
let mut msg_receiver = MessageReceiver::new(socket);
|
||||
|
|
||||
|
loop {
|
||||
|
info!("login recv loop");
|
||||
|
match msg_receiver.recv().await {
|
||||
|
Ok(msg) => {
|
||||
|
info!("[login recv loop msg] {:?}", msg);
|
||||
|
state_loop_sender.send(InterserverInputAction::Message(server_id, msg)).await
|
||||
|
},
|
||||
|
Err(err) => {
|
||||
|
if let MessageReceiverError::Disconnected = err {
|
||||
|
info!("[login recv loop disconnect] {:?}", server_id);
|
||||
|
state_loop_sender.send(InterserverInputAction::Disconnect(server_id)).await;
|
||||
|
break;
|
||||
|
}
|
||||
|
info!("[login recv loop err] {:?}", err);
|
||||
|
}
|
||||
|
}
|
||||
|
}
|
||||
|
});
|
||||
|
}
|
||||
|
|
||||
|
async fn interserver_send_loop<S>(server_id: ServerId,
|
||||
|
mut socket: async_std::net::TcpStream,
|
||||
|
output_loop_receiver: async_std::sync::Receiver<S>)
|
||||
|
where
|
||||
|
S: Serialize + std::fmt::Debug + Send + 'static,
|
||||
|
{
|
||||
|
async_std::task::spawn(async move {
|
||||
|
loop {
|
||||
|
info!("login send loop");
|
||||
|
let msg = output_loop_receiver.recv().await.unwrap();
|
||||
|
|
||||
|
let payload = serde_json::to_string(&msg);
|
||||
|
if let Ok(payload) = payload {
|
||||
|
let len_bytes = u32::to_le_bytes(payload.len() as u32);
|
||||
|
|
||||
|
match socket.write_all(&len_bytes).await {
|
||||
|
Ok(_) => {},
|
||||
|
Err(err) => warn!("send failed: {:?}", err),
|
||||
|
}
|
||||
|
match socket.write_all(&payload.as_bytes()).await {
|
||||
|
Ok(_) => {},
|
||||
|
Err(err) => warn!("send failed: {:?}", err),
|
||||
|
}
|
||||
|
}
|
||||
|
}
|
||||
|
});
|
||||
|
}
|
||||
|
|
||||
|
|
||||
|
|
||||
|
pub fn login_listen_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<CharacterServerState<EG>>>, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
||||
|
Box::pin(async_std::task::spawn(async move {
|
||||
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
||||
|
let mut id = 0;
|
||||
|
|
||||
|
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
|
||||
|
interserver_state_loop(state, server_state_receiver).await;
|
||||
|
|
||||
|
loop {
|
||||
|
let (socket, addr) = listener.accept().await.unwrap();
|
||||
|
info!("new ship server: {:?} {:?}", socket, addr);
|
||||
|
|
||||
|
id += 1;
|
||||
|
let server_id = crate::common::interserver::ServerId(id);
|
||||
|
|
||||
|
let (client_sender, client_receiver) = async_std::sync::channel(64);
|
||||
|
|
||||
|
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
|
||||
|
interserver_send_loop(server_id, socket.clone(), client_receiver).await;
|
||||
|
}
|
||||
|
}))
|
||||
|
}
|
||||
|
|
||||
|
pub fn ship_connect_mainloop<EG: EntityGateway + 'static>(state: Arc<Mutex<ShipServerState<EG>>>, ip: std::net::Ipv4Addr, port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
||||
|
Box::pin(async_std::task::spawn(async move {
|
||||
|
let mut id = 0;
|
||||
|
|
||||
|
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
|
||||
|
interserver_state_loop(state, server_state_receiver).await;
|
||||
|
|
||||
|
loop {
|
||||
|
let socket = async_std::net::TcpStream::connect((ip, port)).await.unwrap();
|
||||
|
id += 1;
|
||||
|
let server_id = crate::common::interserver::ServerId(id);
|
||||
|
let (client_sender, client_receiver) = async_std::sync::channel(64);
|
||||
|
|
||||
|
info!("ship connected to login: {:?}", socket);
|
||||
|
login_recv_loop(server_id, socket.clone(), server_state_sender.clone(), client_sender).await;
|
||||
|
interserver_send_loop(server_id, socket.clone(), client_receiver).await;
|
||||
|
loop {
|
||||
|
if let Err(_) = socket.peer_addr() {
|
||||
|
info!("ship connected to login: {:?}", socket);
|
||||
|
break;
|
||||
|
}
|
||||
|
async_std::task::sleep(Duration::from_secs(10)).await;
|
||||
|
}
|
||||
|
}
|
||||
|
}))
|
||||
|
}
|
||||
|
|
@ -0,0 +1,45 @@ |
|||||
|
mod client;
|
||||
|
mod interserver;
|
||||
|
|
||||
|
use std::pin::Pin;
|
||||
|
use futures::future::{Future, join_all, FutureExt};
|
||||
|
use async_std::sync::{Arc, Mutex};
|
||||
|
|
||||
|
use crate::common::mainloop::client::client_accept_mainloop;
|
||||
|
use crate::common::mainloop::interserver::{ship_connect_mainloop, login_listen_mainloop};
|
||||
|
pub use crate::common::mainloop::client::NetworkError;
|
||||
|
|
||||
|
use crate::patch::patch::PatchServerState;
|
||||
|
use crate::login::login::LoginServerState;
|
||||
|
use crate::login::character::CharacterServerState;
|
||||
|
use crate::ship::ship::ShipServerState;
|
||||
|
use crate::entity::gateway::entitygateway::EntityGateway;
|
||||
|
|
||||
|
|
||||
|
|
||||
|
pub fn patch_mainloop(patch_state: PatchServerState, patch_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
||||
|
let patch_state = Arc::new(Mutex::new(patch_state));
|
||||
|
let client_mainloop = client_accept_mainloop(patch_state, patch_port);
|
||||
|
Box::pin(client_mainloop)
|
||||
|
}
|
||||
|
|
||||
|
pub fn login_mainloop<EG: EntityGateway + 'static>(login_state: LoginServerState<EG>, login_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
||||
|
let login_state = Arc::new(Mutex::new(login_state));
|
||||
|
let client_mainloop = client_accept_mainloop(login_state.clone(), login_port);
|
||||
|
Box::pin(client_mainloop)
|
||||
|
}
|
||||
|
|
||||
|
pub fn character_mainloop<EG: EntityGateway + 'static>(character_state: CharacterServerState<EG>, character_port: u16, comm_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
||||
|
let character_state = Arc::new(Mutex::new(character_state));
|
||||
|
let client_mainloop = client_accept_mainloop(character_state.clone(), character_port);
|
||||
|
let ship_communication_mainloop = login_listen_mainloop(character_state.clone(), comm_port);
|
||||
|
Box::pin(join_all(vec![client_mainloop, ship_communication_mainloop]).map(|_| ()))
|
||||
|
}
|
||||
|
|
||||
|
|
||||
|
pub fn ship_mainloop<EG: EntityGateway + 'static>(ship_state: ShipServerState<EG>, ship_port: u16, comm_ip: std::net::Ipv4Addr, comm_port: u16) -> Pin<Box<dyn Future<Output = ()>>> {
|
||||
|
let ship_state = Arc::new(Mutex::new(ship_state));
|
||||
|
let client_mainloop = client_accept_mainloop(ship_state.clone(), ship_port);
|
||||
|
let login_communication_mainloop = ship_connect_mainloop(ship_state.clone(), comm_ip, comm_port);
|
||||
|
Box::pin(join_all(vec![client_mainloop, login_communication_mainloop]).map(|_| ()))
|
||||
|
}
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue