Split up UI mapper into separate files for clarity
pflooky committed Oct 17, 2024 · 1 parent a29d98b · commit c6e75b0
Showing 16 changed files with 1,457 additions and 1,280 deletions.
@@ -81,7 +81,7 @@ class DeleteRecordProcessor(connectionConfigsByName: Map[String, Map[String, Str
})
}

def deleteRecords(dataSourceName: String, plan: Plan, step: Step, stepsByName: Map[String, Step] = Map(),
private def deleteRecords(dataSourceName: String, plan: Plan, step: Step, stepsByName: Map[String, Step] = Map(),
optSourceForeignKey: Option[String] = None, optFullForeignKey: Option[(ForeignKeyRelation, String)] = None): Unit = {
val format = step.options(FORMAT)
val subDataSourcePath = getSubDataSourcePath(dataSourceName, plan.name, step, recordTrackingFolderPath)
@@ -147,13 +147,13 @@ class DeleteRecordProcessor(connectionConfigsByName: Map[String, Map[String, Str
}
}

def getTrackedRecords(dataSourcePath: String): DataFrame = {
private def getTrackedRecords(dataSourcePath: String): DataFrame = {
sparkSession.read.format(RECORD_TRACKING_VALIDATION_FORMAT)
.option(PATH, dataSourcePath)
.load()
}

def deleteTrackedRecordsFile(dataSourcePath: String): Unit = {
private def deleteTrackedRecordsFile(dataSourcePath: String): Unit = {
new Directory(new File(dataSourcePath)).deleteRecursively()
}

@@ -0,0 +1,142 @@
package io.github.datacatering.datacaterer.core.ui.mapper

import io.github.datacatering.datacaterer.api.DataCatererConfigurationBuilder
import io.github.datacatering.datacaterer.api.connection.ConnectionTaskBuilder
import io.github.datacatering.datacaterer.api.model.Constants._
import io.github.datacatering.datacaterer.core.ui.model.ConfigurationRequest
import org.apache.log4j.Logger

object ConfigurationMapper {

private val LOGGER = Logger.getLogger(getClass.getName)

def configurationMapping(
configurationRequest: ConfigurationRequest,
installDirectory: String,
connections: List[ConnectionTaskBuilder[_]]
): DataCatererConfigurationBuilder = {
val isConnectionContainsMetadataSource = connections.exists(conn => conn.connectionConfigWithTaskBuilder.options.contains(METADATA_SOURCE_TYPE))
val configUpdatedFromConnections = if (isConnectionContainsMetadataSource) {
configurationRequest.copy(flag = configurationRequest.flag ++ Map(CONFIG_FLAGS_GENERATE_PLAN_AND_TASKS -> isConnectionContainsMetadataSource.toString))
} else configurationRequest

val baseConfig = DataCatererConfigurationBuilder()
val withFlagConf = mapFlagsConfiguration(configUpdatedFromConnections, baseConfig)
val withFolderConf = mapFolderConfiguration(configUpdatedFromConnections, installDirectory, withFlagConf)
val withMetadataConf = mapMetadataConfiguration(configUpdatedFromConnections, withFolderConf)
val withGenerationConf = mapGenerationConfiguration(configUpdatedFromConnections, withMetadataConf)
val withValidationConf = mapValidationConfiguration(configUpdatedFromConnections, withGenerationConf)
val withAlertConf = mapAlertConfiguration(configUpdatedFromConnections, withValidationConf)

withAlertConf
}

def mapFlagsConfiguration(configurationRequest: ConfigurationRequest, baseConfig: DataCatererConfigurationBuilder): DataCatererConfigurationBuilder = {
configurationRequest.flag.foldLeft(baseConfig)((conf, c) => {
val boolVal = c._2.toBoolean
c._1 match {
case CONFIG_FLAGS_COUNT => conf.enableCount(boolVal)
case CONFIG_FLAGS_GENERATE_DATA => conf.enableGenerateData(boolVal)
case CONFIG_FLAGS_RECORD_TRACKING => conf.enableRecordTracking(boolVal)
case CONFIG_FLAGS_DELETE_GENERATED_RECORDS => conf.enableDeleteGeneratedRecords(boolVal)
case CONFIG_FLAGS_GENERATE_PLAN_AND_TASKS => conf.enableGeneratePlanAndTasks(boolVal)
case CONFIG_FLAGS_FAIL_ON_ERROR => conf.enableFailOnError(boolVal)
case CONFIG_FLAGS_UNIQUE_CHECK => conf.enableUniqueCheck(boolVal)
case CONFIG_FLAGS_SINK_METADATA => conf.enableSinkMetadata(boolVal)
case CONFIG_FLAGS_SAVE_REPORTS => conf.enableSaveReports(boolVal)
case CONFIG_FLAGS_VALIDATION => conf.enableValidation(boolVal)
case CONFIG_FLAGS_GENERATE_VALIDATIONS => conf.enableGenerateValidations(boolVal)
case CONFIG_FLAGS_ALERTS => conf.enableAlerts(boolVal)
case _ =>
LOGGER.warn(s"Unexpected flags configuration key, key=${c._1}")
conf
}
})
}

def mapAlertConfiguration(configurationRequest: ConfigurationRequest, baseConfig: DataCatererConfigurationBuilder): DataCatererConfigurationBuilder = {
configurationRequest.alert.foldLeft(baseConfig)((conf, c) => {
c._1 match {
case CONFIG_ALERT_TRIGGER_ON => conf.alertTriggerOn(c._2)
case CONFIG_ALERT_SLACK_TOKEN => conf.slackAlertToken(c._2)
case CONFIG_ALERT_SLACK_CHANNELS => conf.slackAlertChannels(c._2.split(",").map(_.trim): _*)
case _ =>
LOGGER.warn(s"Unexpected alert configuration key, key=${c._1}")
conf
}
})
}

def mapValidationConfiguration(configurationRequest: ConfigurationRequest, baseConfig: DataCatererConfigurationBuilder): DataCatererConfigurationBuilder = {
configurationRequest.validation.foldLeft(baseConfig)((conf, c) => {
c._1 match {
case CONFIG_VALIDATION_NUM_SAMPLE_ERROR_RECORDS => conf.numErrorSampleRecords(c._2.toInt)
case CONFIG_VALIDATION_ENABLE_DELETE_RECORD_TRACKING_FILES => conf.enableDeleteRecordTrackingFiles(c._2.toBoolean)
case _ =>
LOGGER.warn(s"Unexpected validation configuration key, key=${c._1}")
conf
}
})
}

def mapGenerationConfiguration(configurationRequest: ConfigurationRequest, baseConfig: DataCatererConfigurationBuilder): DataCatererConfigurationBuilder = {
configurationRequest.generation.foldLeft(baseConfig)((conf, c) => {
c._1 match {
case CONFIG_GENERATION_NUM_RECORDS_PER_BATCH => conf.numRecordsPerBatch(c._2.toLong)
case CONFIG_GENERATION_NUM_RECORDS_PER_STEP =>
val parsedNum = c._2.toLong
if (parsedNum != -1) conf.numRecordsPerStep(c._2.toLong) else conf
case _ =>
LOGGER.warn(s"Unexpected generation configuration key, key=${c._1}")
conf
}
})
}

def mapMetadataConfiguration(configurationRequest: ConfigurationRequest, baseConfig: DataCatererConfigurationBuilder): DataCatererConfigurationBuilder = {
configurationRequest.metadata.foldLeft(baseConfig)((conf, c) => {
c._1 match {
case CONFIG_METADATA_NUM_RECORDS_FROM_DATA_SOURCE => conf.numRecordsFromDataSourceForDataProfiling(c._2.toInt)
case CONFIG_METADATA_NUM_RECORDS_FOR_ANALYSIS => conf.numRecordsForAnalysisForDataProfiling(c._2.toInt)
case CONFIG_METADATA_ONE_OF_DISTINCT_COUNT_VS_COUNT_THRESHOLD => conf.oneOfDistinctCountVsCountThreshold(c._2.toDouble)
case CONFIG_METADATA_ONE_OF_MIN_COUNT => conf.oneOfMinCount(c._2.toLong)
case CONFIG_METADATA_NUM_GENERATED_SAMPLES => conf.numGeneratedSamples(c._2.toInt)
case _ =>
LOGGER.warn(s"Unexpected metadata configuration key, key=${c._1}")
conf
}
})
}

def mapFolderConfiguration(configurationRequest: ConfigurationRequest, installDirectory: String, baseConfig: DataCatererConfigurationBuilder): DataCatererConfigurationBuilder = {
val nonEmptyFolderConfig = configurationRequest.folder.filter(_._2.nonEmpty).foldLeft(baseConfig)((conf, c) => {
c._1 match {
case CONFIG_FOLDER_PLAN_FILE_PATH => conf.planFilePath(c._2)
case CONFIG_FOLDER_TASK_FOLDER_PATH => conf.taskFolderPath(c._2)
case CONFIG_FOLDER_GENERATED_PLAN_AND_TASK_FOLDER_PATH => conf.generatedPlanAndTaskFolderPath(c._2)
case CONFIG_FOLDER_GENERATED_REPORTS_FOLDER_PATH => conf.generatedReportsFolderPath(c._2)
case CONFIG_FOLDER_RECORD_TRACKING_FOLDER_PATH => conf.recordTrackingFolderPath(c._2)
case CONFIG_FOLDER_VALIDATION_FOLDER_PATH => conf.validationFolderPath(c._2)
case CONFIG_FOLDER_RECORD_TRACKING_FOR_VALIDATION_FOLDER_PATH => conf.recordTrackingForValidationFolderPath(c._2)
case _ =>
LOGGER.warn(s"Unexpected folder configuration key, key=${c._1}")
conf
}
})
// should set the base directory to the install directory for most folders if not overridden
configurationRequest.folder.filter(_._2.isEmpty).foldLeft(nonEmptyFolderConfig)((conf, c) => {
c._1 match {
case CONFIG_FOLDER_PLAN_FILE_PATH => conf
case CONFIG_FOLDER_TASK_FOLDER_PATH => conf.taskFolderPath(s"$installDirectory/task")
case CONFIG_FOLDER_GENERATED_PLAN_AND_TASK_FOLDER_PATH => conf.generatedPlanAndTaskFolderPath(s"$installDirectory/generated-plan-task")
case CONFIG_FOLDER_GENERATED_REPORTS_FOLDER_PATH => conf.generatedReportsFolderPath(s"$installDirectory/report")
case CONFIG_FOLDER_RECORD_TRACKING_FOLDER_PATH => conf.recordTrackingFolderPath(s"$installDirectory/record-tracking")
case CONFIG_FOLDER_VALIDATION_FOLDER_PATH => conf.validationFolderPath(s"$installDirectory/validation")
case CONFIG_FOLDER_RECORD_TRACKING_FOR_VALIDATION_FOLDER_PATH => conf.recordTrackingForValidationFolderPath(s"$installDirectory/record-tracking-validation")
case _ =>
LOGGER.warn(s"Unexpected folder configuration key, key=${c._1}")
conf
}
})
}
}
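
A minimal usage sketch for the new ConfigurationMapper, assuming ConfigurationRequest is a case class whose section maps (flag, folder, generation, metadata, validation, alert) default to empty, which is the shape the methods above fold over:

import io.github.datacatering.datacaterer.api.model.Constants._
import io.github.datacatering.datacaterer.core.ui.mapper.ConfigurationMapper
import io.github.datacatering.datacaterer.core.ui.model.ConfigurationRequest

// Hypothetical request: enable record tracking and leave the report folder empty so it
// falls back to a path under the install directory (see mapFolderConfiguration above).
val request = ConfigurationRequest(
  flag = Map(CONFIG_FLAGS_RECORD_TRACKING -> "true"),
  folder = Map(CONFIG_FOLDER_GENERATED_REPORTS_FOLDER_PATH -> "")
)
val configBuilder = ConfigurationMapper.configurationMapping(request, "/tmp/data-caterer-install", List())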
@@ -0,0 +1,108 @@
package io.github.datacatering.datacaterer.core.ui.mapper

import io.github.datacatering.datacaterer.api.ConnectionConfigWithTaskBuilder
import io.github.datacatering.datacaterer.api.connection.{ConnectionTaskBuilder, FileBuilder, JdbcBuilder}
import io.github.datacatering.datacaterer.api.model.Constants.{CASSANDRA_KEYSPACE, CASSANDRA_NAME, CASSANDRA_TABLE, CSV, DELTA, HTTP, ICEBERG, ICEBERG_CATALOG_GLUE, ICEBERG_CATALOG_HADOOP, ICEBERG_CATALOG_HIVE, ICEBERG_CATALOG_REST, ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_URI, JMS_CONNECTION_FACTORY, JMS_DESTINATION_NAME, JMS_INITIAL_CONTEXT_FACTORY, JMS_VPN_NAME, JSON, KAFKA, KAFKA_TOPIC, MYSQL, ORC, PARQUET, PASSWORD, PATH, POSTGRES, SCHEMA, SOLACE, SPARK_ICEBERG_CATALOG_TYPE, SPARK_ICEBERG_CATALOG_URI, SPARK_ICEBERG_CATALOG_WAREHOUSE, TABLE, URL, USERNAME}
import io.github.datacatering.datacaterer.core.ui.mapper.UiMapper.checkOptions
import io.github.datacatering.datacaterer.core.ui.model.DataSourceRequest

object ConnectionMapper {

def connectionMapping(dataSourceRequest: DataSourceRequest): ConnectionTaskBuilder[_] = {
dataSourceRequest.`type` match {
case Some(CASSANDRA_NAME) => createCassandraConnection(dataSourceRequest)
case Some(POSTGRES) => createJdbcConnection(dataSourceRequest, POSTGRES)
case Some(MYSQL) => createJdbcConnection(dataSourceRequest, MYSQL)
case Some(CSV) => createFileConnection(dataSourceRequest, CSV)
case Some(JSON) => createFileConnection(dataSourceRequest, JSON)
case Some(PARQUET) => createFileConnection(dataSourceRequest, PARQUET)
case Some(ORC) => createFileConnection(dataSourceRequest, ORC)
case Some(DELTA) => createFileConnection(dataSourceRequest, DELTA)
case Some(ICEBERG) => createIcebergConnection(dataSourceRequest)
case Some(SOLACE) =>
val opt = dataSourceRequest.options.getOrElse(Map())
checkOptions(dataSourceRequest.name, List(URL, USERNAME, PASSWORD, JMS_DESTINATION_NAME, JMS_VPN_NAME, JMS_CONNECTION_FACTORY, JMS_INITIAL_CONTEXT_FACTORY), opt)
ConnectionConfigWithTaskBuilder().solace(dataSourceRequest.name, opt(URL), opt(USERNAME), opt(PASSWORD),
opt(JMS_VPN_NAME), opt(JMS_CONNECTION_FACTORY), opt(JMS_INITIAL_CONTEXT_FACTORY), opt)
case Some(KAFKA) =>
val opt = dataSourceRequest.options.getOrElse(Map())
checkOptions(dataSourceRequest.name, List(URL, KAFKA_TOPIC), opt)
ConnectionConfigWithTaskBuilder().kafka(dataSourceRequest.name, opt(URL), opt)
case Some(HTTP) =>
val opt = dataSourceRequest.options.getOrElse(Map())
ConnectionConfigWithTaskBuilder().http(dataSourceRequest.name, opt.getOrElse(USERNAME, ""), opt.getOrElse(PASSWORD, ""), opt)
case Some(x) =>
throw new IllegalArgumentException(s"Unsupported data source from UI, data-source-type=$x")
case _ =>
throw new IllegalArgumentException(s"No data source type defined, unable to create connections, " +
s"data-source-name=${dataSourceRequest.name}, task-name=${dataSourceRequest.taskName}")
}
}

private def createFileConnection(dataSourceRequest: DataSourceRequest, format: String): FileBuilder = {
val opt = dataSourceRequest.options.getOrElse(Map())
checkOptions(dataSourceRequest.name, List(PATH), opt)
ConnectionConfigWithTaskBuilder().file(dataSourceRequest.name, format, opt(PATH), opt)
}

private def createIcebergConnection(dataSourceRequest: DataSourceRequest): FileBuilder = {
val opt = dataSourceRequest.options.getOrElse(Map())
val name = dataSourceRequest.name
checkOptions(name, List(ICEBERG_CATALOG_TYPE, TABLE), opt)
val baseSparkOpts = Map(
SPARK_ICEBERG_CATALOG_TYPE -> opt(ICEBERG_CATALOG_TYPE),
TABLE -> opt(TABLE)
)
val sparkOpts = opt(ICEBERG_CATALOG_TYPE) match {
case ICEBERG_CATALOG_HADOOP | ICEBERG_CATALOG_GLUE =>
checkOptions(name, List(PATH), opt)
Map(SPARK_ICEBERG_CATALOG_WAREHOUSE -> opt(PATH))
case ICEBERG_CATALOG_HIVE | ICEBERG_CATALOG_REST =>
checkOptions(name, List(ICEBERG_CATALOG_URI), opt)
Map(SPARK_ICEBERG_CATALOG_URI -> opt(ICEBERG_CATALOG_URI))
case _ => Map()
}
ConnectionConfigWithTaskBuilder().file(name, ICEBERG, opt.getOrElse(PATH, ""), baseSparkOpts ++ sparkOpts)
}

private def createJdbcConnection(dataSourceRequest: DataSourceRequest, format: String): JdbcBuilder[_] = {
val opt = dataSourceRequest.options.getOrElse(Map())
checkOptions(dataSourceRequest.name, List(URL, USERNAME, PASSWORD), opt)
val connectionConfigWithTaskBuilder = ConnectionConfigWithTaskBuilder()

val baseConnection = format match {
case POSTGRES => connectionConfigWithTaskBuilder.postgres(dataSourceRequest.name, opt(URL), opt(USERNAME), opt(PASSWORD), opt)
case MYSQL => connectionConfigWithTaskBuilder.mysql(dataSourceRequest.name, opt(URL), opt(USERNAME), opt(PASSWORD), opt)
case x => throw new IllegalArgumentException(s"Unsupported connection format, format=$x")
}

(opt.get(SCHEMA), opt.get(TABLE)) match {
case (Some(schema), Some(table)) => baseConnection.table(schema, table)
case (Some(schema), None) =>
assert(schema.nonEmpty, s"Empty schema name for $format connection, data-source-name=${dataSourceRequest.name}")
throw new IllegalArgumentException(s"Missing table name for $format connection, data-source-name=${dataSourceRequest.name}, schema=$schema")
case (None, Some(table)) =>
assert(table.nonEmpty, s"Empty table name for $format connection, data-source-name=${dataSourceRequest.name}")
throw new IllegalArgumentException(s"Missing schema name for $format connection, data-source-name=${dataSourceRequest.name}, table=$table")
case (None, None) => baseConnection // TODO this is allowed only when there is metadata collection enabled
}
}

private def createCassandraConnection(dataSourceRequest: DataSourceRequest) = {
val opt = dataSourceRequest.options.getOrElse(Map())
checkOptions(dataSourceRequest.name, List(URL, USERNAME, PASSWORD), opt)

val cassandraConnection = ConnectionConfigWithTaskBuilder().cassandra(dataSourceRequest.name, opt(URL), opt(USERNAME), opt(PASSWORD), opt)
(opt.get(CASSANDRA_KEYSPACE), opt.get(CASSANDRA_TABLE)) match {
case (Some(keyspace), Some(table)) => cassandraConnection.table(keyspace, table)
case (Some(keyspace), None) =>
assert(keyspace.nonEmpty, s"Empty keyspace name for Cassandra connection, data-source-name=${dataSourceRequest.name}")
throw new IllegalArgumentException(s"Missing table name for Cassandra connection, data-source-name=${dataSourceRequest.name}, keyspace=$keyspace")
case (None, Some(table)) =>
assert(table.nonEmpty, s"Empty table name for Cassandra connection, data-source-name=${dataSourceRequest.name}")
throw new IllegalArgumentException(s"Missing keyspace name for Cassandra connection, data-source-name=${dataSourceRequest.name}, table=$table")
case (None, None) => cassandraConnection // TODO this is allowed only when there is metadata collection enabled
}
}

}
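
A minimal usage sketch for the new ConnectionMapper, assuming DataSourceRequest exposes the fields referenced above (name, taskName, type, options) with constructor defaults:

import io.github.datacatering.datacaterer.api.model.Constants.{CSV, PATH}
import io.github.datacatering.datacaterer.core.ui.mapper.ConnectionMapper
import io.github.datacatering.datacaterer.core.ui.model.DataSourceRequest

// Hypothetical CSV request: PATH is required by createFileConnection via checkOptions.
val csvRequest = DataSourceRequest(
  name = "my-csv",
  taskName = "csv-task",
  `type` = Some(CSV),
  options = Some(Map(PATH -> "/tmp/generated/csv"))
)
val fileBuilder = ConnectionMapper.connectionMapping(csvRequest)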
@@ -0,0 +1,34 @@
package io.github.datacatering.datacaterer.core.ui.mapper

import io.github.datacatering.datacaterer.api.{CountBuilder, GeneratorBuilder}
import io.github.datacatering.datacaterer.api.model.Constants.{DEFAULT_COUNT_RECORDS, DEFAULT_PER_COLUMN_COUNT_RECORDS, DISTRIBUTION_EXPONENTIAL, DISTRIBUTION_NORMAL}
import io.github.datacatering.datacaterer.core.ui.model.DataSourceRequest

object CountMapper {

def countMapping(dataSourceRequest: DataSourceRequest): CountBuilder = {
dataSourceRequest.count.map(recordCountRequest => {
val baseRecordCount = (recordCountRequest.records, recordCountRequest.recordsMin, recordCountRequest.recordsMax) match {
case (Some(records), None, None) => CountBuilder().records(records)
case (None, Some(min), Some(max)) => CountBuilder().generator(GeneratorBuilder().min(min).max(max))
case _ => CountBuilder().records(DEFAULT_COUNT_RECORDS)
}

val perColumnNames = recordCountRequest.perColumnNames.getOrElse(List())
if (perColumnNames.nonEmpty) {
(recordCountRequest.perColumnRecords, recordCountRequest.perColumnRecordsMin, recordCountRequest.perColumnRecordsMax,
recordCountRequest.perColumnRecordsDistribution, recordCountRequest.perColumnRecordsDistributionRateParam) match {
case (Some(records), None, None, None, None) => baseRecordCount.recordsPerColumn(records, perColumnNames: _*)
case (None, Some(min), Some(max), None, None) => baseRecordCount.recordsPerColumnGenerator(GeneratorBuilder().min(min).max(max), perColumnNames: _*)
case (None, Some(min), Some(max), Some(DISTRIBUTION_EXPONENTIAL), Some(rate)) => baseRecordCount.recordsPerColumnExponentialDistribution(min, max, rate.toDouble, perColumnNames: _*)
case (None, None, None, Some(DISTRIBUTION_EXPONENTIAL), Some(rate)) => baseRecordCount.recordsPerColumnExponentialDistribution(rate.toDouble, perColumnNames: _*)
case (None, Some(min), Some(max), Some(DISTRIBUTION_NORMAL), None) => baseRecordCount.recordsPerColumnNormalDistribution(min, max, perColumnNames: _*)
case _ => baseRecordCount.recordsPerColumn(DEFAULT_PER_COLUMN_COUNT_RECORDS, perColumnNames: _*)
}
} else {
baseRecordCount
}
}).getOrElse(CountBuilder())
}

}
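
A minimal sketch of resolving a per-column count through CountMapper; the count request type name (RecordCountRequest) and its constructor defaults are assumptions, only the field names come from the pattern matches above:

import io.github.datacatering.datacaterer.core.ui.mapper.CountMapper
import io.github.datacatering.datacaterer.core.ui.model.{DataSourceRequest, RecordCountRequest}

// Hypothetical: 100 base records, then 2 records per distinct (account_id, name) pair.
val countRequest = RecordCountRequest(
  records = Some(100L),
  perColumnNames = Some(List("account_id", "name")),
  perColumnRecords = Some(2L)
)
val countBuilder = CountMapper.countMapping(
  DataSourceRequest(name = "my-csv", taskName = "csv-task", count = Some(countRequest))
)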