add transactions!
This commit is contained in:
parent
a2e78014ee
commit
cbb5c1fffd
@ -1,4 +1,6 @@
|
||||
use std::convert::From;
|
||||
use thiserror::Error;
|
||||
use futures::Future;
|
||||
|
||||
use crate::entity::account::*;
|
||||
use crate::entity::character::*;
|
||||
@ -15,7 +17,35 @@ pub enum GatewayError {
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait EntityGateway: Send + Sync + Clone {
|
||||
pub trait EntityGatewayTransaction: Send + Sync {
|
||||
fn gateway<'a>(&'a mut self) -> &'a mut dyn EntityGateway {
|
||||
unimplemented!()
|
||||
}
|
||||
|
||||
async fn commit(self: Box<Self>) -> Result<(), GatewayError> {
|
||||
unimplemented!()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
pub trait EntityGateway: Send + Sync {
|
||||
async fn transaction(&'static mut self) -> Result<Box<dyn EntityGatewayTransaction + 'static>, GatewayError>
|
||||
{
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
async fn with_transaction<F, Fut, R, E>(&'static mut self, _func: F) -> Result<R, E>
|
||||
where
|
||||
Fut: Future<Output = Result<(Box<dyn EntityGatewayTransaction>, R), E>> + Send,
|
||||
F: FnOnce(Box<dyn EntityGatewayTransaction>) -> Fut + Send,
|
||||
R: Send,
|
||||
E: From<GatewayError>,
|
||||
Self: Sized
|
||||
{
|
||||
unimplemented!();
|
||||
}
|
||||
|
||||
async fn create_user(&mut self, _user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
|
||||
unimplemented!()
|
||||
}
|
||||
|
@ -1,13 +1,43 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::convert::TryInto;
|
||||
use futures::Future;
|
||||
|
||||
use crate::entity::account::*;
|
||||
use crate::entity::character::*;
|
||||
use crate::entity::gateway::{EntityGateway, GatewayError};
|
||||
use crate::entity::gateway::{EntityGateway, EntityGatewayTransaction, GatewayError};
|
||||
use crate::entity::item::*;
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
|
||||
pub struct InMemoryGatewayTransaction {
|
||||
working_gateway: InMemoryGateway,
|
||||
original_gateway: &'static mut InMemoryGateway,
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl EntityGatewayTransaction for InMemoryGatewayTransaction {
|
||||
fn gateway<'b>(&'b mut self) -> &'b mut dyn EntityGateway {
|
||||
&mut self.working_gateway
|
||||
}
|
||||
|
||||
async fn commit(self: Box<Self>) -> Result<(), GatewayError> {
|
||||
self.original_gateway.users = self.working_gateway.users.clone();
|
||||
self.original_gateway.user_settings = self.working_gateway.user_settings.clone();
|
||||
self.original_gateway.characters = self.working_gateway.characters.clone();
|
||||
self.original_gateway.character_meseta = self.working_gateway.character_meseta.clone();
|
||||
self.original_gateway.bank_meseta = self.working_gateway.bank_meseta.clone();
|
||||
self.original_gateway.items = self.working_gateway.items.clone();
|
||||
self.original_gateway.inventories = self.working_gateway.inventories.clone();
|
||||
self.original_gateway.banks = self.working_gateway.banks.clone();
|
||||
self.original_gateway.equips = self.working_gateway.equips.clone();
|
||||
self.original_gateway.mag_modifiers = self.working_gateway.mag_modifiers.clone();
|
||||
self.original_gateway.weapon_modifiers = self.working_gateway.weapon_modifiers.clone();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct InMemoryGateway {
|
||||
users: Arc<Mutex<BTreeMap<UserAccountId, UserAccountEntity>>>,
|
||||
@ -99,6 +129,87 @@ impl InMemoryGateway {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl EntityGateway for InMemoryGateway {
|
||||
async fn transaction(&'static mut self) -> Result<Box<dyn EntityGatewayTransaction + 'static>, GatewayError>
|
||||
{
|
||||
let working_gateway = {
|
||||
let users = self.users.lock().unwrap().clone();
|
||||
let user_settings = self.user_settings.lock().unwrap().clone();
|
||||
let characters = self.characters.lock().unwrap().clone();
|
||||
let character_meseta = self.character_meseta.lock().unwrap().clone();
|
||||
let bank_meseta = self.bank_meseta.lock().unwrap().clone();
|
||||
let items = self.items.lock().unwrap().clone();
|
||||
let inventories = self.inventories.lock().unwrap().clone();
|
||||
let banks = self.banks.lock().unwrap().clone();
|
||||
let equips = self.equips.lock().unwrap().clone();
|
||||
let mag_modifiers = self.mag_modifiers.lock().unwrap().clone();
|
||||
let weapon_modifiers = self.weapon_modifiers.lock().unwrap().clone();
|
||||
|
||||
InMemoryGateway {
|
||||
users: Arc::new(Mutex::new(users)),
|
||||
user_settings: Arc::new(Mutex::new(user_settings)),
|
||||
characters: Arc::new(Mutex::new(characters)),
|
||||
character_meseta: Arc::new(Mutex::new(character_meseta)),
|
||||
bank_meseta: Arc::new(Mutex::new(bank_meseta)),
|
||||
items: Arc::new(Mutex::new(items)),
|
||||
inventories: Arc::new(Mutex::new(inventories)),
|
||||
banks: Arc::new(Mutex::new(banks)),
|
||||
equips: Arc::new(Mutex::new(equips)),
|
||||
mag_modifiers: Arc::new(Mutex::new(mag_modifiers)),
|
||||
weapon_modifiers: Arc::new(Mutex::new(weapon_modifiers)),
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Box::new(InMemoryGatewayTransaction {
|
||||
working_gateway,
|
||||
original_gateway: self,
|
||||
}))
|
||||
}
|
||||
|
||||
|
||||
async fn with_transaction<F, Fut, R, E>(&'static mut self, func: F) -> Result<R, E>
|
||||
where
|
||||
Fut: Future<Output = Result<(Box<dyn EntityGatewayTransaction>, R), E>> + Send,
|
||||
F: FnOnce(Box<dyn EntityGatewayTransaction>) -> Fut + Send,
|
||||
R: Send,
|
||||
E: From<GatewayError>,
|
||||
{
|
||||
let users = self.users.lock().unwrap().clone();
|
||||
let user_settings = self.user_settings.lock().unwrap().clone();
|
||||
let characters = self.characters.lock().unwrap().clone();
|
||||
let character_meseta = self.character_meseta.lock().unwrap().clone();
|
||||
let bank_meseta = self.bank_meseta.lock().unwrap().clone();
|
||||
let items = self.items.lock().unwrap().clone();
|
||||
let inventories = self.inventories.lock().unwrap().clone();
|
||||
let banks = self.banks.lock().unwrap().clone();
|
||||
let equips = self.equips.lock().unwrap().clone();
|
||||
let mag_modifiers = self.mag_modifiers.lock().unwrap().clone();
|
||||
let weapon_modifiers = self.weapon_modifiers.lock().unwrap().clone();
|
||||
|
||||
let working_gateway = InMemoryGateway {
|
||||
users: Arc::new(Mutex::new(users)),
|
||||
user_settings: Arc::new(Mutex::new(user_settings)),
|
||||
characters: Arc::new(Mutex::new(characters)),
|
||||
character_meseta: Arc::new(Mutex::new(character_meseta)),
|
||||
bank_meseta: Arc::new(Mutex::new(bank_meseta)),
|
||||
items: Arc::new(Mutex::new(items)),
|
||||
inventories: Arc::new(Mutex::new(inventories)),
|
||||
banks: Arc::new(Mutex::new(banks)),
|
||||
equips: Arc::new(Mutex::new(equips)),
|
||||
mag_modifiers: Arc::new(Mutex::new(mag_modifiers)),
|
||||
weapon_modifiers: Arc::new(Mutex::new(weapon_modifiers)),
|
||||
};
|
||||
|
||||
let transaction = Box::new(InMemoryGatewayTransaction {
|
||||
working_gateway,
|
||||
original_gateway: self,
|
||||
});
|
||||
|
||||
let (mut transaction, result) = func(transaction).await?;
|
||||
|
||||
transaction.commit().await?;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
|
||||
let mut users = self.users.lock().unwrap();
|
||||
let id = users
|
||||
|
@ -2,6 +2,6 @@ pub mod entitygateway;
|
||||
pub mod inmemory;
|
||||
pub mod postgres;
|
||||
|
||||
pub use entitygateway::{EntityGateway, GatewayError};
|
||||
pub use entitygateway::{EntityGateway, EntityGatewayTransaction, GatewayError};
|
||||
pub use inmemory::InMemoryGateway;
|
||||
pub use self::postgres::PostgresGateway;
|
||||
|
@ -1,14 +1,17 @@
|
||||
use async_std::sync::{Arc, Mutex};
|
||||
use std::convert::{From, TryFrom, Into};
|
||||
use futures::TryStreamExt;
|
||||
use futures::{Future, TryStreamExt};
|
||||
use async_std::stream::StreamExt;
|
||||
use libpso::character::guildcard;
|
||||
use crate::entity::account::*;
|
||||
use crate::entity::character::*;
|
||||
use crate::entity::gateway::{EntityGateway, GatewayError};
|
||||
use crate::entity::gateway::{EntityGateway, EntityGatewayTransaction, GatewayError};
|
||||
use crate::entity::item::*;
|
||||
use super::models::*;
|
||||
|
||||
use sqlx::postgres::PgPoolOptions;
|
||||
use sqlx::Connection;
|
||||
|
||||
|
||||
mod embedded {
|
||||
use refinery::embed_migrations;
|
||||
@ -16,6 +19,24 @@ mod embedded {
|
||||
}
|
||||
|
||||
|
||||
pub struct PostgresTransaction<'t> {
|
||||
pgtransaction: sqlx::Transaction<'t, sqlx::Postgres>,
|
||||
}
|
||||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<'t> EntityGatewayTransaction for PostgresTransaction<'t> {
|
||||
fn gateway<'b>(&'b mut self) -> &'b mut dyn EntityGateway {
|
||||
self
|
||||
}
|
||||
|
||||
async fn commit(self: Box<Self>) -> Result<(), GatewayError> {
|
||||
self.pgtransaction.commit().await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PostgresGateway {
|
||||
pool: sqlx::Pool<sqlx::Postgres>,
|
||||
@ -539,6 +560,28 @@ async fn get_bank_meseta(conn: &mut sqlx::PgConnection, char_id: &CharacterEntit
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl EntityGateway for PostgresGateway {
|
||||
async fn transaction(&'static mut self) -> Result<Box<dyn EntityGatewayTransaction + 'static>, GatewayError>
|
||||
{
|
||||
Ok(Box::new(PostgresTransaction {
|
||||
pgtransaction: self.pool.begin().await?,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn with_transaction<F, Fut, R, E>(&'static mut self, func: F) -> Result<R, E>
|
||||
where
|
||||
Fut: Future<Output = Result<(Box<dyn EntityGatewayTransaction>, R), E>> + Send,
|
||||
F: FnOnce(Box<dyn EntityGatewayTransaction>) -> Fut + Send,
|
||||
R: Send,
|
||||
E: From<GatewayError>,
|
||||
{
|
||||
let mut transaction = Box::new(PostgresTransaction {
|
||||
pgtransaction: self.pool.begin().await.map_err(|_| ()).unwrap()
|
||||
});
|
||||
let (mut transaction, result) = func(transaction).await.map_err(|_| ()).unwrap();
|
||||
transaction.commit().await.map_err(|_| ()).unwrap();
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
|
||||
create_user(&mut *self.pool.acquire().await?, user).await
|
||||
}
|
||||
@ -653,3 +696,118 @@ impl EntityGateway for PostgresGateway {
|
||||
}
|
||||
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl<'c> EntityGateway for PostgresTransaction<'c> {
|
||||
async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
|
||||
create_user(&mut *self.pgtransaction, user).await
|
||||
}
|
||||
|
||||
async fn get_user_by_id(&mut self, id: UserAccountId) -> Result<UserAccountEntity, GatewayError> {
|
||||
get_user_by_id(&mut *self.pgtransaction, id).await
|
||||
}
|
||||
|
||||
async fn get_user_by_name(&mut self, username: String) -> Result<UserAccountEntity, GatewayError> {
|
||||
get_user_by_name(&mut *self.pgtransaction, username).await
|
||||
}
|
||||
|
||||
async fn save_user(&mut self, user: &UserAccountEntity) -> Result<(), GatewayError> {
|
||||
save_user(&mut *self.pgtransaction, user).await
|
||||
}
|
||||
|
||||
async fn create_user_settings(&mut self, settings: NewUserSettingsEntity) -> Result<UserSettingsEntity, GatewayError> {
|
||||
create_user_settings(&mut *self.pgtransaction, settings).await
|
||||
}
|
||||
|
||||
async fn get_user_settings_by_user(&mut self, user: &UserAccountEntity) -> Result<UserSettingsEntity, GatewayError> {
|
||||
get_user_settings_by_user(&mut *self.pgtransaction, user).await
|
||||
}
|
||||
|
||||
async fn save_user_settings(&mut self, settings: &UserSettingsEntity) -> Result<(), GatewayError> {
|
||||
save_user_settings(&mut *self.pgtransaction, settings).await
|
||||
}
|
||||
|
||||
async fn create_character(&mut self, char: NewCharacterEntity) -> Result<CharacterEntity, GatewayError> {
|
||||
create_character(&mut *self.pgtransaction, char).await
|
||||
}
|
||||
|
||||
async fn get_characters_by_user(&mut self, user: &UserAccountEntity) -> Result<[Option<CharacterEntity>; 4], GatewayError> {
|
||||
get_characters_by_user(&mut *self.pgtransaction, user).await
|
||||
}
|
||||
|
||||
async fn save_character(&mut self, char: &CharacterEntity) -> Result<(), GatewayError> {
|
||||
save_character(&mut *self.pgtransaction, char).await
|
||||
}
|
||||
|
||||
async fn get_guild_card_data_by_user(&mut self, user: &UserAccountEntity) -> Result<GuildCardDataEntity, GatewayError> {
|
||||
Ok(GuildCardDataEntity {
|
||||
id: GuildCardDataId(0),
|
||||
user_id: user.id,
|
||||
guildcard: guildcard::GuildCardData::default(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn create_item(&mut self, item: NewItemEntity) -> Result<ItemEntity, GatewayError> {
|
||||
create_item(&mut *self.pgtransaction, item).await
|
||||
}
|
||||
|
||||
async fn add_item_note(&mut self, item_id: &ItemEntityId, item_note: ItemNote) -> Result<(), GatewayError> {
|
||||
add_item_note(&mut *self.pgtransaction, item_id, item_note).await
|
||||
}
|
||||
|
||||
async fn feed_mag(&mut self, mag_item_id: &ItemEntityId, tool_item_id: &ItemEntityId) -> Result<(), GatewayError> {
|
||||
feed_mag(&mut *self.pgtransaction, mag_item_id, tool_item_id).await
|
||||
}
|
||||
|
||||
async fn change_mag_owner(&mut self, mag_item_id: &ItemEntityId, character: &CharacterEntity) -> Result<(), GatewayError> {
|
||||
change_mag_owner(&mut *self.pgtransaction, mag_item_id, character).await
|
||||
}
|
||||
|
||||
async fn use_mag_cell(&mut self, mag_item_id: &ItemEntityId, mag_cell_id: &ItemEntityId) -> Result<(), GatewayError> {
|
||||
use_mag_cell(&mut *self.pgtransaction, mag_item_id, mag_cell_id).await
|
||||
}
|
||||
|
||||
async fn add_weapon_modifier(&mut self, item_id: &ItemEntityId, modifier: weapon::WeaponModifier) -> Result<(), GatewayError> {
|
||||
add_weapon_modifier(&mut *self.pgtransaction, item_id, modifier).await
|
||||
}
|
||||
|
||||
async fn get_character_inventory(&mut self, char_id: &CharacterEntityId) -> Result<InventoryEntity, GatewayError> {
|
||||
get_character_inventory(&mut *self.pgtransaction, char_id).await
|
||||
}
|
||||
|
||||
async fn get_character_bank(&mut self, char_id: &CharacterEntityId, bank_name: BankName) -> Result<BankEntity, GatewayError> {
|
||||
get_character_bank(&mut *self.pgtransaction, char_id, bank_name).await
|
||||
}
|
||||
|
||||
async fn set_character_inventory(&mut self, char_id: &CharacterEntityId, inventory: &InventoryEntity) -> Result<(), GatewayError> {
|
||||
set_character_inventory(&mut *self.pgtransaction, char_id, inventory).await
|
||||
}
|
||||
|
||||
async fn set_character_bank(&mut self, char_id: &CharacterEntityId, bank: &BankEntity, bank_name: BankName) -> Result<(), GatewayError> {
|
||||
set_character_bank(&mut *self.pgtransaction, char_id, bank, bank_name).await
|
||||
}
|
||||
|
||||
async fn get_character_equips(&mut self, char_id: &CharacterEntityId) -> Result<EquippedEntity, GatewayError> {
|
||||
get_character_equips(&mut *self.pgtransaction, char_id).await
|
||||
}
|
||||
|
||||
async fn set_character_equips(&mut self, char_id: &CharacterEntityId, equips: &EquippedEntity) -> Result<(), GatewayError> {
|
||||
set_character_equips(&mut *self.pgtransaction, char_id, equips).await
|
||||
}
|
||||
|
||||
async fn set_character_meseta(&mut self, char_id: &CharacterEntityId, meseta: Meseta) -> Result<(), GatewayError> {
|
||||
set_character_meseta(&mut *self.pgtransaction, char_id, meseta).await
|
||||
}
|
||||
|
||||
async fn get_character_meseta(&mut self, char_id: &CharacterEntityId) -> Result<Meseta, GatewayError> {
|
||||
get_character_meseta(&mut *self.pgtransaction, char_id).await
|
||||
}
|
||||
|
||||
async fn set_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName, meseta: Meseta) -> Result<(), GatewayError> {
|
||||
set_bank_meseta(&mut *self.pgtransaction, char_id, bank, meseta).await
|
||||
}
|
||||
|
||||
async fn get_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName) -> Result<Meseta, GatewayError> {
|
||||
get_bank_meseta(&mut *self.pgtransaction, char_id, bank).await
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user