๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๐Ÿ“จ Apache Kafka

[์•„ํŒŒ์น˜ ์นดํ”„์นด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ํ”„๋กœ๊ทธ๋ž˜๋ฐ with ์ž๋ฐ”] 3.4 ์žฅ ์นดํ”„์นด ํด๋ผ์ด์–ธํŠธ

by GroovyArea 2023. 4. 9.

์นดํ”„์นด ํด๋ผ์ด์–ธํŠธ

  • ์นดํ”„์นด ํด๋Ÿฌ์Šคํ„ฐ์— ๋ช…๋ น์„ ๋‚ด๋ฆฌ๊ฑฐ๋‚˜ ๋ฐ์ดํ„ฐ๋ฅผ ์†ก์ˆ˜์‹  ํ•˜๊ธฐ ์œ„ํ•ด ์‚ฌ์šฉํ•œ๋‹ค.
  • ์นดํ”„์นด ํ”„๋กœ๋“€์„œ, ์ปจ์Šˆ๋จธ, ์–ด๋“œ๋ฏผ ํด๋ผ์ด์–ธํŠธ๋ฅผ ์ œ๊ณตํ•จ.
  • ์—„์—ฐํ•œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์ด๋ฏ€๋กœ, ํ”„๋ ˆ์ž„์›Œํฌ๋‚˜ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๊ตฌํ˜„ ํ›„ ์‹คํ–‰.

ํ”„๋กœ๋“€์„œ API

  • ์นดํ”„์นด์— ํ•„์š”ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ์„ ์–ธํ•˜๊ณ  ๋ธŒ๋กœ์ปค์˜ ํŠน์ • ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์— ์ „์†ก.
  • ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•  ๋•Œ ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜ ์†Œ์œ ์˜ ์นดํ”„์นด ๋ธŒ๋กœ์ปค์™€ ์ง์ ‘ ํ†ต์‹ ํ•œ๋‹ค.

์ฃผ์š” ํŠน์ง•

  • ์นด๋ธŒ์นด ๋ธŒ๋กœ์ปค๋กœ ๋ฐ์ดํ„ฐ ์ „์†ก ์‹œ, ๋‚ด๋ถ€์ ์œผ๋กœ ํŒŒํ‹ฐ์…”๋„ˆ ๋ฐ ๋ฐฐ์น˜ ์ƒ์„ฑ ๋‹จ๊ณ„๋ฅผ ๊ฑฐ์นจ.
    • UniformStickyPartitioner ์™€ RoundRobinPartitioner ์˜ 2๊ฐœ ํŒŒํ‹ฐ์…˜์„ ์ œ๊ณตํ•จ.
    • ๋ฉ”์‹œ์ง€ ํ‚ค ์œ ๋ฌด์—์„œ ์ฐจ์ด์ ์ด ๋ฐœ์ƒํ•œ๋‹ค.
    • ํ”„๋กœ๋“€์„œ ๋™์ž‘์— ํŠนํ™”๋œ UniformStickyPartitioner๋Š” ๋†’์€ ์ฒ˜๋ฆฌ๋Ÿ‰ ๋ฐ ๋‚ฎ์€ ๋ฆฌ์†Œ์Šค ์‚ฌ์šฉ๋ฅ ์„ ๊ฐ€์ง. => 2.4 ๋ฒ„์ „ ์ดํ›„๋Š” ์ด ํŒŒํ‹ฐ์…˜์ด ๋””ํดํŠธ
    • Accumulator ์—์„œ ๋ฐ์ดํ„ฐ๊ฐ€ batch ๋กœ ๋ชจ๋‘ ๋ฌถ์ผ ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฐ ํ›„ ๋ชจ๋‘ ๋™์ผํ•œ ํŒŒํ‹ฐ์…˜์— ์ „์†กํ•˜๋Š” ๋ฐฉ์‹์œผ๋กœ RoundRobinPartioner๋ณด๋‹ค ํ–ฅ์ƒ๋œ ์„ฑ๋Šฅ์œผ๋กœ ๊ฐœ์„ ํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค.
  • ๋ธŒ๋กœ์ปค๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†ก ์‹œ ์••์ถ• ์˜ต์…˜์„ ์„ ํƒ ๊ฐ€๋Šฅ
    • ์••์ถ• ๋˜์ง€ ์•Š์€ ์ƒํƒœ๋กœ ์ „์†กํ•˜๋ ค๋ฉด ์˜ต์…˜์„ ์ง€์ •ํ•˜์ง€ ์•Š์œผ๋ฉด ๋œ๋‹ค.
    • ์••์ถ• ์‹œ ์žฅ์ ์€, ๋„คํŠธ์›Œํฌ ์ฒ˜๋ฆฌ๋Ÿ‰์— ์ด๋“,
    • ๋‹จ์ ์€, CPU ๋‚˜ Memory Resource ๋ฅผ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ ํ™˜๊ฒฝ์— ๋”ฐ๋ผ ์ง€์ •ํ•˜์ž~

์ฃผ์š” ์˜ต์…˜

ํ•„์ˆ˜ ์˜ต์…˜

  • bootstrap-servers
  • key.serializer
  • value.serializer

์„ ํƒ ์˜ต์…˜

  • acks
  • buffer.memory
  • batch.size
  • linger.ms
  • partitioner.class
  • enable.idempotence
  • transactional.id

์นดํ”„์นด ๋ธŒ๋กœ์ปค์˜ ๋ฐ์ดํ„ฐ ์ •์ƒ ์ „์†ก ์—ฌ๋ถ€ ํ™•์ธ

  • KafkaProducerdml send() ๋ฉ”์„œ๋“œ๋Š” Future ๊ฐ์ฒด ๋ฐ˜ํ™˜
    • RecordMetadata์˜ ๋น„๋™๊ธฐ ๋ฐ์ดํ„ฐ ๊ฒฐ๊ณผ๋ฅผ ๊ฐ€์ง„๋‹ค.
    • ProducerRecord๊ฐ€ ๋ธŒ๋กœ์ปค์— ์ •์ƒ ์ €์žฅ ๋˜์—ˆ๋Š”์ง€์— ๋Œ€ํ•œ ๋ฐ์ดํ„ฐ๋ฅผ ํฌํ•จํ•œ๋‹ค.
    • org.apache.kafka.clients.producer.Callback ๋ฉ”์„œ๋“œ๋กœ ์ „์†ก ์—ฌ๋ถ€ ํ™•์ธ ๊ฐ€๋Šฅ.

์ปจ์Šˆ๋จธ API

  • ์ปจ์Šˆ๋จธ๋Š” ์ ์žฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด ๋ธŒ๋กœ์ปค๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์™€์„œ ํ•„์š”ํ•œ ์ฒ˜๋ฆฌ๋ฅผ ํ•œ๋‹ค.

์ค‘์š” ๊ฐœ๋…

  • ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ๊ฐ€๋Š” ๋ฐฉ๋ฒ•์€ 2๊ฐ€์ง€.

    • ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์„ ์šด์˜ํ•˜๋Š” ๋ฐฉ๋ฒ•
    • ํ† ํ”ฝ์˜ ํŠน์ • ํŒŒํ‹ฐ์…˜๋งŒ ๊ตฌ๋…ํ•˜๋Š” ์ปจ์Šˆ๋จธ๋ฅผ ์šด์˜ํ•˜๋Š” ๋ฐฉ๋ฒ•
  • ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์€ ๋‹ค๋ฅธ ๊ทธ๋ฃน๊ณผ ๊ฒฉ๋ฆฌ๋˜๋Š” ํŠน์ง•์ด ์žˆ๋‹ค.

  • ์นดํ”„์นด ํ”„๋กœ๋“€์„œ๊ฐ€ ๋ณด๋‚ธ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ๊ธฐ ๋‹ค๋ฅธ ์—ญํ• ์„ ํ•˜๋Š” ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน๋ผ๋ฆฌ ์˜ํ–ฅ์„ ๋ฐ›์ง€ ์•Š๊ฒŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ์žฅ์ ์ด ์žˆ๋‹ค.

  • ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์œผ๋กœ ์ด๋ฃจ์–ด์ง„ ์ปจ์Šˆ๋“ค ์ค‘, ์ผ๋ถ€ ์ปจ์Šˆ๋จธ ์žฅ์•  ๋ฐœ์ƒ ์‹œ, ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜์ง€ ์•Š์€ ์ปจ์Šˆ๋จธ์— ์žฅ์•  ์ปจ์Šˆ๋จธ์˜ ๋‚ด๋ถ€์˜ ํŒŒํ‹ฐ์…˜์˜ ์†Œ์œ ๊ถŒ์ด ๋„˜์–ด๊ฐ„๋‹ค.

    • Rebalancing
      • ์ปจ์Šˆ๋จธ ์ค‘ 1๊ฐœ์— ์ด์Šˆ๊ฐ€ ๋ฐœ์ƒํ•˜์—ฌ ๋”๋Š” ๋™์ž‘ํ•˜์ง€ ์•Š์„ ๊ฒฝ์šฐ
      • ์ปจ์Šˆ๋จธ๊ฐ€ ์ œ์™ธ๋˜๋Š” ์ƒํ™ฉ (์žฅ์•  ์ƒํ™ฉ)
    • ์œ ์šฉํ•˜์ง€๋งŒ ์ž์ฃผ ๋ฐœ์ƒํ•˜๋Š” ๊ฒƒ์€ ์ข‹์ง€ ์•Š๋‹ค.
    • ํŒŒํ‹ฐ์…˜์˜ ์†Œ์œ ๊ถŒ์„ ๋‹ค๋ฅธ ์ปจ์Šˆ๋จธ๋กœ ์žฌํ• ๋‹นํ•˜๋Š” ๊ณผ์ •์—์„œ ํ•ด๋‹น ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์˜ ์ปจ์Šˆ๋จธ๋“ค์ด ํ† ํ”ฝ์˜ ๋ฐ์ดํ„ฐ๋ฅผ ์ฝ์„ ์ˆ˜ ์—†๋‹ค!

ํŠน์ง•

  • ์นดํ”„์นด ๋ธŒ๋กœ์ปค๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์–ด๋””๊นŒ์ง€ ๊ฐ€์กŒ๋Š”์ง€, commit ์„ ํ†ตํ•ด ๊ธฐ๋ก
  • ํŠน์ • ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์„ ์–ด๋–ค ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์ด ๋ช‡ ๋ฒˆ์งธ๊นŒ์ง€ ๊ฐ€์ ธ๊ฐ”๋Š”์ง€ ๋‚ด๋ถ€์—์„œ ์‚ฌ์šฉํ•˜๋Š” ํ† ํ”ฝ (__consumer_offsets) ์— ๊ธฐ๋ก๋จ.
  • ์ปจ์Šˆ๋จธ ๋™์ž‘ ์ด์Šˆ๋กœ ์ธํ•ด ํ•ด๋‹น ๋‚ด๋ถ€ ํ† ํ”ฝ์— ์–ด๋А ์ง€์ ๊นŒ์ง€ ์ฝ์—ˆ๋Š”์ง€ ๊ธฐ๋ก์ด ์•ˆ ๋  ๊ฒฝ์šฐ, ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ์˜ ์ค‘๋ณต ๋ฐœ์ƒ -> offset commit ์˜ ์ •์ƒ ์ฒ˜๋ฆฌ ์—ฌ๋ถ€๋ฅผ ๊ฒ€์ฆํ•ด์•ผ ํ•จ.
  • offset commit ์€ ์ปจ์Šˆ๋จธ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋ช…์‹œ์ , ๋น„๋ช…์‹œ์ ์œผ๋กœ ์ˆ˜ํ–‰ ๊ฐ€๋Šฅ.
  • ๊ธฐ๋ณธ ์˜ต์…˜์€ poll() ๋ฉ”์„œ๋“œ ์ˆ˜ํ–‰ ์‹œ, ์ผ์ • ๊ฐ„๊ฒฉ์œผ๋กœ offset commit ์„ ์ˆ˜ํ–‰ํ•˜๋„๋ก ์„ค์ •๋˜์–ด ์žˆ๋‹ค.
    • auto.commit.interval.ms ์™€ ๊ฐ™์ด ์‚ฌ์šฉ
    • ์ด์ƒ์˜ ์‹œ๊ฐ„์ด ํ˜๋ €์„ ๊ฒฝ์šฐ, ํ•ด๋‹น ์‹œ์ ๊นŒ์ง€ ์ฝ์€ ๋ ˆ์ฝ”๋“œ์˜ offset commit.
  • ๋น„๋ช…์‹œ์  commit ์€ poll() ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ ์ดํ›„, Rebalancing ์ด๋‚˜ ์ปจ์Šˆ๋จธ์˜ ๊ฐ•์ œ ์ข…๋ฃŒ ๋ฐœ์ƒ ์‹œ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐ์ดํ„ฐ์˜ ์ค‘๋ณต ๋˜๋Š” ์œ ์‹ค ๊ฐ€๋Šฅ์„ฑ์— ๋งค์šฐ ์ทจ์•ฝํ•˜๋‹ค.
  • ๋ฐ์ดํ„ฐ ์ค‘๋ณต, ์œ ์‹ค์„ ํ—ˆ์šฉํ•˜์ง€ ์•Š๋Š” ์„œ๋น„์Šค ๊ฐœ๋ฐœ ์‹œ auto commit ์€ ํ•˜์ง€๋งˆ๋ผ!

์ฃผ์š” ์˜ต์…˜

ํ•„์ˆ˜ ์˜ต์…˜

  • bootstrap-servers
  • key.serializer
  • value.serializer

์„ ํƒ ์˜ต์…˜

  • group.id
  • auto.offset.reset : ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์ด ํŠน์ • ํŒŒํ‹ฐ์…˜์„ ์ฝ์„ ๋•Œ ์ €์žฅ๋œ ์ปจ์Šˆ๋จธ ์˜คํ”„์…‹์ด ์—†๋Š” ๊ฒฝ์šฐ ์–ด๋А ์˜คํ”„์…‹๋ถ€ํ„ฐ ์ฝ์„์ง€ ์„ ํƒํ•˜๋Š” ์˜ต์…˜
  • enable.auto.commit : ์ž๋™ ์ปค๋ฐ‹์œผ๋กœ ํ• ์ง€ ์ˆ˜๋™ ์ปค๋ฐ‹์œผ๋กœ ํ• ์ง€ ์„ ํƒ (๊ธฐ๋ณธ๊ฐ’ : true)
  • auto.commit.interval.ms
  • max.poll.records : poll() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋ฐ˜ํ™˜๋˜๋Š” ๋ ˆ์ฝ”๋“œ ๊ฐœ์ˆ˜๋ฅผ ์ง€์ • (๊ธฐ๋ณธ๊ฐ’ : 500)
  • session.timeout.ms : ์ปจ์Šˆ๋จธ๊ฐ€ ๋ธŒ๋กœ์ปค์™€ ์—ฐ๊ฒฐ์ด ๋Š๊ธฐ๋Š” ์ตœ๋Œ€ ์‹œ๊ฐ„, ์ด ์‹œ๊ฐ„ ๋‚ด์— hearbeat ๋ฅผ ์ „์†กํ•˜์ง€ ์•Š์œผ๋ฉด ๋ธŒ๋กœ์ปค๋Š” ์ปจ์Šˆ๋จธ์— ์ด์Šˆ๊ฐ€ ๋ฐœ์ƒํ–ˆ๋‹ค๊ณ  ๊ฐ€์ •ํ•˜๊ณ  ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ ์‹œ์ž‘
  • hearbeat.interval.ms
  • max.poll.interval.ms
  • isolation.level

๋™๊ธฐ, ๋น„๋™๊ธฐ offset commit

  • ๋” ๋งŽ์€ ๋ฐ์ดํ„ฐ์˜ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•ด์„œ ๋น„๋™๊ธฐ offset commit ์„ ๊ณ ๋ คํ•ด๋ณด์ž~

Rebalance Listener ๋ฅผ ๊ฐ€์ง„ ์ปจ์Šˆ๋จธ

  • ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน ๋‚ด์˜ ์ปจ์Šˆ๋จธ ์ถ”๊ฐ€ ๋ฐ ์ œ๊ฑฐ ์‹œ ํŒŒํ‹ฐ์…˜์„ ์ปจ์Šˆ๋จธ์— ์žฌํ• ๋‹นํ•˜๋Š” Rebalance ๋ฐœ์ƒ.
  • poll() ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋ฅผ ๋ชจ๋‘ ์ฒ˜๋ฆฌํ•˜๊ธฐ ์ „์— ๋ฆฌ๋ฐธ๋Ÿฐ์Šค ๋ฐœ์ƒ ์‹œ ๋ฐ์ดํ„ฐ ์ค‘๋ณต ์ฒ˜๋ฆฌ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค.
    • ๋ฐ์ดํ„ฐ ์ผ๋ถ€๋ฅผ ์ฒ˜๋ฆฌํ–ˆ์œผ๋‚˜, commit ํ•˜์ง€ ์•Š์•˜๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค..
  • Rebalance ๋ฐœ์ƒ์„ ๊ฐ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ์นดํ”„์นด๋Š” ConsumerRebalanceListener (Interface) ์ง€์›.
    • onPartitionAssigned() : Rebalance ํ›„, ํŒŒํ‹ฐ์…˜ ํ• ๋‹น ์™„๋ฃŒ ์‹œ ํ˜ธ์ถœ
    • onPartitionRevoked() : Rebalance ์ง์ „ ํ˜ธ์ถœ
    • ๋งˆ์ง€๋ง‰์œผ๋กœ ์ฒ˜๋ฆฌํ•œ ๋ ˆ์ฝ”๋“œ๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ปค๋ฐ‹ํ•˜๊ธฐ ์œ„ํ•ด์„œ Rebalance ์‹œ์ž‘ ์ „, commit ํ•˜๋ฉด ๋˜๋ฏ€๋กœ, onPartitionAssigned() ํ•จ์ˆ˜์— ์ปค๋ฐ‹์„ ๊ตฌํ˜„ํ•˜์—ฌ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋Š” ๋ฐฉ๋ฒ•์ด ์žˆ๋‹ค.

์ปจ์Šˆ๋จธ์˜ ์•ˆ์ „ํ•œ ์ข…๋ฃŒ

  • ์ •์ƒ ์ข…๋ฃŒ๊ฐ€ ๋˜์ง€ ์•Š์€ ์ปจ์Šˆ๋จธ๋Š” ์„ธ์…˜ ํƒ€์ž„์•„์›ƒ ๋ฐœ์ƒ์‹œ๊นŒ์ง€ ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์— ์ž”์žฌํ•œ๋‹ค.
    • ๋†€๊ณ  ์žˆ๋Š” ์ปจ์Šˆ๋จธ๊ฐ€ ์ƒ๊น€.
    • ์ปจ์Šˆ๋จธ ๋ž™์ด ๋Š˜์–ด๋‚˜๋ฉด ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ ์ง€์—ฐ์ด ๋ฐœ์ƒํ•œ๋‹ค.
  • ์•ˆ์ „ํ•˜๊ฒŒ ์ข…๋ฃŒํ•˜๊ธฐ ์œ„ํ•ด wakeUp() ๋ฉ”์„œ๋“œ ํ™œ์šฉ ๊ฐ€๋Šฅ
  • ํ•ด๋‹น ๋ฉ”์„œ๋“œ์˜ ์‹คํ–‰ ์ดํ›„ poll() ๋ฉ”์„œ๋“œ ๋ฐœ์ƒ ์‹œ WakeUpException ๋ฐœ์ƒ~

์–ด๋“œ๋ฏผ API

  • ์‹ค์ œ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ์นดํ”„์นด์— ์„ค์ •๋œ ๋‚ด๋ถ€ ์˜ต์…˜์„ ์„ค์ •ํ•˜๊ณ  ํ™•์ธํ•˜๋Š” ๊ฒƒ์ด ๋งค์šฐ ์ค‘์š”ํ•˜๋‹ค.
  • ๋ธŒ๋กœ์ปค ์ค‘ ํ•œ ๋Œ€์— ์ ‘์† ํ›„ ์˜ต์…˜์„ ํ™•์ธ ํ•  ์ˆ˜ ์žˆ์ง€๋งŒ ๋งค์šฐ ๋ฒˆ๊ฑฐ๋กœ์›€.
  • ์ด๋•Œ AdminClient ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

์ฃผ์š” ์กฐํšŒ ๊ฐ€๋Šฅ ๋ฆฌ์ŠคํŠธ

  • ๋ธŒ๋กœ์ปค ์ •๋ณด ์กฐํšŒ
  • ํ† ํ”ฝ ๋ฆฌ์ŠคํŠธ ์กฐํšŒ
  • ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน ์กฐํšŒ
  • ์‹ ๊ทœ ํ† ํ”ฝ ์ƒ์„ฑ
  • ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜ ๋ณ€๊ฒฝ
  • ์ ‘๊ทผ ์ œ์–ด ๊ทœ์น™ ์ƒ์„ฑ
๋ฐ˜์‘ํ˜•