Skip to content

Commit c3fc7c3

Browse files
committed
Address Review comments.
1 parent 3136bca commit c3fc7c3

File tree

4 files changed

+66
-44
lines changed

4 files changed

+66
-44
lines changed

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceClient.java

Lines changed: 34 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package io.grpc.testing.integration;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20-
import static io.grpc.testing.integration.TestCases.MCS_CS;
2120
import static org.junit.Assert.assertEquals;
2221
import static org.junit.Assert.assertFalse;
2322
import static org.junit.Assert.assertNotEquals;
@@ -593,16 +592,14 @@ private void runTest(TestCases testCase) throws Exception {
593592
if (serverHostOverride != null) {
594593
channelBuilder.overrideAuthority(serverHostOverride);
595594
}
596-
if (testCase.equals(MCS_CS.toString())) {
597-
channelBuilder.disableServiceConfigLookUp();
598-
try {
599-
@SuppressWarnings("unchecked")
600-
Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse(
601-
"{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}");
602-
channelBuilder.defaultServiceConfig(serviceConfigMap);
603-
} catch (IOException e) {
604-
throw new RuntimeException(e);
605-
}
595+
channelBuilder.disableServiceConfigLookUp();
596+
try {
597+
@SuppressWarnings("unchecked")
598+
Map<String, ?> serviceConfigMap = (Map<String, ?>) JsonParser.parse(
599+
"{\"connection_scaling\":{\"max_connections_per_subchannel\": 2}}");
600+
channelBuilder.defaultServiceConfig(serviceConfigMap);
601+
} catch (IOException e) {
602+
throw new RuntimeException(e);
606603
}
607604
tester.testMcs(TestServiceGrpc.newStub(channelBuilder.build()));
608605
break;
@@ -1045,7 +1042,8 @@ public void testOrcaOob() throws Exception {
10451042
streamObserver.onNext(StreamingOutputCallRequest.newBuilder()
10461043
.setOrcaOobReport(answer2)
10471044
.addResponseParameters(ResponseParameters.newBuilder().setSize(1).build()).build());
1048-
assertThat(streamingOutputCallResponseObserver.isCompleted).isTrue();
1045+
assertThat(streamingOutputCallResponseObserver.take())
1046+
.isInstanceOf(StreamingOutputCallResponse.class);
10491047

10501048
for (i = 0; i < retryLimit; i++) {
10511049
Thread.sleep(1000);
@@ -1055,6 +1053,8 @@ public void testOrcaOob() throws Exception {
10551053
}
10561054
}
10571055
assertThat(i).isLessThan(retryLimit);
1056+
streamObserver.onCompleted();
1057+
assertThat(streamingOutputCallResponseObserver.verifiedCompleted()).isTrue();
10581058
}
10591059

10601060
@Override
@@ -1084,8 +1084,8 @@ protected int operationTimeoutMillis() {
10841084

10851085
class StreamingOutputCallResponseObserver implements
10861086
StreamObserver<StreamingOutputCallResponse> {
1087+
private final Object lastItem = new Object();
10871088
private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
1088-
private volatile boolean isCompleted = true;
10891089

10901090
@Override
10911091
public void onNext(StreamingOutputCallResponse value) {
@@ -1099,12 +1099,16 @@ public void onError(Throwable t) {
10991099

11001100
@Override
11011101
public void onCompleted() {
1102-
isCompleted = true;
1102+
queue.add(lastItem);
11031103
}
11041104

11051105
Object take() throws InterruptedException {
11061106
return queue.take();
11071107
}
1108+
1109+
boolean verifiedCompleted() throws InterruptedException {
1110+
return queue.take() == lastItem;
1111+
}
11081112
}
11091113

11101114
public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception {
@@ -1113,13 +1117,15 @@ public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception
11131117
StreamObserver<StreamingOutputCallRequest> streamObserver1 =
11141118
asyncStub.fullDuplexCall(responseObserver1);
11151119
StreamingOutputCallRequest request = StreamingOutputCallRequest.newBuilder()
1116-
.setPayload(Payload.newBuilder().setBody(
1117-
ByteString.copyFromUtf8(MCS_CS.description())).build()).build();
1120+
.addResponseParameters(ResponseParameters.newBuilder()
1121+
.setSendClientSocketAddressInResponse(
1122+
Messages.BoolValue.newBuilder().setValue(true).build())
1123+
.build())
1124+
.build();
11181125
streamObserver1.onNext(request);
11191126
Object responseObj = responseObserver1.take();
11201127
StreamingOutputCallResponse callResponse = (StreamingOutputCallResponse) responseObj;
1121-
String clientSocketAddressInCall1 = new String(callResponse.getPayload().getBody()
1122-
.toByteArray(), UTF_8);
1128+
String clientSocketAddressInCall1 = callResponse.getClientSocketAddress();
11231129
assertThat(clientSocketAddressInCall1).isNotEmpty();
11241130

11251131
StreamingOutputCallResponseObserver responseObserver2 =
@@ -1128,30 +1134,30 @@ public void testMcs(TestServiceGrpc.TestServiceStub asyncStub) throws Exception
11281134
asyncStub.fullDuplexCall(responseObserver2);
11291135
streamObserver2.onNext(request);
11301136
callResponse = (StreamingOutputCallResponse) responseObserver2.take();
1131-
String clientSocketAddressInCall2 =
1132-
new String(callResponse.getPayload().getBody().toByteArray(), UTF_8);
1137+
String clientSocketAddressInCall2 = callResponse.getClientSocketAddress();
11331138

11341139
assertThat(clientSocketAddressInCall1).isEqualTo(clientSocketAddressInCall2);
11351140

11361141
// The first connection is at max rpc call count of 2, so the 3rd rpc will cause a new
11371142
// connection to be created in the same subchannel and not get queued.
1138-
/*StreamingOutputCallResponseObserver responseObserver3 =
1143+
StreamingOutputCallResponseObserver responseObserver3 =
11391144
new StreamingOutputCallResponseObserver();
11401145
StreamObserver<StreamingOutputCallRequest> streamObserver3 =
11411146
asyncStub.fullDuplexCall(responseObserver3);
11421147
streamObserver3.onNext(request);
11431148
callResponse = (StreamingOutputCallResponse) responseObserver3.take();
1144-
String clientSocketAddressInCall3 =
1145-
new String(callResponse.getPayload().getBody().toByteArray(), UTF_8);
1149+
String clientSocketAddressInCall3 = callResponse.getClientSocketAddress();
11461150

1147-
assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);*/
1151+
// This assertion is currently failing because connection scaling when MCS limit has been
1152+
// reached is not yet implemented in gRPC Java.
1153+
assertThat(clientSocketAddressInCall3).isNotEqualTo(clientSocketAddressInCall1);
11481154

11491155
streamObserver1.onCompleted();
1150-
assertThat(responseObserver1.isCompleted).isTrue();
1156+
assertThat(responseObserver1.verifiedCompleted()).isTrue();
11511157
streamObserver2.onCompleted();
1152-
assertThat(responseObserver2.isCompleted).isTrue();
1153-
/*streamObserver3.onCompleted();
1154-
assertThat(responseObserver3.isCompleted).isTrue();*/
1158+
assertThat(responseObserver2.verifiedCompleted()).isTrue();
1159+
streamObserver3.onCompleted();
1160+
assertThat(responseObserver3.verifiedCompleted()).isTrue();
11551161
}
11561162
}
11571163

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717
package io.grpc.testing.integration;
1818

1919
import static io.grpc.Grpc.TRANSPORT_ATTR_REMOTE_ADDR;
20-
import static io.grpc.testing.integration.TestCases.MCS_CS;
21-
import static java.nio.charset.StandardCharsets.UTF_8;
2220

2321
import com.google.common.base.Preconditions;
2422
import com.google.common.collect.Queues;
@@ -54,6 +52,7 @@
5452
import java.util.Arrays;
5553
import java.util.HashMap;
5654
import java.util.HashSet;
55+
import java.util.Iterator;
5756
import java.util.List;
5857
import java.util.Map;
5958
import java.util.Queue;
@@ -243,20 +242,27 @@ public void onNext(StreamingOutputCallRequest request) {
243242
.asRuntimeException());
244243
return;
245244
}
246-
if (new String(request.getPayload().getBody().toByteArray(), UTF_8)
247-
.equals(MCS_CS.description())) {
248-
SocketAddress peerAddress = PEER_ADDRESS_CONTEXT_KEY.get();
249-
ByteString payload = ByteString.copyFromUtf8(peerAddress.toString());
250-
StreamingOutputCallResponse.Builder responseBuilder =
251-
StreamingOutputCallResponse.newBuilder();
252-
responseBuilder.setPayload(
253-
Payload.newBuilder()
254-
.setBody(payload));
255-
responseObserver.onNext(responseBuilder.build());
245+
if (whetherSendClientSocketAddressInResponse(request)) {
246+
responseObserver.onNext(
247+
StreamingOutputCallResponse.newBuilder()
248+
.setClientSocketAddress(PEER_ADDRESS_CONTEXT_KEY.get().toString())
249+
.build());
250+
return;
256251
}
257252
dispatcher.enqueue(toChunkQueue(request));
258253
}
259254

255+
private boolean whetherSendClientSocketAddressInResponse(StreamingOutputCallRequest request) {
256+
Iterator<ResponseParameters> responseParametersIterator =
257+
request.getResponseParametersList().iterator();
258+
while (responseParametersIterator.hasNext()) {
259+
if (responseParametersIterator.next().getSendClientSocketAddressInResponse().getValue()) {
260+
return true;
261+
}
262+
}
263+
return false;
264+
}
265+
260266
@Override
261267
public void onCompleted() {
262268
if (oobTestLocked) {

interop-testing/src/main/java/io/grpc/testing/integration/TestServiceServer.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public void run() {
7575
private int port = 8080;
7676
private boolean useTls = true;
7777
private boolean useAlts = false;
78-
private boolean useMcs = false;
78+
private boolean setMcsLimit = false;
7979

8080
private ScheduledExecutorService executor;
8181
private Server server;
@@ -119,8 +119,9 @@ void parseArgs(String[] args) {
119119
usage = true;
120120
break;
121121
}
122-
} else if ("use_mcs".equals(key)) {
123-
useMcs = Boolean.parseBoolean(value);
122+
} else if ("set_max_concurrent_streams_limit".equals(key)) {
123+
setMcsLimit = Boolean.parseBoolean(value);
124+
// TODO: Make Netty server builder usable for IPV6 as well (not limited to MCS handling)
124125
addressType = Util.AddressType.IPV4; // To use NettyServerBuilder
125126
} else {
126127
System.err.println("Unknown argument: " + key);
@@ -145,6 +146,8 @@ void parseArgs(String[] args) {
145146
+ "\n for testing. Only effective when --use_alts=true."
146147
+ "\n --address_type=IPV4|IPV6|IPV4_IPV6"
147148
+ "\n What type of addresses to listen on. Default IPV4_IPV6"
149+
+ "\n --set_max_concurrent_streams_limit"
150+
+ "\n Whether to set the maximum concurrent streams limit"
148151
);
149152
System.exit(1);
150153
}
@@ -190,7 +193,7 @@ void start() throws Exception {
190193
if (v4Address != null && !v4Address.equals(localV4Address)) {
191194
((NettyServerBuilder) serverBuilder).addListenAddress(v4Address);
192195
}
193-
if (useMcs) {
196+
if (setMcsLimit) {
194197
((NettyServerBuilder) serverBuilder).maxConcurrentCallsPerConnection(2);
195198
}
196199
break;

interop-testing/src/main/proto/grpc/testing/messages.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,10 @@ message ResponseParameters {
159159
// implement the full compression tests by introspecting the call to verify
160160
// the response's compression status.
161161
BoolValue compressed = 3;
162+
163+
// Whether to request the server to send the requesting client's socket
164+
// address in the response.
165+
BoolValue send_client_socket_address_in_response = 4;
162166
}
163167

164168
// Server-streaming request.
@@ -186,6 +190,9 @@ message StreamingOutputCallRequest {
186190
message StreamingOutputCallResponse {
187191
// Payload to increase response size.
188192
Payload payload = 1;
193+
194+
// The client's socket address if requested.
195+
string client_socket_address = 2;
189196
}
190197

191198
// For reconnect interop test only.

0 commit comments

Comments
 (0)