Skip to content

Commit

Permalink
geyser: impl default filters limit in config (#103)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanatid authored Mar 27, 2023
1 parent ef39ed3 commit fc3f82a
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 113 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion yellowstone-grpc-geyser/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "yellowstone-grpc-geyser"
version = "0.6.0+solana.1.15.2"
version = "0.6.1+solana.1.15.2"
authors = ["Triton One"]
edition = "2021"
description = "Yellowstone gRPC Geyser Plugin"
Expand Down
102 changes: 67 additions & 35 deletions yellowstone-grpc-geyser/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ pub struct ConfigGrpc {
/// Capacity of the channel per connection
#[serde(
default = "ConfigGrpc::channel_capacity_default",
deserialize_with = "UsizeStr::deserialize_usize"
deserialize_with = "deserialize_usize_str"
)]
pub channel_capacity: usize,
/// Limits for possible filters
#[serde(default)]
pub filters: Option<ConfigGrpcFilters>,
pub filters: ConfigGrpcFilters,
}

impl ConfigGrpc {
Expand All @@ -75,7 +75,7 @@ impl ConfigGrpc {
}
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Default, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFilters {
pub accounts: ConfigGrpcFiltersAccounts,
Expand Down Expand Up @@ -135,76 +135,108 @@ pub struct ConfigGrpcFiltersAccounts {
pub owner_reject: HashSet<Pubkey>,
}

impl Default for ConfigGrpcFiltersAccounts {
fn default() -> Self {
Self {
max: usize::MAX,
any: true,
account_max: usize::MAX,
account_reject: HashSet::new(),
owner_max: usize::MAX,
owner_reject: HashSet::new(),
}
}
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersSlots {
#[serde(deserialize_with = "deserialize_usize_str")]
pub max: usize,
}

impl Default for ConfigGrpcFiltersSlots {
fn default() -> Self {
Self { max: usize::MAX }
}
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersTransactions {
#[serde(deserialize_with = "deserialize_usize_str")]
pub max: usize,
pub any: bool,
#[serde(deserialize_with = "deserialize_usize_str")]
pub account_include_max: usize,
#[serde(deserialize_with = "deserialize_pubkey_set")]
pub account_include_reject: HashSet<Pubkey>,
#[serde(deserialize_with = "deserialize_usize_str")]
pub account_exclude_max: usize,
}

impl Default for ConfigGrpcFiltersTransactions {
fn default() -> Self {
Self {
max: usize::MAX,
any: true,
account_include_max: usize::MAX,
account_include_reject: HashSet::new(),
account_exclude_max: usize::MAX,
}
}
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersBlocks {
#[serde(deserialize_with = "deserialize_usize_str")]
pub max: usize,
}

impl Default for ConfigGrpcFiltersBlocks {
fn default() -> Self {
Self { max: usize::MAX }
}
}

#[derive(Debug, Clone, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigGrpcFiltersBlocksMeta {
#[serde(deserialize_with = "deserialize_usize_str")]
pub max: usize,
}

impl Default for ConfigGrpcFiltersBlocksMeta {
fn default() -> Self {
Self { max: usize::MAX }
}
}

#[derive(Debug, Clone, Copy, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct ConfigPrometheus {
/// Address of Prometheus service.
pub address: SocketAddr,
}

#[derive(Debug, Default, PartialEq, Eq, Hash)]
struct UsizeStr {
value: usize,
}

impl<'de> Deserialize<'de> for UsizeStr {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
Integer(usize),
String(String),
}

match Value::deserialize(deserializer)? {
Value::Integer(value) => Ok(UsizeStr { value }),
Value::String(value) => value
.replace('_', "")
.parse::<usize>()
.map_err(de::Error::custom)
.map(|value| UsizeStr { value }),
}
fn deserialize_usize_str<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum Value {
Integer(usize),
String(String),
}
}

impl UsizeStr {
fn deserialize_usize<'de, D>(deserializer: D) -> Result<usize, D::Error>
where
D: Deserializer<'de>,
{
Ok(Self::deserialize(deserializer)?.value)
match Value::deserialize(deserializer)? {
Value::Integer(value) => Ok(value),
Value::String(value) => value
.replace('_', "")
.parse::<usize>()
.map_err(de::Error::custom),
}
}

Expand Down
111 changes: 46 additions & 65 deletions yellowstone-grpc-geyser/src/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,25 @@ pub struct Filter {
}

impl Filter {
pub fn new(
config: &SubscribeRequest,
limit: Option<&ConfigGrpcFilters>,
) -> anyhow::Result<Self> {
pub fn new(config: &SubscribeRequest, limit: &ConfigGrpcFilters) -> anyhow::Result<Self> {
Ok(Self {
accounts: FilterAccounts::new(&config.accounts, limit.map(|v| &v.accounts))?,
slots: FilterSlots::new(&config.slots, limit.map(|v| &v.slots))?,
transactions: FilterTransactions::new(
&config.transactions,
limit.map(|v| &v.transactions),
)?,
blocks: FilterBlocks::new(&config.blocks, limit.map(|v| &v.blocks))?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, limit.map(|v| &v.blocks_meta))?,
accounts: FilterAccounts::new(&config.accounts, &limit.accounts)?,
slots: FilterSlots::new(&config.slots, &limit.slots)?,
transactions: FilterTransactions::new(&config.transactions, &limit.transactions)?,
blocks: FilterBlocks::new(&config.blocks, &limit.blocks)?,
blocks_meta: FilterBlocksMeta::new(&config.blocks_meta, &limit.blocks_meta)?,
})
}

fn decode_pubkeys<T: FromIterator<Pubkey>>(
pubkeys: &[String],
limit: Option<&HashSet<Pubkey>>,
limit: &HashSet<Pubkey>,
) -> anyhow::Result<T> {
pubkeys
.iter()
.map(|value| match Pubkey::from_str(value) {
Ok(pubkey) => {
if let Some(limit) = limit {
ConfigGrpcFilters::check_pubkey_reject(&pubkey, limit)?;
}
ConfigGrpcFilters::check_pubkey_reject(&pubkey, limit)?;
Ok(pubkey)
}
Err(error) => Err(error.into()),
Expand Down Expand Up @@ -92,35 +84,31 @@ struct FilterAccounts {
impl FilterAccounts {
fn new(
configs: &HashMap<String, SubscribeRequestFilterAccounts>,
limit: Option<&ConfigGrpcFiltersAccounts>,
limit: &ConfigGrpcFiltersAccounts,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;

let mut this = Self::default();
for (name, filter) in configs {
if let Some(limit) = limit {
ConfigGrpcFilters::check_any(
filter.account.is_empty() && filter.owner.is_empty(),
limit.any,
)?;
ConfigGrpcFilters::check_pubkey_max(filter.account.len(), limit.account_max)?;
ConfigGrpcFilters::check_pubkey_max(filter.owner.len(), limit.owner_max)?;
}
ConfigGrpcFilters::check_any(
filter.account.is_empty() && filter.owner.is_empty(),
limit.any,
)?;
ConfigGrpcFilters::check_pubkey_max(filter.account.len(), limit.account_max)?;
ConfigGrpcFilters::check_pubkey_max(filter.owner.len(), limit.owner_max)?;

Self::set(
&mut this.account,
&mut this.account_required,
name,
Filter::decode_pubkeys(&filter.account, limit.map(|v| &v.account_reject))?,
Filter::decode_pubkeys(&filter.account, &limit.account_reject)?,
);

Self::set(
&mut this.owner,
&mut this.owner_required,
name,
Filter::decode_pubkeys(&filter.owner, limit.map(|v| &v.owner_reject))?,
Filter::decode_pubkeys(&filter.owner, &limit.owner_reject)?,
);

this.filters
Expand Down Expand Up @@ -310,11 +298,9 @@ struct FilterSlots {
impl FilterSlots {
fn new(
configs: &HashMap<String, SubscribeRequestFilterSlots>,
limit: Option<&ConfigGrpcFiltersSlots>,
limit: &ConfigGrpcFiltersSlots,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;

Ok(Self {
filters: configs
Expand Down Expand Up @@ -347,31 +333,27 @@ pub struct FilterTransactions {
impl FilterTransactions {
fn new(
configs: &HashMap<String, SubscribeRequestFilterTransactions>,
limit: Option<&ConfigGrpcFiltersTransactions>,
limit: &ConfigGrpcFiltersTransactions,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;

let mut this = Self::default();
for (name, filter) in configs {
if let Some(limit) = limit {
ConfigGrpcFilters::check_any(
filter.vote.is_none()
&& filter.failed.is_none()
&& filter.account_include.is_empty()
&& filter.account_exclude.is_empty(),
limit.any,
)?;
ConfigGrpcFilters::check_pubkey_max(
filter.account_include.len(),
limit.account_include_max,
)?;
ConfigGrpcFilters::check_pubkey_max(
filter.account_exclude.len(),
limit.account_exclude_max,
)?;
}
ConfigGrpcFilters::check_any(
filter.vote.is_none()
&& filter.failed.is_none()
&& filter.account_include.is_empty()
&& filter.account_exclude.is_empty(),
limit.any,
)?;
ConfigGrpcFilters::check_pubkey_max(
filter.account_include.len(),
limit.account_include_max,
)?;
ConfigGrpcFilters::check_pubkey_max(
filter.account_exclude.len(),
limit.account_exclude_max,
)?;

this.filters.insert(
name.clone(),
Expand All @@ -389,9 +371,12 @@ impl FilterTransactions {
.transpose()?,
account_include: Filter::decode_pubkeys(
&filter.account_include,
limit.map(|v| &v.account_include_reject),
&limit.account_include_reject,
)?,
account_exclude: Filter::decode_pubkeys(
&filter.account_exclude,
&HashSet::new(),
)?,
account_exclude: Filter::decode_pubkeys(&filter.account_exclude, None)?,
},
);
}
Expand Down Expand Up @@ -459,11 +444,9 @@ struct FilterBlocks {
impl FilterBlocks {
fn new(
configs: &HashMap<String, SubscribeRequestFilterBlocks>,
limit: Option<&ConfigGrpcFiltersBlocks>,
limit: &ConfigGrpcFiltersBlocks,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;

Ok(Self {
filters: configs
Expand All @@ -487,11 +470,9 @@ struct FilterBlocksMeta {
impl FilterBlocksMeta {
fn new(
configs: &HashMap<String, SubscribeRequestFilterBlocksMeta>,
limit: Option<&ConfigGrpcFiltersBlocksMeta>,
limit: &ConfigGrpcFiltersBlocksMeta,
) -> anyhow::Result<Self> {
if let Some(limit) = limit {
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;
}
ConfigGrpcFilters::check_max(configs.len(), limit.max)?;

Ok(Self {
filters: configs
Expand Down
Loading

0 comments on commit fc3f82a

Please sign in to comment.