카프카 클라이언트
- 카프카 클러스터에 명령을 내리거나 데이터를 송수신 하기 위해 사용한다.
- 카프카 프로듀서, 컨슈머, 어드민 클라이언트를 제공함.
- 엄연한 라이브러리이므로, 프레임워크나 애플리케이션에서 구현 후 실행.
프로듀서 API
- 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 전송.
- 데이터를 전송할 때 리더 파티션 소유의 카프카 브로커와 직접 통신한다.
주요 특징
- 카브카 브로커로 데이터 전송 시, 내부적으로 파티셔너 및 배치 생성 단계를 거침.
- UniformStickyPartitioner 와 RoundRobinPartitioner 의 2개 파티션을 제공함.
- 메시지 키 유무에서 차이점이 발생한다.
- 프로듀서 동작에 특화된 UniformStickyPartitioner는 높은 처리량 및 낮은 리소스 사용률을 가짐. => 2.4 버전 이후는 이 파티션이 디폴트
- Accumulator 에서 데이터가 batch 로 모두 묶일 때까지 기다린 후 모두 동일한 파티션에 전송하는 방식으로 RoundRobinPartioner보다 향상된 성능으로 개선하게 되었다.
- 브로커로 데이터를 전송 시 압축 옵션을 선택 가능
- 압축 되지 않은 상태로 전송하려면 옵션을 지정하지 않으면 된다.
- 압축 시 장점은, 네트워크 처리량에 이득,
- 단점은, CPU 나 Memory Resource 를 사용하므로 환경에 따라 지정하자~
주요 옵션
필수 옵션
- bootstrap-servers
- key.serializer
- value.serializer
선택 옵션
- acks
- buffer.memory
- batch.size
- linger.ms
- partitioner.class
- enable.idempotence
- transactional.id
카프카 브로커의 데이터 정상 전송 여부 확인
- KafkaProducerdml send() 메서드는 Future 객체 반환
- RecordMetadata의 비동기 데이터 결과를 가진다.
- ProducerRecord가 브로커에 정상 저장 되었는지에 대한 데이터를 포함한다.
- org.apache.kafka.clients.producer.Callback 메서드로 전송 여부 확인 가능.
컨슈머 API
- 컨슈머는 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.
중요 개념
토픽의 파티션으로부터 데이터를 가져가는 방법은 2가지.
- 컨슈머 그룹을 운영하는 방법
- 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 방법
컨슈머 그룹은 다른 그룹과 격리되는 특징이 있다.
카프카 프로듀서가 보낸 데이터를 각기 다른 역할을 하는 컨슈머 그룹끼리 영향을 받지 않게 처리할 수 있다는 장점이 있다.
컨슈머 그룹으로 이루어진 컨슈들 중, 일부 컨슈머 장애 발생 시, 장애가 발생하지 않은 컨슈머에 장애 컨슈머의 내부의 파티션의 소유권이 넘어간다.
- Rebalancing
- 컨슈머 중 1개에 이슈가 발생하여 더는 동작하지 않을 경우
- 컨슈머가 제외되는 상황 (장애 상황)
- 유용하지만 자주 발생하는 것은 좋지 않다.
- 파티션의 소유권을 다른 컨슈머로 재할당하는 과정에서 해당 컨슈머 그룹의 컨슈머들이 토픽의 데이터를 읽을 수 없다!
- Rebalancing
특징
- 카프카 브로커로부터 데이터를 어디까지 가졌는지, commit 을 통해 기록
- 특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째까지 가져갔는지 내부에서 사용하는 토픽 (__consumer_offsets) 에 기록됨.
- 컨슈머 동작 이슈로 인해 해당 내부 토픽에 어느 지점까지 읽었는지 기록이 안 될 경우, 데이터 처리의 중복 발생 -> offset commit 의 정상 처리 여부를 검증해야 함.
- offset commit 은 컨슈머 애플리케이션에서 명시적, 비명시적으로 수행 가능.
- 기본 옵션은 poll() 메서드 수행 시, 일정 간격으로 offset commit 을 수행하도록 설정되어 있다.
auto.commit.interval.ms
와 같이 사용- 이상의 시간이 흘렀을 경우, 해당 시점까지 읽은 레코드의 offset commit.
- 비명시적 commit 은 poll() 메서드 호출 이후, Rebalancing 이나 컨슈머의 강제 종료 발생 시 처리하는 데이터의 중복 또는 유실 가능성에 매우 취약하다.
- 데이터 중복, 유실을 허용하지 않는 서비스 개발 시 auto commit 은 하지마라!
주요 옵션
필수 옵션
- bootstrap-servers
- key.serializer
- value.serializer
선택 옵션
- group.id
- auto.offset.reset : 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션
- enable.auto.commit : 자동 커밋으로 할지 수동 커밋으로 할지 선택 (기본값 : true)
- auto.commit.interval.ms
- max.poll.records : poll() 메서드를 통해 반환되는 레코드 개수를 지정 (기본값 : 500)
- session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 최대 시간, 이 시간 내에 hearbeat 를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱 시작
- hearbeat.interval.ms
- max.poll.interval.ms
- isolation.level
동기, 비동기 offset commit
- 더 많은 데이터의 처리를 위해서 비동기 offset commit 을 고려해보자~
Rebalance Listener 를 가진 컨슈머
- 컨슈머 그룹 내의 컨슈머 추가 및 제거 시 파티션을 컨슈머에 재할당하는 Rebalance 발생.
- poll() 메서드를 통해 받은 데이터를 모두 처리하기 전에 리밸런스 발생 시 데이터 중복 처리가 발생한다.
- 데이터 일부를 처리했으나, commit 하지 않았기 때문이다..
- Rebalance 발생을 감지하기 위해 카프카는 ConsumerRebalanceListener (Interface) 지원.
- onPartitionAssigned() : Rebalance 후, 파티션 할당 완료 시 호출
- onPartitionRevoked() : Rebalance 직전 호출
- 마지막으로 처리한 레코드를 기준으로 커밋하기 위해서 Rebalance 시작 전, commit 하면 되므로, onPartitionAssigned() 함수에 커밋을 구현하여 처리할 수 있는 방법이 있다.
컨슈머의 안전한 종료
- 정상 종료가 되지 않은 컨슈머는 세션 타임아웃 발생시까지 컨슈머 그룹에 잔재한다.
- 놀고 있는 컨슈머가 생김.
- 컨슈머 랙이 늘어나면 데이터 처리 지연이 발생한다.
- 안전하게 종료하기 위해 wakeUp() 메서드 활용 가능
- 해당 메서드의 실행 이후 poll() 메서드 발생 시 WakeUpException 발생~
어드민 API
- 실제 운영 환경에서는 카프카에 설정된 내부 옵션을 설정하고 확인하는 것이 매우 중요하다.
- 브로커 중 한 대에 접속 후 옵션을 확인 할 수 있지만 매우 번거로움.
- 이때 AdminClient 클래스를 사용할 수 있다.
주요 조회 가능 리스트
- 브로커 정보 조회
- 토픽 리스트 조회
- 컨슈머 그룹 조회
- 신규 토픽 생성
- 파티션 개수 변경
- 접근 제어 규칙 생성
반응형
'📨 Apache Kafka' 카테고리의 다른 글
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-6장 카프카 커넥트 (0) | 2023.04.20 |
---|---|
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-5장 카프카 스트림즈 (0) | 2023.04.13 |
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-2, 3장 토픽과 파티션, 레코드 (0) | 2023.03.27 |
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-1장 카프카 브로커와 클러스터 주키퍼 (0) | 2023.03.27 |
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 1장 들어가며 (0) | 2023.03.13 |