Skip to content

Commit

Permalink
put back transactions, fix error messages
Browse files Browse the repository at this point in the history
  • Loading branch information
braindigitalis committed Jul 17, 2024
1 parent 46f8b85 commit b5afc26
Show file tree
Hide file tree
Showing 9 changed files with 171 additions and 84 deletions.
77 changes: 72 additions & 5 deletions include/ssod/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,30 @@ namespace db {
* and push_back().
*/
struct resultset {

/**
* Row values
*/
std::vector<row> rows;

/**
* Error message of last query or an empty string on success
*/
std::string error;

/**
* Number of affected rows, if an UPDATE, DELETE, INSERT
*/
size_t affected_rows{};

/**
* Returns true if the query succeeded
* @return true if no error
*/
[[nodiscard]] inline bool ok() const {
return error.empty();
}

/**
* Get a row by index
* @param index row to retrieve
Expand All @@ -65,7 +76,7 @@ namespace db {

/**
* Get a row by index with range checking
* @param index row to rerieve
* @param index row to retrieve
* @return row
*/
[[nodiscard]] inline const row& at(size_t index) const {
Expand Down Expand Up @@ -106,16 +117,27 @@ namespace db {
return rows.end();
}

/**
* True if the recordset is empty
* @return true if empty
*/
[[nodiscard]] inline bool empty() const {
return rows.empty();
}

/**
* Number of rows in the recordset
* @return row count
*/
[[nodiscard]] inline size_t size() const {
return rows.size();
}
};


/**
* @brief A callback which happens when an asynchronous SQL query is completed
*/
using sql_query_callback = std::function<void(const resultset&)>;

/**
* @brief Possible parameter types for SQL parameters
Expand Down Expand Up @@ -179,6 +201,24 @@ namespace db {
*/
resultset query(const std::string &format, const paramlist &parameters = {});

/**
* @brief Run a mysql query asynchronously, with automatic escaping of parameters
* to prevent SQL injection. Call the callback on completion
*
* @param format Format string, where each parameter should be indicated by a ? symbol
* @param parameters Parameters to prepare into the query in place of the ?'s
* @param cb Callback to call on completion of the query. The callback will be passed
* the resultset as its parameter.
*
* The parameters given should be a vector of strings. You can instantiate this using "{}".
* The queries are cached as prepared statements and therefore do not need quote symbols
* to be placed around parameters in the query. These will be automatically added if required.
*
* @note If you can you should use co_query instead to avoid callback hell. co_query uses this
* internally, wrapping it with dpp::async<>.
*/
void query_callback(const std::string &format, const paramlist &parameters, const sql_query_callback& cb);

#ifdef DPP_CORO
/**
* @brief Run a mysql query, with automatic escaping of parameters to prevent SQL injection.
Expand Down Expand Up @@ -229,16 +269,18 @@ namespace db {
*
* @note This value is by any db::query() call. Take a copy!
* @return size_t Number of affected rows
* @deprecated This is not coroutine-safe and you should use resultset::affected_rows instead
*/
size_t affected_rows();
[[deprecated("Use resultset::affected_rows instead")]] size_t affected_rows();

/**
* @brief Returns the last error string.
*
* @note This value is by any db::query() call. Take a copy!
* @return const std::string& Error mesage
* @deprecated This is not coroutine-safe and you should use resultset::error instead
*/
const std::string& error();
[[deprecated("Use resultset::error instead")]] const std::string& error();

/**
* @brief Returns the size of the query cache
Expand Down Expand Up @@ -267,14 +309,39 @@ namespace db {
* will be forced to wait.
*
* @param closure The transactional code to execute.
* @param callback Callback to call when the transaction completes
*
* @note The closure should only ever execute queries using db::query(),
* it should NOT use async queries/co_query() as these cannot be executed
* atomically.
* Returning false from the closure, or throwing any exception at all
* will roll back the transaction, else it will be committed when the
* closure ends.
* @warning The coroutine will execute asynchronously in a different thread.
* You should use the callback to be notified when it completes. The resultset
* passed as the parameter will be empty.
*/
void transaction(std::function<bool()> closure);
void transaction(std::function<bool()> closure, sql_query_callback callback = {});

#ifdef DPP_CORO
/**
* @brief Start an SQL transaction in a coroutine that can be awaited.
* SQL transactions are atomic in nature, ALL other queries will be forced
* to wait. The transaction will be inserted into the queue to run as one atomic
* operation, meaning that db::co_query cannot disrupt it or insert queries in
* the middle of it, and db::query function calls that are not within the closure
* will be forced to wait.
*
* @param closure The transactional code to execute.
* @return Awaitable, returns an empty resultset on completion of transaction
*
* @note The closure should only ever execute queries using db::query(),
* it should NOT use async queries/co_query() as these cannot be executed
* atomically.
* Returning false from the closure, or throwing any exception at all
* will roll back the transaction, else it will be committed when the
* closure ends.
*/
dpp::async<resultset> co_transaction(std::function<bool()> closure);
#endif
};
2 changes: 1 addition & 1 deletion include/ssod/game_player.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ struct player {
long max_silver();
long max_mana();
long max_rations();
long max_crits();
long max_crits() const;

void tick_mana();

Expand Down
2 changes: 1 addition & 1 deletion src/campfire.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ dpp::task<void> campfire(const dpp::interaction_create_t& event, player p) {
event.reply(event.command.type == dpp::it_application_command ? dpp::ir_channel_message_with_source : dpp::ir_update_message, m.set_flags(dpp::m_ephemeral), [event, &bot, m](const auto& cc) {
if (cc.is_error()) {
bot.log(dpp::ll_error, "Internal error displaying campfire:\n```json\n" + cc.http_info.body + "\n```\nMessage:\n```json\n" + m.build_json() + "\n```");
event.reply(dpp::message("Internal error displaying campfire:\n```json\n" + cc.http_info.body + "\n```\nMessage:\n```json\n" + m.build_json() + "\n```").set_flags(dpp::m_ephemeral));
event.reply(dpp::message("Internal error displaying campfire:\nPlease report to developers via 'Get Help' button```").set_flags(dpp::m_ephemeral));
}
});
}
Expand Down
76 changes: 41 additions & 35 deletions src/combat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ dpp::task<void> continue_pvp_combat(const dpp::interaction_create_t& event, play
event.reply(event.command.type == dpp::it_component_button ? dpp::ir_update_message : dpp::ir_channel_message_with_source, m.set_flags(dpp::m_ephemeral), [event, m, p](const auto& cc) {
if (cc.is_error()) {
//bot.log(dpp::ll_error, "Internal error displaying PvP combat location " + std::to_string(p.paragraph) + ": " + cc.http_info.body);
event.reply(dpp::message("Internal error displaying Pvp combat location " + std::to_string(p.paragraph) + ":\n```json\n" + cc.http_info.body + "\n```\nMessage:\n```json\n" + m.build_json() + "\n```").set_flags(dpp::m_ephemeral));
event.reply(dpp::message("Internal error displaying Pvp combat location " + std::to_string(p.paragraph) + ":\nPlease report to developers via 'Get Help' button").set_flags(dpp::m_ephemeral));
}
});
co_return;
Expand Down Expand Up @@ -659,27 +659,30 @@ dpp::task<void> continue_combat(const dpp::interaction_create_t& event, player p

output << "__" << tr("COMBAT", event) << "__: **" << p.name << "** vs. **" << p.combatant.name << "**\n\n";

auto r = co_await db::co_query("SELECT * FROM criticals WHERE user_id = ?", {event.command.usr.id});
long banked{0};
bool critical{};
if (!r.empty()) {
long counter = atol(r[0].at("critical_counter"));
banked = atol(r[0].at("banked_criticals"));
if (banked > 0 && p.next_crit) {
co_await db::co_query("UPDATE criticals SET banked_criticals = banked_criticals - 1 WHERE user_id = ?", {event.command.usr.id});
critical = true;
p.next_crit = false;
banked--;
}
long next = 1000 + (p.get_level() * 4);
int percent = (double)counter / (double)next * 100.0f;
output << tr("CRITICAL_METER", event) << ": ";
for (int x = 0; x < 100; x += 10) {
output << (x < percent ? sprite::bar_green.get_mention() : sprite::bar_red.get_mention());
co_await db::co_transaction([event, &output, &p, &critical, &banked]() -> bool {
auto r = db::query("SELECT * FROM criticals WHERE user_id = ?", {event.command.usr.id});
if (!r.empty()) {
long counter = atol(r[0].at("critical_counter"));
banked = atol(r[0].at("banked_criticals"));
if (banked > 0 && p.next_crit) {
db::query("UPDATE criticals SET banked_criticals = banked_criticals - 1 WHERE user_id = ?", {event.command.usr.id});
critical = true;
p.next_crit = false;
banked--;
}
long next = 1000 + (p.get_level() * 4);
int percent = (double)counter / (double)next * 100.0f;
output << tr("CRITICAL_METER", event) << ": ";
for (int x = 0; x < 100; x += 10) {
output << (x < percent ? sprite::bar_green.get_mention() : sprite::bar_red.get_mention());
}
output << " (" + std::to_string(percent) + "%)\n";
output << tr("CRITICALS", event) << ": " << std::max(banked, 0L) << "/" << p.max_crits();
}
output << " (" + std::to_string(percent) + "%)\n";
output << tr("CRITICALS", event) << ": " << std::max(banked, 0L) << "/" << p.max_crits();
}
return true;
});

if (EStamina <= 0) {
output << tr("HES_DEAD_JIM", event) << "\n\n";
Expand Down Expand Up @@ -755,22 +758,25 @@ dpp::task<void> continue_combat(const dpp::interaction_create_t& event, player p
} else {
output << "__**" << tr("YOUHITENEMY", event) << "**__.";

/* If you hit enemy, critical meter ticks up based on your luck.
* If critical meter reaches max, you gain a critical hit, that you
* can spend on an overwhelming attack.
*/
co_await db::co_query("INSERT INTO criticals (user_id, critical_counter, banked_criticals) VALUES(?,1,0) ON DUPLICATE KEY UPDATE critical_counter = critical_counter + ?", {event.command.usr.id, p.luck + 1});
auto r = co_await db::co_query("SELECT * FROM criticals WHERE user_id = ?", {event.command.usr.id});
long counter = atol(r[0].at("critical_counter"));
if (counter > 1000 + (p.get_level() * 4)) {
/* User gains a new banked critical */
long new_banked = atol(r[0].at("banked_criticals")) + 1;
if (new_banked <= p.max_crits()) {
co_await db::co_query("UPDATE criticals SET critical_counter = 0, banked_criticals = ? WHERE user_id = ?", {new_banked, event.command.usr.id});
} else {
co_await db::co_query("UPDATE criticals SET critical_counter = 0 WHERE user_id = ?", {event.command.usr.id});
co_await db::co_transaction([p, event]() -> bool {
/* If you hit enemy, critical meter ticks up based on your luck.
* If critical meter reaches max, you gain a critical hit, that you
* can spend on an overwhelming attack.
*/
db::query("INSERT INTO criticals (user_id, critical_counter, banked_criticals) VALUES(?,1,0) ON DUPLICATE KEY UPDATE critical_counter = critical_counter + ?", {event.command.usr.id, p.luck + 1});
auto r = db::query("SELECT * FROM criticals WHERE user_id = ?", {event.command.usr.id});
long counter = atol(r[0].at("critical_counter"));
if (counter > 1000 + (p.get_level() * 4)) {
/* User gains a new banked critical */
long new_banked = atol(r[0].at("banked_criticals")) + 1;
if (new_banked <= p.max_crits()) {
db::query("UPDATE criticals SET critical_counter = 0, banked_criticals = ? WHERE user_id = ?", {new_banked, event.command.usr.id});
} else {
db::query("UPDATE criticals SET critical_counter = 0 WHERE user_id = ?", {event.command.usr.id});
}
}
}
return true;
});

if (EStance == DEFENSIVE) {
output << tr("ATKBONUS", event);
Expand Down Expand Up @@ -1030,7 +1036,7 @@ dpp::task<void> continue_combat(const dpp::interaction_create_t& event, player p
event.reply(event.command.type == dpp::it_component_button ? dpp::ir_update_message : dpp::ir_channel_message_with_source, m.set_flags(dpp::m_ephemeral), [event, &bot, m, p](const auto& cc) {
if (cc.is_error()) {
bot.log(dpp::ll_error, "Internal error displaying PvE combat " + std::to_string(p.after_fragment) + " location " + std::to_string(p.paragraph) + ": " + cc.http_info.body + " -- " + m.build_json());
event.reply(dpp::message("Internal error displaying PvE combat " + std::to_string(p.after_fragment) + " location " + std::to_string(p.paragraph) + ":\n```json\n" + cc.http_info.body + "\n```\nMessage:\n```json\n" + m.build_json() + "\n```").set_flags(dpp::m_ephemeral));
event.reply(dpp::message("Internal error displaying PvE combat " + std::to_string(p.after_fragment) + " location " + std::to_string(p.paragraph) + ":\nPlease report to developers via 'Get Help' button").set_flags(dpp::m_ephemeral));
}
});

Expand Down
31 changes: 16 additions & 15 deletions src/database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,6 @@ namespace db {
*/
std::map<std::string, cached_query> cached_queries;

using sql_query_callback = std::function<void(const resultset&)>;

std::condition_variable sql_worker_cv;

/**
Expand All @@ -129,10 +127,8 @@ namespace db {
const double expiry;
};

#ifdef DPP_CORO
std::queue<cached_query_results> sql_query_queue;
std::mutex query_queue_mtx;
#endif

template<class> inline constexpr bool always_false_v = false;

Expand Down Expand Up @@ -220,7 +216,6 @@ namespace db {
return unsafe_connect(host, user, pass, db, port, socket);
}

#ifdef DPP_CORO
void query_callback(const std::string &format, const paramlist &parameters, const sql_query_callback& cb) {
{
std::unique_lock<std::mutex> queue_lock(query_queue_mtx);
Expand All @@ -229,6 +224,7 @@ namespace db {
sql_worker_cv.notify_one();
}

#ifdef DPP_CORO
dpp::async<resultset> co_query(const std::string &format, const paramlist &parameters) {
return dpp::async<resultset>{ [format, parameters] <typename C> (C &&cc) { return query_callback(format, parameters, std::forward<C>(cc)); }};
}
Expand All @@ -241,7 +237,6 @@ namespace db {
creator->log(dpp::ll_critical, fmt::format("Database connection error connecting to {}: {}", dbconf["database"], mysql_error(&connection)));
exit(2);
}
#ifdef DPP_CORO
std::thread([&]() {
dpp::utility::set_thread_name("sql/coro");
while (true) {
Expand All @@ -257,12 +252,6 @@ namespace db {
qr = std::move(sql_query_queue.front());
sql_query_queue.pop();
}
if (!qr.format.empty()) {
auto results = query(qr.format, qr.parameters);
if (qr.callback) {
qr.callback(results);
}
}
/**
* If a transaction is waiting to be executed, fit it atomically into
* the queue here. The holds_transaction_lock is a thread_local variable
Expand All @@ -277,9 +266,15 @@ namespace db {
holds_transaction_lock = false;
transaction_function = {};
}
resultset results{};
if (!qr.format.empty()) {
results = query(qr.format, qr.parameters);
}
if (qr.callback) {
qr.callback(results);
}
}
}).detach();
#endif
creator->log(dpp::ll_info, fmt::format("Connected to database: {}", dbconf["database"]));
}

Expand Down Expand Up @@ -309,7 +304,7 @@ namespace db {
return raw_query("ROLLBACK");
}

void transaction(std::function<bool()> closure) {
void transaction(std::function<bool()> closure, sql_query_callback callback) {

if (transaction_in_progress) {
throw std::runtime_error("Transaction already in progress");
Expand Down Expand Up @@ -348,9 +343,15 @@ namespace db {
* query to signal the condition variable and make the SQL queue advance.
*/
transaction_in_progress = true;
query_callback("", {}, [](auto){});
query_callback("", {}, callback);
}

#ifdef DPP_CORO
dpp::async<resultset> co_transaction(std::function<bool()> closure) {
return dpp::async<resultset>{ [closure] <typename C> (C &&cc) { return transaction(closure, std::forward<C>(cc)); }};
}
#endif

bool close() {
std::lock_guard<std::mutex> db_lock(db_mutex);
mysql_close(&connection);
Expand Down
Loading

0 comments on commit b5afc26

Please sign in to comment.