카프카 스트림즈
- 토픽에 저장된 데이터를 실시간으로 다른 토픽에 적재하는 라이브러리
- 카프카 클러스터와 완벽 호환 및 스트림 처리에 필요한 편리한 기능 제공
- 장애 발생 시 Exactly Once 하도록 장애 허용 시스템 장착 => 데이터 처리 안정성 Up.
개요
스트림즈 애플리케이션은 내부적으로 1개 이상의 스레드 생성 가능
- 스레드는 1개 이상의 태스크를 가진다.
스트림즈의 Task 는 스트림즈 애플리케이션을 실행하면 생기는 데이터의 최소 처리 단위
3개의 파티션으로 이루어진 토픽을 처리할 경우, 스트림즈 애플리케이션을 실행 시, 내부의 3개의 태스크가 생김.
카프카 스트림즈는 스트림즈 DSL (Domain Specific Language) 과 프로세서 API 2개의 방법으로 개발 가능
스트림즈 DSL 은 스트림 프로세싱에 쓰일 다양한 기능을 API로서 제공함.
- 쉽다
- 기능이 없다면, 프로세서 API를 통해 개발 가능
스트림즈 DSL 구현 가능 예시
- 메시지 값 기반의 토픽 분기 처리
- 지난 10분간 들어온 데이터 개수 집계
- 서로 다른 토픽의 결합으로 새로운 데이터 생성
프로세서 API 구현 예시
- 메시지 값 종류에 따른 토픽 가변적 전송
- 일정 시간 간격의 데이터 처리
스트림즈 DSL
- 레코드의 흐름을 추상화한 3가지 개념
- KStream
- KTable
- GlobalKTable
- 오직 스트림즈 DSL 에서만 사용하는 개념
KStream
- 레코드의 흐름을 표현한 것
- 키, 값 형태로 구성
- KStream 으로 데이터 조회 시, 토픽에 존재하는 모든 레코드 출력
- 컨슈머로 토픽을 구독하는 것과 같은 선상에서 사용하는 것이라 보면 됨.
KTable
- 메시지 키를 기준으로 묶어서 사용
Global KTable
- 메시지 키를 기준으로 묶어서 사용
- KTable 과의 차이점
- KTable 로 선언된 토픽은 1개 파티션이 1개 태스크에 할당 및 사용
- GlobalKTable 로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용
- 예시
- KStream 및 KTable 의 데이터 조인 상황
- Join 시 서로 간 co-partitioning 필수.
- co-partitioning : Join 을 하는 2개 데이터의 파티션 개수가 동일하고, 파티셔닝 전략을 동일하게 맞추는 전략
- GlobalKTable 은 co-partitioning 되지 않은 KStream 과 데이터 조인 가능
- KTable 과 다르게 데이터를 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문
- 단점으로 각 태스크마다 GlobalKTable 로 정의된 모든 데이터를 저장 및 사용하므로, 스트림즈 애플리케이션의 로컬 스토리지 사용량 증가
- 네트워크, 브로커에 과부하
- 작은 용량의 데이터일 경우에만 GlobalKTable 사용 권장.
주요 옵션
필수 옵션
- bootstrap.servers
- application.id
선택 옵션
default.key.serde : 레코드의 메시지 키를 직렬화, 역직렬화하는 클래스를 지정 (기본값 : 바이트 직렬화)
num.stream.threads : 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정 (기본값 : 1)
state.dir : 상태기반 데이터 처리를 할 때 데이터를 저장할 디렉토리 지정 (기본값 : tmp/kafka-streams)
프로세서 API
- 데이터 처리, 분기, Join 기능 이외에 추가적인 상세 로직 구현 시 활용 가능
- 스트림즈 DSL 에서 사용한 KStream, KTable, GlobalKTable 개념이 없다는 점을 유의
Simple Kafka Processor 예시 코드
public class SimpleKafkaProcessor {
private static String APPLICATION_NAME = "processor-application";
private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 1. 토폴로지 생성
Topology topology = new Topology();
topology.addSource("Source", // 2. 해당 토픽을 가져오기 위해 함수 호출
STREAM_LOG)
.addProcessor("Process", // 3. 스트림 프로세서 사용, 이때 다음 프로세서는 Process 스트림 프로세스
() -> new FilterProcessor(),
"Source")
.addSink("Sink", // 4. 싱크 프로세서로 사용하여 데이터를 저장하기 위한 함수 호출
STREAM_LOG_FILTER,
"Process");
// 5. 해당 토폴로지를 KafkaStreams 인스턴스에 넣고, 스트림즈 생성 및 실행
KafkaStreams streaming = new KafkaStreams(topology, props);
streaming.start();
}
}
반응형
'📨 Apache Kafka' 카테고리의 다른 글
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 4-1장 토픽과 파티션 (0) | 2023.05.05 |
---|---|
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-6장 카프카 커넥트 (0) | 2023.04.20 |
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3.4 장 카프카 클라이언트 (0) | 2023.04.09 |
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-2, 3장 토픽과 파티션, 레코드 (0) | 2023.03.27 |
[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-1장 카프카 브로커와 클러스터 주키퍼 (0) | 2023.03.27 |