Skip to content

Commit 1b44a3c

Browse files
adriangb and claude committed
Address PR feedback: use CsvOptions and supports_repartitioning()
- Remove separate `newlines_in_values` field from `CsvSource`
- Use `CsvOptions.newlines_in_values` instead (already exists)
- Rename `has_newlines_in_values()` to `supports_repartitioning()` in `FileSource` trait
  - describes capability, not CSV-specific reason
- `CsvSource` overrides `supports_repartitioning()` to check options
- Update proto serialization to set value in `CsvOptions`
- Update documentation and migration guide

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent a712c5d commit 1b44a3c

File tree

5 files changed

+30
-47
lines changed

5 files changed

+30
-47
lines changed

datafusion/datasource-csv/src/file_format.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ impl FileFormat for CsvFormat {
437437

438438
let mut csv_options = self.options.clone();
439439
csv_options.has_header = Some(has_header);
440+
csv_options.newlines_in_values = Some(newlines_in_values);
440441

441442
// Get the existing CsvSource and update its options
442443
// We need to preserve the table_schema from the original source (which includes partition columns)
@@ -445,12 +446,7 @@ impl FileFormat for CsvFormat {
445446
.as_any()
446447
.downcast_ref::<CsvSource>()
447448
.expect("file_source should be a CsvSource");
448-
let source = Arc::new(
449-
csv_source
450-
.clone()
451-
.with_csv_options(csv_options)
452-
.with_newlines_in_values(newlines_in_values),
453-
);
449+
let source = Arc::new(csv_source.clone().with_csv_options(csv_options));
454450

455451
let config = FileScanConfigBuilder::from(conf)
456452
.with_file_compression_type(self.options.compression.into())

datafusion/datasource-csv/src/source.rs

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,12 @@ use tokio::io::AsyncWriteExt;
7272
/// has_header: Some(true),
7373
/// delimiter: b',',
7474
/// quote: b'"',
75+
/// newlines_in_values: Some(true), // The file contains newlines in values
7576
/// ..Default::default()
7677
/// };
7778
/// let source = Arc::new(CsvSource::new(file_schema.clone())
7879
/// .with_csv_options(options)
7980
/// .with_terminator(Some(b'#'))
80-
/// .with_newlines_in_values(true) // The file contains newlines in values
8181
/// );
8282
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
8383
/// let config = FileScanConfigBuilder::new(object_store_url, source)
@@ -93,8 +93,6 @@ pub struct CsvSource {
9393
projection: SplitProjection,
9494
metrics: ExecutionPlanMetricsSet,
9595
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
96-
/// Whether values may contain newline characters
97-
newlines_in_values: bool,
9896
}
9997

10098
impl CsvSource {
@@ -108,7 +106,6 @@ impl CsvSource {
108106
batch_size: None,
109107
metrics: ExecutionPlanMetricsSet::new(),
110108
schema_adapter_factory: None,
111-
newlines_in_values: false,
112109
}
113110
}
114111

@@ -180,19 +177,9 @@ impl CsvSource {
180177
conf
181178
}
182179

183-
/// Whether values may contain newline characters.
184-
///
185-
/// When enabled, scanning cannot be parallelized across a single file
186-
/// because newlines in values prevent determining record boundaries
187-
/// by byte offset alone.
180+
/// Whether values may contain newline characters
188181
pub fn newlines_in_values(&self) -> bool {
189-
self.newlines_in_values
190-
}
191-
192-
/// Set whether values may contain newline characters
193-
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
194-
self.newlines_in_values = newlines_in_values;
195-
self
182+
self.options.newlines_in_values.unwrap_or(false)
196183
}
197184
}
198185

@@ -316,8 +303,10 @@ impl FileSource for CsvSource {
316303
"csv"
317304
}
318305

319-
fn has_newlines_in_values(&self) -> bool {
320-
self.newlines_in_values
306+
fn supports_repartitioning(&self) -> bool {
307+
// Cannot repartition if values may contain newlines, as record
308+
// boundaries cannot be determined by byte offset alone
309+
!self.options.newlines_in_values.unwrap_or(false)
321310
}
322311

323312
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {

datafusion/datasource/src/file.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,19 @@ pub trait FileSource: Send + Sync {
8484
Ok(())
8585
}
8686

87-
/// Returns whether this file source has values that may contain newline characters.
87+
/// Returns whether this file source supports repartitioning files by byte ranges.
8888
///
89-
/// This is primarily relevant for CSV files where quoted values can contain
90-
/// embedded newlines. When this returns `true`, files cannot be repartitioned
91-
/// by byte ranges because record boundaries cannot be determined by simple
92-
/// newline scanning.
89+
/// When this returns `true`, files can be split into multiple partitions
90+
/// based on byte offsets for parallel reading.
9391
///
94-
/// The default implementation returns `false`. CSV sources should override
95-
/// this method to return the appropriate value based on their configuration.
96-
fn has_newlines_in_values(&self) -> bool {
97-
false
92+
/// When this returns `false`, files cannot be repartitioned (e.g., CSV files
93+
/// with `newlines_in_values` enabled cannot be split because record boundaries
94+
/// cannot be determined by byte offset alone).
95+
///
96+
/// The default implementation returns `true`. File sources that cannot support
97+
/// repartitioning should override this method.
98+
fn supports_repartitioning(&self) -> bool {
99+
true
98100
}
99101

100102
/// If supported by the [`FileSource`], redistribute files across partitions
@@ -110,7 +112,7 @@ pub trait FileSource: Send + Sync {
110112
output_ordering: Option<LexOrdering>,
111113
config: &FileScanConfig,
112114
) -> Result<Option<FileScanConfig>> {
113-
if config.file_compression_type.is_compressed() || self.has_newlines_in_values() {
115+
if config.file_compression_type.is_compressed() || !self.supports_repartitioning() {
114116
return Ok(None);
115117
}
116118

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -634,6 +634,7 @@ impl protobuf::PhysicalPlanNode {
634634
has_header: Some(scan.has_header),
635635
delimiter: str_to_byte(&scan.delimiter, "delimiter")?,
636636
quote: str_to_byte(&scan.quote, "quote")?,
637+
newlines_in_values: Some(scan.newlines_in_values),
637638
..Default::default()
638639
};
639640
let source = Arc::new(
@@ -643,15 +644,6 @@ impl protobuf::PhysicalPlanNode {
643644
.with_comment(comment),
644645
);
645646

646-
let source = Arc::new(
647-
source
648-
.as_any()
649-
.downcast_ref::<CsvSource>()
650-
.expect("source must be CsvSource")
651-
.clone()
652-
.with_newlines_in_values(scan.newlines_in_values),
653-
);
654-
655647
let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
656648
scan.base_conf.as_ref().unwrap(),
657649
ctx,

docs/source/library-user-guide/upgrading.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,17 +57,17 @@ See <https://github.com/apache/datafusion/issues/19056> for more details.
5757

5858
Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias.
5959

60-
### `newlines_in_values` moved from `FileScanConfig` to `CsvSource`
60+
### `newlines_in_values` moved from `FileScanConfig` to `CsvOptions`
6161

62-
The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvSource`, as it only applies to CSV file parsing.
62+
The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvOptions`, as it only applies to CSV file parsing.
6363

6464
**Who is affected:**
6565

6666
- Users who set `newlines_in_values` via `FileScanConfigBuilder::with_newlines_in_values()`
6767

6868
**Migration guide:**
6969

70-
Set `newlines_in_values` on `CsvSource` instead of `FileScanConfigBuilder`:
70+
Set `newlines_in_values` in `CsvOptions` instead of on `FileScanConfigBuilder`:
7171

7272
**Before:**
7373

@@ -81,8 +81,12 @@ let config = FileScanConfigBuilder::new(object_store_url, source)
8181
**After:**
8282

8383
```rust,ignore
84+
let options = CsvOptions {
85+
newlines_in_values: Some(true),
86+
..Default::default()
87+
};
8488
let source = Arc::new(CsvSource::new(file_schema.clone())
85-
.with_newlines_in_values(true));
89+
.with_csv_options(options));
8690
let config = FileScanConfigBuilder::new(object_store_url, source)
8791
.build();
8892
```

0 commit comments

Comments
 (0)