
Commit 4d70a40

Merge pull request #22 from EntryDSM/feature/21-kafka
Status Kafka configuration
2 parents: b3c6b86 + bd3d685

File tree: 9 files changed (+211 lines, -5 lines)

buildSrc/src/main/kotlin/Dependencies.kt

Lines changed: 3 additions & 0 deletions
@@ -53,4 +53,7 @@ object Dependencies {

     // Spring Cloud Config
     const val SPRING_CLOUD_STARTER_CONFIG = "org.springframework.cloud:spring-cloud-starter-config"
+
+    // Kafka
+    const val KAFKA = "org.springframework.kafka:spring-kafka"
 }

casper-status/build.gradle.kts

Lines changed: 9 additions & 2 deletions
@@ -84,7 +84,10 @@ dependencies {
     implementation(Dependencies.RESILIENCE4J_SPRING_BOOT)

     // Spring Cloud Config
-    //implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)
+    // implementation(Dependencies.SPRING_CLOUD_STARTER_CONFIG)
+
+    // Kafka
+    implementation(Dependencies.KAFKA)
 }

 protobuf {
@@ -125,4 +128,8 @@ tasks.withType<KotlinCompile> {

 tasks.withType<Test> {
     useJUnitPlatform()
-}
+}
+
+tasks.matching { it.name.startsWith("ktlint") }.configureEach {
+    enabled = false
+}

casper-status/src/main/kotlin/hs/kr/entrydsm/status/global/security/SecurityConfig.kt

Lines changed: 1 addition & 3 deletions
@@ -16,7 +16,7 @@ import org.springframework.security.web.SecurityFilterChain
  */
 @Configuration
 class SecurityConfig(
-    private val objectMapper: ObjectMapper
+    private val objectMapper: ObjectMapper,
 ) {
     companion object {
         const val ADMIN_ROLE = "ADMIN"
@@ -38,7 +38,6 @@ class SecurityConfig(
             .sessionManagement {
                 it.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
             }
-
             .authorizeHttpRequests {
                 it
                     .requestMatchers("/").permitAll()
@@ -49,6 +48,5 @@ class SecurityConfig(
             .with(FilterConfig(objectMapper)) { }

         return http.build()
-
     }
 }
casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaConsumerConfig.kt

Lines changed: 47 additions & 0 deletions (new file)

package hs.kr.entrydsm.status.infrastructure.kafka.config

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.support.serializer.JsonDeserializer

/**
 * Kafka consumer configuration.
 */
@EnableKafka
@Configuration
class KafkaConsumerConfig(
    private val kafkaProperty: KafkaProperty,
) {
    @Bean
    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
        val factory = ConcurrentKafkaListenerContainerFactory<String, String>()

        factory.setConcurrency(2)
        factory.consumerFactory = DefaultKafkaConsumerFactory(consumerFactoryConfig())
        factory.containerProperties.pollTimeout = 500
        return factory
    }

    private fun consumerFactoryConfig(): Map<String, Any> {
        return mapOf(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
            ConsumerConfig.ISOLATION_LEVEL_CONFIG to "read_committed",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "false",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "latest",
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
            ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG to 5000,
            JsonDeserializer.TRUSTED_PACKAGES to "*",
            "security.protocol" to "SASL_PLAINTEXT",
            "sasl.mechanism" to "SCRAM-SHA-512",
            "sasl.jaas.config" to "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"${kafkaProperty.confluentApiKey}\" " +
                "password=\"${kafkaProperty.confluentApiSecret}\";",
        )
    }
}
casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProducerConfig.kt

Lines changed: 41 additions & 0 deletions (new file)

package hs.kr.entrydsm.status.infrastructure.kafka.config

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
import org.springframework.kafka.support.serializer.JsonSerializer

/**
 * Kafka producer configuration.
 */
@Configuration
class KafkaProducerConfig(
    private val kafkaProperty: KafkaProperty,
) {
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, Any> {
        return KafkaTemplate(producerFactory())
    }

    private fun producerFactory(): ProducerFactory<String, Any> {
        return DefaultKafkaProducerFactory(producerConfig())
    }

    private fun producerConfig(): Map<String, Any> {
        return mapOf(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to kafkaProperty.serverAddress,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
            "security.protocol" to "SASL_PLAINTEXT",
            "sasl.mechanism" to "SCRAM-SHA-512",
            "sasl.jaas.config" to
                "org.apache.kafka.common.security.scram.ScramLoginModule required " +
                "username=\"${kafkaProperty.confluentApiKey}\" " +
                "password=\"${kafkaProperty.confluentApiSecret}\";",
        )
    }
}
casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaProperty.kt

Lines changed: 15 additions & 0 deletions (new file)

package hs.kr.entrydsm.status.infrastructure.kafka.config

import org.springframework.boot.context.properties.ConfigurationProperties
import org.springframework.boot.context.properties.ConfigurationPropertiesBinding

/**
 * Kafka server connection settings.
 */
@ConfigurationPropertiesBinding
@ConfigurationProperties("kafka")
class KafkaProperty(
    val serverAddress: String,
    val confluentApiKey: String,
    val confluentApiSecret: String,
)
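
KafkaProperty is constructor-bound, and this diff does not show where it is registered with Spring Boot. Below is a minimal sketch of one common way to wire it up; the StatusApplication class name and the property keys in the comment are assumptions for illustration (the real entry point is not part of this commit), and the sketch presumes Spring Boot 3 constructor binding with the kotlin-spring plugin applied.

package hs.kr.entrydsm.status

import hs.kr.entrydsm.status.infrastructure.kafka.config.KafkaProperty
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.boot.runApplication

// Hypothetical entry point (not part of this commit): registers KafkaProperty so
// constructor binding can populate it from configuration such as
//   kafka.server-address=localhost:9092
//   kafka.confluent-api-key=...
//   kafka.confluent-api-secret=...
@SpringBootApplication
@EnableConfigurationProperties(KafkaProperty::class)
class StatusApplication

fun main(args: Array<String>) {
    runApplication<StatusApplication>(*args)
}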
casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/config/KafkaTopics.kt

Lines changed: 21 additions & 0 deletions (new file)

package hs.kr.entrydsm.status.infrastructure.kafka.config

/**
 * Object that defines the Kafka topic names.
 */
object KafkaTopics {
    /**
     * Application creation topic.
     */
    const val CREATE_APPLICATION = "create-application"

    /**
     * User withdrawal topic.
     */
    const val DELETE_USER = "delete-user"

    /**
     * Final submission topic.
     */
    const val SUBMIT_APPLICATION_FINAL = "submit-application-final"
}
casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/StatusConsumer.kt

Lines changed: 66 additions & 0 deletions (new file)

package hs.kr.entrydsm.status.infrastructure.kafka.consumer

import com.fasterxml.jackson.databind.ObjectMapper
import hs.kr.entrydsm.status.domain.status.application.port.`in`.CreateStatusUseCase
import hs.kr.entrydsm.status.domain.status.application.port.`in`.DeleteStatusUseCase
import hs.kr.entrydsm.status.domain.status.application.port.`in`.UpdateStatusUseCase
import hs.kr.entrydsm.status.infrastructure.kafka.config.KafkaTopics
import hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto.CreateApplicationEvent
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component

/**
 * Consumer that receives admission-application status messages.
 */
@Component
class StatusConsumer(
    private val mapper: ObjectMapper,
    private val createStatusUseCase: CreateStatusUseCase,
    private val updateStatusUseCase: UpdateStatusUseCase,
    private val deleteStatusUseCase: DeleteStatusUseCase,
) {
    /**
     * Creates an application status when an application is created.
     *
     * @param message application creation event
     */
    @KafkaListener(
        topics = [KafkaTopics.CREATE_APPLICATION],
        groupId = "create-status",
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun createStatus(message: String) {
        val createApplicationEvent = mapper.readValue(message, CreateApplicationEvent::class.java)
        createStatusUseCase.execute(createApplicationEvent.receiptCode)
    }

    /**
     * Updates the status of an application that has been finally submitted.
     *
     * @param message receipt code of the finally submitted application
     */
    @KafkaListener(
        topics = [KafkaTopics.SUBMIT_APPLICATION_FINAL],
        groupId = "update-status",
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun updateStatus(message: String) {
        val receiptCode = mapper.readValue(message, Long::class.java)
        updateStatusUseCase.execute(receiptCode)
    }

    /**
     * Deletes the application status of a user who has withdrawn.
     *
     * @param message receipt code of the withdrawn user
     */
    @KafkaListener(
        topics = [KafkaTopics.DELETE_USER],
        groupId = "delete-status",
        containerFactory = "kafkaListenerContainerFactory",
    )
    fun deleteStatus(message: String) {
        val receiptCode = mapper.readValue(message, Long::class.java)
        deleteStatusUseCase.execute(receiptCode)
    }
}
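
The producing side is not part of this commit; other services publish these events. As a rough sketch of the counterpart, the hypothetical StatusEventProducer below shows how a service holding the same KafkaTemplate bean and topic names could emit the messages these listeners expect: with JsonSerializer on the producer, the payload travels as JSON, an object for CREATE_APPLICATION and a bare number for SUBMIT_APPLICATION_FINAL and DELETE_USER.

package hs.kr.entrydsm.status.infrastructure.kafka.producer

import hs.kr.entrydsm.status.infrastructure.kafka.config.KafkaTopics
import hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto.CreateApplicationEvent
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
import java.util.UUID

// Hypothetical producer sketch (not part of this commit); names are illustrative.
@Component
class StatusEventProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>,
) {
    // Publishes an application-creation event consumed by StatusConsumer.createStatus.
    fun publishCreateApplication(receiptCode: Long, userId: UUID) {
        kafkaTemplate.send(KafkaTopics.CREATE_APPLICATION, CreateApplicationEvent(receiptCode, userId))
    }

    // Publishes the receipt code of a finally submitted application.
    fun publishSubmitApplicationFinal(receiptCode: Long) {
        kafkaTemplate.send(KafkaTopics.SUBMIT_APPLICATION_FINAL, receiptCode)
    }

    // Publishes the receipt code of a withdrawn user.
    fun publishDeleteUser(receiptCode: Long) {
        kafkaTemplate.send(KafkaTopics.DELETE_USER, receiptCode)
    }
}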
casper-status/src/main/kotlin/hs/kr/entrydsm/status/infrastructure/kafka/consumer/dto/CreateApplicationEvent.kt

Lines changed: 8 additions & 0 deletions (new file)

package hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto

import java.util.UUID

data class CreateApplicationEvent(
    val receiptCode: Long,
    val userId: UUID,
)
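
For a sense of the JSON this DTO maps to and from, a small Jackson round trip might look like the sketch below. It assumes jackson-module-kotlin is on the classpath and uses made-up values; it only mirrors what StatusConsumer.createStatus does with an incoming message.

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import hs.kr.entrydsm.status.infrastructure.kafka.consumer.dto.CreateApplicationEvent
import java.util.UUID

fun main() {
    val mapper = jacksonObjectMapper()

    // Serializes to something like {"receiptCode":1,"userId":"<uuid>"}.
    val json = mapper.writeValueAsString(
        CreateApplicationEvent(receiptCode = 1L, userId = UUID.randomUUID()),
    )
    println(json)

    // Deserializing back mirrors the consumer-side mapper.readValue call.
    val event: CreateApplicationEvent = mapper.readValue(json)
    println(event.receiptCode)
}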
