회사에서 개발 중 수시로 호출하는 API 가 있는데 이를 호출하여 처리하는 수단으로 Kafka 를 고려하게 되었다.
그리고 회사 서비스의 성장을 기대하며 대규모 트래픽에 대비한 목적 역시 갖고 있다.
그래서 이를 도입할 때의 내용을 정리하고자 한다.
1. 환경 설정 & 실행
- build.gradle 에 의존성(dependency) 추가
implementation 'org.springframework.kafka:spring-kafka'
- application.properties 에 추가
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.template.default-topic=my-topic
spring.kafka.consumer.group-id=XXXXX
- Linux 환경에서 Apache Kafka (v. 2.5.0) 다운로드 후 압축 해제
// 다운로드
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2
.12-2.5.0.tgz
// 압축 해제
tar xvf kafka_2.12-2.5.0.tgz
- AWS EC2 인스턴스에서 일부 파일 설정
- /kafka_2.12-2.5.0/config/server.properties
advertised.listeners=PLAINTEXT://localhost:9092
- ~/.bashrc
- 수정 후 시스템에 적용 위해 source ~/.bashrc 명령어 실행 필요
export KAFKA_HEAP_OPS="-Xmx400m -Xms400m" // 용량 설정 Default 값보다 줄이기 위함
- Zookeeper 실행
- /kafka_2.12-2.5.0/bin/zookeeper-server-start.sh config/zookeeper.properties
- Kafka 실행
- /kafka_2.12-2.5.0/bin/kafka-server-start.sh config/server.properties
2. Configuration 파일 추가 및 설정
- KafkaProducerConfig.java
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, KafkaEntity> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, KafkaEntity> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- KafkaConsumerConfig.java
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
private JsonDeserializer<KafkaEntity> gcmPushEntityJsonDeserializer() {
JsonDeserializer<KafkaEntity> deserializer = new JsonDeserializer<>(KafkaEntity.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
return deserializer;
}
private Map<String, Object> consumerFactoryConfig(JsonDeserializer<KafkaEntity> deserializer) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
return props;
}
@Bean
public ConsumerFactory<String, KafkaEntity> pushEntityConsumerFactory() {
JsonDeserializer<KafkaEntity> deserializer = gcmPushEntityJsonDeserializer();
return new DefaultKafkaConsumerFactory<>(
consumerFactoryConfig(deserializer),
new StringDeserializer(),
deserializer);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(pushEntityConsumerFactory());
return factory;
}
}
3. 로직 구성 파일 추가
- KafkaEntity.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class KafkaEntity {
private String id;
private String message;
}
- KafkaProducer.java
@Component
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, KafkaEntity> kafkaTemplate;
@Value("${spring.kafka.template.default-topic}")
private String topicName;
public void sendMessage(String topicName, KafkaEntity kafkaEntity) {
Message<KafkaEntity> message = MessageBuilder
.withPayload(kafkaEntity)
.setHeader(KafkaHeaders.TOPIC, topicName)
.build();
ListenableFuture<SendResult<String, KafkaEntity>> future =
kafkaTemplate.send(message);
future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaEntity>>() {
@Override
public void onSuccess(SendResult<String, KafkaEntity> result) {}
@Override
public void onFailure(Throwable ex) {}
});
}
}
- KafkaConsumer.java
- 토픽 구독 후 비즈니스 로직 처리
@Component
@RequiredArgsConstructor
public class KafkaConsumer extends Workspace {
@KafkaListener(topics = "my-topic", groupId = "${spring.kafka.consumer.group-id}")
public void consume(@Payload KafkaEntity kafkaEntity) {
// To-Do
}
}