11package com .spotify .confidence ;
22
3+ import com .google .common .annotations .VisibleForTesting ;
34import com .spotify .confidence .shaded .flags .resolver .v1 .InternalFlagLoggerServiceGrpc ;
45import com .spotify .confidence .shaded .flags .resolver .v1 .WriteFlagLogsRequest ;
56import com .spotify .confidence .shaded .iam .v1 .AuthServiceGrpc ;
1617import org .slf4j .Logger ;
1718import org .slf4j .LoggerFactory ;
1819
20+ @ FunctionalInterface
21+ interface FlagLogWriter {
22+ void write (WriteFlagLogsRequest request );
23+ }
24+
1925public class GrpcWasmFlagLogger implements WasmFlagLogger {
2026 private static final String CONFIDENCE_DOMAIN = "edge-grpc.spotify.com" ;
2127 private static final Logger logger = LoggerFactory .getLogger (GrpcWasmFlagLogger .class );
2228 // Max number of flag_assigned entries per chunk to avoid exceeding gRPC max message size
2329 private static final int MAX_FLAG_ASSIGNED_PER_CHUNK = 1000 ;
2430 private final InternalFlagLoggerServiceGrpc .InternalFlagLoggerServiceBlockingStub stub ;
2531 private final ExecutorService executorService ;
32+ private final FlagLogWriter writer ;
33+
34+ @ VisibleForTesting
35+ public GrpcWasmFlagLogger (ApiSecret apiSecret , FlagLogWriter writer ) {
36+ final var channel = createConfidenceChannel ();
37+ final AuthServiceGrpc .AuthServiceBlockingStub authService =
38+ AuthServiceGrpc .newBlockingStub (channel );
39+ final TokenHolder tokenHolder =
40+ new TokenHolder (apiSecret .clientId (), apiSecret .clientSecret (), authService );
41+ final Channel authenticatedChannel =
42+ ClientInterceptors .intercept (channel , new JwtAuthClientInterceptor (tokenHolder ));
43+ this .stub = InternalFlagLoggerServiceGrpc .newBlockingStub (authenticatedChannel );
44+ this .executorService = Executors .newCachedThreadPool ();
45+ this .writer = writer ;
46+ }
2647
2748 public GrpcWasmFlagLogger (ApiSecret apiSecret ) {
2849 final var channel = createConfidenceChannel ();
@@ -34,6 +55,19 @@ public GrpcWasmFlagLogger(ApiSecret apiSecret) {
3455 ClientInterceptors .intercept (channel , new JwtAuthClientInterceptor (tokenHolder ));
3556 this .stub = InternalFlagLoggerServiceGrpc .newBlockingStub (authenticatedChannel );
3657 this .executorService = Executors .newCachedThreadPool ();
58+ this .writer =
59+ request ->
60+ executorService .submit (
61+ () -> {
62+ try {
63+ final var ignore = stub .writeFlagLogs (request );
64+ logger .debug (
65+ "Successfully sent flag log with {} entries" ,
66+ request .getFlagAssignedCount ());
67+ } catch (Exception e ) {
68+ logger .error ("Failed to write flag logs" , e );
69+ }
70+ });
3771 }
3872
3973 @ Override
@@ -92,16 +126,7 @@ private List<WriteFlagLogsRequest> createFlagAssignedChunks(WriteFlagLogsRequest
92126 }
93127
94128 private void sendAsync (WriteFlagLogsRequest request ) {
95- executorService .submit (
96- () -> {
97- try {
98- stub .writeFlagLogs (request );
99- logger .debug (
100- "Successfully sent flag log with {} entries" , request .getFlagAssignedCount ());
101- } catch (Exception e ) {
102- logger .error ("Failed to write flag logs" , e );
103- }
104- });
129+ writer .write (request );
105130 }
106131
107132 /**
0 commit comments