Skip to content

Commit

Permalink
Rebuild the download content queue on every server restart
Browse files Browse the repository at this point in the history
  • Loading branch information
yazz committed Sep 26, 2023
1 parent 1fa0f7d commit 9368530
Showing 1 changed file with 12 additions and 10 deletions.
22 changes: 12 additions & 10 deletions src/yazz_helper_module.js
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,6 @@ module.exports = {
"INSERT OR REPLACE INTO table_versions (table_name , version_number) VALUES ('level_1_ipfs_hash_metadata',1);",
"CREATE INDEX IF NOT EXISTS ipfs_hashes_idx ON level_1_ipfs_hash_metadata (ipfs_hash);",

"CREATE TABLE IF NOT EXISTS level_1_download_content_queue (ipfs_hash TEXT, master_time_millis INTEGER, lcreated_time_millis INTEGER, status TEXT, server TEXT, read_from TEXT, time_read_millis INTEGER , debug_master_time_millis TEXT, UNIQUE(ipfs_hash));",
"INSERT OR REPLACE INTO table_versions (table_name , version_number) VALUES ('level_1_download_content_queue',1);",


// LEVEL 2
// This can all be derived from the IPFS content data. It is useful to keep around, but can be deleted
Expand Down Expand Up @@ -688,7 +685,12 @@ module.exports = {

"DROP TABLE IF EXISTS level_8_system_process_errors;",
"CREATE TABLE IF NOT EXISTS level_8_system_process_errors (yazz_instance_id TEXT, id TEXT, timestamp INTEGER, process TEXT, status TEXT , base_component_id TEXT, event TEXT, system_code_id TEXT, args TEXT, error_message TEXT);",
"INSERT OR REPLACE INTO table_versions (table_name , version_number) VALUES ('level_8_system_process_errors',1);"
"INSERT OR REPLACE INTO table_versions (table_name , version_number) VALUES ('level_8_system_process_errors',1);",

"DROP TABLE IF EXISTS level_8_download_content_queue;",
"CREATE TABLE IF NOT EXISTS level_8_download_content_queue (ipfs_hash TEXT, master_time_millis INTEGER, lcreated_time_millis INTEGER, status TEXT, server TEXT, read_from TEXT, time_read_millis INTEGER , debug_master_time_millis TEXT, UNIQUE(ipfs_hash));",
"INSERT OR REPLACE INTO table_versions (table_name , version_number) VALUES ('level_8_download_content_queue',1);"

])
await async.map(
sqlTorun
Expand Down Expand Up @@ -2382,7 +2384,7 @@ module.exports = {
//
// get content from master server
//
let ipfsDownloadQueueSize = await mm.getQuickSqlOneRow(thisDb, "select count(ipfs_hash) as queue_count from level_1_download_content_queue where STATUS = 'QUEUED'")
let ipfsDownloadQueueSize = await mm.getQuickSqlOneRow(thisDb, "select count(ipfs_hash) as queue_count from level_8_download_content_queue where STATUS = 'QUEUED'")
if (ipfsDownloadQueueSize.queue_count == 0) {
let maxMasterTimeMillis = await mm.getQuickSqlOneRow(thisDb, "select max(master_time_millis) as max_master_time_millis from level_1_ipfs_hash_metadata")
let outstandingRequests = await mm.sendQuickJsonGetRequest(
Expand All @@ -2396,12 +2398,12 @@ module.exports = {
console.log("hash record to add to queue: " + JSON.stringify(hashRecord,null, 2))
let recordAlreadyInQueue = await mm.getQuickSqlOneRow(
thisDb,
"select ipfs_hash from level_1_download_content_queue where ipfs_hash = ?",
"select ipfs_hash from level_8_download_content_queue where ipfs_hash = ?",
[hashRecord.ipfs_hash])
if (recordAlreadyInQueue == null) {
await mm.executeQuickSql(
thisDb,
"insert into level_1_download_content_queue ( ipfs_hash , master_time_millis , status , debug_master_time_millis ) values ( ? , ? , ? , ?)",
"insert into level_8_download_content_queue ( ipfs_hash , master_time_millis , status , debug_master_time_millis ) values ( ? , ? , ? , ?)",
[hashRecord.ipfs_hash, hashRecord.local_time_millis, "QUEUED", mm.msToTime(hashRecord.local_time_millis)]
)
} else {
Expand All @@ -2417,7 +2419,7 @@ module.exports = {
if (ipfsDownloadQueueSize.queue_count != 0) {
let nextIpfsQueueRecord = await mm.getQuickSqlOneRow(
thisDb,
"select ipfs_hash, master_time_millis from level_1_download_content_queue where status = ? order by master_time_millis asc limit 1",
"select ipfs_hash, master_time_millis from level_8_download_content_queue where status = ? order by master_time_millis asc limit 1",
["QUEUED"])
if (nextIpfsQueueRecord) {
let ipfsContent = await mm.getContentFromMaster(thisDb, nextIpfsQueueRecord.ipfs_hash)
Expand Down Expand Up @@ -2462,13 +2464,13 @@ module.exports = {
[nextIpfsQueueRecord.master_time_millis, nextIpfsQueueRecord.ipfs_hash])
await mm.executeQuickSql(
thisDb,
"update level_1_download_content_queue set status = ? where ipfs_hash = ?",
"update level_8_download_content_queue set status = ? where ipfs_hash = ?",
["DONE", nextIpfsQueueRecord.ipfs_hash]
)
} else {
await mm.executeQuickSql(
thisDb,
"update level_1_download_content_queue set status = ? where ipfs_hash = ?",
"update level_8_download_content_queue set status = ? where ipfs_hash = ?",
["ERROR", nextIpfsQueueRecord.ipfs_hash]
)
}
Expand Down

0 comments on commit 9368530

Please sign in to comment.