Skip to content

Commit e9e91c4

Browse files
authored
fix: chunk down flag assigned to not hit the max grpc limit (#310)
* chunk down flag assigned to not hit the max grpc limit * write test * fixup! write test
1 parent 4039dcc commit e9e91c4

File tree

6 files changed

+362
-6
lines changed

6 files changed

+362
-6
lines changed

openfeature-provider-local/src/main/java/com/spotify/confidence/GrpcWasmFlagLogger.java

Lines changed: 103 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package com.spotify.confidence;
22

3+
import com.google.common.annotations.VisibleForTesting;
34
import com.spotify.confidence.shaded.flags.resolver.v1.InternalFlagLoggerServiceGrpc;
45
import com.spotify.confidence.shaded.flags.resolver.v1.WriteFlagLogsRequest;
56
import com.spotify.confidence.shaded.iam.v1.AuthServiceGrpc;
@@ -8,14 +9,41 @@
89
import io.grpc.ManagedChannel;
910
import io.grpc.ManagedChannelBuilder;
1011
import java.time.Duration;
12+
import java.util.ArrayList;
13+
import java.util.List;
1114
import java.util.Optional;
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
1217
import org.slf4j.Logger;
1318
import org.slf4j.LoggerFactory;
1419

20+
@FunctionalInterface
21+
interface FlagLogWriter {
22+
void write(WriteFlagLogsRequest request);
23+
}
24+
1525
public class GrpcWasmFlagLogger implements WasmFlagLogger {
1626
private static final String CONFIDENCE_DOMAIN = "edge-grpc.spotify.com";
1727
private static final Logger logger = LoggerFactory.getLogger(GrpcWasmFlagLogger.class);
28+
// Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
29+
private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000;
1830
private final InternalFlagLoggerServiceGrpc.InternalFlagLoggerServiceBlockingStub stub;
31+
private final ExecutorService executorService;
32+
private final FlagLogWriter writer;
33+
34+
@VisibleForTesting
35+
public GrpcWasmFlagLogger(ApiSecret apiSecret, FlagLogWriter writer) {
36+
final var channel = createConfidenceChannel();
37+
final AuthServiceGrpc.AuthServiceBlockingStub authService =
38+
AuthServiceGrpc.newBlockingStub(channel);
39+
final TokenHolder tokenHolder =
40+
new TokenHolder(apiSecret.clientId(), apiSecret.clientSecret(), authService);
41+
final Channel authenticatedChannel =
42+
ClientInterceptors.intercept(channel, new JwtAuthClientInterceptor(tokenHolder));
43+
this.stub = InternalFlagLoggerServiceGrpc.newBlockingStub(authenticatedChannel);
44+
this.executorService = Executors.newCachedThreadPool();
45+
this.writer = writer;
46+
}
1947

2048
public GrpcWasmFlagLogger(ApiSecret apiSecret) {
2149
final var channel = createConfidenceChannel();
@@ -26,15 +54,88 @@ public GrpcWasmFlagLogger(ApiSecret apiSecret) {
2654
final Channel authenticatedChannel =
2755
ClientInterceptors.intercept(channel, new JwtAuthClientInterceptor(tokenHolder));
2856
this.stub = InternalFlagLoggerServiceGrpc.newBlockingStub(authenticatedChannel);
57+
this.executorService = Executors.newCachedThreadPool();
58+
this.writer =
59+
request ->
60+
executorService.submit(
61+
() -> {
62+
try {
63+
final var ignore = stub.writeFlagLogs(request);
64+
logger.debug(
65+
"Successfully sent flag log with {} entries",
66+
request.getFlagAssignedCount());
67+
} catch (Exception e) {
68+
logger.error("Failed to write flag logs", e);
69+
}
70+
});
2971
}
3072

3173
@Override
3274
public void write(WriteFlagLogsRequest request) {
33-
if (request.getClientResolveInfoList().isEmpty() && request.getFlagAssignedList().isEmpty()) {
75+
if (request.getClientResolveInfoList().isEmpty()
76+
&& request.getFlagAssignedList().isEmpty()
77+
&& request.getFlagResolveInfoList().isEmpty()) {
3478
logger.debug("Skipping empty flag log request");
3579
return;
3680
}
37-
final var ignore = stub.writeFlagLogs(request);
81+
82+
final int flagAssignedCount = request.getFlagAssignedCount();
83+
84+
// If flag_assigned list is small enough, send everything as-is
85+
if (flagAssignedCount <= MAX_FLAG_ASSIGNED_PER_CHUNK) {
86+
sendAsync(request);
87+
return;
88+
}
89+
90+
// Split flag_assigned into chunks and send each chunk asynchronously
91+
logger.debug(
92+
"Splitting {} flag_assigned entries into chunks of {}",
93+
flagAssignedCount,
94+
MAX_FLAG_ASSIGNED_PER_CHUNK);
95+
96+
final List<WriteFlagLogsRequest> chunks = createFlagAssignedChunks(request);
97+
for (WriteFlagLogsRequest chunk : chunks) {
98+
sendAsync(chunk);
99+
}
100+
}
101+
102+
private List<WriteFlagLogsRequest> createFlagAssignedChunks(WriteFlagLogsRequest request) {
103+
final List<WriteFlagLogsRequest> chunks = new ArrayList<>();
104+
final int totalFlags = request.getFlagAssignedCount();
105+
106+
for (int i = 0; i < totalFlags; i += MAX_FLAG_ASSIGNED_PER_CHUNK) {
107+
final int end = Math.min(i + MAX_FLAG_ASSIGNED_PER_CHUNK, totalFlags);
108+
final WriteFlagLogsRequest.Builder chunkBuilder =
109+
WriteFlagLogsRequest.newBuilder()
110+
.addAllFlagAssigned(request.getFlagAssignedList().subList(i, end));
111+
112+
// Include telemetry and resolve info only in the first chunk
113+
if (i == 0) {
114+
if (request.hasTelemetryData()) {
115+
chunkBuilder.setTelemetryData(request.getTelemetryData());
116+
}
117+
chunkBuilder
118+
.addAllClientResolveInfo(request.getClientResolveInfoList())
119+
.addAllFlagResolveInfo(request.getFlagResolveInfoList());
120+
}
121+
122+
chunks.add(chunkBuilder.build());
123+
}
124+
125+
return chunks;
126+
}
127+
128+
private void sendAsync(WriteFlagLogsRequest request) {
129+
writer.write(request);
130+
}
131+
132+
/**
133+
* Shutdown the executor service. This will allow any pending async writes to complete. Call this
134+
* when the application is shutting down.
135+
*/
136+
@Override
137+
public void shutdown() {
138+
executorService.shutdown();
38139
}
39140

40141
private static ManagedChannel createConfidenceChannel() {

openfeature-provider-local/src/main/java/com/spotify/confidence/LocalResolverServiceFactory.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import com.spotify.confidence.shaded.flags.admin.v1.ResolverStateServiceGrpc.ResolverStateServiceBlockingStub;
1111
import com.spotify.confidence.shaded.flags.resolver.v1.InternalFlagLoggerServiceGrpc;
1212
import com.spotify.confidence.shaded.flags.resolver.v1.Sdk;
13-
import com.spotify.confidence.shaded.flags.resolver.v1.WriteFlagLogsResponse;
13+
import com.spotify.confidence.shaded.flags.resolver.v1.WriteFlagLogsRequest;
1414
import com.spotify.confidence.shaded.iam.v1.AuthServiceGrpc;
1515
import com.spotify.confidence.shaded.iam.v1.AuthServiceGrpc.AuthServiceBlockingStub;
1616
import com.spotify.confidence.shaded.iam.v1.ClientCredential.ClientSecret;
@@ -179,7 +179,15 @@ private static FlagResolverService createFlagResolverService(
179179
.orElse(Duration.ofMinutes(5).toSeconds());
180180
final AtomicReference<byte[]> resolverStateProtobuf =
181181
new AtomicReference<>(accountStateProvider.provide());
182-
final WasmFlagLogger flagLogger = request -> WriteFlagLogsResponse.getDefaultInstance();
182+
// No-op logger for wasm mode with AccountStateProvider
183+
final WasmFlagLogger flagLogger =
184+
new WasmFlagLogger() {
185+
@Override
186+
public void write(WriteFlagLogsRequest request) {}
187+
188+
@Override
189+
public void shutdown() {}
190+
};
183191
final ResolverApi wasmResolverApi =
184192
new ThreadLocalSwapWasmResolverApi(
185193
flagLogger, resolverStateProtobuf.get(), accountId, stickyResolveStrategy);

openfeature-provider-local/src/main/java/com/spotify/confidence/ThreadLocalSwapWasmResolverApi.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ public int getInstanceCount() {
121121
@Override
122122
public void close() {
123123
resolverInstances.values().forEach(SwapWasmResolverApi::close);
124+
flagLogger.shutdown();
124125
resolverInstances.clear();
125126
}
126127
}

openfeature-provider-local/src/main/java/com/spotify/confidence/WasmResolveApi.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@
3131

3232
class IsClosedException extends Exception {}
3333

34-
@FunctionalInterface
3534
interface WasmFlagLogger {
3635
void write(WriteFlagLogsRequest request);
36+
37+
void shutdown();
3738
}
3839

3940
class WasmResolveApi {

0 commit comments

Comments
 (0)