diff --git a/.gitignore b/.gitignore index 0640ec27..53852d66 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,5 @@ sf*/ sf*.tar sf*.tar.gz +paramgen/__pycache__/ tools/paramgen/__pycache__/ \ No newline at end of file diff --git a/tools/paramgen/parameter_curation.py b/paramgen/parameter_curation.py similarity index 91% rename from tools/paramgen/parameter_curation.py rename to paramgen/parameter_curation.py index 7c92e6f2..01782076 100644 --- a/tools/paramgen/parameter_curation.py +++ b/paramgen/parameter_curation.py @@ -17,10 +17,15 @@ from glob import glob import concurrent.futures from functools import partial +import sys + +table_dir = sys.argv[1] +out_dir = sys.argv[2] +random.seed(42) THRESH_HOLD = 0 THRESH_HOLD_6 = 0 -TRUNCATION_LIMIT = 10000 +TRUNCATION_LIMIT = 500 BATCH_SIZE = 5000 TIME_TRUNCATE = True @@ -29,20 +34,16 @@ else: TRUNCATION_ORDER = "AMOUNT_DESCENDING" -table_dir = '../../out/factor_table' -out_dir = '../../out/substitute_parameters/' def process_csv(file_path): all_files = glob(file_path + '/*.csv') - df_list = [] - + for filename in all_files: df = pd.read_csv(filename, delimiter='|') df_list.append(df) - + combined_df = pd.concat(df_list, ignore_index=True) - return combined_df @@ -52,7 +53,7 @@ def __init__(self): self.inputs = [] def setOutputFile(self, outputFile): - self.outputFile=outputFile + self.outputFile = outputFile def registerHandler(self, handler, inputParams, header): handler.header = header @@ -63,7 +64,7 @@ def writeCSV(self): dir_path = os.path.dirname(self.outputFile) if not os.path.exists(dir_path): os.makedirs(dir_path) - output = codecs.open( self.outputFile, "w",encoding="utf-8") + output = codecs.open(self.outputFile, "w", encoding="utf-8") if len(self.inputs) == 0: return @@ -105,7 +106,8 @@ def find_neighbors(account_list, account_account_df, account_amount_df, amount_b transfer_in_amount = account_amount_df.loc[item]['amount'] except KeyError: transfer_in_amount = 0 - result.update(neighbors_with_truncate_threshold(transfer_in_amount, rows_account_list, rows_amount_bucket, num_list)) + result.update( + neighbors_with_truncate_threshold(transfer_in_amount, rows_account_list, rows_amount_bucket, num_list)) elif query_id in [1, 2, 5]: for item in account_list: @@ -131,7 +133,6 @@ def find_neighbors(account_list, account_account_df, account_amount_df, amount_b def filter_neighbors(account_list, amount_bucket_df, num_list, account_id): - # print(f'account_list: {account_list}') rows_amount_bucket = amount_bucket_df.loc[account_id] @@ -144,20 +145,20 @@ def filter_neighbors(account_list, amount_bucket_df, num_list, account_id): header_at_limit = int(col) break - partial_apply = partial(filter_neighbors_with_truncate_threshold, amount_bucket_df=amount_bucket_df, num_list=num_list, header_at_limit=header_at_limit) + partial_apply = partial(filter_neighbors_with_truncate_threshold, amount_bucket_df=amount_bucket_df, + num_list=num_list, header_at_limit=header_at_limit) account_mapped = map(partial_apply, account_list) result = [x for x in account_mapped if x is not None] return result def filter_neighbors_with_truncate_threshold(item, amount_bucket_df, num_list, header_at_limit): - if item[1] > THRESH_HOLD_6: if header_at_limit == -1: return item[0] if (TIME_TRUNCATE and item[2] >= header_at_limit) or (not TIME_TRUNCATE and item[1] >= header_at_limit): return item[0] - + return None @@ -183,7 +184,7 @@ def neighbors_with_truncate_threshold(transfer_in_amount, rows_account_list, row return [t[0] for t in temp if t[1] >= header_at_limit] 
else: return [t[0] for t in temp] - + def neighbors_with_trancate(rows_account_list, rows_amount_bucket, num_list): if rows_amount_bucket is None: @@ -215,7 +216,6 @@ def process_get_neighbors(chunk, account_account_df, account_amount_df, amount_b def process_filter_neighbors(chunk, amount_bucket_df, num_list): - first_column_name = chunk.columns[0] second_column_name = chunk.columns[1] @@ -227,7 +227,6 @@ def process_filter_neighbors(chunk, amount_bucket_df, num_list): def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df, amount_bucket_df, query_id): - num_list = [] if query_id != 3 and query_id != 11: num_list = [x for x in amount_bucket_df.columns.tolist()] @@ -237,10 +236,12 @@ def get_next_neighbor_list(neighbors_df, account_account_df, account_amount_df, chunks = np.array_split(neighbors_df, query_parallelism) with concurrent.futures.ProcessPoolExecutor(max_workers=query_parallelism) as executor: - futures = [executor.submit(process_get_neighbors, chunk, account_account_df, account_amount_df, amount_bucket_df, num_list, query_id) for chunk in chunks] + futures = [ + executor.submit(process_get_neighbors, chunk, account_account_df, account_amount_df, amount_bucket_df, + num_list, query_id) for chunk in chunks] results = [future.result() for future in concurrent.futures.as_completed(futures)] executor.shutdown(wait=True) - + next_neighbors_df = pd.concat(results) next_neighbors_df = next_neighbors_df.sort_index() return next_neighbors_df @@ -271,16 +272,18 @@ def process_batch(batch, basic_sum_df, first_column_name, second_column_name): ).drop(columns=[second_column_name]) return merged_df.groupby(first_column_name).sum() + def get_next_sum_table(neighbors_df, basic_sum_df, batch_size=BATCH_SIZE): first_column_name = neighbors_df.columns[0] second_column_name = neighbors_df.columns[1] batches = [neighbors_df.iloc[start:start + batch_size] for start in range(0, len(neighbors_df), batch_size)] - + result_list = [] query_parallelism = max(1, multiprocessing.cpu_count() // 4) with concurrent.futures.ProcessPoolExecutor(max_workers=query_parallelism) as executor: - futures = [executor.submit(process_batch, batch, basic_sum_df, first_column_name, second_column_name) for batch in batches] + futures = [executor.submit(process_batch, batch, basic_sum_df, first_column_name, second_column_name) for batch + in batches] for future in futures: result_list.append(future.result()) executor.shutdown(wait=True) @@ -293,28 +296,32 @@ def get_next_sum_table(neighbors_df, basic_sum_df, batch_size=BATCH_SIZE): def handleThresholdParam(threshold): return str(threshold) + def handleThreshold2Param(threshold2): return str(threshold2) + def hendleIdParam(id): return str(id) + def handleTruncateLimitParam(truncateLimit): return str(truncateLimit) + def handleTruncateOrderParam(truncateOrder): return truncateOrder def handleTimeDurationParam(timeParam): - start = timegm(date(year=int(timeParam.year), month=int(timeParam.month), day=int(timeParam.day)).timetuple())*1000 + start = timegm( + date(year=int(timeParam.year), month=int(timeParam.month), day=int(timeParam.day)).timetuple()) * 1000 end = start + timeParam.duration * 3600 * 24 * 1000 res = str(start) + "|" + str(end) return res def process_query(query_id): - if query_id in [7, 10]: process_1_hop_query(query_id) elif query_id in [1, 2, 3, 5, 8, 11]: @@ -324,7 +331,6 @@ def process_query(query_id): def process_iter_queries(query_id): - upstream_amount_df = None upstream_amount_path = None amount_bucket_path = None @@ -340,7 +346,7 
@@ def process_iter_queries(query_id): else: amount_bucket_path = os.path.join(table_dir, 'trans_withdraw_bucket') time_bucket_path = os.path.join(table_dir, 'trans_withdraw_month') - output_path = os.path.join(out_dir, 'tcr8.txt') + output_path = os.path.join(out_dir, 'complex_8_param.csv') steps = 2 elif query_id == 1: @@ -351,7 +357,7 @@ def process_iter_queries(query_id): else: amount_bucket_path = os.path.join(table_dir, 'transfer_out_bucket') time_bucket_path = os.path.join(table_dir, 'transfer_out_month') - output_path = os.path.join(out_dir, 'tcr1.txt') + output_path = os.path.join(out_dir, 'complex_1_param.csv') steps = 2 elif query_id == 5: @@ -373,7 +379,7 @@ def process_iter_queries(query_id): else: amount_bucket_path = os.path.join(table_dir, 'transfer_in_bucket') time_bucket_path = os.path.join(table_dir, 'transfer_in_month') - output_path = os.path.join(out_dir, 'tcr2.txt') + output_path = os.path.join(out_dir, 'complex_2_param.csv') steps = 3 elif query_id == 3: @@ -389,10 +395,9 @@ def process_iter_queries(query_id): account_account_path = os.path.join(table_dir, 'person_guarantee_list') amount_bucket_path = os.path.join(table_dir, 'person_guarantee_count') time_bucket_path = os.path.join(table_dir, 'person_guarantee_month') - output_path = os.path.join(out_dir, 'tcr11.txt') + output_path = os.path.join(out_dir, 'complex_11_param.csv') steps = 3 - first_account_df = process_csv(first_account_path) account_account_df = process_csv(account_account_path) if query_id == 8: @@ -407,7 +412,7 @@ def process_iter_queries(query_id): first_account_name = account_account_df.columns[0] second_account_name = account_account_df.columns[1] first_amount_name = amount_bucket_df.columns[0] - first_time_name = time_bucket_df.columns[0] + first_time_name = time_bucket_df.columns[0] account_account_df[second_account_name] = account_account_df[second_account_name].apply(literal_eval) first_account_df[second_column_name] = first_account_df[second_column_name].apply(literal_eval) @@ -434,7 +439,8 @@ def process_iter_queries(query_id): if current_step == steps - 1: next_time_bucket = get_next_sum_table(first_neighbors_df, time_bucket_df) else: - first_neighbors_df = get_next_neighbor_list(first_neighbors_df, account_account_df, upstream_amount_df, amount_bucket_df, query_id) + first_neighbors_df = get_next_neighbor_list(first_neighbors_df, account_account_df, upstream_amount_df, + amount_bucket_df, query_id) current_step += 1 @@ -469,14 +475,14 @@ def process_iter_queries(query_id): elif query_id == 5: csvWriter_5 = CSVSerializer() - csvWriter_5.setOutputFile(output_path + 'tcr5.txt') + csvWriter_5.setOutputFile(output_path + 'complex_5_param.csv') csvWriter_5.registerHandler(hendleIdParam, final_first_items, "id") csvWriter_5.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime") csvWriter_5.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit") csvWriter_5.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder") csvWriter_12 = CSVSerializer() - csvWriter_12.setOutputFile(output_path + 'tcr12.txt') + csvWriter_12.setOutputFile(output_path + 'complex_12_param.csv') csvWriter_12.registerHandler(hendleIdParam, final_first_items, "id") csvWriter_12.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime") csvWriter_12.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit") @@ -506,23 +512,23 @@ def process_iter_queries(query_id): j = 0 k = 0 while True: - j = random.randint(0, 
len(final_first_items)-1) - k = random.randint(0, len(final_first_items)-1) + j = random.randint(0, len(final_first_items) - 1) + k = random.randint(0, len(final_first_items) - 1) if final_first_items[j] != account_id and final_first_items[k] != account_id: break final_second_items_3.append(final_first_items[j]) final_second_items_4.append(final_first_items[k]) - + csvWriter_3 = CSVSerializer() - csvWriter_3.setOutputFile(output_path + 'tcr3.txt') + csvWriter_3.setOutputFile(output_path + 'complex_3_param.csv') csvWriter_3.registerHandler(hendleIdParam, final_first_items, "id1") csvWriter_3.registerHandler(hendleIdParam, final_second_items_3, "id2") csvWriter_3.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime") csvWriter_3.registerHandler(handleTruncateLimitParam, truncate_limit_list, "truncationLimit") csvWriter_3.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder") - + csvWriter_4 = CSVSerializer() - csvWriter_4.setOutputFile(output_path + 'tcr4.txt') + csvWriter_4.setOutputFile(output_path + 'complex_4_param.csv') csvWriter_4.registerHandler(hendleIdParam, final_first_items, "id1") csvWriter_4.registerHandler(hendleIdParam, final_second_items_4, "id2") csvWriter_4.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime") @@ -547,7 +553,6 @@ def process_iter_queries(query_id): def process_1_hop_query(query_id): - if query_id == 7: first_count_path = os.path.join(table_dir, 'account_in_out_count') time_bucket_path = os.path.join(table_dir, 'account_in_out_month') @@ -555,7 +560,7 @@ def process_1_hop_query(query_id): elif query_id == 10: first_count_path = os.path.join(table_dir, 'person_invest_company') time_bucket_path = os.path.join(table_dir, 'invest_month') - output_path = os.path.join(out_dir, 'tcr10.txt') + output_path = os.path.join(out_dir, 'complex_10_param.csv') first_count_df = process_csv(first_count_path) time_bucket_df = process_csv(time_bucket_path) @@ -575,7 +580,7 @@ def process_1_hop_query(query_id): if query_id == 7: csvWriter_7 = CSVSerializer() - csvWriter_7.setOutputFile(output_path + 'tcr7.txt') + csvWriter_7.setOutputFile(output_path + 'complex_7_param.csv') csvWriter_7.registerHandler(hendleIdParam, final_first_items, "id") csvWriter_7.registerHandler(handleThresholdParam, thresh_list, "threshold") csvWriter_7.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime") @@ -583,7 +588,7 @@ def process_1_hop_query(query_id): csvWriter_7.registerHandler(handleTruncateOrderParam, truncate_order_list, "truncationOrder") csvWriter_9 = CSVSerializer() - csvWriter_9.setOutputFile(output_path + 'tcr9.txt') + csvWriter_9.setOutputFile(output_path + 'complex_9_param.csv') csvWriter_9.registerHandler(hendleIdParam, final_first_items, "id") csvWriter_9.registerHandler(handleThresholdParam, thresh_list, "threshold") csvWriter_9.registerHandler(handleTimeDurationParam, time_list, "startTime|endTime") @@ -594,13 +599,13 @@ def process_1_hop_query(query_id): csvWriter_9.writeCSV() print(f'query_id 7 and 9 finished') - + elif query_id == 10: final_second_items = [] for person in final_first_items: j = 0 while True: - j = random.randint(0, len(final_first_items)-1) + j = random.randint(0, len(final_first_items) - 1) if final_first_items[j] != person: break final_second_items.append(final_first_items[j]) @@ -616,7 +621,6 @@ def process_1_hop_query(query_id): def process_withdraw_query(): - first_account_path = os.path.join(table_dir, 'account_withdraw_in_items') time_bucket_path = os.path.join(table_dir, 
'transfer_in_month') if TIME_TRUNCATE: @@ -624,7 +628,7 @@ def process_withdraw_query(): else: withdraw_bucket_path = os.path.join(table_dir, 'withdraw_in_bucket') transfer_bucket_path = os.path.join(table_dir, 'transfer_in_bucket') - output_path = os.path.join(out_dir, 'tcr6.txt') + output_path = os.path.join(out_dir, 'complex_6_param.csv') first_account_df = process_csv(first_account_path) time_bucket_df = process_csv(time_bucket_path) @@ -640,7 +644,7 @@ def process_withdraw_query(): withdraw_bucket_df.set_index(withdraw_first_name, inplace=True) transfer_bucket_df.set_index(transfer_first_name, inplace=True) time_bucket_df.set_index(time_first_name, inplace=True) - + first_account_df[second_column_name] = first_account_df[second_column_name].apply(literal_eval) first_neighbors_df = first_account_df.sort_values(by=first_column_name) @@ -673,19 +677,21 @@ def process_withdraw_query(): def main(): - queries = [3, 1, 8, 7, 10, 11, 2, 5, 6] + queries = [6, 2, 3, 5, 7, 11, 8, 10, 1] # queries = [3] multiprocessing.set_start_method('forkserver') - processes = [] - - for query_id in queries: - p = multiprocessing.Process(target=process_query, args=(query_id,)) - p.start() - processes.append(p) - - for p in processes: - p.join() + + batch_size = 5 + for i in range(0, len(queries), batch_size): + processes = [] + for query_id in queries[i:i + batch_size]: + p = multiprocessing.Process(target=process_query, args=(query_id,)) + p.start() + processes.append(p) + + for p in processes: + p.join() if __name__ == "__main__": diff --git a/paramgen/search_params.py b/paramgen/search_params.py new file mode 100644 index 00000000..3d104270 --- /dev/null +++ b/paramgen/search_params.py @@ -0,0 +1,151 @@ +import math +from operator import itemgetter + + +def allclose(a, b, rtol=1e-05, atol=1e-08): + return abs(a - b) <= (atol + rtol * abs(b)) + + +def readFactors(f): + res = [] + for line in f.readlines(): + values = [item if index == 0 else int(item) for (index, item) in enumerate(line.split(","))] + res.append(values) + + return res + + +class Window: + def __init__(self, paramId, start, end): + self.param = paramId + self.start = start + self.end = end + self.avg = 0.0 + self.stddev = 0.0 + self.size = end - start + 1 + + def __str__(self): + res = "[%d, %d] " % (self.start, self.end) + res += "size: %d, avg: %0.2f, stddev: %0.2f" % (self.size, self.avg, self.stddev) + return res + + +def getAverageCost(rows, key): + return float(sum([key(r) for r in rows])) / len(rows) + + +def getCostStdDev(rows, avg, key): + return math.sqrt(sum([math.pow(key(r) - avg, 2) for r in rows]) / len(rows)) + + +def updateAverageCost(avg, oldelem, newelem, samplesize): + return avg + (newelem - oldelem) / samplesize + + +def findWindows(factors, param, amount, bounds): + data = factors[bounds[0]: bounds[1]] + allWindows = [] + start = 0 + + initWindow = Window(param, start, amount - 1) + initWindow.avg = getAverageCost(data[start:amount], itemgetter(param)) + initWindow.stddev = getCostStdDev(data[start:amount], initWindow.avg, itemgetter(param)) + + s1 = sum([x[param] for x in data[start:start + amount]]) + s2 = sum([x[param] * x[param] for x in data[start:start + amount]]) + start += 1 + allWindows.append(initWindow) + + while start + amount < len(data): + end = start + amount + if data[end - 1][param] < 10: + break + + window = Window(param, bounds[0] + start, bounds[0] + end - 1) + + # update the streaming stats about avg and stddev + s1 -= data[start - 1][param] + s1 += data[end - 1][param] + s2 -= data[start - 1][param] 
* data[start - 1][param] + s2 += data[end - 1][param] * data[end - 1][param] + + window.avg = float(s1) / amount + window.stddev = math.sqrt(float(amount * s2 - s1 * s1)) / amount + + allWindows.append(window) + start += 1 + + allWindows.sort(key=lambda windows: windows.stddev) + + res = [] + first = allWindows[0] + iter = 0 + + while iter < len(allWindows) and allWindows[iter].stddev == first.stddev: + res.append(allWindows[iter]) + iter += 1 + + return res + + +def mergeWindows(windows): + res = [] + + cur = windows[0] + + iter = 1 + constucted = cur + + while iter < len(windows): + while iter < len(windows) and windows[iter].start == cur.start + 1 and allclose(windows[iter].avg, cur.avg): + cur = windows[iter] + constucted.end = cur.end + constucted.size += 1 + iter += 1 + + res.append(constucted) + if iter >= len(windows): + break + + constucted = windows[iter] + cur = windows[iter] + iter += 1 + + return res + + +def generate(factors, portion): + amount = int(len(factors) * portion) + params = len(factors[0]) - 1 + + keys = [i for i in range(1, params + 1)] + + factors = sorted(factors, key=itemgetter(*keys), reverse=True) + result = [] + paramId = 1 + + current_windows = findWindows(factors, paramId, amount, (0, len(factors))) + + while len(current_windows) > 1 and paramId < params: + paramId += 1 + current_windows = mergeWindows(current_windows) + + new_windows = [] + for w in current_windows: + w2 = findWindows(factors, paramId, amount, (w.start, w.end + 1)) + new_windows.extend(w2) + + new_windows.sort(key=lambda w: w.stddev) + + current_windows = [] + first = new_windows[0] + iter = 0 + + while iter < len(new_windows) and new_windows[iter].stddev == first.stddev: + current_windows.append(new_windows[iter]) + iter += 1 + + w = current_windows[0] + + result.extend([factors[w.start + i][0] for i in range(amount)]) + return result diff --git a/tools/paramgen/time_select.py b/paramgen/time_select.py similarity index 71% rename from tools/paramgen/time_select.py rename to paramgen/time_select.py index 0d06b7c5..78450e19 100644 --- a/tools/paramgen/time_select.py +++ b/paramgen/time_select.py @@ -1,25 +1,26 @@ LAST_MONTHS = 3 START_YEAR = 2020 + class MonthYearCount: def __init__(self, month, year, count): - self.month=month - self.year=year - self.count=count + self.month = month + self.year = year + self.count = count class TimeParameter: def __init__(self, year, month, day, duration): - self.month=month - self.year=year - self.day=day - self.duration=duration - + self.month = month + self.year = year + self.day = day + self.duration = duration + -def getMedian(data, sort_key, getEntireTuple = False): +def getMedian(data, sort_key, getEntireTuple=False): if len(data) == 0: if getEntireTuple: - return MonthYearCount(0,0,0) + return MonthYearCount(0, 0, 0) return 0 if len(data) == 1: @@ -27,20 +28,20 @@ def getMedian(data, sort_key, getEntireTuple = False): return data[0] return data[0].count - srtd = sorted(data,key=sort_key) - mid = int(len(data)/2) + srtd = sorted(data, key=sort_key) + mid = int(len(data) / 2) if len(data) % 2 == 0: if getEntireTuple: return srtd[mid] - return (sort_key(srtd[mid-1]) + sort_key(srtd[mid])) / 2.0 + return (sort_key(srtd[mid - 1]) + sort_key(srtd[mid])) / 2.0 if getEntireTuple: return srtd[mid] return sort_key(srtd[mid]) -def computeTimeMedians(factors, lastmonthcount = LAST_MONTHS): +def computeTimeMedians(factors, lastmonthcount=LAST_MONTHS): mediantimes = [] lastmonths = [] firstmonths = [] @@ -48,48 +49,47 @@ def computeTimeMedians(factors, 
lastmonthcount = LAST_MONTHS): values.sort(key=lambda myc: (myc.year, myc.month)) l = len(values) - lastmonthsum = sum(myc.count for myc in values[max(l-lastmonthcount,0):l]) + lastmonthsum = sum(myc.count for myc in values[max(l - lastmonthcount, 0):l]) lastmonths.append(lastmonthsum) - cutoff_max = l-lastmonthcount + cutoff_max = l - lastmonthcount if cutoff_max < 0: cutoff_max = l firstmonthsum = sum(myc.count for myc in values[0:cutoff_max]) firstmonths.append(firstmonthsum) - mediantimes.append(getMedian(values,lambda myc: myc.count)) + mediantimes.append(getMedian(values, lambda myc: myc.count)) median = getMedian(mediantimes, lambda x: x) medianLastMonth = getMedian(lastmonths, lambda x: x) medianFirstMonth = getMedian(firstmonths, lambda x: x) return medianFirstMonth, medianLastMonth, median - + def getTimeParamsWithMedian(factors, medianFirstMonth, medianLastMonth, median): # strategy: find the median of the given distribution, then increase the time interval until it matches the given parameter res = [] for values in factors: - input = sorted(values,key=lambda myc: (myc.year, myc.month)) - currentMedian = getMedian(values,lambda myc: myc.count, True) + input = sorted(values, key=lambda myc: (myc.year, myc.month)) + currentMedian = getMedian(values, lambda myc: myc.count, True) if int(median) == 0 or int(currentMedian.count) == 0 or int(currentMedian.year) == 0: - res.append(TimeParameter(START_YEAR,1,1,0)) + res.append(TimeParameter(START_YEAR, 1, 1, 0)) continue if currentMedian.count > median: - duration = int(28*currentMedian.count/median) + duration = int(28 * currentMedian.count / median) res.append(TimeParameter(currentMedian.year, currentMedian.month, 1, duration)) else: - duration = int(28*median/currentMedian.count) + duration = int(28 * median / currentMedian.count) res.append(TimeParameter(currentMedian.year, currentMedian.month, 1, duration)) return res - + def findTimeParameters(factors): - medianFirstMonth, medianLastMonth, median = computeTimeMedians(factors) timeParams = getTimeParamsWithMedian(factors, medianFirstMonth, medianLastMonth, median) return timeParams - + def findTimeParams(input_loan_list, time_bucket_df): time_list = [x for x in time_bucket_df.iloc[0].index.tolist()[1:]] factors = [] @@ -103,7 +103,5 @@ def findTimeParams(input_loan_list, time_bucket_df): year = START_YEAR + month / 12 temp_factors.append(MonthYearCount(month % 12 + 1, int(year), count)) factors.append(temp_factors) - - return findTimeParameters(factors) - + return findTimeParameters(factors) diff --git a/scripts/run_local.sh b/scripts/run_local.sh index 3185dbe5..bc01c09f 100644 --- a/scripts/run_local.sh +++ b/scripts/run_local.sh @@ -3,12 +3,19 @@ LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar OUTPUT_DIR=out +# Note: generate factor tables with --generate-factors + # run locally with the python script # time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR} # run locally with spark-submit command # **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug # **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}), +# --conf "spark.memory.offHeap.enabled=true" \ +# --conf "spark.memory.offHeap.size=100g" \ +# --conf "spark.storage.memoryFraction=0" \ +# --conf 
"spark.serializer=org.apache.spark.serializer.KryoSerializer" \ + time spark-submit --master local[*] \ --class ldbc.finbench.datagen.LdbcDatagen \ --driver-memory 480g \ @@ -24,3 +31,20 @@ time spark-submit --master local[*] \ ${LDBC_FINBENCH_DATAGEN_JAR} \ --scale-factor 10 \ --output-dir ${OUTPUT_DIR} + +# currently works on SF100 +#time spark-submit --master local[*] \ +# --class ldbc.finbench.datagen.LdbcDatagen \ +# --driver-memory 400g \ +# --conf "spark.default.parallelism=800" \ +# --conf "spark.shuffle.compress=true" \ +# --conf "spark.shuffle.spill.compress=true" \ +# --conf "spark.kryoserializer.buffer.max=512m" \ +# --conf "spark.driver.maxResultSize=0" \ +# --conf "spark.driver.extraJavaOptions=-Xss512m" \ +# --conf "spark.executor.extraJavaOptions=-Xss512m -XX:+UseG1GC" \ +# --conf "spark.kryo.referenceTracking=false" \ +# ${LDBC_FINBENCH_DATAGEN_JAR} \ +# --scale-factor 100 \ +# --output-dir ${OUTPUT_DIR} + diff --git a/scripts/run_paramgen.sh b/scripts/run_paramgen.sh new file mode 100644 index 00000000..1f830204 --- /dev/null +++ b/scripts/run_paramgen.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar +OUTPUT_DIR=out/ + +# Note: generate factor tables with --generate-factors + +echo "start factor table generation" + +time spark-submit --master local[*] \ + --class ldbc.finbench.datagen.LdbcDatagen \ + --driver-memory 480g \ + ${LDBC_FINBENCH_DATAGEN_JAR} \ + --output-dir ${OUTPUT_DIR} \ + --factor-format csv \ + --generate-factors + +echo "start parameter curation" \ No newline at end of file diff --git a/src/main/resources/scale_factors.xml b/src/main/resources/scale_factors.xml index 18d70cdb..192a223e 100644 --- a/src/main/resources/scale_factors.xml +++ b/src/main/resources/scale_factors.xml @@ -1,40 +1,17 @@ - - - generator.numPersons - 800 - - - generator.numCompanies - 400 - - - generator.numMediums - 1000 - - - transfer.minNumDegree - 1 - - - transfer.maxNumDegree - 1000 - - - generator.numPersons - 8000 + 1000 generator.numCompanies - 4000 + 1000 generator.numMediums - 10000 + 2000 transfer.minNumDegree @@ -49,15 +26,15 @@ generator.numPersons - 24000 + 3000 generator.numCompanies - 12000 + 3000 generator.numMediums - 30000 + 6000 transfer.minNumDegree @@ -92,18 +69,18 @@ - + diff --git a/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala b/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala index 18d74096..16f17d7e 100644 --- a/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala +++ b/src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala @@ -1,9 +1,7 @@ package ldbc.finbench.datagen import ldbc.finbench.datagen.factors.FactorGenerationStage -import ldbc.finbench.datagen.generation.dictionary.Dictionaries import ldbc.finbench.datagen.generation.GenerationStage -import ldbc.finbench.datagen.transformation.TransformationStage import ldbc.finbench.datagen.util.{Logging, SparkApp} import shapeless.lens @@ -24,7 +22,7 @@ object LdbcDatagen extends SparkApp with Logging { format: String = "csv", formatOptions: Map[String, String] = Map.empty, epochMillis: Boolean = false, - generateFactors: Boolean = true, + generateFactors: Boolean = false, factorFormat: String = "parquet" ) @@ -121,22 +119,22 @@ object LdbcDatagen extends SparkApp with Logging { } override def run(args: ArgsType): Unit = { - val generationArgs = GenerationStage.Args( - scaleFactor = args.scaleFactor, - outputDir = args.outputDir, - format = args.format, - partitionsOpt = args.numPartitions - ) - 
log.info("[Main] Starting generation stage") - GenerationStage.run(generationArgs) - - if (args.generateFactors) { - val factorArgs = FactorGenerationStage.Args( - outputDir = args.outputDir, - format = args.factorFormat + if (!args.generateFactors) { + GenerationStage.run( + GenerationStage.Args( + scaleFactor = args.scaleFactor, + outputDir = args.outputDir, + format = args.format, + partitionsOpt = args.numPartitions + ) + ) + } else { + FactorGenerationStage.run( + FactorGenerationStage.Args( + outputDir = args.outputDir, + format = args.factorFormat + ) ) - log.info("[Main] Starting factoring stage") -// FactorGenerationStage.run(factorArgs) } } } diff --git a/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala b/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala index e0bf0973..b178d0cc 100644 --- a/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala +++ b/src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala @@ -10,7 +10,6 @@ import shapeless.lens import scala.util.matching.Regex object FactorGenerationStage extends DatagenStage { - @transient lazy val log: Logger = LoggerFactory.getLogger(this.getClass) case class Args( @@ -64,158 +63,152 @@ object FactorGenerationStage extends DatagenStage { run(parsedArgs) } - // execute factorization process - // TODO: finish all override def run(args: Args) = { - parameterCuration(args) + factortables(args) } - def parameterCuration(args: Args)(implicit spark: SparkSession) = { - import spark.implicits._ + def readCSV(path: String): DataFrame = { + val csvOptions = Map("header" -> "true", "delimiter" -> "|") + spark.read + .format("csv") + .options(csvOptions) + .load(path) + } - val transferRDD = spark.read - .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") - .option("header", "true") - .option("delimiter", "|") - .load(s"${args.outputDir}/raw/transfer/*.csv") - .select($"fromId", $"toId", $"amount".cast("double"), $"createTime") + def transformItems( + df: DataFrame, + groupByCol: String, + selectCol: String + ): DataFrame = { + val itemAmountRDD = df + .groupBy(groupByCol, selectCol) + .agg( + max(col("amount")).alias("maxAmount"), + max(col("createTime")).alias("maxCreateTime") + ) - val withdrawRDD = spark.read - .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") - .option("header", "true") - .option("delimiter", "|") - .load(s"${args.outputDir}/raw/withdraw/*.csv") - .select($"fromId", $"toId", $"amount".cast("double"), $"createTime") + val itemsRDD = itemAmountRDD + .groupBy(groupByCol) + .agg( + collect_list( + array(col(selectCol), col("maxAmount"), col("maxCreateTime")) + ).alias("items") + ) - val depositRDD = spark.read - .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") - .option("header", "true") - .option("delimiter", "|") - .load(s"${args.outputDir}/raw/deposit/*.csv") - .select($"accountId", $"loanId") + itemsRDD + .withColumn( + "items", + F.expr( + "transform(items, array -> concat('[', concat_ws(',', array), ']'))" + ) + ) + .withColumn( + "items", + F.concat(lit("["), F.concat_ws(",", col("items")), lit("]")) + ) + } - val personInvestRDD = spark.read - .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") - .option("header", "true") - .option("delimiter", "|") - .load(s"${args.outputDir}/raw/personInvest/*.csv") - .select($"investorId", $"companyId", $"createTime") + def bucketAndCount( + df: DataFrame, + idCol: String, + amountCol: String, + groupCol: String 
+ ): DataFrame = { + + val buckets = Array(10000, 30000, 100000, 300000, 1000000, 2000000, 3000000, + 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 10000000) + + val bucketedRDD = df.withColumn( + "bucket", + when(col(amountCol) <= buckets(0), buckets(0)) + .when(col(amountCol) <= buckets(1), buckets(1)) + .when(col(amountCol) <= buckets(2), buckets(2)) + .when(col(amountCol) <= buckets(3), buckets(3)) + .when(col(amountCol) <= buckets(4), buckets(4)) + .when(col(amountCol) <= buckets(5), buckets(5)) + .when(col(amountCol) <= buckets(6), buckets(6)) + .when(col(amountCol) <= buckets(7), buckets(7)) + .when(col(amountCol) <= buckets(8), buckets(8)) + .when(col(amountCol) <= buckets(9), buckets(9)) + .when(col(amountCol) <= buckets(10), buckets(10)) + .when(col(amountCol) <= buckets(11), buckets(11)) + .when(col(amountCol) <= buckets(12), buckets(12)) + .otherwise(buckets(13)) + ) - val OwnRDD = spark.read - .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") - .option("header", "true") - .option("delimiter", "|") - .load(s"${args.outputDir}/raw/personOwnAccount/*.csv") - .select($"personId", $"accountId") + bucketedRDD + .groupBy(idCol) + .pivot("bucket", buckets.map(_.toString)) + .count() + .na + .fill(0) + .withColumnRenamed(idCol, groupCol) + } - val personGuaranteeRDD = spark.read - .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") - .option("header", "true") - .option("delimiter", "|") - .load(s"${args.outputDir}/raw/personGuarantee/*.csv") - .select($"fromId", $"toId", $"createTime") - - def transformItems( - df: DataFrame, - groupByCol: String, - selectCol: String - ): DataFrame = { - val itemAmountRDD = df - .groupBy(groupByCol, selectCol) - .agg( - max($"amount").alias("maxAmount"), - max($"createTime").alias("maxCreateTime") - ) + def processByMonth( + df: DataFrame, + idCol: String, + timeCol: String, + newIdColName: String + ): DataFrame = { + val byMonthRDD = df + .withColumn( + "year_month", + date_format((col(timeCol) / 1000).cast("timestamp"), "yyyy-MM") + ) + .groupBy(idCol, "year_month") + .count() - val itemsRDD = itemAmountRDD - .groupBy(groupByCol) - .agg(collect_list(array(col(selectCol), $"maxAmount", $"maxCreateTime")).alias("items")) + val timestampedByMonthRDD = byMonthRDD + .withColumn( + "year_month_ts", + unix_timestamp(col("year_month"), "yyyy-MM") * 1000 + ) + .drop("year_month") - val accountItemsRDD = itemsRDD - .withColumn( - "items", - F.expr( - "transform(items, array -> concat('[', concat_ws(',', array), ']'))" - ) - ) - .withColumn( - "items", - F.concat(lit("["), F.concat_ws(",", $"items"), lit("]")) - ) + val pivotRDD = timestampedByMonthRDD + .groupBy(idCol) + .pivot("year_month_ts") + .agg(first("count")) + .na + .fill(0) + .withColumnRenamed(idCol, newIdColName) - accountItemsRDD - } + pivotRDD + } - def bucketAndCount( - df: DataFrame, - idCol: String, - amountCol: String, - groupCol: String - ): DataFrame = { - - val buckets = Array(10000, 30000, 100000, 300000, 1000000, 2000000, - 3000000, 4000000, 5000000, 6000000, 7000000, 8000000, 9000000, 10000000) - - val bucketedRDD = df.withColumn( - "bucket", - when(col(amountCol) <= buckets(0), buckets(0)) - .when(col(amountCol) <= buckets(1), buckets(1)) - .when(col(amountCol) <= buckets(2), buckets(2)) - .when(col(amountCol) <= buckets(3), buckets(3)) - .when(col(amountCol) <= buckets(4), buckets(4)) - .when(col(amountCol) <= buckets(5), buckets(5)) - .when(col(amountCol) <= buckets(6), buckets(6)) - .when(col(amountCol) <= buckets(7), buckets(7)) - 
.when(col(amountCol) <= buckets(8), buckets(8)) - .when(col(amountCol) <= buckets(9), buckets(9)) - .when(col(amountCol) <= buckets(10), buckets(10)) - .when(col(amountCol) <= buckets(11), buckets(11)) - .when(col(amountCol) <= buckets(12), buckets(12)) - .otherwise(buckets(13)) - ) + def factortables(args: Args)(implicit spark: SparkSession) = { +// import spark.implicits._ + log.info("[Main] Starting factoring stage") - val bucketCountsRDD = bucketedRDD - .groupBy(idCol) - .pivot("bucket", buckets.map(_.toString)) - .count() - .na - .fill(0) - .withColumnRenamed(idCol, groupCol) + val transferRDD = readCSV(s"${args.outputDir}/raw/transfer/*.csv") + .select( + col("fromId"), + col("toId"), + col("amount").cast("double"), + col("createTime") + ) - bucketCountsRDD - } + val withdrawRDD = readCSV(s"${args.outputDir}/raw/withdraw/*.csv") + .select( + col("fromId"), + col("toId"), + col("amount").cast("double"), + col("createTime") + ) - def processByMonth( - df: DataFrame, - idCol: String, - timeCol: String, - newIdColName: String - ): DataFrame = { - val byMonthRDD = df - .withColumn( - "year_month", - date_format((col(timeCol) / 1000).cast("timestamp"), "yyyy-MM") - ) - .groupBy(idCol, "year_month") - .count() + val depositRDD = readCSV(s"${args.outputDir}/raw/deposit/*.csv") + .select(col("accountId"), col("loanId")) - val timestampedByMonthRDD = byMonthRDD - .withColumn( - "year_month_ts", - unix_timestamp(col("year_month"), "yyyy-MM") * 1000 - ) - .drop("year_month") + val personInvestRDD = readCSV(s"${args.outputDir}/raw/personInvest/*.csv") + .select(col("investorId"), col("companyId"), col("createTime")) - val pivotRDD = timestampedByMonthRDD - .groupBy(idCol) - .pivot("year_month_ts") - .agg(first("count")) - .na - .fill(0) - .withColumnRenamed(idCol, newIdColName) + val ownRDD = readCSV(s"${args.outputDir}/raw/personOwnAccount/*.csv") + .select(col("personId"), col("accountId")) - pivotRDD - } + val personGuaranteeRDD = + readCSV(s"${args.outputDir}/raw/personGuarantee/*.csv") + .select(col("fromId"), col("toId"), col("createTime")) val PersonInvestCompanyRDD = personInvestRDD .groupBy("investorId") @@ -229,7 +222,12 @@ object FactorGenerationStage extends DatagenStage { .save(s"${args.outputDir}/factor_table/person_invest_company") val transferItRDD = - transferRDD.select($"fromId", $"toId", $"amount".cast("double"), $"createTime") + transferRDD.select( + col("fromId"), + col("toId"), + col("amount").cast("double"), + col("createTime") + ) val transferOutAccountItemsRDD = transformItems(transferItRDD, "fromId", "toId") @@ -254,14 +252,14 @@ object FactorGenerationStage extends DatagenStage { .save(s"${args.outputDir}/factor_table/account_transfer_in_items") val transferOutLRDD = transferItRDD - .select($"fromId", $"toId") - .groupBy($"fromId") - .agg(F.collect_list($"toId").alias("transfer_out_list")) - .select($"fromId".alias("account_id"), $"transfer_out_list") + .select(col("fromId"), col("toId")) + .groupBy(col("fromId")) + .agg(F.collect_list(col("toId")).alias("transfer_out_list")) + .select(col("fromId").alias("account_id"), col("transfer_out_list")) val transferOutListRDD = transferOutLRDD.withColumn( "transfer_out_list", - F.concat(lit("["), F.concat_ws(",", $"transfer_out_list"), lit("]")) + F.concat(lit("["), F.concat_ws(",", col("transfer_out_list")), lit("]")) ) transferOutListRDD @@ -273,14 +271,14 @@ object FactorGenerationStage extends DatagenStage { .save(s"${args.outputDir}/factor_table/account_transfer_out_list") val fromAccounts = transferRDD.select( -
$"fromId".alias("account_id"), - $"toId".alias("corresponding_account_id"), - $"createTime" + col("fromId").alias("account_id"), + col("toId").alias("corresponding_account_id"), + col("createTime") ) val toAccounts = transferRDD.select( - $"toId".alias("account_id"), - $"fromId".alias("corresponding_account_id"), - $"createTime" + col("toId").alias("account_id"), + col("fromId").alias("corresponding_account_id"), + col("createTime") ) val allAccounts = fromAccounts.union(toAccounts) @@ -290,7 +288,7 @@ object FactorGenerationStage extends DatagenStage { .agg(collect_set("corresponding_account_id").alias("account_list")) val accountCountDF = accountListRDD - .select($"account_id", F.size($"account_list").alias("sum")) + .select(col("account_id"), F.size(col("account_list")).alias("sum")) accountCountDF.write .option("header", "true") @@ -300,7 +298,7 @@ object FactorGenerationStage extends DatagenStage { val transferAccountListRDD = accountListRDD.withColumn( "account_list", - F.concat(lit("["), F.concat_ws(",", $"account_list"), lit("]")) + F.concat(lit("["), F.concat_ws(",", col("account_list")), lit("]")) ) transferAccountListRDD @@ -390,7 +388,12 @@ object FactorGenerationStage extends DatagenStage { .save(s"${args.outputDir}/factor_table/trans_withdraw_month") val withdrawInRDD = - withdrawRDD.select($"fromId", $"toId", $"amount".cast("double"), $"createTime") + withdrawRDD.select( + col("fromId"), + col("toId"), + col("amount").cast("double"), + col("createTime") + ) val combinedRDD = transferItRDD.union(withdrawInRDD) val transformedAccountItemsRDD = @@ -415,15 +418,15 @@ object FactorGenerationStage extends DatagenStage { .save(s"${args.outputDir}/factor_table/account_withdraw_in_items") val transferOutAmountRDD = - transferItRDD.select($"fromId", $"amount".cast("double")) + transferItRDD.select(col("fromId"), col("amount").cast("double")) val transferInAmountRDD = - transferItRDD.select($"toId", $"amount".cast("double")) + transferItRDD.select(col("toId"), col("amount").cast("double")) val transactionsAmountRDD = transferOutAmountRDD .union(withdrawRDD.select(col("fromId"), col("amount").cast("double"))) val withdrawInBucketAmountRDD = - withdrawInRDD.select($"toId", $"amount".cast("double")) + withdrawInRDD.select(col("toId"), col("amount").cast("double")) val bucketCountsRDD = bucketAndCount(transactionsAmountRDD, "fromId", "amount", "account_id") @@ -460,8 +463,8 @@ object FactorGenerationStage extends DatagenStage { .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") .save(s"${args.outputDir}/factor_table/transfer_out_bucket") - val PersonOwnAccountRDD = OwnRDD - .select($"personId", $"accountId") + val PersonOwnAccountRDD = ownRDD + .select(col("personId"), col("accountId")) .groupBy("personId") .agg(coalesce(collect_set("accountId"), array()).alias("account_list")) .select( @@ -477,17 +480,17 @@ object FactorGenerationStage extends DatagenStage { .save(s"${args.outputDir}/factor_table/person_account_list") val PersonGuaranteeListRDD = personGuaranteeRDD - .select($"fromId", $"toId") + .select(col("fromId"), col("toId")) .groupBy("fromId") .agg(coalesce(collect_set("toId"), array()).alias("guaranteee_list")) val PersonGuaranteePersonRDD = PersonGuaranteeListRDD.withColumn( "guaranteee_list", - F.concat(lit("["), F.concat_ws(",", $"guaranteee_list"), lit("]")) + F.concat(lit("["), F.concat_ws(",", col("guaranteee_list")), lit("]")) ) val PersonGuaranteeCount = PersonGuaranteeListRDD - .select($"fromId", F.size($"guaranteee_list").alias("sum")) + 
.select(col("fromId"), F.size(col("guaranteee_list")).alias("sum")) PersonGuaranteePersonRDD.write .option("header", "true") @@ -533,6 +536,5 @@ object FactorGenerationStage extends DatagenStage { .option("delimiter", "|") .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat") .save(s"${args.outputDir}/factor_table/upstream_amount") - } } diff --git a/src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySerializer.scala similarity index 99% rename from src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala rename to src/main/scala/ldbc/finbench/datagen/generation/ActivitySerializer.scala index d69b32ed..1416967e 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/serializers/ActivitySerializer.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySerializer.scala @@ -1,4 +1,4 @@ -package ldbc.finbench.datagen.generation.serializers +package ldbc.finbench.datagen.generation import ldbc.finbench.datagen.entities.edges._ import ldbc.finbench.datagen.entities.nodes._ diff --git a/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala index c3b4fc7f..bbcea2b9 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/ActivitySimulator.scala @@ -3,7 +3,6 @@ package ldbc.finbench.datagen.generation import ldbc.finbench.datagen.config.DatagenConfiguration import ldbc.finbench.datagen.entities.nodes._ import ldbc.finbench.datagen.generation.generators.{ActivityGenerator, SparkCompanyGenerator, SparkMediumGenerator, SparkPersonGenerator} -import ldbc.finbench.datagen.generation.serializers.ActivitySerializer import ldbc.finbench.datagen.io.Writer import ldbc.finbench.datagen.io.raw.RawSink import ldbc.finbench.datagen.util.Logging diff --git a/src/main/scala/ldbc/finbench/datagen/generation/GenerationStage.scala b/src/main/scala/ldbc/finbench/datagen/generation/GenerationStage.scala index 3283a7ac..72d4c6b2 100644 --- a/src/main/scala/ldbc/finbench/datagen/generation/GenerationStage.scala +++ b/src/main/scala/ldbc/finbench/datagen/generation/GenerationStage.scala @@ -1,5 +1,6 @@ package ldbc.finbench.datagen.generation +import ldbc.finbench.datagen.LdbcDatagen.log import ldbc.finbench.datagen.config.{ConfigParser, DatagenConfiguration} import ldbc.finbench.datagen.io.raw.{Csv, Parquet, RawSink} import ldbc.finbench.datagen.util._ @@ -25,6 +26,7 @@ object GenerationStage extends DatagenStage with Logging { override type ArgsType = Args override def run(args: Args): Unit = { + log.info("[Main] Starting generation stage") // build and initialize the configs val config = buildConfig(args) // OPT: It is called in each SparkGenerator in Spark to initialize the context on the executors. 
diff --git a/src/main/scala/ldbc/finbench/datagen/transformation/TransformationStage.scala b/src/main/scala/ldbc/finbench/datagen/transformation/TransformationStage.scala deleted file mode 100644 index d58dc6e8..00000000 --- a/src/main/scala/ldbc/finbench/datagen/transformation/TransformationStage.scala +++ /dev/null @@ -1,204 +0,0 @@ -package ldbc.finbench.datagen.transformation - -import ldbc.finbench.datagen.generation.DatagenParams -import ldbc.finbench.datagen.generation.dictionary.Dictionaries -import ldbc.finbench.datagen.util.sql.qcol -import ldbc.finbench.datagen.util.{DatagenStage, Logging} -import ldbc.finbench.datagen.syntax._ -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.functions.{ - col, - date_format, - date_trunc, - from_unixtime, - lit, - to_timestamp -} -import scopt.OptionParser -import shapeless.lens - -// Note: transformation is not used now. Data conversion is done by python scripts. -object TransformationStage extends DatagenStage with Logging { - private val options: Map[String, String] = - Map("header" -> "true", "delimiter" -> "|") - - case class Args( - outputDir: String = "out", - bulkloadPortion: Double = 0.0, - keepImplicitDeletes: Boolean = false, - simulationStart: Long = 0, - simulationEnd: Long = 0, - irFormat: String = "csv", - format: String = "csv", - formatOptions: Map[String, String] = Map.empty, - epochMillis: Boolean = false, - batchPeriod: String = "day" - ) - - override type ArgsType = Args - - def main(args: Array[String]): Unit = { - val parser = new OptionParser[Args](getClass.getName.dropRight(1)) { - head(appName) - - val args = lens[Args] - - opt[String]('o', "output-dir") - .action((x, c) => args.outputDir.set(c)(x)) - .text( - "path on the cluster filesystem, where Datagen outputs. Can be a URI (e.g S3, ADLS, HDFS) or a " + - "path in which case the default cluster file system is used." 
- ) - - opt[String]("ir-format") - .action((x, c) => args.irFormat.set(c)(x)) - .text("Format of the raw input") - - opt[String]("format") - .action((x, c) => args.format.set(c)(x)) - .text("Output format") - - help('h', "help").text("prints this usage text") - } - val parsedArgs = - parser - .parse(args, Args()) - .getOrElse(throw new RuntimeException("Invalid arguments")) - - run(parsedArgs) - } - - // execute the transform process - override def run(args: Args): Unit = { - - val rawPathPrefix = args.outputDir / "raw" - val outputPathPrefix = args.outputDir / "history_data" - - val filterDeletion = false - - val simulationStart = Dictionaries.dates.getSimulationStart - val simulationEnd = Dictionaries.dates.getSimulationEnd - val bulkLoadThreshold = calculateBulkLoadThreshold( - args.bulkloadPortion, - simulationStart, - simulationEnd - ) - - // val batch_id = (col: Column) => date_format(date_trunc(args.batchPeriod, to_timestamp(col / lit(1000L))), batchPeriodFormat(args.batchPeriod)) - // - // def inBatch(col: Column, batchStart: Long, batchEnd: Long) = - // col >= lit(batchStart) && col < lit(batchEnd) - // - // val batched = (df: DataFrame) => - // df - // .select( - // df.columns.map(qcol) ++ Seq( - // batch_id($"creationDate").as("insert_batch_id"), - // batch_id($"deletionDate").as("delete_batch_id") - // ): _* - // ) - // - // val insertBatchPart = (tpe: EntityType, df: DataFrame, batchStart: Long, batchEnd: Long) => { - // df - // .filter(inBatch($"creationDate", batchStart, batchEnd)) - // .pipe(batched) - // .select( - // Seq($"insert_batch_id".as("batch_id")) ++ columns(tpe, df.columns).map(qcol): _* - // ) - // } - // - // val deleteBatchPart = (tpe: EntityType, df: DataFrame, batchStart: Long, batchEnd: Long) => { - // val idColumns = tpe.primaryKey.map(qcol) - // df - // .filter(inBatch($"deletionDate", batchStart, batchEnd)) - // .filter(if (df.columns.contains("explicitlyDeleted")) col("explicitlyDeleted") else lit(true)) - // .pipe(batched) - // .select(Seq($"delete_batch_id".as("batch_id"), $"deletionDate") ++ idColumns: _*) - // } - - val readRaw = (target: String) => { - spark.read - .format(args.irFormat) - .options(options) - .option("inferSchema", "true") - .load(s"$rawPathPrefix/$target/*.csv") - } - - val extractSnapshot = (df: DataFrame) => { - df.filter( - $"creationDate" < lit(bulkLoadThreshold) - && (!lit(filterDeletion) || $"deletionDate" >= lit(bulkLoadThreshold)) - ) - // .select(_: _*) - } - - val transferSnapshot = extractSnapshot(readRaw("transfer")) - // .select("fromId", "toId", "multiplicityId", "createTime", "deleteTime", "amount", "isExplicitDeleted") - // .map(extractSnapshot) - .withColumn( - "createTime", - from_unixtime( - col("createTime") / 1000, - batchPeriodFormat(args.batchPeriod) - ) - ) - .withColumn( - "deleteTime", - from_unixtime( - col("deleteTime") / 1000, - batchPeriodFormat(args.batchPeriod) - ) - ) - .orderBy("createTime", "deleteTime") - write(transferSnapshot, (outputPathPrefix / "transfer").toString) - - // val accountDf = spark.read.format("csv") - // .option("header", "true") - // .option("delimiter", "|") - // .load("./out/account/part-00000-4b0e57cb-23bb-447f-89f1-e7e71a4ee017-c000.csv") - // - // transferDf.join(accountDf, transferDf("fromId") === accountDf("id"), "left") - // .select() - } - - // def columns(tpe: EntityType, cols: Seq[String]) = tpe match { - // case tpe if tpe.isStatic => cols - // case Edge("Knows", PersonType, PersonType, NN, false, _, _) => - // val rawCols = Set("deletionDate", "explicitlyDeleted", 
"weight") - // cols.filter(!rawCols.contains(_)) - // case _ => - // val rawCols = Set("deletionDate", "explicitlyDeleted") - // cols.filter(!rawCols.contains(_)) - // } - private def write(data: DataFrame, path: String): Unit = { - data - .toDF() - .coalesce(1) - .write - .format("csv") - .options(options) - .option("encoding", "UTF-8") - .mode("overwrite") - .save(path) - } - - private def calculateBulkLoadThreshold( - bulkLoadPortion: Double, - simulationStart: Long, - simulationEnd: Long - ) = { - (simulationEnd - ((simulationEnd - simulationStart) * (1 - bulkLoadPortion)).toLong) - } - - private def batchPeriodFormat(batchPeriod: String) = batchPeriod match { - case "year" => "yyyy" - case "month" => "yyyy-MM" - case "day" => "yyyy-MM-dd" - case "hour" => "yyyy-MM-dd'T'hh" - case "minute" => "yyyy-MM-dd'T'hh:mm" - case "second" => "yyyy-MM-dd'T'hh:mm:ss" - case "millisecond" => "yyyy-MM-dd'T'hh:mm:ss.SSS" - case _ => throw new IllegalArgumentException("Unrecognized partition key") - } -} diff --git a/tools/DataProfiler/result/db139/profile.log b/tools/DataProfiler/result/db139/profile.log deleted file mode 100644 index 1ce23716..00000000 --- a/tools/DataProfiler/result/db139/profile.log +++ /dev/null @@ -1,4 +0,0 @@ -Run degree profiling. -load_cost = 547.45(s) -exec_cost = 897.82(s) -total_cost = 1445.27(s) diff --git a/tools/DataProfiler/result/db177/edges.log b/tools/DataProfiler/result/db177/edges.log deleted file mode 100644 index f189d9ae..00000000 --- a/tools/DataProfiler/result/db177/edges.log +++ /dev/null @@ -1,2 +0,0 @@ -V 242919058, E 508196712, E/V 2.09204 -Unique edges: 2.58058e+08 / 508196712, Multiplicity: 1.96931 diff --git a/tools/DataProfiler/result/db177/profile.log b/tools/DataProfiler/result/db177/profile.log deleted file mode 100644 index d702927a..00000000 --- a/tools/DataProfiler/result/db177/profile.log +++ /dev/null @@ -1,4 +0,0 @@ -Run degree profiling. -load_cost = 418.57(s) -exec_cost = 1180.35(s) -total_cost = 1598.92(s) diff --git a/tools/DataProfiler/result/db184/profile.log b/tools/DataProfiler/result/db184/profile.log deleted file mode 100644 index 21a09254..00000000 --- a/tools/DataProfiler/result/db184/profile.log +++ /dev/null @@ -1,4 +0,0 @@ -Run degree profiling. -load_cost = 377.06(s) -exec_cost = 713.83(s) -total_cost = 1090.90(s) diff --git a/tools/DataProfiler/result/transfer/edges.log b/tools/DataProfiler/result/transfer/edges.log deleted file mode 100644 index 71307300..00000000 --- a/tools/DataProfiler/result/transfer/edges.log +++ /dev/null @@ -1,2 +0,0 @@ -V 181123257, E 527220984, E/V 2.91084 -Unique edges: 2.52555e+08 / 527220984, Multiplicity: 2.08755 diff --git a/tools/DataProfiler/result/transfer/profile.log b/tools/DataProfiler/result/transfer/profile.log deleted file mode 100644 index 0b7c6899..00000000 --- a/tools/DataProfiler/result/transfer/profile.log +++ /dev/null @@ -1,4 +0,0 @@ -Run degree profiling. -load_cost = 399.40(s) -exec_cost = 893.24(s) -total_cost = 1292.64(s) diff --git a/tools/README.md b/tools/README.md index 352d8d7c..73177dda 100644 --- a/tools/README.md +++ b/tools/README.md @@ -1,6 +1,62 @@ # Tools -Here lists some tools for graph data processing. -- dataprofiler: a tool for profiling graph data, including degree distribution, etc. -- graphgen: a simple tool/example code to generate power-law distributed graph data. -- paramgen: a Parameter Search tool to generate parameters for queries using TuGraph. 
\ No newline at end of file +- paramgen: + - parameter_curation: a tool for generating parameters for finbench queries +- check_*.py: python scripts used to check data features such as consistency and distribution +- merge_cluster_output.py: a python script to merge the output in cluster mode +- statistic.py: a python script to calculate the statistics of the data +- legacy: some legacy tools + - dataprofiler: a tool for profiling graph data, including degree distribution, etc. + - graphgen: a simple tool/example code to generate power-law distributed graph data. + - factorgen: factor table generators (Python version) + + +## ParamsGen + +`params_gen.py` uses the CREATE_VALIDATION feature to generate parameters. + +The specific steps are as follows: + +1. Select vertices of type Account, Person, and Loan from the dataset, and generate a parameter file that meets the input specifications for ldbc_finbench_driver. +2. Execute CREATE_VALIDATION to generate validation_params.csv. +3. Select non-empty results from validation_params.csv. + +Example: + +```bash +python3 params_gen.py 1 # gen tcr1 params +``` + +Other notes: + +1. The generated start_timestamp and end_timestamp in the current version are fixed values. +2. For tcr4 and tcr10, this method is not efficient enough. Use the following Cypher queries to search for parameters: + +```Cypher +// tcr4 +MATCH + (n1:Account)-[:transfer]-> + (n2:Account)-[:transfer]-> + (n3:Account)-[:transfer]->(n4:Account) +WHERE + n1.id = n4.id AND n1.id > n2.id AND n2.id > n3.id +WITH + n1.id as n1id, + n2.id as n2id, + n3.id as n3id, + n4.id as n4id +LIMIT 1000 +RETURN DISTINCT toString(n1id)+"|"+toString(n2id) + +// tcr10 +MATCH + (c:Company)<-[:invest]-(p:Person) +WITH + c.id as cid, + count(p.id) as num, + collect(p.id) as person +WHERE num >= 2 +RETURN + toString(person[0])+"|"+toString(person[1]) +LIMIT 1000 +``` diff --git a/tools/DataProfiler/.gitignore b/tools/legacy/dataprofiler/.gitignore similarity index 100% rename from tools/DataProfiler/.gitignore rename to tools/legacy/dataprofiler/.gitignore diff --git a/tools/DataProfiler/CMakeLists.txt b/tools/legacy/dataprofiler/CMakeLists.txt similarity index 100% rename from tools/DataProfiler/CMakeLists.txt rename to tools/legacy/dataprofiler/CMakeLists.txt diff --git a/tools/DataProfiler/README.md b/tools/legacy/dataprofiler/README.md similarity index 100% rename from tools/DataProfiler/README.md rename to tools/legacy/dataprofiler/README.md diff --git a/tools/DataProfiler/algo.h b/tools/legacy/dataprofiler/algo.h similarity index 100% rename from tools/DataProfiler/algo.h rename to tools/legacy/dataprofiler/algo.h diff --git a/tools/DataProfiler/compile.sh b/tools/legacy/dataprofiler/compile.sh similarity index 100% rename from tools/DataProfiler/compile.sh rename to tools/legacy/dataprofiler/compile.sh diff --git a/tools/DataProfiler/de_core.cpp b/tools/legacy/dataprofiler/de_core.cpp similarity index 100% rename from tools/DataProfiler/de_core.cpp rename to tools/legacy/dataprofiler/de_core.cpp diff --git a/tools/DataProfiler/plot.py b/tools/legacy/dataprofiler/plot.py similarity index 100% rename from tools/DataProfiler/plot.py rename to tools/legacy/dataprofiler/plot.py diff --git a/tools/DataProfiler/profiler.cpp b/tools/legacy/dataprofiler/profiler.cpp similarity index 100% rename from tools/DataProfiler/profiler.cpp rename to tools/legacy/dataprofiler/profiler.cpp diff --git a/tools/DataProfiler/result/db139/edges.txt b/tools/legacy/dataprofiler/result/db139/edges.txt similarity index 100%
rename from tools/DataProfiler/result/db139/edges.txt rename to tools/legacy/dataprofiler/result/db139/edges.txt diff --git a/tools/DataProfiler/result/db139/in-out.txt b/tools/legacy/dataprofiler/result/db139/in-out.txt similarity index 100% rename from tools/DataProfiler/result/db139/in-out.txt rename to tools/legacy/dataprofiler/result/db139/in-out.txt diff --git a/tools/DataProfiler/result/db139/in_degree_dist.png b/tools/legacy/dataprofiler/result/db139/in_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/db139/in_degree_dist.png rename to tools/legacy/dataprofiler/result/db139/in_degree_dist.png diff --git a/tools/DataProfiler/result/db139/in_degree_dist.txt b/tools/legacy/dataprofiler/result/db139/in_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/db139/in_degree_dist.txt rename to tools/legacy/dataprofiler/result/db139/in_degree_dist.txt diff --git a/tools/DataProfiler/result/db139/in_degree_dist_regression.png b/tools/legacy/dataprofiler/result/db139/in_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/db139/in_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/db139/in_degree_dist_regression.png diff --git a/tools/DataProfiler/result/db139/in_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/db139/in_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/db139/in_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/db139/in_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/db139/out-in.txt b/tools/legacy/dataprofiler/result/db139/out-in.txt similarity index 100% rename from tools/DataProfiler/result/db139/out-in.txt rename to tools/legacy/dataprofiler/result/db139/out-in.txt diff --git a/tools/DataProfiler/result/db139/out_degree_dist.png b/tools/legacy/dataprofiler/result/db139/out_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/db139/out_degree_dist.png rename to tools/legacy/dataprofiler/result/db139/out_degree_dist.png diff --git a/tools/DataProfiler/result/db139/out_degree_dist.txt b/tools/legacy/dataprofiler/result/db139/out_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/db139/out_degree_dist.txt rename to tools/legacy/dataprofiler/result/db139/out_degree_dist.txt diff --git a/tools/DataProfiler/result/db139/out_degree_dist_regression.png b/tools/legacy/dataprofiler/result/db139/out_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/db139/out_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/db139/out_degree_dist_regression.png diff --git a/tools/DataProfiler/result/db139/out_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/db139/out_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/db139/out_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/db139/out_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/db177/in-out.txt b/tools/legacy/dataprofiler/result/db177/in-out.txt similarity index 100% rename from tools/DataProfiler/result/db177/in-out.txt rename to tools/legacy/dataprofiler/result/db177/in-out.txt diff --git a/tools/DataProfiler/result/db177/in_degree_dist.png b/tools/legacy/dataprofiler/result/db177/in_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/db177/in_degree_dist.png rename to tools/legacy/dataprofiler/result/db177/in_degree_dist.png diff --git 
a/tools/DataProfiler/result/db177/in_degree_dist.txt b/tools/legacy/dataprofiler/result/db177/in_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/db177/in_degree_dist.txt rename to tools/legacy/dataprofiler/result/db177/in_degree_dist.txt diff --git a/tools/DataProfiler/result/db177/in_degree_dist_regression.png b/tools/legacy/dataprofiler/result/db177/in_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/db177/in_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/db177/in_degree_dist_regression.png diff --git a/tools/DataProfiler/result/db177/in_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/db177/in_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/db177/in_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/db177/in_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/db177/out-in.txt b/tools/legacy/dataprofiler/result/db177/out-in.txt similarity index 100% rename from tools/DataProfiler/result/db177/out-in.txt rename to tools/legacy/dataprofiler/result/db177/out-in.txt diff --git a/tools/DataProfiler/result/db177/out_degree_dist.png b/tools/legacy/dataprofiler/result/db177/out_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/db177/out_degree_dist.png rename to tools/legacy/dataprofiler/result/db177/out_degree_dist.png diff --git a/tools/DataProfiler/result/db177/out_degree_dist.txt b/tools/legacy/dataprofiler/result/db177/out_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/db177/out_degree_dist.txt rename to tools/legacy/dataprofiler/result/db177/out_degree_dist.txt diff --git a/tools/DataProfiler/result/db177/out_degree_dist_regression.png b/tools/legacy/dataprofiler/result/db177/out_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/db177/out_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/db177/out_degree_dist_regression.png diff --git a/tools/DataProfiler/result/db177/out_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/db177/out_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/db177/out_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/db177/out_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/db184/edges.txt b/tools/legacy/dataprofiler/result/db184/edges.txt similarity index 100% rename from tools/DataProfiler/result/db184/edges.txt rename to tools/legacy/dataprofiler/result/db184/edges.txt diff --git a/tools/DataProfiler/result/db184/in-out.txt b/tools/legacy/dataprofiler/result/db184/in-out.txt similarity index 100% rename from tools/DataProfiler/result/db184/in-out.txt rename to tools/legacy/dataprofiler/result/db184/in-out.txt diff --git a/tools/DataProfiler/result/db184/in_degree_dist.png b/tools/legacy/dataprofiler/result/db184/in_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/db184/in_degree_dist.png rename to tools/legacy/dataprofiler/result/db184/in_degree_dist.png diff --git a/tools/DataProfiler/result/db184/in_degree_dist.txt b/tools/legacy/dataprofiler/result/db184/in_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/db184/in_degree_dist.txt rename to tools/legacy/dataprofiler/result/db184/in_degree_dist.txt diff --git a/tools/DataProfiler/result/db184/in_degree_dist_regression.png b/tools/legacy/dataprofiler/result/db184/in_degree_dist_regression.png 
similarity index 100% rename from tools/DataProfiler/result/db184/in_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/db184/in_degree_dist_regression.png diff --git a/tools/DataProfiler/result/db184/in_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/db184/in_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/db184/in_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/db184/in_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/db184/out-in.txt b/tools/legacy/dataprofiler/result/db184/out-in.txt similarity index 100% rename from tools/DataProfiler/result/db184/out-in.txt rename to tools/legacy/dataprofiler/result/db184/out-in.txt diff --git a/tools/DataProfiler/result/db184/out_degree_dist.png b/tools/legacy/dataprofiler/result/db184/out_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/db184/out_degree_dist.png rename to tools/legacy/dataprofiler/result/db184/out_degree_dist.png diff --git a/tools/DataProfiler/result/db184/out_degree_dist.txt b/tools/legacy/dataprofiler/result/db184/out_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/db184/out_degree_dist.txt rename to tools/legacy/dataprofiler/result/db184/out_degree_dist.txt diff --git a/tools/DataProfiler/result/db184/out_degree_dist_regression.png b/tools/legacy/dataprofiler/result/db184/out_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/db184/out_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/db184/out_degree_dist_regression.png diff --git a/tools/DataProfiler/result/db184/out_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/db184/out_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/db184/out_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/db184/out_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1_regression.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1_regression.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_1_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_1_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2.png rename to 
tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2_regression.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2_regression.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_2_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_2_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3_regression.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3_regression.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_3_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_3_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4_regression.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4_regression.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4_regression.txt similarity index 100% rename from 
tools/DataProfiler/result/hubvertex_indeg/hub_indeg_4_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_4_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5.txt diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5_regression.png b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5_regression.png diff --git a/tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_indeg/hub_indeg_5_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_indeg/hub_indeg_5_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1_regression.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1_regression.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_1_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_1_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2_regression.png 
b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2_regression.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_2_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_2_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3_regression.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3_regression.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_3_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_3_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4_regression.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4_regression.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_4_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_4_regression.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5.png rename 
to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5.txt diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5_regression.png b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5_regression.png similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5_regression.png rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5_regression.png diff --git a/tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5_regression.txt b/tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5_regression.txt similarity index 100% rename from tools/DataProfiler/result/hubvertex_outdeg/hub_outdeg_5_regression.txt rename to tools/legacy/dataprofiler/result/hubvertex_outdeg/hub_outdeg_5_regression.txt diff --git a/tools/DataProfiler/result/transfer/in-out.txt b/tools/legacy/dataprofiler/result/transfer/in-out.txt similarity index 100% rename from tools/DataProfiler/result/transfer/in-out.txt rename to tools/legacy/dataprofiler/result/transfer/in-out.txt diff --git a/tools/DataProfiler/result/transfer/in_degree_dist.png b/tools/legacy/dataprofiler/result/transfer/in_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/transfer/in_degree_dist.png rename to tools/legacy/dataprofiler/result/transfer/in_degree_dist.png diff --git a/tools/DataProfiler/result/transfer/in_degree_dist.txt b/tools/legacy/dataprofiler/result/transfer/in_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/transfer/in_degree_dist.txt rename to tools/legacy/dataprofiler/result/transfer/in_degree_dist.txt diff --git a/tools/DataProfiler/result/transfer/in_degree_dist_regression.png b/tools/legacy/dataprofiler/result/transfer/in_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/transfer/in_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/transfer/in_degree_dist_regression.png diff --git a/tools/DataProfiler/result/transfer/in_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/transfer/in_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/transfer/in_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/transfer/in_degree_dist_regression.txt diff --git a/tools/DataProfiler/result/transfer/out-in.txt b/tools/legacy/dataprofiler/result/transfer/out-in.txt similarity index 100% rename from tools/DataProfiler/result/transfer/out-in.txt rename to tools/legacy/dataprofiler/result/transfer/out-in.txt diff --git a/tools/DataProfiler/result/transfer/out_degree_dist.png b/tools/legacy/dataprofiler/result/transfer/out_degree_dist.png similarity index 100% rename from tools/DataProfiler/result/transfer/out_degree_dist.png rename to tools/legacy/dataprofiler/result/transfer/out_degree_dist.png diff --git a/tools/DataProfiler/result/transfer/out_degree_dist.txt b/tools/legacy/dataprofiler/result/transfer/out_degree_dist.txt similarity index 100% rename from tools/DataProfiler/result/transfer/out_degree_dist.txt rename to tools/legacy/dataprofiler/result/transfer/out_degree_dist.txt diff --git a/tools/DataProfiler/result/transfer/out_degree_dist_regression.png 
b/tools/legacy/dataprofiler/result/transfer/out_degree_dist_regression.png similarity index 100% rename from tools/DataProfiler/result/transfer/out_degree_dist_regression.png rename to tools/legacy/dataprofiler/result/transfer/out_degree_dist_regression.png diff --git a/tools/DataProfiler/result/transfer/out_degree_dist_regression.txt b/tools/legacy/dataprofiler/result/transfer/out_degree_dist_regression.txt similarity index 100% rename from tools/DataProfiler/result/transfer/out_degree_dist_regression.txt rename to tools/legacy/dataprofiler/result/transfer/out_degree_dist_regression.txt diff --git a/tools/DataProfiler/wcc_core.cpp b/tools/legacy/dataprofiler/wcc_core.cpp similarity index 100% rename from tools/DataProfiler/wcc_core.cpp rename to tools/legacy/dataprofiler/wcc_core.cpp diff --git a/tools/paramgen/legacy/factor_table.sh b/tools/legacy/factorgen/factor_table.sh similarity index 100% rename from tools/paramgen/legacy/factor_table.sh rename to tools/legacy/factorgen/factor_table.sh diff --git a/tools/paramgen/legacy/generate_account.py b/tools/legacy/factorgen/generate_account.py similarity index 100% rename from tools/paramgen/legacy/generate_account.py rename to tools/legacy/factorgen/generate_account.py diff --git a/tools/paramgen/legacy/loan.py b/tools/legacy/factorgen/loan.py similarity index 100% rename from tools/paramgen/legacy/loan.py rename to tools/legacy/factorgen/loan.py diff --git a/tools/paramgen/legacy/params_gen.properties b/tools/legacy/factorgen/params_gen.properties similarity index 100% rename from tools/paramgen/legacy/params_gen.properties rename to tools/legacy/factorgen/params_gen.properties diff --git a/tools/paramgen/legacy/params_gen.py b/tools/legacy/factorgen/params_gen.py similarity index 100% rename from tools/paramgen/legacy/params_gen.py rename to tools/legacy/factorgen/params_gen.py diff --git a/tools/paramgen/legacy/split_amount.py b/tools/legacy/factorgen/split_amount.py similarity index 100% rename from tools/paramgen/legacy/split_amount.py rename to tools/legacy/factorgen/split_amount.py diff --git a/tools/paramgen/legacy/time_split.py b/tools/legacy/factorgen/time_split.py similarity index 100% rename from tools/paramgen/legacy/time_split.py rename to tools/legacy/factorgen/time_split.py diff --git a/tools/GraphGen/Makefile b/tools/legacy/graphgen/Makefile similarity index 100% rename from tools/GraphGen/Makefile rename to tools/legacy/graphgen/Makefile diff --git a/tools/GraphGen/README.md b/tools/legacy/graphgen/README.md similarity index 100% rename from tools/GraphGen/README.md rename to tools/legacy/graphgen/README.md diff --git a/tools/GraphGen/graph_gen.c b/tools/legacy/graphgen/graph_gen.c similarity index 100% rename from tools/GraphGen/graph_gen.c rename to tools/legacy/graphgen/graph_gen.c diff --git a/scripts/merge_cluster_output.py b/tools/merge_cluster_output.py similarity index 100% rename from scripts/merge_cluster_output.py rename to tools/merge_cluster_output.py diff --git a/tools/paramgen/README.md b/tools/paramgen/README.md deleted file mode 100644 index 41d0e732..00000000 --- a/tools/paramgen/README.md +++ /dev/null @@ -1,49 +0,0 @@ -## ParamsGen - -`params_gen.py` uses the CREATE_VALIDATION feature to generate parameters. - -The specific steps are as follows: - -1. Select vertices of type Account, Person, and Loan from the dataset, and generate a parameter file that meets the input specifications for ldbc_finbench_driver. -2. Execute CREATE_VALIDATION to generate validation_params.csv. -3. 
Select non-empty results from validation_params.csv. - -Example: - -```bash -python3 params_gen.py 1 # gen tcr1 params -``` - -Other notes: - -1. The generated start_timestamp and end_timestamp in the current version are fixed values. -2. For tcr4 and tcr10, this method is not efficient enough. Use the following Cypher query to search for parameters: - -```Cypher -// tcr4 -MATCH - (n1:Account)-[:transfer]-> - (n2:Account)-[:transfer]-> - (n3:Account)-[:transfer]->(n4:Account) -WHERE - n1.id = n4.id AND n1.id > n2.id AND n2.id > n3.id -WITH - n1.id as n1id, - n2.id as n2id, - n3.id as n3id, - n4.id as n4id -LIMIT 1000 -RETURN DISTINCT toString(n1id)+"|"+toString(n2id) - -// tcr10 -MATCH - (c:Company)<-[:invest]-(p:Person) -WITH - c.id as cid, - count(p.id) as num, - collect(p.id) as person -WHERE num >= 2 -RETURN - tostring(person[0])+"|"+tostring(person[1]) -LIMIT 1000 -``` diff --git a/tools/paramgen/search_params.py b/tools/paramgen/search_params.py deleted file mode 100644 index 722cbaf2..00000000 --- a/tools/paramgen/search_params.py +++ /dev/null @@ -1,147 +0,0 @@ -import math -from operator import itemgetter - - -def allclose(a, b, rtol=1e-05, atol=1e-08): - return abs(a - b) <= (atol + rtol * abs(b)) - -def readFactors(f): - res = [] - for line in f.readlines(): - values = [item if index == 0 else int(item) for (index, item) in enumerate(line.split(","))] - res.append(values) - - return res - -class Window: - def __init__(self, paramId, start, end): - self.param = paramId - self.start = start - self.end = end - self.avg = 0.0 - self.stddev = 0.0 - self.size = end-start+1 - - def __str__(self): - res = "[%d, %d] "%(self.start, self.end) - res += "size: %d, avg: %0.2f, stddev: %0.2f" % (self.size, self.avg, self.stddev) - return res - -def getAverageCost(rows, key): - return float(sum([key(r) for r in rows])) / len(rows) - -def getCostStdDev(rows, avg, key): - return math.sqrt(sum([math.pow(key(r)-avg,2) for r in rows]) / len(rows)) - -def updateAverageCost(avg, oldelem, newelem, samplesize): - return avg + (newelem - oldelem) / samplesize - - -def findWindows(factors, param, amount, bounds): - - data = factors[bounds[0]: bounds[1]] - allWindows = [] - start = 0 - - initWindow = Window(param, start, amount-1) - initWindow.avg = getAverageCost(data[start:amount], itemgetter(param)) - initWindow.stddev = getCostStdDev(data[start:amount], initWindow.avg, itemgetter(param)) - - s1 = sum([x[param] for x in data[start:start+amount]]) - s2 = sum([x[param]*x[param] for x in data[start:start+amount]]) - start += 1 - allWindows.append(initWindow) - - while start + amount < len(data): - end = start + amount - if data[end-1][param]<10: - break - - window = Window(param, bounds[0]+start, bounds[0]+end-1) - - # update the streaming stats about avg and stddev - s1 -= data[start-1][param] - s1 += data[end-1][param] - s2 -= data[start-1][param]*data[start-1][param] - s2 += data[end-1][param]*data[end-1][param] - - window.avg = float(s1) / amount - window.stddev = math.sqrt(float(amount*s2 - s1*s1))/amount - - allWindows.append(window) - start+=1 - - allWindows.sort(key=lambda windows: windows.stddev) - - res = [] - first = allWindows[0] - iter = 0 - - while iter < len(allWindows) and allWindows[iter].stddev == first.stddev: - res.append(allWindows[iter]) - iter+=1 - - return res - - -def mergeWindows(windows): - res = [] - - cur = windows[0] - - iter = 1 - constucted = cur - - while iter < len(windows): - while iter < len(windows) and windows[iter].start == cur.start+1 and allclose(windows[iter].avg, 
cur.avg): - cur = windows[iter] - constucted.end=cur.end - constucted.size+=1 - iter+=1 - - res.append(constucted) - if iter >= len(windows): - break - - constucted = windows[iter] - cur = windows[iter] - iter += 1 - - return res - - -def generate(factors, portion): - amount = int(len(factors)*portion) - params = len(factors[0]) -1 - - keys = [i for i in range(1,params+1)] - - factors = sorted(factors, key=itemgetter(*keys), reverse=True) - result = [] - paramId = 1 - - current_windows = findWindows(factors, paramId, amount, (0,len(factors))) - - while len(current_windows) > 1 and paramId < params: - paramId += 1 - current_windows = mergeWindows(current_windows) - - new_windows = [] - for w in current_windows: - w2 = findWindows(factors, paramId, amount, (w.start, w.end+1)) - new_windows.extend(w2) - - new_windows.sort(key=lambda w: w.stddev) - - current_windows = [] - first = new_windows[0] - iter = 0 - - while iter < len(new_windows) and new_windows[iter].stddev == first.stddev : - current_windows.append(new_windows[iter]) - iter+=1 - - w = current_windows[0] - - result.extend([factors[w.start+i][0] for i in range(amount)]) - return result diff --git a/tools/statistic.py b/tools/statistic.py index 39623a03..45e5a2c9 100644 --- a/tools/statistic.py +++ b/tools/statistic.py @@ -4,10 +4,18 @@ import collections -def print_counts(counts): +labels = ["person","personOwnAccount","personApplyLoan","personGuarantee","personInvest","blank","company","companyOwnAccount","companyApplyLoan","companyGuarantee","companyInvest","blank","account","transfer","withdraw","blank","loan","loantransfer","deposit","repay","blank","medium","signIn"] + +def print_original_counts(counts): for key, value in collections.OrderedDict(sorted(counts.items())).items(): print("{}:{}".format(key, value)) +def print_formatted_counts(counts): + for label in labels: + if label == "blank": + print("================================") + else: + print("{}:{}".format(label, counts[label])) def count_entites(path): counts = {} @@ -18,7 +26,9 @@ def count_entites(path): for file in glob.glob(os.path.join(subdir_path, "*.csv")): num_entites += sum(1 for _ in open(file)) - 1 counts[subdir] = num_entites - print_counts(counts) + print_original_counts(counts) + print("\n========== Formatted Output ============\n") + print_formatted_counts(counts) if __name__ == "__main__": diff --git a/transformation/snapshot.sql b/transformation/snapshot.sql index 5d4b10f3..8ebe6ea9 100644 --- a/transformation/snapshot.sql +++ b/transformation/snapshot.sql @@ -1,14 +1,13 @@ --- Person COPY ( -SELECT Person.id AS personId, - Person.name AS personName, - Person.isBlocked AS isBlocked, - epoch_ms(Person.createTime) AS createTime, - Person.gender AS gender, - epoch_ms(Person.birthday)::DATE AS birthday, - Person.country AS country, - Person.city AS city +SELECT Person.id AS personId, + Person.name AS personName, + Person.isBlocked AS isBlocked, + strftime(epoch_ms(Person.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Person.gender AS gender, + epoch_ms(Person.birthday)::DATE AS birthday, Person.country AS country, + Person.city AS city FROM Person WHERE Person.createTime <= :start_date_long ORDER BY Person.createTime ) @@ -17,15 +16,15 @@ TO ':output_dir/snapshot/Person.:output_format'; --- Company COPY ( -SELECT Company.id AS companyId, - Company.name AS companyName, - Company.isBlocked AS isBlocked, - epoch_ms(Company.createTime) AS createTime, - Company.country AS country, - Company.city AS city, - Company.business AS business, - 
Company.description AS description, - Company.url AS url +SELECT Company.id AS companyId, + Company.name AS companyName, + Company.isBlocked AS isBlocked, + strftime(epoch_ms(Company.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Company.country AS country, + Company.city AS city, + Company.business AS business, + Company.description AS description, + Company.url AS url FROM Company WHERE Company.createTime <= :start_date_long ORDER BY Company.createTime ) @@ -34,31 +33,31 @@ TO ':output_dir/snapshot/Company.:output_format'; -- Account. It has deletes. COPY ( -SELECT Account.id AS accountId, - epoch_ms(Account.createTime) AS createTime, - Account.isBlocked AS isBlocked, - Account.type AS accountType, - Account.nickname AS nickname, - Account.phonenum AS phonenum, - Account.email AS email, - Account.freqLoginType AS freqLoginType, - epoch_ms(Account.lastLoginTime) AS lastLoginTime, - Account.accountLevel AS accountLevel, +SELECT Account.id AS accountId, + strftime(epoch_ms(Account.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Account.isBlocked AS isBlocked, + Account.type AS accountType, + Account.nickname AS nickname, + Account.phonenum AS phonenum, + Account.email AS email, + Account.freqLoginType AS freqLoginType, + strftime(epoch_ms(Account.lastLoginTime), '%Y-%m-%d %H:%M:%S.%g') AS lastLoginTime, + Account.accountLevel AS accountLevel, FROM Account WHERE Account.createTime <= :start_date_long - -- AND Account.deleteTime > :start_date_long +-- AND Account.deleteTime > :start_date_long ORDER BY Account.createTime ) TO ':output_dir/snapshot/Account.:output_format'; -- Loan. COPY ( -SELECT Loan.id AS loanId, - Loan.loanAmount AS loanAmount, - Loan.balance AS balance, - epoch_ms(Loan.createTime) AS createTime, - Loan.usage AS loanUsage, - Loan.interestRate AS interestRate +SELECT Loan.id AS loanId, + Loan.loanAmount AS loanAmount, + Loan.balance AS balance, + strftime(epoch_ms(Loan.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Loan.usage AS loanUsage, + Loan.interestRate AS interestRate FROM Loan WHERE Loan.createTime <= :start_date_long ORDER BY Loan.createTime ) @@ -67,12 +66,12 @@ ORDER BY Loan.createTime ) -- Medium. 
COPY ( -SELECT Medium.id AS mediumId, - Medium.type AS mediumType, - Medium.isBlocked AS isBlocked, - epoch_ms(Medium.createTime) AS createTime, - epoch_ms(Medium.lastLogin) AS lastLoginTime, - Medium.riskLevel AS riskLevel +SELECT Medium.id AS mediumId, + Medium.type AS mediumType, + Medium.isBlocked AS isBlocked, + strftime(epoch_ms(Medium.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + strftime(epoch_ms(Medium.lastLogin), '%Y-%m-%d %H:%M:%S.%g') AS lastLoginTime, + Medium.riskLevel AS riskLevel FROM Medium WHERE Medium.createTime <= :start_date_long ORDER BY Medium.createTime ) @@ -85,7 +84,7 @@ COPY Transfer.fromId AS fromId, Transfer.toId AS toId, Transfer.amount AS amount, - epoch_ms(Transfer.createTime) AS createTime, + strftime(epoch_ms(Transfer.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, Transfer.orderNum::VARCHAR AS orderNum, Transfer.comment AS comment, Transfer.payType AS payType, @@ -99,7 +98,7 @@ COPY LoanTransfer.fromId AS fromId, LoanTransfer.toId AS toId, LoanTransfer.amount AS amount, - epoch_ms(LoanTransfer.createTime) AS createTime, + strftime(epoch_ms(LoanTransfer.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, LoanTransfer.orderNum::VARCHAR AS orderNum, LoanTransfer.comment AS comment, LoanTransfer.payType AS payType, @@ -115,69 +114,69 @@ TO ':output_dir/snapshot/AccountTransferAccount.:output_format'; -- Withdraw. It has deletes. COPY ( -SELECT Withdraw.fromId AS fromId, - Withdraw.toId AS toId, - Withdraw.fromType AS fromType, - Withdraw.toType AS toType, - Withdraw.amount AS amount, - epoch_ms(Withdraw.createTime) AS createTime, - Withdraw.comment AS comment +SELECT Withdraw.fromId AS fromId, + Withdraw.toId AS toId, + Withdraw.fromType AS fromType, + Withdraw.toType AS toType, + Withdraw.amount AS amount, + strftime(epoch_ms(Withdraw.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Withdraw.comment AS comment FROM Withdraw WHERE Withdraw.createTime <= :start_date_long - -- AND Withdraw.deleteTime > :start_date_long +-- AND Withdraw.deleteTime > :start_date_long ORDER BY Withdraw.createTime ) TO ':output_dir/snapshot/AccountWithdrawAccount.:output_format'; -- Repay. It has deletes. COPY ( -SELECT Repay.accountId AS accountId, - Repay.loanId AS loanId, - Repay.amount AS amount, - epoch_ms(Repay.createTime) AS createTime, - Repay.comment AS comment +SELECT Repay.accountId AS accountId, + Repay.loanId AS loanId, + Repay.amount AS amount, + strftime(epoch_ms(Repay.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Repay.comment AS comment FROM Repay WHERE Repay.createTime <= :start_date_long - -- AND Repay.deleteTime > :start_date_long +-- AND Repay.deleteTime > :start_date_long ORDER BY Repay.createTime ) TO ':output_dir/snapshot/AccountRepayLoan.:output_format'; -- Deposit. It has deletes. COPY ( -SELECT Deposit.loanId AS loanId, - Deposit.accountId AS accountId, - Deposit.amount AS amount, - epoch_ms(Deposit.createTime) AS createTime, - Deposit.comment AS comment +SELECT Deposit.loanId AS loanId, + Deposit.accountId AS accountId, + Deposit.amount AS amount, + strftime(epoch_ms(Deposit.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + Deposit.comment AS comment FROM Deposit WHERE Deposit.createTime <= :start_date_long - -- AND Deposit.deleteTime > :start_date_long +-- AND Deposit.deleteTime > :start_date_long ORDER BY Deposit.createTime ) TO ':output_dir/snapshot/LoanDepositAccount.:output_format'; -- SignIn. It has deletes. 
COPY ( -SELECT SignIn.mediumId AS mediumId, - SignIn.accountId AS accountId, - epoch_ms(SignIn.createTime) AS createTime, - SignIn.location AS location, - SignIn.comment AS comment +SELECT SignIn.mediumId AS mediumId, + SignIn.accountId AS accountId, + strftime(epoch_ms(SignIn.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + SignIn.location AS location, + SignIn.comment AS comment FROM SignIn WHERE SignIn.createTime <= :start_date_long - -- AND SignIn.deleteTime > :start_date_long +-- AND SignIn.deleteTime > :start_date_long ORDER BY SignIn.createTime ) TO ':output_dir/snapshot/MediumSignInAccount.:output_format'; -- PersonInvest. COPY ( -SELECT PersonInvest.investorId AS investorId, - PersonInvest.companyId AS companyId, - PersonInvest.ratio AS ratio, - epoch_ms(PersonInvest.createTime) AS createTime, - PersonInvest.comment AS comment +SELECT PersonInvest.investorId AS investorId, + PersonInvest.companyId AS companyId, + PersonInvest.ratio AS ratio, + strftime(epoch_ms(PersonInvest.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + PersonInvest.comment AS comment FROM PersonInvest WHERE PersonInvest.createTime <= :start_date_long ORDER BY PersonInvest.createTime ) @@ -186,11 +185,11 @@ TO ':output_dir/snapshot/PersonInvestCompany.:output_format'; -- CompanyInvest. COPY ( -SELECT CompanyInvest.investorId AS investorId, - CompanyInvest.companyId AS companyId, - CompanyInvest.ratio AS ratio, - epoch_ms(CompanyInvest.createTime) AS createTime, - CompanyInvest.comment AS comment +SELECT CompanyInvest.investorId AS investorId, + CompanyInvest.companyId AS companyId, + CompanyInvest.ratio AS ratio, + strftime(epoch_ms(CompanyInvest.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + CompanyInvest.comment AS comment FROM CompanyInvest WHERE CompanyInvest.createTime <= :start_date_long ORDER BY CompanyInvest.createTime ) @@ -199,12 +198,12 @@ TO ':output_dir/snapshot/CompanyInvestCompany.:output_format'; -- PersonApplyLoan. COPY ( -SELECT PersonApplyLoan.personId AS personId, - PersonApplyLoan.loanId AS loanId, - PersonApplyLoan.loanAmount AS loanAmount, - epoch_ms(PersonApplyLoan.createTime) AS createTime, - PersonApplyLoan.org AS org, - PersonApplyLoan.comment AS comment +SELECT PersonApplyLoan.personId AS personId, + PersonApplyLoan.loanId AS loanId, + PersonApplyLoan.loanAmount AS loanAmount, + strftime(epoch_ms(PersonApplyLoan.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + PersonApplyLoan.org AS org, + PersonApplyLoan.comment AS comment FROM PersonApplyLoan WHERE PersonApplyLoan.createTime <= :start_date_long ORDER BY PersonApplyLoan.createTime ) @@ -213,12 +212,12 @@ TO ':output_dir/snapshot/PersonApplyLoan.:output_format'; -- CompanyApplyLoan. COPY ( -SELECT CompanyApplyLoan.companyId AS companyId, - CompanyApplyLoan.loanId AS loanId, - CompanyApplyLoan.loanAmount AS loanAmount, - epoch_ms(CompanyApplyLoan.createTime) AS createTime, - CompanyApplyLoan.org AS org, - CompanyApplyLoan.comment AS comment +SELECT CompanyApplyLoan.companyId AS companyId, + CompanyApplyLoan.loanId AS loanId, + CompanyApplyLoan.loanAmount AS loanAmount, + strftime(epoch_ms(CompanyApplyLoan.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + CompanyApplyLoan.org AS org, + CompanyApplyLoan.comment AS comment FROM CompanyApplyLoan WHERE CompanyApplyLoan.createTime <= :start_date_long ORDER BY CompanyApplyLoan.createTime ) @@ -227,11 +226,11 @@ TO ':output_dir/snapshot/CompanyApplyLoan.:output_format'; -- PersonGuaranteePerson. 
COPY ( -SELECT PersonGuarantee.fromId AS fromId, - PersonGuarantee.toId AS toId, - epoch_ms(PersonGuarantee.createTime) AS createTime, - PersonGuarantee.relation AS relation, - PersonGuarantee.comment AS comment +SELECT PersonGuarantee.fromId AS fromId, + PersonGuarantee.toId AS toId, + strftime(epoch_ms(PersonGuarantee.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + PersonGuarantee.relation AS relation, + PersonGuarantee.comment AS comment FROM PersonGuarantee WHERE PersonGuarantee.createTime <= :start_date_long ORDER BY PersonGuarantee.createTime ) @@ -240,11 +239,11 @@ TO ':output_dir/snapshot/PersonGuaranteePerson.:output_format'; -- CompanyGuaranteeCompany. COPY ( -SELECT CompanyGuarantee.fromId AS fromId, - CompanyGuarantee.toId AS toId, - epoch_ms(CompanyGuarantee.createTime) AS createTime, - CompanyGuarantee.relation AS relation, - CompanyGuarantee.comment AS comment +SELECT CompanyGuarantee.fromId AS fromId, + CompanyGuarantee.toId AS toId, + strftime(epoch_ms(CompanyGuarantee.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + CompanyGuarantee.relation AS relation, + CompanyGuarantee.comment AS comment FROM CompanyGuarantee WHERE CompanyGuarantee.createTime <= :start_date_long ORDER BY CompanyGuarantee.createTime ) @@ -253,25 +252,25 @@ TO ':output_dir/snapshot/CompanyGuaranteeCompany.:output_format'; -- PersonOwnAccount. It has deletes. COPY ( -SELECT PersonOwnAccount.personId AS personId, - PersonOwnAccount.accountId AS accountId, - epoch_ms(PersonOwnAccount.createTime) AS createTime, - PersonOwnAccount.comment AS comment +SELECT PersonOwnAccount.personId AS personId, + PersonOwnAccount.accountId AS accountId, + strftime(epoch_ms(PersonOwnAccount.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + PersonOwnAccount.comment AS comment FROM PersonOwnAccount WHERE PersonOwnAccount.createTime <= :start_date_long - -- AND PersonOwnAccount.deleteTime > :start_date_long +-- AND PersonOwnAccount.deleteTime > :start_date_long ORDER BY PersonOwnAccount.createTime ) TO ':output_dir/snapshot/PersonOwnAccount.:output_format'; -- CompanyOwnAccount. It has deletes. COPY ( -SELECT CompanyOwnAccount.companyId AS companyId, - CompanyOwnAccount.accountId AS accountId, - epoch_ms(CompanyOwnAccount.createTime) AS createTime, - CompanyOwnAccount.comment AS comment +SELECT CompanyOwnAccount.companyId AS companyId, + CompanyOwnAccount.accountId AS accountId, + strftime(epoch_ms(CompanyOwnAccount.createTime), '%Y-%m-%d %H:%M:%S.%g') AS createTime, + CompanyOwnAccount.comment AS comment FROM CompanyOwnAccount WHERE CompanyOwnAccount.createTime <= :start_date_long - -- AND CompanyOwnAccount.deleteTime > :start_date_long +-- AND CompanyOwnAccount.deleteTime > :start_date_long ORDER BY CompanyOwnAccount.createTime ) TO ':output_dir/snapshot/CompanyOwnAccount.:output_format';
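The snapshot.sql hunks above switch the exported createTime/lastLoginTime columns from raw epoch_ms(...) values to strings formatted with strftime(epoch_ms(...), '%Y-%m-%d %H:%M:%S.%g'). As a rough cross-check of what those exported values should look like, here is a small Python sketch that renders an epoch-millisecond timestamp in the same layout; the sample input is made up, and the sketch assumes the timestamps are interpreted as UTC.

```python
from datetime import datetime, timezone

def format_epoch_ms(epoch_ms: int) -> str:
    """Render an epoch-millisecond value roughly like
    strftime(epoch_ms(x), '%Y-%m-%d %H:%M:%S.%g') in snapshot.sql."""
    dt = datetime.fromtimestamp(epoch_ms / 1000, tz=timezone.utc)
    # strftime's %f would give microseconds; append the millisecond part explicitly.
    return dt.strftime("%Y-%m-%d %H:%M:%S") + ".{:03d}".format(epoch_ms % 1000)

# Made-up example value for illustration.
print(format_epoch_ms(1627846261123))  # -> 2021-08-01 19:31:01.123
```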