Skip to content

Commit

Permalink
feature(partner_sdk): fix java examples as per updated proto files (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
fivetran-niketkhandelwal authored May 17, 2024
1 parent 8b9df55 commit cfe4134
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import java.util.*;

public class ConnectorServiceImpl extends ConnectorGrpc.ConnectorImplBase {
public class ConnectorServiceImpl extends SourceConnectorGrpc.SourceConnectorImplBase {
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
responseObserver.onNext(
Expand All @@ -17,22 +17,24 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
.setTableSelectionSupported(true)
.addAllFields(Arrays.asList(
FormField.newBuilder()
.setName("apikey").setLabel("API key").setRequired(true).setTextField(TextField.PlainText).build(),
.setSingle(Field.newBuilder().setName("apiKey").setLabel("API Key").setPlaceholder("my-api-key")
.setRequired(true).setTextField(TextField.PlainText).build())
.build(),
FormField.newBuilder()
.setName("password").setLabel("User Password").setRequired(true).setTextField(TextField.Password).build(),
.setSingle(Field.newBuilder().setName("password").setLabel("User Password").setPlaceholder("p4ssw0rd")
.setRequired(true).setTextField(TextField.Password).build())
.build(),
FormField.newBuilder()
.setName("region").setLabel("AWS Region").setRequired(false).setDropdownField(
DropdownField.newBuilder().addAllDropdownField(
Arrays.asList("US-EAST", "US-WEST")).build()
).build(),
.setSingle(Field.newBuilder().setName("region").setLabel("AWS Region").setDefaultValue("US-EAST").setRequired(false)
.setDropdownField(DropdownField.newBuilder().addAllDropdownField(Arrays.asList("US-EAST", "US-WEST")).build())
.build())
.build(),
FormField.newBuilder()
.setName("hidden").setLabel("my-hidden-value").setTextField(TextField.Hidden)
.setSingle(Field.newBuilder().setName("hidden").setLabel("my-hidden-value").setTextField(TextField.Hidden).build())
.build(),
FormField.newBuilder()
.setName("isPublic")
.setLabel("Public?")
.setDescription("Is this public?")
.setToggleField(ToggleField.newBuilder()
.setSingle(Field.newBuilder().setName("isPublic").setLabel("Public?").setDescription("Is this public?")
.setToggleField(ToggleField.newBuilder().build())
.build())
.build()
))
Expand Down Expand Up @@ -81,101 +83,82 @@ public void schema(SchemaRequest request, StreamObserver<SchemaResponse> respons
@Override
public void update(UpdateRequest request, StreamObserver<UpdateResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();
String state_json = request.hasStateJson() ? request.getStateJson() : "{}";
String stateJson = request.hasStateJson() ? request.getStateJson() : "{}";
Selection selection = request.hasSelection() ? request.getSelection() : null;

ObjectMapper mapper = new ObjectMapper();
UpdateResponse.Builder responseBuilder = UpdateResponse.newBuilder();

try {
State state = mapper.readValue(state_json, State.class);
State state = mapper.readValue(stateJson, State.class);

// -- Send a log message
responseBuilder.clear();
responseObserver.onNext(responseBuilder
.setLogEntry(LogEntry.newBuilder()
.setLevel(LogLevel.INFO)
.setMessage("Sync STARTING")
.build())
.build());
System.out.println("{" +
"\"level\":\"INFO\"," +
"\"message\": \"[Update]: Sync STARTING\"," +
"\"message-origin\": \"sdk_connector\"" +
"}");

// -- Send UPSERT records
Operation.Builder operationBuilder = Operation.newBuilder();
Record.Builder recordBuilder = Record.newBuilder();
Map<String, ValueType> row = new HashMap<>();
for (int i=0; i<3; i++) {
responseBuilder.clear();
operationBuilder.clear();
recordBuilder.clear();

row.clear();
row.put("a1", ValueType.newBuilder().setString("a-" + i).build());
row.put("a2", ValueType.newBuilder().setDouble(i * 0.234d).build());

responseObserver.onNext(responseBuilder
.setOperation(operationBuilder
.setRecord(recordBuilder
responseObserver.onNext(responseBuilder.setRecord(recordBuilder
.setTableName("table1")
.setType(OpType.UPSERT)
.setType(RecordType.UPSERT)
.putAllData(row)
.build())
.build())
.build());
.build());

state.cursor += 1;
}

// -- Send UPDATE record
responseBuilder.clear();
operationBuilder.clear();
recordBuilder.clear();
row.clear();
row.put("a1", ValueType.newBuilder().setString("a-0").build());
row.put("a2", ValueType.newBuilder().setDouble(110.234d).build());
responseObserver.onNext(responseBuilder
.setOperation(operationBuilder
.setRecord(recordBuilder
responseObserver.onNext(responseBuilder.setRecord(recordBuilder
.setTableName("table1")
.setType(OpType.UPDATE)
.setType(RecordType.UPDATE)
.putAllData(row)
.build())
.build())
.build());
.build());
state.cursor += 1;

// -- Send DELETE record
responseBuilder.clear();
operationBuilder.clear();
recordBuilder.clear();
row.clear();
row.put("a1", ValueType.newBuilder().setString("a-2").build());
responseObserver.onNext(responseBuilder
.setOperation(operationBuilder
.setRecord(recordBuilder
responseObserver.onNext(responseBuilder.setRecord(recordBuilder
.setTableName("table1")
.setType(OpType.DELETE)
.setType(RecordType.DELETE)
.putAllData(row)
.build())
.build())
.build());
.build());
state.cursor += 1;

// -- Send checkpoint
String newState = mapper.writeValueAsString(state);
Checkpoint checkpoint = Checkpoint.newBuilder().setStateJson(newState).build();
operationBuilder.clear();
responseObserver.onNext(responseBuilder
.setOperation(operationBuilder
.setCheckpoint(checkpoint).build()).build());
responseObserver.onNext(responseBuilder.setCheckpoint(checkpoint).build());

// -- Send a log message
responseBuilder.clear();
responseObserver.onNext(responseBuilder
.setLogEntry(LogEntry.newBuilder()
.setLevel(LogLevel.INFO)
.setMessage("Sync DONE")
.build())
.build());
System.out.println("{" +
"\"level\":\"INFO\"," +
"\"message\": \"[Update]: Sync DONE\"," +
"\"message-origin\": \"sdk_connector\"" +
"}");

} catch (JsonProcessingException e) {
responseObserver.onError(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package destination;

import com.google.common.collect.Lists;
import com.google.protobuf.AbstractMessage;
import fivetran_sdk.*;
import io.grpc.stub.StreamObserver;

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class DestinationServiceImpl extends DestinationGrpc.DestinationImplBase {
public class DestinationServiceImpl extends DestinationConnectorGrpc.DestinationConnectorImplBase {
@Override
public void configurationForm(ConfigurationFormRequest request, StreamObserver<ConfigurationFormResponse> responseObserver) {
responseObserver.onNext(
Expand All @@ -15,32 +18,61 @@ public void configurationForm(ConfigurationFormRequest request, StreamObserver<C
.setTableSelectionSupported(true)
.addAllFields(Arrays.asList(
FormField.newBuilder()
.setName("host").setLabel("Host").setRequired(true).setTextField(TextField.PlainText).build(),
.setSingle(Field.newBuilder().setName("host").setLabel("Host").setPlaceholder("my.example.host")
.setRequired(true).setTextField(TextField.PlainText).build())
.build(),
FormField.newBuilder()
.setName("password").setLabel("Password").setRequired(true).setTextField(TextField.Password).build(),
.setSingle(Field.newBuilder().setName("password").setLabel("Password").setPlaceholder("p4ssw0rd")
.setRequired(true).setTextField(TextField.Password).build())
.build(),
FormField.newBuilder()
.setName("region").setLabel("AWS Region").setRequired(false).setDropdownField(
DropdownField.newBuilder().addAllDropdownField(
Arrays.asList("US-EAST", "US-WEST")).build()
).build(),
.setSingle(Field.newBuilder().setName("region").setLabel("AWS Region").setDefaultValue("US-EAST").setRequired(false)
.setDropdownField(DropdownField.newBuilder().addAllDropdownField(Arrays.asList("US-EAST", "US-WEST")).build())
.build())
.build(),
FormField.newBuilder()
.setName("hidden").setLabel("my-hidden-value").setTextField(TextField.Hidden)
.setSingle(Field.newBuilder().setName("hidden").setLabel("my-hidden-value").setTextField(TextField.Hidden).build())
.build(),
FormField.newBuilder()
.setName("isPublic")
.setLabel("Public?")
.setDescription("Is this public?")
.setToggleField(ToggleField.newBuilder().build())
.setSingle(Field.newBuilder().setName("isPublic").setLabel("Public?").setDescription("Is this public?")
.setToggleField(ToggleField.newBuilder().build())
.build())
.build()
))
.addAllTests(Arrays.asList(
)).addAllTests(Arrays.asList(
ConfigurationTest.newBuilder().setName("connect").setLabel("Tests connection").build(),
ConfigurationTest.newBuilder().setName("select").setLabel("Tests selection").build()))
.build());

responseObserver.onCompleted();
}

@Override
public void capabilities(CapabilitiesRequest request, StreamObserver<CapabilitiesResponse> responseObserver) {
responseObserver.onNext(
CapabilitiesResponse.newBuilder()
.setSupportsHistoryMode(false)
.addAllDataTypeMappings(Lists.newArrayList(
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.UNSPECIFIED).setMapTo(DestinationType.newBuilder().setName("UNKNOWN").setMapTo(DataType.UNSPECIFIED).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.BOOLEAN).setMapTo(DestinationType.newBuilder().setName("BOOL").setMapTo(DataType.BOOLEAN).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.SHORT).setMapTo(DestinationType.newBuilder().setName("SHORT_INTEGER").setMapTo(DataType.SHORT).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.INT).setMapTo(DestinationType.newBuilder().setName("INTEGER").setMapTo(DataType.INT).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.LONG).setMapTo(DestinationType.newBuilder().setName("LONG_INTEGER").setMapTo(DataType.LONG).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.DECIMAL).setMapTo(DestinationType.newBuilder().setName("DECIMAL").setMapTo(DataType.DECIMAL).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.FLOAT).setMapTo(DestinationType.newBuilder().setName("FLOAT").setMapTo(DataType.FLOAT).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.DOUBLE).setMapTo(DestinationType.newBuilder().setName("DOUBLE").setMapTo(DataType.DOUBLE).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.NAIVE_TIME).setMapTo(DestinationType.newBuilder().setName("TIME").setMapTo(DataType.NAIVE_TIME).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.NAIVE_DATE).setMapTo(DestinationType.newBuilder().setName("DATE").setMapTo(DataType.NAIVE_DATE).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.NAIVE_DATETIME).setMapTo(DestinationType.newBuilder().setName("DATETIME").setMapTo(DataType.NAIVE_DATETIME).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.UTC_DATETIME).setMapTo(DestinationType.newBuilder().setName("UTC_DATETIME").setMapTo(DataType.UTC_DATETIME).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.BINARY).setMapTo(DestinationType.newBuilder().setName("BLOB").setMapTo(DataType.BINARY).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.XML).setMapTo(DestinationType.newBuilder().setName("XML").setMapTo(DataType.XML).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.STRING).setMapTo(DestinationType.newBuilder().setName("VARCHAR").setMapTo(DataType.STRING).build()).build(),
DataTypeMappingEntry.newBuilder().setFivetranType(DataType.JSON).setMapTo(DestinationType.newBuilder().setName("OBJECT").setMapTo(DataType.JSON).build()).build()))
.build());

responseObserver.onCompleted();
}

@Override
public void test(TestRequest request, StreamObserver<TestResponse> responseObserver) {
Map<String, String> configuration = request.getConfigurationMap();
Expand Down Expand Up @@ -84,7 +116,8 @@ public void alterTable(AlterTableRequest request, StreamObserver<AlterTableRespo
Map<String, String> configuration = request.getConfigurationMap();

System.out.println("[AlterTable]: " +
request.getSchemaName() + " | " + request.getTable().getName() + " | " + request.getTable().getColumnsList());
request.getSchemaName() + " | " + request.getTableName() + " | " +
request.getChangesList().stream().map(AbstractMessage::toString).collect(Collectors.joining(", ")));
responseObserver.onNext(AlterTableResponse.newBuilder().setSuccess(true).build());
responseObserver.onCompleted();
}
Expand Down

0 comments on commit cfe4134

Please sign in to comment.