sort files and separate factor with datagen (#111)
* sort files

* update tools markdown doc

* separate factor and generate

* add note
qishipengqsp authored Sep 29, 2024
1 parent 7599200 commit 946fc48
Showing 117 changed files with 281 additions and 266 deletions.
24 changes: 24 additions & 0 deletions scripts/run_local.sh
@@ -3,12 +3,19 @@
LDBC_FINBENCH_DATAGEN_JAR=target/ldbc_finbench_datagen-0.2.0-SNAPSHOT-jar-with-dependencies.jar
OUTPUT_DIR=out

# Note: generate factor tables with --generate-factors

# run locally with the python script
# time python3 scripts/run.py --jar $LDBC_FINBENCH_DATAGEN_JAR --main-class ldbc.finbench.datagen.LdbcDatagen --memory 500g -- --scale-factor 30 --output-dir ${OUTPUT_DIR}

# run locally with spark-submit command
# **({'spark.driver.extraJavaOptions': '-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005'}), # Debug
# **({'spark.executor.extraJavaOptions': '-verbose:gc -XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps'}),
# --conf "spark.memory.offHeap.enabled=true" \
# --conf "spark.memory.offHeap.size=100g" \
# --conf "spark.storage.memoryFraction=0" \
# --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \

time spark-submit --master local[*] \
--class ldbc.finbench.datagen.LdbcDatagen \
--driver-memory 480g \
@@ -24,3 +31,20 @@ time spark-submit --master local[*] \
${LDBC_FINBENCH_DATAGEN_JAR} \
--scale-factor 10 \
--output-dir ${OUTPUT_DIR}

# currently works on SF100
#time spark-submit --master local[*] \
# --class ldbc.finbench.datagen.LdbcDatagen \
# --driver-memory 400g \
# --conf "spark.default.parallelism=800" \
# --conf "spark.shuffle.compress=true" \
# --conf "spark.shuffle.spill.compress=true" \
# --conf "spark.kryoserializer.buffer.max=512m" \
# --conf "spark.driver.maxResultSize=0" \
# --conf "spark.driver.extraJavaOptions=-Xss512m" \
# --conf "spark.executor.extraJavaOptions=-Xss512m -XX:+UseG1GC" \
# --conf "spark.kryo.referenceTracking=false" \
# ${LDBC_FINBENCH_DATAGEN_JAR} \
# --scale-factor 100 \
# --output-dir ${OUTPUT_DIR}
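
With factor generation now disabled by default (the `generateFactors` default flips to `false` in `LdbcDatagen.scala` below), factor tables would come from a second, separate run that passes `--generate-factors`, as the note at the top of this script suggests. A minimal sketch, reusing the script's options; whether the factor run needs the same driver memory is an assumption:

```bash
# Sketch only: a separate factor-generation run, after the data-generation run above.
time spark-submit --master local[*] \
    --class ldbc.finbench.datagen.LdbcDatagen \
    --driver-memory 480g \
    ${LDBC_FINBENCH_DATAGEN_JAR} \
    --scale-factor 10 \
    --output-dir ${OUTPUT_DIR} \
    --generate-factors
```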

34 changes: 16 additions & 18 deletions src/main/scala/ldbc/finbench/datagen/LdbcDatagen.scala
@@ -1,9 +1,7 @@
package ldbc.finbench.datagen

import ldbc.finbench.datagen.factors.FactorGenerationStage
import ldbc.finbench.datagen.generation.dictionary.Dictionaries
import ldbc.finbench.datagen.generation.GenerationStage
import ldbc.finbench.datagen.transformation.TransformationStage
import ldbc.finbench.datagen.util.{Logging, SparkApp}
import shapeless.lens

@@ -24,7 +22,7 @@ object LdbcDatagen extends SparkApp with Logging {
format: String = "csv",
formatOptions: Map[String, String] = Map.empty,
epochMillis: Boolean = false,
-      generateFactors: Boolean = true,
+      generateFactors: Boolean = false,
factorFormat: String = "parquet"
)

@@ -121,22 +119,22 @@ object LdbcDatagen extends SparkApp with Logging {
}

  override def run(args: ArgsType): Unit = {
-    val generationArgs = GenerationStage.Args(
-      scaleFactor = args.scaleFactor,
-      outputDir = args.outputDir,
-      format = args.format,
-      partitionsOpt = args.numPartitions
-    )
-    log.info("[Main] Starting generation stage")
-    GenerationStage.run(generationArgs)
-
-    if (args.generateFactors) {
-      val factorArgs = FactorGenerationStage.Args(
-        outputDir = args.outputDir,
-        format = args.factorFormat
-      )
-      log.info("[Main] Starting factoring stage")
-      // FactorGenerationStage.run(factorArgs)
+    if (!args.generateFactors) {
+      GenerationStage.run(
+        GenerationStage.Args(
+          scaleFactor = args.scaleFactor,
+          outputDir = args.outputDir,
+          format = args.format,
+          partitionsOpt = args.numPartitions
+        )
+      )
+    } else {
+      FactorGenerationStage.run(
+        FactorGenerationStage.Args(
+          outputDir = args.outputDir,
+          format = args.factorFormat
+        )
+      )
    }
  }
}
src/main/scala/ldbc/finbench/datagen/factors/FactorGenerationStage.scala
@@ -1,5 +1,6 @@
package ldbc.finbench.datagen.factors

import ldbc.finbench.datagen.LdbcDatagen.log
import ldbc.finbench.datagen.util.DatagenStage
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession, functions => F}
@@ -10,7 +11,6 @@ import shapeless.lens
import scala.util.matching.Regex

object FactorGenerationStage extends DatagenStage {

@transient lazy val log: Logger = LoggerFactory.getLogger(this.getClass)

case class Args(
@@ -64,14 +64,14 @@ object FactorGenerationStage extends DatagenStage {
run(parsedArgs)
}

-  // execute factorization process
+  // TODO: finish all

  override def run(args: Args) = {
-    parameterCuration(args)
+    factortables(args)
  }

-  def parameterCuration(args: Args)(implicit spark: SparkSession) = {
+  def factortables(args: Args)(implicit spark: SparkSession) = {
    import spark.implicits._
+    log.info("[Main] Starting factoring stage")

val transferRDD = spark.read
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
@@ -533,6 +533,5 @@ object FactorGenerationStage extends DatagenStage {
.option("delimiter", "|")
.format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
.save(s"${args.outputDir}/factor_table/upstream_amount")

}
}
src/main/scala/ldbc/finbench/datagen/generation/GenerationStage.scala
@@ -1,5 +1,6 @@
package ldbc.finbench.datagen.generation

import ldbc.finbench.datagen.LdbcDatagen.log
import ldbc.finbench.datagen.config.{ConfigParser, DatagenConfiguration}
import ldbc.finbench.datagen.io.raw.{Csv, Parquet, RawSink}
import ldbc.finbench.datagen.util._
@@ -25,6 +26,7 @@ object GenerationStage extends DatagenStage with Logging {
override type ArgsType = Args

override def run(args: Args): Unit = {
log.info("[Main] Starting generation stage")
// build and initialize the configs
val config = buildConfig(args)
// OPT: It is called in each SparkGenerator in Spark to initialize the context on the executors.
4 changes: 0 additions & 4 deletions tools/DataProfiler/result/db139/profile.log

This file was deleted.

2 changes: 0 additions & 2 deletions tools/DataProfiler/result/db177/edges.log

This file was deleted.

4 changes: 0 additions & 4 deletions tools/DataProfiler/result/db177/profile.log

This file was deleted.

4 changes: 0 additions & 4 deletions tools/DataProfiler/result/db184/profile.log

This file was deleted.

2 changes: 0 additions & 2 deletions tools/DataProfiler/result/transfer/edges.log

This file was deleted.

4 changes: 0 additions & 4 deletions tools/DataProfiler/result/transfer/profile.log

This file was deleted.

64 changes: 60 additions & 4 deletions tools/README.md
@@ -1,6 +1,62 @@
# Tools

Here are some tools for graph data processing.
- dataprofiler: a tool for profiling graph data, including degree distribution, etc.
- graphgen: a simple tool/example code to generate power-law distributed graph data.
- paramgen: a Parameter Search tool to generate parameters for queries using TuGraph.
- paramgen:
  - parameter_curation: a tool for generating parameters for FinBench queries
- check_*.py: Python scripts used to check data features such as consistency and distribution
- merge_cluster_output.py: a Python script to merge the output produced in cluster mode
- statistic.py: a Python script to calculate statistics of the generated data
- legacy: some legacy tools
  - dataprofiler: a tool for profiling graph data, including degree distribution, etc.
  - graphgen: a simple tool/example code to generate power-law distributed graph data.
  - factorgen: factor table generators implemented in Python


## ParamsGen

`params_gen.py` uses the CREATE_VALIDATION feature to generate parameters.

The specific steps are as follows:

1. Select vertices of type Account, Person, and Loan from the dataset, and generate a parameter file that meets the input specifications for ldbc_finbench_driver.
2. Execute CREATE_VALIDATION to generate validation_params.csv.
3. Select non-empty results from validation_params.csv.

Example:

```bash
python3 params_gen.py 1 # gen tcr1 params
```
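
Step 3 can be scripted. A minimal sketch, assuming `validation_params.csv` is `|`-delimited and that an empty last field marks an empty result (both are assumptions about the file layout):

```bash
# Keep only rows whose last '|'-delimited field is non-empty.
# Delimiter and result-column position are assumptions; adjust to the actual layout.
awk -F'|' '$NF != ""' validation_params.csv > validation_params.nonempty.csv
```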

Other notes:

1. The generated start_timestamp and end_timestamp in the current version are fixed values.
2. For tcr4 and tcr10, this method is not efficient enough. Use the following Cypher queries to search for parameters instead:

```Cypher
// tcr4
MATCH
(n1:Account)-[:transfer]->
(n2:Account)-[:transfer]->
(n3:Account)-[:transfer]->(n4:Account)
WHERE
n1.id = n4.id AND n1.id > n2.id AND n2.id > n3.id
WITH
n1.id as n1id,
n2.id as n2id,
n3.id as n3id,
n4.id as n4id
LIMIT 1000
RETURN DISTINCT toString(n1id)+"|"+toString(n2id)
// tcr10
MATCH
(c:Company)<-[:invest]-(p:Person)
WITH
c.id as cid,
count(p.id) as num,
collect(p.id) as person
WHERE num >= 2
RETURN
  toString(person[0])+"|"+toString(person[1])
LIMIT 1000
```
28 files renamed without changes.
49 changes: 0 additions & 49 deletions tools/paramgen/README.md

This file was deleted.


