Spark Streaming Fault Tolerance

Receiver, WAL, Direct Kafka API

Spark์˜ RDD๋Š” ์žฅ์•  ๋ณต๊ตฌ ๊ธฐ๋Šฅ์„ ๊ฐ€์ง€๊ณ  ์žˆ๋‹ค. RDD ๋Š” ๋ณ€๊ฒฝ ๋ถˆ๊ฐ€๋Šฅํ•˜๊ณ , ์—ฐ์‚ฐ ์ˆœ์„œ๋ฅผ ๊ธฐ์–ตํ•˜๊ณ (lineage) ์žˆ์–ด์„œ ๋‹ค์‹œ ๋งŒ๋“ค ์ˆ˜ ์žˆ๋Š” ๋ถ„์‚ฐ ๋ฐ์ดํ„ฐ์…‹์ด๋‹ค. ์›Œ์ปค ๋…ธ๋“œ์˜ ์˜ค๋ฅ˜ ๋•Œ๋ฌธ์— RDD ์˜ ์–ด๋–ค ๋ถ€๋ถ„์ด ์œ ์‹ค๋˜๋ฉด ๊ทธ ๋ถ€๋ถ„์˜ ์—ฐ์‚ฐ ์ˆœ์„œ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋‹ค์‹œ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค. Spark Streaming์˜ DStream ๋„ RDD๋ฅผ ๊ทผ๊ฐ„์œผ๋กœ ํ•˜๊ธฐ ๋•Œ๋ฌธ์— ๋ณต์›ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋ฐ์ดํ„ฐ ์œ ์‹ค ๋ฌธ์ œ

Spark๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ ์žฅ์•  ๋ณต๊ตฌ ๊ธฐ๋Šฅ์„ ๊ฐ€์ง€๊ณ  ์žˆ์–ด์„œ HDFS, S3 ๋“ฑ์„ ๋ฐ์ดํ„ฐ ์†Œ์Šค๋กœ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ์—๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์œ ์‹ค๋˜์ง€ ์•Š๋Š”๋‹ค. ํ•˜์ง€๋งŒ ๋„คํŠธ์›Œํฌ๋ฅผ ํ†ตํ•ด์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜๋Š” ๊ฒฝ์šฐ(์˜ˆ, kafka, flume)์—๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์œ ์‹ค๋  ์ˆ˜ ์žˆ๋‹ค.

Spark Streaming ์—์„œ Receiver๋ฅผ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ, Receiver๋Š” ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ง‰๊ธฐ ์œ„ํ•ด์„œ ๋„คํŠธ์›Œํฌ๋ฅผ ํ†ตํ•ด์„œ ๋ฐ›์€ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฅธ ๋…ธ๋“œ์— ๋ณต์ œํ•˜๊ธฐ ๋•Œ๋ฌธ์—(๋””ํดํŠธ ๋ณต์ œ ์ธ์ž: 2), ์›Œ์ปค ๋…ธ๋“œ๊ฐ€ ์‹คํŒจํ•  ๋•Œ ๋ณต์‚ฌ๋ณธ์œผ๋กœ ์œ ์‹ค๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณต๊ตฌํ•  ์ˆ˜ ์žˆ๋‹ค.

However, if the worker node running the Receiver itself fails, any data still sitting in the Receiver's buffer that has not yet been replicated to another worker node can be lost. To solve this data loss problem, Spark 1.2 introduced Write Ahead Logs (WAL).

Using Write Ahead Logs (WAL)

WAL ์„ ์‚ฌ์šฉํ•˜๋ฉด ๋“œ๋ผ์ด๋ฒ„, ๋งˆ์Šคํ„ฐ ํ˜น์€ ์›Œ์ปค๋…ธ๋“œ์—์„œ ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๋”๋ผ๋„ ์ž…๋ ฅ ๋ฐ์ดํ„ฐ๊ฐ€ ์œ ์‹ค๋˜๋Š” ๊ฒƒ์„ ๋ฐฉ์ง€ํ•  ์ˆ˜ ์žˆ๋‹ค. ์ŠคํŒŒํฌ ์„ค์ • ํŒŒ์ผ์—์„œ WAL ์‚ฌ์šฉ ์„ค์ •ํ•˜๊ณ  StreamingContext ๋ฅผ ์ฒดํฌํฌ์ธํŠธํ•˜๋ฉด ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•  ๋•Œ ์ฒดํฌํฌ์ธํŠธ๋ฅผ ๋ณต๊ตฌํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋„ ๋ณต๊ตฌํ•  ์ˆ˜ ์žˆ๋‹ค.

spark.streaming.receiver.writeAheadLog.enable true
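A minimal sketch of enabling the WAL in code together with checkpointing (the application name and checkpoint path are hypothetical; the checkpoint directory must live on fault-tolerant storage such as HDFS, since that is where the WAL is written):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("wal-example")
  // Persist every received block to the write ahead log
  // before it is acknowledged to the source.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, Seconds(10))
// The WAL is stored under the checkpoint directory, so it must be
// on fault-tolerant storage (e.g. HDFS).
ssc.checkpoint("hdfs:///checkpoints/wal-example")
```

With the WAL enabled, the in-memory replication of the previous section becomes redundant, so a non-replicated storage level (e.g. StorageLevel.MEMORY_AND_DISK_SER) avoids storing each block twice.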

WAL์„ ์‚ฌ์šฉํ•˜๋ฉด ๋ฐ์ดํ„ฐ๊ฐ€ ์œ ์‹ค๋˜๋Š” ๋ฌธ์ œ๋Š” ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ์ง€๋งŒ, ๋ฐ์ดํ„ฐ๊ฐ€ ์ค‘๋ณต๋  ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ๋ฐ์ดํ„ฐ๋ฅผ WAL ์— ์ €์žฅํ•˜๊ณ  Kafka Offset ์„ Zookeeper ์— ์—…๋ฐ์ดํŠธํ•˜์ง€ ๋ชปํ•œ ์ƒํƒœ์—์„œ ์žฅ์• ๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด, Spark Streaming ์€ Kafka ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์‹œ ๊ฐ€์ ธ์™€ ์ค‘๋ณต ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฌธ์ œ๊ฐ€ ๋ฐœ์ƒํ•œ๋‹ค. (at-least-one)

๋ฐ์ดํ„ฐ ์ค‘๋ณต ์ฒ˜๋ฆฌ ๋ฌธ์ œ๋ฅผ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด Spark 1.3 ๋ฒ„์ „์—์„œ๋Š” Direct Kafka API ๋„์ž…ํ•˜์˜€๋‹ค

Direct Kafka API

๋ฐ์ดํ„ฐ ์ˆ˜์‹  ์™„๋ฃŒ ์—ฌ๋ถ€๋ฅผ 2๊ฐœ์˜ ์‹œ์Šคํ…œ์—์„œ ๊ด€๋ฆฌํ•˜๋ฉด, ์›์ž์„ฑ์„ ๊ฐ€์ง€๊ณ  ์—…๋ฐ์ดํŠธํ•  ์ˆ˜ ์—†๊ธฐ ๋•Œ๋ฌธ์— ๋ถˆ์ผ์น˜์„ฑ์ด ๋ฐœ์ƒํ•œ๋‹ค. ๋ถˆ์ผ์น˜์„ฑ์„ ํ•ด๊ฒฐํ•˜๊ธฐ ์œ„ํ•ด 1๊ฐœ์˜ ์‹œ์Šคํ…œ์—์„œ ๋ฐ์ดํ„ฐ ์ˆ˜์‹ ์„ ๊ด€๋ฆฌํ•  ํ•„์š”๊ฐ€ ์žˆ๋‹ค. Spark Streaming ์€ Kafka์˜ Simple Consumer API ๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ offset ์„ Spark Streaming ์—์„œ ์ง์ ‘ ๊ด€๋ฆฌํ•˜๋Š” ๊ตฌ์กฐ๋กœ ๋ณ€๊ฒฝํ•˜์—ฌ ์ด๋ฅผ ํ•ด๊ฒฐํ•˜์˜€๋‹ค.

์ด ๋ฐฉ์‹์€ Receiver ์™€ WAL ์„ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•˜๋Š” ์ ‘๊ทผ ๋ฐฉ์‹๊ณผ ๋‹ค๋ฅด๋‹ค. Receiver๋ฅผ ์‚ฌ์šฉํ•˜์—ฌ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜๊ณ  WAL ์— ์ €์žฅํ•˜๋Š” ๋Œ€์‹ ์— ๋งค ๋ฐฐ์น˜๊ฐ„๊ฒฉ(Batch Interval)๋งˆ๋‹ค Consume ํ•œ ๋ฐ์ดํ„ฐ์˜ offset range์„ ์„ค์ •ํ•˜๊ณ  ๋‚˜์ค‘์— Batch Jobs์„ ์‹คํ–‰ํ•  ๋•Œ, offset range ์„ค์ •์— ๋”ฐ๋ผ ๋ฐ์ดํ„ฐ๋ฅผ Kafka Broker์—์„œ ์ฝ์–ด์˜ค๊ณ  offset์„ checkpointํ•˜๊ณ  ๋ณต๊ตฌํ•  ๋•Œ ์‚ฌ์šฉํ•œ๋‹ค.

Spark Streaming ์€ Direct Kafka API ๋ฅผ ์‚ฌ์šฉํ•จ์œผ๋กœ์จ ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•˜๊ณ  ์ค‘๋ณต ์—†์ด ์ •ํ™•ํ•˜๊ฒŒ ํ•œ๋ฒˆ(Exactly-one)๋งŒ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

์‚ฌ์šฉ๋ฒ•์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

์ฒดํฌํฌ์ธํŠธ

val CHECKPOINT_DIR = getCheckpointDirectory(SERVICE_NAME)
val ssc = if (isCheckpointing) {
  // Restore the context from the checkpoint if one exists;
  // otherwise create a fresh one with the factory function.
  StreamingContext.getOrCreate(CHECKPOINT_DIR, () => {
    analyzer(sparkConf, SERVICE_NAME, CHECKPOINT_DIR)
  })
} else {
  analyzer(sparkConf, SERVICE_NAME, CHECKPOINT_DIR)
}
…
// Inside analyzer(): create the context and register the checkpoint directory.
val ssc = new StreamingContext(sparkConf, SLIDE_INTERVAL)
if (isCheckpointing) {
  ssc.checkpoint(checkpointDirectory)
}

Direct Kafka ์—ฐ๋™

val KAFKA_TOPIC = Configuration.getKafkaTopic()
val DIRECT_KAFKA_PARAMS = Map("metadata.broker.list" -> Configuration.getKafkaBrokerList)
val DIRECT_TOPICS = Set(KAFKA_TOPIC)
// No receiver: the stream reads each Kafka partition directly from the brokers.
val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, DIRECT_KAFKA_PARAMS, DIRECT_TOPICS)
// Keep only the message values, dropping the keys.
kafkaStreams.map { case (k, v) => v }
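The offset range backing each batch can also be inspected on the driver, e.g. to log it or to commit offsets to an external store. A sketch, using the kafkaStreams stream above (the cast must happen on the RDD returned by createDirectStream, before any transformation reshuffles its partitions):

```scala
import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Each RDD produced by createDirectStream carries the exact Kafka
// offset range it covers, one OffsetRange per Kafka partition.
kafkaStreams.foreachRDD { rdd =>
  val offsetRanges: Array[OffsetRange] =
    rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"topic=${o.topic} partition=${o.partition} " +
      s"from=${o.fromOffset} until=${o.untilOffset}")
  }
}
```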

Because the Direct Kafka API needs no Receiver, there is no need to configure one Kafka partition per Receiver (Kafka consumer); instead, an RDD partition is created for each Kafka partition and the data is read in parallel.

Spark Streaming์€ Fault-tolerant๋ฅผ ๋ณด์žฅํ•˜๊ธฐ ์œ„ํ•œ Kafka ์—ฐ๋™ ๋ฐฉ์‹์€ Checkpointing ๊ณผ ํ•จ๊ป˜ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค. ๋งŒ์•ฝ Spark ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์žฌ์ปดํŒŒ์ผํ•˜์—ฌ ์žฌ๋ฐฐํฌํ•ด์•ผ ํ•œ๋‹ค๋ฉด ์ด์ „ ๋ฐ”์ด๋„ˆ๋ฆฌ๋กœ checkpointํ•œ Context ๋ฅผ ๋‹ค์‹œ ์ƒ์„ฑํ•  ์ˆ˜ ์—†๊ธฐ ๋•Œ๋ฌธ์— checkpoint Directory๋ฅผ ์‚ญ์ œํ•˜๊ณ  Spark ์–ดํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ์‹คํ–‰ํ•ด์•ผ ํ•œ๋‹ค.

Terminology

ACID(์›์ž์„ฑ, ์ผ๊ด€์„ฑ, ๊ณ ๋ฆฝ์„ฑ, ์ง€์†์„ฑ)

ACID ์›์ž์„ฑ (Atomicity) ํŠธ๋žœ์žญ์…˜์„ ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒํ•˜๋ฉด, ํ•ญ์ƒ ์ผ๊ด€์„ฑ ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ์œ ์ง€ํ•ด์•ผ ํ•œ๋‹ค.

ACID ์ง€์†์„ฑ (Durability) ํŒŒ์ผ์‹œ์Šคํ…œ ํ˜น์€ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค์— ์„ฑ๊ณต์ ์œผ๋กœ ์™„๋ฃŒํ•œ ํŠธ๋žœ์žญ์…˜์€ ์˜์›ํžˆ ๋ฐ˜์˜๋˜์–ด์•ผ ํ•œ๋‹ค.

์ฐธ๊ณ ์ž๋ฃŒ

Improvements to Kafka Integration of Spark Streaming, https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html
Improved Fault-tolerance and Zero Data Loss in Spark Streaming, https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html
Integrating Kafka and Spark Streaming: Code Examples and State of the Game, http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/
Exactly-once Spark Streaming from Apache Kafka, http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/
