Merge branch 'main' into opt-curation
qishipengqsp committed Dec 20, 2024
2 parents 3549497 + 0ed465f commit 0ae6fb9
Showing 8 changed files with 177 additions and 169 deletions.
.gitignore: 1 addition, 0 deletions
@@ -25,4 +25,5 @@ sf*/
sf*.tar
sf*.tar.gz

paramgen/__pycache__/
tools/paramgen/__pycache__/
@@ -21,10 +21,11 @@

table_dir = sys.argv[1]
out_dir = sys.argv[2]
random.seed(42)

THRESH_HOLD = 0
THRESH_HOLD_6 = 0
TRUNCATION_LIMIT = 10000
TRUNCATION_LIMIT = 500
BATCH_SIZE = 5000
TIME_TRUNCATE = True

@@ -345,7 +346,7 @@ def process_iter_queries(query_id):
else:
amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_bucket')
time_bucket_path = os.path.join(table_dir, 'trans_withdraw_month')
output_path = os.path.join(out_dir, 'tcr8.txt')
output_path = os.path.join(out_dir, 'complex_8_param.csv')
steps = 2

elif query_id == 1:
@@ -356,7 +357,7 @@ def process_iter_queries(query_id):
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_out_month')
output_path = os.path.join(out_dir, 'tcr1.txt')
output_path = os.path.join(out_dir, 'complex_1_param.csv')
steps = 2

elif query_id == 5:
@@ -378,7 +379,7 @@ def process_iter_queries(query_id):
else:
amount_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
time_bucket_path = os.path.join(table_dir, 'transfer_in_month')
output_path = os.path.join(out_dir, 'tcr2.txt')
output_path = os.path.join(out_dir, 'complex_2_param.csv')
steps = 3

elif query_id == 3:
@@ -394,7 +395,7 @@ def process_iter_queries(query_id):
account_account_path = os.path.join(table_dir, 'person_guarantee_list')
amount_bucket_path = os.path.join(table_dir, 'person_guarantee_count')
time_bucket_path = os.path.join(table_dir, 'person_guarantee_month')
output_path = os.path.join(out_dir, 'tcr11.txt')
output_path = os.path.join(out_dir, 'complex_11_param.csv')
steps = 3

first_account_df = process_csv(first_account_path)
@@ -474,14 +475,14 @@ def process_iter_queries(query_id):

elif query_id == 5:
csvWriter_5 = CSVSerializer()
csvWriter_5.setOutputFile(output_path + 'tcr5.txt')
csvWriter_5.setOutputFile(output_path + 'complex_5_param.csv')
csvWriter_5.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_5.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_5.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_5.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_12 = CSVSerializer()
csvWriter_12.setOutputFile(output_path + 'tcr12.txt')
csvWriter_12.setOutputFile(output_path + 'complex_12_param.csv')
csvWriter_12.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_12.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_12.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
@@ -519,15 +520,15 @@ def process_iter_queries(query_id):
final_second_items_4.append(final_first_items[k])

csvWriter_3 = CSVSerializer()
csvWriter_3.setOutputFile(output_path + 'tcr3.txt')
csvWriter_3.setOutputFile(output_path + 'complex_3_param.csv')
csvWriter_3.registerHandler(hendleIdParam, final_first_items, "id1")
csvWriter_3.registerHandler(hendleIdParam, final_second_items_3, "id2")
csvWriter_3.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_3.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_3.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_4 = CSVSerializer()
csvWriter_4.setOutputFile(output_path + 'tcr4.txt')
csvWriter_4.setOutputFile(output_path + 'complex_4_param.csv')
csvWriter_4.registerHandler(hendleIdParam, final_first_items, "id1")
csvWriter_4.registerHandler(hendleIdParam, final_second_items_4, "id2")
csvWriter_4.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
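
Note: the setOutputFile / registerHandler calls above (and in the other query branches) all follow one serializer pattern — each handler binds a list of generated values to one or more pipe-separated output columns, with compound headers like "startTime|endTime" naming multiple columns. The CSVSerializer class itself is not part of this diff; the sketch below is a plausible reconstruction inferred purely from these call sites, not the repo's actual implementation.

class CSVSerializer:
    """Hypothetical sketch of a CSVSerializer-like class, inferred from call sites."""
    def __init__(self):
        self.handlers = []       # (handler_fn, data, column_header) triples
        self.output_file = None

    def setOutputFile(self, output_file):
        self.output_file = output_file

    def registerHandler(self, handler, data, header):
        # Each handler formats one source list into one output column group.
        self.handlers.append((handler, data, header))

    def writeCsv(self):
        with open(self.output_file, "w") as f:
            f.write("|".join(h for _, _, h in self.handlers) + "\n")
            # Zip the registered data lists row-wise and apply each handler.
            for row in zip(*(data for _, data, _ in self.handlers)):
                f.write("|".join(str(handler(value))
                                 for (handler, _, _), value in zip(self.handlers, row)) + "\n")
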
@@ -559,7 +560,7 @@ def process_1_hop_query(query_id):
elif query_id == 10:
first_count_path = os.path.join(table_dir, 'person_invest_company')
time_bucket_path = os.path.join(table_dir, 'invest_month')
output_path = os.path.join(out_dir, 'tcr10.txt')
output_path = os.path.join(out_dir, 'complex_10_param.csv')

first_count_df = process_csv(first_count_path)
time_bucket_df = process_csv(time_bucket_path)
@@ -579,15 +580,15 @@ def process_1_hop_query(query_id):
if query_id == 7:

csvWriter_7 = CSVSerializer()
csvWriter_7.setOutputFile(output_path + 'tcr7.txt')
csvWriter_7.setOutputFile(output_path + 'complex_7_param.csv')
csvWriter_7.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_7.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter_7.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
csvWriter_7.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit")
csvWriter_7.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder")

csvWriter_9 = CSVSerializer()
csvWriter_9.setOutputFile(output_path + 'tcr9.txt')
csvWriter_9.setOutputFile(output_path + 'complex_9_param.csv')
csvWriter_9.registerHandler(hendleIdParam, final_first_items, "id")
csvWriter_9.registerHandler(handleThresholdParam, thresh_list, "threshold")
csvWriter_9.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime")
@@ -627,7 +628,7 @@ def process_withdraw_query():
else:
withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_bucket')
transfer_bucket_path = os.path.join(table_dir, 'transfer_in_bucket')
output_path = os.path.join(out_dir, 'tcr6.txt')
output_path = os.path.join(out_dir, 'complex_6_param.csv')

first_account_df = process_csv(first_account_path)
time_bucket_df = process_csv(time_bucket_path)
@@ -676,19 +677,21 @@ def process_withdraw_query():


def main():
    queries = [3, 1, 8, 7, 10, 11, 2, 5, 6]
    queries = [6, 2, 3, 5, 7, 11, 8, 10, 1]
    # queries = [3]

    multiprocessing.set_start_method('forkserver')
    processes = []

    for query_id in queries:
        p = multiprocessing.Process(target=process_query, args=(query_id,))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    batch_size = 5
    for i in range(0, len(queries), batch_size):
        processes = []
        for query_id in queries[i:i + batch_size]:
            p = multiprocessing.Process(target=process_query, args=(query_id,))
            p.start()
            processes.append(p)

        for p in processes:
            p.join()


if __name__ == "__main__":
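Note: the change to main() replaces one process per query launched all at once (nine concurrent workers) with batches of at most five, joining each batch before starting the next; this caps peak memory and CPU use during parameter curation. Extracted as a generic helper, the pattern looks like this (the name run_in_batches is illustrative, not from the repo):

import multiprocessing

def run_in_batches(target, items, batch_size=5):
    """Run target(item) in separate processes, at most batch_size at a time."""
    for i in range(0, len(items), batch_size):
        processes = []
        for item in items[i:i + batch_size]:
            p = multiprocessing.Process(target=target, args=(item,))
            p.start()
            processes.append(p)
        # Join the whole batch before starting the next one,
        # so concurrency never exceeds batch_size.
        for p in processes:
            p.join()
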
File renamed without changes.
File renamed without changes.
scripts/run_paramgen.sh: 18 additions, 0 deletions
@@ -0,0 +1,18 @@
#!/bin/bash

LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
OUTPUT_DIR=out/

# Note: generate factor tables with --generate-factors

echo "start factor table generation"

time spark-submit --master local[*] \
--class ldbc.finbench.datagen.LdbcDatagen \
--driver-memory 480g \
${LDBC_FINBENCH_DATAGEN_JAR} \
--output-dir ${OUTPUT_DIR} \
--factor-format csv \
--generate-factors

echo "start parameter curation"
src/main/resources/scale_factors.xml: 11 additions, 34 deletions
@@ -1,40 +1,17 @@
<?xml version="1.0"?>
<scale_factors>
    <scale_factor name="0.01">
        <property>
            <name>generator.numPersons</name>
            <value>800</value>
        </property>
        <property>
            <name>generator.numCompanies</name>
            <value>400</value>
        </property>
        <property>
            <name>generator.numMediums</name>
            <value>1000</value>
        </property>
        <property>
            <name>transfer.minNumDegree</name>
            <value>1</value>
        </property>
        <property>
            <name>transfer.maxNumDegree</name>
            <value>1000</value>
        </property>
    </scale_factor>

    <scale_factor name="0.1">
        <property>
            <name>generator.numPersons</name>
            <value>8000</value>
            <value>1000</value>
        </property>
        <property>
            <name>generator.numCompanies</name>
            <value>4000</value>
            <value>1000</value>
        </property>
        <property>
            <name>generator.numMediums</name>
            <value>10000</value>
            <value>2000</value>
        </property>
        <property>
            <name>transfer.minNumDegree</name>
@@ -49,15 +26,15 @@
<scale_factor name="0.3">
<property>
<name>generator.numPersons</name>
<value>24000</value>
<value>3000</value>
</property>
<property>
<name>generator.numCompanies</name>
<value>12000</value>
<value>3000</value>
</property>
<property>
<name>generator.numMediums</name>
<value>30000</value>
<value>6000</value>
</property>
<property>
<name>transfer.minNumDegree</name>
@@ -92,18 +69,18 @@
        </property>
    </scale_factor>

    <!-- <scale_factor name="3">
    <scale_factor name="3">
        <property>
            <name>generator.numPersons</name>
            <value>240000</value>
            <value>30000</value>
        </property>
        <property>
            <name>generator.numCompanies</name>
            <value>120000</value>
            <value>30000</value>
        </property>
        <property>
            <name>generator.numMediums</name>
            <value>300000</value>
            <value>60000</value>
        </property>
        <property>
            <name>transfer.minNumDegree</name>
@@ -113,7 +90,7 @@
            <name>transfer.maxNumDegree</name>
            <value>1000</value>
        </property>
    </scale_factor> -->
    </scale_factor>

    <scale_factor name="10">
        <property>
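Note: each <scale_factor> entry maps a scale-factor name to generator properties — entity counts (generator.numPersons / numCompanies / numMediums) and transfer-degree bounds (transfer.minNumDegree / maxNumDegree). This commit shrinks the counts for SF0.1, SF0.3, and SF3 and re-enables the previously commented-out SF3 block. To inspect the values without running the Spark job, a standard-library parse is enough; this reader is illustrative, not part of the repo:

import xml.etree.ElementTree as ET

tree = ET.parse("src/main/resources/scale_factors.xml")
for sf in tree.getroot().findall("scale_factor"):
    # Collect each scale factor's property name/value pairs into a dict.
    props = {p.findtext("name"): p.findtext("value") for p in sf.findall("property")}
    print(sf.get("name"), props)

# After this commit, the name="3" block yields generator.numPersons=30000,
# generator.numCompanies=30000, generator.numMediums=60000.
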
tools/statistic.py: 12 additions, 2 deletions
@@ -4,10 +4,18 @@
import collections


def print_counts(counts):
labels = ["person","personOwnAccount","personApplyLoan","personGuarantee","personInvest","blank","company","companyOwnAccount","companyApplyLoan","companyGuarantee","companyInvest","blank","account","transfer","withdraw","blank","loan","loantransfer","deposit","repay","blank","medium","signIn"]

def print_original_counts(counts):
    for key, value in collections.OrderedDict(sorted(counts.items())).items():
        print("{}:{}".format(key, value))

def print_formatted_counts(counts):
    for label in labels:
        if label == "blank":
            print("================================")
        else:
            print("{}:{}".format(label, counts[label]))

def count_entites(path):
    counts = {}
@@ -18,7 +26,9 @@ def count_entites(path):
        for file in glob.glob(os.path.join(subdir_path, "*.csv")):
            num_entites += sum(1 for _ in open(file)) - 1
        counts[subdir] = num_entites
    print_counts(counts)
    print_original_counts(counts)
    print("\n========== Formatted Output ============\n")
    print_formatted_counts(counts)


if __name__ == "__main__":
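Note: one caveat in the new print_formatted_counts — it indexes counts[label] directly, so an output directory missing any expected entity subdirectory (say, no signIn data) would raise a KeyError. A defensive variant, offered as a suggestion rather than what the commit does, swaps in dict.get:

def print_formatted_counts(counts):
    for label in labels:
        if label == "blank":
            print("================================")
        else:
            # .get() tolerates missing entity directories instead of raising KeyError
            print("{}:{}".format(label, counts.get(label, 0)))
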