Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CQL Vector type #1022

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scylla-cql/src/frame/frame_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub enum ParseError {
IoError(#[from] std::io::Error),
#[error("type not yet implemented, id: {0}")]
TypeNotImplemented(u16),
#[error("invalid custom type: {0}")]
InvalidCustomType(String),
#[error(transparent)]
SerializeValuesError(#[from] SerializeValuesError),
#[error(transparent)]
Expand Down
92 changes: 90 additions & 2 deletions scylla-cql/src/frame/response/result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ use crate::frame::{frame_errors::ParseError, types};
use crate::types::deserialize::result::{RowIterator, TypedRowIterator};
use crate::types::deserialize::value::{
mk_deser_err, BuiltinDeserializationErrorKind, DeserializeValue, MapIterator, UdtIterator,
VectorIterator,
};
use crate::types::deserialize::{DeserializationError, FrameSlice};
use bytes::{Buf, Bytes};
use std::borrow::Cow;
use std::{convert::TryInto, net::IpAddr, result::Result as StdResult, str};
use std::{convert::TryInto, mem, net::IpAddr, result::Result as StdResult, str};
use std::mem::ManuallyDrop;
use std::ops::Deref;
use uuid::Uuid;

#[derive(Debug)]
Expand Down Expand Up @@ -69,6 +72,7 @@ pub enum ColumnType {
Tuple(Vec<ColumnType>),
Uuid,
Varint,
Vector(Box<ColumnType>, u32),
}

#[derive(Clone, Debug, PartialEq)]
Expand All @@ -94,6 +98,7 @@ pub enum CqlValue {
List(Vec<CqlValue>),
Map(Vec<(CqlValue, CqlValue)>),
Set(Vec<CqlValue>),
Vector(DropOptimizedVec<CqlValue>),
UserDefinedType {
keyspace: String,
type_name: String,
Expand All @@ -112,6 +117,52 @@ pub enum CqlValue {
Varint(CqlVarint),
}


#[derive(Clone, Debug, PartialEq)]
pub struct DropOptimizedVec<T> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pkolaczk I'm curious, do you have a microbenchmark that suggests this does anything different to a regular Vec<T> on release mode?

data: Vec<T>,
drop_elements: bool
}

impl<T> DropOptimizedVec<T> {
pub fn new(data: Vec<T>, drop_elements: bool) -> DropOptimizedVec<T> {
DropOptimizedVec {
data,
drop_elements,
}
}

pub fn dropping(data: Vec<T>) -> DropOptimizedVec<T> {
Self::new(data, true)
}

pub fn non_dropping(data: Vec<T>) -> DropOptimizedVec<T> {
Self::new(data, false)
}

pub fn into_vec(mut self) -> Vec<T> {
let mut vec = vec![];
mem::swap(&mut self.data, &mut vec);
vec
}
}

impl<T> Deref for DropOptimizedVec<T> {
type Target = Vec<T>;

fn deref(&self) -> &Self::Target {
&self.data
}
}

impl<T> Drop for DropOptimizedVec<T> {
fn drop(&mut self) {
if !self.drop_elements {
unsafe { self.data.set_len(0); }
}
}
}

impl<'a> TableSpec<'a> {
pub const fn borrowed(ks: &'a str, table: &'a str) -> Self {
Self {
Expand Down Expand Up @@ -352,6 +403,7 @@ impl CqlValue {
pub fn as_list(&self) -> Option<&Vec<CqlValue>> {
match self {
Self::List(s) => Some(s),
Self::Vector(s) => Some(&s),
_ => None,
}
}
Expand Down Expand Up @@ -381,6 +433,7 @@ impl CqlValue {
match self {
Self::List(s) => Some(s),
Self::Set(s) => Some(s),
Self::Vector(s) => Some(s.into_vec()),
_ => None,
}
}
Expand Down Expand Up @@ -489,8 +542,19 @@ fn deser_type(buf: &mut &[u8]) -> StdResult<ColumnType, ParseError> {
Ok(match id {
0x0000 => {
let type_str: String = types::read_string(buf)?.to_string();
match type_str.as_str() {
let type_parts: Vec<_> = type_str.split(&[',', '(', ')']).collect();
match type_parts[0] {
"org.apache.cassandra.db.marshal.DurationType" => Duration,
"org.apache.cassandra.db.marshal.VectorType" => {
if type_parts.len() < 3 {
return Err(ParseError::InvalidCustomType(type_str));
}
let elem_type = parse_type_str(type_parts[1].trim())?;
let Ok(dimensions) = type_parts[2].trim().parse() else {
return Err(ParseError::InvalidCustomType(type_str));
};
Vector(Box::new(elem_type), dimensions)
}
_ => Custom(type_str),
}
}
Expand Down Expand Up @@ -552,6 +616,18 @@ fn deser_type(buf: &mut &[u8]) -> StdResult<ColumnType, ParseError> {
})
}

fn parse_type_str(name: &str) -> StdResult<ColumnType, ParseError> {
match name {
"org.apache.cassandra.db.marshal.BigIntType" => Ok(ColumnType::BigInt),
"org.apache.cassandra.db.marshal.DoubleType" => Ok(ColumnType::Double),
"org.apache.cassandra.db.marshal.FloatType" => Ok(ColumnType::Float),
"org.apache.cassandra.db.marshal.IntType" => Ok(ColumnType::Int),
"org.apache.cassandra.db.marshal.SmallIntType" => Ok(ColumnType::SmallInt),
"org.apache.cassandra.db.marshal.TinyIntType" => Ok(ColumnType::TinyInt),
_ => Err(ParseError::InvalidCustomType(name.to_string())),
}
}

fn deser_col_specs(
buf: &mut &[u8],
global_table_spec: &Option<TableSpec<'static>>,
Expand Down Expand Up @@ -802,6 +878,18 @@ pub fn deser_cql_value(
.collect::<StdResult<_, _>>()?;
CqlValue::Tuple(t)
}
// Specialization for faster deserialization of vectors of floats, which are currently
// the only type of vector
Vector(elem_type, _) if matches!(elem_type.as_ref(), Float) => {
let v = VectorIterator::<CqlValue>::deserialize_vector_of_float_to_vec_of_cql_value(
typ, v,
)?;
CqlValue::Vector(DropOptimizedVec::non_dropping(v))
}
Vector(_, _) => {
let v = Vec::<CqlValue>::deserialize(typ, v)?;
CqlValue::Vector(DropOptimizedVec::dropping(v))
}
})
}

Expand Down
2 changes: 1 addition & 1 deletion scylla-cql/src/frame/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<'a> RawValue<'a> {
}
}

fn read_raw_bytes<'a>(
pub fn read_raw_bytes<'a>(
count: usize,
buf: &mut &'a [u8],
) -> Result<&'a [u8], LowLevelDeserializationError> {
Expand Down
1 change: 1 addition & 0 deletions scylla-cql/src/frame/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,7 @@ impl Value for CqlValue {

CqlValue::Ascii(s) | CqlValue::Text(s) => s.serialize(buf),
CqlValue::List(v) | CqlValue::Set(v) => v.serialize(buf),
CqlValue::Vector(v) => v.serialize(buf),

CqlValue::Blob(b) => b.serialize(buf),
CqlValue::Boolean(b) => b.serialize(buf),
Expand Down
2 changes: 1 addition & 1 deletion scylla-cql/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub mod _macro_internal {
pub use crate::frame::response::cql_to_rust::{
FromCqlVal, FromCqlValError, FromRow, FromRowError,
};
pub use crate::frame::response::result::{ColumnSpec, ColumnType, CqlValue, Row};
pub use crate::frame::response::result::{ColumnSpec, ColumnType, CqlValue, DropOptimizedVec, Row};
pub use crate::frame::value::{
LegacySerializedValues, SerializedResult, Value, ValueList, ValueTooBig,
};
Expand Down
29 changes: 29 additions & 0 deletions scylla-cql/src/types/deserialize/frame_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,35 @@ impl<'frame> FrameSlice<'frame> {
original_frame: self.original_frame,
}))
}

/// Reads and consumes a fixed number of bytes item from the beginning of the frame,
/// returning a subslice that encompasses that item.
///
/// If this slice is empty, returns `Ok(None)`.
/// Otherwise, if the slice does not contain enough data, it returns `Err`.
/// If the operation fails then the slice remains unchanged.
#[inline]
pub(super) fn read_subslice(
&mut self,
count: usize,
) -> Result<Option<FrameSlice<'frame>>, LowLevelDeserializationError> {
if self.is_empty() {
return Ok(None);
}

// We copy the slice reference, not to mutate the FrameSlice in case of an error.
let mut slice = self.frame_subslice;

let cql_bytes = types::read_raw_bytes(count, &mut slice)?;

// `read_raw_bytes` hasn't failed, so now we must update the FrameSlice.
self.frame_subslice = slice;

Ok(Some(Self {
frame_subslice: cql_bytes,
original_frame: self.original_frame,
}))
}
}

#[cfg(test)]
Expand Down
Loading
Loading