Skip to content

Commit

Permalink
Merge pull request #4 from musabbozkurt/adding-spring-cloud-stream-bi…
Browse files Browse the repository at this point in the history
…nder-kafka

spring-cloud-stream-binder-kafka is added
  • Loading branch information
musabbozkurt authored Jul 27, 2024
2 parents f664b97 + a07518b commit 47b5bca
Show file tree
Hide file tree
Showing 21 changed files with 512 additions and 4 deletions.
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@

### `docker-compose` contains the followings

* PostgreSQL DB connection details
* `POSTGRES_USER: postgres`
* `POSTGRES_PASSWORD: postgres`
* `Port: 5432`
* Zipkin: http://localhost:9411/
* Prometheus: http://localhost:9090/graph
* Grafana: http://localhost:3000/
Expand All @@ -39,9 +43,6 @@
* Select Prometheus
* Prometheus Url: http://prometheus:9090/
* Save & test
* PostgreSQL DB connection details
* `POSTGRES_USER: postgres`
* `POSTGRES_PASSWORD: postgres`
* `Port: 5432`
* Kafka-UI: http://localhost:9091/

-----
47 changes: 47 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,50 @@ services:
container_name: redis
ports:
- "6379:6379"

zookeeper:
image: "docker.io/bitnami/zookeeper:3"
restart: always
ports:
- "2181:2181"
volumes:
- "zookeeper_data:/bitnami"
environment:
- ALLOW_ANONYMOUS_LOGIN=yes

kafka:
image: "docker.io/bitnami/kafka:2-debian-10"
restart: always
ports:
- "9092:9092"
expose:
- "9093"
volumes:
- "kafka_data:/bitnami"
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_ADVERTISED_LISTENERS=INSIDE://kafka:9093,OUTSIDE://localhost:9092
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_LISTENERS=INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
- KAFKA_INTER_BROKER_LISTENER_NAME=INSIDE
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
depends_on:
- zookeeper

kafka-ui:
image: provectuslabs/kafka-ui
restart: always
container_name: kafka-ui
ports:
- "9091:8080"
environment:
- KAFKA_CLUSTERS_0_NAME=local
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9093
- KAFKA_CLUSTERS_0_ZOOKEEPER=localhost:2181

volumes:
zookeeper_data:
driver: local
kafka_data:
driver: local
24 changes: 24 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
<dockerfile-maven-plugin.version>1.4.13</dockerfile-maven-plugin.version>
<moneta.version>1.4</moneta.version>
<jacoco-maven-plugin.version>0.8.12</jacoco-maven-plugin.version>
<spring-cloud-dependencies.version>2023.0.0</spring-cloud-dependencies.version>
<!--suppress UnresolvedMavenProperty -->
<registry>${env.CI_REGISTRY}/${env.CI_REGION}/${project.artifactId}</registry>
</properties>
Expand Down Expand Up @@ -146,6 +147,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
Expand Down Expand Up @@ -219,8 +226,25 @@
<type>pom</type>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud-dependencies.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<pluginManagement>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.mb.inventorymanagementservice.queue;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class QueueChannels {

public static final String INTERNAL_EVENT_PRODUCER = "internalEventProducer-in-0";
public static final String USER_CREATED_EVENT_PRODUCER = "userCreatedEvent-in-0";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.mb.inventorymanagementservice.queue.consumer;

import com.mb.inventorymanagementservice.queue.event.InternalEvent;
import com.mb.inventorymanagementservice.queue.event.UserCreatedEvent;
import com.mb.inventorymanagementservice.utils.Constants;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;

import java.util.function.Consumer;
import java.util.function.Function;

@Slf4j
@Configuration
@RequiredArgsConstructor
public class ConsumerConfig {

private final ConsumerStrategyExecutor consumerStrategyExecutor;

@Bean
public Function<Flux<Message<InternalEvent>>, Flux<Message<InternalEvent>>> internalEventProducer() {
return internalEvent -> internalEvent
.map(internalEventMessage -> {
InternalEvent internalEventMessagePayload = internalEventMessage.getPayload();
log.info("Received an internal event. internalEventProducer - event: {}, eventType: {}, eventTypeHeader: {}",
internalEventMessagePayload,
internalEventMessagePayload.getEventType(),
internalEventMessage.getHeaders().get(Constants.EVENT_TYPE_HEADER_KEY));
return internalEventMessage;
});
}

@Bean
public Consumer<Message<InternalEvent>> internalEventConsumer() {
return message -> consumerStrategyExecutor.dispatchEvent(message.getPayload(), message.getHeaders());
}

@Bean
public Consumer<Message<UserCreatedEvent>> userCreatedEvent() {
return message -> consumerStrategyExecutor.dispatchEvent(message.getPayload(), message.getHeaders());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.mb.inventorymanagementservice.queue.consumer;

import com.mb.inventorymanagementservice.queue.event.Event;

public interface ConsumerStrategy {

void execute(Event event);

boolean canExecute(Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.mb.inventorymanagementservice.queue.consumer;

import com.mb.inventorymanagementservice.queue.event.Event;
import com.mb.inventorymanagementservice.utils.Constants;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class ConsumerStrategyExecutor {

private final List<ConsumerStrategy> strategies;

public void dispatchEvent(Event event, MessageHeaders messageHeaders) {
log.info("Received an event. dispatchEvent - event: {}, eventType: {}, eventTypeHeader: {}", event, event.getEventType(), messageHeaders.get(Constants.EVENT_TYPE_HEADER_KEY));
for (ConsumerStrategy strategy : strategies) {
if (strategy.canExecute(event)) {
strategy.execute(event);
return;
}
}
log.warn("No suitable strategy found. dispatchEvent - event: {}", event);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.mb.inventorymanagementservice.queue.consumer.internal;

import com.mb.inventorymanagementservice.queue.consumer.ConsumerStrategy;
import com.mb.inventorymanagementservice.queue.event.Event;
import com.mb.inventorymanagementservice.queue.event.InventoryManagementServiceEventType;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@AllArgsConstructor
public class InternalEventConsumerStrategyImpl implements ConsumerStrategy {

@Override
public void execute(Event event) {
log.info("Received an internal event. execute - event: {}", event);
}

@Override
public boolean canExecute(Event event) {
return InventoryManagementServiceEventType.INTERNAL_EVENT.equals(event.getEventType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.mb.inventorymanagementservice.queue.consumer.internal;

import com.mb.inventorymanagementservice.queue.consumer.ConsumerStrategy;
import com.mb.inventorymanagementservice.queue.event.Event;
import com.mb.inventorymanagementservice.queue.event.InventoryManagementServiceEventType;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@AllArgsConstructor
public class UserCreatedEventConsumerStrategyImpl implements ConsumerStrategy {

@Override
public void execute(Event event) {
log.info("Received a user created event. execute - event: {}", event);
}

@Override
public boolean canExecute(Event event) {
return InventoryManagementServiceEventType.USER_CREATED_EVENT.equals(event.getEventType());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.mb.inventorymanagementservice.queue.event;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.SuperBuilder;

import java.util.UUID;

@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
public class BaseEventDto {

@Builder.Default
private UUID id = UUID.randomUUID();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.mb.inventorymanagementservice.queue.event;

public interface Event {

EventType getEventType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.mb.inventorymanagementservice.queue.event;

public interface EventType {

String name();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.mb.inventorymanagementservice.queue.event;

import lombok.*;
import lombok.experimental.SuperBuilder;

import java.util.UUID;

@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class InternalEvent extends BaseEventDto implements Event {

private UUID randomId;

@Override
public EventType getEventType() {
return InventoryManagementServiceEventType.INTERNAL_EVENT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package com.mb.inventorymanagementservice.queue.event;

public enum InventoryManagementServiceEventType implements EventType {
INTERNAL_EVENT,
USER_CREATED_EVENT
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.mb.inventorymanagementservice.queue.event;

import com.mb.inventorymanagementservice.data.entity.User;
import lombok.*;
import lombok.experimental.SuperBuilder;

@Data
@SuperBuilder
@NoArgsConstructor
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
public class UserCreatedEvent extends BaseEventDto implements Event {

private User user;

@Override
public EventType getEventType() {
return InventoryManagementServiceEventType.USER_CREATED_EVENT;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.mb.inventorymanagementservice.queue.producer;

import com.mb.inventorymanagementservice.queue.event.Event;

public interface EventProducer {

void publishEvent(String bindingName, Event event);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.mb.inventorymanagementservice.queue.producer.impl;

import com.mb.inventorymanagementservice.queue.event.Event;
import com.mb.inventorymanagementservice.queue.producer.EventProducer;
import com.mb.inventorymanagementservice.utils.Constants;
import com.mb.inventorymanagementservice.utils.TransactionUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RequiredArgsConstructor
public class EventProducerImpl implements EventProducer {

private final StreamBridge streamBridge;

@Override
public void publishEvent(String bindingName, Event event) {
Message<Event> message = MessageBuilder
.withPayload(event)
.setHeader(Constants.EVENT_TYPE_HEADER_KEY, event.getEventType())
.build();

TransactionUtils.onCommit(() -> {
log.info("Publishing event in event producer. publishEvent - event: {}", event);
streamBridge.send(bindingName, message);
});
}
}
Loading

0 comments on commit 47b5bca

Please sign in to comment.