Skip to content

Commit 13744a6

Browse files
perf: Change StreamingReadRpc::Read interface to take ResponseType (#15319)
1 parent 8e234ce commit 13744a6

37 files changed

+694
-483
lines changed

generator/integration_tests/golden/v1/internal/golden_kitchen_sink_connection_impl.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,8 +182,16 @@ GoldenKitchenSinkConnectionImpl::StreamingRead(google::test::admin::database::v1
182182
internal::MakeResumableStreamingReadRpc<google::test::admin::database::v1::Response, google::test::admin::database::v1::Request>(
183183
retry_policy(*current), backoff_policy(*current), factory,
184184
GoldenKitchenSinkStreamingReadStreamingUpdater, request);
185-
return internal::MakeStreamRange(internal::StreamReader<google::test::admin::database::v1::Response>(
186-
[resumable] { return resumable->Read(); }));
185+
return internal::MakeStreamRange<google::test::admin::database::v1::Response>(
186+
[resumable = std::move(resumable)]()
187+
-> absl::variant<
188+
Status,
189+
google::test::admin::database::v1::Response> {
190+
google::test::admin::database::v1::Response response;
191+
auto status = resumable->Read(&response);
192+
if (status.has_value()) return *status;
193+
return response;
194+
});
187195
}
188196

189197
std::unique_ptr<::google::cloud::AsyncStreamingReadWriteRpc<

generator/integration_tests/tests/golden_kitchen_sink_auth_decorator_test.cc

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ using ::google::test::admin::database::v1::Request;
3737
using ::google::test::admin::database::v1::Response;
3838
using ::testing::ByMove;
3939
using ::testing::IsNull;
40+
using ::testing::Optional;
4041
using ::testing::Return;
41-
using ::testing::VariantWith;
4242

4343
// The general pattern of these test is to make two requests, both of which
4444
// return an error. The first one because the auth strategy fails, the second
@@ -129,13 +129,14 @@ TEST(GoldenKitchenSinkAuthDecoratorTest, StreamingRead) {
129129
grpc::ClientContext ctx;
130130
auto auth_failure = under_test.StreamingRead(
131131
std::make_shared<grpc::ClientContext>(), Options{}, request);
132-
EXPECT_THAT(auth_failure->Read(),
133-
VariantWith<Status>(StatusIs(StatusCode::kInvalidArgument)));
132+
Response response;
133+
EXPECT_THAT(auth_failure->Read(&response),
134+
Optional(StatusIs(StatusCode::kInvalidArgument)));
134135

135136
auto auth_success = under_test.StreamingRead(
136137
std::make_shared<grpc::ClientContext>(), Options{}, request);
137-
EXPECT_THAT(auth_success->Read(),
138-
VariantWith<Status>(StatusIs(StatusCode::kPermissionDenied)));
138+
EXPECT_THAT(auth_success->Read(&response),
139+
Optional(StatusIs(StatusCode::kPermissionDenied)));
139140
}
140141

141142
TEST(GoldenKitchenSinkAuthDecoratorTest, ListServiceAccountKeys) {

generator/integration_tests/tests/golden_kitchen_sink_logging_decorator_test.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ using ::google::test::admin::database::v1::Response;
3737
using ::testing::ByMove;
3838
using ::testing::Contains;
3939
using ::testing::HasSubstr;
40+
using ::testing::Optional;
4041
using ::testing::Return;
4142
using ::testing::StartsWith;
42-
using ::testing::VariantWith;
4343

4444
class LoggingDecoratorTest : public ::testing::Test {
4545
protected:
@@ -202,7 +202,8 @@ TEST_F(LoggingDecoratorTest, StreamingReadRpcNoRpcStreams) {
202202
GoldenKitchenSinkLogging stub(mock_, TracingOptions{}, {});
203203
auto response = stub.StreamingRead(std::make_shared<grpc::ClientContext>(),
204204
Options{}, Request{});
205-
EXPECT_THAT(response->Read(), VariantWith<Status>(IsOk()));
205+
Response r;
206+
EXPECT_THAT(response->Read(&r), Optional(IsOk()));
206207

207208
auto const log_lines = log_.ExtractLines();
208209
EXPECT_THAT(log_lines, Contains(HasSubstr("StreamingRead(")));
@@ -218,7 +219,8 @@ TEST_F(LoggingDecoratorTest, StreamingReadRpcWithRpcStreams) {
218219
GoldenKitchenSinkLogging stub(mock_, TracingOptions{}, {"rpc-streams"});
219220
auto response = stub.StreamingRead(std::make_shared<grpc::ClientContext>(),
220221
Options{}, Request{});
221-
EXPECT_THAT(response->Read(), VariantWith<Status>(IsOk()));
222+
Response r;
223+
EXPECT_THAT(response->Read(&r), Optional(IsOk()));
222224

223225
auto const log_lines = log_.ExtractLines();
224226
EXPECT_THAT(log_lines, Contains(HasSubstr("StreamingRead(")));

generator/integration_tests/tests/golden_kitchen_sink_metadata_decorator_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ using ::testing::AllOf;
4343
using ::testing::AnyOf;
4444
using ::testing::Contains;
4545
using ::testing::Not;
46+
using ::testing::Optional;
4647
using ::testing::Pair;
4748
using ::testing::Return;
4849
using ::testing::UnorderedElementsAre;
49-
using ::testing::VariantWith;
5050

5151
class MetadataDecoratorTest : public ::testing::Test {
5252
protected:
@@ -299,7 +299,8 @@ TEST_F(MetadataDecoratorTest, StreamingRead) {
299299
GoldenKitchenSinkMetadata stub(mock_, {});
300300
auto response = stub.StreamingRead(std::make_shared<grpc::ClientContext>(),
301301
Options{}, Request{});
302-
EXPECT_THAT(response->Read(), VariantWith<Status>(Not(IsOk())));
302+
Response r;
303+
EXPECT_THAT(response->Read(&r), Optional(Not(IsOk())));
303304
}
304305

305306
TEST_F(MetadataDecoratorTest, StreamingWrite) {

generator/integration_tests/tests/golden_kitchen_sink_stub_test.cc

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ using ::google::cloud::testing_util::StatusIs;
3434
using ::google::test::admin::database::v1::Request;
3535
using ::google::test::admin::database::v1::Response;
3636
using ::testing::_;
37+
using ::testing::Optional;
3738
using ::testing::Return;
38-
using ::testing::VariantWith;
3939

4040
class MockGrpcGoldenKitchenSinkStub : public ::google::test::admin::database::
4141
v1::GoldenKitchenSink::StubInterface {
@@ -764,11 +764,12 @@ TEST_F(GoldenKitchenSinkStubTest, StreamingRead) {
764764
std::move(iampolicy_stub_), std::move(location_stub_));
765765
auto success_stream = stub.StreamingRead(
766766
std::make_shared<grpc::ClientContext>(), Options{}, request);
767-
EXPECT_THAT(success_stream->Read(), VariantWith<Status>(IsOk()));
767+
Response response;
768+
EXPECT_THAT(success_stream->Read(&response), Optional(IsOk()));
768769
auto failure_stream = stub.StreamingRead(
769770
std::make_shared<grpc::ClientContext>(), Options{}, request);
770-
EXPECT_THAT(failure_stream->Read(),
771-
VariantWith<Status>(StatusIs(StatusCode::kUnavailable)));
771+
EXPECT_THAT(failure_stream->Read(&response),
772+
Optional(StatusIs(StatusCode::kUnavailable)));
772773
}
773774

774775
class MockWriteObjectResponse

generator/integration_tests/tests/golden_kitchen_sink_tracing_stub_test.cc

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ using ::google::test::admin::database::v1::Response;
5050
using ::testing::_;
5151
using ::testing::IsEmpty;
5252
using ::testing::Not;
53-
using ::testing::VariantWith;
53+
using ::testing::Optional;
5454

5555
auto constexpr kErrorCode = "ABORTED";
5656

@@ -189,8 +189,9 @@ TEST(GoldenKitchenSinkTracingStubTest, StreamingRead) {
189189
auto under_test = GoldenKitchenSinkTracingStub(mock);
190190
auto stream = under_test.StreamingRead(
191191
std::make_shared<grpc::ClientContext>(), Options{}, Request{});
192-
auto v = stream->Read();
193-
EXPECT_THAT(v, VariantWith<Status>(StatusIs(StatusCode::kAborted)));
192+
Response response;
193+
EXPECT_THAT(stream->Read(&response),
194+
Optional(StatusIs(StatusCode::kAborted)));
194195

195196
auto spans = span_catcher->GetSpans();
196197
EXPECT_THAT(

generator/integration_tests/tests/mock_golden_kitchen_sink_stub.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,8 @@ class MockStreamingReadRpc
148148
::google::test::admin::database::v1::Response> {
149149
public:
150150
MOCK_METHOD(void, Cancel, (), (override));
151-
MOCK_METHOD(
152-
(absl::variant<Status, ::google::test::admin::database::v1::Response>),
153-
Read, (), (override));
151+
MOCK_METHOD((absl::optional<Status>), Read,
152+
(::google::test::admin::database::v1::Response*), (override));
154153
MOCK_METHOD(RpcMetadata, GetRequestMetadata, (), (const, override));
155154
};
156155

generator/internal/connection_impl_generator.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,16 @@ StreamRange<$response_type$>
352352
internal::MakeResumableStreamingReadRpc<$response_type$, $request_type$>(
353353
retry_policy(*current), backoff_policy(*current), factory,
354354
$service_name$$method_name$StreamingUpdater, request);
355-
return internal::MakeStreamRange(internal::StreamReader<$response_type$>(
356-
[resumable] { return resumable->Read(); }));
355+
return internal::MakeStreamRange<$response_type$>(
356+
[resumable = std::move(resumable)]()
357+
-> absl::variant<
358+
Status,
359+
$response_type$> {
360+
$response_type$ response;
361+
auto status = resumable->Read(&response);
362+
if (status.has_value()) return *status;
363+
return response;
364+
});
357365
}
358366
)""";
359367
}

google/cloud/aiplatform/v1/internal/featurestore_online_serving_connection_impl.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,17 @@ FeaturestoreOnlineServingServiceConnectionImpl::StreamingReadFeatureValues(
108108
retry_policy(*current), backoff_policy(*current), factory,
109109
FeaturestoreOnlineServingServiceStreamingReadFeatureValuesStreamingUpdater,
110110
request);
111-
return internal::MakeStreamRange(
112-
internal::StreamReader<
113-
google::cloud::aiplatform::v1::ReadFeatureValuesResponse>(
114-
[resumable] { return resumable->Read(); }));
111+
return internal::MakeStreamRange<
112+
google::cloud::aiplatform::v1::ReadFeatureValuesResponse>(
113+
[resumable = std::move(resumable)]()
114+
-> absl::variant<
115+
Status,
116+
google::cloud::aiplatform::v1::ReadFeatureValuesResponse> {
117+
google::cloud::aiplatform::v1::ReadFeatureValuesResponse response;
118+
auto status = resumable->Read(&response);
119+
if (status.has_value()) return *status;
120+
return response;
121+
});
115122
}
116123

117124
StatusOr<google::cloud::aiplatform::v1::WriteFeatureValuesResponse>

google/cloud/aiplatform/v1/internal/prediction_connection_impl.cc

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,14 @@ PredictionServiceConnectionImpl::StreamRawPredict(
118118
google::cloud::aiplatform::v1::StreamRawPredictRequest>(
119119
retry_policy(*current), backoff_policy(*current), factory,
120120
PredictionServiceStreamRawPredictStreamingUpdater, request);
121-
return internal::MakeStreamRange(
122-
internal::StreamReader<google::api::HttpBody>(
123-
[resumable] { return resumable->Read(); }));
121+
return internal::MakeStreamRange<google::api::HttpBody>(
122+
[resumable = std::move(
123+
resumable)]() -> absl::variant<Status, google::api::HttpBody> {
124+
google::api::HttpBody response;
125+
auto status = resumable->Read(&response);
126+
if (status.has_value()) return *status;
127+
return response;
128+
});
124129
}
125130

126131
StatusOr<google::cloud::aiplatform::v1::DirectPredictResponse>
@@ -196,10 +201,16 @@ PredictionServiceConnectionImpl::ServerStreamingPredict(
196201
google::cloud::aiplatform::v1::StreamingPredictRequest>(
197202
retry_policy(*current), backoff_policy(*current), factory,
198203
PredictionServiceServerStreamingPredictStreamingUpdater, request);
199-
return internal::MakeStreamRange(
200-
internal::StreamReader<
201-
google::cloud::aiplatform::v1::StreamingPredictResponse>(
202-
[resumable] { return resumable->Read(); }));
204+
return internal::MakeStreamRange<
205+
google::cloud::aiplatform::v1::StreamingPredictResponse>(
206+
[resumable = std::move(resumable)]()
207+
-> absl::variant<
208+
Status, google::cloud::aiplatform::v1::StreamingPredictResponse> {
209+
google::cloud::aiplatform::v1::StreamingPredictResponse response;
210+
auto status = resumable->Read(&response);
211+
if (status.has_value()) return *status;
212+
return response;
213+
});
203214
}
204215

205216
std::unique_ptr<::google::cloud::AsyncStreamingReadWriteRpc<
@@ -256,10 +267,16 @@ PredictionServiceConnectionImpl::StreamGenerateContent(
256267
google::cloud::aiplatform::v1::GenerateContentRequest>(
257268
retry_policy(*current), backoff_policy(*current), factory,
258269
PredictionServiceStreamGenerateContentStreamingUpdater, request);
259-
return internal::MakeStreamRange(
260-
internal::StreamReader<
261-
google::cloud::aiplatform::v1::GenerateContentResponse>(
262-
[resumable] { return resumable->Read(); }));
270+
return internal::MakeStreamRange<
271+
google::cloud::aiplatform::v1::GenerateContentResponse>(
272+
[resumable = std::move(resumable)]()
273+
-> absl::variant<
274+
Status, google::cloud::aiplatform::v1::GenerateContentResponse> {
275+
google::cloud::aiplatform::v1::GenerateContentResponse response;
276+
auto status = resumable->Read(&response);
277+
if (status.has_value()) return *status;
278+
return response;
279+
});
263280
}
264281

265282
StreamRange<google::cloud::location::Location>

0 commit comments

Comments
 (0)