์นดํ์นด ์คํธ๋ฆผ์ฆ
- ํ ํฝ์ ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ๋ค๋ฅธ ํ ํฝ์ ์ ์ฌํ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
- ์นดํ์นด ํด๋ฌ์คํฐ์ ์๋ฒฝ ํธํ ๋ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ์ ํ์ํ ํธ๋ฆฌํ ๊ธฐ๋ฅ ์ ๊ณต
- ์ฅ์ ๋ฐ์ ์ Exactly Once ํ๋๋ก ์ฅ์ ํ์ฉ ์์คํ ์ฅ์ฐฉ => ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์์ ์ฑ Up.
๊ฐ์
์คํธ๋ฆผ์ฆ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ด๋ถ์ ์ผ๋ก 1๊ฐ ์ด์์ ์ค๋ ๋ ์์ฑ ๊ฐ๋ฅ
- ์ค๋ ๋๋ 1๊ฐ ์ด์์ ํ์คํฌ๋ฅผ ๊ฐ์ง๋ค.
์คํธ๋ฆผ์ฆ์ Task ๋ ์คํธ๋ฆผ์ฆ ์ ํ๋ฆฌ์ผ์ด์ ์ ์คํํ๋ฉด ์๊ธฐ๋ ๋ฐ์ดํฐ์ ์ต์ ์ฒ๋ฆฌ ๋จ์
3๊ฐ์ ํํฐ์ ์ผ๋ก ์ด๋ฃจ์ด์ง ํ ํฝ์ ์ฒ๋ฆฌํ ๊ฒฝ์ฐ, ์คํธ๋ฆผ์ฆ ์ ํ๋ฆฌ์ผ์ด์ ์ ์คํ ์, ๋ด๋ถ์ 3๊ฐ์ ํ์คํฌ๊ฐ ์๊น.
์นดํ์นด ์คํธ๋ฆผ์ฆ๋ ์คํธ๋ฆผ์ฆ DSL (Domain Specific Language) ๊ณผ ํ๋ก์ธ์ API 2๊ฐ์ ๋ฐฉ๋ฒ์ผ๋ก ๊ฐ๋ฐ ๊ฐ๋ฅ
์คํธ๋ฆผ์ฆ DSL ์ ์คํธ๋ฆผ ํ๋ก์ธ์ฑ์ ์ฐ์ผ ๋ค์ํ ๊ธฐ๋ฅ์ API๋ก์ ์ ๊ณตํจ.
- ์ฝ๋ค
- ๊ธฐ๋ฅ์ด ์๋ค๋ฉด, ํ๋ก์ธ์ API๋ฅผ ํตํด ๊ฐ๋ฐ ๊ฐ๋ฅ
์คํธ๋ฆผ์ฆ DSL ๊ตฌํ ๊ฐ๋ฅ ์์
- ๋ฉ์์ง ๊ฐ ๊ธฐ๋ฐ์ ํ ํฝ ๋ถ๊ธฐ ์ฒ๋ฆฌ
- ์ง๋ 10๋ถ๊ฐ ๋ค์ด์จ ๋ฐ์ดํฐ ๊ฐ์ ์ง๊ณ
- ์๋ก ๋ค๋ฅธ ํ ํฝ์ ๊ฒฐํฉ์ผ๋ก ์๋ก์ด ๋ฐ์ดํฐ ์์ฑ
ํ๋ก์ธ์ API ๊ตฌํ ์์
- ๋ฉ์์ง ๊ฐ ์ข ๋ฅ์ ๋ฐ๋ฅธ ํ ํฝ ๊ฐ๋ณ์ ์ ์ก
- ์ผ์ ์๊ฐ ๊ฐ๊ฒฉ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ
์คํธ๋ฆผ์ฆ DSL
- ๋ ์ฝ๋์ ํ๋ฆ์ ์ถ์ํํ 3๊ฐ์ง ๊ฐ๋
- KStream
- KTable
- GlobalKTable
- ์ค์ง ์คํธ๋ฆผ์ฆ DSL ์์๋ง ์ฌ์ฉํ๋ ๊ฐ๋
KStream
- ๋ ์ฝ๋์ ํ๋ฆ์ ํํํ ๊ฒ
- ํค, ๊ฐ ํํ๋ก ๊ตฌ์ฑ
- KStream ์ผ๋ก ๋ฐ์ดํฐ ์กฐํ ์, ํ ํฝ์ ์กด์ฌํ๋ ๋ชจ๋ ๋ ์ฝ๋ ์ถ๋ ฅ
- ์ปจ์๋จธ๋ก ํ ํฝ์ ๊ตฌ๋
ํ๋ ๊ฒ๊ณผ ๊ฐ์ ์ ์์์ ์ฌ์ฉํ๋ ๊ฒ์ด๋ผ ๋ณด๋ฉด ๋จ.
KTable
- ๋ฉ์์ง ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฌถ์ด์ ์ฌ์ฉ
Global KTable
- ๋ฉ์์ง ํค๋ฅผ ๊ธฐ์ค์ผ๋ก ๋ฌถ์ด์ ์ฌ์ฉ
- KTable ๊ณผ์ ์ฐจ์ด์
- KTable ๋ก ์ ์ธ๋ ํ ํฝ์ 1๊ฐ ํํฐ์ ์ด 1๊ฐ ํ์คํฌ์ ํ ๋น ๋ฐ ์ฌ์ฉ
- GlobalKTable ๋ก ์ ์ธ๋ ํ ํฝ์ ๋ชจ๋ ํํฐ์ ๋ฐ์ดํฐ๊ฐ ๊ฐ ํ์คํฌ์ ํ ๋น๋์ด ์ฌ์ฉ
- ์์
- KStream ๋ฐ KTable ์ ๋ฐ์ดํฐ ์กฐ์ธ ์ํฉ
- Join ์ ์๋ก ๊ฐ co-partitioning ํ์.
- co-partitioning : Join ์ ํ๋ 2๊ฐ ๋ฐ์ดํฐ์ ํํฐ์ ๊ฐ์๊ฐ ๋์ผํ๊ณ , ํํฐ์ ๋ ์ ๋ต์ ๋์ผํ๊ฒ ๋ง์ถ๋ ์ ๋ต
- GlobalKTable ์ co-partitioning ๋์ง ์์ KStream ๊ณผ ๋ฐ์ดํฐ ์กฐ์ธ ๊ฐ๋ฅ
- KTable ๊ณผ ๋ค๋ฅด๊ฒ ๋ฐ์ดํฐ๋ฅผ ์คํธ๋ฆผ์ฆ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ชจ๋ ํ์คํฌ์ ๋์ผํ๊ฒ ๊ณต์ ๋์ด ์ฌ์ฉ๋๊ธฐ ๋๋ฌธ
- ๋จ์ ์ผ๋ก ๊ฐ ํ์คํฌ๋ง๋ค GlobalKTable ๋ก ์ ์๋ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅ ๋ฐ ์ฌ์ฉํ๋ฏ๋ก, ์คํธ๋ฆผ์ฆ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ก์ปฌ ์คํ ๋ฆฌ์ง ์ฌ์ฉ๋ ์ฆ๊ฐ
- ๋คํธ์ํฌ, ๋ธ๋ก์ปค์ ๊ณผ๋ถํ
- ์์ ์ฉ๋์ ๋ฐ์ดํฐ์ผ ๊ฒฝ์ฐ์๋ง GlobalKTable ์ฌ์ฉ ๊ถ์ฅ.
์ฃผ์ ์ต์
ํ์ ์ต์
- bootstrap.servers
- application.id
์ ํ ์ต์
default.key.serde : ๋ ์ฝ๋์ ๋ฉ์์ง ํค๋ฅผ ์ง๋ ฌํ, ์ญ์ง๋ ฌํํ๋ ํด๋์ค๋ฅผ ์ง์ (๊ธฐ๋ณธ๊ฐ : ๋ฐ์ดํธ ์ง๋ ฌํ)
num.stream.threads : ์คํธ๋ฆผ ํ๋ก์ธ์ฑ ์คํ ์ ์คํ๋ ์ค๋ ๋ ๊ฐ์๋ฅผ ์ง์ (๊ธฐ๋ณธ๊ฐ : 1)
state.dir : ์ํ๊ธฐ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ํ ๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ๋๋ ํ ๋ฆฌ ์ง์ (๊ธฐ๋ณธ๊ฐ : tmp/kafka-streams)
ํ๋ก์ธ์ API
- ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ๋ถ๊ธฐ, Join ๊ธฐ๋ฅ ์ด์ธ์ ์ถ๊ฐ์ ์ธ ์์ธ ๋ก์ง ๊ตฌํ ์ ํ์ฉ ๊ฐ๋ฅ
- ์คํธ๋ฆผ์ฆ DSL ์์ ์ฌ์ฉํ KStream, KTable, GlobalKTable ๊ฐ๋ ์ด ์๋ค๋ ์ ์ ์ ์
Simple Kafka Processor ์์ ์ฝ๋
public class SimpleKafkaProcessor {
private static String APPLICATION_NAME = "processor-application";
private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
private static String STREAM_LOG = "stream_log";
private static String STREAM_LOG_FILTER = "stream_log_filter";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 1. ํ ํด๋ก์ง ์์ฑ
Topology topology = new Topology();
topology.addSource("Source", // 2. ํด๋น ํ ํฝ์ ๊ฐ์ ธ์ค๊ธฐ ์ํด ํจ์ ํธ์ถ
STREAM_LOG)
.addProcessor("Process", // 3. ์คํธ๋ฆผ ํ๋ก์ธ์ ์ฌ์ฉ, ์ด๋ ๋ค์ ํ๋ก์ธ์๋ Process ์คํธ๋ฆผ ํ๋ก์ธ์ค
() -> new FilterProcessor(),
"Source")
.addSink("Sink", // 4. ์ฑํฌ ํ๋ก์ธ์๋ก ์ฌ์ฉํ์ฌ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ธฐ ์ํ ํจ์ ํธ์ถ
STREAM_LOG_FILTER,
"Process");
// 5. ํด๋น ํ ํด๋ก์ง๋ฅผ KafkaStreams ์ธ์คํด์ค์ ๋ฃ๊ณ , ์คํธ๋ฆผ์ฆ ์์ฑ ๋ฐ ์คํ
KafkaStreams streaming = new KafkaStreams(topology, props);
streaming.start();
}
}