From ddbf4c482e28b3c2cea9e5eb5fd4eaeef79e0dd0 Mon Sep 17 00:00:00 2001 From: Piotr Dulikowski Date: Tue, 14 Mar 2023 15:08:27 +0100 Subject: [PATCH] iterator: adjust to the new deserialization framework MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit finishes the work related to adjusting the iterators module to the new deserialization framework. The equivalent of the old RowIterator is now RawIterator (notice the vowel change). Despite the name, it cannot actually be iterated on, as it does not have any information about the column types. After calling the `into_typed()` method it gets converted into TypedRowIterator that can actually be iterated on. Unfortunately, due to the limitations of the Stream trait, currently the TypedRowIterator cannot deserialize types that need to borrow from the serialized frame contents. Co-authored-by: Wojciech Przytuła --- scylla/src/transport/connection.rs | 15 +- scylla/src/transport/iterator.rs | 349 +++++++++++++++++++++++------ scylla/src/transport/session.rs | 15 +- 3 files changed, 302 insertions(+), 77 deletions(-) diff --git a/scylla/src/transport/connection.rs b/scylla/src/transport/connection.rs index a337fdfbd..6b77b1b46 100644 --- a/scylla/src/transport/connection.rs +++ b/scylla/src/transport/connection.rs @@ -47,7 +47,7 @@ use std::{ }; use super::errors::{ProtocolError, UseKeyspaceProtocolError}; -use super::iterator::LegacyRowIterator; +use super::iterator::{LegacyRowIterator, RawIterator}; use super::locator::tablets::{RawTablet, TabletParsingError}; use super::query_result::QueryResult; use super::session::AddressTranslator; @@ -1188,13 +1188,9 @@ impl Connection { .determine_consistency(self.config.default_consistency); let serial_consistency = query.config.serial_consistency.flatten(); - LegacyRowIterator::new_for_connection_query_iter( - query, - self, - consistency, - serial_consistency, - ) - .await + RawIterator::new_for_connection_query_iter(query, self, consistency, serial_consistency) + .await + .map(RawIterator::into_legacy) } /// Executes a prepared statements and fetches its results over multiple pages, using @@ -1209,7 +1205,7 @@ impl Connection { .determine_consistency(self.config.default_consistency); let serial_consistency = prepared_statement.config.serial_consistency.flatten(); - LegacyRowIterator::new_for_connection_execute_iter( + RawIterator::new_for_connection_execute_iter( prepared_statement, values, self, @@ -1217,6 +1213,7 @@ impl Connection { serial_consistency, ) .await + .map(RawIterator::into_legacy) } #[allow(dead_code)] diff --git a/scylla/src/transport/iterator.rs b/scylla/src/transport/iterator.rs index 28d3c8dd3..bbf9347e3 100644 --- a/scylla/src/transport/iterator.rs +++ b/scylla/src/transport/iterator.rs @@ -1,7 +1,6 @@ //! Iterators over rows returned by paged queries use std::future::Future; -use std::mem; use std::net::SocketAddr; use std::ops::ControlFlow; use std::pin::Pin; @@ -9,21 +8,25 @@ use std::sync::Arc; use std::task::{Context, Poll}; use futures::Stream; -use scylla_cql::frame::request::query::PagingStateResponse; +use scylla_cql::frame::frame_errors::RowsParseError; use scylla_cql::frame::response::result::RawRows; use scylla_cql::frame::response::NonErrorResponse; +use scylla_cql::types::deserialize::result::RawRowsLendingIterator; +use scylla_cql::types::deserialize::row::{ColumnIterator, DeserializeRow}; +use scylla_cql::types::deserialize::TypeCheckError; use scylla_cql::types::serialize::row::SerializedValues; use std::result::Result; use thiserror::Error; use tokio::sync::mpsc; use super::execution_profile::ExecutionProfileInner; +use super::query_result::ColumnSpecs; use super::session::RequestSpan; use crate::cql_to_rust::{FromRow, FromRowError}; use crate::frame::response::{ result, - result::{ColumnSpec, Row, Rows}, + result::{ColumnSpec, Row}, }; use crate::history::{self, HistoryListener}; use crate::statement::{prepared_statement::PreparedStatement, query::Query}; @@ -535,46 +538,99 @@ where } } -/// Iterator over rows returned by paged queries\ -/// Allows to easily access rows without worrying about handling multiple pages -pub struct LegacyRowIterator { - current_row_idx: usize, - current_page: Rows, +/// An intermediate object that allows to construct an iterator over a query +/// that is asynchronously paged in the background. +/// +/// Before the results can be processed in a convenient way, the RawIterator +/// needs to be cast into a typed iterator. This is done by use of `into_typed()` method. +/// As the method is generic over the target type, the turbofish syntax +/// can come in handy there, e.g. `raw_iter.into_typed::<(i32, &str, Uuid)>()`. +/// +/// A pre-0.14.0 interface is also available, although deprecated: +/// `into_legacy()` method converts RawIterator to LegacyRowIterator, +/// enabling Stream'ed operation on rows being eagerly deserialized +/// to a middle-man Row type. This is inefficient, especially if +/// the Row type is not the intended target type. +pub struct RawIterator { + current_page: RawRowsLendingIterator, page_receiver: mpsc::Receiver>, tracing_ids: Vec, } -/// Fetching pages is asynchronous so `RowIterator` does not implement the `Iterator` trait.\ -/// Instead it uses the asynchronous `Stream` trait -impl Stream for LegacyRowIterator { - type Item = Result; +/// RawIterator is not an iterator or a stream! However, it implements +/// a `next()` method that returns a ColumnIterator<'r>, which can be used +/// to manually deserialize a row. +/// The ColumnIterator borrows from the RawIterator, and the futures::Stream trait +/// does not allow for such a pattern. Lending streams are not a thing yet. +impl RawIterator { + /// Returns the next item (ColumnIterator) from the stream. + + /// This can be used with `type_check() for manual deserialization - see example below. + /// + /// This is not a part of the Stream interface because the returned iterator + /// borrows from self. + /// + /// This is cancel-safe. + /// + /// # Example + /// + /// // FIXME: change `text` to `rust` when Session API is migrated to the new deserialization framework. + /// ```text + /// # use scylla::Session; + /// # use std::error::Error; + /// # async fn check_only_compiles(session: &Session) -> Result<(), Box> { + /// use futures::stream::StreamExt; + /// use scylla::deserialize::DeserializeRow; + /// + /// let mut raw_iter = session + /// .query_iter("SELECT a, b FROM ks.t", &[]) + /// .await?; + /// + /// // Remember to type check! Failure to call type_check() can result + /// // in panics upon deserialization. + /// raw_iter.type_check::<(i32, i32)>()?; + /// + /// // Now that we type-checked, we can manually deserialize from RawIterator. + /// while let Some(column_iterator) = raw_iter.next().await.transpose()? { + /// let (a, b) = <(i32, i32) as DeserializeRow>::deserialize(column_iterator)?; + /// println!("a, b: {}, {}", a, b); + /// } + /// # Ok(()) + /// # } + /// ``` + pub async fn next(&mut self) -> Option> { + let res = std::future::poll_fn(|cx| Pin::new(&mut *self).poll_fill_page(cx)).await; + match res { + Some(Ok(())) => {} + Some(Err(err)) => return Some(Err(err)), + None => return None, + } - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.poll_next_internal(cx) + // We are guaranteed here to have a non-empty page, so unwrap + Some( + self.current_page + .next() + .unwrap() + .map_err(|e| RowsParseError::from(e).into()), + ) } -} -impl LegacyRowIterator { - fn poll_next_internal( - mut self: Pin<&mut Self>, + /// Tries to acquire a non-empty page, if current page is exhausted. + fn poll_fill_page<'r>( + mut self: Pin<&'r mut Self>, cx: &mut Context<'_>, - ) -> Poll>> { - if self.as_ref().is_current_page_exhausted() { - ready_some_ok!(self.as_mut().poll_next_page(cx)); + ) -> Poll>> { + if !self.is_current_page_exhausted() { + return Poll::Ready(Some(Ok(()))); } - - let mut s = self.as_mut(); - - let idx = s.current_row_idx; - if idx < s.current_page.rows.len() { - let row = mem::take(&mut s.current_page.rows[idx]); - s.current_row_idx += 1; - return Poll::Ready(Some(Ok(row))); + ready_some_ok!(self.as_mut().poll_next_page(cx)); + if self.is_current_page_exhausted() { + // Try again later + cx.waker().wake_by_ref(); + Poll::Pending + } else { + Poll::Ready(Some(Ok(()))) } - // We probably got a zero-sized page - // Yield, but tell that we are ready - cx.waker().wake_by_ref(); - Poll::Pending } /// Makes an attempt to acquire the next page (which may be empty). @@ -589,18 +645,9 @@ impl LegacyRowIterator { let mut s = self.as_mut(); let received_page = ready_some_ok!(Pin::new(&mut s.page_receiver).poll_recv(cx)); - let rows = match received_page - .rows - // As RowIteratorWorker manages paging itself, the paging state response - // returned to the user is always NoMorePages. It used to be so before - // the deserialization refactor, too. - .into_legacy_rows(PagingStateResponse::NoMorePages) - { - Ok(rows) => rows, - Err(err) => return Poll::Ready(Some(Err(err.into()))), - }; - s.current_page = rows; - s.current_row_idx = 0; + let raw_rows_with_deserialized_metadata = + received_page.rows.deserialize_owned_metadata()?; + s.current_page = RawRowsLendingIterator::new(raw_rows_with_deserialized_metadata); if let Some(tracing_id) = received_page.tracing_id { s.tracing_ids.push(tracing_id); @@ -609,12 +656,30 @@ impl LegacyRowIterator { Poll::Ready(Some(Ok(()))) } - /// Converts this iterator into an iterator over rows parsed as given type - pub fn into_typed(self) -> LegacyTypedRowIterator { - LegacyTypedRowIterator { - row_iterator: self, - phantom_data: Default::default(), - } + /// Type-checks the iterator against given type. + /// + /// This is automatically called upon transforming [RawIterator] into [TypedRowIterator]. + /// Can be used with `next()` for manual deserialization. See `next()` for an example. + #[inline] + pub fn type_check<'frame, RowT: DeserializeRow<'frame>>(&self) -> Result<(), TypeCheckError> { + RowT::type_check(self.column_specs().inner()) + } + + /// Casts the iterator to a given row type, enabling Stream'ed operations + /// on rows, which deserialize them in-fly to that given type. + /// Begins with performing type check. + #[inline] + pub fn into_typed<'frame, RowT: DeserializeRow<'frame>>( + self, + ) -> Result, TypeCheckError> { + TypedRowIterator::::new(self) + } + + /// Converts this iterator into an iterator over rows parsed as given type, + /// using the legacy deserialization framework. + #[inline] + pub fn into_legacy(self) -> LegacyRowIterator { + LegacyRowIterator { raw_iterator: self } } pub(crate) async fn new_for_query( @@ -888,16 +953,14 @@ impl LegacyRowIterator { // to the channel (the PageSendAttemptedProof helps enforce this) // - That future is polled in a tokio::task which isn't going to be // cancelled - let pages_received = receiver.recv().await.unwrap()?; - let rows = pages_received - .rows - .into_legacy_rows(PagingStateResponse::NoMorePages)?; + let page_received = receiver.recv().await.unwrap()?; + let raw_rows_with_deserialized_metadata = + page_received.rows.deserialize_owned_metadata()?; Ok(Self { - current_row_idx: 0, - current_page: rows, + current_page: RawRowsLendingIterator::new(raw_rows_with_deserialized_metadata), page_receiver: receiver, - tracing_ids: if let Some(tracing_id) = pages_received.tracing_id { + tracing_ids: if let Some(tracing_id) = page_received.tracing_id { vec![tracing_id] } else { Vec::new() @@ -906,17 +969,175 @@ impl LegacyRowIterator { } /// If tracing was enabled returns tracing ids of all finished page queries - pub fn get_tracing_ids(&self) -> &[Uuid] { + #[inline] + pub fn tracing_ids(&self) -> &[Uuid] { &self.tracing_ids } /// Returns specification of row columns - pub fn get_column_specs(&self) -> &[ColumnSpec] { - self.current_page.metadata.col_specs() + #[inline] + pub fn column_specs(&self) -> ColumnSpecs<'_> { + ColumnSpecs::new(self.current_page.metadata().col_specs()) } fn is_current_page_exhausted(&self) -> bool { - self.current_row_idx >= self.current_page.rows.len() + self.current_page.rows_remaining() == 0 + } +} + +/// Returned by [RawIterator::into_typed]. +/// +/// Does not implement [Stream], but permits deserialization of borrowed types. +/// To use [Stream] API (only accessible for owned types), use [TypedRowIterator::into_stream]. +pub struct TypedRowIterator { + raw_iterator: RawIterator, + _phantom: std::marker::PhantomData, +} + +impl Unpin for TypedRowIterator {} + +impl<'frame, RowT> TypedRowIterator +where + RowT: DeserializeRow<'frame>, +{ + fn new(raw_iterator: RawIterator) -> Result { + raw_iterator.type_check::()?; + + Ok(Self { + raw_iterator, + _phantom: Default::default(), + }) + } + + /// If tracing was enabled, returns tracing ids of all finished page queries. + #[inline] + pub fn tracing_ids(&self) -> &[Uuid] { + self.raw_iterator.tracing_ids() + } + + /// Returns specification of row columns + #[inline] + pub fn column_specs(&self) -> ColumnSpecs { + self.raw_iterator.column_specs() + } + + /// Stream-like next() implementation for TypedRowIterator. + /// + /// It also works with borrowed types! For example, &str is supported. + /// However, this is not a Stream. To create a Stream, use `into_stream()`. + #[inline] + pub async fn next(&'frame mut self) -> Option> { + self.raw_iterator.next().await.map(|res| { + res.and_then(|column_iterator| { + ::deserialize(column_iterator) + .map_err(|err| RowsParseError::from(err).into()) + }) + }) + } + + /// Stream-like try_next() implementation for TypedRowIterator. + /// + /// It also works with borrowed types! For example, &str is supported. + /// However, this is not a Stream. To create a Stream, use `into_stream()`. + #[inline] + pub async fn try_next(&'frame mut self) -> Result, QueryError> { + self.next().await.transpose() + } +} + +impl TypedRowIterator { + /// Transforms [TypedRowIterator] into [TypedRowStream]. + /// + /// If you deserialize to owned types only, use this method to unleash power of the `Stream` API. + /// This operation involves no runtime cost, but it limits the iterator to owned types only. + /// Therefore, if you want to work with borrowed types (e.g., to avoid heap allocations), + /// you can't use the `Stream` trait. + pub fn into_stream(self) -> TypedRowStream { + TypedRowStream { + typed_row_iterator: self, + } + } +} + +/// Returned by [TypedRowIterator::into_stream]. +/// +/// Implements [Stream], but only permits deserialization of owned types. +pub struct TypedRowStream { + typed_row_iterator: TypedRowIterator, +} + +impl Unpin for TypedRowStream {} + +/// Stream implementation for TypedRowStream. +/// +/// It only works with owned types! For example, &str is not supported. +impl Stream for TypedRowStream +where + RowT: for<'r> DeserializeRow<'r>, +{ + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut s = self.as_mut(); + + let next_fut = s.typed_row_iterator.raw_iterator.next(); + futures::pin_mut!(next_fut); + let iter = ready_some_ok!(next_fut.poll(cx)); + let value = >::deserialize(iter) + .map_err(|err| RowsParseError::from(err).into()); + Poll::Ready(Some(value)) + } +} + +/// Iterator over rows returned by paged queries. +/// +/// Allows to easily access rows without worrying about handling multiple pages. +pub struct LegacyRowIterator { + raw_iterator: RawIterator, +} + +impl Stream for LegacyRowIterator { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut s = self.as_mut(); + + let next_fut = s.raw_iterator.next(); + futures::pin_mut!(next_fut); + + let next_elem: Option, QueryError>> = match next_fut.poll(cx) { + Poll::Ready(next_elem) => next_elem, + Poll::Pending => return Poll::Pending, + }; + + let next_ready: Option = match next_elem { + Some(Ok(iter)) => { + Some(Row::deserialize(iter).map_err(|e| RowsParseError::from(e).into())) + } + Some(Err(e)) => Some(Err(e)), + None => None, + }; + + Poll::Ready(next_ready) + } +} + +impl LegacyRowIterator { + /// If tracing was enabled returns tracing ids of all finished page queries + pub fn get_tracing_ids(&self) -> &[Uuid] { + self.raw_iterator.tracing_ids() + } + + /// Returns specification of row columns + pub fn get_column_specs(&self) -> &[ColumnSpec<'_>] { + self.raw_iterator.column_specs().inner() + } + + pub fn into_typed(self) -> LegacyTypedRowIterator { + LegacyTypedRowIterator { + row_iterator: self, + _phantom_data: Default::default(), + } } } @@ -925,16 +1146,18 @@ impl LegacyRowIterator { /// Returned by `RowIterator::into_typed` pub struct LegacyTypedRowIterator { row_iterator: LegacyRowIterator, - phantom_data: std::marker::PhantomData, + _phantom_data: std::marker::PhantomData, } impl LegacyTypedRowIterator { /// If tracing was enabled returns tracing ids of all finished page queries + #[inline] pub fn get_tracing_ids(&self) -> &[Uuid] { self.row_iterator.get_tracing_ids() } /// Returns specification of row columns + #[inline] pub fn get_column_specs(&self) -> &[ColumnSpec] { self.row_iterator.get_column_specs() } diff --git a/scylla/src/transport/session.rs b/scylla/src/transport/session.rs index 0ec99ef41..4315909a1 100644 --- a/scylla/src/transport/session.rs +++ b/scylla/src/transport/session.rs @@ -4,6 +4,7 @@ use crate::batch::batch_values; #[cfg(feature = "cloud")] use crate::cloud::CloudConfig; +use crate::LegacyQueryResult; use crate::history; use crate::history::HistoryListener; @@ -42,7 +43,8 @@ use super::connection::QueryResponse; use super::connection::SslConfig; use super::errors::TracingProtocolError; use super::execution_profile::{ExecutionProfile, ExecutionProfileHandle, ExecutionProfileInner}; -use super::legacy_query_result::{LegacyQueryResult, MaybeFirstRowTypedError}; +use super::iterator::RawIterator; +use super::legacy_query_result::MaybeFirstRowTypedError; #[cfg(feature = "cloud")] use super::node::CloudEndpoint; use super::node::KnownNode; @@ -910,20 +912,21 @@ impl Session { .access(); if values.is_empty() { - LegacyRowIterator::new_for_query( + RawIterator::new_for_query( query, execution_profile, self.cluster.get_data(), self.metrics.clone(), ) .await + .map(RawIterator::into_legacy) } else { - // Making LegacyRowIterator::new_for_query work with values is too hard (if even possible) + // Making RawIterator::new_for_query work with values is too hard (if even possible) // so instead of sending one prepare to a specific connection on each iterator query, // we fully prepare a statement beforehand. let prepared = self.prepare(query).await?; let values = prepared.serialize_values(&values)?; - LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig { + RawIterator::new_for_prepared_statement(PreparedIteratorConfig { prepared, values, execution_profile, @@ -931,6 +934,7 @@ impl Session { metrics: self.metrics.clone(), }) .await + .map(RawIterator::into_legacy) } } @@ -1313,7 +1317,7 @@ impl Session { .unwrap_or_else(|| self.get_default_execution_profile_handle()) .access(); - LegacyRowIterator::new_for_prepared_statement(PreparedIteratorConfig { + RawIterator::new_for_prepared_statement(PreparedIteratorConfig { prepared, values: serialized_values, execution_profile, @@ -1321,6 +1325,7 @@ impl Session { metrics: self.metrics.clone(), }) .await + .map(RawIterator::into_legacy) } /// Perform a batch request.\