Skip to content

Commit a712c5d

Browse files
adriangb authored and claude committed
Move newlines_in_values from FileScanConfig to CsvSource
This PR moves the CSV-specific `newlines_in_values` configuration option from `FileScanConfig` (a shared, format-agnostic configuration) to `CsvSource`, where it belongs.

Changes:
- Add `newlines_in_values` field and methods to `CsvSource`
- Add `has_newlines_in_values()` method to the `FileSource` trait
- Update `FileSource::repartitioned()` to use the trait method
- Remove `new_lines_in_values` from `FileScanConfig` and its builder
- Update proto serialization to use `CsvSource`
- Update tests and documentation

Closes apache#18453

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent fedddbc commit a712c5d

File tree

8 files changed

+85
-44
lines changed

8 files changed

+85
-44
lines changed

datafusion/datasource-arrow/src/source.rs

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -460,9 +460,7 @@ impl FileSource for ArrowSource {
460460
// Use the default trait implementation logic for file format
461461
use datafusion_datasource::file_groups::FileGroupPartitioner;
462462

463-
if config.file_compression_type.is_compressed()
464-
|| config.new_lines_in_values
465-
{
463+
if config.file_compression_type.is_compressed() {
466464
return Ok(None);
467465
}
468466

datafusion/datasource-csv/src/file_format.rs

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -445,11 +445,15 @@ impl FileFormat for CsvFormat {
445445
.as_any()
446446
.downcast_ref::<CsvSource>()
447447
.expect("file_source should be a CsvSource");
448-
let source = Arc::new(csv_source.clone().with_csv_options(csv_options));
448+
let source = Arc::new(
449+
csv_source
450+
.clone()
451+
.with_csv_options(csv_options)
452+
.with_newlines_in_values(newlines_in_values),
453+
);
449454

450455
let config = FileScanConfigBuilder::from(conf)
451456
.with_file_compression_type(self.options.compression.into())
452-
.with_newlines_in_values(newlines_in_values)
453457
.with_source(source)
454458
.build();
455459

datafusion/datasource-csv/src/source.rs

Lines changed: 24 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,11 +77,11 @@ use tokio::io::AsyncWriteExt;
7777
/// let source = Arc::new(CsvSource::new(file_schema.clone())
7878
/// .with_csv_options(options)
7979
/// .with_terminator(Some(b'#'))
80+
/// .with_newlines_in_values(true) // The file contains newlines in values
8081
/// );
8182
/// // Create a DataSourceExec for reading the first 100MB of `file1.csv`
8283
/// let config = FileScanConfigBuilder::new(object_store_url, source)
8384
/// .with_file(PartitionedFile::new("file1.csv", 100*1024*1024))
84-
/// .with_newlines_in_values(true) // The file contains newlines in values;
8585
/// .build();
8686
/// let exec = (DataSourceExec::from_data_source(config));
8787
/// ```
@@ -93,6 +93,8 @@ pub struct CsvSource {
9393
projection: SplitProjection,
9494
metrics: ExecutionPlanMetricsSet,
9595
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
96+
/// Whether values may contain newline characters
97+
newlines_in_values: bool,
9698
}
9799

98100
impl CsvSource {
@@ -106,6 +108,7 @@ impl CsvSource {
106108
batch_size: None,
107109
metrics: ExecutionPlanMetricsSet::new(),
108110
schema_adapter_factory: None,
111+
newlines_in_values: false,
109112
}
110113
}
111114

@@ -176,6 +179,21 @@ impl CsvSource {
176179
conf.options.truncated_rows = Some(truncate_rows);
177180
conf
178181
}
182+
183+
/// Whether values may contain newline characters.
184+
///
185+
/// When enabled, scanning cannot be parallelized across a single file
186+
/// because newlines in values prevent determining record boundaries
187+
/// by byte offset alone.
188+
pub fn newlines_in_values(&self) -> bool {
189+
self.newlines_in_values
190+
}
191+
192+
/// Set whether values may contain newline characters
193+
pub fn with_newlines_in_values(mut self, newlines_in_values: bool) -> Self {
194+
self.newlines_in_values = newlines_in_values;
195+
self
196+
}
179197
}
180198

181199
impl CsvSource {
@@ -297,6 +315,11 @@ impl FileSource for CsvSource {
297315
fn file_type(&self) -> &str {
298316
"csv"
299317
}
318+
319+
fn has_newlines_in_values(&self) -> bool {
320+
self.newlines_in_values
321+
}
322+
300323
fn fmt_extra(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
301324
match t {
302325
DisplayFormatType::Default | DisplayFormatType::Verbose => {

datafusion/datasource/src/file.rs

Lines changed: 14 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -84,6 +84,19 @@ pub trait FileSource: Send + Sync {
8484
Ok(())
8585
}
8686

87+
/// Returns whether this file source has values that may contain newline characters.
88+
///
89+
/// This is primarily relevant for CSV files where quoted values can contain
90+
/// embedded newlines. When this returns `true`, files cannot be repartitioned
91+
/// by byte ranges because record boundaries cannot be determined by simple
92+
/// newline scanning.
93+
///
94+
/// The default implementation returns `false`. CSV sources should override
95+
/// this method to return the appropriate value based on their configuration.
96+
fn has_newlines_in_values(&self) -> bool {
97+
false
98+
}
99+
87100
/// If supported by the [`FileSource`], redistribute files across partitions
88101
/// according to their size. Allows custom file formats to implement their
89102
/// own repartitioning logic.
@@ -97,7 +110,7 @@ pub trait FileSource: Send + Sync {
97110
output_ordering: Option<LexOrdering>,
98111
config: &FileScanConfig,
99112
) -> Result<Option<FileScanConfig>> {
100-
if config.file_compression_type.is_compressed() || config.new_lines_in_values {
113+
if config.file_compression_type.is_compressed() || self.has_newlines_in_values() {
101114
return Ok(None);
102115
}
103116

datafusion/datasource/src/file_scan_config.rs

Lines changed: 0 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -159,8 +159,6 @@ pub struct FileScanConfig {
159159
pub output_ordering: Vec<LexOrdering>,
160160
/// File compression type
161161
pub file_compression_type: FileCompressionType,
162-
/// Are new lines in values supported for CSVOptions
163-
pub new_lines_in_values: bool,
164162
/// File source such as `ParquetSource`, `CsvSource`, `JsonSource`, etc.
165163
pub file_source: Arc<dyn FileSource>,
166164
/// Batch size while creating new batches
@@ -250,7 +248,6 @@ pub struct FileScanConfigBuilder {
250248
statistics: Option<Statistics>,
251249
output_ordering: Vec<LexOrdering>,
252250
file_compression_type: Option<FileCompressionType>,
253-
new_lines_in_values: Option<bool>,
254251
batch_size: Option<usize>,
255252
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
256253
partitioned_by_file_group: bool,
@@ -274,7 +271,6 @@ impl FileScanConfigBuilder {
274271
statistics: None,
275272
output_ordering: vec![],
276273
file_compression_type: None,
277-
new_lines_in_values: None,
278274
limit: None,
279275
constraints: None,
280276
batch_size: None,
@@ -413,16 +409,6 @@ impl FileScanConfigBuilder {
413409
self
414410
}
415411

416-
/// Set whether new lines in values are supported for CSVOptions
417-
///
418-
/// Parsing newlines in quoted values may be affected by execution behaviour such as
419-
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
420-
/// parsed successfully, which may reduce performance.
421-
pub fn with_newlines_in_values(mut self, new_lines_in_values: bool) -> Self {
422-
self.new_lines_in_values = Some(new_lines_in_values);
423-
self
424-
}
425-
426412
/// Set the batch_size property
427413
pub fn with_batch_size(mut self, batch_size: Option<usize>) -> Self {
428414
self.batch_size = batch_size;
@@ -472,7 +458,6 @@ impl FileScanConfigBuilder {
472458
statistics,
473459
output_ordering,
474460
file_compression_type,
475-
new_lines_in_values,
476461
batch_size,
477462
expr_adapter_factory: expr_adapter,
478463
partitioned_by_file_group,
@@ -484,7 +469,6 @@ impl FileScanConfigBuilder {
484469
});
485470
let file_compression_type =
486471
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
487-
let new_lines_in_values = new_lines_in_values.unwrap_or(false);
488472

489473
FileScanConfig {
490474
object_store_url,
@@ -494,7 +478,6 @@ impl FileScanConfigBuilder {
494478
file_groups,
495479
output_ordering,
496480
file_compression_type,
497-
new_lines_in_values,
498481
batch_size,
499482
expr_adapter_factory: expr_adapter,
500483
statistics,
@@ -512,7 +495,6 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
512495
statistics: Some(config.statistics),
513496
output_ordering: config.output_ordering,
514497
file_compression_type: Some(config.file_compression_type),
515-
new_lines_in_values: Some(config.new_lines_in_values),
516498
limit: config.limit,
517499
constraints: Some(config.constraints),
518500
batch_size: config.batch_size,
@@ -913,17 +895,6 @@ impl FileScanConfig {
913895
props.constraints().clone()
914896
}
915897

916-
/// Specifies whether newlines in (quoted) values are supported.
917-
///
918-
/// Parsing newlines in quoted values may be affected by execution behaviour such as
919-
/// parallel file scanning. Setting this to `true` ensures that newlines in values are
920-
/// parsed successfully, which may reduce performance.
921-
///
922-
/// The default behaviour depends on the `datafusion.catalog.newlines_in_values` setting.
923-
pub fn newlines_in_values(&self) -> bool {
924-
self.new_lines_in_values
925-
}
926-
927898
#[deprecated(
928899
since = "52.0.0",
929900
note = "This method is no longer used, use eq_properties instead. It will be removed in 58.0.0 or 6 months after 52.0.0 is released, whichever comes first."
@@ -1722,7 +1693,6 @@ mod tests {
17221693
.into(),
17231694
])
17241695
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
1725-
.with_newlines_in_values(true)
17261696
.build();
17271697

17281698
// Verify the built config has all the expected values
@@ -1749,7 +1719,6 @@ mod tests {
17491719
config.file_compression_type,
17501720
FileCompressionType::UNCOMPRESSED
17511721
);
1752-
assert!(config.new_lines_in_values);
17531722
assert_eq!(config.output_ordering.len(), 1);
17541723
}
17551724

@@ -1844,7 +1813,6 @@ mod tests {
18441813
config.file_compression_type,
18451814
FileCompressionType::UNCOMPRESSED
18461815
);
1847-
assert!(!config.new_lines_in_values);
18481816
assert!(config.output_ordering.is_empty());
18491817
assert!(config.constraints.is_empty());
18501818

@@ -1892,7 +1860,6 @@ mod tests {
18921860
.with_limit(Some(10))
18931861
.with_file(file.clone())
18941862
.with_constraints(Constraints::default())
1895-
.with_newlines_in_values(true)
18961863
.build();
18971864

18981865
// Create a new builder from the config
@@ -1922,7 +1889,6 @@ mod tests {
19221889
"test_file.parquet"
19231890
);
19241891
assert_eq!(new_config.constraints, Constraints::default());
1925-
assert!(new_config.new_lines_in_values);
19261892
}
19271893

19281894
#[test]

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -643,13 +643,21 @@ impl protobuf::PhysicalPlanNode {
643643
.with_comment(comment),
644644
);
645645

646+
let source = Arc::new(
647+
source
648+
.as_any()
649+
.downcast_ref::<CsvSource>()
650+
.expect("source must be CsvSource")
651+
.clone()
652+
.with_newlines_in_values(scan.newlines_in_values),
653+
);
654+
646655
let conf = FileScanConfigBuilder::from(parse_protobuf_file_scan_config(
647656
scan.base_conf.as_ref().unwrap(),
648657
ctx,
649658
extension_codec,
650659
source,
651660
)?)
652-
.with_newlines_in_values(scan.newlines_in_values)
653661
.with_file_compression_type(FileCompressionType::UNCOMPRESSED)
654662
.build();
655663
Ok(DataSourceExec::from_data_source(conf))
@@ -2628,7 +2636,7 @@ impl protobuf::PhysicalPlanNode {
26282636
} else {
26292637
None
26302638
},
2631-
newlines_in_values: maybe_csv.newlines_in_values(),
2639+
newlines_in_values: csv_config.newlines_in_values(),
26322640
truncate_rows: csv_config.truncate_rows(),
26332641
},
26342642
)),

datafusion/proto/tests/cases/roundtrip_physical_plan.rs

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -933,7 +933,6 @@ async fn roundtrip_parquet_exec_with_table_partition_cols() -> Result<()> {
933933
FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
934934
.with_projection_indices(Some(vec![0, 1]))?
935935
.with_file_group(FileGroup::new(vec![file_group]))
936-
.with_newlines_in_values(false)
937936
.build();
938937

939938
roundtrip_test(DataSourceExec::from_data_source(scan_config))

docs/source/library-user-guide/upgrading.md

Lines changed: 30 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,36 @@ See <https://github.com/apache/datafusion/issues/19056> for more details.
5757

5858
Note that the internal API has changed to use a trait `ListFilesCache` instead of a type alias.
5959

60+
### `newlines_in_values` moved from `FileScanConfig` to `CsvSource`
61+
62+
The CSV-specific `newlines_in_values` configuration option has been moved from `FileScanConfig` to `CsvSource`, as it only applies to CSV file parsing.
63+
64+
**Who is affected:**
65+
66+
- Users who set `newlines_in_values` via `FileScanConfigBuilder::with_newlines_in_values()`
67+
68+
**Migration guide:**
69+
70+
Set `newlines_in_values` on `CsvSource` instead of `FileScanConfigBuilder`:
71+
72+
**Before:**
73+
74+
```rust,ignore
75+
let source = Arc::new(CsvSource::new(file_schema.clone()));
76+
let config = FileScanConfigBuilder::new(object_store_url, source)
77+
.with_newlines_in_values(true)
78+
.build();
79+
```
80+
81+
**After:**
82+
83+
```rust,ignore
84+
let source = Arc::new(CsvSource::new(file_schema.clone())
85+
.with_newlines_in_values(true));
86+
let config = FileScanConfigBuilder::new(object_store_url, source)
87+
.build();
88+
```
89+
6090
### Removal of `pyarrow` feature
6191

6292
The `pyarrow` feature flag has been removed. This feature has been migrated to

0 commit comments

Comments
 (0)