Skip to content

Commit d2e10c5

Browse files
committed
Refactor cxx rsa integration to use ipc concepts
1 parent ee9024a commit d2e10c5

File tree

7 files changed

+89
-91
lines changed

7 files changed

+89
-91
lines changed

bundles/cxx_remote_services/integration/CMakeLists.txt

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -41,39 +41,36 @@ if (CXX_RSA_INTEGRATION)
4141
endif()
4242

4343
################# Integration examples ##################################
44+
add_celix_container(RemoteCalculatorProvider
45+
CXX
46+
GROUP rsa
47+
PROPERTIES
48+
CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
49+
BUNDLES
50+
Celix::ShellCxx
51+
Celix::shell_tui
4452

45-
if (BUILD_LAUNCHER)
46-
add_celix_container(RemoteCalculatorProvider
47-
CXX
48-
GROUP rsa
49-
PROPERTIES
50-
CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
51-
BUNDLES
52-
Celix::ShellCxx
53-
Celix::shell_tui
53+
#Remote Services
54+
Celix::RemoteServiceAdmin
55+
TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
5456

55-
#Remote Services
56-
Celix::RemoteServiceAdmin
57-
TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
57+
CalculatorProvider
58+
)
5859

59-
CalculatorProvider
60-
)
60+
add_celix_container(RemoteCalculatorConsumer
61+
CXX
62+
GROUP rsa
63+
PROPERTIES
64+
CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
65+
BUNDLES
66+
Celix::ShellCxx
67+
Celix::shell_tui
6168

62-
add_celix_container(RemoteCalculatorConsumer
63-
CXX
64-
GROUP rsa
65-
PROPERTIES
66-
CELIX_LOGGING_DEFAULT_ACTIVE_LOG_LEVEL=debug
67-
BUNDLES
68-
Celix::ShellCxx
69-
Celix::shell_tui
69+
#Remote Services
70+
Celix::RsaConfiguredDiscovery
71+
Celix::RemoteServiceAdmin
72+
TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
7073

71-
#Remote Services
72-
Celix::RsaConfiguredDiscovery
73-
Celix::RemoteServiceAdmin
74-
TestExportImportRemoteServiceFactory #needed to be able to create a ExportedService for ICalculator
75-
76-
CalculatorConsumer
77-
)
78-
endif ()
74+
CalculatorConsumer
75+
)
7976
endif()
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# C++ Remote Service Amdatu Integration
2+
3+
This project part tests the integration of the C++ Remote Service Admin implementation with the
4+
C++ configuration-based remote service discovery.
5+
6+
Because the C++ Remote Service Admin is based on export and import service factories and does not directly
7+
implement a transportation or serializer technology, the integration tests are based on a simple implementation of
8+
inter process communication message queue (IPC mq) for transportation and a simple memcpy for serialization.

bundles/cxx_remote_services/integration/gtest/src/RemoteServicesIntegrationTestSuite.cc

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,6 @@ class RemoteServicesIntegrationTestSuite : public ::testing::Test {
7474
}
7575

7676
void invokeRemoteCalcService() {
77-
installProviderBundles();
78-
installConsumerBundles();
79-
8077
//If a calculator provider bundle is installed I expect a exported calculator interface
8178
auto count = serverCtx->useService<ICalculator>()
8279
.setFilter("(service.exported.interfaces=*)")
@@ -98,8 +95,8 @@ class RemoteServicesIntegrationTestSuite : public ::testing::Test {
9895

9996
/*
10097
* Testing the remote service in a while loop till it is successful or 10 seconds has passed.
101-
* Note that because pubsub does not guarantee a connection when used, it is possible - and likely -
102-
* that the first remote test iteration fails due to not yet completely connected pubsub.
98+
* Note that because mq does not guarantee a connection when used, it is possible - and likely -
99+
* that the first remote test iteration fails due to not yet completely connected mq.
103100
*/
104101
auto start = std::chrono::system_clock::now();
105102
auto now = std::chrono::system_clock::now();
@@ -156,5 +153,7 @@ TEST_F(RemoteServicesIntegrationTestSuite, StartStopFrameworks) {
156153
}
157154

158155
TEST_F(RemoteServicesIntegrationTestSuite, InvokeRemoteCalcService) {
156+
installProviderBundles();
157+
installConsumerBundles();
159158
invokeRemoteCalcService();
160159
}

bundles/cxx_remote_services/integration/resources/endpoint_discovery.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@
44
"endpoint.id": "id-01",
55
"service.imported": true,
66
"service.imported.configs": [
7-
"pubsub"
7+
"ipc-mq"
88
],
99
"service.exported.interfaces": "ICalculator",
1010
"endpoint.objectClass": "ICalculator",
11-
"endpoint.topic": "test",
12-
"endpoint.scope": "default"
11+
"endpoint.client.to.provider.channel.id": "1234",
12+
"endpoint.provider.to.client.channel.id": "1235"
1313
}
1414
]
1515
}

bundles/cxx_remote_services/integration/src/CalculatorConsumer.cc

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,20 +46,8 @@ class CalculatorConsumer final : public celix::IShellCommand {
4646
});
4747
counter++;
4848
}
49-
50-
void start() {
51-
stream = calculator->result();
52-
stream->forEach([](double val) {
53-
fprintf(stdout, "calc result stream: %f\n", val);
54-
});
55-
}
56-
57-
void stop() {
58-
stream.reset();
59-
}
6049
private:
6150
std::shared_ptr<ICalculator> calculator{};
62-
std::shared_ptr<celix::PushStream<double>> stream{};
6351
};
6452

6553
class CalculatorConsumerActivator {
@@ -71,7 +59,6 @@ class CalculatorConsumerActivator {
7159
.setCallbacks(&CalculatorConsumer::setCalculator);
7260
cmp.createProvidedService<celix::IShellCommand>()
7361
.addProperty(celix::IShellCommand::COMMAND_NAME, "calc");
74-
cmp.setCallbacks(nullptr, &CalculatorConsumer::start, &CalculatorConsumer::stop, nullptr);
7562
cmp.build();
7663

7764
//bootstrap own configured import discovery to the configured discovery manager

bundles/cxx_remote_services/integration/src/CalculatorProvider.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ class CalculatorProviderActivator {
9898
.setCallbacks(&CalculatorImpl::setPushStreamProvider);
9999
cmp.createProvidedService<ICalculator>()
100100
.addProperty("service.exported.interfaces", celix::typeName<ICalculator>())
101-
.addProperty("endpoint.topic", "test")
102-
.addProperty("endpoint.scope", "default")
101+
.addProperty("endpoint.client.to.provider.channel.id", "1234")
102+
.addProperty("endpoint.provider.to.client.channel.id", "1235")
103103
.addProperty("service.exported.intents", "osgi.async");
104104

105105
cmp.setCallbacks(&CalculatorImpl::init, &CalculatorImpl::start, &CalculatorImpl::stop, &CalculatorImpl::deinit);

bundles/cxx_remote_services/integration/src/TestExportImportRemoteServiceFactory.cc

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -102,38 +102,19 @@ static void sendMsgWithIpc(const celix::LogHelper& logHelper, int qidSender, con
102102
}
103103
}
104104

105-
static int getConsumer2ProviderChannelId(std::string& scope, std::string& topic) {
106-
auto c2pChannel = std::string{"c2p"} + "/" + scope + "/" + topic; //client 2 provider channel
107-
auto c2pId = (int)celix_utils_stringHash(c2pChannel.c_str());
108-
return c2pId;
109-
}
110-
111-
static long getProvider2ConsumerChannelId(std::string& scope, std::string& topic) {
112-
auto p2cChannel = std::string{"p2c"} + "/" + scope + "/" + topic; //provider 2 client channel
113-
int p2cId = (int)celix_utils_stringHash(p2cChannel.c_str());
114-
return p2cId;
115-
}
116-
117105
/**
118106
* A importedCalculater which acts as a pubsub proxy to a imported remote service.
119107
*/
120108
class ImportedCalculator final : public ICalculator {
121109
public:
122-
explicit ImportedCalculator(celix::LogHelper _logHelper, int c2pChannelId, int p2cChannelId) : logHelper{std::move(_logHelper)} {
110+
explicit ImportedCalculator(celix::LogHelper _logHelper, long c2pChannelId, long p2cChannelId) : logHelper{std::move(_logHelper)} {
123111
setupMsgIpc(c2pChannelId, p2cChannelId);
124112
}
125113

126-
~ImportedCalculator() noexcept override {
127-
//failing al leftover deferreds
128-
{
129-
std::lock_guard lock{mutex};
130-
for (auto& pair : deferreds) {
131-
pair.second.fail(celix::rsa::RemoteServicesException{"Shutting down proxy"});
132-
}
133-
}
134-
};
114+
~ImportedCalculator() noexcept override = default;
135115

136116
std::shared_ptr<celix::PushStream<double>> result() override {
117+
std::lock_guard lock{mutex};
137118
return stream;
138119
}
139120

@@ -158,6 +139,7 @@ class ImportedCalculator final : public ICalculator {
158139
}
159140

160141
int start() {
142+
std::lock_guard lock{mutex};
161143
ses = psp->createSynchronousEventSource<double>(factory);
162144
stream = psp->createStream<double>(ses, factory);
163145
running.store(true, std::memory_order::memory_order_release);
@@ -176,6 +158,8 @@ class ImportedCalculator final : public ICalculator {
176158
running.store(false, std::memory_order::memory_order_release);
177159
receiveThread.join();
178160
receiveThread = {};
161+
cleanupDeferreds();
162+
std::lock_guard lock{mutex};
179163
ses->close();
180164
ses.reset();
181165
stream.reset();
@@ -197,15 +181,30 @@ class ImportedCalculator final : public ICalculator {
197181
}
198182

199183
private:
200-
void setupMsgIpc(int c2pChannelId, int p2cChannelId) {
184+
void setupMsgIpc(long c2pChannelId, long p2cChannelId) {
185+
logHelper.debug("Creating msg queue for ImportedCalculator with c2pChannelId=%li and p2cChannelId=%li",
186+
c2pChannelId,
187+
p2cChannelId);
201188
int keySender = (int)c2pChannelId;
202189
int keyReceiver = (int)p2cChannelId;
203190
qidSender = msgget(keySender, 0666 | IPC_CREAT);
204191
qidReceiver = msgget(keyReceiver, 0666 | IPC_CREAT);
205192

206193
if (qidSender == -1 || qidReceiver == -1) {
207194
throw std::logic_error{"RsaShmClient: Error creating msg queue."};
195+
} else {
196+
logHelper.info("Created msg queue for ImportedCalculator with qidSender=%i and qidReceiver=%i",
197+
qidSender,
198+
qidReceiver);
199+
}
200+
}
201+
202+
void cleanupDeferreds() {
203+
std::lock_guard lock{mutex};
204+
for (auto& pair : deferreds) {
205+
pair.second.tryFail(celix::rsa::RemoteServicesException{"Shutting down proxy"});
208206
}
207+
deferreds.clear();
209208
}
210209

211210
void receiveMessages() {
@@ -233,9 +232,9 @@ class ImportedCalculator final : public ICalculator {
233232
lock.unlock();
234233

235234
if (ret.hasError) {
236-
deferred.fail(celix::rsa::RemoteServicesException{ret.errorMsg});
235+
deferred.tryFail(celix::rsa::RemoteServicesException{ret.errorMsg});
237236
} else {
238-
deferred.resolve(ret.result);
237+
deferred.tryResolve(ret.result);
239238
}
240239
} catch (const IpcException& e) {
241240
logHelper.error("IpcException: %s", e.what());
@@ -249,6 +248,7 @@ class ImportedCalculator final : public ICalculator {
249248
return; // no message available (yet)
250249
}
251250
auto event = msg.value().mtext;
251+
logHelper.trace("Received event %f", event.eventData);
252252

253253
if (event.hasError) {
254254
logHelper.error("Received error event %s", event.errorMsg);
@@ -312,7 +312,7 @@ class ComponentImportRegistration final : public celix::rsa::IImportRegistration
312312
*/
313313
class CalculatorImportServiceFactory final : public celix::rsa::IImportServiceFactory {
314314
public:
315-
static constexpr const char * const CONFIGS = "pubsub";
315+
static constexpr const char * const CONFIGS = "ipc-mq";
316316

317317
explicit CalculatorImportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)}, logHelper{ctx, "celix::rsa::RemoteServiceFactory"} {}
318318
~CalculatorImportServiceFactory() noexcept override = default;
@@ -332,10 +332,11 @@ class CalculatorImportServiceFactory final : public celix::rsa::IImportServiceFa
332332

333333
private:
334334
std::string createImportedCalculatorComponent(const celix::rsa::EndpointDescription& endpoint) {
335-
auto topic = endpoint.getProperties().get("endpoint.topic");
336-
auto scope = endpoint.getProperties().get("endpoint.scope");
337-
auto c2pChannelId = getConsumer2ProviderChannelId(scope, topic);
338-
auto p2cChannelId = getProvider2ConsumerChannelId(scope, topic);
335+
for (auto it : endpoint.getProperties()) {
336+
logHelper.info("Endpoint property %s=%s", it.first.c_str(), it.second.c_str());
337+
}
338+
auto c2pChannelId = endpoint.getProperties().getAsLong("endpoint.client.to.provider.channel.id", -1);
339+
auto p2cChannelId = endpoint.getProperties().getAsLong("endpoint.provider.to.client.channel.id", -1);
339340

340341
auto& cmp = ctx->getDependencyManager()->createComponent(std::make_unique<ImportedCalculator>(logHelper, c2pChannelId, p2cChannelId));
341342
cmp.createServiceDependency<celix::PromiseFactory>()
@@ -378,7 +379,7 @@ class CalculatorImportServiceFactory final : public celix::rsa::IImportServiceFa
378379
*/
379380
class ExportedCalculator final {
380381
public:
381-
explicit ExportedCalculator(celix::LogHelper _logHelper, int c2pChannelId, int p2cChannelId) : logHelper{std::move(_logHelper)} {
382+
explicit ExportedCalculator(celix::LogHelper _logHelper, long c2pChannelId, long p2cChannelId) : logHelper{std::move(_logHelper)} {
382383
setupMsgIpc(c2pChannelId, p2cChannelId);
383384
}
384385

@@ -434,7 +435,10 @@ class ExportedCalculator final {
434435
calculator = calc;
435436
}
436437
private:
437-
void setupMsgIpc(int c2pChannelId, int p2cChannelId) {
438+
void setupMsgIpc(long c2pChannelId, long p2cChannelId) {
439+
logHelper.debug("Creating msg queue for ExportCalculator with c2pChannelId=%li and p2cChannelId=%li",
440+
c2pChannelId,
441+
p2cChannelId);
438442
//note reverse order of sender and receiver compared to ImportedCalculator
439443
int keySender = (int)p2cChannelId;
440444
int keyReceiver = (int)c2pChannelId;
@@ -443,6 +447,10 @@ class ExportedCalculator final {
443447

444448
if (qidSender == -1 || qidReceiver == -1) {
445449
throw std::logic_error{"RsaShmClient: Error creating msg queue."};
450+
} else {
451+
logHelper.info("Created msg queue for ExportCalculator with qidSender=%i and qidReceiver=%i",
452+
qidSender,
453+
qidReceiver);
446454
}
447455
}
448456

@@ -518,7 +526,7 @@ class ComponentExportRegistration final : public celix::rsa::IExportRegistration
518526
*/
519527
class CalculatorExportServiceFactory final : public celix::rsa::IExportServiceFactory {
520528
public:
521-
static constexpr const char * const CONFIGS = "pubsub";
529+
static constexpr const char * const CONFIGS = "ipc-mq";
522530
static constexpr const char * const INTENTS = "osgi.async";
523531

524532
explicit CalculatorExportServiceFactory(std::shared_ptr<celix::BundleContext> _ctx) : ctx{std::move(_ctx)},
@@ -544,13 +552,12 @@ class CalculatorExportServiceFactory final : public celix::rsa::IExportServiceFa
544552

545553
private:
546554
std::string createExportedCalculatorComponent(const celix::Properties& serviceProperties) {
547-
auto topic = serviceProperties.get("endpoint.topic");
548-
auto scope = serviceProperties.get("endpoint.scope");
549-
auto c2pChannelId = getConsumer2ProviderChannelId(scope, topic);
550-
auto p2cChannelId = getProvider2ConsumerChannelId(scope, topic);
555+
auto c2pChannelId = serviceProperties.getAsLong("endpoint.client.to.provider.channel.id", -1);
556+
auto p2cChannelId = serviceProperties.getAsLong("endpoint.provider.to.client.channel.id", -1);
551557
auto svcId = serviceProperties.get(celix::SERVICE_ID);
552558

553-
auto& cmp = ctx->getDependencyManager()->createComponent(std::make_unique<ExportedCalculator>(logHelper, c2pChannelId, p2cChannelId));
559+
auto& cmp = ctx->getDependencyManager()->createComponent(
560+
std::make_unique<ExportedCalculator>(logHelper, c2pChannelId, p2cChannelId));
554561

555562
cmp.createServiceDependency<celix::PromiseFactory>()
556563
.setRequired(true)

0 commit comments

Comments
 (0)