Spark Streaming Fault-tolerant

Receiver, WAL, Direct Kafka API

Spark의 RDDλŠ” μž₯μ•  볡ꡬ κΈ°λŠ₯을 가지고 μžˆλ‹€. RDD λŠ” λ³€κ²½ λΆˆκ°€λŠ₯ν•˜κ³ , μ—°μ‚° μˆœμ„œλ₯Ό κΈ°μ–΅ν•˜κ³ (lineage) μžˆμ–΄μ„œ λ‹€μ‹œ λ§Œλ“€ 수 μžˆλŠ” λΆ„μ‚° 데이터셋이닀. μ›Œμ»€ λ…Έλ“œμ˜ 였λ₯˜ λ•Œλ¬Έμ— RDD 의 μ–΄λ–€ 뢀뢄이 μœ μ‹€λ˜λ©΄ κ·Έ λΆ€λΆ„μ˜ μ—°μ‚° μˆœμ„œλ₯Ό μ‚¬μš©ν•˜μ—¬ λ‹€μ‹œ 생성할 수 μžˆλ‹€. Spark Streaming의 DStream 도 RDDλ₯Ό κ·Όκ°„μœΌλ‘œ ν•˜κΈ° λ•Œλ¬Έμ— 볡원할 수 μžˆλ‹€.

데이터 μœ μ‹€ 문제

SparkλŠ” 기본적으둜 μž₯μ•  볡ꡬ κΈ°λŠ₯을 가지고 μžˆμ–΄μ„œ HDFS, S3 등을 데이터 μ†ŒμŠ€λ‘œ μ‚¬μš©ν•˜λŠ” κ²½μš°μ—λŠ” 데이터가 μœ μ‹€λ˜μ§€ μ•ŠλŠ”λ‹€. ν•˜μ§€λ§Œ λ„€νŠΈμ›Œν¬λ₯Ό ν†΅ν•΄μ„œ 데이터λ₯Ό μˆ˜μ‹ ν•˜λŠ” 경우(예, kafka, flume)μ—λŠ” 데이터가 μœ μ‹€λ  수 μžˆλ‹€.

Spark Streaming μ—μ„œ Receiverλ₯Ό μ‚¬μš©ν•  경우, ReceiverλŠ” 데이터 μœ μ‹€μ„ 막기 μœ„ν•΄μ„œ λ„€νŠΈμ›Œν¬λ₯Ό ν†΅ν•΄μ„œ 받은 데이터λ₯Ό λ‹€λ₯Έ λ…Έλ“œμ— λ³΅μ œν•˜κΈ° λ•Œλ¬Έμ—(λ””ν΄νŠΈ 볡제 인자: 2), μ›Œμ»€ λ…Έλ“œκ°€ μ‹€νŒ¨ν•  λ•Œ λ³΅μ‚¬λ³ΈμœΌλ‘œ μœ μ‹€λœ 데이터λ₯Ό 볡ꡬ할 수 μžˆλ‹€.

κ·ΈλŸ¬λ‚˜ Receiverκ°€ μžˆλŠ” μ›Œμ»€ λ…Έλ“œμ—μ„œ μž₯μ• κ°€ λ°œμƒν•  κ²½μš°μ—λŠ” Receiverμ—μ„œ λ‹€λ₯Έ μ›Œμ»€λ…Έλ“œλ‘œ λ³΅μ œλ˜μ§€ μ•Šκ³  버퍼에 남아 μžˆλŠ” λ°μ΄ν„°λŠ” μœ μ‹€λ  수 μžˆλ‹€. μ΄λŸ¬ν•œ 데이터 μœ μ‹€ 문제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄μ„œ Spark 1.2 λ²„μ „μ—μ„œλŠ” Write Ahead Logs(WAL) 을 λ„μž…ν–ˆλ‹€.

Write Ahead Logs (WAL) μ‚¬μš©

WAL 을 μ‚¬μš©ν•˜λ©΄ λ“œλΌμ΄λ²„, λ§ˆμŠ€ν„° ν˜Ήμ€ μ›Œμ»€λ…Έλ“œμ—μ„œ μž₯μ• κ°€ λ°œμƒν•˜λ”λΌλ„ μž…λ ₯ 데이터가 μœ μ‹€λ˜λŠ” 것을 방지할 수 μžˆλ‹€. 슀파크 μ„€μ • νŒŒμΌμ—μ„œ WAL μ‚¬μš© μ„€μ •ν•˜κ³  StreamingContext λ₯Ό μ²΄ν¬ν¬μΈνŠΈν•˜λ©΄ μž₯μ• κ°€ λ°œμƒν•  λ•Œ 체크포인트λ₯Ό λ³΅κ΅¬ν•˜μ—¬ 데이터도 볡ꡬ할 수 μžˆλ‹€.

spark.streaming.receiver.writeAheadLog.enable true

WAL을 μ‚¬μš©ν•˜λ©΄ 데이터가 μœ μ‹€λ˜λŠ” λ¬Έμ œλŠ” ν•΄κ²°ν•  수 μžˆμ§€λ§Œ, 데이터가 쀑볡될 수 μžˆλ‹€. 예λ₯Ό λ“€μ–΄, 데이터λ₯Ό WAL 에 μ €μž₯ν•˜κ³  Kafka Offset 을 Zookeeper 에 μ—…λ°μ΄νŠΈν•˜μ§€ λͺ»ν•œ μƒνƒœμ—μ„œ μž₯μ• κ°€ λ°œμƒν•˜λ©΄, Spark Streaming 은 Kafka μ—μ„œ 데이터λ₯Ό λ‹€μ‹œ 가져와 쀑볡 μ²˜λ¦¬ν•˜λŠ” λ¬Έμ œκ°€ λ°œμƒν•œλ‹€. (at-least-one)

데이터 쀑볡 처리 문제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄ Spark 1.3 λ²„μ „μ—μ„œλŠ” Direct Kafka API λ„μž…ν•˜μ˜€λ‹€

Direct Kafka API

데이터 μˆ˜μ‹  μ™„λ£Œ μ—¬λΆ€λ₯Ό 2개의 μ‹œμŠ€ν…œμ—μ„œ κ΄€λ¦¬ν•˜λ©΄, μ›μžμ„±μ„ 가지고 μ—…λ°μ΄νŠΈν•  수 μ—†κΈ° λ•Œλ¬Έμ— λΆˆμΌμΉ˜μ„±μ΄ λ°œμƒν•œλ‹€. λΆˆμΌμΉ˜μ„±μ„ ν•΄κ²°ν•˜κΈ° μœ„ν•΄ 1개의 μ‹œμŠ€ν…œμ—μ„œ 데이터 μˆ˜μ‹ μ„ 관리할 ν•„μš”κ°€ μžˆλ‹€. Spark Streaming 은 Kafka의 Simple Consumer API λ₯Ό μ‚¬μš©ν•˜μ—¬ offset 을 Spark Streaming μ—μ„œ 직접 κ΄€λ¦¬ν•˜λŠ” ꡬ쑰둜 λ³€κ²½ν•˜μ—¬ 이λ₯Ό ν•΄κ²°ν•˜μ˜€λ‹€.

이 방식은 Receiver 와 WAL 을 μ‚¬μš©ν•˜μ—¬ 데이터 μœ μ‹€μ„ λ°©μ§€ν•˜λŠ” μ ‘κ·Ό 방식과 λ‹€λ₯΄λ‹€. Receiverλ₯Ό μ‚¬μš©ν•˜μ—¬ 데이터λ₯Ό μˆ˜μ‹ ν•˜κ³  WAL 에 μ €μž₯ν•˜λŠ” λŒ€μ‹ μ— 맀 λ°°μΉ˜κ°„κ²©(Batch Interval)λ§ˆλ‹€ Consume ν•œ λ°μ΄ν„°μ˜ offset range을 μ„€μ •ν•˜κ³  λ‚˜μ€‘μ— Batch Jobs을 μ‹€ν–‰ν•  λ•Œ, offset range 섀정에 따라 데이터λ₯Ό Kafka Brokerμ—μ„œ μ½μ–΄μ˜€κ³  offset을 checkpointν•˜κ³  볡ꡬ할 λ•Œ μ‚¬μš©ν•œλ‹€.

Spark Streaming 은 Direct Kafka API λ₯Ό μ‚¬μš©ν•¨μœΌλ‘œμ¨ 데이터 μœ μ‹€μ„ λ°©μ§€ν•˜κ³  쀑볡 없이 μ •ν™•ν•˜κ²Œ ν•œλ²ˆ(Exactly-one)만 μ²˜λ¦¬ν•  수 μžˆλ‹€.

μ‚¬μš©λ²•μ€ λ‹€μŒκ³Ό κ°™λ‹€.

체크포인트

val CHECKPOINT_DIR = getCheckpointDirectory(SERVICE_NAME)
val ssc = if (isCheckpointing) {
  StreamingContext.getOrCreate(CHECKPOINT_DIR,
  () => {
    analyzer(sparkConf, SERVICE_NAME, CHECKPOINT_DIR)
  })
} else {
  analyzer(sparkConf, SERVICE_NAME, CHECKPOINT_DIR)
}
…
val ssc = new StreamingContext(sparkConf, SLIDE_INTERVAL)
if (isCheckpointing) {
  ssc.checkpoint(checkpointDirectory)
}

Direct Kafka 연동

val KAFKA_TOPIC = Configuration.getKafkaTopic()
val DIRECT_KAFKA_PARAMS = Map("metadata.broker.list" -> Configuration.getKafkaBrokerList)
val DIRECT_TOPICS = Set(KAFKA_TOPIC)
val kafkaStreams = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, DIRECT_KAFKA_PARAMS, DIRECT_TOPICS)
kafkaStreams.map { case (k, v) => v }

Direct Kafka APIλ₯Ό μ‚¬μš©ν•˜λ©΄ Receiver κ°€ ν•„μš”μ—†κΈ° λ•Œλ¬Έμ— Receiver (Kafka Consumer) λ‹Ή Kafka Partition 을 ꡬ성할 ν•„μš”κ°€ μ—†κ³  각 Kafka Partition λ³„λ‘œ RDD Partition 생성해 데이터λ₯Ό μ½μ–΄μ˜¬ 수 μžˆλ‹€.

Spark Streaming은 Fault-tolerantλ₯Ό 보μž₯ν•˜κΈ° μœ„ν•œ Kafka 연동 방식은 Checkpointing κ³Ό ν•¨κ»˜ μ‚¬μš©ν•΄μ•Ό ν•œλ‹€. λ§Œμ•½ Spark μ–΄ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μž¬μ»΄νŒŒμΌν•˜μ—¬ μž¬λ°°ν¬ν•΄μ•Ό ν•œλ‹€λ©΄ 이전 λ°”μ΄λ„ˆλ¦¬λ‘œ checkpointν•œ Context λ₯Ό λ‹€μ‹œ 생성할 수 μ—†κΈ° λ•Œλ¬Έμ— checkpoint Directoryλ₯Ό μ‚­μ œν•˜κ³  Spark μ–΄ν”Œλ¦¬μΌ€μ΄μ…˜μ„ μ‹€ν–‰ν•΄μ•Ό ν•œλ‹€.

μš©μ–΄

ACID(μ›μžμ„±, 일관성, 고립성, 지속성)

ACID μ›μžμ„± (Atomicity) νŠΈλžœμž­μ…˜μ„ μ„±κ³΅μ μœΌλ‘œ μ™„λ£Œν•˜λ©΄, 항상 일관성 μžˆλŠ” λ°μ΄ν„°λ² μ΄μŠ€ μœ μ§€ν•΄μ•Ό ν•œλ‹€.

ACID 지속성 (Durability) νŒŒμΌμ‹œμŠ€ν…œ ν˜Ήμ€ λ°μ΄ν„°λ² μ΄μŠ€μ— μ„±κ³΅μ μœΌλ‘œ μ™„λ£Œν•œ νŠΈλžœμž­μ…˜μ€ μ˜μ›νžˆ λ°˜μ˜λ˜μ–΄μ•Ό ν•œλ‹€.

참고자료

Improvements to Kafka integration of Spark Streaming, https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html Improved Fault-tolerance and Zero Data Loss in Spark Streaming, https://databricks.com/blog/2015/01/15/improved-driver-fault-tolerance-and-zero-data-loss-in-spark-streaming.html Integrating Kafka and Spark Streaming: Code Examples and State of the Game, http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/ Exactly-once Spark Streaming from Apache Kafka, http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

Last updated

Was this helpful?