Skip to content

Commit

Permalink
[SPARK-50616][SQL] Add File Extension Option to CSV DataSource Writer
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
The existing CSV DataSource allows one to set the delimiter/separator but does not allow the changing of the file extension. This means that a file can have values separated by tabs but me marked as a ".csv" file. This change allows one to change the file extension to match the delimiter/separator (e.g. ".tsv" for a tab separated value file).

### Why are the changes needed?
This PR adds an additional option to set the fileExtension. The end result is that when a separator is set that is not a comma that the output file has a file extension that matches the separator (e.g. file.tsv, file.psv, etc...).

Notes on Previous Pull Request #17973
A pull request adding this option was discussed 7 years ago. One reason it wasn't added was:
"I would like to suggest to leave this out if there is no better reason for now. Downside of this is, it looks this allows arbitrary name and it does not gurantee the extention is, say, tsv when the delmiter is a tab. It is purely up to the user."

I don't believe this is a good reason to not let the user set the extension. If we let them set the delimiter/separator to an arbitrary string/char then why not let the user also set the file extension to specify the separator that the file uses (e.g. tsv, psv, etc...). This addition keeps the "csv" file extension as the default and has the benefit of allowing other separators to match the file extension.

### Does this PR introduce _any_ user-facing change?
Yes. This PR adds one row to the options table for the CSV DataSource documentation to include the "fileExtension" option.

### How was this patch tested?
One unit test was added to validate a file is written with the new extension.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49233 from jabbaugh/jbaugh-add-csv-file-ext.

Authored-by: Jim Baugh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
  • Loading branch information
Jim Baugh authored and dongjoon-hyun committed Jan 10, 2025
1 parent d3848ce commit f9cb80a
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 3 deletions.
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -2959,6 +2959,11 @@
"Unsupported dtype: <invalidValue>. Valid values: float64, float32."
]
},
"EXTENSION" : {
"message" : [
"Invalid extension: <invalidValue>. Extension is limited to exactly 3 letters (e.g. csv, tsv, etc...)"
]
},
"INTEGER" : {
"message" : [
"expects an integer literal, but got <invalidValue>."
Expand Down
6 changes: 6 additions & 0 deletions docs/sql-data-sources-csv.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ Data source options of CSV can be set via:
<td>Sets a separator for each field and value. This separator can be one or more characters.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>extension</code></td>
<td>csv</td>
<td>Sets the file extension for the output files. Limited to letters. Length must equal 3.</td>
<td>write</td>
</tr>
<tr>
<td><code>encoding</code><br><code>charset</code></td>
<td>UTF-8</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ class CSVOptions(

val delimiter = CSVExprUtils.toDelimiterStr(
parameters.getOrElse(SEP, parameters.getOrElse(DELIMITER, ",")))

val extension = {
val ext = parameters.getOrElse(EXTENSION, "csv")
if (ext.size != 3 && !ext.forall(_.isLetter)) {
throw QueryExecutionErrors.invalidFileExtensionError(EXTENSION, ext)
}

ext
}

val parseMode: ParseMode =
parameters.get(MODE).map(ParseMode.fromString).getOrElse(PermissiveMode)
val charset = parameters.get(ENCODING).orElse(parameters.get(CHARSET))
Expand Down Expand Up @@ -385,6 +395,7 @@ object CSVOptions extends DataSourceOptions {
val NEGATIVE_INF = newOption("negativeInf")
val TIME_ZONE = newOption("timeZone")
val UNESCAPED_QUOTE_HANDLING = newOption("unescapedQuoteHandling")
val EXTENSION = newOption("extension")
// Options with alternative
val ENCODING = "encoding"
val CHARSET = "charset"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2786,6 +2786,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
Map.empty
)

def invalidFileExtensionError(functionName: String, extension: String): RuntimeException = {
new SparkIllegalArgumentException(
errorClass = "INVALID_PARAMETER_VALUE.EXTENSION",
messageParameters = Map(
"functionName" -> toSQLId(functionName),
"parameter" -> toSQLId("extension"),
"fileExtension" -> toSQLId(extension),
"acceptable" -> "Extension is limited to exactly 3 letters (e.g. csv, tsv, etc...)"))
}

def invalidCharsetError(functionName: String, charset: String): RuntimeException = {
new SparkIllegalArgumentException(
errorClass = "INVALID_PARAMETER_VALUE.CHARSET",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
}

override def getFileExtension(context: TaskAttemptContext): String = {
".csv" + CodecStreams.getCompressionExtension(context)
"." + csvOptions.extension + CodecStreams.getCompressionExtension(context)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class CSVWrite(
}

override def getFileExtension(context: TaskAttemptContext): String = {
".csv" + CodecStreams.getCompressionExtension(context)
"." + csvOptions.extension + CodecStreams.getCompressionExtension(context)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3078,6 +3078,23 @@ abstract class CSVSuite
}
}

test("SPARK-50616: We can write with a tsv file extension") {
withTempPath { path =>
val input = Seq(
"1423-11-12T23:41:00",
"1765-03-28",
"2016-01-28T20:00:00"
).toDF().repartition(1)
input.write.option("extension", "tsv").csv(path.getAbsolutePath)

val files = Files.list(path.toPath)
.iterator().asScala.map(x => x.getFileName.toString)
.toList.filter(x => x.takeRight(3).equals("tsv"))

assert(files.size == 1)
}
}

test("SPARK-39904: Parse incorrect timestamp values") {
withTempPath { path =>
Seq(
Expand Down Expand Up @@ -3308,7 +3325,7 @@ abstract class CSVSuite
}

test("SPARK-40667: validate CSV Options") {
assert(CSVOptions.getAllOptions.size == 39)
assert(CSVOptions.getAllOptions.size == 40)
// Please add validation on any new CSV options here
assert(CSVOptions.isValidOption("header"))
assert(CSVOptions.isValidOption("inferSchema"))
Expand Down Expand Up @@ -3347,6 +3364,7 @@ abstract class CSVSuite
assert(CSVOptions.isValidOption("compression"))
assert(CSVOptions.isValidOption("codec"))
assert(CSVOptions.isValidOption("sep"))
assert(CSVOptions.isValidOption("extension"))
assert(CSVOptions.isValidOption("delimiter"))
assert(CSVOptions.isValidOption("columnPruning"))
// Please add validation on any new parquet options with alternative here
Expand Down

0 comments on commit f9cb80a

Please sign in to comment.