Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow user-defined transaction filter to strip transactions from indexer GRPC #15648

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::{
BYTES_READY_TO_TRANSFER_FROM_SERVER, BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING,
CONNECTION_COUNT, ERROR_COUNT, LATEST_PROCESSED_VERSION_PER_PROCESSOR,
NUM_TRANSACTIONS_STRIPPED, PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR,
PROCESSED_VERSIONS_COUNT_PER_PROCESSOR, SHORT_CONNECTION_COUNT,
use crate::{
metrics::{
BYTES_READY_TO_TRANSFER_FROM_SERVER, BYTES_READY_TO_TRANSFER_FROM_SERVER_AFTER_STRIPPING,
CONNECTION_COUNT, ERROR_COUNT, LATEST_PROCESSED_VERSION_PER_PROCESSOR,
NUM_TRANSACTIONS_STRIPPED, PROCESSED_LATENCY_IN_SECS_PER_PROCESSOR,
PROCESSED_VERSIONS_COUNT_PER_PROCESSOR, SHORT_CONNECTION_COUNT,
},
IndexerGrpcDataServiceConfig,
};
use anyhow::{Context, Result};
use aptos_indexer_grpc_utils::{
Expand Down Expand Up @@ -160,6 +163,22 @@ impl RawData for RawDataServerWrapper {
let request = req.into_inner();

let transactions_count = request.transactions_count;
let maybe_user_txn_to_strip_filter = match request.txns_to_strip_filter {
Some(txns_to_strip_filter) => serde_json::from_str(&txns_to_strip_filter),
None => Ok(IndexerGrpcDataServiceConfig::default_txns_to_strip_filter()),
};
let user_txn_to_strip_filter = match maybe_user_txn_to_strip_filter {
Ok(txns_to_strip_filter) => txns_to_strip_filter,
Err(e) => {
error!(
error = e.to_string(),
"[Data Service] Failed to parse txns_to_strip_filter."
);
return Result::Err(Status::invalid_argument(
"Failed to parse txns_to_strip_filter.",
));
},
};

// Response channel to stream the data to the client.
let (tx, rx) = channel(self.data_service_response_channel_size);
Expand Down Expand Up @@ -193,7 +212,11 @@ impl RawData for RawDataServerWrapper {
let redis_client = self.redis_client.clone();
let cache_storage_format = self.cache_storage_format;
let request_metadata = Arc::new(request_metadata);
let txns_to_strip_filter = self.txns_to_strip_filter.clone();
let config_txns_to_strip_filter = self.txns_to_strip_filter.clone();
let txns_to_strip_filter = BooleanTransactionFilter::new_or(vec![
user_txn_to_strip_filter,
config_txns_to_strip_filter,
]);
let in_memory_cache = self.in_memory_cache.clone();
tokio::spawn({
let request_metadata = request_metadata.clone();
Expand Down Expand Up @@ -1276,6 +1299,7 @@ mod tests {
assert_eq!(user_transaction.events.len(), 0);
assert_eq!(txn.info.as_ref().unwrap().changes.len(), 0);
}

#[test]
fn test_transactions_are_not_stripped() {
let txn = create_test_transaction(
Expand Down
7 changes: 7 additions & 0 deletions protos/proto/aptos/indexer/v1/raw_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@ message GetTransactionsRequest {
// Optional; number of transactions in each `TransactionsResponse` for current stream.
// If not present, default to 1000. If larger than 1000, request will be rejected.
optional uint64 batch_size = 3;

// Optional; JSON representation of BooleanTransactionFilter.
// Any transaction that matches this filter will be stripped. This means we remove
// the payload, signature, events, and writesets from it before sending it downstream.
// Generally you will want to start with this with an OR, and then list out
// separate filters that describe each type of txn we want to strip.
optional string txns_to_strip_filter = 4;
}

// TransactionsResponse is a batch of transactions.
Expand Down
31 changes: 19 additions & 12 deletions protos/python/aptos_protos/aptos/indexer/v1/raw_data_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 11 additions & 3 deletions protos/python/aptos_protos/aptos/indexer/v1/raw_data_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ from google.protobuf.internal import containers as _containers
DESCRIPTOR: _descriptor.FileDescriptor

class TransactionsInStorage(_message.Message):
__slots__ = ["transactions", "starting_version"]
__slots__ = ("transactions", "starting_version")
TRANSACTIONS_FIELD_NUMBER: _ClassVar[int]
STARTING_VERSION_FIELD_NUMBER: _ClassVar[int]
transactions: _containers.RepeatedCompositeFieldContainer[
Expand All @@ -28,22 +28,30 @@ class TransactionsInStorage(_message.Message):
) -> None: ...

class GetTransactionsRequest(_message.Message):
__slots__ = ["starting_version", "transactions_count", "batch_size"]
__slots__ = (
"starting_version",
"transactions_count",
"batch_size",
"txns_to_strip_filter",
)
STARTING_VERSION_FIELD_NUMBER: _ClassVar[int]
TRANSACTIONS_COUNT_FIELD_NUMBER: _ClassVar[int]
BATCH_SIZE_FIELD_NUMBER: _ClassVar[int]
TXNS_TO_STRIP_FILTER_FIELD_NUMBER: _ClassVar[int]
starting_version: int
transactions_count: int
batch_size: int
txns_to_strip_filter: str
def __init__(
self,
starting_version: _Optional[int] = ...,
transactions_count: _Optional[int] = ...,
batch_size: _Optional[int] = ...,
txns_to_strip_filter: _Optional[str] = ...,
) -> None: ...

class TransactionsResponse(_message.Message):
__slots__ = ["transactions", "chain_id"]
__slots__ = ("transactions", "chain_id")
TRANSACTIONS_FIELD_NUMBER: _ClassVar[int]
CHAIN_ID_FIELD_NUMBER: _ClassVar[int]
transactions: _containers.RepeatedCompositeFieldContainer[
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper
DESCRIPTOR: _descriptor.FileDescriptor

class TransactionsOutput(_message.Message):
__slots__ = ["transactions"]
__slots__ = ("transactions",)
TRANSACTIONS_FIELD_NUMBER: _ClassVar[int]
transactions: _containers.RepeatedCompositeFieldContainer[
_transaction_pb2.Transaction
Expand All @@ -26,10 +26,10 @@ class TransactionsOutput(_message.Message):
) -> None: ...

class StreamStatus(_message.Message):
__slots__ = ["type", "start_version", "end_version"]
__slots__ = ("type", "start_version", "end_version")

class StatusType(int, metaclass=_enum_type_wrapper.EnumTypeWrapper):
__slots__ = []
__slots__ = ()
STATUS_TYPE_UNSPECIFIED: _ClassVar[StreamStatus.StatusType]
STATUS_TYPE_INIT: _ClassVar[StreamStatus.StatusType]
STATUS_TYPE_BATCH_END: _ClassVar[StreamStatus.StatusType]
Expand All @@ -50,7 +50,7 @@ class StreamStatus(_message.Message):
) -> None: ...

class GetTransactionsFromNodeRequest(_message.Message):
__slots__ = ["starting_version", "transactions_count"]
__slots__ = ("starting_version", "transactions_count")
STARTING_VERSION_FIELD_NUMBER: _ClassVar[int]
TRANSACTIONS_COUNT_FIELD_NUMBER: _ClassVar[int]
starting_version: int
Expand All @@ -62,7 +62,7 @@ class GetTransactionsFromNodeRequest(_message.Message):
) -> None: ...

class TransactionsFromNodeResponse(_message.Message):
__slots__ = ["status", "data", "chain_id"]
__slots__ = ("status", "data", "chain_id")
STATUS_FIELD_NUMBER: _ClassVar[int]
DATA_FIELD_NUMBER: _ClassVar[int]
CHAIN_ID_FIELD_NUMBER: _ClassVar[int]
Expand Down
61 changes: 37 additions & 24 deletions protos/python/aptos_protos/aptos/transaction/v1/transaction_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading