Skip to content

Commit

Permalink
Merge pull request #3 from k-wasniowski/Dev
Browse files Browse the repository at this point in the history
Added execution context
  • Loading branch information
k-wasniowski authored Sep 30, 2023
2 parents 61e30d2 + 985e4d8 commit 78311a6
Show file tree
Hide file tree
Showing 21 changed files with 335 additions and 47 deletions.
8 changes: 2 additions & 6 deletions App/src/main.cpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
#include <HttpServer/Server.hpp>
#include <MediaServer/MediaManager/MediaManager.hpp>
#include <MediaServer/RtspClient/RtspClient.hpp>
#include <MediaServer/GenericRtpClient/GenericRtpClient.hpp>
#include <MediaServer/Server.hpp>

#include <iostream>

int main()
{
std::cout << "Initializing!" << std::endl;

boost::asio::io_context ioContext{};

auto pRtspClient = MediaServer::Rtsp::RtspClient::Create(ioContext);
auto pGenericRtpClient = MediaServer::Rtp::GenericRtpClient::Create(ioContext);
auto pMediaServer = MediaServer::Server::Create();

auto pMediaManager = MediaServer::MediaManager::Create();

Expand Down
4 changes: 3 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ set(CMAKE_TOOLCHAIN_FILE ${vcpkg_SOURCE_DIR}/scripts/buildsystems/vcpkg.cmake)

project(ArdaLiveMediaServer LANGUAGES CXX VERSION 0.0.1)

enable_testing()

set(CMAKE_CXX_STANDARD 23)

set(CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/CMake)
Expand All @@ -28,7 +30,7 @@ include(Docs)
find_package(Drogon CONFIG REQUIRED)
find_package(LibDataChannel CONFIG REQUIRED)
find_package(opentelemetry-cpp CONFIG REQUIRED)
find_package(Boost REQUIRED)
find_package(Boost COMPONENTS thread REQUIRED)

add_subdirectory(Gondor)
add_subdirectory(Media)
Expand Down
6 changes: 1 addition & 5 deletions DevApps/GenericRtpClientDevApp/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

int main()
{
boost::asio::io_context ioContext{};

auto pRtpClientSession = MediaServer::Rtp::GenericRtpClient::Create(ioContext);
auto pRtpClientSession = MediaServer::Rtp::GenericRtpClient::Create();

std::string address{"127.0.0.1"};
uint16_t port = 5004;
Expand All @@ -22,7 +20,5 @@ int main()

pRtpClientSession->InitiateNewSession(address, port, sessionDescription);

ioContext.run();

return 0;
}
13 changes: 13 additions & 0 deletions Gondor/Execution/include/Gondor/Execution/ExecutionContext.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include <Gondor/Execution/ITasksExecutor.hpp>

#include <memory>
Expand All @@ -6,12 +8,23 @@ namespace Gondor
{
namespace Execution
{
class ExecutionContext;
using ExecutionContextSharedPtr = std::shared_ptr<ExecutionContext>;
using ExecutionContextWeakPtr = std::weak_ptr<ExecutionContext>;

class ExecutionContext : public ITasksExecutor
{
public:
static ExecutionContextSharedPtr Create(uint32_t threadsCount = 1);

ExecutionContext(uint32_t threadsCount = 1);
~ExecutionContext() override;

bool PostTask(std::function<void()> task) override;

void Run();
void Terminate();

private:
class Impl;
std::unique_ptr<Impl> m_pImpl;
Expand Down
5 changes: 5 additions & 0 deletions Gondor/Execution/include/Gondor/Execution/ITasksExecutor.hpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
#pragma once

#include <functional>

namespace Gondor
{
Expand All @@ -7,6 +10,8 @@ namespace Gondor
{
public:
virtual ~ITasksExecutor() = default;

virtual bool PostTask(std::function<void()> task) = 0;
};
}
}
61 changes: 60 additions & 1 deletion Gondor/Execution/src/ExecutionContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,82 @@ namespace Gondor
}

Impl(uint32_t threadsCount)
: m_pool{1}
: m_pool{threadsCount}
, m_executor{m_pool.executor()}
{
std::cout << "Creating ExecutionContext!" << std::endl;
}

~Impl() {}

void Run()
{
std::cout << "Running ExecutionContext!" << std::endl;
}

void Terminate()
{
std::cout << "Terminating ExecutionContext!" << std::endl;

m_pool.join();
}

bool PostTask(std::function<void()> task)
{
if (!task)
{
return false;
}

boost::asio::post(m_executor, task);

return true;
}

private:
boost::asio::thread_pool m_pool;
boost::asio::executor m_executor;
};

ExecutionContextSharedPtr ExecutionContext::Create(uint32_t threadsCount)
{
return std::make_shared<ExecutionContext>(threadsCount);
}

ExecutionContext::ExecutionContext(uint32_t threadsCount)
: m_pImpl{Impl::Create(threadsCount)}
{}

ExecutionContext::~ExecutionContext() {}

void ExecutionContext::Run()
{
if (!m_pImpl)
{
return;
}

m_pImpl->Run();
}

void ExecutionContext::Terminate()
{
if (!m_pImpl)
{
return;
}

m_pImpl->Terminate();
}

bool ExecutionContext::PostTask(std::function<void()> task)
{
if (!m_pImpl)
{
return false;
}

return m_pImpl->PostTask(task);
}
}
}
5 changes: 5 additions & 0 deletions MediaServer/GenericRtpClient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,9 @@ target_link_libraries(GenericRtpClient
MediaServer::MediaManager
)

target_link_libraries(GenericRtpClient
PRIVATE
Boost::thread
)

add_library(MediaServer::GenericRtpClient ALIAS GenericRtpClient)
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#pragma once

#include <MediaServer/GenericRtpClient/IGenericRtpClient.hpp>

#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>

#include <memory>
#include <string>
Expand All @@ -12,19 +15,20 @@ namespace MediaServer
class GenericRtpClient;
using GenericRtpClientSharedPtr_t = std::shared_ptr<GenericRtpClient>;

class GenericRtpClient
class GenericRtpClient : public IGenericRtpClient
{
public:
static GenericRtpClientSharedPtr_t Create(boost::asio::io_context& ioContext);
static GenericRtpClientSharedPtr_t Create();

explicit GenericRtpClient(boost::asio::io_context& ioContext);
explicit GenericRtpClient();

virtual ~GenericRtpClient();

bool InitiateNewSession(std::string ip, uint16_t port, std::string sessionDescription);

private:
boost::asio::io_context& m_ioContext;
boost::asio::io_context m_ioContext;
boost::thread_group m_threadPool;
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

#include <memory>
#include <string>

namespace MediaServer
{
namespace Rtp
{
class IGenericRtpClient
{
public:
virtual ~IGenericRtpClient() = default;

virtual bool InitiateNewSession(std::string ip, uint16_t port, std::string sessionDescription) = 0;
};

using IGenericRtpClientSharedPtr_t = std::shared_ptr<IGenericRtpClient>;
}
}
28 changes: 21 additions & 7 deletions MediaServer/GenericRtpClient/src/GenericRtpClient.cpp
Original file line number Diff line number Diff line change
@@ -1,24 +1,38 @@
#include <MediaServer/GenericRtpClient/GenericRtpClient.hpp>
#include <MediaServer/GenericRtpClient/Details/RtpClientSession.hpp>
#include <MediaServer/GenericRtpClient/GenericRtpClient.hpp>

#include <Media/Sdp/SessionDescription.hpp>

#include <iostream>

namespace
{
constexpr auto kDefaultNumberOfThreads = 4;
}

namespace MediaServer
{
namespace Rtp
{
GenericRtpClientSharedPtr_t GenericRtpClient::Create(boost::asio::io_context& ioContext)
GenericRtpClientSharedPtr_t GenericRtpClient::Create()
{
return std::make_shared<GenericRtpClient>(ioContext);
return std::make_shared<GenericRtpClient>();
}

GenericRtpClient::GenericRtpClient(boost::asio::io_context& ioContext)
: m_ioContext{ioContext}
{}
GenericRtpClient::GenericRtpClient()
: m_ioContext{}
{
for (auto i = 0; i < kDefaultNumberOfThreads; ++i)
{
m_threadPool.create_thread(boost::bind(&boost::asio::io_context::run, &m_ioContext));
}
}

GenericRtpClient::~GenericRtpClient() {}
GenericRtpClient::~GenericRtpClient()
{
m_ioContext.stop();
m_threadPool.join_all();
}

bool GenericRtpClient::InitiateNewSession(std::string ip, uint16_t port, std::string sessionDescription)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
#pragma once

#include <boost/asio.hpp>
#include <MediaServer/GenericRtpClient/IGenericRtpClient.hpp>

#include <MediaServer/GenericRtpClient/GenericRtpClient.hpp>

#include <Gondor/Execution/ExecutionContext.hpp>

#include <memory>

Expand All @@ -9,15 +13,20 @@ namespace MediaServer
class GenericRtpClientProxy;
using GenericRtpClientProxySharedPtr_t = std::shared_ptr<GenericRtpClientProxy>;

class GenericRtpClientProxy
class GenericRtpClientProxy : public MediaServer::Rtp::IGenericRtpClient
{
public:
static GenericRtpClientProxySharedPtr_t Create(boost::asio::io_context& ioContext);
static GenericRtpClientProxySharedPtr_t Create(Gondor::Execution::ExecutionContextWeakPtr pExecutionContext,
MediaServer::Rtp::GenericRtpClientSharedPtr_t pGenericRtpClient);

GenericRtpClientProxy(boost::asio::io_context& ioContext);
GenericRtpClientProxy(Gondor::Execution::ExecutionContextWeakPtr pExecutionContext,
MediaServer::Rtp::GenericRtpClientSharedPtr_t pGenericRtpClient);
~GenericRtpClientProxy();

bool InitiateNewSession(std::string ip, uint16_t port, std::string sessionDescription);

private:
boost::asio::io_context& m_ioContext;
Gondor::Execution::ExecutionContextWeakPtr m_pExecutionContext;
MediaServer::Rtp::GenericRtpClientSharedPtr_t m_pGenericRtpClient;
};
}
10 changes: 10 additions & 0 deletions MediaServer/MediaServer/include/MediaServer/Server.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <MediaServer/GenericRtpClient/IGenericRtpClient.hpp>

#include <memory>

namespace MediaServer
Expand Down Expand Up @@ -44,6 +46,14 @@ namespace MediaServer
*/
void Run();

/**
* @brief Terminate
* @details This method will stop the server.
*/
void Terminate();

MediaServer::Rtp::IGenericRtpClientSharedPtr_t MakeGenericRtpClient();

private:
class Impl;

Expand Down
Loading

0 comments on commit 78311a6

Please sign in to comment.