Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
lordgamez committed Jan 28, 2025
1 parent 1652b2a commit 2c53059
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 84 deletions.
38 changes: 31 additions & 7 deletions libminifi/include/core/BulletinStore.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
Expand All @@ -9,8 +26,14 @@

#include "properties/Configure.h"
#include "core/logging/LoggerFactory.h"
#include "core/Processor.h"

namespace org::apache::nifi::minifi::core {
namespace org::apache::nifi::minifi {
namespace test {
class BulletinStoreTestAccessor;
} // namespace test

namespace core {

struct Bulletin {
uint64_t id = 0;
Expand All @@ -27,14 +50,14 @@ struct Bulletin {

class BulletinStore {
public:
BulletinStore(const Configure& configure);
void addBulletin(Bulletin&& bulletin);
explicit BulletinStore(const Configure& configure);
void addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message);
std::deque<Bulletin> getBulletins(std::optional<std::chrono::system_clock::duration> time_interval_to_include = {}) const;
size_t getMaxBulletinCount() const {
return max_bulletin_count_;
}
size_t getMaxBulletinCount() const;

private:
friend class minifi::test::BulletinStoreTestAccessor;

static constexpr size_t DEFAULT_BULLETIN_COUNT = 1000;
size_t max_bulletin_count_;
mutable std::mutex mutex_;
Expand All @@ -43,4 +66,5 @@ class BulletinStore {
std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BulletinStore>::getLogger()};
};

} // namespace org::apache::nifi::minifi::core
} // namespace core
} // namespace org::apache::nifi::minifi
45 changes: 1 addition & 44 deletions libminifi/include/core/ProcessGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,13 @@ class ProcessGroup : public CoreComponentImpl {
ProcessGroup(ProcessGroupType type, std::string_view name);
ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid);
ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version);
// Destructor
~ProcessGroup() override;
// Set URL
void setURL(std::string url) {
url_ = std::move(url);
}
// Get URL
std::string getURL() {
return (url_);
}
// SetTransmitting
void setTransmitting(bool val) {
transmitting_ = val;
}
Expand All @@ -93,7 +89,6 @@ class ProcessGroup : public CoreComponentImpl {
uint64_t getTimeout() {
return timeout_;
}
// setInterface
void setInterface(std::string &ifc) {
local_network_interface_ = ifc;
}
Expand Down Expand Up @@ -124,11 +119,9 @@ class ProcessGroup : public CoreComponentImpl {
http::HTTPProxy getHTTPProxy() {
return proxy_;
}
// Set Processor yield period in MilliSecond
void setYieldPeriodMsec(std::chrono::milliseconds period) {
yield_period_msec_ = period;
}
// Get Processor yield period in MilliSecond
std::chrono::milliseconds getYieldPeriodMsec() {
return (yield_period_msec_);
}
Expand All @@ -147,12 +140,10 @@ class ProcessGroup : public CoreComponentImpl {
const std::function<bool(const Processor*)>& filter = nullptr);

bool isRemoteProcessGroup();
// set parent process group
void setParent(ProcessGroup *parent) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
parent_process_group_ = parent;
}
// get parent process group
ProcessGroup *getParent() const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
return parent_process_group_;
Expand Down Expand Up @@ -185,25 +176,17 @@ class ProcessGroup : public CoreComponentImpl {
}
return nullptr;
}
// findProcessor based on UUID
Processor* findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const;
// findProcessor based on name
Processor* findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const;

void getAllProcessors(std::vector<Processor*>& processor_vec) const;

/**
* Add controller service
* @param nodeId node identifier
* @param node controller service node.
*/
void addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node);

core::controller::ControllerServiceNode* findControllerService(const std::string &nodeId, Traverse traverse = Traverse::ExcludeChildren) const;

std::vector<const core::controller::ControllerServiceNode*> getAllControllerServices() const;

// update property value
void updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue);

void getConnections(std::map<std::string, Connection*>& connectionMap);
Expand All @@ -225,58 +208,32 @@ class ProcessGroup : public CoreComponentImpl {
return child_process_groups_;
}

std::string buildGroupPath() const {
std::string path = getName();
auto parent = getParent();
while (parent != nullptr) {
path = parent->getName() + " / " + path;
parent = parent->getParent();
}
return path;
}

protected:
void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler);

// version
int config_version_;
// Process Group Type
const ProcessGroupType type_;
// Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port
std::set<std::unique_ptr<Processor>> processors_;
std::set<Processor*> failed_processors_;
std::set<Port*> ports_;
std::set<std::unique_ptr<ProcessGroup>> child_process_groups_;
// Connections between the processor inside the group;
std::set<std::unique_ptr<Connection>> connections_;
// Parent Process Group
ProcessGroup* parent_process_group_;
// Yield Period in Milliseconds
std::atomic<std::chrono::milliseconds> yield_period_msec_;
std::atomic<uint64_t> timeout_;

// URL
std::string url_;
// local network interface
std::string local_network_interface_;
// Transmitting
std::atomic<bool> transmitting_;
// http proxy
http::HTTPProxy proxy_;
std::string transport_protocol_;

// controller services

core::controller::ControllerServiceNodeMap controller_service_map_;

ParameterContext* parameter_context_ = nullptr;

private:
static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid);
std::string buildGroupPath() const;

// Mutex for protection
mutable std::recursive_mutex mutex_;
// Logger
std::shared_ptr<logging::Logger> logger_;

ProcessGroup(const ProcessGroup &parent);
Expand Down
36 changes: 35 additions & 1 deletion libminifi/src/core/BulletinStore.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,24 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "core/BulletinStore.h"

#include "core/logging/LoggerBase.h"

namespace org::apache::nifi::minifi::core {

BulletinStore::BulletinStore(const Configure &configure) {
Expand All @@ -17,9 +36,19 @@ BulletinStore::BulletinStore(const Configure &configure) {
}
}

void BulletinStore::addBulletin(Bulletin&& bulletin) {
void BulletinStore::addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message) {
std::lock_guard<std::mutex> lock(mutex_);
Bulletin bulletin;
bulletin.id = id_counter++;
bulletin.timestamp = std::chrono::system_clock::now();
bulletin.level = core::logging::mapLogLevelToString(log_level);
bulletin.category = "Log Message";
bulletin.message = message;
bulletin.group_id = processor.getProcessGroupUUIDStr();
bulletin.group_name = processor.getProcessGroupName();
bulletin.group_path = processor.getProcessGroupPath();
bulletin.source_id = processor.getUUIDStr();
bulletin.source_name = processor.getName();
if (bulletins_.size() >= max_bulletin_count_) {
bulletins_.pop_front();
}
Expand All @@ -39,4 +68,9 @@ std::deque<Bulletin> BulletinStore::getBulletins(std::optional<std::chrono::syst
return {};
}

size_t BulletinStore::getMaxBulletinCount() const {
std::lock_guard<std::mutex> lock(mutex_);
return max_bulletin_count_;
}

} // namespace org::apache::nifi::minifi::core
11 changes: 11 additions & 0 deletions libminifi/src/core/ProcessGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,4 +487,15 @@ ParameterContext* ProcessGroup::getParameterContext() const {
return parameter_context_;
}

std::string ProcessGroup::buildGroupPath() const {
std::lock_guard<std::recursive_mutex> lock(mutex_);
std::string path = name_;
auto parent = parent_process_group_;
while (parent != nullptr) {
path = parent->getName() + " / " + path;
parent = parent->getParent();
}
return path;
}

} // namespace org::apache::nifi::minifi::core
14 changes: 2 additions & 12 deletions libminifi/src/core/flow/StructuredConfiguration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,22 +297,12 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co
if (!procCfg.bulletinLevel.empty()) {
processor->setLogBulletinLevel(core::logging::mapStringToLogLevel(procCfg.bulletinLevel));
}
processor->setLoggerCallback([this, processor=processor.get()](core::logging::LOG_LEVEL level, const std::string& message) {
processor->setLoggerCallback([this, processor = processor.get()](core::logging::LOG_LEVEL level, const std::string& message) {
if (level < processor->getLogBulletinLevel()) {
return;
}
if (bulletin_store_) {
core::Bulletin bulletin;
bulletin.timestamp = std::chrono::system_clock::now();
bulletin.level = core::logging::mapLogLevelToString(level);
bulletin.category = "Log Message";
bulletin.message = message;
bulletin.group_id = processor->getProcessGroupUUIDStr();
bulletin.group_name = processor->getProcessGroupName();
bulletin.group_path = processor->getProcessGroupPath();
bulletin.source_id = processor->getUUIDStr();
bulletin.source_name = processor->getName();
bulletin_store_->addBulletin(std::move(bulletin));
bulletin_store_->addProcessorBulletin(*processor, level, message);
}
});

Expand Down
45 changes: 25 additions & 20 deletions libminifi/test/unit/BulletinStoreTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>

#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "core/BulletinStore.h"
#include "properties/Configure.h"
#include "unit/DummyProcessor.h"

using namespace std::literals::chrono_literals;

namespace org::apache::nifi::minifi::test {

core::Bulletin createBulletin() {
return core::Bulletin{
.id = 0,
.timestamp = std::chrono::system_clock::now(),
.level = "WARN",
.category = "Log Message",
.message = "Warning message",
.group_id = "68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2",
.group_name = "sub_group",
.group_path = "root/sub_group",
.source_id = "4d7fa7e6-2459-46dd-b2ba-61517239edf5",
.source_name = "DummyProcessor"
};
class BulletinStoreTestAccessor {
public:
static std::deque<core::Bulletin>& getBulletins(core::BulletinStore& store) {
return store.bulletins_;
}
};

std::unique_ptr<core::Processor> createDummyProcessor() {
auto processor = std::make_unique<DummyProcessor>("DummyProcessor", minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value());
processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2");
processor->setProcessGroupName("sub_group");
processor->setProcessGroupPath("root/sub_group");
return processor;
}

TEST_CASE("Create BulletinStore with default max size of 1000", "[bulletinStore]") {
Expand All @@ -60,8 +63,9 @@ TEST_CASE("Remove oldest entries when limit is reached", "[bulletinStore]") {
ConfigureImpl configuration;
configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "2");
core::BulletinStore bulletin_store(configuration);
auto processor = createDummyProcessor();
for (size_t i = 0; i < 3; ++i) {
bulletin_store.addBulletin(createBulletin());
bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message");
}
auto bulletins = bulletin_store.getBulletins();
REQUIRE(bulletins.size() == 2);
Expand All @@ -73,8 +77,9 @@ TEST_CASE("Remove oldest entries when limit is reached", "[bulletinStore]") {
TEST_CASE("Return all bulletins when no time interval is defined or all entries are part of the time interval", "[bulletinStore]") {
ConfigureImpl configuration;
core::BulletinStore bulletin_store(configuration);
auto processor = createDummyProcessor();
for (size_t i = 0; i < 3; ++i) {
bulletin_store.addBulletin(createBulletin());
bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message");
}
auto bulletins = bulletin_store.getBulletins();
REQUIRE(bulletins.size() == 3);
Expand All @@ -87,12 +92,12 @@ TEST_CASE("Return all bulletins when no time interval is defined or all entries
TEST_CASE("Return only bulletins that are inside the defined time interval", "[bulletinStore]") {
ConfigureImpl configuration;
core::BulletinStore bulletin_store(configuration);
auto old_bulletin = createBulletin();
old_bulletin.timestamp -= 5min;
bulletin_store.addBulletin(std::move(old_bulletin));
for (size_t i = 0; i < 2; ++i) {
bulletin_store.addBulletin(createBulletin());
auto processor = createDummyProcessor();
for (size_t i = 0; i < 3; ++i) {
bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message");
}
BulletinStoreTestAccessor::getBulletins(bulletin_store)[0].timestamp -= 5min;

auto bulletins = bulletin_store.getBulletins(3min);
REQUIRE(bulletins.size() == 2);
REQUIRE(bulletins[0].id == 2);
Expand Down

0 comments on commit 2c53059

Please sign in to comment.