Skip to content

Commit

Permalink
finalize
Browse files Browse the repository at this point in the history
Took 12 minutes
  • Loading branch information
kagurazaka-ayano committed Jan 28, 2024
1 parent 1359a71 commit 34da393
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 52 deletions.
36 changes: 18 additions & 18 deletions src/test/OperatorTest/ProducerConsumerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ TEST(ProducerTest, SubscribeToNewTopic) {
Producer<IntMessage> producer;
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
manager->relate(topic1, queue);

ASSERT_NO_THROW(producer.subscribe(topic1));
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerTest, SubscribeToAlreadySubscribedTopic) {
Expand All @@ -32,7 +34,7 @@ TEST(ProducerTest, SubscribeToAlreadySubscribedTopic) {
manager->relate(topic1, queue);
producer.subscribe(topic1);
ASSERT_THROW(producer.subscribe(topic1), std::runtime_error);
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerTest, UnsubscribeFromSubscribedTopic) {
Expand All @@ -43,7 +45,7 @@ TEST(ProducerTest, UnsubscribeFromSubscribedTopic) {
manager->relate(topic1, queue);
producer.subscribe(topic1);
ASSERT_NO_THROW(producer.unsubscribe(topic1));
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerTest, UnsubscribeFromNotSubscribedTopic) {
Expand All @@ -53,7 +55,7 @@ TEST(ProducerTest, UnsubscribeFromNotSubscribedTopic) {
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->relate(topic1, queue);
ASSERT_THROW(producer.unsubscribe(topic1), std::runtime_error);
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerTest, PublishMessageToSubscribedTopic) {
Expand All @@ -65,7 +67,7 @@ TEST(ProducerTest, PublishMessageToSubscribedTopic) {
auto message = IntMessage(1);
producer.subscribe(topic1);
ASSERT_NO_THROW(producer.publishMessage(topic1, message));
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerTest, PublishMessageToNotSubscribedTopic) {
Expand All @@ -84,7 +86,6 @@ TEST(ProducerTest, BroadcastMessageWhenNoTopicSubscribed) {
TEST(ProducerTest, BroadcastMessageWhenTopicsSubscribed) {
Producer<IntMessage> producer;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
Topic topic2{"topic2"};
auto queue1 = Queue<IntMessage>();
Expand All @@ -95,75 +96,75 @@ TEST(ProducerTest, BroadcastMessageWhenTopicsSubscribed) {
producer.subscribe(topic1);
producer.subscribe(topic2);
ASSERT_NO_THROW(producer.broadcastMessage(message));
manager->flush();
}


TEST(ConsumerTest, SubscribeToNewTopic) {
Consumer<IntMessage> consumer;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
Queue<IntMessage> queue;
manager->relate(topic1, queue);
ASSERT_NO_THROW(consumer.subscribe(topic1));
manager->flush();
}

TEST(ConsumerTest, SubscribeToAlreadySubscribedTopic) {
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
manager->relate(topic1, queue);
consumer.subscribe(topic1);
ASSERT_THROW(consumer.subscribe(topic1), std::runtime_error);
manager->flush();
}

TEST(ConsumerTest, UnsubscribeFromSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
consumer.subscribe(topic1);
ASSERT_NO_THROW(consumer.unsubscribe(topic1));
manager->flush();
}

TEST(ConsumerTest, UnsubscribeFromNotSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
ASSERT_THROW(consumer.unsubscribe(topic1), std::runtime_error);
manager->flush();
}

TEST(ConsumerTest, FetchMessageFromSubscribedEmptyTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
consumer.subscribe(topic1);
ASSERT_THROW(consumer.fetchSingleTopic(topic1), std::runtime_error);
manager->flush();
}

TEST(ConsumerTest, FetchMessageFromNotSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
ASSERT_THROW(consumer.fetchSingleTopic(topic1), std::runtime_error);
manager->flush();
}

TEST(ConsumerTest, FetchMessageFromAllSubscribedTopics) {
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue1;
Expand All @@ -178,6 +179,7 @@ TEST(ConsumerTest, FetchMessageFromAllSubscribedTopics) {
consumer.subscribe(topic1);
consumer.subscribe(topic2);
ASSERT_NO_THROW(consumer.fetchMessage());
manager->flush();
}

TEST(ConsumerTest, FetchMessageWhenNoTopicSubscribed) {
Expand All @@ -190,7 +192,6 @@ TEST(ProducerTest, ConcurrentPublishToSubscribedTopic) {
Producer<IntMessage> producer;
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
Topic topic1{"topic1"};
manager->relate(topic1, queue);
auto message = IntMessage(1);
Expand All @@ -204,14 +205,14 @@ TEST(ProducerTest, ConcurrentPublishToSubscribedTopic) {

ASSERT_EQ(queue.size(), 2);
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ConsumerTest, ConcurrentFetchFromSubscribedTopic) {
Consumer<IntMessage> consumer;
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
consumer.subscribe(topic1);

Expand All @@ -228,6 +229,7 @@ TEST(ConsumerTest, ConcurrentFetchFromSubscribedTopic) {

ASSERT_EQ(queue.size(), 0);
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerConsumerTest, MultipleConsumersFetchingFromSingleProducer) {
Expand All @@ -236,7 +238,6 @@ TEST(ProducerConsumerTest, MultipleConsumersFetchingFromSingleProducer) {
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
producer.subscribe(topic1);
consumer1.subscribe(topic1);
Expand All @@ -256,7 +257,7 @@ TEST(ProducerConsumerTest, MultipleConsumersFetchingFromSingleProducer) {
t3.join();

ASSERT_EQ(queue.size(), 0);
manager->unrelate(topic1, queue);
manager->flush();
}

TEST(ProducerConsumerTest, NoDataMissedWithMultipleConsumersFetchingFromSingleProducer) {
Expand All @@ -265,7 +266,6 @@ TEST(ProducerConsumerTest, NoDataMissedWithMultipleConsumersFetchingFromSinglePr
Topic topic1{"topic1"};
Queue<IntMessage> queue;
auto manager = MessageQueueManager<IntMessage>::Instance();
manager->flush();
manager->relate(topic1, queue);
producer.subscribe(topic1);
consumer1.subscribe(topic1);
Expand All @@ -282,5 +282,5 @@ TEST(ProducerConsumerTest, NoDataMissedWithMultipleConsumersFetchingFromSinglePr

ASSERT_EQ(f1.get()[0]->getContent() + f2.get()[0]->getContent() + f3.get()[0]->getContent(), 6);

manager->unrelate(topic1, queue);
manager->flush();
}
99 changes: 65 additions & 34 deletions src/test/QueueTest/QueueTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,45 +12,76 @@
#include <string>
#include <chrono>

TEST(QueueTest, PushAndWait) {
auto queue = messaging::Queue<messaging::StringMessage>("test");
std::string out;
std::stringstream str;

for(int i = 0; i < 10; ++i){
str << i;
queue.push(messaging::StringMessage(str.str()));
out = queue.wait()->getContent();
EXPECT_EQ(out, str.str());
str.str("");
}
}
using namespace messaging;

void pusher(messaging::Queue<messaging::StringMessage>& queue_in) {
for (int i = 0; i < 10; ++i) {
std::stringstream str;
str << i;
queue_in.push(str.str());
str.str("");
TEST(QueueTest, HighDataFlowWithConcurrentAccess) {
Queue<IntMessage> queue;
const int dataFlow = 10000;
std::vector<IntMessage> messages;
for (int i = 0; i < dataFlow; ++i) {
messages.push_back(IntMessage(i));
}
}

void fetcher(messaging::Queue<messaging::StringMessage>& queue_in) {
while(!queue_in.empty()) {
for (int i = 0; i < 10; ++i) {
std::stringstream str;
str << i;
auto ret = queue_in.wait()->getContent();
EXPECT_EQ(ret, str.str());
str.str("");
std::thread producer([&]() {
for (const auto& message : messages) {
queue.push(message);
}
});

std::vector<IntMessage> fetchedMessages;
std::thread consumer([&]() {
for (int i = 0; i < dataFlow; ++i) {
fetchedMessages.push_back(*queue.wait());
}
});

producer.join();
consumer.join();

ASSERT_EQ(fetchedMessages.size(), dataFlow);
for (int i = 0; i < dataFlow; ++i) {
ASSERT_EQ(fetchedMessages[i].getContent(), i);
}
}

TEST(QueueTest, Multithread) {
auto queue = messaging::Queue<messaging::StringMessage>("test");
auto th1 = std::thread(pusher, std::ref(queue));
auto th2 = std::thread(fetcher, std::ref(queue));
th1.join();
th2.join();
TEST(QueueTest, HighDataFlowWithMultipleProducersAndConsumers) {
Queue<IntMessage> queue;
const int dataFlow = 10000;
std::vector<IntMessage> messages;
for (int i = 0; i < dataFlow; ++i) {
messages.push_back(IntMessage(i));
}

std::thread producer1([&]() {
for (const auto& message : messages) {
queue.push(message);
}
});

std::thread producer2([&]() {
for (const auto& message : messages) {
queue.push(message);
}
});

std::vector<IntMessage> fetchedMessages1;
std::thread consumer1([&]() {
for (int i = 0; i < dataFlow; ++i) {
fetchedMessages1.push_back(*queue.wait());
}
});

std::vector<IntMessage> fetchedMessages2;
std::thread consumer2([&]() {
for (int i = 0; i < dataFlow; ++i) {
fetchedMessages2.push_back(*queue.wait());
}
});

producer1.join();
producer2.join();
consumer1.join();
consumer2.join();

ASSERT_EQ(fetchedMessages1.size() + fetchedMessages2.size(), 2 * dataFlow);
}

0 comments on commit 34da393

Please sign in to comment.