์นดํ์นด ํด๋ผ์ด์ธํธ
- ์นดํ์นด ํด๋ฌ์คํฐ์ ๋ช ๋ น์ ๋ด๋ฆฌ๊ฑฐ๋ ๋ฐ์ดํฐ๋ฅผ ์ก์์ ํ๊ธฐ ์ํด ์ฌ์ฉํ๋ค.
- ์นดํ์นด ํ๋ก๋์, ์ปจ์๋จธ, ์ด๋๋ฏผ ํด๋ผ์ด์ธํธ๋ฅผ ์ ๊ณตํจ.
- ์์ฐํ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ด๋ฏ๋ก, ํ๋ ์์ํฌ๋ ์ ํ๋ฆฌ์ผ์ด์ ์์ ๊ตฌํ ํ ์คํ.
ํ๋ก๋์ API
- ์นดํ์นด์ ํ์ํ ๋ฐ์ดํฐ๋ฅผ ์ ์ธํ๊ณ ๋ธ๋ก์ปค์ ํน์ ํ ํฝ์ ํํฐ์ ์ ์ ์ก.
- ๋ฐ์ดํฐ๋ฅผ ์ ์กํ ๋ ๋ฆฌ๋ ํํฐ์ ์์ ์ ์นดํ์นด ๋ธ๋ก์ปค์ ์ง์ ํต์ ํ๋ค.
์ฃผ์ ํน์ง
- ์นด๋ธ์นด ๋ธ๋ก์ปค๋ก ๋ฐ์ดํฐ ์ ์ก ์, ๋ด๋ถ์ ์ผ๋ก ํํฐ์
๋ ๋ฐ ๋ฐฐ์น ์์ฑ ๋จ๊ณ๋ฅผ ๊ฑฐ์นจ.
- UniformStickyPartitioner ์ RoundRobinPartitioner ์ 2๊ฐ ํํฐ์ ์ ์ ๊ณตํจ.
- ๋ฉ์์ง ํค ์ ๋ฌด์์ ์ฐจ์ด์ ์ด ๋ฐ์ํ๋ค.
- ํ๋ก๋์ ๋์์ ํนํ๋ UniformStickyPartitioner๋ ๋์ ์ฒ๋ฆฌ๋ ๋ฐ ๋ฎ์ ๋ฆฌ์์ค ์ฌ์ฉ๋ฅ ์ ๊ฐ์ง. => 2.4 ๋ฒ์ ์ดํ๋ ์ด ํํฐ์ ์ด ๋ํดํธ
- Accumulator ์์ ๋ฐ์ดํฐ๊ฐ batch ๋ก ๋ชจ๋ ๋ฌถ์ผ ๋๊น์ง ๊ธฐ๋ค๋ฆฐ ํ ๋ชจ๋ ๋์ผํ ํํฐ์ ์ ์ ์กํ๋ ๋ฐฉ์์ผ๋ก RoundRobinPartioner๋ณด๋ค ํฅ์๋ ์ฑ๋ฅ์ผ๋ก ๊ฐ์ ํ๊ฒ ๋์๋ค.
- ๋ธ๋ก์ปค๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์ก ์ ์์ถ ์ต์
์ ์ ํ ๊ฐ๋ฅ
- ์์ถ ๋์ง ์์ ์ํ๋ก ์ ์กํ๋ ค๋ฉด ์ต์ ์ ์ง์ ํ์ง ์์ผ๋ฉด ๋๋ค.
- ์์ถ ์ ์ฅ์ ์, ๋คํธ์ํฌ ์ฒ๋ฆฌ๋์ ์ด๋,
- ๋จ์ ์, CPU ๋ Memory Resource ๋ฅผ ์ฌ์ฉํ๋ฏ๋ก ํ๊ฒฝ์ ๋ฐ๋ผ ์ง์ ํ์~
์ฃผ์ ์ต์
ํ์ ์ต์
- bootstrap-servers
- key.serializer
- value.serializer
์ ํ ์ต์
- acks
- buffer.memory
- batch.size
- linger.ms
- partitioner.class
- enable.idempotence
- transactional.id
์นดํ์นด ๋ธ๋ก์ปค์ ๋ฐ์ดํฐ ์ ์ ์ ์ก ์ฌ๋ถ ํ์ธ
- KafkaProducerdml send() ๋ฉ์๋๋ Future ๊ฐ์ฒด ๋ฐํ
- RecordMetadata์ ๋น๋๊ธฐ ๋ฐ์ดํฐ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ง๋ค.
- ProducerRecord๊ฐ ๋ธ๋ก์ปค์ ์ ์ ์ ์ฅ ๋์๋์ง์ ๋ํ ๋ฐ์ดํฐ๋ฅผ ํฌํจํ๋ค.
- org.apache.kafka.clients.producer.Callback ๋ฉ์๋๋ก ์ ์ก ์ฌ๋ถ ํ์ธ ๊ฐ๋ฅ.
์ปจ์๋จธ API
- ์ปจ์๋จธ๋ ์ ์ฌ๋ ๋ฐ์ดํฐ๋ฅผ ์ฌ์ฉํ๊ธฐ ์ํด ๋ธ๋ก์ปค๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์ ํ์ํ ์ฒ๋ฆฌ๋ฅผ ํ๋ค.
์ค์ ๊ฐ๋
ํ ํฝ์ ํํฐ์ ์ผ๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ๊ฐ๋ ๋ฐฉ๋ฒ์ 2๊ฐ์ง.
- ์ปจ์๋จธ ๊ทธ๋ฃน์ ์ด์ํ๋ ๋ฐฉ๋ฒ
- ํ ํฝ์ ํน์ ํํฐ์ ๋ง ๊ตฌ๋ ํ๋ ์ปจ์๋จธ๋ฅผ ์ด์ํ๋ ๋ฐฉ๋ฒ
์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ค๋ฅธ ๊ทธ๋ฃน๊ณผ ๊ฒฉ๋ฆฌ๋๋ ํน์ง์ด ์๋ค.
์นดํ์นด ํ๋ก๋์๊ฐ ๋ณด๋ธ ๋ฐ์ดํฐ๋ฅผ ๊ฐ๊ธฐ ๋ค๋ฅธ ์ญํ ์ ํ๋ ์ปจ์๋จธ ๊ทธ๋ฃน๋ผ๋ฆฌ ์ํฅ์ ๋ฐ์ง ์๊ฒ ์ฒ๋ฆฌํ ์ ์๋ค๋ ์ฅ์ ์ด ์๋ค.
์ปจ์๋จธ ๊ทธ๋ฃน์ผ๋ก ์ด๋ฃจ์ด์ง ์ปจ์๋ค ์ค, ์ผ๋ถ ์ปจ์๋จธ ์ฅ์ ๋ฐ์ ์, ์ฅ์ ๊ฐ ๋ฐ์ํ์ง ์์ ์ปจ์๋จธ์ ์ฅ์ ์ปจ์๋จธ์ ๋ด๋ถ์ ํํฐ์ ์ ์์ ๊ถ์ด ๋์ด๊ฐ๋ค.
- Rebalancing
- ์ปจ์๋จธ ์ค 1๊ฐ์ ์ด์๊ฐ ๋ฐ์ํ์ฌ ๋๋ ๋์ํ์ง ์์ ๊ฒฝ์ฐ
- ์ปจ์๋จธ๊ฐ ์ ์ธ๋๋ ์ํฉ (์ฅ์ ์ํฉ)
- ์ ์ฉํ์ง๋ง ์์ฃผ ๋ฐ์ํ๋ ๊ฒ์ ์ข์ง ์๋ค.
- ํํฐ์ ์ ์์ ๊ถ์ ๋ค๋ฅธ ์ปจ์๋จธ๋ก ์ฌํ ๋นํ๋ ๊ณผ์ ์์ ํด๋น ์ปจ์๋จธ ๊ทธ๋ฃน์ ์ปจ์๋จธ๋ค์ด ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ ์ ์๋ค!
- Rebalancing
ํน์ง
- ์นดํ์นด ๋ธ๋ก์ปค๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ด๋๊น์ง ๊ฐ์ก๋์ง, commit ์ ํตํด ๊ธฐ๋ก
- ํน์ ํ ํฝ์ ํํฐ์ ์ ์ด๋ค ์ปจ์๋จธ ๊ทธ๋ฃน์ด ๋ช ๋ฒ์งธ๊น์ง ๊ฐ์ ธ๊ฐ๋์ง ๋ด๋ถ์์ ์ฌ์ฉํ๋ ํ ํฝ (__consumer_offsets) ์ ๊ธฐ๋ก๋จ.
- ์ปจ์๋จธ ๋์ ์ด์๋ก ์ธํด ํด๋น ๋ด๋ถ ํ ํฝ์ ์ด๋ ์ง์ ๊น์ง ์ฝ์๋์ง ๊ธฐ๋ก์ด ์ ๋ ๊ฒฝ์ฐ, ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ์ค๋ณต ๋ฐ์ -> offset commit ์ ์ ์ ์ฒ๋ฆฌ ์ฌ๋ถ๋ฅผ ๊ฒ์ฆํด์ผ ํจ.
- offset commit ์ ์ปจ์๋จธ ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ช ์์ , ๋น๋ช ์์ ์ผ๋ก ์ํ ๊ฐ๋ฅ.
- ๊ธฐ๋ณธ ์ต์
์ poll() ๋ฉ์๋ ์ํ ์, ์ผ์ ๊ฐ๊ฒฉ์ผ๋ก offset commit ์ ์ํํ๋๋ก ์ค์ ๋์ด ์๋ค.
auto.commit.interval.ms
์ ๊ฐ์ด ์ฌ์ฉ- ์ด์์ ์๊ฐ์ด ํ๋ ์ ๊ฒฝ์ฐ, ํด๋น ์์ ๊น์ง ์ฝ์ ๋ ์ฝ๋์ offset commit.
- ๋น๋ช ์์ commit ์ poll() ๋ฉ์๋ ํธ์ถ ์ดํ, Rebalancing ์ด๋ ์ปจ์๋จธ์ ๊ฐ์ ์ข ๋ฃ ๋ฐ์ ์ ์ฒ๋ฆฌํ๋ ๋ฐ์ดํฐ์ ์ค๋ณต ๋๋ ์ ์ค ๊ฐ๋ฅ์ฑ์ ๋งค์ฐ ์ทจ์ฝํ๋ค.
- ๋ฐ์ดํฐ ์ค๋ณต, ์ ์ค์ ํ์ฉํ์ง ์๋ ์๋น์ค ๊ฐ๋ฐ ์ auto commit ์ ํ์ง๋ง๋ผ!
์ฃผ์ ์ต์
ํ์ ์ต์
- bootstrap-servers
- key.serializer
- value.serializer
์ ํ ์ต์
- group.id
- auto.offset.reset : ์ปจ์๋จธ ๊ทธ๋ฃน์ด ํน์ ํํฐ์ ์ ์ฝ์ ๋ ์ ์ฅ๋ ์ปจ์๋จธ ์คํ์ ์ด ์๋ ๊ฒฝ์ฐ ์ด๋ ์คํ์ ๋ถํฐ ์ฝ์์ง ์ ํํ๋ ์ต์
- enable.auto.commit : ์๋ ์ปค๋ฐ์ผ๋ก ํ ์ง ์๋ ์ปค๋ฐ์ผ๋ก ํ ์ง ์ ํ (๊ธฐ๋ณธ๊ฐ : true)
- auto.commit.interval.ms
- max.poll.records : poll() ๋ฉ์๋๋ฅผ ํตํด ๋ฐํ๋๋ ๋ ์ฝ๋ ๊ฐ์๋ฅผ ์ง์ (๊ธฐ๋ณธ๊ฐ : 500)
- session.timeout.ms : ์ปจ์๋จธ๊ฐ ๋ธ๋ก์ปค์ ์ฐ๊ฒฐ์ด ๋๊ธฐ๋ ์ต๋ ์๊ฐ, ์ด ์๊ฐ ๋ด์ hearbeat ๋ฅผ ์ ์กํ์ง ์์ผ๋ฉด ๋ธ๋ก์ปค๋ ์ปจ์๋จธ์ ์ด์๊ฐ ๋ฐ์ํ๋ค๊ณ ๊ฐ์ ํ๊ณ ๋ฆฌ๋ฐธ๋ฐ์ฑ ์์
- hearbeat.interval.ms
- max.poll.interval.ms
- isolation.level
๋๊ธฐ, ๋น๋๊ธฐ offset commit
- ๋ ๋ง์ ๋ฐ์ดํฐ์ ์ฒ๋ฆฌ๋ฅผ ์ํด์ ๋น๋๊ธฐ offset commit ์ ๊ณ ๋ คํด๋ณด์~
Rebalance Listener ๋ฅผ ๊ฐ์ง ์ปจ์๋จธ
- ์ปจ์๋จธ ๊ทธ๋ฃน ๋ด์ ์ปจ์๋จธ ์ถ๊ฐ ๋ฐ ์ ๊ฑฐ ์ ํํฐ์ ์ ์ปจ์๋จธ์ ์ฌํ ๋นํ๋ Rebalance ๋ฐ์.
- poll() ๋ฉ์๋๋ฅผ ํตํด ๋ฐ์ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ฒ๋ฆฌํ๊ธฐ ์ ์ ๋ฆฌ๋ฐธ๋ฐ์ค ๋ฐ์ ์ ๋ฐ์ดํฐ ์ค๋ณต ์ฒ๋ฆฌ๊ฐ ๋ฐ์ํ๋ค.
- ๋ฐ์ดํฐ ์ผ๋ถ๋ฅผ ์ฒ๋ฆฌํ์ผ๋, commit ํ์ง ์์๊ธฐ ๋๋ฌธ์ด๋ค..
- Rebalance ๋ฐ์์ ๊ฐ์งํ๊ธฐ ์ํด ์นดํ์นด๋ ConsumerRebalanceListener (Interface) ์ง์.
- onPartitionAssigned() : Rebalance ํ, ํํฐ์ ํ ๋น ์๋ฃ ์ ํธ์ถ
- onPartitionRevoked() : Rebalance ์ง์ ํธ์ถ
- ๋ง์ง๋ง์ผ๋ก ์ฒ๋ฆฌํ ๋ ์ฝ๋๋ฅผ ๊ธฐ์ค์ผ๋ก ์ปค๋ฐํ๊ธฐ ์ํด์ Rebalance ์์ ์ , commit ํ๋ฉด ๋๋ฏ๋ก, onPartitionAssigned() ํจ์์ ์ปค๋ฐ์ ๊ตฌํํ์ฌ ์ฒ๋ฆฌํ ์ ์๋ ๋ฐฉ๋ฒ์ด ์๋ค.
์ปจ์๋จธ์ ์์ ํ ์ข ๋ฃ
- ์ ์ ์ข
๋ฃ๊ฐ ๋์ง ์์ ์ปจ์๋จธ๋ ์ธ์
ํ์์์ ๋ฐ์์๊น์ง ์ปจ์๋จธ ๊ทธ๋ฃน์ ์์ฌํ๋ค.
- ๋๊ณ ์๋ ์ปจ์๋จธ๊ฐ ์๊น.
- ์ปจ์๋จธ ๋์ด ๋์ด๋๋ฉด ๋ฐ์ดํฐ ์ฒ๋ฆฌ ์ง์ฐ์ด ๋ฐ์ํ๋ค.
- ์์ ํ๊ฒ ์ข ๋ฃํ๊ธฐ ์ํด wakeUp() ๋ฉ์๋ ํ์ฉ ๊ฐ๋ฅ
- ํด๋น ๋ฉ์๋์ ์คํ ์ดํ poll() ๋ฉ์๋ ๋ฐ์ ์ WakeUpException ๋ฐ์~
์ด๋๋ฏผ API
- ์ค์ ์ด์ ํ๊ฒฝ์์๋ ์นดํ์นด์ ์ค์ ๋ ๋ด๋ถ ์ต์ ์ ์ค์ ํ๊ณ ํ์ธํ๋ ๊ฒ์ด ๋งค์ฐ ์ค์ํ๋ค.
- ๋ธ๋ก์ปค ์ค ํ ๋์ ์ ์ ํ ์ต์ ์ ํ์ธ ํ ์ ์์ง๋ง ๋งค์ฐ ๋ฒ๊ฑฐ๋ก์.
- ์ด๋ AdminClient ํด๋์ค๋ฅผ ์ฌ์ฉํ ์ ์๋ค.
์ฃผ์ ์กฐํ ๊ฐ๋ฅ ๋ฆฌ์คํธ
- ๋ธ๋ก์ปค ์ ๋ณด ์กฐํ
- ํ ํฝ ๋ฆฌ์คํธ ์กฐํ
- ์ปจ์๋จธ ๊ทธ๋ฃน ์กฐํ
- ์ ๊ท ํ ํฝ ์์ฑ
- ํํฐ์ ๊ฐ์ ๋ณ๊ฒฝ
- ์ ๊ทผ ์ ์ด ๊ท์น ์์ฑ
๋ฐ์ํ