Home kafka consumer 예외 처리와 재시도 전략
Post
Cancel

kafka consumer 예외 처리와 재시도 전략

테스트 환경
spring boot: 2.1.4.RELEASE
jdk: 1.8
spring-kafka: 2.2.5.RELEASE

지금까지 Kafka consumer를 구성할 때 정말 간단한 설정만 하고, 사용했었다. spring-kafka를 이용해 @KafkaListener로 consumer의 기본 속성 값을 사용했다.

회원 정보가 변경되는 kafka consumer를 구성하기 위해 조금 더 구체적인 제어를 할 수 있는 방법을 고민해 보았다.

default kafka consumer

우선 로컬에서 테스트 해본 결과 custom consumer를 설정하려면 기본 consumer와 함께 구성했어야 된다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public ConsumerFactory<String, String> consumerFactory() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "default-group-id");
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
  return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(consumerFactory());
  return factory;
}

위 설정은 consumer의 기본 설정을 가진 default consumer configuration이다. 필수 설정인 bootstrap.server, key.serializer, value.serializer 를 설정했다. offset commit은 기본값인 auto commit이다.

custom kafka consumer

회원 정보를 변경하는 consumer이기 때문에 메세지 읽음 처리를 직접 제어하는 것이 적합하다고 생각했다. 또한 서버나 데이터베이스 연결에 알 수 없는 이유로 메세지 처리가 불가능한 경우 재시도 처리도 고려되어야 한다고 생각했다.

1
2
3
4
5
6
7
8
@Slf4j
@EnableKafka
@Configuration
@RequiredArgsConstructor
@ConditionalOnExpression("'${confluent.consumer.bootstrapServers:}' != ''")
public class AckModeManualKafkaConsumerConfiguration {
...
}

consumer configuration 설정을 위해 @ConditionalOnExpression 를 붙였다. application.yml 에서 spring.kafka의 기본 설정을 사용하지 않고 따로 설정하기 위해 구성했다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Primary
@Bean("ackModeManualKafkaConsumerFactory")
public ConsumerFactory<String, String> ackModeManualConsumerFactory() {
  return new DefaultKafkaConsumerFactory<>(ackModeManualConsumerConfig());
}

@Bean
public Map<String, Object> ackModeManualConsumerConfig() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, KafkaConfigurationValues.LATEST);
  props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

  return props;
}

consumer factory를 구성하기 위한 설정값이다. enable.auto.commit 설정은 자동으로 offset commit을 할지에 대한 설정이다. 메세지 offset commit을 직접 제어하기 위해 auto commit을 하지 않도록 했다.

1
2
3
4
5
6
7
8
9
10
11
@Bean("ackModeManualKafkaContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String>
    ackModeManualKafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> container =
      new ConcurrentKafkaListenerContainerFactory<>();
  container.setConsumerFactory(ackModeManualConsumerFactory());
  container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
  container.setConcurrency(CONCURRENCY);
  container.setRetryTemplate(retryTemplate());
  return container;
}

kafka listener에서 사용할 container factory 설정이다. ack mode를 MANUAL로 설정해 acknowledge를 직접 제어한다. concurrency는 1로 설정했다. 토픽의 파티션은 1개이며, 운영중인 POD 개수도 여러개이므로 1로 설정했다. 만약 3으로 설정하게 되면 consumer 스레드는 3개가 생성되고 POD 개수 X concurrency 개수 만큼의 consumer가 만들어진다.

setRetryTemplate은 재시도에 대한 설정이다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public RetryTemplate retryTemplate() {
  RetryTemplate retryTemplate = new RetryTemplate();

  FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
  backOffPolicy.setBackOffPeriod(5000);
  retryTemplate.setBackOffPolicy(backOffPolicy);

  SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
  retryPolicy.setMaxAttempts(3);
  retryTemplate.setRetryPolicy(retryPolicy);

  return retryTemplate;
}

재시도 시간은 3초로 설정하고 재시도 횟수 3번으로 설정했다.

retry strategy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j
@Component
public class KafkaConsumerRetryStrategy {
  public void consumerRetryStrategy(
      ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment, Exception e) {
    addRetryCount(consumerRecord);
    if (shouldRetry(consumerRecord)) {
      throw new DeliveryInformationConsumerException(e.getMessage());
    } else {
      handleFailedRecord(consumerRecord, e);
      acknowledgment.acknowledge();
    }
  }
  ...
}

retry-count를 producer에서 담지 않고 있어 consume한 시점에 레코드 헤더에 retry-count를 설정해 주고 재시도 횟수를 확인한다. 해당 로직을 listen하는 코드와 같이 구성하기에는 길어지기도 하고, 다른 consumer를 사용할 때 동일하게 사용 가능하도록 별도의 component로 구성했다.

consumer가 레코드를 listen한 후 객체 변환, 데이터 저장 등의 동작에서 예외가 발생하면 retry를 시도하게 된다. 레코드에는 retry-count가 없기 때문에 retry-count를 레코드 헤더에 설정하고 재시도를 진행한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
private void addRetryCount(ConsumerRecord<String, String> consumerRecord) {
  Optional<Header> header =
      Optional.ofNullable(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER));
  if (!header.isPresent()) {
    consumerRecord
        .headers()
        .add(RETRY_COUNT_HEADER, Integer.toString(RETRY_DEFAULT_COUNT).getBytes());
  } else {
    int retryCount = Integer.parseInt(new String(header.get().value()));
    consumerRecord.headers().remove(RETRY_COUNT_HEADER);
    consumerRecord.headers().add(RETRY_COUNT_HEADER, Integer.toString(retryCount + 1).getBytes());
  }
}

레코드에 retry-count를 추가하는 부분은 retry count가 헤더에 없으면 추가하고, 있다면 retry-count를 추가한다.

retry-count에 대한 처리가 끝나면 다음 재시도를 할 수 있는지 확인 후 재시도를 하거나 처리 불가능 판단 후 로그를 찍고 레코드를 acknowledge 처리한다.

1
2
3
4
5
6
7
private boolean shouldRetry(ConsumerRecord<String, String> consumerRecord) {
  int retryValue =
      Integer.parseInt(
          new String(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER).value()));
  log.info(String.valueOf(retryValue));
  return retryValue < 3;
}

레코드의 헤더 값은 byte[]로 되어 있기 때문에 숫자로 변환 후 확인해야 한다. 재시도 횟수가 3번이 되면 예외로 판단하고 에러 로그를 찍는다. 메세지를 consume 후 처리가 매우 단순하기 때문에 예외가 발생할 일이 크지 않을 것으로 생각되어 3번 시도 후 에러 로그를 찍도록 했다. 정말 특이한 경우가 아니면 발생할 일이 없고 트래픽이 많지 않기 때문에 로그를 통해서 데이터 보정이 가능하기 때문에 위와 같이 처리했다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
@Component
@RequiredArgsConstructor
public class MemberInformationKafkaListener
    implements AcknowledgingMessageListener<String, String> {
  private final KafkaConsumerRetryStrategy kafkaConsumerRetryStrategy;
  private final ObjectMapper objectMapper;

  @Override
  @AckModeManualKafkaListener(topics = "member-topic", groupId = "${confluent.consumer.groupId}")
  public void onMessage(
      ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
    try {
      processing.logic(consumerRecord); // business logic
      acknowledgment.acknowledge();
    } catch (Exception e) {
      kafkaConsumerRetryStrategy.consumerRetryStrategy(consumerRecord, acknowledgment, e);
    }
  }
}

kafka listener 부분이다. AcknowledgingMessageListener를 상속 받아 ack를 직접 제어할 수 있게 구성했다. enable.auto.commit과 함께 사용한다. 레코드를 fetcher로부터 받은 후 로직을 처리하게 되고 비즈니스 로직이 정상적으로 완료되면 acknowledge 처리를 한다. 만약 로직 처리 중 예외가 발생하면 바로 retry 전략을 사용한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener(
    containerFactory = "ackModeManualKafkaContainerFactory"
)
public @interface AckModeManualKafkaListener {
    @AliasFor(
       annotation = KafkaListener.class,
       attribute = "topics"
    )
    String[] topics();

    @AliasFor(
       annotation = KafkaListener.class,
       attribute = "groupId"
    )
    String groupId() default "";

}

@AckModeManualKafkaListener 정의한 kafka listener를 사용하기 위해 annotation을 만들어서 사용했다. KafkaListener 클래스의 topic, groupId 값을 사용하지만, containerFactory를 지정하기 위한 설정이다. @KafkaListenercontainerFactory 설정을 사용해도 되고 annotation으로 만들어서 사용할 수도 있다. annotation으로 만들면 다른 consumer에서도 쉽게 사용할 수 있을 것으로 생각되어 사용했다.

기존의 공통 라이브러리의 설정을 파악하고 consumer를 직접 제어할 수 있도록 하기 위해서는 설정 값과 재시도 전략 등 고민해 볼 것이 있어 흥미로웠다. 단순히 auto commit을 사용해 처리하면 정말 쉬웠겠지만, 운영 환경에서는 트래픽이 적고 서비스 예외가 발생할 일이 적어도 예외를 고려하는 것이 좋다고 생각했다.

This post is licensed under CC BY 4.0 by the author.

[Effective Java] 예외를 예외답게 사용하기

[Effective Java] Method