diff --git a/src/source_plugin/client/mod.rs b/src/source_plugin/client/mod.rs
index 9580606..1d243c9 100644
--- a/src/source_plugin/client/mod.rs
+++ b/src/source_plugin/client/mod.rs
@@ -176,6 +176,15 @@ impl Client {
         sender: mpsc::UnboundedSender,
         zone_exit_tx: mpsc::UnboundedSender,
     ) -> Result<()> {
+        // Consistency check: we should never hit this.
+        if self.zone_pump_handles.contains_key(&zone_id) {
+            warn!(
+                "Pump already exists for zone {}, skipping duplicate",
+                zone_id
+            );
+            return Ok(());
+        }
+
         let (cancel_tx, cancel_rx) = watch::channel::<bool>(false);
         let done_cancel = cancel_tx.clone();
         let zone_id_local = zone_id.clone();
@@ -244,7 +253,7 @@ impl Client {
         cancel_rx: watch::Receiver,
         zone_exit_tx: mpsc::UnboundedSender,
     ) -> Result> {
-        info!("Listening for kernel events from zone {}", zone_id);
+        debug!("listening for kernel events from zone {}", zone_id);
         let event = MonitorZoneKernelEventRequest {
             zone_id: zone_id.clone(),
             request: Some(zk_req::Request::Update(ZoneKernelEventStreamUpdate {
diff --git a/src/source_plugin/client/zone_creation_watcher.rs b/src/source_plugin/client/zone_creation_watcher.rs
index 7b9f5cf..f353a8a 100644
--- a/src/source_plugin/client/zone_creation_watcher.rs
+++ b/src/source_plugin/client/zone_creation_watcher.rs
@@ -5,7 +5,7 @@ use anyhow::Result;
 use tonic::transport::Channel;
 
 use super::edera_client::zone_watcher::ZoneWatcher as watcher;
-use log::{debug, error};
+use log::{debug, error, warn};
 use tokio::{sync::broadcast, task::JoinHandle};
 
 pub struct ZoneWatcher {}
@@ -35,13 +35,19 @@ impl ZoneWatcher {
                 // to zones that are not in Ready state yet.
                 // We don't care about zones that aren't ready yet, so filter them out.
                 if zone.status == ZoneState::Ready {
-                    // if it's ready, add it to the list whether we've seen it or not.
-                    ready_zones.push(zid.clone());
                     // send it if we haven't seen it before.
                     if !last_ready_zones.contains(&zid) {
                         debug!("got new zone {:?}", zone);
-                        let _ = tx.send(zid);
+                        if tx.send(zid.clone()).is_err() {
+                            // If the send fails, don't track the zone, so we retry next iteration.
+                            warn!(
+                                "failed to notify subscribers of zone {}, will retry",
+                                zid
+                            );
+                            continue;
+                        }
                     }
+                    ready_zones.push(zid.clone());
                 }
             }
 
diff --git a/src/source_plugin/mod.rs b/src/source_plugin/mod.rs
index 29f0f17..10f2f05 100644
--- a/src/source_plugin/mod.rs
+++ b/src/source_plugin/mod.rs
@@ -45,40 +45,59 @@ impl EderaSourcePluginInstance {
         batch: &mut EventBatch,
         plugin: &mut EderaPlugin,
     ) -> Result {
-        let mut drained_count = 0;
+        let mut drained_event_count = 0;
+        let mut threadsnap_count = 0;
         if let Some(rx) = &mut self.event_rx {
-            for _ in 0..max_count {
+            // This loop processes both syscall events and new-zone thread snapshots,
+            // which are interleaved on the same channel.
+            // Thread snapshots are required to initialize zone event watchers, so we
+            // prioritize processing them and do not count them against `max_count`,
+            // since threadsnaps are never included in the set of events we hand to
+            // Falco proper.
+            loop {
                 match rx.try_recv() {
                     Ok(event) => {
                         match event.reply {
-                            // extract the internal syscall struct (if present) to
-                            // pull out the timestamp and use it as the scap event's timestamp.
-                            Some(Reply::Syscall(evt)) => {
-                                let encoded = evt.encode_length_delimited_to_vec();
-                                let mut wrapped_evt = Self::plugin_event(encoded.as_slice());
-                                // set the wrapped evt TS to the original evt ts
-                                wrapped_evt.metadata.ts = evt.timestamp;
-                                batch.add(wrapped_evt).expect("event should add");
-                                drained_count += 1;
-                            }
                             Some(Reply::Threadsnap(snap)) => {
                                 // We will silently *take* snapshot events here,
                                 // filtering them out of the plugin event stream at this point,
                                 // and populate our internal thread table with them, rather than
                                 // adding them to the batch and passing them back to `scap`.
-                                debug!(
-                                    "thread snapshot: zone_id {:?} entry_count: {}",
-                                    snap.zone_id,
-                                    snap.thread_info.len(),
-                                );
+                                threadsnap_count += 1;
+                                info!("discovered and initialized zone {}", snap.zone_id);
                                 plugin.threadstate.init_zone_with_snap(snap);
                             }
+                            Some(Reply::Syscall(evt)) => {
+                                // extract the internal syscall struct (if present) to
+                                // pull out the timestamp and use it as the scap event's timestamp.
+                                let encoded = evt.encode_length_delimited_to_vec();
+                                let mut wrapped_evt = Self::plugin_event(encoded.as_slice());
+                                // set the wrapped evt TS to the original evt ts
+                                wrapped_evt.metadata.ts = evt.timestamp;
+                                batch.add(wrapped_evt).expect("event should add");
+                                drained_event_count += 1;
+
+                                // If we've hit max_count, stop draining.
+                                if drained_event_count >= max_count {
+                                    debug!(
+                                        "take_events: hit max_count={}, processed {} threadsnaps, {} syscalls",
+                                        max_count, threadsnap_count, drained_event_count
+                                    );
+                                    break;
+                                }
+                            }
                             None => {
                                 warn!("got empty event!")
                             }
                         }
                     }
-                    Err(mpsc::error::TryRecvError::Empty) => break,
+                    Err(mpsc::error::TryRecvError::Empty) => {
+                        debug!(
+                            "take_events: channel empty after processing {} threadsnaps, {} syscalls",
+                            threadsnap_count, drained_event_count
+                        );
+                        break;
+                    }
                     Err(_) => return Err(anyhow!("channel closed")),
                 }
             }
@@ -88,12 +107,12 @@ impl EderaSourcePluginInstance {
 
         // we want to handle these before dealing with any subsequent events
         if let Some(zone_dead_rx) = &mut self.zone_exit_rx {
             while let Ok(term_zone_id) = zone_dead_rx.try_recv() {
-                debug!("zone {term_zone_id} is dead, dropping from threadsnap");
+                info!("stopped streaming events from zone {term_zone_id}");
                 plugin.threadstate.drop_zone_from_snap(&term_zone_id);
             }
         }
 
-        Ok(drained_count)
+        Ok(drained_event_count)
     }
 }
diff --git a/src/threadstate.rs b/src/threadstate.rs
index fbed5fa..4105ef6 100644
--- a/src/threadstate.rs
+++ b/src/threadstate.rs
@@ -3374,7 +3374,7 @@ impl ThreadState {
             .expect("should parse");
 
         let Some(zinfo) = self.zone_info.get_mut(&event.zone_id) else {
-            warn!("ignoring event for unknown zone {:?}", &event.zone_id);
+            debug!("ignoring event for unmonitored zone {:?}", &event.zone_id);
            return Ok(());
        };
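
Note on the take_events change in src/source_plugin/mod.rs above: the pattern is to drain one channel that carries two kinds of messages, letting one kind (thread snapshots) bypass the max_count cap that applies to the other (syscall events). The following is a minimal, self-contained sketch of that pattern only; the Msg enum, drain function, and u64 payloads are hypothetical stand-ins, not the plugin's real Reply, EventBatch, or threadstate types.

    use anyhow::anyhow;
    use tokio::sync::mpsc;

    // Hypothetical stand-ins for the plugin's Reply variants.
    #[derive(Debug)]
    enum Msg {
        Snapshot(String), // zone id carried by a thread snapshot
        Syscall(u64),     // opaque syscall event payload
    }

    // Drain the shared channel: snapshots are always consumed (and never counted),
    // while syscall events are batched and capped at `max_count`.
    fn drain(rx: &mut mpsc::UnboundedReceiver<Msg>, max_count: usize) -> anyhow::Result<Vec<u64>> {
        let mut batch = Vec::new();
        loop {
            match rx.try_recv() {
                Ok(Msg::Snapshot(zone)) => {
                    // Initialize per-zone state here; does not count against max_count.
                    println!("initialized zone {zone}");
                }
                Ok(Msg::Syscall(evt)) => {
                    batch.push(evt);
                    if batch.len() >= max_count {
                        break; // the cap applies only to events handed back to the caller
                    }
                }
                Err(mpsc::error::TryRecvError::Empty) => break,
                Err(mpsc::error::TryRecvError::Disconnected) => return Err(anyhow!("channel closed")),
            }
        }
        Ok(batch)
    }

    fn main() -> anyhow::Result<()> {
        let (tx, mut rx) = mpsc::unbounded_channel();
        tx.send(Msg::Snapshot("zone-a".into())).expect("receiver alive");
        tx.send(Msg::Syscall(1)).expect("receiver alive");
        tx.send(Msg::Syscall(2)).expect("receiver alive");
        let batch = drain(&mut rx, 8)?;
        println!("drained {} syscall events", batch.len());
        Ok(())
    }

The point mirrored from take_events is that the cap is checked only after appending a syscall event, so snapshots can never be starved or dropped by the batch limit.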
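
The zone_creation_watcher.rs change follows a related idea: a zone is only recorded as seen after the broadcast send succeeds, so a zone whose notification failed is re-announced on the next poll. A minimal sketch of that bookkeeping, using plain String zone ids and a hypothetical notify_ready_zones helper rather than the watcher's real types.

    use std::collections::HashSet;
    use tokio::sync::broadcast;

    // One polling pass: notify subscribers of newly ready zones, and only record a
    // zone as seen once the notification actually went out, so a failed send is
    // retried on the next pass.
    fn notify_ready_zones(
        ready_now: &[String],
        last_seen: &HashSet<String>,
        tx: &broadcast::Sender<String>,
    ) -> HashSet<String> {
        let mut seen = HashSet::new();
        for zid in ready_now {
            if !last_seen.contains(zid) && tx.send(zid.clone()).is_err() {
                // broadcast::Sender::send errors when there are no active receivers;
                // skip tracking so this zone is announced again next time.
                continue;
            }
            seen.insert(zid.clone());
        }
        seen
    }

    fn main() {
        let (tx, _rx) = broadcast::channel::<String>(16);
        let zones = vec!["zone-a".to_string(), "zone-b".to_string()];
        let seen = notify_ready_zones(&zones, &HashSet::new(), &tx);
        println!("tracked {} zones", seen.len());
    }

tokio's broadcast::Sender::send fails whenever no receivers are currently subscribed, which is exactly the window the watcher change guards against by deferring the push into the seen list.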