Home kafka의 동작을 알아보자
Post
Cancel

kafka의 동작을 알아보자

업무 중 kafka consumer를 사용해서 데이터를 처리하는 부분이 필요했다. 팀원들과의 고민은 consumer 구현 위치를 어떤 repository에 둘 것인가에 대한 것이었다. 여러 선택지와 각각의 장단점을 고려해보면서 이번 기회에 kafka를 제대로 정리하면 좋겠다고 생각했다.

Kafka의 구조

kafka1 전체적인 kafka의 구조이다. producer, consumer, source connector, sink connector, streams로 구성되어 있다.

Broker의 역할

Kafka cluster는 3개 이상의 브로커로 구성된다. 여러 브로커들은 producer에서 전달받은 메세지를 처리할 수 있다.

controller 역할

클러스터에 존재하는 브로커들 중 하나의 브로커가 컨트롤러 역할을 한다. 다른 브로커들의 브로커의 상태를 확인하고 브로커의 리더 파티션을 재분배하는 역할을 한다. 컨트롤러 역할을 하는 브로커에 장애가 발생하면 다른 브로커가 컨트롤러 역할을 한다.

데이터 삭제

kafka는 consumer가 토픽에서 데이터를 가져가서 처리를 해도 데이터가 삭제되지 않는다. 브로커만이 데이터 삭제가 가능하다. 하지만, 특정 데이터를 선택해서 삭제할 수는 없다.

컨슈머 오프셋 저장

consumer 그룹은 토픽의 특정 파티션에서 데이터를 처리하고 파티션의 어느 레코드까지 가져갔는지 확인하기 위한 오프셋을 커밋한다. 저장된 오프셋을 기준으로 다음 레코드 데이터를 처리한다.

그룹 코디네이터

코디네이터는 consumer 그룹의 상태를 체크하고 파티션을 consumer와 매칭되도록 분배하는 역할을 한다. consumer가 그룹에서 빠지게 되면 매칭되지 않은 파티션을 정상 동작하는 consumer로 재할당한다. consumer 재할당을 rebalancing이라고 한다.

replication

데이터 복제는 kafka를 장애 허용 시스템(fault tolerant)으로 동작하게 만들어준다. 브로커에 장애가 발생하게 되어도 데이터를 유실하지 않고 사용할 수 있게 한다. producer가 데이터를 발행할 때 파티션 단위로 복제가 이뤄진다.

Producer

kafka2 producer 구조는 위와 같다. producer record는 전송되는 레코드의 정보와 키, 값을 가지고 있다. partitioner는 producer record에 담겨 있는 partition 정보에 따라 지정된 partition으로 전송한다. Accumulator는 배치로 묶어 전송할 데이터를 모으는 버퍼이다. producer에 사용되는 속성을 알아본다.

  • bootstrap.servers: 브로커 정보를 세팅한다. 여러 브로커를 설정할 수 있다.
  • key.serializer: 레코드 메세지 키를 직렬화하는 클래스를 지정한다.
  • value.serializer: 레코드에서 메세지 값을 직렬화하는 클래스를 지정한다.
  • acks: producer가 전송한 레코드에 대해 브로커에 잘 저장되었는지 전송 성공 여부를 확인하기 위한 값이다. 0, 1, -1을 설정할 수 있다.
  • linger.ms: 배치를 전송하기 전까지 기다리는 최소 시간 값이다. 기본은 0이다.
  • retries: 브로커에서 에러를 받으면 재시도하는 횟수를 지정한다. 기본값은 int 최댓값이다.
  • max.in.flight.requests.per.connection: 한 번에 요청하는 최대 커넥션 수이다. 기본값은 5이다.
  • partitioner.class: 레코드를 파티션에 전송할 때 적용하는 파티셔너 클래스를 지정한다.
  • enable.idempotence: 멱등성 producer로 동작할지 여부를 설정한다.
  • transcational.id: producer가 레코드를 전송할 때 레코드를 트랜잭션 단위로 묶을지 여부를 결정한다.

producer partitioner

producer의 기본 파티셔너는 UniformStickyPartitionerRoundRobinPartitioner가 있다. 메세지 key가 있을 경우 두 파티셔너 모두 메세지 키의 해시 값과 파티션을 매칭하여 레코드를 전송한다. 메세지 키가 없는 경우 파티션에 동일하게 할당되도록 분배하는 로직을 사용하게 된다. RoundRobin 방식은 레코드가 들어오는대로 파티션을 순회하면서 전송하고, accumulater에서 묶이는 정도가 적어 낮은 전송 성능을 보인다. UnifomSticky 방식은 RoundRobbin 방식을 개선한 파티셔너로 accumulater에서 배치로 묶일 때까지 기다렸다가 전송하게 된다. 배치로 묶여도 파티션을 순회하면서 전송하기 때문에 모든 파티션에 분배된다.

Consumer

kafka3 Consumer의 구조이다. Fetcher는 리더 파티션으로부터 레코드를 미리 가져와서 대기한다. poll()은 Fetcher에 있는 레코드를 반환하는 역할을 한다. consumer record에는 처리하려고하는 레코드들의 모음이고 오프셋이 포함되어 있다.

  • bootstrap.servers: 브로커 정보를 세팅한다. 여러 브로커를 설정할 수 있다.
  • key.serializer: 레코드 메세지 키를 직렬화하는 클래스를 지정한다.
  • value.serializer: 레코드에서 메세지 값을 직렬화하는 클래스를 지정한다.
  • group.id: consumer 그룹 id를 지정한다. subscribe()로 토픽을 구독하게되면 필수로 지정해야 한다.
  • auto.offset.reset: 특정 파티션을 읽을 때 저장된 consumer 오프셋이 없으면 어느 오프셋을 읽을지 정한다. 오프셋이 있다면 값은 무시되고 기본 값은 latest이다.
  • enable.auto.commit: 자동으로 커밋을 할지 수동으로 커밋할지 지정한다. 기본값은 true이다.
  • auto.commit.interval.ms: 자동 커밋인 경우 오프셋 커밋 간격을 설정한다.
  • max.poll.records: poll 메서드를 이용해 반환되는 레코드 개수를 설정한다.
  • session.timeout.ms: consumer와 브로커의 연결이 끊기는 최대 시간을 설정한다.
  • heartbeat.interval.ms: heartbeat를 브로커로 보내는 주기를 설정한다. 기본값은 3초이다.
  • max.poll.interval.ms: poll 메서드를 호출하는 간격의 최대 시간 기본값은 5분이다.
  • isolation.level: 트랜잭션 producer가 레코드를 트랜잭션 단위로 보낼 경우 사용한다.

consumer group

kafka4 consumer를 각 consumer 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있게 도와주는 방식이다. consumer 그룹으로 묶인 consumer들은 토픽의 파티션에 할당되어 레코드를 가져갈 수 있다. 1개의 파티션은 최대 1개의 consumer에 할당이 가능하다. 1개의 consumer는 여러개의 파티션에 할당될 수 있다. 만약 consumer의 개수가 파티션 개수보다 많게되면 파티션을 할당받지 못한 consumer는 불필요한 스레드로 남는다.

rebalancing

리밸런싱은 consumer 그룹으로 이루어진 consumer 중 일부 consumer에 장애가 발생하면 consumer에 할당되어 있던 파티션을 다른 consumer로 재할당하는 것이다. consumer가 새로 추가되거나 제거되는 시점에 리밸런싱이 발생한다.

commit

consumer는 브로커로부터 어디까지 데이터를 가져갔는지에 대한 정보를 commit을 통해 기록한다. 토픽의 파티션을 어떤 consumer 그룹이 몇 번째 가져갔는지 정보가 기록된다. 데이터 처리의 중복이 발생하지 않기 위해 consumer는 오프셋 commit을 정상적으로 처리했는지 검증해야 한다.

This post is licensed under CC BY 4.0 by the author.

[MSA 4] Maven Repository로 MSA 환경에서 DTO 공유하기(maven central)

[Effective Java] 예외를 예외답게 사용하기