Error building commit #2981

Open
Dan-J-D opened this issue Nov 8, 2024 · 0 comments
Labels
bug Something isn't working

Comments


Dan-J-D commented Nov 8, 2024

Environment

Delta-rs version: 0.21.0

Binding: None (Rust)

Environment: Linux Ubuntu 24

  • Cloud provider: None (Local)
  • OS: Linux
  • Other:

I am trying to build a Fluvio ingest stream that writes to a Delta table using PartitionWriter.

What happened: Building the commit fails with the following error:
Error building commit: Arrow { source: InvalidArgumentError("Found unmasked nulls for non-nullable StructArray field \"timestamp\"") }

What you expected to happen: The commit is built successfully and applied to the table.
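
For context (not part of the original report): arrow-rs raises this InvalidArgumentError when a StructArray declares a field as non-nullable but the corresponding child array contains nulls that are not masked by a struct-level null buffer. A minimal standalone sketch, using the Arrow crate re-exported by deltalake:

use std::sync::Arc;

use deltalake::arrow::array::{ArrayRef, Int64Array, StructArray};
use deltalake::arrow::datatypes::{DataType, Field, Fields};

fn main() {
    // Child array that contains a null...
    let child: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), None]));
    // ...while the struct declares the field as non-nullable.
    let fields = Fields::from(vec![Field::new("timestamp", DataType::Int64, false)]);
    // No struct-level null buffer masks the null, so validation fails with
    // InvalidArgumentError("Found unmasked nulls for non-nullable StructArray field \"timestamp\"").
    let result = StructArray::try_new(fields, vec![child], None);
    assert!(result.is_err());
}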

How to reproduce it:

// Delta schema: a non-nullable "timestamp" column and a nullable "id" column.
fn get_consumed_table_schema() -> Vec<StructField> {
    vec![
        StructField::new(
            String::from("timestamp"),
            DataType::Primitive(PrimitiveType::Timestamp),
            false,
        ),
        StructField::new(
            String::from("id"),
            DataType::Primitive(PrimitiveType::Long),
            true,
        ),
    ]
}

// Arrow schema for the record batches handed to the writer.
fn get_consumed_table_arrow_schema() -> Arc<ArrowSchema> {
    Arc::new(ArrowSchema::new(vec![
        Field::new(
            String::from("timestamp"),
            ArrowDataType::Timestamp(deltalake::arrow::datatypes::TimeUnit::Microsecond, None),
            false,
        ),
        Field::new(String::from("id"), ArrowDataType::Int32, true),
    ]))
}

async fn run_consumer(fluvio: Fluvio) {
    let db = match DeltaOps::try_from_uri("./delta-files/consumed").await {
        Ok(delta_ops) => {
            println!("delta lake created successfully");
            delta_ops
        }
        Err(e) => panic!("Error creating DeltaOps: {}", e),
    };

    let mut table = db
        .create()
        .with_columns(get_consumed_table_schema())
        .with_partition_columns(vec!["timestamp"])
        .with_table_name("consumed")
        .with_comment("consumed table")
        .with_configuration_property(deltalake::TableProperty::MinReaderVersion, Some("3"))
        .with_configuration_property(deltalake::TableProperty::MinWriterVersion, Some("7"))
        .await
        .expect("Error creating table");

    let writer_props = WriterProperties::builder()
        .set_compression(deltalake::parquet::basic::Compression::GZIP(
            GzipLevel::try_new(3).expect("Invalid level"),
        ))
        .build();

    // Partition writer config; the partition values are left as an empty IndexMap here.
    let config = PartitionWriterConfig::try_new(
        get_consumed_table_arrow_schema(),
        IndexMap::new(),
        Some(writer_props),
        Some(10000),
        Some(100),
    )
    .expect("Error creating partition writer config");

    let mut writer = PartitionWriter::try_with_config(
        table.object_store(),
        config,
        2,
        Some(vec![String::from("timestamp"), String::from("id")]),
    )
    .expect("Error creating partition writer");

    let consumer_offset = match fluvio
        .consumer_offsets()
        .await
        .unwrap_or_default()
        .iter()
        .filter(|offset| offset.consumer_id == CONSUMER_ID)
        .next()
    {
        Some(offset) => offset.offset + 1,
        None => 0,
    };

    let consumer_config = match ConsumerConfigExt::builder()
        .topic(TOPIC_NAME)
        .isolation(Isolation::ReadCommitted)
        .offset_consumer(CONSUMER_ID)
        .offset_strategy(OffsetManagementStrategy::Manual)
        .offset_start(Offset::absolute(consumer_offset).expect("Invalid offset"))
        .build()
    {
        Ok(config) => config,
        Err(e) => panic!("Error creating consumer config: {}", e),
    };

    let mut consumer = match fluvio.consumer_with_config(consumer_config).await {
        Ok(consumer) => consumer,
        Err(e) => panic!("Error creating consumer: {}", e),
    };

    loop {
        select! {
            _ = tokio::signal::ctrl_c() => {
                break;
            }
            _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {
                consumer.offset_flush().await.expect("Error flushing offset");
            }
            message = consumer.next() => {
                match message {
                    Some(Ok(record)) => {
                        let value = String::from_utf8_lossy(record.value());
                        println!("Received message: {}", value);
                        let mut split = value.split(",");

                        let timestamp_vec = vec![split.next().expect("invalid timestamp").parse::<i64>().expect("Invalid timestamp")];
                        let id_vec = vec![split.next().expect("invalid id").parse::<i32>().expect("Invalid id")];

                        assert_eq!(id_vec.len(), 1);
                        assert_eq!(timestamp_vec.len(), 1);

                        writer
                        .write(
                            &RecordBatch::try_new(get_consumed_table_arrow_schema(),
                            vec![
                                Arc::new(TimestampMicrosecondArray::from(timestamp_vec)),
                                Arc::new(Int32Array::from(id_vec)),
                            ]).expect("Error creating record batch")
                        )
                        .await
                        .expect("Error writing record batch");

                        let add_metadata = writer.get_files_written_and_clear().await;
                        if add_metadata.len() > 0 {
                            println!("Added metadata: {:?}", add_metadata);

                            table.load().await.expect("Error loading table");
                            // Building the commit from the Add actions is where the reported error occurs.
                            let _commit = CommitBuilder::default()
                                .with_actions(
                                    add_metadata
                                        .iter()
                                        .map(|x| Action::Add(x.clone()))
                                        .collect(),
                                )
                                // .with_max_retries(3)
                                .build(
                                    // Some(&table as &DeltaTableState).map(|s| s as &dyn TableReference),
                                    table.snapshot().ok().map(|s| s as &dyn TableReference),
                                    table.log_store().clone(),
                                    DeltaOperation::Write {
                                        mode: SaveMode::Append,
                                        partition_by: Some(vec![String::from("timestamp")]),
                                        predicate: None,
                                    },
                                ).await
                                .expect("Error building commit");

                            // match commit {
                            //     Ok(_) => {
                            //         println!("Commit successful");
                            //     }
                            //     Err(e) => {
                            //         println!("Error committing: {:?}", e);
                            //     }
                            // }

                            // table.update().await.expect("Error updating table");
                        }

                        println!("Received message: {}", value);

                        consumer.offset_commit().expect("Error committing offset");
                    }
                    Some(Err(e)) => panic!("Error fetching message: {}", e),
                    None => break,
                }
            }
        };
    }
}

More details:
The metadata printed out: [Add { path: "part-00001-cfd1b70a-ee26-4de8-9591-ee80ba136360-c000.gz.parquet", partition_values: {}, size: 4787, modification_time: 1731096447338, data_change: true, stats: Some("{\"numRecords\":678,\"minValues\":{\"id\":196,\"timestamp\":\"1970-01-21T00:51:21.460097Z\"},\"maxValues\":{\"timestamp\":\"1970-01-21T00:51:21.461534Z\",\"id\":873},\"nullCount\":{\"timestamp\":0,\"id\":0}}"), tags: None, deletion_vector: None, base_row_id: None, default_row_commit_version: None, clustering_provider: None, stats_parsed: None }]
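
One thing that stands out in this output: partition_values is {} even though the table was created with timestamp as a partition column, and the PartitionWriterConfig above is likewise constructed with an empty IndexMap. I don't know whether that is related, but for illustration only, a sketch of passing an explicit partition value, assuming the second argument of PartitionWriterConfig::try_new is an IndexMap<String, Scalar> and that deltalake::kernel exposes a Scalar type with a Timestamp variant holding microseconds (adjust for the actual API):

use deltalake::kernel::Scalar;
use indexmap::IndexMap;

// Hypothetical helper: partition values for one written file, keyed by the
// partition column name, instead of the empty IndexMap::new() used above.
fn partition_values_for(timestamp_micros: i64) -> IndexMap<String, Scalar> {
    let mut values = IndexMap::new();
    values.insert(String::from("timestamp"), Scalar::Timestamp(timestamp_micros));
    values
}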

I even added assertions on the data before anything was written into the Arrow writer, and none of them ever panicked.
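
For reference, the kind of check I mean looks roughly like this (a hypothetical helper, not the exact code from my project; using the Arrow types re-exported by deltalake):

use deltalake::arrow::record_batch::RecordBatch;

// Hypothetical check: make sure no column of the batch contains nulls before
// handing it to the writer (the non-nullable "timestamp" column in particular).
fn assert_no_nulls(batch: &RecordBatch) {
    let schema = batch.schema();
    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        assert_eq!(
            column.null_count(),
            0,
            "column {} unexpectedly contains nulls",
            field.name()
        );
    }
}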

Dan-J-D added the bug label Nov 8, 2024