Skip to content

Commit

Permalink
simplify loan activities (#102)
Browse files Browse the repository at this point in the history
  • Loading branch information
qishipengqsp authored Sep 19, 2024
1 parent 2fbf67a commit cfa327a
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Deposit(Loan loan, Account account, double amount, long creationDate, lon
this.comment = comment;
}

public static Deposit createDeposit(RandomGeneratorFarm farm, Loan loan, Account account, double amount) {
public static void createDeposit(RandomGeneratorFarm farm, Loan loan, Account account, double amount) {
long creationDate =
Dictionaries.dates.randomLoanToAccountDate(farm.get(RandomGeneratorFarm.Aspect.LOAN_SUBEVENTS_DATE), loan,
account, account.getDeletionDate());
Expand All @@ -38,9 +38,7 @@ public static Deposit createDeposit(RandomGeneratorFarm farm, Loan loan, Account
new Deposit(loan, account, amount, creationDate, account.getDeletionDate(), account.isExplicitlyDeleted(),
comment);
loan.addDeposit(deposit);
account.getDeposits().add(deposit);

return deposit;
//account.getDeposits().add(deposit);
}

public double getAmount() {
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/ldbc/finbench/datagen/entities/edges/Repay.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Repay(Account account, Loan loan, double amount, long creationDate, long
this.comment = comment;
}

public static Repay createRepay(RandomGeneratorFarm farm, Account account, Loan loan, double amount) {
public static void createRepay(RandomGeneratorFarm farm, Account account, Loan loan, double amount) {
long creationDate =
Dictionaries.dates.randomAccountToLoanDate(farm.get(RandomGeneratorFarm.Aspect.LOAN_SUBEVENTS_DATE),
account, loan, account.getDeletionDate());
Expand All @@ -37,9 +37,7 @@ public static Repay createRepay(RandomGeneratorFarm farm, Account account, Loan
Repay repay = new Repay(account, loan, amount, creationDate, account.getDeletionDate(),
account.isExplicitlyDeleted(), comment);
loan.addRepay(repay);
account.getRepays().add(repay);

return repay;
//account.getRepays().add(repay);
}

public double getAmount() {
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/ldbc/finbench/datagen/entities/edges/Transfer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Comparator;
import ldbc.finbench.datagen.entities.DynamicActivity;
import ldbc.finbench.datagen.entities.nodes.Account;
import ldbc.finbench.datagen.entities.nodes.Loan;
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.generation.dictionary.Dictionaries;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;
Expand Down Expand Up @@ -69,8 +70,8 @@ public static void createTransfer(RandomGeneratorFarm farm, Account from, Accoun
to.getTransferIns().add(transfer);
}

public static Transfer createLoanTransfer(RandomGeneratorFarm farm, Account from, Account to,
long multiplicityId,
public static void createLoanTransfer(RandomGeneratorFarm farm, Account from, Account to,
Loan loan, long multiplicityId,
double amount) {
long deleteDate = Math.min(from.getDeletionDate(), to.getDeletionDate());
long creationDate =
Expand Down Expand Up @@ -101,9 +102,9 @@ public static Transfer createLoanTransfer(RandomGeneratorFarm farm, Account from
farm.get(RandomGeneratorFarm.Aspect.TRANSFER_GOODSTYPE));
transfer.setGoodsType(goodsType);

from.getTransferOuts().add(transfer);
to.getTransferIns().add(transfer);
return transfer;
//from.getTransferOuts().add(transfer);
//to.getTransferIns().add(transfer);
loan.addLoanTransfer(transfer);
}

public static class FullComparator implements Comparator<Transfer> {
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/ldbc/finbench/datagen/entities/nodes/Loan.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import ldbc.finbench.datagen.entities.edges.Deposit;
import ldbc.finbench.datagen.entities.edges.Repay;
import ldbc.finbench.datagen.entities.edges.Transfer;

public class Loan implements Serializable, Comparable<Loan> {
private long loanId;
Expand All @@ -19,6 +21,7 @@ public class Loan implements Serializable, Comparable<Loan> {
private Company ownerCompany;
private final List<Deposit> deposits;
private final List<Repay> repays;
private final List<Transfer> loanTransfers;

public Loan(long loanId, double loanAmount, double balance, long creationDate, long maxDegree,
String usage, double interestRate) {
Expand All @@ -29,8 +32,9 @@ public Loan(long loanId, double loanAmount, double balance, long creationDate, l
this.maxDegree = maxDegree;
this.usage = usage;
this.interestRate = interestRate;
deposits = new ArrayList<>();
repays = new ArrayList<>();
deposits = new LinkedList<>();
repays = new LinkedList<>();
loanTransfers = new LinkedList<>();
}

@Override
Expand Down Expand Up @@ -118,6 +122,14 @@ public List<Repay> getRepays() {
return repays;
}

public void addLoanTransfer(Transfer transfer) {
loanTransfers.add(transfer);
}

public List<Transfer> getLoanTransfers() {
return loanTransfers;
}

public Person getOwnerPerson() {
return ownerPerson;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
Expand All @@ -20,29 +19,22 @@
import ldbc.finbench.datagen.generation.DatagenParams;
import ldbc.finbench.datagen.util.RandomGeneratorFarm;

public class LoanSubEvents implements Serializable {
public class LoanActivitiesEvents implements Serializable {
private final RandomGeneratorFarm randomFarm;
private final Random indexRandom;
private final Random actionRandom;
private final Random amountRandom;
private final List<Consumer<Loan>> consumers;
private final List<Account> targetAccounts;
private final List<Deposit> deposits;
private final List<Repay> repays;
private final List<Transfer> transfers;
// Note: Don't make it static. It will be accessed by different Spark workers, which makes multiplicity wrong.
private final Map<String, AtomicLong> multiplicityMap;
private List<Account> targetAccounts;

public LoanSubEvents(List<Account> targets) {
public LoanActivitiesEvents() {
multiplicityMap = new ConcurrentHashMap<>();
randomFarm = new RandomGeneratorFarm();
indexRandom = new Random(DatagenParams.defaultSeed);
actionRandom = new Random(DatagenParams.defaultSeed);
amountRandom = new Random(DatagenParams.defaultSeed);
targetAccounts = targets;
deposits = new LinkedList<>();
repays = new LinkedList<>();
transfers = new LinkedList<>();
// Add all defined subevents to the consumers list
consumers = Arrays.asList(this::depositSubEvent,
this::repaySubEvent,
Expand All @@ -56,47 +48,33 @@ public void resetState(int seed) {
amountRandom.setSeed(seed);
}

public List<Deposit> getDeposits() {
return deposits;
}

public List<Repay> getRepays() {
return repays;
}

public List<Transfer> getTransfers() {
return transfers;
}

public void afterLoanApplied(List<Loan> loans, int blockId) {
public List<Loan> afterLoanApplied(List<Loan> loans, List<Account> targets, int blockId) {
resetState(blockId);
targetAccounts = targets;
for (Loan loan : loans) {
int count = 0;
while (count++ < DatagenParams.numLoanActions) {
Consumer<Loan> consumer = consumers.get(actionRandom.nextInt(consumers.size()));
consumer.accept(loan);
}
}
return loans;
}

private void depositSubEvent(Loan loan) {
Account account = getAccount(loan);
if (loan.getBalance() == 0 || cannotDeposit(loan, account)) {
return;
if (!cannotDeposit(loan, account)) {
double amount = amountRandom.nextDouble() * loan.getBalance();
Deposit.createDeposit(randomFarm, loan, account, amount);
}
double amount = amountRandom.nextDouble() * loan.getBalance();
Deposit deposit = Deposit.createDeposit(randomFarm, loan, account, amount);
deposits.add(deposit);
}

private void repaySubEvent(Loan loan) {
Account account = getAccount(loan);
if (loan.getLoanAmount() == loan.getBalance() || cannotRepay(account, loan)) {
return;
if (!cannotRepay(account, loan)) {
double amount = amountRandom.nextDouble() * (loan.getLoanAmount() - loan.getBalance());
Repay.createRepay(randomFarm, account, loan, amount);
}
double amount = amountRandom.nextDouble() * (loan.getLoanAmount() - loan.getBalance());
Repay repay = Repay.createRepay(randomFarm, account, loan, amount);
repays.add(repay);
}

public long getMultiplicityIdAndInc(Account from, Account to) {
Expand All @@ -111,17 +89,15 @@ private void transferSubEvent(Loan loan) {
Account target = targetAccounts.get(indexRandom.nextInt(targetAccounts.size()));
if (actionRandom.nextDouble() < 0.5) {
if (!cannotTransfer(account, target)) {
transfers.add(
Transfer.createLoanTransfer(randomFarm, account, target,
getMultiplicityIdAndInc(account, target),
amountRandom.nextDouble() * DatagenParams.transferMaxAmount));
Transfer.createLoanTransfer(randomFarm, account, target, loan,
getMultiplicityIdAndInc(account, target),
amountRandom.nextDouble() * DatagenParams.transferMaxAmount);
}
} else {
if (!cannotTransfer(target, account)) {
transfers.add(
Transfer.createLoanTransfer(randomFarm, target, account,
getMultiplicityIdAndInc(target, account),
amountRandom.nextDouble() * DatagenParams.transferMaxAmount));
Transfer.createLoanTransfer(randomFarm, target, account, loan,
getMultiplicityIdAndInc(target, account),
amountRandom.nextDouble() * DatagenParams.transferMaxAmount);
}
}
}
Expand All @@ -132,11 +108,12 @@ public boolean cannotTransfer(Account from, Account to) {
}

public boolean cannotDeposit(Loan from, Account to) {
return from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate();
return from.getBalance() == 0 || from.getCreationDate() + DatagenParams.activityDelta > to.getDeletionDate();
}

public boolean cannotRepay(Account from, Loan to) {
return from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta;
return to.getLoanAmount() == to.getBalance()
|| from.getDeletionDate() < to.getCreationDate() + DatagenParams.activityDelta;
}

private Account getAccount(Loan loan) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,9 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession)
+ s"[Simulation] signIn RDD partitions: ${mediumWithSignInRdd.getNumPartitions}"
)

val loanRdd =
mergeLoans(personWithAccGuaLoan, companyWithAccGuaLoan) // merge
log.info(s"[Simulation] Loan RDD partitions: ${loanRdd.getNumPartitions}")
val (depositsRdd, repaysRdd, loanTrasfersRdd) =
activityGenerator.afterLoanSubEvents(loanRdd, accountRdd)
log.info(
s"[Simulation] deposits RDD partitions: ${depositsRdd.getNumPartitions}, " +
s"repays RDD partitions: ${repaysRdd.getNumPartitions}, " +
s"loanTrasfers RDD partitions: ${loanTrasfersRdd.getNumPartitions}"
)
val loanRdd = mergeLoans(personWithAccGuaLoan, companyWithAccGuaLoan)
val loanWithActivitiesRdd = activityGenerator.afterLoanSubEvents(loanRdd, accountRdd)
log.info(s"[Simulation] Loan RDD partitions: ${loanWithActivitiesRdd.getNumPartitions}")

// Serialize
val allFutures = Seq(
Expand All @@ -71,12 +64,7 @@ class ActivitySimulator(sink: RawSink)(implicit spark: SparkSession)
activitySerializer.writeMediumWithActivities(mediumWithSignInRdd),
activitySerializer.writeAccountWithActivities(accountWithTransferWithdraw),
activitySerializer.writeInvestCompanies(companyRddAfterInvest),
activitySerializer.writeLoanActivities(
loanRdd,
depositsRdd,
repaysRdd,
loanTrasfersRdd
)
activitySerializer.writeLoanActivities(loanWithActivitiesRdd)
).flatten

Await.result(Future.sequence(allFutures), Duration.Inf)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package ldbc.finbench.datagen.generation.generators

import ldbc.finbench.datagen.entities.edges._
import ldbc.finbench.datagen.entities.nodes._
import ldbc.finbench.datagen.generation.DatagenParams
import ldbc.finbench.datagen.generation.events._
Expand Down Expand Up @@ -161,11 +160,10 @@ class ActivityGenerator()(implicit spark: SparkSession)
})
}

// TODO: rewrite it with loan centric
def afterLoanSubEvents(
loanRDD: RDD[Loan],
accountRDD: RDD[Account]
): (RDD[Deposit], RDD[Repay], RDD[Transfer]) = {
): (RDD[Loan]) = {
val sampledAccounts = spark.sparkContext.broadcast(
accountRDD
.sample(
Expand All @@ -177,25 +175,16 @@ class ActivityGenerator()(implicit spark: SparkSession)
.toList
)

// TODO: optimize the map function with the Java-Scala part.
val afterLoanActions = loanRDD
.mapPartitionsWithIndex((index, loans) => {
val loanSubEvents = new LoanSubEvents(sampledAccounts.value.asJava)
loanSubEvents.afterLoanApplied(loans.toList.asJava, index)
Iterator(
(
loanSubEvents.getDeposits.asScala,
loanSubEvents.getRepays.asScala,
loanSubEvents.getTransfers.asScala
)
loanRDD.mapPartitionsWithIndex((index, loans) => {
val loanSubEvents = new LoanActivitiesEvents
loanSubEvents
.afterLoanApplied(
loans.toList.asJava,
sampledAccounts.value.asJava,
index
)
})
.cache()

(
afterLoanActions.flatMap(_._1),
afterLoanActions.flatMap(_._2),
afterLoanActions.flatMap(_._3)
)
.iterator()
.asScala
})
}
}
Loading

0 comments on commit cfa327a

Please sign in to comment.