IT/Kafka

[Kafka] Apache Kafka 도입기

어린이개발자 2025. 1. 20. 15:55

회사에서 개발 중 수시로 호출하는 API 가 있는데 이를 호출하여 처리하는 수단으로 Kafka 를 고려하게 되었다.

그리고 회사 서비스의 성장을 기대하며 대규모 트래픽에 대비한 목적 역시 갖고 있다.

그래서 이를 도입할 때의 내용을 정리하고자 한다.

 

1. 환경 설정 & 실행

- build.gradle 에 의존성(dependency) 추가 

implementation 'org.springframework.kafka:spring-kafka'

 

- application.properties 에 추가

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.template.default-topic=my-topic
spring.kafka.consumer.group-id=XXXXX

 

- Linux 환경에서 Apache Kafka (v. 2.5.0) 다운로드 후 압축 해제

// 다운로드
wget https://archive.apache.org/dist/kafka/2.5.0/kafka_2
.12-2.5.0.tgz

// 압축 해제
tar xvf kafka_2.12-2.5.0.tgz

 

- AWS EC2 인스턴스에서 일부 파일 설정

  - /kafka_2.12-2.5.0/config/server.properties 

advertised.listeners=PLAINTEXT://localhost:9092

 

- ~/.bashrc 

  - 수정 후 시스템에 적용 위해 source ~/.bashrc 명령어 실행 필요

export KAFKA_HEAP_OPS="-Xmx400m -Xms400m" // 용량 설정 Default 값보다 줄이기 위함

 

- Zookeeper 실행

  - /kafka_2.12-2.5.0/bin/zookeeper-server-start.sh config/zookeeper.properties

- Kafka 실행

  - /kafka_2.12-2.5.0/bin/kafka-server-start.sh config/server.properties

 

2. Configuration 파일 추가 및 설정

- KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, KafkaEntity> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, KafkaEntity> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

 

- KafkaConsumerConfig.java

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    private JsonDeserializer<KafkaEntity> gcmPushEntityJsonDeserializer() {
        JsonDeserializer<KafkaEntity> deserializer = new JsonDeserializer<>(KafkaEntity.class);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return deserializer;
    }

    private Map<String, Object> consumerFactoryConfig(JsonDeserializer<KafkaEntity> deserializer) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
        return props;
    }

    @Bean
    public ConsumerFactory<String, KafkaEntity> pushEntityConsumerFactory() {
        JsonDeserializer<KafkaEntity> deserializer = gcmPushEntityJsonDeserializer();
        return new DefaultKafkaConsumerFactory<>(
                consumerFactoryConfig(deserializer),
                new StringDeserializer(),
                deserializer);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaEntity> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(pushEntityConsumerFactory());
        return factory;
    }

}

 

3. 로직 구성 파일 추가

- KafkaEntity.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class KafkaEntity {

    private String id;
    private String message;
}

 

- KafkaProducer.java

@Component
@RequiredArgsConstructor
public class KafkaProducer {

    private final KafkaTemplate<String, KafkaEntity> kafkaTemplate;

    @Value("${spring.kafka.template.default-topic}")
    private String topicName;

    public void sendMessage(String topicName, KafkaEntity kafkaEntity) {
        Message<KafkaEntity> message = MessageBuilder
                .withPayload(kafkaEntity)
                .setHeader(KafkaHeaders.TOPIC, topicName)
                .build();

        ListenableFuture<SendResult<String, KafkaEntity>> future =
                kafkaTemplate.send(message);

        future.addCallback(new ListenableFutureCallback<SendResult<String, KafkaEntity>>() {
            @Override
            public void onSuccess(SendResult<String, KafkaEntity> result) {}

            @Override
            public void onFailure(Throwable ex) {}
        });
    }
}

 

- KafkaConsumer.java

  - 토픽 구독 후 비즈니스 로직 처리

@Component
@RequiredArgsConstructor
public class KafkaConsumer extends Workspace {

    @KafkaListener(topics = "my-topic", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(@Payload KafkaEntity kafkaEntity) {
        // To-Do
    }
}