Skip to content

Commit

Permalink
MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jan 24, 2025
1 parent e30f9c3 commit f8355ad
Show file tree
Hide file tree
Showing 24 changed files with 260 additions and 31 deletions.
1 change: 1 addition & 0 deletions extensions/python/ExecutePythonProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine()
auto engine = std::make_unique<PythonScriptEngine>();

python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName());
setLoggerCallback(python_logger_, bulletin_store_);
engine->initialize(Success, Failure, Original, python_logger_);

return engine;
Expand Down
47 changes: 47 additions & 0 deletions libminifi/include/core/BulletinStore.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#pragma once

#include <string>
#include <deque>
#include <mutex>

namespace org::apache::nifi::minifi::core {

struct Bulletin {
uint64_t id;
std::string timestamp; // "Tue Dec 10 08:40:26 CET 2024",
// std::string node_address; // empty
std::string level;
std::string category;
std::string message;
std::string group_id;
std::string group_name;
std::string group_path;
std::string source_id;
std::string source_name;
// std::string flow_file_uuid; // empty
};

class BulletinStore {
public:
void addBulletin(Bulletin& bulletin) {
std::lock_guard<std::mutex> lock(mutex_);
bulletin.id = id_counter++;
if (bulletins_.size() >= MAX_BULLETIN_COUNT) {
bulletins_.pop_front();
}
bulletins_.push_back(bulletin);
}

std::deque<Bulletin> getBulletins() const {
std::lock_guard<std::mutex> lock(mutex_);
return bulletins_;
}

private:
static constexpr uint64_t MAX_BULLETIN_COUNT = 10000;
mutable std::mutex mutex_;
uint64_t id_counter = 1;
std::deque<Bulletin> bulletins_;
};

} // namespace org::apache::nifi::minifi::core
3 changes: 3 additions & 0 deletions libminifi/include/core/FlowConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "utils/file/FileSystem.h"
#include "utils/ChecksumCalculator.h"
#include "ParameterContext.h"
#include "core/BulletinStore.h"

namespace org::apache::nifi::minifi::core {

Expand All @@ -60,6 +61,7 @@ struct ConfigurationContext {
std::optional<std::filesystem::path> path{std::nullopt};
std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()};
std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt};
core::BulletinStore* bulletin_store{nullptr};
};

enum class FlowSerializationType { Json, NifiJson, Yaml };
Expand Down Expand Up @@ -149,6 +151,7 @@ class FlowConfiguration : public CoreComponent {
std::shared_ptr<utils::file::FileSystem> filesystem_;
utils::crypto::EncryptionProvider sensitive_values_encryptor_;
utils::ChecksumCalculator checksum_calculator_;
core::BulletinStore* bulletin_store_ = nullptr;

private:
virtual std::string serialize(const ProcessGroup&) { return ""; }
Expand Down
12 changes: 11 additions & 1 deletion libminifi/include/core/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class ProcessGroup : public CoreComponent {
parent_process_group_ = parent;
}
// get parent process group
ProcessGroup *getParent() {
ProcessGroup *getParent() const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
}
Expand Down Expand Up @@ -225,6 +225,16 @@ class ProcessGroup : public CoreComponent {
return child_process_groups_;
}

std::string buildGroupPath() const {
std::string path = getName();
auto parent = getParent();
while (parent != nullptr) {
path = parent->getName() + " / " + path;
parent = parent->getParent();
}
return path;
}

protected:
void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);

Expand Down
53 changes: 53 additions & 0 deletions libminifi/include/core/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
#include "ProcessorMetrics.h"
#include "utils/gsl.h"
#include "OutputAttributeDefinition.h"
#include "core/BulletinStore.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/TimeUtil.h"

#define ADD_GET_PROCESSOR_NAME \
std::string getProcessorType() const override { \
Expand Down Expand Up @@ -173,6 +176,22 @@ class Processor : public Connectable, public ConfigurableComponent, public state
process_group_uuid_ = uuid;
}

std::string getProcessGroupName() const {
return process_group_name_;
}

void setProcessGroupName(const std::string &name) {
process_group_name_ = name;
}

std::string getProcessGroupPath() const {
return process_group_path_;
}

void setProcessGroupPath(const std::string &path) {
process_group_path_ = path;
}

void yield() override;

void yield(std::chrono::steady_clock::duration delta_time);
Expand Down Expand Up @@ -226,6 +245,15 @@ class Processor : public Connectable, public ConfigurableComponent, public state
return metrics_;
}

void setLogBulletinLevel(spdlog::level::level_enum level) {
log_bulletin_level_ = level;
}

void setLoggerCallback(core::BulletinStore* bulletin_store) {
bulletin_store_ = bulletin_store;
setLoggerCallback(logger_, bulletin_store);
}

static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{};

static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{};
Expand All @@ -234,6 +262,27 @@ class Processor : public Connectable, public ConfigurableComponent, public state
virtual void notifyStop() {
}

void setLoggerCallback(const std::shared_ptr<logging::Logger>& logger, core::BulletinStore* bulletin_store) {
logger->addLogCallback([this, bulletin_store](spdlog::level::level_enum level, const std::string& message) {
if (level < log_bulletin_level_) {
return;
}
if (bulletin_store) {
core::Bulletin bulletin;
bulletin.timestamp = minifi::utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(std::chrono::system_clock::now()));
bulletin.level = core::logging::mapSpdLogLevelToString(level);
bulletin.category = "Log Message";
bulletin.message = message;
bulletin.group_id = getProcessGroupUUIDStr();
bulletin.group_name = getProcessGroupName();
bulletin.group_path = getProcessGroupPath();
bulletin.source_id = getUUIDStr();
bulletin.source_name = getName();
bulletin_store->addBulletin(bulletin);
}
});
}

std::atomic<ScheduledState> state_;

std::atomic<std::chrono::steady_clock::duration> scheduling_period_;
Expand All @@ -247,6 +296,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state
gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_;

std::shared_ptr<logging::Logger> logger_;
spdlog::level::level_enum log_bulletin_level_ = spdlog::level::warn;
core::BulletinStore* bulletin_store_ = nullptr;

private:
mutable std::mutex mutex_;
Expand All @@ -266,6 +317,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state
std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;

std::string process_group_uuid_;
std::string process_group_name_;
std::string process_group_path_;
};

} // namespace core
Expand Down
19 changes: 4 additions & 15 deletions libminifi/include/core/ProcessorConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
#pragma once

#include <string>
#include <vector>

#include "core/Core.h"
#include "core/Property.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace core {

namespace org::apache::nifi::minifi::core {

constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"};
constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"};
Expand All @@ -47,16 +41,11 @@ struct ProcessorConfig {
std::string schedulingPeriod;
std::string penalizationPeriod;
std::string yieldPeriod;
std::string bulletinLevel;
std::string runDurationNanos;
std::vector<std::string> autoTerminatedRelationships;
std::vector<core::Property> properties;
std::string parameterContextName;
};

} // namespace core
} // namespace minifi
} // namespace nifi
} // namespace apache
} // namespace org

#endif // LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_
} // namespace org::apache::nifi::minifi::core
1 change: 1 addition & 0 deletions libminifi/include/core/flow/FlowSchema.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct FlowSchema {
Keys max_concurrent_tasks;
Keys penalization_period;
Keys proc_yield_period;
Keys bulletin_level;
Keys runduration_nanos;

Keys connections;
Expand Down
44 changes: 43 additions & 1 deletion libminifi/include/core/logging/Logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,38 @@ inline LOG_LEVEL mapFromSpdLogLevel(spdlog::level::level_enum level) {
throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum {}", magic_enum::enum_underlying(level)));
}

inline std::string mapSpdLogLevelToString(spdlog::level::level_enum level) {
switch (level) {
case spdlog::level::trace: return "TRACE";
case spdlog::level::debug: return "DEBUG";
case spdlog::level::info: return "INFO";
case spdlog::level::warn: return "WARN";
case spdlog::level::err: return "ERROR";
case spdlog::level::critical: return "CRITICAL";
case spdlog::level::off: return "OFF";
case spdlog::level::n_levels: break;
}
throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum {}", magic_enum::enum_underlying(level)));
}

inline spdlog::level::level_enum mapStringToSpdLogLevel(const std::string& level_str) {
if (level_str == "TRACE") {
return spdlog::level::trace;
} else if (level_str == "DEBUG") {
return spdlog::level::debug;
} else if (level_str == "INFO") {
return spdlog::level::info;
} else if (level_str == "WARN") {
return spdlog::level::warn;
} else if (level_str == "ERROR") {
return spdlog::level::err;
} else if (level_str == "CRITICAL") {
return spdlog::level::critical;
}

return spdlog::level::n_levels;
}

class BaseLogger {
public:
virtual ~BaseLogger();
Expand Down Expand Up @@ -156,6 +188,11 @@ class Logger : public BaseLogger {

virtual std::optional<std::string> get_id() = 0;

void addLogCallback(std::function<void(spdlog::level::level_enum level, const std::string&)> callback) {
std::lock_guard<std::mutex> lock(mutex_);
log_callbacks_.push_back(std::move(callback));
}

protected:
Logger(std::shared_ptr<spdlog::logger> delegate, std::shared_ptr<LoggerControl> controller);

Expand Down Expand Up @@ -191,10 +228,15 @@ class Logger : public BaseLogger {
if (!delegate_->should_log(level)) {
return;
}
delegate_->log(level, stringify(std::move(fmt), map_args(std::forward<Args>(args))...));
auto message = stringify(std::move(fmt), map_args(std::forward<Args>(args))...);
for (const auto& callback : log_callbacks_) {
callback(level, message);
}
delegate_->log(level, message);
}

std::atomic<int> max_log_size_{LOG_BUFFER_SIZE};
std::vector<std::function<void(spdlog::level::level_enum level, const std::string&)>> log_callbacks_;
};

} // namespace org::apache::nifi::minifi::core::logging
3 changes: 2 additions & 1 deletion libminifi/include/core/state/MetricsPublisherStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@
#include "utils/gsl.h"
#include "core/ProcessGroup.h"
#include "utils/file/AssetManager.h"
#include "core/BulletinStore.h"

namespace org::apache::nifi::minifi::state {

class MetricsPublisherStore {
public:
MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources,
std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr);
std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr);
void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink);
void loadMetricNodes(core::ProcessGroup* root);
void clearMetricNodes();
Expand Down
6 changes: 6 additions & 0 deletions libminifi/include/core/state/nodes/FlowInformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "Connection.h"
#include "core/state/ConnectionStore.h"
#include "core/Processor.h"
#include "core/BulletinStore.h"

namespace org::apache::nifi::minifi::state::response {

Expand Down Expand Up @@ -121,13 +122,18 @@ class FlowInformation : public StateMonitorNode {
processors_ = std::move(processors);
}

void setBulletinStore(core::BulletinStore* bulletin_store) {
bulletin_store_ = bulletin_store;
}

std::vector<SerializedResponseNode> serialize() override;
std::vector<PublishedMetric> calculateMetrics() override;

private:
std::shared_ptr<state::response::FlowVersion> flow_version_;
ConnectionStore connection_store_;
std::vector<core::Processor*> processors_;
core::BulletinStore* bulletin_store_ = nullptr;
};

} // namespace org::apache::nifi::minifi::state::response
4 changes: 3 additions & 1 deletion libminifi/include/core/state/nodes/ResponseNodeLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,14 @@
#include "utils/expected.h"
#include "core/RepositoryMetricsSource.h"
#include "utils/file/AssetManager.h"
#include "core/BulletinStore.h"

namespace org::apache::nifi::minifi::state::response {

class ResponseNodeLoader {
public:
ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources,
std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr);
std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr);

void setNewConfigRoot(core::ProcessGroup* root);
void clearConfigRoot();
Expand Down Expand Up @@ -80,6 +81,7 @@ class ResponseNodeLoader {
utils::file::AssetManager* asset_manager_{};
core::controller::ControllerServiceProvider* controller_{};
state::StateMonitor* update_sink_{};
core::BulletinStore* bulletin_store_{};
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()};
};

Expand Down
1 change: 1 addition & 0 deletions libminifi/src/core/FlowConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx)
service_provider_(std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(), configuration_)),
filesystem_(std::move(ctx.filesystem)),
sensitive_values_encryptor_(std::move(ctx.sensitive_values_encryptor.value())),
bulletin_store_(ctx.bulletin_store),
logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) {
std::string flowUrl;
std::string bucket_id = "default";
Expand Down
Loading

0 comments on commit f8355ad

Please sign in to comment.