본문 바로가기
📨 Apache Kafka

[아파치 카프카 애플리케이션 프로그래밍 with 자바] 3-5장 카프카 스트림즈

by GroovyArea 2023. 4. 13.

카프카 스트림즈

  • 토픽에 저장된 데이터를 실시간으로 다른 토픽에 적재하는 라이브러리
  • 카프카 클러스터와 완벽 호환 및 스트림 처리에 필요한 편리한 기능 제공
  • 장애 발생 시 Exactly Once 하도록 장애 허용 시스템 장착 => 데이터 처리 안정성 Up.

개요

  • 스트림즈 애플리케이션은 내부적으로 1개 이상의 스레드 생성 가능

    • 스레드는 1개 이상의 태스크를 가진다.
  • 스트림즈의 Task 는 스트림즈 애플리케이션을 실행하면 생기는 데이터의 최소 처리 단위

  • 3개의 파티션으로 이루어진 토픽을 처리할 경우, 스트림즈 애플리케이션을 실행 시, 내부의 3개의 태스크가 생김.

  • 카프카 스트림즈는 스트림즈 DSL (Domain Specific Language) 과 프로세서 API 2개의 방법으로 개발 가능

  • 스트림즈 DSL 은 스트림 프로세싱에 쓰일 다양한 기능을 API로서 제공함.

    • 쉽다
    • 기능이 없다면, 프로세서 API를 통해 개발 가능
  • 스트림즈 DSL 구현 가능 예시

    • 메시지 값 기반의 토픽 분기 처리
    • 지난 10분간 들어온 데이터 개수 집계
    • 서로 다른 토픽의 결합으로 새로운 데이터 생성
  • 프로세서 API 구현 예시

    • 메시지 값 종류에 따른 토픽 가변적 전송
    • 일정 시간 간격의 데이터 처리

스트림즈 DSL

  • 레코드의 흐름을 추상화한 3가지 개념
    • KStream
    • KTable
    • GlobalKTable
  • 오직 스트림즈 DSL 에서만 사용하는 개념

KStream

  • 레코드의 흐름을 표현한 것
    • 키, 값 형태로 구성
  • KStream 으로 데이터 조회 시, 토픽에 존재하는 모든 레코드 출력
  • 컨슈머로 토픽을 구독하는 것과 같은 선상에서 사용하는 것이라 보면 됨.

KTable

  • 메시지 키를 기준으로 묶어서 사용

Global KTable

  • 메시지 키를 기준으로 묶어서 사용
  • KTable 과의 차이점
    • KTable 로 선언된 토픽은 1개 파티션이 1개 태스크에 할당 및 사용
    • GlobalKTable 로 선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용
    • 예시
      • KStream 및 KTable 의 데이터 조인 상황
      • Join 시 서로 간 co-partitioning 필수.
        • co-partitioning : Join 을 하는 2개 데이터의 파티션 개수가 동일하고, 파티셔닝 전략을 동일하게 맞추는 전략
      • GlobalKTable 은 co-partitioning 되지 않은 KStream 과 데이터 조인 가능
        • KTable 과 다르게 데이터를 스트림즈 애플리케이션의 모든 태스크에 동일하게 공유되어 사용되기 때문
        • 단점으로 각 태스크마다 GlobalKTable 로 정의된 모든 데이터를 저장 및 사용하므로, 스트림즈 애플리케이션의 로컬 스토리지 사용량 증가
        • 네트워크, 브로커에 과부하
        • 작은 용량의 데이터일 경우에만 GlobalKTable 사용 권장.

주요 옵션

  • 필수 옵션

    • bootstrap.servers
    • application.id
  • 선택 옵션

  • default.key.serde : 레코드의 메시지 키를 직렬화, 역직렬화하는 클래스를 지정 (기본값 : 바이트 직렬화)

  • num.stream.threads : 스트림 프로세싱 실행 시 실행될 스레드 개수를 지정 (기본값 : 1)

  • state.dir : 상태기반 데이터 처리를 할 때 데이터를 저장할 디렉토리 지정 (기본값 : tmp/kafka-streams)


프로세서 API

  • 데이터 처리, 분기, Join 기능 이외에 추가적인 상세 로직 구현 시 활용 가능
  • 스트림즈 DSL 에서 사용한 KStream, KTable, GlobalKTable 개념이 없다는 점을 유의

Simple Kafka Processor 예시 코드

    public class SimpleKafkaProcessor {

    private static String APPLICATION_NAME = "processor-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_FILTER = "stream_log_filter";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 1. 토폴로지 생성
        Topology topology = new Topology();
        topology.addSource("Source", // 2. 해당 토픽을 가져오기 위해 함수 호출
                        STREAM_LOG)
                .addProcessor("Process", // 3. 스트림 프로세서 사용, 이때 다음 프로세서는 Process 스트림 프로세스
                        () -> new FilterProcessor(),
                        "Source")
                .addSink("Sink", // 4. 싱크 프로세서로 사용하여 데이터를 저장하기 위한 함수 호출
                        STREAM_LOG_FILTER,
                        "Process");

        // 5. 해당 토폴로지를 KafkaStreams 인스턴스에 넣고, 스트림즈 생성 및 실행
        KafkaStreams streaming = new KafkaStreams(topology, props);
        streaming.start();
    }
}
반응형