You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
253 lines
9.1 KiB
253 lines
9.1 KiB
use std::time::Duration;
|
|
use std::pin::Pin;
|
|
use futures::future::Future;
|
|
use log::{info, warn};
|
|
use async_std::sync::{Arc, RwLock};
|
|
use async_std::io::prelude::{ReadExt, WriteExt};
|
|
use std::collections::HashMap;
|
|
use serde::Serialize;
|
|
use serde::de::DeserializeOwned;
|
|
|
|
use crate::common::serverstate::ClientId;
|
|
use crate::common::interserver::{ServerId, InterserverActor, InterserverMessage};
|
|
|
|
use libpso::crypto::{PSOCipher, NullCipher, CipherError};
|
|
use crate::common::serverstate::{ServerState, SendServerPacket, RecvServerPacket};
|
|
use crate::login::character::CharacterServerState;
|
|
//use crate::ship::ship::ShipServerState;
|
|
use crate::entity::gateway::entitygateway::EntityGateway;
|
|
|
|
use async_std::channel;
|
|
use std::fmt::Debug;
|
|
|
|
|
|
/// Reasons `MessageReceiver::recv` can fail to produce a message.
#[derive(Debug)]
enum MessageReceiverError {
    //InvalidSize,
    /// The frame body was read but could not be decoded into the requested type.
    InvalidPayload,
    //NetworkError(std::io::Error),
    /// A read from the socket failed; the receive loop treats the connection as closed.
    Disconnected,
}
|
|
|
|
struct MessageReceiver {
|
|
socket: async_std::net::TcpStream,
|
|
}
|
|
|
|
impl MessageReceiver {
|
|
fn new(socket: async_std::net::TcpStream) -> MessageReceiver {
|
|
MessageReceiver {
|
|
socket,
|
|
}
|
|
}
|
|
|
|
async fn recv<R: serde::de::DeserializeOwned + std::fmt::Debug>(&mut self) -> Result<R, MessageReceiverError> {
|
|
let mut size_buf = [0u8; 4];
|
|
self.socket.read_exact(&mut size_buf).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
|
let size = u32::from_le_bytes(size_buf) as usize;
|
|
|
|
let mut payload = vec![0u8; size];
|
|
self.socket.read_exact(&mut payload).await.map_err(|_| MessageReceiverError::Disconnected)?;
|
|
let payload = String::from_utf8(payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
|
|
|
|
let msg = serde_json::from_str(&payload).map_err(|_| MessageReceiverError::InvalidPayload)?;
|
|
Ok(msg)
|
|
}
|
|
}
|
|
|
|
async fn interserver_recv_loop<STATE, S, R, C, E>(
|
|
mut state: STATE,
|
|
server_id: ServerId,
|
|
socket: async_std::net::TcpStream,
|
|
ships: Arc<RwLock<HashMap<ServerId, channel::Sender<S>>>>,
|
|
interserver_tx: Option<channel::Sender<(ClientId, C)>>,
|
|
)
|
|
where
|
|
STATE: InterserverActor<SendServerMessage=S, RecvServerMessage=R, SendClientMessage=C, Error=E> + Send,
|
|
S: serde::Serialize + Debug + Send,
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
C: Debug + Send,
|
|
E: Debug + Send,
|
|
{
|
|
let mut msg_receiver = MessageReceiver::new(socket);
|
|
|
|
loop {
|
|
match msg_receiver.recv::<R>().await {
|
|
Ok(msg) => {
|
|
info!("[interserver recv {:?}] {:?}", server_id, msg);
|
|
match state.on_action(server_id, msg).await {
|
|
Ok(response) => {
|
|
for resp in response {
|
|
match resp {
|
|
InterserverMessage::Server(id, msg) => {
|
|
ships
|
|
.read()
|
|
.await
|
|
.get(&id)
|
|
.unwrap()
|
|
.send(msg)
|
|
.await
|
|
.unwrap();
|
|
},
|
|
InterserverMessage::Client(id, msg) => {
|
|
if let Some(interserver_tx) = &interserver_tx {
|
|
interserver_tx.send((id, msg)).await.unwrap();
|
|
}
|
|
},
|
|
}
|
|
}
|
|
},
|
|
Err(err) => {
|
|
warn!("[interserver recv {:?}] error {:?}", server_id, err);
|
|
}
|
|
}
|
|
},
|
|
Err(err) => {
|
|
if let MessageReceiverError::Disconnected = err {
|
|
info!("[interserver recv {:?}] disconnected", server_id);
|
|
for (_, _sender) in ships.read().await.iter() {
|
|
for pkt in state.on_disconnect(server_id).await {
|
|
ships
|
|
.read()
|
|
.await
|
|
.get(&pkt.0)
|
|
.unwrap()
|
|
.send(pkt.1)
|
|
.await
|
|
.unwrap();
|
|
}
|
|
}
|
|
ships
|
|
.write()
|
|
.await
|
|
.remove(&server_id);
|
|
break;
|
|
}
|
|
info!("[interserver recv {:?}] error {:?}", server_id, err);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn interserver_send_loop<S>(server_id: ServerId, mut socket: async_std::net::TcpStream, to_send: channel::Receiver<S>)
|
|
where
|
|
S: serde::Serialize + std::fmt::Debug,
|
|
{
|
|
loop {
|
|
let msg = to_send.recv().await.unwrap();
|
|
let payload = serde_json::to_string(&msg);
|
|
|
|
if let Ok(payload) = payload {
|
|
let len_bytes = u32::to_le_bytes(payload.len() as u32);
|
|
if let Err(err) = socket.write_all(&len_bytes).await {
|
|
warn!("[interserver send {:?}] failed: {:?}", server_id, err);
|
|
break;
|
|
}
|
|
if let Err(err) = socket.write_all(payload.as_bytes()).await {
|
|
warn!("[interserver send {:?}] failed: {:?}", server_id, err);
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn run_interserver_listen<STATE, S, R, C, E>(mut state: STATE, port: u16)
|
|
where
|
|
STATE: InterserverActor<SendServerMessage=S, RecvServerMessage=R, SendClientMessage=C, Error=E> + Send + 'static,
|
|
S: serde::Serialize + Debug + Send + 'static,
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
C: Debug + Send,
|
|
E: Debug + Send,
|
|
{
|
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
|
let mut id = 0;
|
|
let ships = Arc::new(RwLock::new(HashMap::new()));
|
|
|
|
loop {
|
|
let (socket, addr) = listener.accept().await.unwrap();
|
|
info!("[interserver listen] new server: {:?} {:?}", socket, addr);
|
|
|
|
id += 1;
|
|
let server_id = crate::common::interserver::ServerId(id);
|
|
let (client_tx, client_rx) = async_std::channel::unbounded();
|
|
state.set_sender(server_id, client_tx.clone()).await;
|
|
|
|
ships
|
|
.write()
|
|
.await
|
|
.insert(server_id, client_tx.clone());
|
|
|
|
for msg in state.on_connect(server_id).await {
|
|
if let Some(ship_sender) = ships.read().await.get(&msg.0) {
|
|
ship_sender.send(msg.1).await.unwrap();
|
|
}
|
|
}
|
|
|
|
let rstate = state.clone();
|
|
let rsocket = socket.clone();
|
|
let rships = ships.clone();
|
|
async_std::task::spawn(async move {
|
|
interserver_recv_loop(rstate, server_id, rsocket, rships, None).await;
|
|
});
|
|
async_std::task::spawn(async move {
|
|
interserver_send_loop(server_id, socket, client_rx).await;
|
|
});
|
|
}
|
|
}
|
|
|
|
pub async fn run_interserver_connect<STATE, S, R, C, E>(mut state: STATE, ip: std::net::Ipv4Addr, port: u16, interserver_tx: channel::Sender<(ClientId, C)>)
|
|
where
|
|
STATE: InterserverActor<SendServerMessage=S, RecvServerMessage=R, SendClientMessage=C, Error=E> + Send + 'static,
|
|
S: serde::Serialize + Debug + Send + 'static,
|
|
R: serde::de::DeserializeOwned + Debug + Send,
|
|
C: Debug + Send + 'static,
|
|
E: Debug + Send,
|
|
{
|
|
let mut id = 0;
|
|
|
|
loop {
|
|
info!("[interserver connect] trying to connect to server");
|
|
let socket = match async_std::net::TcpStream::connect((ip, port)).await {
|
|
Ok(socket) => socket,
|
|
Err(err) => {
|
|
info!("err trying to connect to loginserv {:?}", err);
|
|
async_std::task::sleep(std::time::Duration::from_secs(10)).await;
|
|
continue;
|
|
}
|
|
};
|
|
id += 1;
|
|
let server_id = crate::common::interserver::ServerId(id);
|
|
info!("[interserver connect] found loginserv: {:?} {:?}", server_id, socket);
|
|
|
|
let (client_tx, client_rx) = async_std::channel::unbounded();
|
|
state.set_sender(server_id, client_tx.clone()).await;
|
|
|
|
for msg in state.on_connect(server_id).await {
|
|
client_tx.send(msg.1).await.unwrap();
|
|
}
|
|
|
|
let other_server = vec![(server_id, client_tx.clone())].into_iter().collect();
|
|
let rstate = state.clone();
|
|
let rsocket = socket.clone();
|
|
let interserver_tx = interserver_tx.clone();
|
|
async_std::task::spawn(async move {
|
|
interserver_recv_loop(rstate, server_id, rsocket, Arc::new(RwLock::new(other_server)), Some(interserver_tx)).await;
|
|
});
|
|
let ssocket = socket.clone();
|
|
async_std::task::spawn(async move {
|
|
interserver_send_loop(server_id, ssocket, client_rx).await;
|
|
});
|
|
|
|
let mut buf = [0u8; 1];
|
|
loop {
|
|
let peek = socket.peek(&mut buf).await;
|
|
match peek {
|
|
Ok(len) if len == 0 => {
|
|
break
|
|
},
|
|
_ => {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|