Skip to content

Commit

Permalink
feat grpc: update server-streaming service handlers interface
Browse files Browse the repository at this point in the history
commit_hash:fde15fa7fc0df5bd6f34e2eaa365d2d1dce499f1
  • Loading branch information
kpavlov00 committed Oct 12, 2024
1 parent abeec6f commit 6f57b87
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 28 deletions.
7 changes: 5 additions & 2 deletions grpc/include/userver/ugrpc/server/generic_service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <grpcpp/support/byte_buffer.h>

#include <userver/ugrpc/server/call_context.hpp>
#include <userver/ugrpc/server/result.hpp>
#include <userver/ugrpc/server/service_component_base.hpp>
#include <userver/ugrpc/server/stream.hpp>

Expand Down Expand Up @@ -66,8 +67,10 @@ class GenericServiceBase {
using Component = impl::ServiceComponentBase<GenericServiceBase>;

using GenericCallContext = ugrpc::server::GenericCallContext;

using GenericReaderWriter =
ugrpc::server::ReaderWriter<grpc::ByteBuffer, grpc::ByteBuffer>;
using GenericResult = ugrpc::server::StreamingResult<grpc::ByteBuffer>;

GenericServiceBase(GenericServiceBase&&) = delete;
GenericServiceBase& operator=(GenericServiceBase&&) = delete;
Expand All @@ -80,8 +83,8 @@ class GenericServiceBase {
/// @note The implementation of the method should call `Finish` or
/// `FinishWithError`, otherwise the server will respond with an "internal
/// server error" status.
virtual grpc::Status Handle(GenericCallContext& context,
GenericReaderWriter& stream);
virtual GenericResult Handle(GenericCallContext& context,
GenericReaderWriter& stream);

// Legacy
using Call = BidirectionalStream<grpc::ByteBuffer, grpc::ByteBuffer>;
Expand Down
9 changes: 9 additions & 0 deletions grpc/include/userver/ugrpc/server/impl/call_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ void Finish(Call& call, ugrpc::server::Result<Response>&& result) {
}
}

template <typename Call, typename Response>
void Finish(Call& call, ugrpc::server::StreamingResult<Response>&& result) {
if (result.HasLastResponse()) {
call.WriteAndFinish(std::move(result).ExtractLastResponse());
} else {
Finish(call, result.GetStatus());
}
}

} // namespace ugrpc::server::impl

USERVER_NAMESPACE_END
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

#include <grpcpp/impl/codegen/proto_utils.h>

#include <userver/ugrpc/server/call_context.hpp>
#include <userver/ugrpc/server/impl/service_worker.hpp>
#include <userver/ugrpc/server/result.hpp>
#include <userver/ugrpc/server/service_base.hpp>
Expand Down
44 changes: 41 additions & 3 deletions grpc/include/userver/ugrpc/server/result.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
/// @file userver/ugrpc/server/result.hpp
/// @brief @copybrief ugrpc::server::Result

#include <optional>
#include <variant>

#include <grpcpp/support/status.h>
Expand All @@ -11,20 +12,22 @@ USERVER_NAMESPACE_BEGIN

namespace ugrpc::server {

/// @brief Provides a way to return either Response or grpc::Status
/// @brief Result type for service handlers (non server-streaming)
///
/// Provides a way to return either Response or grpc::Status
template <typename Response>
class Result {
public:
/// Construct instance from Response, imply success status
/*implicit*/ Result(Response&& response) : result_{std::move(response)} {}

/// Construct instance from grpc::Status, only error status allowed
/*implicit*/ Result(const grpc::Status& status) : result_{status} {
/*implicit*/ Result(grpc::Status&& status) : result_{std::move(status)} {
UINVARIANT(!GetErrorStatus().ok(), "Only error status allowed");
}

/// Construct instance from grpc::Status, only error status allowed
/*implicit*/ Result(grpc::Status&& status) : result_{std::move(status)} {
/*implicit*/ Result(const grpc::Status& status) : result_{status} {
UINVARIANT(!GetErrorStatus().ok(), "Only error status allowed");
}

Expand All @@ -51,6 +54,41 @@ class Result {
std::variant<Response, grpc::Status> result_;
};

/// @brief Special result type for server-streaming service handlers
template <typename Response>
class StreamingResult final {
public:
/// Construct instance from grpc::Status
/*implicit*/ StreamingResult(grpc::Status&& status)
: status_{std::move(status)} {}

/// Construct instance from grpc::Status
/*implicit*/ StreamingResult(const grpc::Status& status) : status_{status} {}

/// Construct instance with last response
///
/// Allows perform writing last response and coalesce it with status in a
/// single step.
/*implicit*/ StreamingResult(Response&& last_response)
: last_response_(std::move(last_response)) {}

/// @cond
bool HasLastResponse() const { return last_response_.has_value(); }

Response&& ExtractLastResponse() && {
return std::move(last_response_).value();
}

const Response& GetLastResponse() const { return last_response_.value(); }

const grpc::Status& GetStatus() const { return status_; }
/// @endcond

private:
std::optional<Response> last_response_;
grpc::Status status_{grpc::Status::OK};
};

} // namespace ugrpc::server

USERVER_NAMESPACE_END
3 changes: 3 additions & 0 deletions grpc/include/userver/ugrpc/server/service_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <userver/engine/task/task_processor_fwd.hpp>

#include <userver/ugrpc/server/call_context.hpp>
#include <userver/ugrpc/server/impl/service_worker.hpp>
#include <userver/ugrpc/server/middlewares/fwd.hpp>

Expand All @@ -26,6 +27,8 @@ struct ServiceConfig final {
/// classes, not from this class directly.
class ServiceBase {
public:
using CallContext = ugrpc::server::CallContext;

ServiceBase& operator=(ServiceBase&&) = delete;
virtual ~ServiceBase();

Expand Down
8 changes: 4 additions & 4 deletions grpc/src/ugrpc/server/generic_service_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ namespace ugrpc::server {

GenericServiceBase::~GenericServiceBase() = default;

grpc::Status GenericServiceBase::Handle(GenericCallContext& /*context*/,
GenericReaderWriter& /*stream*/) {
GenericServiceBase::GenericResult GenericServiceBase::Handle(
GenericCallContext& /*context*/, GenericReaderWriter& /*stream*/) {
UASSERT_MSG(
false,
"Called not implemented GenericServiceBase/Handle(GenericCallContext&, "
Expand All @@ -19,8 +19,8 @@ grpc::Status GenericServiceBase::Handle(GenericCallContext& /*context*/,

void GenericServiceBase::Handle(Call& call) {
GenericCallContext context{call};
grpc::Status status = Handle(context, call);
impl::Finish(call, status);
auto result = Handle(context, call);
impl::Finish(call, std::move(result));
}

} // namespace ugrpc::server
Expand Down
15 changes: 8 additions & 7 deletions scripts/grpc/templates/service.usrv.cpp.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ constexpr std::string_view k{{service.name}}MethodNames[] = {
{% for method in service.method %}
{% if method.client_streaming and method.server_streaming %}

grpc::Status {{service.name}}Base::{{method.name}}([[maybe_unused]] CallContext& context,
[[maybe_unused]] {{method.name}}ReaderWriter& stream) {
{{service.name}}Base::{{method.name}}Result {{service.name}}Base::{{method.name}}(
[[maybe_unused]] CallContext& context,
[[maybe_unused]] {{method.name}}ReaderWriter& stream) {
UASSERT_MSG(false, "Called not implemented {{proto.package_prefix}}{{service.name}}/{{method.name}}");
return USERVER_NAMESPACE::ugrpc::server::impl::kUnimplementedStatus;
}

// Legacy
void {{service.name}}Base::{{method.name}}({{method.name}}Call& call) {
CallContext context{call};
grpc::Status status = {{method.name}}(context, call);
USERVER_NAMESPACE::ugrpc::server::impl::Finish(call, status);
auto result = {{method.name}}(context, call);
USERVER_NAMESPACE::ugrpc::server::impl::Finish(call, std::move(result));
}

{% elif method.client_streaming %}
Expand All @@ -58,7 +59,7 @@ void {{service.name}}Base::{{method.name}}({{method.name}}Call& call) {

{% elif method.server_streaming %}

grpc::Status {{service.name}}Base::{{method.name}}(
{{service.name}}Base::{{method.name}}Result {{service.name}}Base::{{method.name}}(
[[maybe_unused]] CallContext& context,
[[maybe_unused]] {{ method.input_type | grpc_to_cpp_name }}&& request,
[[maybe_unused]] {{method.name}}Writer& writer) {
Expand All @@ -70,8 +71,8 @@ grpc::Status {{service.name}}Base::{{method.name}}(
void {{service.name}}Base::{{method.name}}({{method.name}}Call& call,
{{ method.input_type | grpc_to_cpp_name }}&& request) {
CallContext context{call};
grpc::Status status = {{method.name}}(context, std::move(request), call);
USERVER_NAMESPACE::ugrpc::server::impl::Finish(call, status);
auto result = {{method.name}}(context, std::move(request), call);
USERVER_NAMESPACE::ugrpc::server::impl::Finish(call, std::move(result));
}

{% else %}
Expand Down
29 changes: 18 additions & 11 deletions scripts/grpc/templates/service.usrv.hpp.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@ class {{service.name}}Base
public:
using Component = USERVER_NAMESPACE::ugrpc::server::impl::ServiceComponentBase<
{{service.name}}Base>;
using CallContext = USERVER_NAMESPACE::ugrpc::server::CallContext;

{% for method in service.method %}
{% if method.client_streaming and method.server_streaming %}

using {{method.name}}ReaderWriter = USERVER_NAMESPACE::ugrpc::server::ReaderWriter<
{{ method.input_type | grpc_to_cpp_name }}, {{ method.output_type | grpc_to_cpp_name }}>;
virtual grpc::Status {{method.name}}(CallContext& context, {{method.name}}ReaderWriter& stream);
using {{method.name}}Result = USERVER_NAMESPACE::ugrpc::server::StreamingResult<
{{ method.output_type | grpc_to_cpp_name }}>;
virtual {{method.name}}Result {{method.name}}(CallContext& context, {{method.name}}ReaderWriter& stream);
// Legacy
using {{method.name}}Call = USERVER_NAMESPACE::ugrpc::server::BidirectionalStream<
{{ method.input_type | grpc_to_cpp_name }}, {{ method.output_type | grpc_to_cpp_name }}>;
[[deprecated("Use 'grpc::Status {{method.name}}(CallContext& context, {{method.name}}ReaderWriter& stream)'")]]
[[deprecated("Use '{{method.name}}Result {{method.name}}(CallContext& context, {{method.name}}ReaderWriter& stream)'")]]
virtual void {{method.name}}({{method.name}}Call& call);

{% elif method.client_streaming %}

using {{method.name}}Reader = USERVER_NAMESPACE::ugrpc::server::Reader<{{ method.input_type | grpc_to_cpp_name }}>;
using {{method.name}}Result = USERVER_NAMESPACE::ugrpc::server::Result<{{ method.output_type | grpc_to_cpp_name }}>;
using {{method.name}}Reader = USERVER_NAMESPACE::ugrpc::server::Reader<
{{ method.input_type | grpc_to_cpp_name }}>;
using {{method.name}}Result = USERVER_NAMESPACE::ugrpc::server::Result<
{{ method.output_type | grpc_to_cpp_name }}>;
virtual {{method.name}}Result {{method.name}}(CallContext& context, {{method.name}}Reader& reader);
// Legacy
using {{method.name}}Call = USERVER_NAMESPACE::ugrpc::server::InputStream<
Expand All @@ -44,18 +47,22 @@ class {{service.name}}Base

{% elif method.server_streaming %}

using {{method.name}}Writer = USERVER_NAMESPACE::ugrpc::server::Writer<{{ method.output_type | grpc_to_cpp_name }}>;
virtual grpc::Status {{method.name}}(CallContext& context,
{{ method.input_type | grpc_to_cpp_name }}&& request,
{{method.name}}Writer& writer);
using {{method.name}}Writer = USERVER_NAMESPACE::ugrpc::server::Writer<
{{ method.output_type | grpc_to_cpp_name }}>;
using {{method.name}}Result = USERVER_NAMESPACE::ugrpc::server::StreamingResult<
{{ method.output_type | grpc_to_cpp_name }}>;
virtual {{method.name}}Result {{method.name}}(CallContext& context,
{{ method.input_type | grpc_to_cpp_name }}&& request,
{{method.name}}Writer& writer);
// Legacy
using {{method.name}}Call = USERVER_NAMESPACE::ugrpc::server::OutputStream<{{ method.output_type | grpc_to_cpp_name }}>;
[[deprecated("Use 'grpc::Status {{method.name}}(CallContext& context, {{ method.input_type | grpc_to_cpp_name }}&& request, {{method.name}}Writer& writer)'")]]
[[deprecated("Use '{{method.name}}Result {{method.name}}(CallContext& context, {{ method.input_type | grpc_to_cpp_name }}&& request, {{method.name}}Writer& writer)'")]]
virtual void {{method.name}}({{method.name}}Call& call, {{ method.input_type | grpc_to_cpp_name }}&& request);

{% else %}

using {{method.name}}Result = USERVER_NAMESPACE::ugrpc::server::Result<{{ method.output_type | grpc_to_cpp_name }}>;
using {{method.name}}Result = USERVER_NAMESPACE::ugrpc::server::Result<
{{ method.output_type | grpc_to_cpp_name }}>;
virtual {{method.name}}Result {{method.name}}(CallContext& context, {{ method.input_type | grpc_to_cpp_name }}&& request);
// Legacy
using {{method.name}}Call = USERVER_NAMESPACE::ugrpc::server::UnaryCall<{{ method.output_type | grpc_to_cpp_name }}>;
Expand Down

0 comments on commit 6f57b87

Please sign in to comment.