-
Notifications
You must be signed in to change notification settings - Fork 181
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #2362 from ozangunalp/connector_contributor_guide
Connector contributor guide
- Loading branch information
Showing
64 changed files
with
3,187 additions
and
126 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
391 changes: 391 additions & 0 deletions
391
documentation/src/main/docs/concepts/contributing-connectors.md
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
package connectors; | ||
|
||
import java.util.concurrent.CompletionStage; | ||
|
||
import connectors.api.BrokerClient; | ||
import io.smallrye.mutiny.Uni; | ||
|
||
public class MyAckHandler { | ||
|
||
private final BrokerClient client; | ||
|
||
static MyAckHandler create(BrokerClient client) { | ||
return new MyAckHandler(client); | ||
} | ||
|
||
public MyAckHandler(BrokerClient client) { | ||
this.client = client; | ||
} | ||
|
||
public CompletionStage<Void> handle(MyMessage<?> msg) { | ||
return Uni.createFrom().completionStage(client.ack(msg.getConsumedMessage())) | ||
.emitOn(msg::runOnMessageContext) | ||
.subscribeAsCompletionStage(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
package connectors; | ||
|
||
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING; | ||
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.INCOMING_AND_OUTGOING; | ||
import static io.smallrye.reactive.messaging.annotations.ConnectorAttribute.Direction.OUTGOING; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.Flow; | ||
|
||
import jakarta.annotation.PostConstruct; | ||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
|
||
import org.eclipse.microprofile.config.Config; | ||
import org.eclipse.microprofile.reactive.messaging.Message; | ||
import org.eclipse.microprofile.reactive.messaging.spi.Connector; | ||
|
||
import connectors.api.BrokerClient; | ||
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute; | ||
import io.smallrye.reactive.messaging.connector.InboundConnector; | ||
import io.smallrye.reactive.messaging.connector.OutboundConnector; | ||
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; | ||
import io.vertx.mutiny.core.Vertx; | ||
|
||
@ApplicationScoped | ||
@Connector(MyConnector.CONNECTOR_NAME) | ||
@ConnectorAttribute(name = "client-id", type = "string", direction = INCOMING_AND_OUTGOING, description = "The client id ", mandatory = true) | ||
@ConnectorAttribute(name = "buffer-size", type = "int", direction = INCOMING, description = "The size buffer of incoming messages waiting to be processed", defaultValue = "128") | ||
@ConnectorAttribute(name = "topic", type = "string", direction = OUTGOING, description = "The default topic to send the messages, defaults to channel name if not set") | ||
@ConnectorAttribute(name = "maxPendingMessages", type = "int", direction = OUTGOING, description = "The maximum size of a queue holding pending messages", defaultValue = "1000") | ||
@ConnectorAttribute(name = "waitForWriteCompletion", type = "boolean", direction = OUTGOING, description = "Whether the outgoing channel waits for the write completion", defaultValue = "true") | ||
public class MyConnector implements InboundConnector, OutboundConnector { | ||
|
||
public static final String CONNECTOR_NAME = "smallrye-my-connector"; | ||
|
||
@Inject | ||
ExecutionHolder executionHolder; | ||
|
||
Vertx vertx; | ||
|
||
List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>(); | ||
List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>(); | ||
|
||
@PostConstruct | ||
void init() { | ||
this.vertx = executionHolder.vertx(); | ||
} | ||
|
||
@Override | ||
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) { | ||
MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config); | ||
String channelName = ic.getChannel(); | ||
String clientId = ic.getClientId(); | ||
int bufferSize = ic.getBufferSize(); | ||
// ... | ||
BrokerClient client = BrokerClient.create(clientId); | ||
MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client); | ||
incomingChannels.add(channel); | ||
return channel.getStream(); | ||
} | ||
|
||
@Override | ||
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) { | ||
MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config); | ||
String channelName = oc.getChannel(); | ||
String clientId = oc.getClientId(); | ||
int pendingMessages = oc.getMaxPendingMessages(); | ||
// ... | ||
BrokerClient client = BrokerClient.create(clientId); | ||
MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client); | ||
outgoingChannels.add(channel); | ||
return channel.getSubscriber(); | ||
} | ||
} |
106 changes: 106 additions & 0 deletions
106
documentation/src/main/java/connectors/MyConnectorWithPartials.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
package connectors; | ||
|
||
import java.util.List; | ||
import java.util.concurrent.CopyOnWriteArrayList; | ||
import java.util.concurrent.Flow; | ||
|
||
import jakarta.annotation.PostConstruct; | ||
import jakarta.enterprise.context.ApplicationScoped; | ||
import jakarta.inject.Inject; | ||
|
||
import org.eclipse.microprofile.config.Config; | ||
import org.eclipse.microprofile.reactive.messaging.Message; | ||
import org.eclipse.microprofile.reactive.messaging.spi.Connector; | ||
|
||
import connectors.api.BrokerClient; | ||
import io.smallrye.reactive.messaging.connector.InboundConnector; | ||
import io.smallrye.reactive.messaging.connector.OutboundConnector; | ||
import io.smallrye.reactive.messaging.health.HealthReport; | ||
import io.smallrye.reactive.messaging.health.HealthReporter; | ||
import io.smallrye.reactive.messaging.providers.connectors.ExecutionHolder; | ||
import io.vertx.mutiny.core.Vertx; | ||
|
||
// <health-report> | ||
@ApplicationScoped | ||
@Connector(MyConnectorWithPartials.CONNECTOR_NAME) | ||
public class MyConnectorWithPartials implements InboundConnector, OutboundConnector, HealthReporter { | ||
|
||
public static final String CONNECTOR_NAME = "smallrye-my-connector"; | ||
|
||
List<MyIncomingChannel> incomingChannels = new CopyOnWriteArrayList<>(); | ||
List<MyOutgoingChannel> outgoingChannels = new CopyOnWriteArrayList<>(); | ||
|
||
@Override | ||
public HealthReport getReadiness() { | ||
HealthReport.HealthReportBuilder builder = HealthReport.builder(); | ||
for (MyIncomingChannel channel : incomingChannels) { | ||
builder.add(channel.getChannel(), true); | ||
} | ||
for (MyOutgoingChannel channel : outgoingChannels) { | ||
builder.add(channel.getChannel(), true); | ||
} | ||
return builder.build(); | ||
} | ||
|
||
@Override | ||
public HealthReport getLiveness() { | ||
HealthReport.HealthReportBuilder builder = HealthReport.builder(); | ||
for (MyIncomingChannel channel : incomingChannels) { | ||
builder.add(channel.getChannel(), true); | ||
} | ||
for (MyOutgoingChannel channel : outgoingChannels) { | ||
builder.add(channel.getChannel(), true); | ||
} | ||
return builder.build(); | ||
} | ||
|
||
@Override | ||
public HealthReport getStartup() { | ||
HealthReport.HealthReportBuilder builder = HealthReport.builder(); | ||
for (MyIncomingChannel channel : incomingChannels) { | ||
builder.add(channel.getChannel(), true); | ||
} | ||
for (MyOutgoingChannel channel : outgoingChannels) { | ||
builder.add(channel.getChannel(), true); | ||
} | ||
return builder.build(); | ||
} | ||
|
||
// </health-report> | ||
|
||
@Inject | ||
ExecutionHolder executionHolder; | ||
|
||
Vertx vertx; | ||
|
||
@PostConstruct | ||
void init() { | ||
this.vertx = executionHolder.vertx(); | ||
} | ||
|
||
@Override | ||
public Flow.Publisher<? extends Message<?>> getPublisher(Config config) { | ||
MyConnectorIncomingConfiguration ic = new MyConnectorIncomingConfiguration(config); | ||
String channelName = ic.getChannel(); | ||
String clientId = ic.getClientId(); | ||
int bufferSize = ic.getBufferSize(); | ||
// ... | ||
BrokerClient client = BrokerClient.create(clientId); | ||
MyIncomingChannel channel = new MyIncomingChannel(vertx, ic, client); | ||
incomingChannels.add(channel); | ||
return channel.getStream(); | ||
} | ||
|
||
@Override | ||
public Flow.Subscriber<? extends Message<?>> getSubscriber(Config config) { | ||
MyConnectorOutgoingConfiguration oc = new MyConnectorOutgoingConfiguration(config); | ||
String channelName = oc.getChannel(); | ||
String clientId = oc.getClientId(); | ||
int pendingMessages = oc.getMaxPendingMessages(); | ||
// ... | ||
BrokerClient client = BrokerClient.create(clientId); | ||
MyOutgoingChannel channel = new MyOutgoingChannel(vertx, oc, client); | ||
outgoingChannels.add(channel); | ||
return channel.getSubscriber(); | ||
} | ||
} |
27 changes: 27 additions & 0 deletions
27
documentation/src/main/java/connectors/MyFailureHandler.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package connectors; | ||
|
||
import java.util.concurrent.CompletionStage; | ||
|
||
import org.eclipse.microprofile.reactive.messaging.Metadata; | ||
|
||
import connectors.api.BrokerClient; | ||
import io.smallrye.mutiny.Uni; | ||
|
||
public class MyFailureHandler { | ||
|
||
private final BrokerClient client; | ||
|
||
static MyFailureHandler create(BrokerClient client) { | ||
return new MyFailureHandler(client); | ||
} | ||
|
||
public MyFailureHandler(BrokerClient client) { | ||
this.client = client; | ||
} | ||
|
||
public CompletionStage<Void> handle(MyMessage<?> msg, Throwable reason, Metadata metadata) { | ||
return Uni.createFrom().completionStage(() -> client.reject(msg.getConsumedMessage(), reason.getMessage())) | ||
.emitOn(msg::runOnMessageContext) | ||
.subscribeAsCompletionStage(); | ||
} | ||
} |
Oops, something went wrong.