jake
5 years ago
12 changed files with 284 additions and 470 deletions
-
2Cargo.toml
-
128src/common/client.rs
-
184src/common/clientpool.rs
-
307src/common/mainloop.rs
-
2src/common/mod.rs
-
41src/common/network.rs
-
4src/common/serverstate.rs
-
2src/login/character.rs
-
2src/login/login.rs
-
66src/main.rs
-
4src/patch/patch.rs
-
12src/ship/ship.rs
@ -1,128 +0,0 @@ |
|||||
use log::{info, trace, warn};
|
|
||||
|
|
||||
use libpso::crypto::{PSOCipher, NullCipher};
|
|
||||
|
|
||||
use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ClientId};
|
|
||||
use crate::common::network::PacketNetworkError;
|
|
||||
|
|
||||
use std::io::{Read, Write};
|
|
||||
|
|
||||
pub struct Client<S, R> {
|
|
||||
pub id: ClientId,
|
|
||||
running: bool,
|
|
||||
pub socket: mio::tcp::TcpStream,
|
|
||||
cipher_in: Box<dyn PSOCipher + Send>,
|
|
||||
cipher_out: Box<dyn PSOCipher + Send>,
|
|
||||
recv_buffer: Vec<u8>,
|
|
||||
incoming_data: Vec<u8>,
|
|
||||
send_buffer: Vec<u8>,
|
|
||||
_s: std::marker::PhantomData<S>,
|
|
||||
_r: std::marker::PhantomData<R>,
|
|
||||
}
|
|
||||
|
|
||||
impl<S, R> Client<S, R> where
|
|
||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||
R: RecvServerPacket + std::fmt::Debug,
|
|
||||
{
|
|
||||
pub fn new(id: ClientId, socket: mio::tcp::TcpStream) -> Client<S, R> {
|
|
||||
Client {
|
|
||||
id: id,
|
|
||||
running: true,
|
|
||||
socket: socket,
|
|
||||
cipher_in: Box::new(NullCipher {}),
|
|
||||
cipher_out: Box::new(NullCipher {}),
|
|
||||
recv_buffer: Vec::with_capacity(32),
|
|
||||
incoming_data: Vec::new(),
|
|
||||
send_buffer: Vec::new(),
|
|
||||
_s: std::marker::PhantomData,
|
|
||||
_r: std::marker::PhantomData,
|
|
||||
}
|
|
||||
}
|
|
||||
|
|
||||
pub fn set_cipher(&mut self, cin: Box<dyn PSOCipher + Send>, out: Box<dyn PSOCipher + Send>) {
|
|
||||
self.cipher_in = cin;
|
|
||||
self.cipher_out = out;
|
|
||||
}
|
|
||||
|
|
||||
pub fn send_data(&mut self) {
|
|
||||
if self.send_buffer.len() == 0 {
|
|
||||
return;
|
|
||||
}
|
|
||||
match self.socket.write(&self.send_buffer) {
|
|
||||
Ok(len) => {
|
|
||||
if len == 0 {
|
|
||||
self.running = false;
|
|
||||
}
|
|
||||
self.send_buffer.drain(..len);
|
|
||||
},
|
|
||||
Err(err) => {
|
|
||||
warn!("[client] error sending data to {:?}: {:?}", self.socket, err);
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
|
|
||||
fn read_data_into_buffer(&mut self) -> Result<(), PacketNetworkError> {
|
|
||||
let mut new_data = [0u8; 0x8000];
|
|
||||
let len = self.socket.read(&mut new_data)?;
|
|
||||
if len == 0 {
|
|
||||
return Err(PacketNetworkError::ClientDisconnected);
|
|
||||
}
|
|
||||
|
|
||||
self.recv_buffer.extend_from_slice(&mut new_data[..len]);
|
|
||||
|
|
||||
let block_chunk_len = self.recv_buffer.len() / self.cipher_in.block_size() * self.cipher_in.block_size();
|
|
||||
let buf = self.recv_buffer.drain(..block_chunk_len).collect();
|
|
||||
let mut dec_buf = self.cipher_in.decrypt(&buf)?;
|
|
||||
self.incoming_data.append(&mut dec_buf);
|
|
||||
Ok(())
|
|
||||
}
|
|
||||
|
|
||||
pub fn read_pkts(&mut self) -> Result<Vec<R>, PacketNetworkError> {
|
|
||||
self.read_data_into_buffer()?;
|
|
||||
let mut result = Vec::new();
|
|
||||
|
|
||||
loop {
|
|
||||
if self.incoming_data.len() < 2 {
|
|
||||
break;
|
|
||||
}
|
|
||||
let pkt_size = u16::from_le_bytes([self.incoming_data[0], self.incoming_data[1]]) as usize;
|
|
||||
let mut pkt_len = pkt_size;
|
|
||||
while pkt_len % self.cipher_in.block_size() != 0 {
|
|
||||
pkt_len += 1;
|
|
||||
}
|
|
||||
|
|
||||
if pkt_len > self.incoming_data.len() {
|
|
||||
break;
|
|
||||
}
|
|
||||
|
|
||||
let pkt_data = self.incoming_data.drain(..pkt_len).collect::<Vec<_>>();
|
|
||||
|
|
||||
trace!("[recv buf from {:?}] {:?}", self.id, pkt_data);
|
|
||||
let pkt = match R::from_bytes(&pkt_data[..pkt_size]) {
|
|
||||
Ok(p) => p,
|
|
||||
Err(err) => {
|
|
||||
warn!("error RecvServerPacket::from_bytes: {:?}", err);
|
|
||||
continue
|
|
||||
},
|
|
||||
};
|
|
||||
|
|
||||
trace!("[recv from {:?}] {:?}", self.id, pkt);
|
|
||||
result.push(pkt);
|
|
||||
}
|
|
||||
Ok(result)
|
|
||||
}
|
|
||||
|
|
||||
pub fn send_pkt(&mut self, pkt: S) {
|
|
||||
trace!("[send to {:?}] {:?}", self.id, pkt);
|
|
||||
let buf = pkt.as_bytes();
|
|
||||
if buf.len() < 1024*2 {
|
|
||||
trace!("[send: buf] {:?}", buf);
|
|
||||
}
|
|
||||
else {
|
|
||||
trace!("[send: buf] [...large buffer...]");
|
|
||||
}
|
|
||||
let mut cbuf = self.cipher_out.encrypt(&buf).unwrap();
|
|
||||
self.send_buffer.append(&mut cbuf);
|
|
||||
self.send_data();
|
|
||||
}
|
|
||||
}
|
|
@ -1,184 +0,0 @@ |
|||||
//use std::thread;
|
|
||||
use std::collections::HashMap;
|
|
||||
|
|
||||
use std::net::{SocketAddr, Ipv4Addr};
|
|
||||
|
|
||||
use std::sync::mpsc::TryRecvError;
|
|
||||
use mio::tcp::TcpListener;
|
|
||||
use mio::{Events, Poll, Token, Ready, PollOpt};
|
|
||||
use log::{info, warn};
|
|
||||
|
|
||||
use crate::common::client::Client;
|
|
||||
use crate::common::serverstate::{SendServerPacket, RecvServerPacket, ClientId};
|
|
||||
use libpso::crypto::PSOCipher;
|
|
||||
|
|
||||
use mio_extras::channel::{Sender, Receiver};
|
|
||||
|
|
||||
use crate::common::network::PacketNetworkError;
|
|
||||
|
|
||||
//use threadpool::ThreadPool;
|
|
||||
|
|
||||
//const THREAD_COUNT: usize = 4;
|
|
||||
|
|
||||
fn client_read<S, R>(sender: &Sender<ClientPoolAction<R>>, client: &mut Client<S, R>) -> Result<(), PacketNetworkError> where
|
|
||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||
R: RecvServerPacket + std::fmt::Debug,
|
|
||||
{
|
|
||||
let pkts = client.read_pkts();
|
|
||||
|
|
||||
for pkt in pkts? {
|
|
||||
sender.send(ClientPoolAction::Packet(client.id, pkt)).unwrap();
|
|
||||
}
|
|
||||
Ok(())
|
|
||||
}
|
|
||||
|
|
||||
fn client_write<S, R>(client: &mut Client<S, R>) where
|
|
||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||
R: RecvServerPacket + std::fmt::Debug,
|
|
||||
{
|
|
||||
client.send_data();
|
|
||||
}
|
|
||||
|
|
||||
pub enum ClientAction<S> {
|
|
||||
EncryptionKeys(ClientId, Box<dyn PSOCipher + Send>, Box<dyn PSOCipher + Send>),
|
|
||||
Packet(ClientId, S)
|
|
||||
}
|
|
||||
|
|
||||
#[derive(Debug)]
|
|
||||
pub enum ClientPoolAction<R> {
|
|
||||
NewClient(ClientId),
|
|
||||
Packet(ClientId, R),
|
|
||||
Disconnect(ClientId),
|
|
||||
}
|
|
||||
|
|
||||
|
|
||||
pub struct ClientPool<S, R>{
|
|
||||
poll: Poll,
|
|
||||
receiver: Receiver<ClientAction<S>>,
|
|
||||
sender: Sender<ClientPoolAction<R>>,
|
|
||||
client_ids: HashMap<Token, ClientId>,
|
|
||||
clients: HashMap<ClientId, Client<S, R>>,
|
|
||||
listener: TcpListener,
|
|
||||
client_id_incr: usize,
|
|
||||
}
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
impl<S, R> ClientPool<S, R> where
|
|
||||
S: SendServerPacket + std::fmt::Debug,
|
|
||||
R: RecvServerPacket + std::fmt::Debug,
|
|
||||
{
|
|
||||
pub fn new(
|
|
||||
receiver: Receiver<ClientAction<S>>,
|
|
||||
sender: Sender<ClientPoolAction<R>>,
|
|
||||
port: u16 |
|
||||
) -> ClientPool<S, R> {
|
|
||||
ClientPool {
|
|
||||
poll: Poll::new().unwrap(),
|
|
||||
receiver: receiver,
|
|
||||
sender: sender,
|
|
||||
client_ids: HashMap::new(),
|
|
||||
clients: HashMap::new(),
|
|
||||
listener: TcpListener::bind(&SocketAddr::from((Ipv4Addr::new(0,0,0,0), port))).unwrap(),
|
|
||||
client_id_incr: 3,
|
|
||||
}
|
|
||||
}
|
|
||||
|
|
||||
fn new_client(&mut self) {
|
|
||||
let (socket, _addr) = self.listener.accept().unwrap();
|
|
||||
|
|
||||
let client_id = ClientId(self.client_id_incr);
|
|
||||
self.client_id_incr += 1;
|
|
||||
|
|
||||
self.poll.register(&socket, Token(client_id.0), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
|
|
||||
let client = Client::new(client_id, socket);
|
|
||||
|
|
||||
self.client_ids.insert(Token(client_id.0), client_id);
|
|
||||
self.clients.insert(client_id, client);
|
|
||||
self.sender.send(ClientPoolAction::NewClient(client_id)).unwrap();
|
|
||||
}
|
|
||||
|
|
||||
fn packet_to_send(&mut self) {
|
|
||||
loop {
|
|
||||
match self.receiver.try_recv() {
|
|
||||
Ok(action) => {
|
|
||||
match action {
|
|
||||
ClientAction::EncryptionKeys(client_id, cipher_in, cipher_out) => {
|
|
||||
self.clients.get_mut(&client_id)
|
|
||||
.map(|client| {
|
|
||||
client.set_cipher(cipher_in, cipher_out);
|
|
||||
});
|
|
||||
}
|
|
||||
ClientAction::Packet(client_id, pkt) => {
|
|
||||
self.clients.get_mut(&client_id)
|
|
||||
.map(|client| {
|
|
||||
client.send_pkt(pkt);
|
|
||||
});
|
|
||||
}
|
|
||||
}
|
|
||||
},
|
|
||||
Err(err) => {
|
|
||||
match err {
|
|
||||
TryRecvError::Empty => break,
|
|
||||
TryRecvError::Disconnected => {
|
|
||||
break;
|
|
||||
// TODO!
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
|
|
||||
|
|
||||
pub fn io_loop(mut self) {
|
|
||||
self.poll.register(&self.listener, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
|
|
||||
self.poll.register(&self.receiver, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
|
|
||||
|
|
||||
let mut events = Events::with_capacity(1024);
|
|
||||
|
|
||||
loop {
|
|
||||
self.poll.poll(&mut events, None).unwrap();
|
|
||||
|
|
||||
for event in &events {
|
|
||||
match event.token() {
|
|
||||
Token(0) => self.new_client(),
|
|
||||
Token(1) => self.packet_to_send(),
|
|
||||
_ => {
|
|
||||
let client_id = match self.client_ids.get(&event.token()) {
|
|
||||
Some(client_id) => client_id,
|
|
||||
None => continue,
|
|
||||
};
|
|
||||
|
|
||||
let client = match self.clients.get_mut(&client_id) {
|
|
||||
Some(client) => client,
|
|
||||
None => continue,
|
|
||||
};
|
|
||||
|
|
||||
if event.readiness().is_writable() {
|
|
||||
client_write(client);
|
|
||||
}
|
|
||||
if event.readiness().is_readable() {
|
|
||||
match client_read(&self.sender, client) {
|
|
||||
Ok(()) =>{},
|
|
||||
Err(err) => {
|
|
||||
match err {
|
|
||||
PacketNetworkError::ClientDisconnected => {
|
|
||||
info!("client {:?} disconnected", client_id);
|
|
||||
self.poll.deregister(&client.socket).unwrap();
|
|
||||
self.sender.send(ClientPoolAction::Disconnect(*client_id)).unwrap();
|
|
||||
},
|
|
||||
_ => {
|
|
||||
warn!("pkt err: {:?}", err);
|
|
||||
},
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
@ -1,99 +1,260 @@ |
|||||
use std::thread;
|
|
||||
use log::warn;
|
|
||||
use mio::{Events, Poll, Token, Ready, PollOpt};
|
|
||||
use mio_extras::channel::{channel, Sender, Receiver};
|
|
||||
|
use log::{trace, info, warn};
|
||||
|
use async_std::sync::{Arc, Mutex};
|
||||
|
use async_std::io::{Read, Write};
|
||||
|
use async_std::io::prelude::{ReadExt, WriteExt};
|
||||
|
use async_std::prelude::{StreamExt};
|
||||
|
use std::collections::HashMap;
|
||||
|
|
||||
use crate::common::clientpool::{ClientPool, ClientAction, ClientPoolAction};
|
|
||||
|
use libpso::crypto::{PSOCipher, NullCipher};
|
||||
|
use crate::common::serverstate::ClientId;
|
||||
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
|
use crate::common::serverstate::{RecvServerPacket, SendServerPacket, ServerState, OnConnect};
|
||||
|
|
||||
fn recv_from_clientpool<STATE, S, R, E>(state: &mut STATE,
|
|
||||
pool_recv: &Receiver<ClientPoolAction<R>>,
|
|
||||
pool_send: &Sender<ClientAction<S>>) where
|
|
||||
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E>,
|
|
||||
S: SendServerPacket,
|
|
||||
R: RecvServerPacket,
|
|
||||
E: std::fmt::Debug,
|
|
||||
|
|
||||
|
enum PacketReceiverError {
|
||||
|
ClientDisconnect,
|
||||
|
}
|
||||
|
|
||||
|
struct PacketReceiver {
|
||||
|
socket: Arc<async_std::net::TcpStream>,
|
||||
|
cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
||||
|
recv_buffer: Vec<u8>,
|
||||
|
incoming_data: Vec<u8>,
|
||||
|
}
|
||||
|
|
||||
|
impl PacketReceiver {
|
||||
|
fn new(socket: Arc<async_std::net::TcpStream>, cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>) -> PacketReceiver {
|
||||
|
PacketReceiver {
|
||||
|
socket: socket,
|
||||
|
cipher: cipher,
|
||||
|
recv_buffer: Vec::new(),
|
||||
|
incoming_data: Vec::new(),
|
||||
|
}
|
||||
|
}
|
||||
|
|
||||
|
async fn fill_recv_buffer(&mut self) -> Result<(), PacketReceiverError>{
|
||||
|
let mut data = [0u8; 0x8000];
|
||||
|
|
||||
|
let mut socket = &*self.socket;
|
||||
|
let len = socket.read(&mut data).await.unwrap();
|
||||
|
if len == 0 {
|
||||
|
return Err(PacketReceiverError::ClientDisconnect);
|
||||
|
}
|
||||
|
|
||||
|
self.recv_buffer.extend_from_slice(&mut data[..len]);
|
||||
|
|
||||
|
let mut dec_buf = {
|
||||
|
let mut cipher = self.cipher.lock().await;
|
||||
|
let block_chunk_len = self.recv_buffer.len() / cipher.block_size() * cipher.block_size();
|
||||
|
let buf = self.recv_buffer.drain(..block_chunk_len).collect();
|
||||
|
cipher.decrypt(&buf).unwrap()
|
||||
|
};
|
||||
|
self.incoming_data.append(&mut dec_buf);
|
||||
|
|
||||
|
Ok(())
|
||||
|
}
|
||||
|
|
||||
|
async fn recv_pkts<R: RecvServerPacket + Send + std::fmt::Debug>(&mut self) -> Result<Vec<R>, PacketReceiverError> {
|
||||
|
self.fill_recv_buffer().await?;
|
||||
|
|
||||
|
let mut result = Vec::new();
|
||||
|
loop {
|
||||
|
if self.incoming_data.len() < 2 {
|
||||
|
break;
|
||||
|
}
|
||||
|
let pkt_size = u16::from_le_bytes([self.incoming_data[0], self.incoming_data[1]]) as usize;
|
||||
|
let mut pkt_len = pkt_size;
|
||||
|
while pkt_len % self.cipher.lock().await.block_size() != 0 {
|
||||
|
pkt_len += 1;
|
||||
|
}
|
||||
|
|
||||
|
if pkt_len > self.incoming_data.len() {
|
||||
|
break;
|
||||
|
}
|
||||
|
|
||||
|
let pkt_data = self.incoming_data.drain(..pkt_len).collect::<Vec<_>>();
|
||||
|
|
||||
|
trace!("[recv buf] {:?}", pkt_data);
|
||||
|
let pkt = match R::from_bytes(&pkt_data[..pkt_size]) {
|
||||
|
Ok(p) => p,
|
||||
|
Err(err) => {
|
||||
|
warn!("error RecvServerPacket::from_bytes: {:?}", err);
|
||||
|
continue
|
||||
|
},
|
||||
|
};
|
||||
|
|
||||
|
result.push(pkt);
|
||||
|
}
|
||||
|
|
||||
|
Ok(result)
|
||||
|
}
|
||||
|
}
|
||||
|
|
||||
|
async fn send_pkt<S: SendServerPacket + Send + std::fmt::Debug>(socket: Arc<async_std::net::TcpStream>, cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>, pkt: S) {
|
||||
|
let buf = pkt.as_bytes();
|
||||
|
let cbuf = cipher.lock().await.encrypt(&buf).unwrap();
|
||||
|
let mut ssock = &*socket;
|
||||
|
ssock.write_all(&cbuf).await;
|
||||
|
|
||||
|
}
|
||||
|
|
||||
|
|
||||
|
enum ClientAction<S, R> {
|
||||
|
NewClient(ClientId, async_std::sync::Sender<S>),
|
||||
|
Packet(ClientId, R),
|
||||
|
Disconnect(ClientId),
|
||||
|
}
|
||||
|
|
||||
|
enum ServerStateAction<S> {
|
||||
|
Cipher(Box<dyn PSOCipher + Send + Sync>, Box<dyn PSOCipher + Send + Sync>),
|
||||
|
Packet(S),
|
||||
|
Disconnect,
|
||||
|
}
|
||||
|
|
||||
|
async fn server_state_loop<STATE, S, R, E>(mut state: STATE,
|
||||
|
server_state_receiver: async_std::sync::Receiver<ClientAction<ServerStateAction<S>, R>>) where
|
||||
|
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
|
||||
|
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
||||
|
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
|
||||
|
E: std::fmt::Debug + Send,
|
||||
{
|
{
|
||||
loop {
|
|
||||
match pool_recv.try_recv() {
|
|
||||
Ok(incoming) => {
|
|
||||
match incoming {
|
|
||||
ClientPoolAction::NewClient(client_id) => {
|
|
||||
for action in state.on_connect(client_id).into_iter() {
|
|
||||
match action {
|
|
||||
OnConnect::Cipher((in_cipher, out_cipher)) => {
|
|
||||
pool_send.send(ClientAction::EncryptionKeys(client_id, in_cipher, out_cipher)).unwrap();
|
|
||||
}
|
|
||||
OnConnect::Packet(pkt) => {
|
|
||||
pool_send.send(ClientAction::Packet(client_id, pkt)).unwrap();
|
|
||||
}
|
|
||||
}
|
|
||||
}
|
|
||||
},
|
|
||||
ClientPoolAction::Packet(client_id, pkt) => {
|
|
||||
let to_send = state.handle(client_id, &pkt);
|
|
||||
match to_send {
|
|
||||
Ok(pkts) => {
|
|
||||
for pkt in pkts {
|
|
||||
pool_send.send(ClientAction::Packet(pkt.0, pkt.1)).unwrap();
|
|
||||
}
|
|
||||
|
async_std::task::spawn(async move {
|
||||
|
let mut clients = HashMap::new();
|
||||
|
|
||||
|
{
|
||||
|
let action = server_state_receiver.recv().await.unwrap();
|
||||
|
|
||||
|
match action {
|
||||
|
ClientAction::NewClient(client_id, sender) => {
|
||||
|
clients.insert(client_id, sender.clone());
|
||||
|
for action in state.on_connect(client_id) {
|
||||
|
match action {
|
||||
|
OnConnect::Cipher((inc, outc)) => {
|
||||
|
sender.send(ServerStateAction::Cipher(inc, outc)).await;
|
||||
},
|
},
|
||||
Err(err) => {
|
|
||||
// TODO: break?
|
|
||||
warn!("[handler error]: {:?} {:?}", client_id, err);
|
|
||||
|
OnConnect::Packet(pkt) => {
|
||||
|
sender.send(ServerStateAction::Packet(pkt)).await;
|
||||
}
|
}
|
||||
}
|
}
|
||||
},
|
|
||||
ClientPoolAction::Disconnect(client_id) => {
|
|
||||
for pkt in state.on_disconnect(client_id) {
|
|
||||
pool_send.send(ClientAction::Packet(pkt.0, pkt.1)).unwrap();
|
|
||||
|
}
|
||||
|
},
|
||||
|
ClientAction::Packet(client_id, pkt) => {
|
||||
|
let k = state.handle(client_id, &pkt);
|
||||
|
let pkts = k.unwrap().collect::<Vec<_>>();
|
||||
|
for (client_id, pkt) in pkts {
|
||||
|
let client = clients.get_mut(&client_id).unwrap();
|
||||
|
client.send(ServerStateAction::Packet(pkt)).await;
|
||||
|
}
|
||||
|
},
|
||||
|
ClientAction::Disconnect(client_id) => {
|
||||
|
let pkts = state.on_disconnect(client_id);
|
||||
|
for (client_id, pkt) in pkts {
|
||||
|
let client = clients.get_mut(&client_id).unwrap();
|
||||
|
client.send(ServerStateAction::Packet(pkt)).await;
|
||||
|
}
|
||||
|
|
||||
|
let client = clients.remove(&client_id).unwrap();
|
||||
|
client.send(ServerStateAction::Disconnect).await;
|
||||
|
}
|
||||
|
}
|
||||
|
}
|
||||
|
});
|
||||
|
}
|
||||
|
|
||||
|
async fn client_recv_loop<S, R>(client_id: ClientId,
|
||||
|
socket: Arc<async_std::net::TcpStream>,
|
||||
|
cipher: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
||||
|
server_sender: async_std::sync::Sender<ClientAction<ServerStateAction<S>, R>>,
|
||||
|
client_sender: async_std::sync::Sender<ServerStateAction<S>>) where
|
||||
|
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
||||
|
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
|
||||
|
{
|
||||
|
async_std::task::spawn(async move {
|
||||
|
server_sender.send(ClientAction::NewClient(client_id, client_sender)).await;
|
||||
|
let mut pkt_receiver = PacketReceiver::new(socket, cipher);
|
||||
|
|
||||
|
loop {
|
||||
|
match pkt_receiver.recv_pkts().await {
|
||||
|
Ok(pkts) => {
|
||||
|
for pkt in pkts {
|
||||
|
trace!("[recv from {:?}] {:?}", client_id, pkt);
|
||||
|
server_sender.send(ClientAction::Packet(client_id, pkt)).await;
|
||||
|
}
|
||||
|
},
|
||||
|
Err(err) => {
|
||||
|
match err {
|
||||
|
PacketReceiverError::ClientDisconnect => {
|
||||
|
trace!("[client disconnected] {:?}", client_id);
|
||||
|
server_sender.send(ClientAction::Disconnect(client_id));
|
||||
|
break;
|
||||
}
|
}
|
||||
}
|
}
|
||||
}
|
}
|
||||
},
|
|
||||
Err(_err) => {
|
|
||||
break;
|
|
||||
}
|
}
|
||||
}
|
}
|
||||
}
|
|
||||
|
});
|
||||
}
|
}
|
||||
|
|
||||
|
async fn client_send_loop<S>(client_id: ClientId,
|
||||
|
socket: Arc<async_std::net::TcpStream>,
|
||||
|
cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
||||
|
cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>>,
|
||||
|
client_receiver: async_std::sync::Receiver<ServerStateAction<S>>,
|
||||
|
|
||||
pub fn mainloop<STATE, S, R, E>(mut state: STATE, port: u16) where
|
|
||||
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
|
|
||||
|
) where
|
||||
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
S: SendServerPacket + std::fmt::Debug + Send + 'static,
|
||||
R: RecvServerPacket + std::fmt::Debug + Send + 'static,
|
|
||||
E: std::fmt::Debug,
|
|
||||
{
|
{
|
||||
let (pool_send, pool_recv) = channel();
|
|
||||
//let (patch_handler_send, patch_handler_recv) = channel::<ClientPoolAction<RecvPatchPacket>>();
|
|
||||
let (handler_send, handler_recv) = channel();
|
|
||||
|
|
||||
//let sender_clone = patch_handler_send.clone();
|
|
||||
let client_thread = thread::spawn(move || {
|
|
||||
let clientpool = ClientPool::new(pool_recv, handler_send, port);
|
|
||||
clientpool.io_loop();
|
|
||||
|
async_std::task::spawn(async move {
|
||||
|
loop {
|
||||
|
let action = client_receiver.recv().await.unwrap();
|
||||
|
match action {
|
||||
|
ServerStateAction::Cipher(inc, outc) => {
|
||||
|
*cipher_in.lock().await = inc;
|
||||
|
*cipher_out.lock().await = outc;
|
||||
|
}
|
||||
|
ServerStateAction::Packet(pkt) => {
|
||||
|
trace!("[send to {:?}] {:?}", client_id, pkt);
|
||||
|
send_pkt(socket.clone(), cipher_out.clone(), pkt).await
|
||||
|
},
|
||||
|
ServerStateAction::Disconnect => {
|
||||
|
break;
|
||||
|
}
|
||||
|
};
|
||||
|
}
|
||||
});
|
});
|
||||
|
}
|
||||
|
|
||||
//let handler_threadpool = threadpool::ThreadPool::new(4);
|
|
||||
let handler_thread = thread::spawn(move || {
|
|
||||
let poll = Poll::new().unwrap();
|
|
||||
poll.register(&handler_recv, Token(0), Ready::readable(), PollOpt::edge()).unwrap();
|
|
||||
|
|
||||
let mut events = Events::with_capacity(1024);
|
|
||||
|
pub async fn mainloop_async<STATE, S, R, E>(mut state: STATE, port: u16) where
|
||||
|
STATE: ServerState<SendPacket=S, RecvPacket=R, PacketError=E> + Send + 'static,
|
||||
|
S: SendServerPacket + std::fmt::Debug + Send + Sync + 'static,
|
||||
|
R: RecvServerPacket + std::fmt::Debug + Send + Sync + 'static,
|
||||
|
E: std::fmt::Debug + Send,
|
||||
|
{
|
||||
|
|
||||
|
let listener = async_std::task::spawn(async move {
|
||||
|
let listener = async_std::net::TcpListener::bind(&std::net::SocketAddr::from((std::net::Ipv4Addr::new(0,0,0,0), port))).await.unwrap();
|
||||
|
let mut id = 1;
|
||||
|
|
||||
|
let (server_state_sender, server_state_receiver) = async_std::sync::channel(1024);
|
||||
|
|
||||
|
server_state_loop(state, server_state_receiver).await;
|
||||
|
|
||||
loop {
|
loop {
|
||||
poll.poll(&mut events, None).unwrap();
|
|
||||
|
|
||||
for event in &events {
|
|
||||
match event.token() {
|
|
||||
Token(0) => recv_from_clientpool(&mut state, &handler_recv, &pool_send),
|
|
||||
_ => panic!()
|
|
||||
}
|
|
||||
}
|
|
||||
|
let (sock, addr) = listener.accept().await.unwrap();
|
||||
|
let client_id = crate::common::serverstate::ClientId(id);
|
||||
|
id += 1;
|
||||
|
|
||||
|
info!("new client {:?} {:?} {:?}", client_id, sock, addr);
|
||||
|
|
||||
|
let (client_sender, client_receiver) = async_std::sync::channel(64);
|
||||
|
let socket = Arc::new(sock);
|
||||
|
let cipher_in: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
|
||||
|
let cipher_out: Arc<Mutex<Box<dyn PSOCipher + Send>>> = Arc::new(Mutex::new(Box::new(NullCipher {})));
|
||||
|
|
||||
|
client_recv_loop(client_id, socket.clone(), cipher_in.clone(), server_state_sender.clone(), client_sender).await;
|
||||
|
client_send_loop(client_id, socket.clone(), cipher_in.clone(), cipher_out.clone(), client_receiver).await;
|
||||
}
|
}
|
||||
});
|
});
|
||||
|
|
||||
client_thread.join().unwrap();
|
|
||||
handler_thread.join().unwrap();
|
|
||||
|
listener.await
|
||||
}
|
}
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue