Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/source_plugin/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,15 @@ impl Client {
sender: mpsc::UnboundedSender<MonitorZoneKernelEventReply>,
zone_exit_tx: mpsc::UnboundedSender<String>,
) -> Result<()> {
// Consistency check: should never hit this
if self.zone_pump_handles.contains_key(&zone_id) {
warn!(
"Pump already exists for zone {}, skipping duplicate",
zone_id
);
return Ok(());
}

let (cancel_tx, cancel_rx) = watch::channel::<bool>(false);
let done_cancel = cancel_tx.clone();
let zone_id_local = zone_id.clone();
Expand Down Expand Up @@ -244,7 +253,7 @@ impl Client {
cancel_rx: watch::Receiver<bool>,
zone_exit_tx: mpsc::UnboundedSender<String>,
) -> Result<JoinSet<()>> {
info!("Listening for kernel events from zone {}", zone_id);
debug!("listening for kernel events from zone {}", zone_id);
let event = MonitorZoneKernelEventRequest {
zone_id: zone_id.clone(),
request: Some(zk_req::Request::Update(ZoneKernelEventStreamUpdate {
Expand Down
14 changes: 10 additions & 4 deletions src/source_plugin/client/zone_creation_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::Result;
use tonic::transport::Channel;

use super::edera_client::zone_watcher::ZoneWatcher as watcher;
use log::{debug, error};
use log::{debug, error, warn};
use tokio::{sync::broadcast, task::JoinHandle};

pub struct ZoneWatcher {}
Expand Down Expand Up @@ -35,13 +35,19 @@ impl ZoneWatcher {
// to zones that are not in Ready state yet.
// We don't care about zones that aren't ready yet, so filter them out.
if zone.status == ZoneState::Ready {
// if it's ready, add it to the list whether we've seen it or not.
ready_zones.push(zid.clone());
// send it if we haven't seen it before.
if !last_ready_zones.contains(&zid) {
debug!("got new zone {:?}", zone);
let _ = tx.send(zid);
if tx.send(zid.clone()).is_err() {
// If send fails, don't track it, so we retry next iteration
warn!(
"failed to notify subcribers of zone {}, will retry",
zid
);
continue;
}
}
ready_zones.push(zid.clone());
}
}

Expand Down
59 changes: 39 additions & 20 deletions src/source_plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,40 +45,59 @@ impl EderaSourcePluginInstance {
batch: &mut EventBatch,
plugin: &mut EderaPlugin,
) -> Result<u32> {
let mut drained_count = 0;
let mut drained_event_count = 0;
let mut threadsnap_count = 0;
if let Some(rx) = &mut self.event_rx {
for _ in 0..max_count {
// This loop processes both syscall events and new-zone thread snapshots,
// which are interleaved on the same channel.
// The zone thread snapshots are required for initializing zone event
// watchers, and so we want to prioritize processing those, and
// (naturally) avoid counting them against "max_count", since we do not
// include threadsnaps in the set of events we hand to falco proper.
loop {
match rx.try_recv() {
Ok(event) => {
match event.reply {
// extract the internal syscall struct (if present) to
// pull out the timestamp and use it as the scap event's timestamp.
Some(Reply::Syscall(evt)) => {
let encoded = evt.encode_length_delimited_to_vec();
let mut wrapped_evt = Self::plugin_event(encoded.as_slice());
// set the wrapped evt TS to the original evt ts
wrapped_evt.metadata.ts = evt.timestamp;
batch.add(wrapped_evt).expect("event should add");
drained_count += 1;
}
Some(Reply::Threadsnap(snap)) => {
// We will silently *take* snapshot events here,
// filtering them out of the plugin event stream at this point,
// and populate our internal thread table with them, rather than
// adding them to the batch and passing them back to `scap`.
debug!(
"thread snapshot: zone_id {:?} entry_count: {}",
snap.zone_id,
snap.thread_info.len(),
);
threadsnap_count += 1;
info!("discovered and initialized zone {}", snap.zone_id,);
plugin.threadstate.init_zone_with_snap(snap);
}
Some(Reply::Syscall(evt)) => {
// extract the internal syscall struct (if present) to
// pull out the timestamp and use it as the scap event's timestamp.
let encoded = evt.encode_length_delimited_to_vec();
let mut wrapped_evt = Self::plugin_event(encoded.as_slice());
// set the wrapped evt TS to the original evt ts
wrapped_evt.metadata.ts = evt.timestamp;
batch.add(wrapped_evt).expect("event should add");
drained_event_count += 1;

// If we've hit max_count, stop draining
if drained_event_count >= max_count {
debug!(
"take_events: hit max_count={}, processed {} threadsnaps, {} syscalls",
max_count, threadsnap_count, drained_event_count
);
break;
}
}
None => {
warn!("got empty event!")
}
}
}
Err(mpsc::error::TryRecvError::Empty) => break,
Err(mpsc::error::TryRecvError::Empty) => {
debug!(
"take_events: channel empty after processing {} threadsnaps, {} syscalls",
threadsnap_count, drained_event_count
);
break;
}
Err(_) => return Err(anyhow!("channel closed")),
}
}
Expand All @@ -88,12 +107,12 @@ impl EderaSourcePluginInstance {
// we want to handle these before dealing with any subsequent events
if let Some(zone_dead_rx) = &mut self.zone_exit_rx {
while let Ok(term_zone_id) = zone_dead_rx.try_recv() {
debug!("zone {term_zone_id} is dead, dropping from threadsnap");
info!("stopped streaming events from zone {term_zone_id}");
plugin.threadstate.drop_zone_from_snap(&term_zone_id);
}
}

Ok(drained_count)
Ok(drained_event_count)
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/threadstate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3374,7 +3374,7 @@ impl ThreadState {
.expect("should parse");

let Some(zinfo) = self.zone_info.get_mut(&event.zone_id) else {
warn!("ignoring event for unknown zone {:?}", &event.zone_id);
debug!("ignoring event for unmonitored zone {:?}", &event.zone_id);
return Ok(());
};

Expand Down
Loading