|
@ -8,7 +8,8 @@ use std::collections::HashMap; |
|
|
use serde::Serialize;
|
|
|
use serde::Serialize;
|
|
|
use serde::de::DeserializeOwned;
|
|
|
use serde::de::DeserializeOwned;
|
|
|
|
|
|
|
|
|
use crate::common::interserver::{ServerId, InterserverActor};
|
|
|
|
|
|
|
|
|
use crate::common::serverstate::ClientId;
|
|
|
|
|
|
use crate::common::interserver::{ServerId, InterserverActor, InterserverMessage};
|
|
|
|
|
|
|
|
|
use libpso::crypto::{PSOCipher, NullCipher, CipherError};
|
|
|
use libpso::crypto::{PSOCipher, NullCipher, CipherError};
|
|
|
use crate::common::serverstate::{ServerState, SendServerPacket, RecvServerPacket};
|
|
|
use crate::common::serverstate::{ServerState, SendServerPacket, RecvServerPacket};
|
|
@ -53,11 +54,18 @@ impl MessageReceiver { |
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
async fn interserver_recv_loop<STATE, S, R, E>(mut state: STATE, server_id: ServerId, socket: async_std::net::TcpStream, ships: Arc<RwLock<HashMap<ServerId, channel::Sender<S>>>>)
|
|
|
|
|
|
|
|
|
async fn interserver_recv_loop<STATE, S, R, C, E>(
|
|
|
|
|
|
mut state: STATE,
|
|
|
|
|
|
server_id: ServerId,
|
|
|
|
|
|
socket: async_std::net::TcpStream,
|
|
|
|
|
|
ships: Arc<RwLock<HashMap<ServerId, channel::Sender<S>>>>,
|
|
|
|
|
|
interserver_tx: Option<channel::Sender<(ClientId, C)>>,
|
|
|
|
|
|
)
|
|
|
where
|
|
|
where
|
|
|
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send,
|
|
|
|
|
|
|
|
|
STATE: InterserverActor<SendServerMessage=S, RecvServerMessage=R, SendClientMessage=C, Error=E> + Send,
|
|
|
S: serde::Serialize + Debug + Send,
|
|
|
S: serde::Serialize + Debug + Send,
|
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
|
|
|
|
C: Debug + Send,
|
|
|
E: Debug + Send,
|
|
|
E: Debug + Send,
|
|
|
{
|
|
|
{
|
|
|
let mut msg_receiver = MessageReceiver::new(socket);
|
|
|
let mut msg_receiver = MessageReceiver::new(socket);
|
|
@ -69,14 +77,23 @@ where |
|
|
match state.on_action(server_id, msg).await {
|
|
|
match state.on_action(server_id, msg).await {
|
|
|
Ok(response) => {
|
|
|
Ok(response) => {
|
|
|
for resp in response {
|
|
|
for resp in response {
|
|
|
ships
|
|
|
|
|
|
.read()
|
|
|
|
|
|
.await
|
|
|
|
|
|
.get(&resp.0)
|
|
|
|
|
|
.unwrap()
|
|
|
|
|
|
.send(resp.1)
|
|
|
|
|
|
.await
|
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
|
|
|
match resp {
|
|
|
|
|
|
InterserverMessage::Server(id, msg) => {
|
|
|
|
|
|
ships
|
|
|
|
|
|
.read()
|
|
|
|
|
|
.await
|
|
|
|
|
|
.get(&id)
|
|
|
|
|
|
.unwrap()
|
|
|
|
|
|
.send(msg)
|
|
|
|
|
|
.await
|
|
|
|
|
|
.unwrap();
|
|
|
|
|
|
},
|
|
|
|
|
|
InterserverMessage::Client(id, msg) => {
|
|
|
|
|
|
if let Some(interserver_tx) = &interserver_tx {
|
|
|
|
|
|
interserver_tx.send((id, msg)).await.unwrap();
|
|
|
|
|
|
}
|
|
|
|
|
|
},
|
|
|
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
},
|
|
|
},
|
|
|
Err(err) => {
|
|
|
Err(err) => {
|
|
@ -133,11 +150,12 @@ where |
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
pub async fn run_interserver_listen<STATE, S, R, E>(mut state: STATE, port: u16)
|
|
|
|
|
|
|
|
|
pub async fn run_interserver_listen<STATE, S, R, C, E>(mut state: STATE, port: u16)
|
|
|
where
|
|
|
where
|
|
|
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
|
|
|
|
|
|
|
|
|
STATE: InterserverActor<SendServerMessage=S, RecvServerMessage=R, SendClientMessage=C, Error=E> + Send + 'static,
|
|
|
S: serde::Serialize + Debug + Send + 'static,
|
|
|
S: serde::Serialize + Debug + Send + 'static,
|
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
|
|
|
|
C: Debug + Send,
|
|
|
E: Debug + Send,
|
|
|
E: Debug + Send,
|
|
|
{
|
|
|
{
|
|
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
|
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
|
@ -168,7 +186,7 @@ where |
|
|
let rsocket = socket.clone();
|
|
|
let rsocket = socket.clone();
|
|
|
let rships = ships.clone();
|
|
|
let rships = ships.clone();
|
|
|
async_std::task::spawn(async move {
|
|
|
async_std::task::spawn(async move {
|
|
|
interserver_recv_loop(rstate, server_id, rsocket, rships).await;
|
|
|
|
|
|
|
|
|
interserver_recv_loop(rstate, server_id, rsocket, rships, None).await;
|
|
|
});
|
|
|
});
|
|
|
async_std::task::spawn(async move {
|
|
|
async_std::task::spawn(async move {
|
|
|
interserver_send_loop(server_id, socket, client_rx).await;
|
|
|
interserver_send_loop(server_id, socket, client_rx).await;
|
|
@ -176,11 +194,12 @@ where |
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
pub async fn run_interserver_connect<STATE, S, R, E>(mut state: STATE, ip: std::net::Ipv4Addr, port: u16)
|
|
|
|
|
|
|
|
|
pub async fn run_interserver_connect<STATE, S, R, C, E>(mut state: STATE, ip: std::net::Ipv4Addr, port: u16, interserver_tx: channel::Sender<(ClientId, C)>)
|
|
|
where
|
|
|
where
|
|
|
STATE: InterserverActor<SendMessage=S, RecvMessage=R, Error=E> + Send + 'static,
|
|
|
|
|
|
|
|
|
STATE: InterserverActor<SendServerMessage=S, RecvServerMessage=R, SendClientMessage=C, Error=E> + Send + 'static,
|
|
|
S: serde::Serialize + Debug + Send + 'static,
|
|
|
S: serde::Serialize + Debug + Send + 'static,
|
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
|
|
|
|
C: Debug + Send + 'static,
|
|
|
E: Debug + Send,
|
|
|
E: Debug + Send,
|
|
|
{
|
|
|
{
|
|
|
let mut id = 0;
|
|
|
let mut id = 0;
|
|
@ -209,8 +228,9 @@ where |
|
|
let other_server = vec![(server_id, client_tx.clone())].into_iter().collect();
|
|
|
let other_server = vec![(server_id, client_tx.clone())].into_iter().collect();
|
|
|
let rstate = state.clone();
|
|
|
let rstate = state.clone();
|
|
|
let rsocket = socket.clone();
|
|
|
let rsocket = socket.clone();
|
|
|
|
|
|
let interserver_tx = interserver_tx.clone();
|
|
|
async_std::task::spawn(async move {
|
|
|
async_std::task::spawn(async move {
|
|
|
interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server))).await;
|
|
|
|
|
|
|
|
|
interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server)), Some(interserver_tx)).await;
|
|
|
});
|
|
|
});
|
|
|
let ssocket = socket.clone();
|
|
|
let ssocket = socket.clone();
|
|
|
async_std::task::spawn(async move {
|
|
|
async_std::task::spawn(async move {
|
|
|