When the number of statements in a batch is smaller than the number of values, session.batch() silently drops the extra values. #1114

Open · terencezl opened this issue Oct 30, 2024 · 2 comments

@terencezl

I was trying to bulk ingest with the same statement. So I went like this:

    let N = 100;
    let mut batch = Batch::default();
    for _ in 0..N {
        batch.append_statement("INSERT INTO metadata.metadata (id, data) VALUES(?, ?)");
    }
    let mut prepared_batch = session.prepare_batch(&batch).await?;

    for chunk in &db_iter.into_iter().chunks(N) {
        ...
        let mut values = vec![];
        let mut c = 0;
        for item in chunk {
            values.push((id_str.to_owned(), data));
            c += 1;
        }
        if c != N {
            // init new prepared_batch to size
            batch = Batch::default();
            for _ in 0..c {
                batch.append_statement(
                    "INSERT INTO metadata.metadata (id, data) VALUES(?, ?)",
                );
            }
            prepared_batch = session.prepare_batch(&batch).await?;
        }
        session.batch(&prepared_batch, values).await?;
    }

My understanding is that you need to duplicate the prepared statement 100 times in the batch, even though they are identical, and that the number of values must match the number of statements in the batch. So I had to handle the last chunk not being rounded to the step size.
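For reference, a condensed sketch of this pattern (names are illustrative, and I am assuming data maps to a blob, so Vec<u8> here; it also re-prepares the batch for every chunk for brevity instead of caching the full-size one), sizing the batch to each chunk so the statement and value counts always match:

    use itertools::Itertools;
    use scylla::batch::Batch;
    use scylla::Session;

    // Illustrative helper: ingest rows in chunks, building a batch whose
    // statement count equals the chunk's value count (including the last,
    // possibly shorter, chunk).
    async fn ingest(
        session: &Session,
        rows: impl IntoIterator<Item = (String, Vec<u8>)>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        const STEP: usize = 100;
        for chunk in &rows.into_iter().chunks(STEP) {
            let values: Vec<(String, Vec<u8>)> = chunk.collect();

            // One statement per value tuple, so the counts always match.
            let mut batch = Batch::default();
            for _ in 0..values.len() {
                batch.append_statement("INSERT INTO metadata.metadata (id, data) VALUES(?, ?)");
            }
            let prepared = session.prepare_batch(&batch).await?;
            session.batch(&prepared, values).await?;
        }
        Ok(())
    }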

When I tried to play around and have more statements than values in the batch:

    for _ in 0..200 {
        batch.append_statement("INSERT INTO metadata.metadata (id, data) VALUES(?, ?)");
    }

I am seeing

Error: Invalid message: Frame error: Could not serialize frame: Length of provided values must be equal to number of batch statements (got 100 values, 200 statements)

That's good. However, when I commented out the initial loop, basically leaving an empty batch of statements:

//    for _ in 0..N {
//        batch.append_statement("INSERT INTO metadata.metadata (id, data) VALUES(?, ?)");
//    }

This is the case where the number of statements is smaller than the number of values, and it went through silently. I checked the count through cqlsh, and the ingested count was 0. If I only had 1 statement in the batch and 100 values, it would ingest just 1 entry. So maybe the client was doing an iterator zip and silently trimming the extra values. I would like the client to error when the number of statements and the number of values are not equal.
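To illustrate the behaviour I suspect, here is a plain-Rust sketch (not driver code): zipping two iterators of different lengths silently truncates to the shorter one, which would explain why 1 statement plus many value tuples ingests a single row, and 0 statements ingests nothing.

    // Standalone illustration, not driver code: zip stops at the shorter iterator.
    fn main() {
        let statements = vec!["INSERT INTO metadata.metadata (id, data) VALUES(?, ?)"];
        let values = vec![("a", 1), ("b", 2), ("c", 3)];

        // Only one pair is produced; the two extra value tuples vanish without an error.
        let pairs: Vec<_> = statements.iter().zip(values.iter()).collect();
        assert_eq!(pairs.len(), 1);
    }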

@Lorak-mmk (Collaborator)

I think I see the problem.
The code that serializes the batch does check for this condition:

    if value_lists.skip_next().is_some() {
        return Err(counts_mismatch_err(
            n_serialized_statements + 1 /*skipped above*/ + value_lists.count(),
            n_serialized_statements,
        ));
    }

The problem is that in the connection code here:

    let contexts = batch.statements.iter().map(|bs| match bs {
        BatchStatement::Query(_) => RowSerializationContext::empty(),
        BatchStatement::PreparedStatement(ps) => {
            RowSerializationContext::from_prepared(ps.get_prepared_metadata())
        }
    });
    let values = RawBatchValuesAdapter::new(values, contexts);
    let batch_frame = batch::Batch {
        statements: Cow::Borrowed(&batch.statements),
        values,
        batch_type: batch.get_type(),
        consistency,
        serial_consistency,
        timestamp: batch.get_timestamp(),
    };

we create a RawBatchValuesAdapter from values (with length N) and contexts (with length equal to the number of statements, so < N).
RawBatchValuesIteratorAdapter:

    fn serialize_next(&mut self, writer: &mut RowWriter) -> Option<Result<(), SerializationError>> {
        let ctx = self.contexts.next()?;
        self.batch_values_iterator.serialize_next(&ctx, writer)
    }

    fn is_empty_next(&mut self) -> Option<bool> {
        self.contexts.next()?;
        let ret = self.batch_values_iterator.is_empty_next()?;
        Some(ret)
    }

    #[inline]
    fn skip_next(&mut self) -> Option<()> {
        self.contexts.next()?;
        self.batch_values_iterator.skip_next()?;
        Some(())
    }

In each of these methods it first calls self.contexts.next()? - effectively limiting the length of the iterator to the number of statements.

Possible fix: RawBatchValuesIteratorAdapter should return Some(Err(...)) when the lengths of contexts and values don't match, and the code in connection should propagate this error (instead of only checking that the element is Some).
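A rough, self-contained sketch of that direction (simplified, these are not the driver's actual types, just the shape of the change): when the contexts run out but values remain, the adapter returns an error that the caller can propagate.

    // Simplified model of the adapter, not the real driver types: it pairs a
    // "contexts" iterator (one per statement) with a "values" iterator
    // (one per bound value tuple).
    #[derive(Debug)]
    struct MismatchError {
        values_left: usize,
    }

    struct Adapter<C: Iterator, V: ExactSizeIterator> {
        contexts: C,
        values: V,
    }

    impl<C: Iterator, V: ExactSizeIterator> Adapter<C, V> {
        fn serialize_next(&mut self) -> Option<Result<V::Item, MismatchError>> {
            match self.contexts.next() {
                Some(_ctx) => self.values.next().map(Ok),
                // Contexts exhausted but values remain: surface the mismatch
                // instead of silently returning None.
                None if self.values.len() > 0 => Some(Err(MismatchError {
                    values_left: self.values.len(),
                })),
                None => None,
            }
        }
    }

    fn main() {
        let mut adapter = Adapter {
            contexts: std::iter::empty::<()>(), // zero statements => zero contexts
            values: vec![1, 2, 3].into_iter(),  // three value tuples
        };
        // The caller now gets an error it can propagate, instead of a None
        // that currently looks like a clean end of the batch.
        match adapter.serialize_next() {
            Some(Err(e)) => println!("counts mismatch: {} extra value tuples", e.values_left),
            other => panic!("expected a mismatch error, got {:?}", other),
        }
    }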

@Lorak-mmk (Collaborator)

It's a bit sad that we don't have a test for that :(
