insert download metadata tasks
fanatid committed Dec 1, 2023
1 parent ec74b61 commit 2776c25
Showing 9 changed files with 140 additions and 26 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Generated file; diff not rendered.

5 changes: 5 additions & 0 deletions nft_ingester2/Cargo.toml
@@ -9,7 +9,9 @@ publish = { workspace = true }
anyhow = { workspace = true }
async-stream = { workspace = true }
atty = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, features = ["cargo", "derive"] }
digital_asset_types = { workspace = true }
futures = { workspace = true }
hyper = { workspace = true, features = ["server"] }
json5 = { workspace = true }
@@ -20,7 +22,10 @@ opentelemetry_sdk = { workspace = true, features = ["trace"] }
program_transformers = { workspace = true }
prometheus = { workspace = true }
redis = { workspace = true, features = ["tokio-comp", "tokio-native-tls-comp"] }
rust-crypto = { workspace = true }
sea-orm = { workspace = true, features = ["sqlx-postgres"] }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
solana-sdk = { workspace = true }
sqlx = { workspace = true, features = ["runtime-tokio-rustls", "postgres"] }
6 changes: 4 additions & 2 deletions nft_ingester2/config-run.yml
@@ -20,7 +20,9 @@ redis:
postgres:
url: postgres://solana:solana@localhost/solana
min_connections: 10
max_connections: 25
max_connections: 50 # `max_connections` should be bigger than `program_transformer.max_tasks_in_process`, otherwise an unresolved lock is possible
program_transformer:
transactions_cl_audits: false
max_tasks_in_process: 100
max_tasks_in_process: 40
download_metadata_handler:
max_attempts: 3
29 changes: 27 additions & 2 deletions nft_ingester2/src/config.rs
@@ -3,6 +3,7 @@ use {
serde::{de, Deserialize},
std::{collections::HashMap, net::SocketAddr, path::Path, time::Duration},
tokio::fs,
tracing::warn,
yellowstone_grpc_proto::prelude::SubscribeRequest,
yellowstone_grpc_tools::config::{
deserialize_usize_str, ConfigGrpcRequestAccounts, ConfigGrpcRequestCommitment,
@@ -181,6 +182,15 @@ pub struct ConfigIngester {
pub redis: ConfigIngesterRedis,
pub postgres: ConfigIngesterPostgres,
pub program_transformer: ConfigIngesterProgramTransformer,
pub download_metadata_handler: ConfigDownloadMetadataHandler,
}

impl ConfigIngester {
pub fn check(&self) {
if self.postgres.max_connections < self.program_transformer.max_tasks_in_process {
warn!("`postgres.max_connections` should be bigger than `program_transformer.max_tasks_in_process`, otherwise an unresolved lock is possible");
}
}
}

#[derive(Debug, Deserialize)]
@@ -332,7 +342,7 @@ impl ConfigIngesterPostgres {
}

pub const fn default_max_connections() -> usize {
25
50
}
}

@@ -353,6 +363,21 @@ impl ConfigIngesterProgramTransformer {
}

pub const fn default_max_tasks_in_process() -> usize {
100
40
}
}

#[derive(Debug, Deserialize)]
pub struct ConfigDownloadMetadataHandler {
#[serde(
default = "ConfigDownloadMetadataHandler::default_max_attempts",
deserialize_with = "deserialize_usize_str"
)]
pub max_attempts: usize,
}

impl ConfigDownloadMetadataHandler {
pub const fn default_max_attempts() -> usize {
3
}
}
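A quick aside on the new config section (not part of this commit): thanks to the serde `default` attribute above, leaving the `download_metadata_handler` block empty falls back to 3 attempts. A minimal unit-test sketch, assuming it lives in the same module as the struct:

#[cfg(test)]
mod tests {
    use super::ConfigDownloadMetadataHandler;

    #[test]
    fn download_metadata_handler_defaults_to_three_attempts() {
        // `max_attempts` is omitted, so serde calls default_max_attempts()
        // instead of going through deserialize_usize_str.
        let config: ConfigDownloadMetadataHandler = serde_yaml::from_str("{}").unwrap();
        assert_eq!(config.max_attempts, 3);
    }
}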
104 changes: 84 additions & 20 deletions nft_ingester2/src/ingester.rs
@@ -1,14 +1,17 @@
use {
crate::{
config::ConfigIngester,
config::{ConfigDownloadMetadataHandler, ConfigIngester},
postgres::{create_pool as pg_create_pool, metrics_pgpool},
prom::{
program_transformer_task_status_inc, program_transformer_tasks_total_set,
ProgramTransformerTaskStatusKind,
download_metadata_inserted_total_inc, program_transformer_task_status_inc,
program_transformer_tasks_total_set, ProgramTransformerTaskStatusKind,
},
redis::{metrics_xlen, ProgramTransformerInfo, RedisStream},
util::create_shutdown,
},
chrono::Utc,
crypto::{digest::Digest, sha2::Sha256},
digital_asset_types::dao::{sea_orm_active_enums::TaskStatus, tasks},
futures::{
future::{pending, BoxFuture, FusedFuture, FutureExt},
stream::StreamExt,
@@ -17,9 +20,18 @@ use {
error::ProgramTransformerError, DownloadMetadataInfo, DownloadMetadataNotifier,
ProgramTransformer,
},
std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
sea_orm::{
entity::{ActiveModelTrait, ActiveValue},
error::{DbErr, RuntimeErr},
SqlxPostgresConnector,
},
sqlx::{Error as SqlxError, PgPool},
std::{
borrow::Cow,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
},
tokio::{
task::JoinSet,
@@ -29,8 +41,6 @@
};

pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
println!("{:#?}", config);

// connect to Redis
let client = redis::Client::open(config.redis.url.clone())?;
let connection = client.get_multiplexed_tokio_connection().await?;
@@ -55,19 +65,23 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
async move { metrics_pgpool(pgpool).await }
});

// create the handler that inserts download-metadata tasks into Postgres
let download_metadata_handler =
DownloadMetadataHandler::new(pgpool.clone(), config.download_metadata_handler)?;

// create redis stream reader
let (mut redis_messages, redis_tasks_fut) = RedisStream::new(config.redis, connection).await?;
tokio::pin!(redis_tasks_fut);

// program transforms related
let pt_accounts = Arc::new(ProgramTransformer::new(
pgpool.clone(),
create_notifier(),
download_metadata_handler.create_notifier(),
false,
));
let pt_transactions = Arc::new(ProgramTransformer::new(
pgpool.clone(),
create_notifier(),
download_metadata_handler.create_notifier(),
config.program_transformer.transactions_cl_audits,
));
let pt_max_tasks_in_process = config.program_transformer.max_tasks_in_process;
@@ -214,14 +228,64 @@ pub async fn run(config: ConfigIngester) -> anyhow::Result<()> {
result
}

fn create_notifier() -> DownloadMetadataNotifier {
Box::new(
move |_info: DownloadMetadataInfo| -> BoxFuture<
'static,
Result<(), Box<dyn std::error::Error + Send + Sync>>,
> {
// TODO
Box::pin(async move { Ok(()) })
},
)
#[derive(Debug)]
struct DownloadMetadataHandler {
pgpool: PgPool,
max_attempts: i16,
}

impl DownloadMetadataHandler {
pub fn new(pgpool: PgPool, config: ConfigDownloadMetadataHandler) -> anyhow::Result<Self> {
Ok(Self {
pgpool,
max_attempts: config.max_attempts.try_into()?,
})
}

pub fn create_notifier(&self) -> DownloadMetadataNotifier {
let pgpool = self.pgpool.clone();
let max_attempts = self.max_attempts;
Box::new(
move |info: DownloadMetadataInfo| -> BoxFuture<
'static,
Result<(), Box<dyn std::error::Error + Send + Sync>>,
> {
let pgpool = pgpool.clone();
Box::pin(async move {
const NAME: &str = "DownloadMetadata";

let data = serde_json::to_value(info)?;

let mut hasher = Sha256::new();
hasher.input(NAME.as_bytes());
hasher.input(serde_json::to_vec(&data)?.as_slice());
let hash = hasher.result_str();

let model = tasks::ActiveModel {
id: ActiveValue::Set(hash),
task_type: ActiveValue::Set(NAME.to_owned()),
data: ActiveValue::Set(data),
status: ActiveValue::Set(TaskStatus::Pending),
created_at: ActiveValue::Set(Utc::now().naive_utc()),
locked_until: ActiveValue::Set(None),
locked_by: ActiveValue::Set(None),
max_attempts: ActiveValue::Set(max_attempts),
attempts: ActiveValue::Set(0),
duration: ActiveValue::Set(None),
errors: ActiveValue::Set(None),
};
let conn = SqlxPostgresConnector::from_sqlx_postgres_pool(pgpool);

match model.insert(&conn).await.map(|_model| ()) {
// ignore duplicates: unique_violation (Postgres error code 23505) means the task is already queued
Err(DbErr::Query(RuntimeErr::SqlxError(SqlxError::Database(dberr)))) if dberr.code() == Some(Cow::Borrowed("23505")) => {},
value => value?,
};
download_metadata_inserted_total_inc();

Ok(())
})
},
)
}
}
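To spell out the dedup logic above: the task id is a SHA-256 over the task name plus the serialized DownloadMetadataInfo, so notifying the same asset twice yields the same primary key, and the second insert trips the 23505 unique-violation branch, which the notifier treats as success. A standalone sketch of just the id derivation, using the same rust-crypto API as above (the helper name is made up):

use crypto::{digest::Digest, sha2::Sha256};

// Sketch only: identical payloads always hash to the same task id, which is
// what makes the unique-violation branch an effective dedup.
fn download_metadata_task_id(data: &serde_json::Value) -> serde_json::Result<String> {
    let mut hasher = Sha256::new();
    hasher.input("DownloadMetadata".as_bytes());
    hasher.input(serde_json::to_vec(data)?.as_slice());
    Ok(hasher.result_str())
}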
1 change: 1 addition & 0 deletions nft_ingester2/src/main.rs
@@ -70,6 +70,7 @@ async fn main() -> anyhow::Result<()> {
let config = config_load::<ConfigIngester>(&args.config)
.await
.with_context(|| format!("failed to parse config from: {}", args.config))?;
config.check();
ingester::run(config).await
}
}
11 changes: 10 additions & 1 deletion nft_ingester2/src/prom.rs
@@ -5,7 +5,7 @@ use {
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
},
prometheus::{IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
std::{net::SocketAddr, sync::Once},
tracing::{error, info},
};
@@ -46,6 +46,10 @@ lazy_static::lazy_static! {
Opts::new("program_transformer_task_status", "Status of processed messages"),
&["status"],
).unwrap();

static ref DOWNLOAD_METADATA_INSERTED_TOTAL: IntCounter = IntCounter::new(
"download_metadata_inserted_total", "Total number of inserted tasks for download metadata"
).unwrap();
}

pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
@@ -65,6 +69,7 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(PGPOOL_CONNECTIONS_TOTAL);
register!(PROGRAM_TRANSFORMER_TASKS_TOTAL);
register!(PROGRAM_TRANSFORMER_TASK_STATUS);
register!(DOWNLOAD_METADATA_INSERTED_TOTAL);

VERSION
.with_label_values(&[
@@ -171,3 +176,7 @@ pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKind
}])
.inc()
}

pub fn download_metadata_inserted_total_inc() {
DOWNLOAD_METADATA_INSERTED_TOTAL.inc()
}
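For completeness, a hypothetical use of the new counter from inside prom.rs (the counter is process-global via lazy_static; the exposition lines in the comment show roughly how the series would appear on the /metrics endpoint once run_server has registered it):

// Assumes no other increments have happened yet in this process.
download_metadata_inserted_total_inc();
assert_eq!(DOWNLOAD_METADATA_INSERTED_TOTAL.get(), 1);
// A scrape of the endpoint started by run_server would then include:
//   # TYPE download_metadata_inserted_total counter
//   download_metadata_inserted_total 1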
1 change: 1 addition & 0 deletions program_transformers/Cargo.toml
@@ -13,6 +13,7 @@ futures = { workspace = true }
mpl-bubblegum = { workspace = true }
num-traits = { workspace = true }
sea-orm = { workspace = true, features = [] }
serde = { workspace = true }
serde_json = { workspace = true }
solana-sdk = { workspace = true }
solana-transaction-status = { workspace = true }
3 changes: 2 additions & 1 deletion program_transformers/src/lib.rs
@@ -15,6 +15,7 @@ use {
},
futures::future::BoxFuture,
sea_orm::{DatabaseConnection, SqlxPostgresConnector},
serde::Serialize,
solana_sdk::{instruction::CompiledInstruction, pubkey::Pubkey, signature::Signature},
solana_transaction_status::InnerInstructions,
sqlx::PgPool,
@@ -44,7 +45,7 @@ pub struct TransactionInfo {
pub meta_inner_instructions: Vec<InnerInstructions>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct DownloadMetadataInfo {
asset_data_id: Vec<u8>,
uri: String,