fix OOM hugeCapacity in large scale, use Kryo serializer, merge person and company activities (#94)
qishipengqsp authored Sep 15, 2024
1 parent d05e55d commit 15ab7ac
Showing 16 changed files with 321 additions and 200 deletions.
5 changes: 4 additions & 1 deletion scripts/run.py
@@ -64,7 +64,10 @@ def run_local(
         **({'spark.shuffle.spill.compress': 'true'}),
         **({'spark.serializer': 'org.apache.spark.serializer.KryoSerializer'}),
         **({'spark.executor.extraJavaOptions': '-XX:+UseG1GC'}),
-        **({'spark.driver.maxResultSize': '5g'}),
+        **({'spark.driver.maxResultSize': '0'}),
+        **({'spark.memory.offHeap.enabled': 'true'}),
+        **({'spark.memory.offHeap.size': '100g'}),
+        **({'spark.storage.memoryFraction': '0'}),
         # **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
         # **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
         **spark_conf
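For context: spark.driver.maxResultSize=0 removes the cap on results collected to the driver rather than raising it, and the two spark.memory.offHeap.* keys move Spark's execution memory out of the JVM heap, relieving the heap pressure behind the hugeCapacity OutOfMemoryError named in the commit title. A minimal sketch of the same tuning applied programmatically, assuming a plain SparkSession entry point (the app name is made up; the config keys are standard Spark):

import org.apache.spark.sql.SparkSession

// Sketch: the run.py tuning expressed via SparkSession.builder.
val spark = SparkSession.builder()
  .appName("finbench-datagen-sketch")             // hypothetical name
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.driver.maxResultSize", "0")      // 0 = no limit on collected results
  .config("spark.memory.offHeap.enabled", "true") // execution memory off-heap
  .config("spark.memory.offHeap.size", "100g")    // must be set once off-heap is enabled
  .getOrCreate()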
15 changes: 8 additions & 7 deletions scripts/run_cluster.sh
@@ -13,23 +13,24 @@ echo "start: " `date`
 # --conf "spark.dynamicAllocation.enabled=true" \
 # --conf "spark.dynamicAllocation.minExecutors=1" \
 # --conf "spark.dynamicAllocation.maxExecutors=10" \
-
 # --conf "spark.yarn.maximizeResourceAllocation=true" \
+# --conf "spark.memory.offHeap.enabled=true" \
+# --conf "spark.memory.offHeap.size=100g" \
 time spark-submit --master spark://finbench-large-00:7077 \
     --class ldbc.finbench.datagen.LdbcDatagen \
     --num-executors 2 \
-    --conf "spark.default.parallelism=640" \
+    --conf "spark.default.parallelism=800" \
     --conf "spark.network.timeout=100000" \
     --conf "spark.shuffle.compress=true" \
     --conf "spark.shuffle.spill.compress=true" \
     --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
-    --conf "spark.driver.memory=200g" \
-    --conf "spark.driver.maxResultSize=5g" \
-    --conf "spark.executor.memory=300g" \
-    --conf "spark.executor.memoryOverheadFactor=0.2" \
+    --conf "spark.driver.memory=100g" \
+    --conf "spark.driver.maxResultSize=0" \
+    --conf "spark.executor.memory=400g" \
+    --conf "spark.executor.memoryOverheadFactor=0.5" \
     --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
     ${LDBC_FINBENCH_DATAGEN_JAR} \
-    --scale-factor 30 \
+    --scale-factor 100 \
     --output-dir ${OUTPUT_DIR}
 
 echo "End: " `date`
24 changes: 21 additions & 3 deletions scripts/run_local.sh
@@ -3,6 +3,24 @@
 LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
 OUTPUT_DIR=out
 
-# For more command line arguments, see the main entry for more information at
-# src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala
-time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 1 --output-dir ${OUTPUT_DIR}
+# run locally with the python script
+# time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR}
+
+# run locally with spark-submit command
+# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
+# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
+time spark-submit --master local[*] \
+    --class ldbc.finbench.datagen.LdbcDatagen \
+    --driver-memory 500g \
+    --conf "spark.default.parallelism=500" \
+    --conf "spark.shuffle.compress=true" \
+    --conf "spark.shuffle.spill.compress=true" \
+    --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
+    --conf "spark.memory.offHeap.enabled=true" \
+    --conf "spark.memory.offHeap.size=100g" \
+    --conf "spark.storage.memoryFraction=0" \
+    --conf "spark.driver.maxResultSize=0" \
+    --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC" \
+    ${LDBC_FINBENCH_DATAGEN_JAR} \
+    --scale-factor 30 \
+    --output-dir ${OUTPUT_DIR}
src/main/java/ldbc/finbench/datagen/entities/edges/CompanyGuaranteeCompany.java
@@ -38,7 +38,6 @@ public static void createCompanyGuaranteeCompany(RandomGeneratorFarm farm, Compa
             toCompany, creationDate, 0, false,
             "business associate", comment);
         fromCompany.getGuaranteeSrc().add(companyGuaranteeCompany);
-        toCompany.getGuaranteeDst().add(companyGuaranteeCompany);
     }
 
     public Company getFromCompany() {
src/main/java/ldbc/finbench/datagen/entities/edges/PersonGuaranteePerson.java
@@ -38,7 +38,6 @@ public static void createPersonGuaranteePerson(RandomGeneratorFarm farm, Person
         PersonGuaranteePerson personGuaranteePerson =
             new PersonGuaranteePerson(fromPerson, toPerson, creationDate, 0, false, relation, comment);
         fromPerson.getGuaranteeSrc().add(personGuaranteePerson);
-        toPerson.getGuaranteeDst().add(personGuaranteePerson);
     }
 
     public Person getFromPerson() {
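These paired deletions are the core of the hugeCapacity fix: a guarantee edge is now stored only on its source vertex, so a heavily guaranteed target no longer accumulates an unbounded getGuaranteeDst() list. (hugeCapacity is the JDK helper that growable-array classes such as ArrayList hit right before throwing OutOfMemoryError, once the backing array can no longer be doubled.) A standalone sketch of the failure mode and the fix, not repo code:

import java.util.ArrayList

// Standalone sketch. Out-degree is bounded per vertex by maxTargetsToGuarantee,
// but in-degree at a popular target is not; dropping destination-side storage
// removes the one list that could grow without bound.
final class Edge(val src: Int, val dst: Int)

final class Vertex {
  val guaranteeSrc = new ArrayList[Edge]() // kept: bounded per vertex
  // val guaranteeDst = new ArrayList[Edge]() // removed: unbounded fan-in
}

def connect(from: Vertex, to: Vertex, e: Edge): Unit = {
  from.guaranteeSrc.add(e)  // source-side only, as in this commit
  // to.guaranteeDst.add(e) // the deleted destination-side bookkeeping
}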
src/main/java/ldbc/finbench/datagen/generation/events/CompanyActivitiesEvent.java (new file)
@@ -0,0 +1,76 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.List;
import java.util.Random;
import ldbc.finbench.datagen.entities.edges.CompanyApplyLoan;
import ldbc.finbench.datagen.entities.edges.CompanyGuaranteeCompany;
import ldbc.finbench.datagen.entities.edges.CompanyOwnAccount;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.entities.nodes.Company;
import ldbc.finbench.datagen.entities.nodes.Loan;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
import ldbc.finbench.datagen.generation.generators.AccountGenerator;
import ldbc.finbench.datagen.generation.generators.LoanGenerator;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

public class CompanyActivitiesEvent implements Serializable {
    private final RandomGeneratorFarm randomFarm;
    private final Random randIndex;

    public CompanyActivitiesEvent() {
        randomFarm = new RandomGeneratorFarm();
        randIndex = new Random(DatagenParams.defaultSeed);
    }

    private void resetState(int seed) {
        randomFarm.resetRandomGenerators(seed);
        randIndex.setSeed(seed);
    }

    public List<Company> companyActivities(List<Company> companies, AccountGenerator accountGenerator,
                                           LoanGenerator loanGenerator, int blockId) {
        resetState(blockId);
        accountGenerator.resetState(blockId);

        Random numAccRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_ACCOUNTS_PER_COMPANY);

        Random pickCompanyGuaRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_COMPANY_GUARANTEE);
        Random numGuaranteesRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_GUARANTEES_PER_COMPANY);

        Random pickCompanyLoanRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_COMPANY_FOR_LOAN);
        Random numLoansRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_LOANS_PER_COMPANY);
        Random dateRand = randomFarm.get(RandomGeneratorFarm.Aspect.COMPANY_APPLY_LOAN_DATE);

        for (Company from : companies) {
            // register accounts
            int numAccounts = numAccRand.nextInt(DatagenParams.maxAccountsPerOwner);
            for (int i = 0; i < Math.max(1, numAccounts); i++) {
                Account account = accountGenerator.generateAccount(from.getCreationDate(), "company", blockId);
                CompanyOwnAccount.createCompanyOwnAccount(randomFarm, from, account, account.getCreationDate());
            }
            // guarantee other companies
            if (pickCompanyGuaRand.nextDouble() < DatagenParams.companyGuaranteeFraction) {
                int numGuarantees = numGuaranteesRand.nextInt(DatagenParams.maxTargetsToGuarantee);
                for (int i = 0; i < Math.max(1, numGuarantees); i++) {
                    Company to = companies.get(randIndex.nextInt(companies.size()));
                    if (from.canGuarantee(to)) {
                        CompanyGuaranteeCompany.createCompanyGuaranteeCompany(randomFarm, from, to);
                    }
                }
            }
            // apply loans
            if (pickCompanyLoanRand.nextDouble() < DatagenParams.companyLoanFraction) {
                int numLoans = numLoansRand.nextInt(DatagenParams.maxLoans);
                for (int i = 0; i < Math.max(1, numLoans); i++) {
                    long applyDate = Dictionaries.dates.randomCompanyToLoanDate(dateRand, from);
                    Loan to = loanGenerator.generateLoan(applyDate, "company", blockId);
                    CompanyApplyLoan.createCompanyApplyLoan(randomFarm, applyDate, from, to);
                }
            }
        }

        return companies;
    }
}
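Merging the formerly separate account, guarantee, and loan events into one pass (the "merge person and company activities" half of the commit title) means each block of companies is traversed once, and per-event intermediate collections never coexist in memory. An illustrative sketch of how such an event could be driven per block; the calling convention and the collections converter are assumptions, since the invoking stage is not part of this diff:

import scala.collection.JavaConverters._ // assumes Scala 2.12-era converters

// Illustrative per-block driver, not the repo's actual generation stage.
def processCompanyBlock(blockId: Int,
                        companies: java.util.List[Company],
                        accountGen: AccountGenerator,
                        loanGen: LoanGenerator): Seq[Company] = {
  // One event instance per block; resetState(blockId) inside makes the
  // output a pure function of the block id.
  val event = new CompanyActivitiesEvent()
  event.companyActivities(companies, accountGen, loanGen, blockId).asScala.toSeq
}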
src/main/java/ldbc/finbench/datagen/generation/events/PersonActivitiesEvent.java (new file)
@@ -0,0 +1,77 @@
package ldbc.finbench.datagen.generation.events;

import java.io.Serializable;
import java.util.List;
import java.util.Random;
import ldbc.finbench.datagen.entities.edges.PersonApplyLoan;
import ldbc.finbench.datagen.entities.edges.PersonGuaranteePerson;
import ldbc.finbench.datagen.entities.edges.PersonOwnAccount;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.entities.nodes.Loan;
import ldbc.finbench.datagen.entities.nodes.Person;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
import ldbc.finbench.datagen.generation.generators.AccountGenerator;
import ldbc.finbench.datagen.generation.generators.LoanGenerator;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

public class PersonActivitiesEvent implements Serializable {
    private final RandomGeneratorFarm randomFarm;
    private final Random randIndex;

    public PersonActivitiesEvent() {
        randomFarm = new RandomGeneratorFarm();
        randIndex = new Random(DatagenParams.defaultSeed);
    }

    private void resetState(int seed) {
        randomFarm.resetRandomGenerators(seed);
        randIndex.setSeed(seed);
    }

    // Generate accounts, guarantees, and loans for persons
    public List<Person> personActivities(List<Person> persons, AccountGenerator accountGenerator,
                                         LoanGenerator loanGenerator, int blockId) {
        resetState(blockId);
        accountGenerator.resetState(blockId);

        Random numAccRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_ACCOUNTS_PER_PERSON);

        Random pickPersonGuaRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_PERSON_GUARANTEE);
        Random numGuaranteesRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_GUARANTEES_PER_PERSON);

        Random pickPersonLoanRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_PERSON_LOAN);
        Random numLoansRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_LOANS_PER_PERSON);
        Random dateRand = randomFarm.get(RandomGeneratorFarm.Aspect.PERSON_APPLY_LOAN_DATE);

        for (Person from : persons) {
            // register accounts
            int numAccounts = numAccRand.nextInt(DatagenParams.maxAccountsPerOwner);
            for (int i = 0; i < Math.max(1, numAccounts); i++) {
                Account to = accountGenerator.generateAccount(from.getCreationDate(), "person", blockId);
                PersonOwnAccount.createPersonOwnAccount(randomFarm, from, to, to.getCreationDate());
            }
            // guarantee other persons
            if (pickPersonGuaRand.nextDouble() < DatagenParams.personGuaranteeFraction) {
                int numGuarantees = numGuaranteesRand.nextInt(DatagenParams.maxTargetsToGuarantee);
                for (int i = 0; i < Math.max(1, numGuarantees); i++) {
                    Person to = persons.get(randIndex.nextInt(persons.size()));
                    if (from.canGuarantee(to)) {
                        PersonGuaranteePerson.createPersonGuaranteePerson(randomFarm, from, to);
                    }
                }
            }
            // apply loans
            if (pickPersonLoanRand.nextDouble() < DatagenParams.personLoanFraction) {
                int numLoans = numLoansRand.nextInt(DatagenParams.maxLoans);
                for (int i = 0; i < Math.max(1, numLoans); i++) {
                    long applyDate = Dictionaries.dates.randomPersonToLoanDate(dateRand, from);
                    Loan to = loanGenerator.generateLoan(applyDate, "person", blockId);
                    PersonApplyLoan.createPersonApplyLoan(randomFarm, applyDate, from, to);
                }
            }
        }

        return persons;
    }
}
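Both new events reseed every random stream from blockId before touching a block (resetState plus accountGenerator.resetState), so regenerating a block, for example when Spark retries a failed task, reproduces identical entities. A minimal standalone illustration of that property:

import java.util.Random

// Sketch: block-seeded randomness makes regeneration reproducible.
def generateBlock(blockId: Int): Seq[Long] = {
  val rnd = new Random(blockId) // stands in for resetState(blockId)
  Seq.fill(5)(rnd.nextLong())
}

// A retried task re-running block 7 yields identical output.
assert(generateBlock(7) == generateBlock(7))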
src/main/java/ldbc/finbench/datagen/generation/events/PersonLoanEvent.java
@@ -26,7 +26,7 @@ public List<Person> personLoan(List<Person> persons, LoanGenerator loanGenerator
         resetState(blockId);
         loanGenerator.resetState(blockId);
 
-        Random pickPersonRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_PERSON_FOR_LOAN);
+        Random pickPersonRand = randomFarm.get(RandomGeneratorFarm.Aspect.PICK_PERSON_LOAN);
         Random numLoansRand = randomFarm.get(RandomGeneratorFarm.Aspect.NUM_LOANS_PER_PERSON);
         Random dateRand = randomFarm.get(RandomGeneratorFarm.Aspect.PERSON_APPLY_LOAN_DATE);
         int numPersonsToTake = (int) (persons.size() * DatagenParams.personLoanFraction);
src/main/java/ldbc/finbench/datagen/generation/generators/MediumGenerator.java
@@ -1,20 +1,17 @@
 package ldbc.finbench.datagen.generation.generators;
 
 import java.util.Iterator;
-import java.util.Random;
 import ldbc.finbench.datagen.entities.nodes.Medium;
 import ldbc.finbench.datagen.generation.DatagenParams;
 import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
 import ldbc.finbench.datagen.util.RandomGeneratorFarm;
 
 public class MediumGenerator {
     private final RandomGeneratorFarm randomFarm;
-    private final Random blockRandom;
     private int nextId = 0;
 
     public MediumGenerator() {
         this.randomFarm = new RandomGeneratorFarm();
-        this.blockRandom = new Random(DatagenParams.defaultSeed);
     }
 
     private long composeMediumId(long id, long date) {
@@ -30,7 +27,7 @@ public Medium generateMedium() {
 
         // Set creationDate
         long creationDate = Dictionaries.dates.randomMediumCreationDate(
-            randomFarm.get(RandomGeneratorFarm.Aspect.ACCOUNT_CREATION_DATE));
+            randomFarm.get(RandomGeneratorFarm.Aspect.MEDIUM_CREATION_DATE));
         medium.setCreationDate(creationDate);
 
@@ -43,11 +40,12 @@ public Medium generateMedium() {
         medium.setMediumName(mediunName);
 
         // Set isBlocked
-        medium.setBlocked(blockRandom.nextDouble() < DatagenParams.blockedMediumRatio);
+        medium.setBlocked(
+            randomFarm.get(RandomGeneratorFarm.Aspect.MEDIUM_BLOCKED).nextDouble() < DatagenParams.blockedMediumRatio);
 
         // Set lastLogin
         long lastLogin = Dictionaries.dates.randomMediumLastLogin(
-            randomFarm.get(RandomGeneratorFarm.Aspect.MEDUIM_LAST_LOGIN_DATE), creationDate);
+            randomFarm.get(RandomGeneratorFarm.Aspect.MEDIUM_LAST_LOGIN_DATE), creationDate);
         medium.setLastLogin(lastLogin);
 
         // Set riskLevel
@@ -60,7 +58,6 @@ public Medium generateMedium() {
 
     private void resetState(int seed) {
         randomFarm.resetRandomGenerators(seed);
-        blockRandom.setSeed(seed);
     }
 
     public Iterator<Medium> generateMediumBlock(int blockId, int blockSize) {
src/main/java/ldbc/finbench/datagen/util/RandomGeneratorFarm.java
@@ -34,8 +34,10 @@ public enum Aspect {
 
     // vertex: medium
     MEDIUM_NAME,
+    MEDIUM_BLOCKED,
     MEDIUM_RISK_LEVEL,
-    MEDUIM_LAST_LOGIN_DATE,
+    MEDIUM_CREATION_DATE,
+    MEDIUM_LAST_LOGIN_DATE,
 
     // vertex: account
     ACCOUNT_TYPE,
@@ -102,7 +104,7 @@ public enum Aspect {
     COMPANY_GUARANTEE_DATE,
 
     // edge: person loan
-    PICK_PERSON_FOR_LOAN,
+    PICK_PERSON_LOAN,
     NUM_LOANS_PER_PERSON,
     PERSON_APPLY_LOAN_ORGANIZATION,
     PERSON_APPLY_LOAN_DATE,
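Every Aspect value maps to its own dedicated Random stream inside the farm. That is why the MediumGenerator change above could delete its ad-hoc blockRandom and stop borrowing ACCOUNT_CREATION_DATE: each medium property now draws from an independent, reproducible sequence, and adding enum entries is all it takes to provision new streams. A standalone sketch of the pattern (illustrative, not the repo's exact class):

import java.util.Random

// Sketch of an aspect-keyed random farm (names illustrative).
object Aspect extends Enumeration {
  val MEDIUM_BLOCKED, MEDIUM_CREATION_DATE, MEDIUM_LAST_LOGIN_DATE = Value
}

class RandomFarm(defaultSeed: Long) {
  // One generator per aspect; offsetting the seed keeps the streams distinct.
  private val generators =
    Array.tabulate(Aspect.maxId)(i => new Random(defaultSeed + i))

  def get(aspect: Aspect.Value): Random = generators(aspect.id)

  def reset(seed: Long): Unit =
    for (i <- generators.indices) generators(i).setSeed(seed + i)
}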
23 changes: 23 additions & 0 deletions src/main/resources/scale_factors.xml
@@ -161,4 +161,27 @@
         </property>
     </scale_factor>
 
+    <scale_factor name="100">
+        <property>
+            <name>generator.numPersons</name>
+            <value>2000000</value>
+        </property>
+        <property>
+            <name>generator.numCompanies</name>
+            <value>2000000</value>
+        </property>
+        <property>
+            <name>generator.numMediums</name>
+            <value>3500000</value>
+        </property>
+        <property>
+            <name>transfer.minNumDegree</name>
+            <value>1</value>
+        </property>
+        <property>
+            <name>transfer.maxNumDegree</name>
+            <value>100000</value>
+        </property>
+    </scale_factor>
+
 </scale_factors>
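For reference, a sketch of how a scale-factor block like the one above could be read; the use of scala-xml here is an assumption, since the repo's actual property loader is not part of this diff:

import scala.xml.XML

// Sketch: look up the generator parameters for scale factor 100.
val root = XML.loadFile("src/main/resources/scale_factors.xml")
val sf = (root \ "scale_factor")
  .find(n => (n \@ "name") == "100")
  .getOrElse(sys.error("scale_factor 100 not found"))

val params: Map[String, String] =
  (sf \ "property").map(p => ((p \ "name").text, (p \ "value").text)).toMap

val numPersons = params("generator.numPersons").toLong // 2000000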
src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
@@ -1,33 +1,13 @@
 package ldbc.finbench.datagen.factors
 
 import ldbc.finbench.datagen.util.DatagenStage
-import org.apache.spark.sql.{SparkSession, functions => F, DataFrame}
 import org.apache.spark.sql.expressions.Window
-import org.apache.spark.sql.functions.{col, countDistinct, row_number, var_pop}
-import org.apache.spark.sql.functions.max
-import org.apache.spark.sql.functions.lit
-import org.apache.spark.sql.functions.date_format
-import org.apache.spark.sql.functions.first
-import org.apache.spark.sql.functions.when
-import org.apache.spark.sql.functions.collect_set
-import org.apache.spark.sql.functions.array
-import org.apache.spark.sql.functions.coalesce
-import org.apache.spark.sql.functions.array_join
-import org.apache.spark.sql.functions.concat
-import org.apache.spark.sql.functions.sum
-import org.apache.spark.sql.functions.format_string
-import org.apache.spark.sql.functions.collect_list
-import org.apache.spark.sql.functions.concat_ws
-import org.apache.spark.sql.functions.size
-import org.apache.spark.sql.functions.expr
-import org.apache.spark.sql.types.{ArrayType, StringType}
 import org.graphframes.GraphFrame
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
 import org.slf4j.{Logger, LoggerFactory}
 import scopt.OptionParser
 import shapeless.lens
 
 import scala.util.matching.Regex
 import ldbc.finbench.datagen.factors.AccountItemsGenerator
 
 object FactorGenerationStage extends DatagenStage {