use std::convert::{From, TryFrom, Into}; use futures::TryStreamExt; use async_std::stream::StreamExt; use libpso::character::guildcard; use crate::entity::account::*; use crate::entity::character::*; use crate::entity::gateway::{EntityGateway, GatewayError}; use crate::entity::item::*; use super::models::*; use sqlx::postgres::PgPoolOptions; mod embedded { use refinery::embed_migrations; embed_migrations!("src/entity/gateway/postgres/migrations"); } #[derive(Clone)] pub struct PostgresGateway { pool: sqlx::Pool, } impl PostgresGateway { pub fn new(host: &str, dbname: &str, username: &str, password: &str) -> PostgresGateway { let mut conn = refinery::config::Config::new(refinery::config::ConfigDbType::Postgres) .set_db_host(host) .set_db_user(username) .set_db_pass(password) .set_db_name(dbname); embedded::migrations::runner().run(&mut conn).unwrap(); let pool = async_std::task::block_on(async move { let pool = PgPoolOptions::new() .max_connections(5) .connect(&format!("postgresql://{}:{}@{}:5432/{}", username, password, host, dbname)).await.unwrap(); pool }); PostgresGateway { pool, } } async fn apply_item_modifications(&self, item: ItemEntity) -> ItemEntity { let ItemEntity {id, item} = item; let item = match item { ItemDetail::Weapon(mut weapon) => { let q = r#"select weapon, modifier from weapon_modifier where weapon = $1 order by created_at"#; let weapon_modifiers = sqlx::query_as::<_, PgWeaponModifier>(q) .bind(id.0 as i32) .fetch(&self.pool); weapon_modifiers.for_each(|modifier| { if let Ok(modifier) = modifier { weapon.apply_modifier(&modifier.modifier); } }).await; ItemDetail::Weapon(weapon) }, ItemDetail::Mag(mut mag) => { let q = r#"select mag, modifier, item.item -> 'Tool' as feed, item2.item -> 'Tool' as cell from mag_modifier left join item on item.id = cast (modifier ->> 'FeedMag' as integer) left join item as item2 on item2.id = cast (modifier ->> 'MagCell' as integer) where mag = $1 order by created_at"#; let mag_modifiers = sqlx::query_as::<_, PgMagModifierWithParameters>(q) .bind(id.0 as i32) .fetch(&self.pool); mag_modifiers.for_each(|modifier| { let PgMagModifierWithParameters {modifier, feed, cell, ..} = modifier.unwrap(); let modifier: mag::MagModifier = modifier.0.into(); match modifier { mag::MagModifier::FeedMag{..} => { mag.feed(feed.unwrap().tool) }, mag::MagModifier::BankMag => { mag.bank() }, mag::MagModifier::MagCell(_) => { mag.apply_mag_cell(mag::MagCell::try_from(Into::::into(cell.unwrap().0).tool).unwrap()) }, mag::MagModifier::OwnerChange(class, section_id) => { mag.change_owner(class, section_id) }, } }).await; ItemDetail::Mag(mag) }, item => item }; ItemEntity { id, item, } } } #[async_trait::async_trait] impl EntityGateway for PostgresGateway { async fn create_user(&mut self, user: NewUserAccountEntity) -> Result { let new_user = sqlx::query_as::<_, PgUserAccount>("insert into user_accounts (email, username, password, activated) values ($1, $2, $3, $4) returning *;") .bind(user.email) .bind(user.username) .bind(user.password) .bind(user.activated) .fetch_one(&self.pool).await?; Ok(new_user.into()) } async fn get_user_by_id(&self, id: UserAccountId) -> Result { let user = sqlx::query_as::<_, PgUserAccount>("select * from user_accounts where id = $1") .bind(id.0) .fetch_one(&self.pool).await?; Ok(user.into()) } async fn get_user_by_name(&self, username: String) -> Result { let user = sqlx::query_as::<_, PgUserAccount>("select * from user_accounts where username = $1") .bind(username) .fetch_one(&self.pool).await?; Ok(user.into()) } async fn save_user(&mut self, user: &UserAccountEntity) -> Result<(), GatewayError> { sqlx::query("UPDATE user_accounts set username=$1, password=$2, banned=$3, muted=$4, flags=$5 where id=$6") .bind(&user.username) .bind(&user.password) .bind(&user.banned_until) .bind(&user.muted_until) .bind(&user.flags) .bind(&user.id.0) .execute(&self.pool).await?; Ok(()) } async fn create_user_settings(&mut self, settings: NewUserSettingsEntity) -> Result { let new_settings = sqlx::query_as::<_, PgUserSettings>("insert into user_settings (user_account, blocked_users, key_config, joystick_config, option_flags, shortcuts, symbol_chats, team_name) values ($1, $2, $3, $4, $5, $6, $7, $8) returning *;") .bind(settings.user_id.0) .bind(settings.settings.blocked_users.iter().copied().flat_map(|i| i.to_le_bytes().to_vec()).collect::>()) .bind(settings.settings.key_config.to_vec()) .bind(settings.settings.joystick_config.to_vec()) .bind(settings.settings.option_flags as i32) .bind(settings.settings.shortcuts.to_vec()) .bind(settings.settings.symbol_chats.to_vec()) .bind(settings.settings.team_name.iter().copied().flat_map(|i| i.to_le_bytes().to_vec()).collect::>()) .fetch_one(&self.pool).await?; Ok(new_settings.into()) } async fn get_user_settings_by_user(&self, user: &UserAccountEntity) -> Result { let settings = sqlx::query_as::<_, PgUserSettings>("select * from user_settings where user_account = $1") .bind(user.id.0) .fetch_one(&self.pool).await?; Ok(settings.into()) } async fn save_user_settings(&mut self, settings: &UserSettingsEntity) -> Result<(), GatewayError> { sqlx::query("update user_settings set blocked_users=$1, key_config=$2, joystick_config=$3, option_flags=$4, shortcuts=$5, symbol_chats=$6, team_name=$7 where id=$8") .bind(settings.settings.blocked_users.iter().copied().flat_map(|i| i.to_le_bytes().to_vec()).collect::>()) .bind(&settings.settings.key_config.to_vec()) .bind(&settings.settings.joystick_config.to_vec()) .bind(&settings.settings.option_flags) .bind(&settings.settings.shortcuts.to_vec()) .bind(&settings.settings.symbol_chats.to_vec()) .bind(settings.settings.team_name.iter().copied().flat_map(|i| i.to_le_bytes().to_vec()).collect::>()) .bind(&settings.id.0) .fetch_one(&self.pool).await?; Ok(()) } async fn create_character(&mut self, char: NewCharacterEntity) -> Result { let q = r#"insert into player_character (user_account, slot, name, exp, class, section_id, costume, skin, face, head, hair, hair_r, hair_g, hair_b, prop_x, prop_y, techs, config, infoboard, guildcard, power, mind, def, evade, luck, hp, tp, tech_menu, option_flags) values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31) returning *;"#; let character = sqlx::query_as::<_, PgCharacter>(q) .bind(char.user_id.0) .bind(char.slot as i16) .bind(char.name) .bind(char.exp as i32) .bind(char.char_class.to_string()) .bind(char.section_id.to_string()) .bind(char.appearance.costume as i16) .bind(char.appearance.skin as i16) .bind(char.appearance.face as i16) .bind(char.appearance.head as i16) .bind(char.appearance.hair as i16) .bind(char.appearance.hair_r as i16) .bind(char.appearance.hair_g as i16) .bind(char.appearance.hair_b as i16) .bind(char.appearance.prop_x) .bind(char.appearance.prop_y) .bind(&char.techs.as_bytes().to_vec()) .bind(&char.config.as_bytes().to_vec()) .bind(String::from_utf16_lossy(&char.info_board.board).trim_matches(char::from(0))) .bind(char.guildcard.description) .bind(char.materials.power as i16) .bind(char.materials.mind as i16) .bind(char.materials.def as i16) .bind(char.materials.evade as i16) .bind(char.materials.luck as i16) .bind(char.materials.hp as i16) .bind(char.materials.tp as i16) .bind(char.tech_menu.tech_menu.to_vec()) .bind(char.option_flags as i32) .fetch_one(&self.pool).await?; Ok(character.into()) } async fn get_characters_by_user(&self, user: &UserAccountEntity) -> Result<[Option; 4], GatewayError> { let mut stream = sqlx::query_as::<_, PgCharacter>("select * from player_character where user_account = $1 and slot < 4 order by slot") .bind(user.id.0) .fetch(&self.pool); const NONE: Option = None; let mut result = [NONE; 4]; while let Some(character) = stream.try_next().await? { let index = character.slot as usize; result[index] = Some(character.into()) } Ok(result) } async fn save_character(&mut self, char: &CharacterEntity) -> Result<(), GatewayError> { let q = r#"update player_character set user_account=$1, slot=$2, name=$3, exp=$4, class=$5, section_id=$6, costume=$7, skin=$8, face=$9, head=$10, hair=$11, hair_r=$12, hair_g=$13, hair_b=$14, prop_x=$15, prop_y=$16, techs=$17, config=$18, infoboard=$19, guildcard=$20, power=$21, mind=$22, def=$23, evade=$24, luck=$25, hp=$26, tp=$27, tech_menu=$28, option_flags=$29 where id=$32;"#; sqlx::query(q) .bind(char.user_id.0) .bind(char.slot as i16) .bind(&char.name) .bind(char.exp as i32) .bind(char.char_class.to_string()) .bind(char.section_id.to_string()) .bind(char.appearance.costume as i16) .bind(char.appearance.skin as i16) .bind(char.appearance.face as i16) .bind(char.appearance.head as i16) .bind(char.appearance.hair as i16) .bind(char.appearance.hair_r as i16) .bind(char.appearance.hair_g as i16) .bind(char.appearance.hair_b as i16) .bind(char.appearance.prop_x) .bind(char.appearance.prop_y) .bind(&char.techs.as_bytes().to_vec()) .bind(&char.config.as_bytes().to_vec()) .bind(String::from_utf16_lossy(&char.info_board.board).trim_matches(char::from(0))) .bind(&char.guildcard.description) .bind(char.materials.power as i16) .bind(char.materials.mind as i16) .bind(char.materials.def as i16) .bind(char.materials.evade as i16) .bind(char.materials.luck as i16) .bind(char.materials.hp as i16) .bind(char.materials.tp as i16) .bind(char.tech_menu.tech_menu.to_vec()) .bind(char.option_flags as i32) .bind(char.id.0 as i32) .execute(&self.pool).await?; Ok(()) } async fn get_guild_card_data_by_user(&self, user: &UserAccountEntity) -> Result { Ok(GuildCardDataEntity { id: GuildCardDataId(0), user_id: user.id, guildcard: guildcard::GuildCardData::default(), }) } async fn create_item(&mut self, item: NewItemEntity) -> Result { let mut tx = self.pool.begin().await?; let new_item = sqlx::query_as::<_, PgItem>("insert into item (item) values ($1) returning *;") .bind(sqlx::types::Json(PgItemDetail::from(item.item))) .fetch_one(&mut tx).await?; tx.commit().await?; Ok(ItemEntity { id: ItemEntityId(new_item.id as u32), item: new_item.item.0.into(), }) } async fn add_item_note(&mut self, item_id: &ItemEntityId, item_note: ItemNote) -> Result<(), GatewayError> { sqlx::query("insert into item_note(item, note) values ($1, $2)") .bind(item_id.0) .bind(sqlx::types::Json(PgItemNoteDetail::from(item_note))) .execute(&self.pool).await?; Ok(()) /* let mut tx = self.pool.begin().await?; if let ItemLocation::Inventory{slot, ..} = &item_location { sqlx::query("delete from inventory_slot where item = $1") .bind(item_id.0 as i32) .execute(&mut tx).await?; sqlx::query("insert into inventory_slot (item, slot) values ($1, $2)") .bind(item_id.0) .bind(*slot as i32) .execute(&mut tx).await?; sqlx::query(r#"insert into item_location (item, location) select $1, $2 where (select jsonb_object_keys(location) from item_location where item=$1 order by created_at desc limit 1) != 'Inventory'"#) .bind(item_id.0) .bind(sqlx::types::Json(PgItemLocationDetail::from(item_location))) .execute(&mut tx).await?; } else { sqlx::query("insert into item_location (item, location) values ($1, $2)") .bind(item_id.0) .bind(sqlx::types::Json(PgItemLocationDetail::from(item_location))) .execute(&mut tx).await?; } tx.commit().await?; Ok(()) */ } async fn feed_mag(&mut self, mag_item_id: &ItemEntityId, tool_item_id: &ItemEntityId) -> Result<(), GatewayError> { sqlx::query("insert into mag_modifier (mag, modifier) values ($1, $2);") .bind(mag_item_id.0) .bind(sqlx::types::Json(PgMagModifierDetail::from(mag::MagModifier::FeedMag{food: *tool_item_id}))) .execute(&self.pool).await?; Ok(()) } async fn change_mag_owner(&mut self, mag_item_id: &ItemEntityId, character: &CharacterEntity) -> Result<(), GatewayError> { sqlx::query("insert into mag_modifier (mag, modifier) values ($1, $2);") .bind(mag_item_id.0) .bind(sqlx::types::Json(PgMagModifierDetail::from(mag::MagModifier::OwnerChange(character.char_class, character.section_id)))) .execute(&self.pool).await?; Ok(()) } async fn use_mag_cell(&mut self, mag_item_id: &ItemEntityId, mag_cell_id: &ItemEntityId) -> Result<(), GatewayError> { sqlx::query("insert into mag_modifier (mag, modifier) values ($1, $2);") .bind(mag_item_id.0) .bind(sqlx::types::Json(PgMagModifierDetail::from(mag::MagModifier::MagCell(*mag_cell_id)))) .execute(&self.pool).await?; Ok(()) } async fn add_weapon_modifier(&mut self, item_id: &ItemEntityId, modifier: weapon::WeaponModifier) -> Result<(), GatewayError> { sqlx::query("insert into weapon_modifier (weapon, modifier) values ($1, $2);") .bind(item_id.0) .bind(sqlx::types::Json(modifier)) .execute(&self.pool).await?; Ok(()) } /* async fn get_items_by_character(&self, char_id: &CharacterEntityId) -> Result, GatewayError> { let q = r#"select * from ( select distinct on (item_location.item) item.id, case when item_location.location -> 'Inventory' is not null then jsonb_set(item_location.location, '{Inventory,slot}', inventory_slot.slot::text::jsonb) else item_location.location end, item.item from item_location join item on item.id = item_location.item join inventory_slot on inventory_slot.item = item.id order by item_location.item, item_location.created_at desc ) as i where cast (location -> 'Inventory' ->> 'character_id' as integer) = $1 or cast (location -> 'Bank' ->> 'character_id' as integer) = $1 "#; let items = sqlx::query_as::<_, PgItemWithLocation>(q) .bind(char_id.0) .fetch(&self.pool); Ok(join_all(items .filter_map(|item: Result| { let item = item.ok()?; Some(ItemEntity { id: ItemEntityId(item.id as u32), item: item.item.0.into(), location: item.location.0.into() }) }) .map(|item: ItemEntity| { self.apply_item_modifications(item) }) .collect::>() .await ).await) } */ async fn get_character_inventory(&mut self, char_id: &CharacterEntityId) -> Result { let inventory = sqlx::query_as::<_, PgInventoryEntity>("select * from inventory where pchar = $1") .bind(char_id.0) .fetch_one(&self.pool).await?; // TODO: inefficient let mut real_inventory = Vec::new(); for inv_item in inventory.items.0.into_iter() { match inv_item { PgInventoryItemEntity::Individual(item) => { let entity = sqlx::query_as::<_, PgItemEntity>("select item.id, item.item from item where id = $1") .bind(item) .fetch_one(&self.pool).await .map(|item| item.into()) .map(|item| self.apply_item_modifications(item))? .await; real_inventory.push(InventoryItemEntity::Individual(entity)); }, PgInventoryItemEntity::Stacked(items) => { let mut stacked_item = Vec::new(); for s_item in items { stacked_item.push(sqlx::query_as::<_, PgItemEntity>("select item.id, item.item from item where id = $1") .bind(s_item) .fetch_one(&self.pool).await .map(|item| item.into()) .map(|item| self.apply_item_modifications(item))? .await) } real_inventory.push(InventoryItemEntity::Stacked(stacked_item)); } } } Ok(InventoryEntity::new(real_inventory)) } async fn get_character_bank(&mut self, char_id: &CharacterEntityId, bank_name: BankName) -> Result { let bank = sqlx::query_as::<_, PgInventoryEntity>("select * from bank where pchar = $1 and name = $2") .bind(char_id.0) .bind(bank_name.0) .fetch_one(&self.pool).await?; // TODO: inefficient let mut real_bank = Vec::new(); for bank_item in bank.items.0.into_iter() { match bank_item { PgInventoryItemEntity::Individual(item) => { let entity = sqlx::query_as::<_, PgItemEntity>("select item.id, item.item from item where id = $1") .bind(item) .fetch_one(&self.pool).await .map(|item| item.into()) .map(|item| self.apply_item_modifications(item))? .await; real_bank.push(BankItemEntity::Individual(entity)); }, PgInventoryItemEntity::Stacked(items) => { let mut stacked_item = Vec::new(); for s_item in items { stacked_item.push(sqlx::query_as::<_, PgItemEntity>("select item.id, item.item from item where id = $1") .bind(s_item) .fetch_one(&self.pool).await .map(|item| item.into()) .map(|item| self.apply_item_modifications(item))? .await) } real_bank.push(BankItemEntity::Stacked(stacked_item)); } } } Ok(BankEntity::new(real_bank)) } async fn set_character_inventory(&mut self, char_id: &CharacterEntityId, inventory: &InventoryEntity) -> Result<(), GatewayError> { let inventory = inventory.items.iter() .map(|item| { match item { InventoryItemEntity::Individual(item) => { PgInventoryItemEntity::Individual(item.id.0 as i32) }, InventoryItemEntity::Stacked(items) => { PgInventoryItemEntity::Stacked(items.iter().map(|i| i.id.0 as i32).collect()) }, } }) .collect::>(); sqlx::query("insert into inventory (pchar, items) values ($1, $2) on conflict (pchar) do update set items = $2") .bind(char_id.0) .bind(sqlx::types::Json(inventory)) .execute(&self.pool) .await?; Ok(()) } async fn set_character_bank(&mut self, char_id: &CharacterEntityId, bank: &BankEntity, bank_name: BankName) -> Result<(), GatewayError> { let bank = bank.items.iter() .map(|item| { match item { BankItemEntity::Individual(item) => { PgInventoryItemEntity::Individual(item.id.0 as i32) }, BankItemEntity::Stacked(items) => { PgInventoryItemEntity::Stacked(items.iter().map(|i| i.id.0 as i32).collect()) }, } }) .collect::>(); sqlx::query("insert into bank (pchar, items, name) values ($1, $2, $3) on conflict (pchar, name) do update set items = $2") .bind(char_id.0) .bind(sqlx::types::Json(bank)) .bind(bank_name.0) .execute(&self.pool) .await?; Ok(()) } async fn get_character_equips(&mut self, char_id: &CharacterEntityId) -> Result { let equips = sqlx::query_as::<_, PgEquipped>("select * from equipped where pchar = $1") .bind(char_id.0) .fetch_one(&self.pool) .await?; Ok(equips.into()) } async fn set_character_equips(&mut self, char_id: &CharacterEntityId, equips: &EquippedEntity) -> Result<(), GatewayError> { sqlx::query(r#"insert into equipped (pchar, weapon, armor, shield, unit0, unit1, unit2, unit3, mag) values ($1, $2, $3, $4, $5, $6, $7, $8, $9) on conflict (pchar) do update set weapon=$2, armor=$3, shield=$4, unit0=$5, unit1=$6, unit2=$7, unit3=$8, mag=$9"#) .bind(char_id.0) .bind(equips.weapon.map(|i| i.0 as i32)) .bind(equips.armor.map(|i| i.0 as i32)) .bind(equips.shield.map(|i| i.0 as i32)) .bind(equips.unit[0].map(|i| i.0 as i32)) .bind(equips.unit[1].map(|i| i.0 as i32)) .bind(equips.unit[2].map(|i| i.0 as i32)) .bind(equips.unit[3].map(|i| i.0 as i32)) .bind(equips.mag.map(|i| i.0 as i32)) .execute(&self.pool) .await?; Ok(()) } async fn set_character_meseta(&mut self, char_id: &CharacterEntityId, meseta: Meseta) -> Result<(), GatewayError> { sqlx::query("insert into character_meseta values ($1, $2) on conflict (pchar) do update set items = $2") .bind(char_id.0) .bind(meseta.0 as i32) .execute(&self.pool) .await?; Ok(()) } async fn get_character_meseta(&mut self, char_id: &CharacterEntityId) -> Result { #[derive(sqlx::FromRow)] struct PgMeseta(i32); let meseta = sqlx::query_as::<_, PgMeseta>(r#"select meseta from character_meseta where id = $1"#) .bind(char_id.0) .fetch_one(&self.pool) .await?; Ok(Meseta(meseta.0 as u32)) } async fn set_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName, meseta: Meseta) -> Result<(), GatewayError> { sqlx::query("insert into bank_meseta values ($1, $2, $3) on conflict (pchar, bank) do update set items = $2") .bind(char_id.0) .bind(meseta.0 as i32) .bind(bank.0) .execute(&self.pool) .await?; Ok(()) } async fn get_bank_meseta(&mut self, char_id: &CharacterEntityId, bank: BankName) -> Result { #[derive(sqlx::FromRow)] struct PgMeseta(i32); let meseta = sqlx::query_as::<_, PgMeseta>(r#"select meseta from character_meseta where id = $1 and bank = $2"#) .bind(char_id.0) .bind(bank.0) .fetch_one(&self.pool) .await?; Ok(Meseta(meseta.0 as u32)) } }