From f8355adb2cc1851802090c57dc8c456ce85ea955 Mon Sep 17 00:00:00 2001 From: Gabor Gyimesi <gamezbird@gmail.com> Date: Tue, 7 Jan 2025 15:49:09 +0100 Subject: [PATCH] MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation --- extensions/python/ExecutePythonProcessor.cpp | 1 + libminifi/include/core/BulletinStore.h | 47 ++++++++++++++++ libminifi/include/core/FlowConfiguration.h | 3 ++ libminifi/include/core/ProcessGroup.h | 12 ++++- libminifi/include/core/Processor.h | 53 +++++++++++++++++++ libminifi/include/core/ProcessorConfig.h | 19 ++----- libminifi/include/core/flow/FlowSchema.h | 1 + libminifi/include/core/logging/Logger.h | 44 ++++++++++++++- .../core/state/MetricsPublisherStore.h | 3 +- .../core/state/nodes/FlowInformation.h | 6 +++ .../core/state/nodes/ResponseNodeLoader.h | 4 +- libminifi/src/core/FlowConfiguration.cpp | 1 + libminifi/src/core/ProcessGroup.cpp | 2 + libminifi/src/core/Processor.cpp | 1 - libminifi/src/core/flow/FlowSchema.cpp | 2 + .../src/core/flow/StructuredConfiguration.cpp | 10 ++++ .../src/core/state/MetricsPublisherStore.cpp | 4 +- .../src/core/state/nodes/FlowInformation.cpp | 26 +++++++++ .../core/state/nodes/ResponseNodeLoader.cpp | 9 +++- libminifi/test/integration/C2MetricsTest.cpp | 24 ++++++++- .../libtest/integration/IntegrationBase.cpp | 6 ++- .../libtest/integration/IntegrationBase.h | 2 + libminifi/test/resources/TestC2Metrics.yml | 2 + minifi_main/MiNiFiMain.cpp | 9 ++-- 24 files changed, 260 insertions(+), 31 deletions(-) create mode 100644 libminifi/include/core/BulletinStore.h diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index 3cc2825148..e0f1ed52e1 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -162,6 +162,7 @@ std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() auto engine = std::make_unique<PythonScriptEngine>(); python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName()); + setLoggerCallback(python_logger_, bulletin_store_); engine->initialize(Success, Failure, Original, python_logger_); return engine; diff --git a/libminifi/include/core/BulletinStore.h b/libminifi/include/core/BulletinStore.h new file mode 100644 index 0000000000..244997047c --- /dev/null +++ b/libminifi/include/core/BulletinStore.h @@ -0,0 +1,47 @@ +#pragma once + +#include <string> +#include <deque> +#include <mutex> + +namespace org::apache::nifi::minifi::core { + +struct Bulletin { + uint64_t id; + std::string timestamp; // "Tue Dec 10 08:40:26 CET 2024", + // std::string node_address; // empty + std::string level; + std::string category; + std::string message; + std::string group_id; + std::string group_name; + std::string group_path; + std::string source_id; + std::string source_name; + // std::string flow_file_uuid; // empty +}; + +class BulletinStore { + public: + void addBulletin(Bulletin& bulletin) { + std::lock_guard<std::mutex> lock(mutex_); + bulletin.id = id_counter++; + if (bulletins_.size() >= MAX_BULLETIN_COUNT) { + bulletins_.pop_front(); + } + bulletins_.push_back(bulletin); + } + + std::deque<Bulletin> getBulletins() const { + std::lock_guard<std::mutex> lock(mutex_); + return bulletins_; + } + + private: + static constexpr uint64_t MAX_BULLETIN_COUNT = 10000; + mutable std::mutex mutex_; + uint64_t id_counter = 1; + std::deque<Bulletin> bulletins_; +}; + +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index c73f0d9347..3a2193a499 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -42,6 +42,7 @@ #include "utils/file/FileSystem.h" #include "utils/ChecksumCalculator.h" #include "ParameterContext.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::core { @@ -60,6 +61,7 @@ struct ConfigurationContext { std::optional<std::filesystem::path> path{std::nullopt}; std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()}; std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt}; + core::BulletinStore* bulletin_store{nullptr}; }; enum class FlowSerializationType { Json, NifiJson, Yaml }; @@ -149,6 +151,7 @@ class FlowConfiguration : public CoreComponent { std::shared_ptr<utils::file::FileSystem> filesystem_; utils::crypto::EncryptionProvider sensitive_values_encryptor_; utils::ChecksumCalculator checksum_calculator_; + core::BulletinStore* bulletin_store_ = nullptr; private: virtual std::string serialize(const ProcessGroup&) { return ""; } diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index 8f64ec2784..98cff31bea 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -153,7 +153,7 @@ class ProcessGroup : public CoreComponent { parent_process_group_ = parent; } // get parent process group - ProcessGroup *getParent() { + ProcessGroup *getParent() const { std::lock_guard<std::recursive_mutex> lock(mutex_); return parent_process_group_; } @@ -225,6 +225,16 @@ class ProcessGroup : public CoreComponent { return child_process_groups_; } + std::string buildGroupPath() const { + std::string path = getName(); + auto parent = getParent(); + while (parent != nullptr) { + path = parent->getName() + " / " + path; + parent = parent->getParent(); + } + return path; + } + protected: void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler); diff --git a/libminifi/include/core/Processor.h b/libminifi/include/core/Processor.h index 8dfc3db778..00e2a4881d 100644 --- a/libminifi/include/core/Processor.h +++ b/libminifi/include/core/Processor.h @@ -43,6 +43,9 @@ #include "ProcessorMetrics.h" #include "utils/gsl.h" #include "OutputAttributeDefinition.h" +#include "core/BulletinStore.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/TimeUtil.h" #define ADD_GET_PROCESSOR_NAME \ std::string getProcessorType() const override { \ @@ -173,6 +176,22 @@ class Processor : public Connectable, public ConfigurableComponent, public state process_group_uuid_ = uuid; } + std::string getProcessGroupName() const { + return process_group_name_; + } + + void setProcessGroupName(const std::string &name) { + process_group_name_ = name; + } + + std::string getProcessGroupPath() const { + return process_group_path_; + } + + void setProcessGroupPath(const std::string &path) { + process_group_path_ = path; + } + void yield() override; void yield(std::chrono::steady_clock::duration delta_time); @@ -226,6 +245,15 @@ class Processor : public Connectable, public ConfigurableComponent, public state return metrics_; } + void setLogBulletinLevel(spdlog::level::level_enum level) { + log_bulletin_level_ = level; + } + + void setLoggerCallback(core::BulletinStore* bulletin_store) { + bulletin_store_ = bulletin_store; + setLoggerCallback(logger_, bulletin_store); + } + static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; @@ -234,6 +262,27 @@ class Processor : public Connectable, public ConfigurableComponent, public state virtual void notifyStop() { } + void setLoggerCallback(const std::shared_ptr<logging::Logger>& logger, core::BulletinStore* bulletin_store) { + logger->addLogCallback([this, bulletin_store](spdlog::level::level_enum level, const std::string& message) { + if (level < log_bulletin_level_) { + return; + } + if (bulletin_store) { + core::Bulletin bulletin; + bulletin.timestamp = minifi::utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now())); + bulletin.level = core::logging::mapSpdLogLevelToString(level); + bulletin.category = "Log Message"; + bulletin.message = message; + bulletin.group_id = getProcessGroupUUIDStr(); + bulletin.group_name = getProcessGroupName(); + bulletin.group_path = getProcessGroupPath(); + bulletin.source_id = getUUIDStr(); + bulletin.source_name = getName(); + bulletin_store->addBulletin(bulletin); + } + }); + } + std::atomic<ScheduledState> state_; std::atomic<std::chrono::steady_clock::duration> scheduling_period_; @@ -247,6 +296,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; std::shared_ptr<logging::Logger> logger_; + spdlog::level::level_enum log_bulletin_level_ = spdlog::level::warn; + core::BulletinStore* bulletin_store_ = nullptr; private: mutable std::mutex mutex_; @@ -266,6 +317,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; std::string process_group_uuid_; + std::string process_group_name_; + std::string process_group_path_; }; } // namespace core diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h index e4706204cb..dd29562aa5 100644 --- a/libminifi/include/core/ProcessorConfig.h +++ b/libminifi/include/core/ProcessorConfig.h @@ -14,8 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ -#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +#pragma once #include <string> #include <vector> @@ -23,12 +22,7 @@ #include "core/Core.h" #include "core/Property.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { - +namespace org::apache::nifi::minifi::core { constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"}; constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"}; @@ -47,16 +41,11 @@ struct ProcessorConfig { std::string schedulingPeriod; std::string penalizationPeriod; std::string yieldPeriod; + std::string bulletinLevel; std::string runDurationNanos; std::vector<std::string> autoTerminatedRelationships; std::vector<core::Property> properties; std::string parameterContextName; }; -} // namespace core -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h index 6cbad427a0..238b481eaf 100644 --- a/libminifi/include/core/flow/FlowSchema.h +++ b/libminifi/include/core/flow/FlowSchema.h @@ -35,6 +35,7 @@ struct FlowSchema { Keys max_concurrent_tasks; Keys penalization_period; Keys proc_yield_period; + Keys bulletin_level; Keys runduration_nanos; Keys connections; diff --git a/libminifi/include/core/logging/Logger.h b/libminifi/include/core/logging/Logger.h index d3ddac807a..bf26564c77 100644 --- a/libminifi/include/core/logging/Logger.h +++ b/libminifi/include/core/logging/Logger.h @@ -89,6 +89,38 @@ inline LOG_LEVEL mapFromSpdLogLevel(spdlog::level::level_enum level) { throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum {}", magic_enum::enum_underlying(level))); } +inline std::string mapSpdLogLevelToString(spdlog::level::level_enum level) { + switch (level) { + case spdlog::level::trace: return "TRACE"; + case spdlog::level::debug: return "DEBUG"; + case spdlog::level::info: return "INFO"; + case spdlog::level::warn: return "WARN"; + case spdlog::level::err: return "ERROR"; + case spdlog::level::critical: return "CRITICAL"; + case spdlog::level::off: return "OFF"; + case spdlog::level::n_levels: break; + } + throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum {}", magic_enum::enum_underlying(level))); +} + +inline spdlog::level::level_enum mapStringToSpdLogLevel(const std::string& level_str) { + if (level_str == "TRACE") { + return spdlog::level::trace; + } else if (level_str == "DEBUG") { + return spdlog::level::debug; + } else if (level_str == "INFO") { + return spdlog::level::info; + } else if (level_str == "WARN") { + return spdlog::level::warn; + } else if (level_str == "ERROR") { + return spdlog::level::err; + } else if (level_str == "CRITICAL") { + return spdlog::level::critical; + } + + return spdlog::level::n_levels; +} + class BaseLogger { public: virtual ~BaseLogger(); @@ -156,6 +188,11 @@ class Logger : public BaseLogger { virtual std::optional<std::string> get_id() = 0; + void addLogCallback(std::function<void(spdlog::level::level_enum level, const std::string&)> callback) { + std::lock_guard<std::mutex> lock(mutex_); + log_callbacks_.push_back(std::move(callback)); + } + protected: Logger(std::shared_ptr<spdlog::logger> delegate, std::shared_ptr<LoggerControl> controller); @@ -191,10 +228,15 @@ class Logger : public BaseLogger { if (!delegate_->should_log(level)) { return; } - delegate_->log(level, stringify(std::move(fmt), map_args(std::forward<Args>(args))...)); + auto message = stringify(std::move(fmt), map_args(std::forward<Args>(args))...); + for (const auto& callback : log_callbacks_) { + callback(level, message); + } + delegate_->log(level, message); } std::atomic<int> max_log_size_{LOG_BUFFER_SIZE}; + std::vector<std::function<void(spdlog::level::level_enum level, const std::string&)>> log_callbacks_; }; } // namespace org::apache::nifi::minifi::core::logging diff --git a/libminifi/include/core/state/MetricsPublisherStore.h b/libminifi/include/core/state/MetricsPublisherStore.h index a1deb3cec0..a09a5509f1 100644 --- a/libminifi/include/core/state/MetricsPublisherStore.h +++ b/libminifi/include/core/state/MetricsPublisherStore.h @@ -28,13 +28,14 @@ #include "utils/gsl.h" #include "core/ProcessGroup.h" #include "utils/file/AssetManager.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state { class MetricsPublisherStore { public: MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr); + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr); void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink); void loadMetricNodes(core::ProcessGroup* root); void clearMetricNodes(); diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index 5bfc257e14..53deebadea 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -27,6 +27,7 @@ #include "Connection.h" #include "core/state/ConnectionStore.h" #include "core/Processor.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state::response { @@ -121,6 +122,10 @@ class FlowInformation : public StateMonitorNode { processors_ = std::move(processors); } + void setBulletinStore(core::BulletinStore* bulletin_store) { + bulletin_store_ = bulletin_store; + } + std::vector<SerializedResponseNode> serialize() override; std::vector<PublishedMetric> calculateMetrics() override; @@ -128,6 +133,7 @@ class FlowInformation : public StateMonitorNode { std::shared_ptr<state::response::FlowVersion> flow_version_; ConnectionStore connection_store_; std::vector<core::Processor*> processors_; + core::BulletinStore* bulletin_store_ = nullptr; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index c947434318..6ab55faae9 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -35,13 +35,14 @@ #include "utils/expected.h" #include "core/RepositoryMetricsSource.h" #include "utils/file/AssetManager.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state::response { class ResponseNodeLoader { public: ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr); + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr); void setNewConfigRoot(core::ProcessGroup* root); void clearConfigRoot(); @@ -80,6 +81,7 @@ class ResponseNodeLoader { utils::file::AssetManager* asset_manager_{}; core::controller::ControllerServiceProvider* controller_{}; state::StateMonitor* update_sink_{}; + core::BulletinStore* bulletin_store_{}; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()}; }; diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 53b1e8def1..69287a1555 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -36,6 +36,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx) service_provider_(std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(), configuration_)), filesystem_(std::move(ctx.filesystem)), sensitive_values_encryptor_(std::move(ctx.sensitive_values_encryptor.value())), + bulletin_store_(ctx.bulletin_store), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { std::string flowUrl; std::string bucket_id = "default"; diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index fcf916ceb1..074def2330 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -87,6 +87,8 @@ std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processo const auto name = processor->getName(); std::lock_guard<std::recursive_mutex> lock(mutex_); processor->setProcessGroupUUIDStr(getUUIDStr()); + processor->setProcessGroupName(getName()); + processor->setProcessGroupPath(buildGroupPath()); const auto [iter, inserted] = processors_.insert(std::move(processor)); if (inserted) { logger_->log_debug("Add processor {} into process group {}", name, name_); diff --git a/libminifi/src/core/Processor.cpp b/libminifi/src/core/Processor.cpp index 77b8d3fa1d..b0a7b385cb 100644 --- a/libminifi/src/core/Processor.cpp +++ b/libminifi/src/core/Processor.cpp @@ -29,7 +29,6 @@ #include "Connection.h" #include "core/Connectable.h" -#include "core/logging/LoggerConfiguration.h" #include "core/ProcessorConfig.h" #include "core/ProcessContext.h" #include "core/ProcessSessionFactory.h" diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index 4ab3aba0b3..53637ec76c 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -31,6 +31,7 @@ FlowSchema FlowSchema::getDefault() { .max_concurrent_tasks = {"max concurrent tasks"}, .penalization_period = {"penalization period"}, .proc_yield_period = {"yield period"}, + .bulletin_level = {"bulletin level"}, .runduration_nanos = {"run duration nanos"}, .connections = {"Connections"}, @@ -96,6 +97,7 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"}, .penalization_period = {"penaltyDuration"}, .proc_yield_period = {"yieldDuration"}, + .bulletin_level = {"bulletinLevel"}, // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue .runduration_nanos = {"runDurationMillis"}, diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index d79550b460..bc210cf307 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -247,6 +247,11 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co logger_->log_debug("parseProcessorNode: yield period => [{}]", procCfg.yieldPeriod); } + if (auto bulletin_level_node = procNode[schema_.bulletin_level]) { + procCfg.bulletinLevel = bulletin_level_node.getString().value(); + logger_->log_debug("parseProcessorNode: bulletin level => [{}]", procCfg.bulletinLevel); + } + if (auto runNode = procNode[schema_.runduration_nanos]) { procCfg.runDurationNanos = runNode.getIntegerAsString().value(); logger_->log_debug("parseProcessorNode: run duration nanos => [{}]", procCfg.runDurationNanos); @@ -289,6 +294,11 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co processor->setYieldPeriodMsec(yield_period.value()); } + if (auto level = core::logging::mapStringToSpdLogLevel(procCfg.bulletinLevel); level != spdlog::level::n_levels) { + processor->setLogBulletinLevel(level); + } + processor->setLoggerCallback(bulletin_store_); + // Default to running processor->setScheduledState(core::RUNNING); diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp b/libminifi/src/core/state/MetricsPublisherStore.cpp index abd1550d88..31941ed310 100644 --- a/libminifi/src/core/state/MetricsPublisherStore.cpp +++ b/libminifi/src/core/state/MetricsPublisherStore.cpp @@ -23,9 +23,9 @@ namespace org::apache::nifi::minifi::state { MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager) + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store) : configuration_(configuration), - response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager)) { + response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager, bulletin_store)) { } void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink) { diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index f31b26c297..e7727681dc 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -98,6 +98,32 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() { serialized.push_back(processorsStatusesNode); } + if (bulletin_store_) { + SerializedResponseNode processorBulletinsNode{.name = "processorBulletins", .array = true, .collapsible = false}; + auto bulletins = bulletin_store_->getBulletins(); + for (const auto& bulletin : bulletins) { + processorBulletinsNode.children.push_back({ + .name = std::to_string(bulletin.id), + .collapsible = false, + .children = { + {.name = "id", .value = bulletin.id}, + {.name = "timestamp", .value = bulletin.timestamp}, + // {.name = "nodeAddress", .value = bulletin.node_address}, + {.name = "level", .value = bulletin.level}, + {.name = "category", .value = bulletin.category}, + {.name = "message", .value = bulletin.message}, + {.name = "groupId", .value = bulletin.group_id}, + {.name = "groupName", .value = bulletin.group_name}, + {.name = "groupPath", .value = bulletin.group_path}, + {.name = "sourceId", .value = bulletin.source_id}, + {.name = "sourceName", .value = bulletin.source_name} + // {.name = "flowFileUuid", .value = bulletin.flow_file_uuid} + } + }); + } + serialized.push_back(processorBulletinsNode); + } + return serialized; } diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 37337e3b0b..aa5c760ec1 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -34,11 +34,12 @@ namespace org::apache::nifi::minifi::state::response { ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager) + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store) : configuration_(std::move(configuration)), repository_metric_sources_(std::move(repository_metric_sources)), flow_configuration_(std::move(flow_configuration)), - asset_manager_(asset_manager) { + asset_manager_(asset_manager), + bulletin_store_(bulletin_store) { } void ResponseNodeLoader::clearConfigRoot() { @@ -233,6 +234,10 @@ void ResponseNodeLoader::initializeFlowInformation(const SharedResponseNode& res flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); } + if (bulletin_store_) { + flow_information->setBulletinStore(bulletin_store_); + } + if (root_) { std::vector<core::Processor*> processors; root_->getAllProcessors(processors); diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 6f6ec14caa..bd7ab181c6 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -45,7 +45,7 @@ class VerifyC2Metrics : public VerifyC2Base { LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); LogTestController::getInstance().setDebug<minifi::FlowController>(); - LogTestController::getInstance().setOff<minifi::processors::GetTCP>(); + LogTestController::getInstance().setDebug<minifi::processors::GetTCP>(); VerifyC2Base::testSetup(); } @@ -93,6 +93,7 @@ class MetricsHandler: public HeartbeatHandler { VERIFY_UPDATED_METRICS }; + static constexpr const char* PROCESS_GROUP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1990"; static constexpr const char* GETTCP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; static constexpr const char* LOGATTRIBUTE1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1992"; static constexpr const char* LOGATTRIBUTE2_UUID = "5128e3c8-015a-1000-79ca-83af40ec1990"; @@ -160,6 +161,24 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"].HasMember("processorStatuses"); } + static bool verifyProcessorBulletins(const rapidjson::Value& runtime_metrics) { + if (!runtime_metrics["flowInfo"].HasMember("processorBulletins")) { + return false; + } + auto bulletins = runtime_metrics["flowInfo"]["processorBulletins"].GetArray(); + return std::any_of(bulletins.begin(), bulletins.end(), [](const auto& bulletin) { + return bulletin["id"].GetInt() > 0 && + bulletin["level"].GetString() == std::string("ERROR") && + bulletin["category"].GetString() == std::string("Log Message") && + bulletin["message"].GetString() == "Error connecting to localhost:8776 due to Connection refused (" + std::string(GETTCP_UUID) + ")" && + bulletin["groupId"].GetString() == std::string(PROCESS_GROUP_UUID) && + bulletin["groupName"].GetString() == std::string("MiNiFi Flow") && + bulletin["groupPath"].GetString() == std::string("MiNiFi Flow") && + bulletin["sourceId"].GetString() == std::string(GETTCP_UUID) && + bulletin["sourceName"].GetString() == std::string("GetTCP"); + }); + } + static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") && [&]() { @@ -173,7 +192,8 @@ class MetricsHandler: public HeartbeatHandler { } return processorMetricsAreValid(processor); }); - }(); + }() && + verifyProcessorBulletins(runtime_metrics); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp b/libminifi/test/libtest/integration/IntegrationBase.cpp index 72ae844fd7..b1c8077331 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.cpp +++ b/libminifi/test/libtest/integration/IntegrationBase.cpp @@ -91,6 +91,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ std::string nifi_configuration_class_name = "adaptiveconfiguration"; configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + bulletin_store_ = std::make_unique<core::BulletinStore>(); std::shared_ptr<core::FlowConfiguration> flow_config = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = test_repo, @@ -98,7 +99,8 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ .configuration = configuration, .path = test_file_location, .filesystem = filesystem, - .sensitive_values_encryptor = sensitive_values_encryptor + .sensitive_values_encryptor = sensitive_values_encryptor, + .bulletin_store = bulletin_store_.get() }, nifi_configuration_class_name); auto controller_service_provider = flow_config->getControllerServiceProvider(); @@ -119,7 +121,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo}; asset_manager_ = std::make_unique<minifi::utils::file::AssetManager>(*configuration); - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config, asset_manager_.get()); + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config, asset_manager_.get(), bulletin_store_.get()); flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager_.get()); flowController_->load(); diff --git a/libminifi/test/libtest/integration/IntegrationBase.h b/libminifi/test/libtest/integration/IntegrationBase.h index 86a46e3f85..1c1d09e3c2 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.h +++ b/libminifi/test/libtest/integration/IntegrationBase.h @@ -29,6 +29,7 @@ #include "FlowController.h" #include "properties/Configure.h" #include "utils/file/AssetManager.h" +#include "core/BulletinStore.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -109,6 +110,7 @@ class IntegrationBase { void configureSecurity(); std::shared_ptr<minifi::Configure> configuration; std::unique_ptr<minifi::utils::file::AssetManager> asset_manager_; + std::unique_ptr<core::BulletinStore> bulletin_store_; std::unique_ptr<minifi::state::response::ResponseNodeLoader> response_node_loader_; std::unique_ptr<minifi::FlowController> flowController_; std::chrono::milliseconds wait_time_; diff --git a/libminifi/test/resources/TestC2Metrics.yml b/libminifi/test/resources/TestC2Metrics.yml index 6a0af5e4c1..fdb8924198 100644 --- a/libminifi/test/resources/TestC2Metrics.yml +++ b/libminifi/test/resources/TestC2Metrics.yml @@ -29,6 +29,7 @@ Processors: penalization period: 30 sec yield period: 10 sec run duration nanos: 0 + bulletin level: ERROR auto-terminated relationships list: Properties: Endpoint List: localhost:8776 @@ -43,6 +44,7 @@ Processors: scheduling period: 30 sec penalization period: 30 sec yield period: 1 sec + bulletin level: ERROR run duration nanos: 0 auto-terminated relationships list: - response diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index f2b66df40f..697d91dfd7 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -65,10 +65,10 @@ #include "MainHelper.h" #include "agent/JsonSchema.h" #include "core/state/nodes/ResponseNodeLoader.h" -#include "c2/C2Agent.h" #include "core/state/MetricsPublisherStore.h" #include "argparse/argparse.hpp" #include "agent/agent_version.h" +#include "core/BulletinStore.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -390,6 +390,8 @@ int main(int argc, char **argv) { should_encrypt_flow_config, utils::crypto::EncryptionProvider::create(minifiHome)); + std::unique_ptr<core::BulletinStore> bulletin_store = std::make_unique<core::BulletinStore>(); + std::shared_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = flow_repo, @@ -397,13 +399,14 @@ int main(int argc, char **argv) { .configuration = configure, .path = configure->get(minifi::Configure::nifi_flow_configuration_file), .filesystem = filesystem, - .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome) + .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome), + .bulletin_store = bulletin_store.get() }, nifi_configuration_class_name); auto asset_manager = std::make_unique<utils::file::AssetManager>(*configure); std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration, asset_manager.get()); + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration, asset_manager.get(), bulletin_store.get()); const auto controller = std::make_unique<minifi::FlowController>( prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager.get());