๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๐Ÿ“จ Apache Kafka

[์•„ํŒŒ์น˜ ์นดํ”„์นด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ํ”„๋กœ๊ทธ๋ž˜๋ฐ with ์ž๋ฐ”] 3-5์žฅ ์นดํ”„์นด ์ŠคํŠธ๋ฆผ์ฆˆ

by GroovyArea 2023. 4. 13.

์นดํ”„์นด ์ŠคํŠธ๋ฆผ์ฆˆ

  • ํ† ํ”ฝ์— ์ €์žฅ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ๋‹ค๋ฅธ ํ† ํ”ฝ์— ์ ์žฌํ•˜๋Š” ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ
  • ์นดํ”„์นด ํด๋Ÿฌ์Šคํ„ฐ์™€ ์™„๋ฒฝ ํ˜ธํ™˜ ๋ฐ ์ŠคํŠธ๋ฆผ ์ฒ˜๋ฆฌ์— ํ•„์š”ํ•œ ํŽธ๋ฆฌํ•œ ๊ธฐ๋Šฅ ์ œ๊ณต
  • ์žฅ์•  ๋ฐœ์ƒ ์‹œ Exactly Once ํ•˜๋„๋ก ์žฅ์•  ํ—ˆ์šฉ ์‹œ์Šคํ…œ ์žฅ์ฐฉ => ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์•ˆ์ •์„ฑ Up.

๊ฐœ์š”

  • ์ŠคํŠธ๋ฆผ์ฆˆ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์€ ๋‚ด๋ถ€์ ์œผ๋กœ 1๊ฐœ ์ด์ƒ์˜ ์Šค๋ ˆ๋“œ ์ƒ์„ฑ ๊ฐ€๋Šฅ

    • ์Šค๋ ˆ๋“œ๋Š” 1๊ฐœ ์ด์ƒ์˜ ํƒœ์Šคํฌ๋ฅผ ๊ฐ€์ง„๋‹ค.
  • ์ŠคํŠธ๋ฆผ์ฆˆ์˜ Task ๋Š” ์ŠคํŠธ๋ฆผ์ฆˆ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‹คํ–‰ํ•˜๋ฉด ์ƒ๊ธฐ๋Š” ๋ฐ์ดํ„ฐ์˜ ์ตœ์†Œ ์ฒ˜๋ฆฌ ๋‹จ์œ„

  • 3๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ์ด๋ฃจ์–ด์ง„ ํ† ํ”ฝ์„ ์ฒ˜๋ฆฌํ•  ๊ฒฝ์šฐ, ์ŠคํŠธ๋ฆผ์ฆˆ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‹คํ–‰ ์‹œ, ๋‚ด๋ถ€์˜ 3๊ฐœ์˜ ํƒœ์Šคํฌ๊ฐ€ ์ƒ๊น€.

  • ์นดํ”„์นด ์ŠคํŠธ๋ฆผ์ฆˆ๋Š” ์ŠคํŠธ๋ฆผ์ฆˆ DSL (Domain Specific Language) ๊ณผ ํ”„๋กœ์„ธ์„œ API 2๊ฐœ์˜ ๋ฐฉ๋ฒ•์œผ๋กœ ๊ฐœ๋ฐœ ๊ฐ€๋Šฅ

  • ์ŠคํŠธ๋ฆผ์ฆˆ DSL ์€ ์ŠคํŠธ๋ฆผ ํ”„๋กœ์„ธ์‹ฑ์— ์“ฐ์ผ ๋‹ค์–‘ํ•œ ๊ธฐ๋Šฅ์„ API๋กœ์„œ ์ œ๊ณตํ•จ.

    • ์‰ฝ๋‹ค
    • ๊ธฐ๋Šฅ์ด ์—†๋‹ค๋ฉด, ํ”„๋กœ์„ธ์„œ API๋ฅผ ํ†ตํ•ด ๊ฐœ๋ฐœ ๊ฐ€๋Šฅ
  • ์ŠคํŠธ๋ฆผ์ฆˆ DSL ๊ตฌํ˜„ ๊ฐ€๋Šฅ ์˜ˆ์‹œ

    • ๋ฉ”์‹œ์ง€ ๊ฐ’ ๊ธฐ๋ฐ˜์˜ ํ† ํ”ฝ ๋ถ„๊ธฐ ์ฒ˜๋ฆฌ
    • ์ง€๋‚œ 10๋ถ„๊ฐ„ ๋“ค์–ด์˜จ ๋ฐ์ดํ„ฐ ๊ฐœ์ˆ˜ ์ง‘๊ณ„
    • ์„œ๋กœ ๋‹ค๋ฅธ ํ† ํ”ฝ์˜ ๊ฒฐํ•ฉ์œผ๋กœ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ ์ƒ์„ฑ
  • ํ”„๋กœ์„ธ์„œ API ๊ตฌํ˜„ ์˜ˆ์‹œ

    • ๋ฉ”์‹œ์ง€ ๊ฐ’ ์ข…๋ฅ˜์— ๋”ฐ๋ฅธ ํ† ํ”ฝ ๊ฐ€๋ณ€์  ์ „์†ก
    • ์ผ์ • ์‹œ๊ฐ„ ๊ฐ„๊ฒฉ์˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ

์ŠคํŠธ๋ฆผ์ฆˆ DSL

  • ๋ ˆ์ฝ”๋“œ์˜ ํ๋ฆ„์„ ์ถ”์ƒํ™”ํ•œ 3๊ฐ€์ง€ ๊ฐœ๋…
    • KStream
    • KTable
    • GlobalKTable
  • ์˜ค์ง ์ŠคํŠธ๋ฆผ์ฆˆ DSL ์—์„œ๋งŒ ์‚ฌ์šฉํ•˜๋Š” ๊ฐœ๋…

KStream

  • ๋ ˆ์ฝ”๋“œ์˜ ํ๋ฆ„์„ ํ‘œํ˜„ํ•œ ๊ฒƒ
    • ํ‚ค, ๊ฐ’ ํ˜•ํƒœ๋กœ ๊ตฌ์„ฑ
  • KStream ์œผ๋กœ ๋ฐ์ดํ„ฐ ์กฐํšŒ ์‹œ, ํ† ํ”ฝ์— ์กด์žฌํ•˜๋Š” ๋ชจ๋“  ๋ ˆ์ฝ”๋“œ ์ถœ๋ ฅ
  • ์ปจ์Šˆ๋จธ๋กœ ํ† ํ”ฝ์„ ๊ตฌ๋…ํ•˜๋Š” ๊ฒƒ๊ณผ ๊ฐ™์€ ์„ ์ƒ์—์„œ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด๋ผ ๋ณด๋ฉด ๋จ.

KTable

  • ๋ฉ”์‹œ์ง€ ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๋ฌถ์–ด์„œ ์‚ฌ์šฉ

Global KTable

  • ๋ฉ”์‹œ์ง€ ํ‚ค๋ฅผ ๊ธฐ์ค€์œผ๋กœ ๋ฌถ์–ด์„œ ์‚ฌ์šฉ
  • KTable ๊ณผ์˜ ์ฐจ์ด์ 
    • KTable ๋กœ ์„ ์–ธ๋œ ํ† ํ”ฝ์€ 1๊ฐœ ํŒŒํ‹ฐ์…˜์ด 1๊ฐœ ํƒœ์Šคํฌ์— ํ• ๋‹น ๋ฐ ์‚ฌ์šฉ
    • GlobalKTable ๋กœ ์„ ์–ธ๋œ ํ† ํ”ฝ์€ ๋ชจ๋“  ํŒŒํ‹ฐ์…˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๊ฐ ํƒœ์Šคํฌ์— ํ• ๋‹น๋˜์–ด ์‚ฌ์šฉ
    • ์˜ˆ์‹œ
      • KStream ๋ฐ KTable ์˜ ๋ฐ์ดํ„ฐ ์กฐ์ธ ์ƒํ™ฉ
      • Join ์‹œ ์„œ๋กœ ๊ฐ„ co-partitioning ํ•„์ˆ˜.
        • co-partitioning : Join ์„ ํ•˜๋Š” 2๊ฐœ ๋ฐ์ดํ„ฐ์˜ ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜๊ฐ€ ๋™์ผํ•˜๊ณ , ํŒŒํ‹ฐ์…”๋‹ ์ „๋žต์„ ๋™์ผํ•˜๊ฒŒ ๋งž์ถ”๋Š” ์ „๋žต
      • GlobalKTable ์€ co-partitioning ๋˜์ง€ ์•Š์€ KStream ๊ณผ ๋ฐ์ดํ„ฐ ์กฐ์ธ ๊ฐ€๋Šฅ
        • KTable ๊ณผ ๋‹ค๋ฅด๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ์ŠคํŠธ๋ฆผ์ฆˆ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ๋ชจ๋“  ํƒœ์Šคํฌ์— ๋™์ผํ•˜๊ฒŒ ๊ณต์œ ๋˜์–ด ์‚ฌ์šฉ๋˜๊ธฐ ๋•Œ๋ฌธ
        • ๋‹จ์ ์œผ๋กœ ๊ฐ ํƒœ์Šคํฌ๋งˆ๋‹ค GlobalKTable ๋กœ ์ •์˜๋œ ๋ชจ๋“  ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅ ๋ฐ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ, ์ŠคํŠธ๋ฆผ์ฆˆ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ๋กœ์ปฌ ์Šคํ† ๋ฆฌ์ง€ ์‚ฌ์šฉ๋Ÿ‰ ์ฆ๊ฐ€
        • ๋„คํŠธ์›Œํฌ, ๋ธŒ๋กœ์ปค์— ๊ณผ๋ถ€ํ•˜
        • ์ž‘์€ ์šฉ๋Ÿ‰์˜ ๋ฐ์ดํ„ฐ์ผ ๊ฒฝ์šฐ์—๋งŒ GlobalKTable ์‚ฌ์šฉ ๊ถŒ์žฅ.

์ฃผ์š” ์˜ต์…˜

  • ํ•„์ˆ˜ ์˜ต์…˜

    • bootstrap.servers
    • application.id
  • ์„ ํƒ ์˜ต์…˜

  • default.key.serde : ๋ ˆ์ฝ”๋“œ์˜ ๋ฉ”์‹œ์ง€ ํ‚ค๋ฅผ ์ง๋ ฌํ™”, ์—ญ์ง๋ ฌํ™”ํ•˜๋Š” ํด๋ž˜์Šค๋ฅผ ์ง€์ • (๊ธฐ๋ณธ๊ฐ’ : ๋ฐ”์ดํŠธ ์ง๋ ฌํ™”)

  • num.stream.threads : ์ŠคํŠธ๋ฆผ ํ”„๋กœ์„ธ์‹ฑ ์‹คํ–‰ ์‹œ ์‹คํ–‰๋  ์Šค๋ ˆ๋“œ ๊ฐœ์ˆ˜๋ฅผ ์ง€์ • (๊ธฐ๋ณธ๊ฐ’ : 1)

  • state.dir : ์ƒํƒœ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ํ•  ๋•Œ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•  ๋””๋ ‰ํ† ๋ฆฌ ์ง€์ • (๊ธฐ๋ณธ๊ฐ’ : tmp/kafka-streams)


ํ”„๋กœ์„ธ์„œ API

  • ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ, ๋ถ„๊ธฐ, Join ๊ธฐ๋Šฅ ์ด์™ธ์— ์ถ”๊ฐ€์ ์ธ ์ƒ์„ธ ๋กœ์ง ๊ตฌํ˜„ ์‹œ ํ™œ์šฉ ๊ฐ€๋Šฅ
  • ์ŠคํŠธ๋ฆผ์ฆˆ DSL ์—์„œ ์‚ฌ์šฉํ•œ KStream, KTable, GlobalKTable ๊ฐœ๋…์ด ์—†๋‹ค๋Š” ์ ์„ ์œ ์˜

Simple Kafka Processor ์˜ˆ์‹œ ์ฝ”๋“œ

    public class SimpleKafkaProcessor {

    private static String APPLICATION_NAME = "processor-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_FILTER = "stream_log_filter";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 1. ํ† ํด๋กœ์ง€ ์ƒ์„ฑ
        Topology topology = new Topology();
        topology.addSource("Source", // 2. ํ•ด๋‹น ํ† ํ”ฝ์„ ๊ฐ€์ ธ์˜ค๊ธฐ ์œ„ํ•ด ํ•จ์ˆ˜ ํ˜ธ์ถœ
                        STREAM_LOG)
                .addProcessor("Process", // 3. ์ŠคํŠธ๋ฆผ ํ”„๋กœ์„ธ์„œ ์‚ฌ์šฉ, ์ด๋•Œ ๋‹ค์Œ ํ”„๋กœ์„ธ์„œ๋Š” Process ์ŠคํŠธ๋ฆผ ํ”„๋กœ์„ธ์Šค
                        () -> new FilterProcessor(),
                        "Source")
                .addSink("Sink", // 4. ์‹ฑํฌ ํ”„๋กœ์„ธ์„œ๋กœ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅํ•˜๊ธฐ ์œ„ํ•œ ํ•จ์ˆ˜ ํ˜ธ์ถœ
                        STREAM_LOG_FILTER,
                        "Process");

        // 5. ํ•ด๋‹น ํ† ํด๋กœ์ง€๋ฅผ KafkaStreams ์ธ์Šคํ„ด์Šค์— ๋„ฃ๊ณ , ์ŠคํŠธ๋ฆผ์ฆˆ ์ƒ์„ฑ ๋ฐ ์‹คํ–‰
        KafkaStreams streaming = new KafkaStreams(topology, props);
        streaming.start();
    }
}
๋ฐ˜์‘ํ˜•