diff --git a/paramgen/parameter_curation.py b/paramgen/parameter_curation.py
index baa29fda..01782076 100644
--- a/paramgen/parameter_curation.py
+++ b/paramgen/parameter_curation.py
@@ -21,6 +21,7 @@
 
 table_dir = sys.argv[1]
 out_dir = sys.argv[2]
+random.seed(42)
 
 THRESH_HOLD = 0
 THRESH_HOLD_6 = 0
@@ -676,19 +677,21 @@ def process_withdraw_query():
 
 
 def main():
-    queries = [3, 1, 8, 7, 10, 11, 2, 5, 6]
+    queries = [6, 2, 3, 5, 7, 11, 8, 10, 1]
     # queries = [3]
     multiprocessing.set_start_method('forkserver')
-    processes = []
-
-    for query_id in queries:
-        p = multiprocessing.Process(target=process_query, args=(query_id,))
-        p.start()
-        processes.append(p)
-
-    for p in processes:
-        p.join()
+
+    batch_size = 5
+    for i in range(0, len(queries), batch_size):
+        processes = []
+        for query_id in queries[i:i + batch_size]:
+            p = multiprocessing.Process(target=process_query, args=(query_id,))
+            p.start()
+            processes.append(p)
+
+        for p in processes:
+            p.join()
 
 
 if __name__ == "__main__":
diff --git a/scripts/run_paramgen.sh b/scripts/run_paramgen.sh
index 67138c61..1f830204 100644
--- a/scripts/run_paramgen.sh
+++ b/scripts/run_paramgen.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 
 LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
-OUTPUT_DIR=out/sf3/
+OUTPUT_DIR=out/
 
 # Note: generate factor tables with --generate-factors
 
diff --git a/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala b/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
index 993348d0..82dc85fb 100644
--- a/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
+++ b/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
@@ -77,43 +77,81 @@ object FactorGenerationStage extends DatagenStage {
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/transfer/*.csv")
-      .select($"fromId", $"toId", $"amount".cast("double"), $"createTime")
+      .load(s"${args.outputDir}/snapshot/AccountTransferAccount.csv")
+      .select(
+        $"fromId",
+        $"toId",
+        $"amount".cast("double"),
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     val withdrawRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/withdraw/*.csv")
-      .select($"fromId", $"toId", $"amount".cast("double"), $"createTime")
+      .load(s"${args.outputDir}/snapshot/AccountWithdrawAccount.csv")
+      .select(
+        $"fromId",
+        $"toId",
+        $"amount".cast("double"),
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     val depositRDD = spark.read
      .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
      .option("header", "true")
      .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/deposit/*.csv")
+      .load(s"${args.outputDir}/snapshot/LoanDepositAccount.csv")
       .select($"accountId", $"loanId")
 
     val personInvestRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/personInvest/*.csv")
-      .select($"investorId", $"companyId", $"createTime")
+      .load(s"${args.outputDir}/snapshot/PersonInvestCompany.csv")
+      .select(
+        $"investorId",
+        $"companyId",
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     val OwnRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/personOwnAccount/*.csv")
+      .load(s"${args.outputDir}/snapshot/PersonOwnAccount.csv")
       .select($"personId", $"accountId")
 
     val personGuaranteeRDD = spark.read
       .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
       .option("header", "true")
       .option("delimiter", "|")
-      .load(s"${args.outputDir}/raw/personGuarantee/*.csv")
-      .select($"fromId", $"toId", $"createTime")
+      .load(s"${args.outputDir}/snapshot/PersonGuaranteePerson.csv")
+      .select(
+        $"fromId",
+        $"toId",
+        (unix_timestamp(
+          coalesce(
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss.SSS"),
+            to_timestamp($"createTime", "yyyy-MM-dd HH:mm:ss")
+          )
+        ) * 1000).alias("createTime")
+      )
 
     def transformItems(
         df: DataFrame,