본문 바로가기
📨 Apache Kafka

[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3.4 장 카프카 클라이언트

by GroovyArea 2023. 4. 9.

카프카 클라이언트

  • 카프카 클러스터에 명령을 내리거나 데이터를 송수신 하기 위해 사용한다.
  • 카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공함.
  • 엄연한 라이브러리이므로, 프레임워크나 애플리케이션에서 구현 후 실행.

프로듀서 API

  • 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송.
  • 데이터를 전송할 때 리더 파티션 소유의 카프카 브로커와 직접 통신한다.

주요 특징

  • 카브카 브로커로 데이터 전송 시, 내부적으로 파티셔너 및 배치 생성 단계를 거침.
    • UniformStickyPartitioner 와 RoundRobinPartitioner 의 2개 파티션을 제공함.
    • 메시지 키 유무에서 차이점이 발생한다.
    • 프로듀서 동작에 특화된 UniformStickyPartitioner는 높은 처리량 및 낮은 리소스 사용률을 가짐. => 2.4 버전 이후는 이 파티션이 디폴트
    • Accumulator 에서 데이터가 batch 로 모두 묶일 때까지 기다린 후 모두 동일한 파티션에 전송하는 방식으로 RoundRobinPartioner보다 향상된 성능으로 개선하게 되었다.
  • 브로커로 데이터를 전송 시 압축 옵션을 선택 가능
    • 압축 되지 않은 상태로 전송하려면 옵션을 지정하지 않으면 된다.
    • 압축 시 장점은, 네트워크 처리량에 이득,
    • 단점은, CPU 나 Memory Resource 를 사용하므로 환경에 따라 지정하자~

주요 옵션

필수 옵션

  • bootstrap-servers
  • key.serializer
  • value.serializer

선택 옵션

  • acks
  • buffer.memory
  • batch.size
  • linger.ms
  • partitioner.class
  • enable.idempotence
  • transactional.id

카프카 브로커의 데이터 정상 전송 여부 확인

  • KafkaProducerdml send() 메서드는 Future 객체 반환
    • RecordMetadata의 비동기 데이터 결과를 가진다.
    • ProducerRecord가 브로커에 정상 저장 되었는지에 대한 데이터를 포함한다.
    • org.apache.kafka.clients.producer.Callback 메서드로 전송 여부 확인 가능.

컨슈머 API

  • 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.

중요 개념

  • 토픽의 파티션으로부터 데이터를 가져가는 방법은 2가지.

    • 컨슈머 그룹을 운영하는 방법
    • 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 방법
  • 컨슈머 그룹은 다른 그룹과 격리되는 특징이 있다.

  • 카프카 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다는 장점이 있다.

  • 컨슈머 그룹으로 이루어진 컨슈들 중, 일부 컨슈머 장애 발생 시, 장애가 발생하지 않은 컨슈머에 장애 컨슈머의 내부의 파티션의 소유권이 넘어간다.

    • Rebalancing
      • 컨슈머 중 1개에 이슈가 발생하여 더는 동작하지 않을 경우
      • 컨슈머가 제외되는 상황 (장애 상황)
    • 유용하지만 자주 발생하는 것은 좋지 않다.
    • 파티션의 소유권을 다른 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없다!

특징

  • 카프카 브로커로부터 데이터를 어디까지 가졌는지, commit 을 통해 기록
  • 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째까지 가져갔는지 내부에서 사용하는 토픽 (__consumer_offsets) 에 기록됨.
  • 컨슈머 동작 이슈로 인해 해당 내부 토픽에 어느 지점까지 읽었는지 기록이 안 될 경우, 데이터 처리의 중복 발생 -> offset commit 의 정상 처리 여부를 검증해야 함.
  • offset commit 은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행 가능.
  • 기본 옵션은 poll() 메서드 수행 시, 일정 간격으로 offset commit 을 수행하도록 설정되어 있다.
    • auto.commit.interval.ms 와 같이 사용
    • 이상의 시간이 흘렀을 경우, 해당 시점까지 읽은 레코드의 offset commit.
  • 비명시적 commit 은 poll() 메서드 호출 이후, Rebalancing 이나 컨슈머의 강제 종료 발생 시 처리하는 데이터의 중복 또는 유실 가능성에 매우 취약하다.
  • 데이터 중복, 유실을 허용하지 않는 서비스 개발 시 auto commit 은 하지마라!

주요 옵션

필수 옵션

  • bootstrap-servers
  • key.serializer
  • value.serializer

선택 옵션

  • group.id
  • auto.offset.reset : 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션
  • enable.auto.commit : 자동 커밋으로 할지 수동 커밋으로 할지 선택 (기본값 : true)
  • auto.commit.interval.ms
  • max.poll.records : poll() 메서드를 통해 반환되는 레코드 개수를 지정 (기본값 : 500)
  • session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 최대 시간, 이 시간 내에 hearbeat 를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱 시작
  • hearbeat.interval.ms
  • max.poll.interval.ms
  • isolation.level

동기, 비동기 offset commit

  • 더 많은 데이터의 처리를 위해서 비동기 offset commit 을 고려해보자~

Rebalance Listener 를 가진 컨슈머

  • 컨슈머 그룹 내의 컨슈머 추가 및 제거 시 파티션을 컨슈머에 재할당하는 Rebalance 발생.
  • poll() 메서드를 통해 받은 데이터를 모두 처리하기 전에 리밸런스 발생 시 데이터 중복 처리가 발생한다.
    • 데이터 일부를 처리했으나, commit 하지 않았기 때문이다..
  • Rebalance 발생을 감지하기 위해 카프카는 ConsumerRebalanceListener (Interface) 지원.
    • onPartitionAssigned() : Rebalance 후, 파티션 할당 완료 시 호출
    • onPartitionRevoked() : Rebalance 직전 호출
    • 마지막으로 처리한 레코드를 기준으로 커밋하기 위해서 Rebalance 시작 전, commit 하면 되므로, onPartitionAssigned() 함수에 커밋을 구현하여 처리할 수 있는 방법이 있다.

컨슈머의 안전한 종료

  • 정상 종료가 되지 않은 컨슈머는 세션 타임아웃 발생시까지 컨슈머 그룹에 잔재한다.
    • 놀고 있는 컨슈머가 생김.
    • 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생한다.
  • 안전하게 종료하기 위해 wakeUp() 메서드 활용 가능
  • 해당 메서드의 실행 이후 poll() 메서드 발생 시 WakeUpException 발생~

어드민 API

  • 실제 운영 환경에서는 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 매우 중요하다.
  • 브로커 중 한 대에 접속 후 옵션을 확인 할 수 있지만 매우 번거로움.
  • 이때 AdminClient 클래스를 사용할 수 있다.

주요 조회 가능 리스트

  • 브로커 정보 조회
  • 토픽 리스트 조회
  • 컨슈머 그룹 조회
  • 신규 토픽 생성
  • 파티션 개수 변경
  • 접근 제어 규칙 생성
반응형