Merge branch 'opt-curation' into opt
qishipengqsp committed Dec 20, 2024
2 parents 49c203c + 0ae6fb9 commit 5fa318c
Showing 126 changed files with 656 additions and 835 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -25,4 +25,5 @@ sf*/
sf*.tar
sf*.tar.gz

+paramgen/__pycache__/
tools/paramgen/__pycache__/


151 changes: 151 additions & 0 deletions paramgen/search_params.py
@@ -0,0 +1,151 @@
import math
from operator import itemgetter


def allclose(a, b, rtol=1e-05, atol=1e-08):
    # scalar analogue of numpy.isclose: |a - b| <= atol + rtol * |b|
    return abs(a - b) <= (atol + rtol * abs(b))


def readFactors(f):
    # each line of a factor CSV: an identifier in column 0 followed by
    # integer factor counts in the remaining columns
    res = []
    for line in f.readlines():
        values = [item if index == 0 else int(item) for (index, item) in enumerate(line.split(","))]
        res.append(values)

return res


class Window:
    # a run of consecutive factor-table rows [start, end] (absolute indices),
    # scored by the average and stddev of one parameter column
    def __init__(self, paramId, start, end):
        self.param = paramId
self.start = start
self.end = end
self.avg = 0.0
self.stddev = 0.0
self.size = end - start + 1

def __str__(self):
res = "[%d, %d] " % (self.start, self.end)
res += "size: %d, avg: %0.2f, stddev: %0.2f" % (self.size, self.avg, self.stddev)
return res


def getAverageCost(rows, key):
return float(sum([key(r) for r in rows])) / len(rows)


def getCostStdDev(rows, avg, key):
return math.sqrt(sum([math.pow(key(r) - avg, 2) for r in rows]) / len(rows))


def updateAverageCost(avg, oldelem, newelem, samplesize):
return avg + (newelem - oldelem) / samplesize


def findWindows(factors, param, amount, bounds):
    # slide a window of `amount` rows over factors[bounds[0]:bounds[1]] and
    # return every window whose values in column `param` have minimal stddev
    data = factors[bounds[0]: bounds[1]]
    allWindows = []
    start = 0

    # window bounds are absolute indices into `factors`, hence the bounds[0] offset
    initWindow = Window(param, bounds[0] + start, bounds[0] + amount - 1)
    initWindow.avg = getAverageCost(data[start:amount], itemgetter(param))
    initWindow.stddev = getCostStdDev(data[start:amount], initWindow.avg, itemgetter(param))

    # running sums of x and x^2 for O(1) streaming updates
    s1 = sum([x[param] for x in data[start:start + amount]])
    s2 = sum([x[param] * x[param] for x in data[start:start + amount]])
start += 1
allWindows.append(initWindow)

while start + amount < len(data):
end = start + amount
        # stop once the window tail drops below a hard-coded minimum count
        if data[end - 1][param] < 10:
            break

window = Window(param, bounds[0] + start, bounds[0] + end - 1)

        # update the running sums, then recompute avg and stddev in O(1)
        s1 -= data[start - 1][param]
        s1 += data[end - 1][param]
        s2 -= data[start - 1][param] * data[start - 1][param]
        s2 += data[end - 1][param] * data[end - 1][param]

        window.avg = float(s1) / amount
        # stddev via E[x^2] - E[x]^2: sqrt(n * s2 - s1 * s1) / n
        window.stddev = math.sqrt(float(amount * s2 - s1 * s1)) / amount

allWindows.append(window)
start += 1

    # keep every window tied for the smallest stddev
    allWindows.sort(key=lambda w: w.stddev)

res = []
first = allWindows[0]
iter = 0

while iter < len(allWindows) and allWindows[iter].stddev == first.stddev:
res.append(allWindows[iter])
iter += 1

return res


def mergeWindows(windows):
    # coalesce runs of consecutive windows with (nearly) equal averages
    # into single larger windows
    res = []

    cur = windows[0]

    iter = 1
    constructed = cur

    while iter < len(windows):
        while iter < len(windows) and windows[iter].start == cur.start + 1 and allclose(windows[iter].avg, cur.avg):
            cur = windows[iter]
            constructed.end = cur.end
            constructed.size += 1
            iter += 1

        res.append(constructed)
        if iter >= len(windows):
            break

        constructed = windows[iter]
        cur = windows[iter]
        iter += 1

return res


def generate(factors, portion):
    # select `portion` of the rows whose factor counts are most uniform:
    # keep the sliding windows with minimal stddev on parameter 1, then
    # break ties by descending into each window with the next parameter
    amount = int(len(factors) * portion)
    params = len(factors[0]) - 1

    keys = [i for i in range(1, params + 1)]

    factors = sorted(factors, key=itemgetter(*keys), reverse=True)
    result = []
    paramId = 1

    current_windows = findWindows(factors, paramId, amount, (0, len(factors)))

while len(current_windows) > 1 and paramId < params:
paramId += 1
current_windows = mergeWindows(current_windows)

new_windows = []
for w in current_windows:
w2 = findWindows(factors, paramId, amount, (w.start, w.end + 1))
new_windows.extend(w2)

new_windows.sort(key=lambda w: w.stddev)

current_windows = []
first = new_windows[0]
iter = 0

while iter < len(new_windows) and new_windows[iter].stddev == first.stddev:
current_windows.append(new_windows[iter])
iter += 1

    w = current_windows[0]

    # emit the identifiers (column 0) of the rows inside the chosen window
    result.extend([factors[w.start + i][0] for i in range(amount)])
return result
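
For context, a minimal sketch of how this module might be driven (the module path, factor-table location, and 1% portion are illustrative assumptions, not part of this commit):

# hypothetical driver for search_params.py; the path and portion are
# assumptions, and the CSV is assumed headerless: id, count_1, count_2, ...
from search_params import readFactors, generate

with open("out/factors/loan_amount_bucket.csv") as f:
    factors = readFactors(f)

# keep the ~1% slice of rows whose factor counts vary the least
chosen_ids = generate(factors, 0.01)
print(chosen_ids[:10])

Because the per-window statistics are maintained as running sums (s1, s2), each findWindows pass costs O(rows) rather than O(rows * window size).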
56 changes: 27 additions & 29 deletions tools/paramgen/time_select.py → paramgen/time_select.py
@@ -1,95 +1,95 @@
LAST_MONTHS = 3
START_YEAR = 2020


class MonthYearCount:
    def __init__(self, month, year, count):
-        self.month=month
-        self.year=year
-        self.count=count
+        self.month = month
+        self.year = year
+        self.count = count


class TimeParameter:
    def __init__(self, year, month, day, duration):
-        self.month=month
-        self.year=year
-        self.day=day
-        self.duration=duration
+        self.month = month
+        self.year = year
+        self.day = day
+        self.duration = duration


-def getMedian(data, sort_key, getEntireTuple = False):
+def getMedian(data, sort_key, getEntireTuple=False):
    if len(data) == 0:
        if getEntireTuple:
-            return MonthYearCount(0,0,0)
+            return MonthYearCount(0, 0, 0)
        return 0

    if len(data) == 1:
        if getEntireTuple:
            return data[0]
        return data[0].count

-    srtd = sorted(data,key=sort_key)
-    mid = int(len(data)/2)
+    srtd = sorted(data, key=sort_key)
+    mid = int(len(data) / 2)

    if len(data) % 2 == 0:
        if getEntireTuple:
            return srtd[mid]
-        return (sort_key(srtd[mid-1]) + sort_key(srtd[mid])) / 2.0
+        return (sort_key(srtd[mid - 1]) + sort_key(srtd[mid])) / 2.0

    if getEntireTuple:
        return srtd[mid]
    return sort_key(srtd[mid])


-def computeTimeMedians(factors, lastmonthcount = LAST_MONTHS):
+def computeTimeMedians(factors, lastmonthcount=LAST_MONTHS):
    mediantimes = []
    lastmonths = []
    firstmonths = []
    for values in factors:
        values.sort(key=lambda myc: (myc.year, myc.month))

        l = len(values)
-        lastmonthsum = sum(myc.count for myc in values[max(l-lastmonthcount,0):l])
+        lastmonthsum = sum(myc.count for myc in values[max(l - lastmonthcount, 0):l])
        lastmonths.append(lastmonthsum)
-        cutoff_max = l-lastmonthcount
+        cutoff_max = l - lastmonthcount
        if cutoff_max < 0:
            cutoff_max = l
        firstmonthsum = sum(myc.count for myc in values[0:cutoff_max])
        firstmonths.append(firstmonthsum)
-        mediantimes.append(getMedian(values,lambda myc: myc.count))
+        mediantimes.append(getMedian(values, lambda myc: myc.count))

    median = getMedian(mediantimes, lambda x: x)
    medianLastMonth = getMedian(lastmonths, lambda x: x)
    medianFirstMonth = getMedian(firstmonths, lambda x: x)

    return medianFirstMonth, medianLastMonth, median


def getTimeParamsWithMedian(factors, medianFirstMonth, medianLastMonth, median):
    # strategy: find the median of the given distribution, then increase the time interval until it matches the given parameter
    res = []
    for values in factors:
-        input = sorted(values,key=lambda myc: (myc.year, myc.month))
-        currentMedian = getMedian(values,lambda myc: myc.count, True)
+        input = sorted(values, key=lambda myc: (myc.year, myc.month))
+        currentMedian = getMedian(values, lambda myc: myc.count, True)
        if int(median) == 0 or int(currentMedian.count) == 0 or int(currentMedian.year) == 0:
-            res.append(TimeParameter(START_YEAR,1,1,0))
+            res.append(TimeParameter(START_YEAR, 1, 1, 0))
            continue
        if currentMedian.count > median:
-            duration = int(28*currentMedian.count/median)
+            duration = int(28 * currentMedian.count / median)
            res.append(TimeParameter(currentMedian.year, currentMedian.month, 1, duration))
        else:
-            duration = int(28*median/currentMedian.count)
+            duration = int(28 * median / currentMedian.count)
            res.append(TimeParameter(currentMedian.year, currentMedian.month, 1, duration))
    return res


def findTimeParameters(factors):

    medianFirstMonth, medianLastMonth, median = computeTimeMedians(factors)
    timeParams = getTimeParamsWithMedian(factors, medianFirstMonth, medianLastMonth, median)

    return timeParams


def findTimeParams(input_loan_list, time_bucket_df):
    time_list = [x for x in time_bucket_df.iloc[0].index.tolist()[1:]]
    factors = []
@@ -103,7 +103,5 @@ def findTimeParams(input_loan_list, time_bucket_df):
        year = START_YEAR + month / 12
        temp_factors.append(MonthYearCount(month % 12 + 1, int(year), count))
    factors.append(temp_factors)
-
-    return findTimeParameters(factors)
-
-
+    return findTimeParameters(factors)
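
A rough usage sketch follows; the monthly counts are fabricated sample data, and only MonthYearCount and findTimeParameters come from this file:

# hypothetical usage of time_select.py with made-up activity series
from time_select import MonthYearCount, findTimeParameters

factors = [
    [MonthYearCount(1, 2020, 5), MonthYearCount(2, 2020, 9),
     MonthYearCount(3, 2020, 4), MonthYearCount(4, 2020, 12)],
    [MonthYearCount(1, 2020, 3), MonthYearCount(2, 2020, 8),
     MonthYearCount(3, 2020, 6)],
]

# each TimeParameter anchors a window at a series' median-activity month,
# with a duration scaled by its distance from the global median count
for tp in findTimeParameters(factors):
    print(tp.year, tp.month, tp.day, tp.duration)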
24 changes: 24 additions & 0 deletions scripts/run_local.sh
@@ -3,12 +3,19 @@
LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
OUTPUT_DIR=out

# Note: generate factor tables with --generate-factors

# run locally with the python script
# time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR}

# run locally with spark-submit command
# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
# --conf "spark.memory.offHeap.enabled=true" \
# --conf "spark.memory.offHeap.size=100g" \
# --conf "spark.storage.memoryFraction=0" \
# --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \

time spark-submit --master local[*] \
--class ldbc.finbench.datagen.LdbcDatagen \
--driver-memory 480g \
@@ -24,3 +31,20 @@ time spark-submit --master local[*] \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 10 \
--output-dir ${OUTPUT_DIR}

# currently works on SF100
#time spark-submit --master local[*] \
# --class ldbc.finbench.datagen.LdbcDatagen \
# --driver-memory 400g \
# --conf "spark.default.parallelism=800" \
# --conf "spark.shuffle.compress=true" \
# --conf "spark.shuffle.spill.compress=true" \
# --conf "spark.kryoserializer.buffer.max=512m" \
# --conf "spark.driver.maxResultSize=0" \
# --conf "spark.driver.extraJavaOptions=-Xss512m" \
# --conf "spark.executor.extraJavaOptions=-Xss512m -XX:+UseG1GC" \
# --conf "spark.kryo.referenceTracking=false" \
# ${LDBC_FINBENCH_DATAGEN_JAR} \
# --scale-factor 100 \
# --output-dir ${OUTPUT_DIR}

18 changes: 18 additions & 0 deletions scripts/run_paramgen.sh
@@ -0,0 +1,18 @@
#!/bin/bash

LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
OUTPUT_DIR=out/

# Note: generate factor tables with --generate-factors

echo "start factor table generation"

time spark-submit --master local[*] \
--class ldbc.finbench.datagen.LdbcDatagen \
--driver-memory 480g \
${LDBC_FINBENCH_DATAGEN_JAR} \
--output-dir ${OUTPUT_DIR} \
--factor-format csv \
--generate-factors

echo "start parameter curation"
