Browse Source

add transactions!

kill_counters
jake 3 years ago
committed by andy
parent
commit
1cdf091a2c
  1. 32
      src/entity/gateway/entitygateway.rs
  2. 113
      src/entity/gateway/inmemory.rs
  3. 2
      src/entity/gateway/mod.rs
  4. 162
      src/entity/gateway/postgres/postgres.rs

32
src/entity/gateway/entitygateway.rs

@ -1,4 +1,6 @@
use std::convert::From;
use thiserror::Error; use thiserror::Error;
use futures::Future;
use crate::entity::account::*; use crate::entity::account::*;
use crate::entity::character::*; use crate::entity::character::*;
@ -15,7 +17,35 @@ pub enum GatewayError {
} }
#[async_trait::async_trait] #[async_trait::async_trait]
pub trait EntityGateway: Send + Sync + Clone {
pub trait EntityGatewayTransaction: Send + Sync {
fn gateway<'a>(&'a mut self) -> &'a mut dyn EntityGateway {
unimplemented!()
}
async fn commit(self: Box<Self>) -> Result<(), GatewayError> {
unimplemented!()
}
}
#[async_trait::async_trait]
pub trait EntityGateway: Send + Sync {
async fn transaction(&'static mut self) -> Result<Box<dyn EntityGatewayTransaction + 'static>, GatewayError>
{
unimplemented!();
}
async fn with_transaction<F, Fut, R, E>(&'static mut self, _func: F) -> Result<R, E>
where
Fut: Future<Output = Result<(Box<dyn EntityGatewayTransaction>, R), E>> + Send,
F: FnOnce(Box<dyn EntityGatewayTransaction>) -> Fut + Send,
R: Send,
E: From<GatewayError>,
Self: Sized
{
unimplemented!();
}
async fn create_user(&mut self, _user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> { async fn create_user(&mut self, _user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
unimplemented!() unimplemented!()
} }

113
src/entity/gateway/inmemory.rs

@ -1,13 +1,43 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::convert::TryInto; use std::convert::TryInto;
use futures::Future;
use crate::entity::account::*; use crate::entity::account::*;
use crate::entity::character::*; use crate::entity::character::*;
use crate::entity::gateway::{EntityGateway, GatewayError};
use crate::entity::gateway::{EntityGateway, EntityGatewayTransaction, GatewayError};
use crate::entity::item::*; use crate::entity::item::*;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
pub struct InMemoryGatewayTransaction {
working_gateway: InMemoryGateway,
original_gateway: &'static mut InMemoryGateway,
}
#[async_trait::async_trait]
impl EntityGatewayTransaction for InMemoryGatewayTransaction {
fn gateway<'b>(&'b mut self) -> &'b mut dyn EntityGateway {
&mut self.working_gateway
}
async fn commit(self: Box<Self>) -> Result<(), GatewayError> {
self.original_gateway.users = self.working_gateway.users.clone();
self.original_gateway.user_settings = self.working_gateway.user_settings.clone();
self.original_gateway.characters = self.working_gateway.characters.clone();
self.original_gateway.character_meseta = self.working_gateway.character_meseta.clone();
self.original_gateway.bank_meseta = self.working_gateway.bank_meseta.clone();
self.original_gateway.items = self.working_gateway.items.clone();
self.original_gateway.inventories = self.working_gateway.inventories.clone();
self.original_gateway.banks = self.working_gateway.banks.clone();
self.original_gateway.equips = self.working_gateway.equips.clone();
self.original_gateway.mag_modifiers = self.working_gateway.mag_modifiers.clone();
self.original_gateway.weapon_modifiers = self.working_gateway.weapon_modifiers.clone();
Ok(())
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct InMemoryGateway { pub struct InMemoryGateway {
users: Arc<Mutex<BTreeMap<UserAccountId, UserAccountEntity>>>, users: Arc<Mutex<BTreeMap<UserAccountId, UserAccountEntity>>>,
@ -115,6 +145,87 @@ impl InMemoryGateway {
#[async_trait::async_trait] #[async_trait::async_trait]
impl EntityGateway for InMemoryGateway { impl EntityGateway for InMemoryGateway {
async fn transaction(&'static mut self) -> Result<Box<dyn EntityGatewayTransaction + 'static>, GatewayError>
{
let working_gateway = {
let users = self.users.lock().unwrap().clone();
let user_settings = self.user_settings.lock().unwrap().clone();
let characters = self.characters.lock().unwrap().clone();
let character_meseta = self.character_meseta.lock().unwrap().clone();
let bank_meseta = self.bank_meseta.lock().unwrap().clone();
let items = self.items.lock().unwrap().clone();
let inventories = self.inventories.lock().unwrap().clone();
let banks = self.banks.lock().unwrap().clone();
let equips = self.equips.lock().unwrap().clone();
let mag_modifiers = self.mag_modifiers.lock().unwrap().clone();
let weapon_modifiers = self.weapon_modifiers.lock().unwrap().clone();
InMemoryGateway {
users: Arc::new(Mutex::new(users)),
user_settings: Arc::new(Mutex::new(user_settings)),
characters: Arc::new(Mutex::new(characters)),
character_meseta: Arc::new(Mutex::new(character_meseta)),
bank_meseta: Arc::new(Mutex::new(bank_meseta)),
items: Arc::new(Mutex::new(items)),
inventories: Arc::new(Mutex::new(inventories)),
banks: Arc::new(Mutex::new(banks)),
equips: Arc::new(Mutex::new(equips)),
mag_modifiers: Arc::new(Mutex::new(mag_modifiers)),
weapon_modifiers: Arc::new(Mutex::new(weapon_modifiers)),
}
};
Ok(Box::new(InMemoryGatewayTransaction {
working_gateway,
original_gateway: self,
}))
}
async fn with_transaction<F, Fut, R, E>(&'static mut self, func: F) -> Result<R, E>
where
Fut: Future<Output = Result<(Box<dyn EntityGatewayTransaction>, R), E>> + Send,
F: FnOnce(Box<dyn EntityGatewayTransaction>) -> Fut + Send,
R: Send,
E: From<GatewayError>,
{
let users = self.users.lock().unwrap().clone();
let user_settings = self.user_settings.lock().unwrap().clone();
let characters = self.characters.lock().unwrap().clone();
let character_meseta = self.character_meseta.lock().unwrap().clone();
let bank_meseta = self.bank_meseta.lock().unwrap().clone();
let items = self.items.lock().unwrap().clone();
let inventories = self.inventories.lock().unwrap().clone();
let banks = self.banks.lock().unwrap().clone();
let equips = self.equips.lock().unwrap().clone();
let mag_modifiers = self.mag_modifiers.lock().unwrap().clone();
let weapon_modifiers = self.weapon_modifiers.lock().unwrap().clone();
let working_gateway = InMemoryGateway {
users: Arc::new(Mutex::new(users)),
user_settings: Arc::new(Mutex::new(user_settings)),
characters: Arc::new(Mutex::new(characters)),
character_meseta: Arc::new(Mutex::new(character_meseta)),
bank_meseta: Arc::new(Mutex::new(bank_meseta)),
items: Arc::new(Mutex::new(items)),
inventories: Arc::new(Mutex::new(inventories)),
banks: Arc::new(Mutex::new(banks)),
equips: Arc::new(Mutex::new(equips)),
mag_modifiers: Arc::new(Mutex::new(mag_modifiers)),
weapon_modifiers: Arc::new(Mutex::new(weapon_modifiers)),
};
let transaction = Box::new(InMemoryGatewayTransaction {
working_gateway,
original_gateway: self,
});
let (mut transaction, result) = func(transaction).await?;
transaction.commit().await?;
Ok(result)
}
async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> { async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
let mut users = self.users.lock().unwrap(); let mut users = self.users.lock().unwrap();
let id = users let id = users

2
src/entity/gateway/mod.rs

@ -2,6 +2,6 @@ pub mod entitygateway;
pub mod inmemory; pub mod inmemory;
pub mod postgres; pub mod postgres;
pub use entitygateway::{EntityGateway, GatewayError};
pub use entitygateway::{EntityGateway, EntityGatewayTransaction, GatewayError};
pub use inmemory::InMemoryGateway; pub use inmemory::InMemoryGateway;
pub use self::postgres::PostgresGateway; pub use self::postgres::PostgresGateway;

162
src/entity/gateway/postgres/postgres.rs

@ -1,14 +1,17 @@
use async_std::sync::{Arc, Mutex};
use std::convert::{From, TryFrom, Into}; use std::convert::{From, TryFrom, Into};
use futures::TryStreamExt;
use futures::{Future, TryStreamExt};
use async_std::stream::StreamExt; use async_std::stream::StreamExt;
use libpso::character::guildcard; use libpso::character::guildcard;
use crate::entity::account::*; use crate::entity::account::*;
use crate::entity::character::*; use crate::entity::character::*;
use crate::entity::gateway::{EntityGateway, GatewayError};
use crate::entity::gateway::{EntityGateway, EntityGatewayTransaction, GatewayError};
use crate::entity::item::*; use crate::entity::item::*;
use super::models::*; use super::models::*;
use sqlx::postgres::PgPoolOptions; use sqlx::postgres::PgPoolOptions;
use sqlx::Connection;
mod embedded { mod embedded {
use refinery::embed_migrations; use refinery::embed_migrations;
@ -16,6 +19,24 @@ mod embedded {
} }
pub struct PostgresTransaction<'t> {
pgtransaction: sqlx::Transaction<'t, sqlx::Postgres>,
}
#[async_trait::async_trait]
impl<'t> EntityGatewayTransaction for PostgresTransaction<'t> {
fn gateway<'b>(&'b mut self) -> &'b mut dyn EntityGateway {
self
}
async fn commit(self: Box<Self>) -> Result<(), GatewayError> {
self.pgtransaction.commit().await?;
Ok(())
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct PostgresGateway { pub struct PostgresGateway {
pool: sqlx::Pool<sqlx::Postgres>, pool: sqlx::Pool<sqlx::Postgres>,
@ -539,6 +560,28 @@ async fn get_bank_meseta(conn: &mut sqlx::PgConnection, char_id: &CharacterEntit
#[async_trait::async_trait] #[async_trait::async_trait]
impl EntityGateway for PostgresGateway { impl EntityGateway for PostgresGateway {
async fn transaction(&'static mut self) -> Result<Box<dyn EntityGatewayTransaction + 'static>, GatewayError>
{
Ok(Box::new(PostgresTransaction {
pgtransaction: self.pool.begin().await?,
}))
}
async fn with_transaction<F, Fut, R, E>(&'static mut self, func: F) -> Result<R, E>
where
Fut: Future<Output = Result<(Box<dyn EntityGatewayTransaction>, R), E>> + Send,
F: FnOnce(Box<dyn EntityGatewayTransaction>) -> Fut + Send,
R: Send,
E: From<GatewayError>,
{
let mut transaction = Box::new(PostgresTransaction {
pgtransaction: self.pool.begin().await.map_err(|_| ()).unwrap()
});
let (mut transaction, result) = func(transaction).await.map_err(|_| ()).unwrap();
transaction.commit().await.map_err(|_| ()).unwrap();
Ok(result)
}
async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> { async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
create_user(&mut *self.pool.acquire().await?, user).await create_user(&mut *self.pool.acquire().await?, user).await
} }
@ -653,3 +696,118 @@ impl EntityGateway for PostgresGateway {
} }
#[async_trait::async_trait]
impl<'c> EntityGateway for PostgresTransaction<'c> {
async fn create_user(&mut self, user: NewUserAccountEntity) -> Result<UserAccountEntity, GatewayError> {
create_user(&mut *self.pgtransaction, user).await
}
async fn get_user_by_id(&mut self, id: UserAccountId) -> Result<UserAccountEntity, GatewayError> {
get_user_by_id(&mut *self.pgtransaction, id).await
}
async fn get_user_by_name(&mut self, username: String) -> Result<UserAccountEntity, GatewayError> {
get_user_by_name(&mut *self.pgtransaction, username).await
}
async fn save_user(&mut self, user: &UserAccountEntity) -> Result<(), GatewayError> {
save_user(&mut *self.pgtransaction, user).await
}
async fn create_user_settings(&mut self, settings: NewUserSettingsEntity) -> Result<UserSettingsEntity, GatewayError> {
create_user_settings(&mut *self.pgtransaction, settings).await
}
async fn get_user_settings_by_user(&mut self, user: &UserAccountEntity) -> Result<UserSettingsEntity, GatewayError> {
get_user_settings_by_user(&mut *self.pgtransaction, user).await
}
async fn save_user_settings(&mut self, settings: &UserSettingsEntity) -> Result<(), GatewayError> {
save_user_settings(&mut *self.pgtransaction, settings).await
}
async fn create_character(&mut self, char: NewCharacterEntity) -> Result<CharacterEntity, GatewayError> {
create_character(&mut *self.pgtransaction, char).await
}
async fn get_characters_by_user(&mut self, user: &UserAccountEntity) -> Result<[Option<CharacterEntity>; 4], GatewayError> {
get_characters_by_user(&mut *self.pgtransaction, user).await
}
async fn save_character(&mut self, char: &CharacterEntity) -> Result<(), GatewayError> {
save_character(&mut *self.pgtransaction, char).await
}
async fn get_guild_card_data_by_user(&mut self, user: &UserAccountEntity) -> Result<GuildCardDataEntity, GatewayError> {
Ok(GuildCardDataEntity {
id: GuildCardDataId(0),
user_id: user.id,
guildcard: guildcard::GuildCardData::default(),
})
}
async fn create_item(&mut self, item: NewItemEntity) -> Result<ItemEntity, GatewayError> {
create_item(&mut *self.pgtransaction, item).await
}
async fn add_item_note(&mut self, item_id: &ItemEntityId, item_note: ItemNote) -> Result<(), GatewayError> {
add_item_note(&mut *self.pgtransaction, item_id, item_note).await
}
async fn feed_mag(&mut self, mag_item_id: &ItemEntityId, tool_item_id: &ItemEntityId) -> Result<(), GatewayError> {
feed_mag(&mut *self.pgtransaction, mag_item_id, tool_item_id).await
}
async fn change_mag_owner(&mut self, mag_item_id: &ItemEntityId, character: &CharacterEntity) -> Result<(), GatewayError> {
change_mag_owner(&mut *self.pgtransaction, mag_item_id, character).await
}
async fn use_mag_cell(&mut self, mag_item_id: &ItemEntityId, mag_cell_id: &ItemEntityId) -> Result<(), GatewayError> {
use_mag_cell(&mut *self.pgtransaction, mag_item_id, mag_cell_id).await
}
async fn add_weapon_modifier(&mut self, item_id: &ItemEntityId, modifier: weapon::WeaponModifier) -> Result<(), GatewayError> {
add_weapon_modifier(&mut *self.pgtransaction, item_id, modifier).await
}
async fn get_character_inventory(&mut self, char_id: &CharacterEntityId) -> Result<InventoryEntity, GatewayError> {
get_character_inventory(&mut *self.pgtransaction, char_id).await
}
async fn get_character_bank(&mut self, char_id: &CharacterEntityId, bank_name: BankName) -> Result<BankEntity, GatewayError> {
get_character_bank(&mut *self.pgtransaction, char_id, bank_name).await
}
async fn set_character_inventory(&mut self, char_id: &CharacterEntityId, inventory: &InventoryEntity) -> Result<(), GatewayError> {
set_character_inventory(&mut *self.pgtransaction, char_id, inventory).await
}
async fn set_character_bank(&mut self, char_id: &CharacterEntityId, bank: &BankEntity, bank_name: BankName) -> Result<(), GatewayError> {
set_character_bank(&mut *self.pgtransaction, char_id, bank, bank_name).await
}
async fn get_character_equips(&mut self, char_id: &CharacterEntityId) -> Result<EquippedEntity, GatewayError> {
get_character_equips(&mut *self.pgtransaction, char_id).await
}
async fn set_character_equips(&mut self, char_id: &CharacterEntityId, equips: &EquippedEntity) -> Result<(), GatewayError> {
set_character_equips(&mut *self.pgtransaction, char_id, equips).await
}
async fn set_character_meseta(&mut self, char_id: &CharacterEntityId, meseta: Meseta) -> Result<(), GatewayError> {
set_character_meseta(&mut *self.pgtransaction, char_id, meseta).await
}
async fn get_character_meseta(&mut self, char_id: &CharacterEntityId) -> Result<Meseta, GatewayError> {
get_character_meseta(&mut *self.pgtransaction, char_id).await
}
async fn set_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName, meseta: Meseta) -> Result<(), GatewayError> {
set_bank_meseta(&mut *self.pgtransaction, char_id, bank, meseta).await
}
async fn get_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName) -> Result<Meseta, GatewayError> {
get_bank_meseta(&mut *self.pgtransaction, char_id, bank).await
}
}
Loading…
Cancel
Save