CS/Data Engineering

[Data Engineering] Spark

하노정 2023. 6. 8. 16:26

📌 목차

  • Introduction to Spark
  • Spark WordCount

 

🍀 Introduction to Spark

  • large scale data 처리를 위한 분석 엔진
  • Java, Scala, Python, R 등으로 Spark를 이용할 수 있다.
  • 하둡을 백엔드 스토리지 시스템으로 사용할 수 있다.

 

🔅 Motivation

MapReduce가 대형 데이터의 분석을 쉽게 만들어 준 것은 사실인데, 뭔가 좀 부족한 면이 있다. PageRank 처리 시 iteration을 돌고, matrix의 곱을 할 때도 두 단계의 MapReduce를 거쳐 결과를 얻어냈다. PageRank, Matix Mul과 같이 stage가 여러 개인 경우는 MapReduce로 작성하는 것이 쉽지 않다. 데이터 분석을 하고자 하는 쿼리들이 점점 더 복잡해지고 ad-hoc해진다. 그때그때 달라진다는 얘기이다. 이를 해결하기 위해, 특수 목적의 분산 프레임워크들이 나왔는데 Spark는 이와 비슷하게 좀 더 근본적인 접근을 하고자 하는 프레임워크이다.

🔅 Data Sharing 도구

기존 MapReduce의 경우 map 단계에서 처리하고자 할 때 hdfs 데이터를 모두 map에 로딩하고, reduce의 결과를 다시 hdfs에 write하고 두 번째 단계에서도 마찬가지로 반복적 수행한다. 매 단계마다 디스크 io가 발생한다. 이러한 반복이 커지면 디스크 io가 overhead될 수 있다. Spark는 이 디스크 io를 최소화하는 것이 목표이다. iteration 사이에 hdfs에 쓰지 않고 RAM에 저장해두고 계속 쓴다. MapReduce는 operator가 map, reduce만 있는데 이 두가지를 갖고 모든 프로그램을 짜야 했는데 Spark는 operator 종류가 많다. join, sum, count를 기본적으로 제공하고 있고 연산을 풍부하게 하는 것들이 굉장히 많다.

Matrix Mul 계산 시 두 단계의 MapReduce를 했었다. 두번째 단계의 map은 사실 필요가 없었다. data를 bypass하는 역할만 했었고 reduce에서 더하기를 해서 최종 output을 계산했다.  map, reduce, reduce 연결해도 계산 가능한데도 하둡 같은 경우는 반드시 map, reduce * 2 순서로 작성해야 해서 bypass하는 map을 넣었다. Spark는 이런 조합도 모두 가능하도록 연산의 조합을 구성할 수 있다. 

쿼리 수행 전에 한 번에 데이터를 RAM에 올려두고 쿼리 처리하도록 구성되어 있다. 이게 속도를 올리기 위한 Spark의 핵심이다.

🔅 Challege-fault-tolerant & efficient RAM storage

Spark가 hdfs가 아니라 RAM을 쓰게 되면서 만나게 된 챌린지가 있다. 그 중 첫번째가 fault-tolerant(결함 내성)이다. 결함 발생 시 어떻게 처리할 건지? (hdfs는 copy 3개를 떠서 한 머신에 장애가 발생해도 다른 곳에서 실행하면 됐었다. 그런데 RAM은 껐다 키면 사라진다, RAM을 copy 3개 뜨는 것은 비용이 너무 비싸다) RAM은 update가 가능하다. 따라서 결함 내성하기 어렵다. 왜냐하면 특정 시점 t1에서 메모리에 대해서 스냅샷을 뜨고 시간이 흐르면 특정 부분만 변경이 될 수 있다. t2에서 fault가 나면, 복구를 해야 한다. 해당 부분이 반영되도록 연산을 재실행 해야 하는데 그럼 어떤 순서로 연산했는지 기록을 모두 남겨야 한다.스냅샷 뜨는 것을 check pointing라고 하고, 연산 기록하는 것을 logging이라고 한다. 이 작업은 굉장히 overhead가 크다. check pointing은 메모리 내용을 그대로 dump 뜨는 것이기에 디스크 io가 생겨 전체 시스템이 엄청 느려진다. 

Spark에서는 이 점을 해결하기 위해 hdfs와 비슷한 컨셉을 도입한다. hdfs는 기존 파일 시스템과 다르게 modify가 불가능하다. modify가 delete->write로 되어 있다. 기존 파일 시스템을 리눅스, 윈도우에서 수정하면 수정한 부분만 반영하게 되어 있는데 hdfs는 특정 부분만 변경하더라도 전체 파일을 지웠다가 다시 쓰도록 되어 있다. modify가 불가능하다는 것은 한 번 데이터를 쓰고 여러 번 읽는 workload에 최적화되어 있는 파일 시스템이다. 이 workload는 빅데이터 처리에서 전형적인 형태이다. Spark에서도 RAM을 read-only 형태로 유지하게 되고, 이 dataSet을 RDD라고 한다. Spark는 iteration 돌 때마다 그 결과를 RAM에 적게 되는데 그 영역을 모두 read-only로 처리한다. 

🔅 RDD (Resilient Distributed Datasets)

  • Immutable(변경 불가능)
  • 레코드 조합을 분산해서 관리
  • 일부 영역을 그 자리에서 바꾸는 것이 아니라 RAM 데이터를 다른 형태의 RDD로 변형하는 것만 가능.
    • storage -> RDD, RDD -> RDD 의 변환만이 가능
    • update 불가능
  • 장점은?
    • 한 번 생성된 후로 고쳐진 적이 없다. immutable = read-only
      • 어떻게 만들어졌는 지만 기록해 두면 또 만들 수 있다!
    • 부모로부터 어떻게 만들어진 지 계보(lineage)만 기록해도 fault-tolerant
      • storage에서 data 읽어서 RDD 로 만들고, 또 다른 RDD를 만들었다고 하면, 연산의 순서가 있다. 연산의 순서만 기록하게 되면 다시 storage에서 읽어서 RDD를 만들고 다시 복구해 계산을 이어갈 수 있다. 

🔅 Implementation

transformation을 작성하는 것이 Spark로 프로그래밍하는 것이다. 

Map부터 CoGroup까지 모두 transformation에 해당된다. 모든 것을 다 계산하게 되면, 그 후 디스크에 쓰는데 그 작업을 action이라고 한다. 이런 형태의 연산이 수행되는 형태를 lineage라고 하고 그래프의 형태로 표현 가능하다. 

  • 코드를 작성해서 실제 수행한다 = lineage 속 operator의 순서와 순서 스케줄링
  • 실제 계산 = action 시 

🔅 Transformations & Actions

  • transformation은 데이터를 RDD -> RDD 로 만드는 operator의 총합을 말한다.
    • 하둡 MR map, reduce operator만 있지만 Spark는 다양한 operator가 있다.
  • action은 실제로 마지막에 계산이 실행되는 부분을 말한다.
    • trans -> trans -> .. -> trans -> action

🔅 lazy-execution

성능을 위해 lazy-execution 기법을 사용한다.

  • 코딩을 transformation을 가지고 하는데, 실제 이때는 아무 일도 생기지 않고 lineage만 생성되는 중
    • Spark는 리눅스처럼 shell을 준다. 그 안에서 뭔가를 실행할 수 있다는 건데 명령을 할 때마다 실행되는 게 아니라 save를 할 때 비로소 모든 명령이 실행되는 것이다. 이 shell은 scala, python만 제공한다.
    • 실습은  java로 계산해주는 클래스를 만들어서 하둡처럼 Spark에 작업을 submit하면 결과를 계산하도록 한다.
  • action에 해당하는 명령이 불리면 그제서야 쌓였던 것이 실행
    • shell에서 transformation을 차례로 기술하게 되고, 마지막 action을 호출하면 계산이 된다고 했는데 action 호출 될 때는 lineage를 다 그려놓은 상탱서 실행된다.
    • 실행 plan이 만들어진 상태이고, 자원이 배치 된, 배치 될 상황을 미리 고려해서 최적의 스케줄을 만들어 실행시킬 수 있다.

🔅 Narrow Dependencies & Wide Dependencies

  • Narrow Dependencies : 한 노드에서 다 처리할 수 있는 일은 모아서
    • map은 하나의 파일을 읽어서 그대로 output을 낸다. 한 노드에 모아서 처리할수록 성능 좋다.
    • 해당 작업이 한 노드에서 다 돌 수 있음
    • 네트워크 타지 않음
    • 아주 빠름. 메모리의 속도로 동작
    • 파티션이 고장 나도 해당 노드에서 다 복원 가능 
  • Wide Dependencies : 여러 파티션에서 다 읽어와야 하는 것은 느림
    • reduce는 같은 키를 갖는 것을 모은다. 일반적으로 실행 시간이 느리다.
    • shuffle이 일어나야 하는 일들
    • 네트워크를 타야 함
    • 상대적으로 느림. 네트워크 속도로 동작
    • 부서지면 큰 overhead (lineage로 다시 다 실행)
    • 추가적으로 checkpointing 해주는 게 좋을 수도 있음

🔅 Job Scheduling

  • lineage는 그래프이기 때문에 그래프에 따라 job을 수행
  • 파티션이 수행 될 노드는 data-locality를 고려해 결정
    • 특히 hdfs를 사용하면, 데이터가 있는 곳에 코드를 보내 수행하도록 결정

🔅 Spark runtime

  • worker들은 기존 MR과 비슷하게 동작함
  • 데이터는 파티션으로 분리돼 머신에 배분되어 있음.
    • hdfs 처럼 배분될 수 있고 일반적으로 Spark가 hdfs 상에서 동작함
  • 드라이버에서 내려온 operator(transform)들이 각 파티션마다 수행됨
  • 필요에 따라 Wide Dependencies의 경우에는 shuffle을 수행해서 다른 머신으로 그 파티션의 결과가 전달되기도 함

 

  • 예로, dataSet이 총 100 GB인데 25대에 100%다 올라간 경우와 그렇지 않은 경우의 실행속도를 봄
    • 100% -> 11초 (가장 빠름)
    • dataSet이 메모리에 많이 올라갈수록 속도가 빠름
  • 수행 중 메모리가 모자라면?
    • LRU를 쓰고, 가장 쓰지 않을 확률이 높은 파티션을 그대로 날림
    • 캐시와 비슷

 

Spark가 얼마나 fault에 영향을 받는지에 대해 설명한다. workload는 k-means이고, 이는 iteration을 여러 번 돌게 되어 있는 workload이다. 그 중 6번째 iteration을 돌 때 강제로 노드를 fault낸 경우이다. 매 iteration마다 수행시간이 거의 비슷하다. fail이 난 경우와 나지 않은 경우의 수행시간이 크게 다르지 않다. 처음엔 읽어오는 시간이라 좀 오래 걸린다. 6번째 fault가 나면 그때 시간이 좀 더 걸리지만 나머지 연산에는 영향을 미치지 않는다. 전체 iteration을 돌 때 20-30초 정도 영향만 미친다. 

  • 수행 중 fault가 나면 recovery 영향은?
    • lineage는 용량이 작기 때문에 잘 로깅해 둠 -> 부담이 적음
    • 특정 파티션에 문제가 생기면 다른 녿에서 당겨와서 실행

🔅 Chekpointing

  • fault가 나도 lineage로 복원해 낼 수 있지만, Wide Dependency의 경우 느림 -> 모든 파티션을 다 읽어야 하므로 overhead가 굉장히 클 수 있음
  • 재계산을 통한 구축 비용이 디스크에 checkpointing 해 놓았다가 읽는 것보다 더 오래 걸릴 수도 있음
  • 그럴 땐 checkpoint를 해 둠 -> RDD는 immutable이 checkpoint의 overhead를 줄여줌
  • 이때 다른 시스템들처럼 멈출 필요 없이 병렬로 백그라운드에서 async로 돌림 -> read-only의 활약 (어차피 read만 하므로 디스크에 쓰는 작업을 비동기 수행할 수 있음)
  • narrow dependency는 중간 fault가 나도 금방 해당 파티션을 복원할 수 있음
  • wide dependency(결과물이 반복된 shuffle=join의 결과물)는 checkpointing을 해 두는 것이 좋음

🔅 Evaluation

하둡과 비교 시 Spark가 압도적으로 빠르다. 여러 iteration을 도는 workload이고, 첫 iteration과 끝 iteration을 봤다. 처음은 읽어오기 때문에 오래 걸리고 마지막은 메모리에서 transformation이 이루어지기 때문에 Spark 성능이 굉장히 빠르다. 하둡은 처음과 끝이 비슷하게 오래 걸리는데 hdfs에 data를 쓰고 읽는 과정 때문에 그렇다. 하둡 BM은 하드를 쓰지 않고 메모리만 쓴 것인데, 메모리를 쓰더라도 성능 개선 효과가 그렇게 크지 않다. 

머신의 수가 늘어도 Spark의 성능이 더 좋다.

🔅 Spark vs in-memory hadoop

RAM을 쓰므로 당연히 하둡보다 빠르다. 하둡 BM 같이 in-memory 하둡은 왜 Spark보다 느리냐면, 하둡의 소프트웨어 스택이 너무 복잡해 기본 요금 자체가 비싸다. 성능이 안 좋다는 것이다. hdfs라는 파일 구조체를 다루는 데 비싸다. 

🔅 Spark의 표현 영역

RDD라는 제한된 형태를 쓰다 보니 사용자들의 일반적인 요구사항을 다 커버할 수 있는지 의문을 가질 수 있는데 Spark는 거의 다 된다. MR 등 많은 계산 프레임워크들이 있는데 기존 방법들에서 제공하는 거의 모든 요구사항을 Spark가 처리하고 있다. 

🔅 Conclusion

  • RAM을 ROM처럼 써 보기로 했다. 그게 RDD이고 immutable 영역을 만들어 계산을 통해 또 다른 RDD로 만드는 과정을 transformation이라고 한다. 여러번 transformation을 수행하다가 action을 수행할 때 전체 transformations를 계산한다는 것을 알았고 그게 lazy-execution이다.
  • RDD는 immutable이라서 한 번 쓰고 고치지 않는다.
  • fault-tolerant & 효율적인 RAM 저장 공간을 만들었다.

🔅 RDD in Spark

  • RDD는 Spark의 기본적인 자료구조
  • Spark의 데이터처리는 RDD를 이용
  • RDD를 가공해 새로운 RDD를 만들고(transformation), 이러한 처리를 반복해 원하는 결과를 얻음

🔅 RDD 구조와 특징

  • 대량의 데이터를 요소로 가지는 분산 컬렉션
  • 여러 머신으로 구성된 클러스터 환경에서의 분산 처리를 전제로 설계됨
  • 내부는 파티션이라는 단위로 나뉘어 지며, 이 파티션이 분산 처리의 단위
  • RDD를 파티션 단위로 여러 머신에서 처리하므로 한 대의 머신으로 처리할 수 있는 것보다 더 큰 데이터를 다룰 수 있음
  • 사용자는 transformation을 통해 데이터를 가공하고, RDD의 내용에 따라 action이라는 처리를 적용해 원하는 결과를 얻음

🔅 RDD 다루기, transformation

  • RDD를 가공하고 그 결과 새로운 RDD를 얻는 처리
  • transformation 후의 RDD가 가지는 요소는 transformation 전의 RDD에 들어있던 요소를 가공하거나 필터링해 생성됨
  • transformation은 다시 두 종류로 구분됨
  • 변환 전의 RDD가 갖는 요소를 같은 RDD의 다른 요소들과 관계없이 처리할 수 있는 transformation
    • filter, map, flatmap, zip 등
  • 변환 전의 RDD가 가지는 요소를 같은 RDD의 다른 요소와 함께 처리하는 변환
    • 변환 대상은 key와 value의 쌍을 요소로 가지는 RDD (pairRDD)
    • 같은 key를 가지는 요소를 한데 모아 처리
    • reduceByKey, join 등

🔅 RDD 다루기, action

  • RDD 내용을 바탕으로 데이터를 가공하지 않고 원하는 결과를 얻는 조작
    • saveAsTextFile, count 등

 

🍀 Spark WordCount

  • 텍스트 파일 읽어오기 - transformation 
  • 텍스트 파일을 '' 로 자르기 - transformation 
  • 각 요소가 key가 되고 value가 1이 되는 Tuple들을 바꾸기 - transformation 
  • 같은 key를 가지는 요소를 aggregation 하기 - transformation 
  • 결과를 텍스트 파일로 저장하기 - action

 

파일 전체를 RDD로 만든다. flatMap를 이용해 문장에 있는 단어를을 전부 리스팅한 RDD를 만든다. 단어를 공백 혹은 new line을 기준으로 split해서 단어를 하나의 요소로 만든다. 이게 flatmap operator를 통해 이루어진다. 

map() vs flatmap()

  • map은 공백을 기준으로 단어를 다 리스트로 만들어내는 것인데 줄의 형태를 그대로 유지하게 된다. 줄마다 단어로 찢어서 리스트로 만든다. 줄마다 경계가 있다.
  • flatmap은 한 줄씩이 아니라 모든 요소에 대해 map 함수를 적용하여 리스트를 만든다. 줄마다 구분이 없고 모든 단어가 리스트로 만들어진다. iterator에 의해 쪼개진 원소들로 다시 구성된 RDD를 반환한다.

단어 하나에 대해 value 1을 만들어서 mapToPair를 진행한다. 그리고 MR의 reduce를 수행할 때처럼 reduceByKey를 호출한다. key가 같은 요소끼리 묶인다. 더하는 함수로 같은 key끼리 모여진 value들을 더한다. 

reduceByKey()

  • 같은 key끼리 모아서 처리를 해 RDD를 생성한다.

 

🍀 Java Real Code (Spark WordCount)

Java 코드로 프로그래밍할 때 SparkSession을 하나 열고 transformation, action 수행 후 stop으로 종료하는 흐름이다. 

import scala.Tuple2;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

public final class JavaWordCount {

    public static void main(String[] args) throws Exception {

        if (args.length < 2) {
            System.err.println("Usage: JavaWordCount <in-file> <out-file>");
            System.exit(1);
        }

		// 1. 세션 읽기
        SparkSession spark = SparkSession
            .builder()
            .appName("JavaWordCount")
            .getOrCreate();

		// 2. 텍스트 파일 읽어오기
        JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

		// 3. 텍스트 파일 ''로 자르기
        FlatMapFunction<String, String> fmf = new FlatMapFunction<String, String>() {
            public Iterator<String> call(String s) {
                return Arrays.asList(s.split(" ")).iterator(); // iterator() 반환
            }
        };
        JavaRDD<String> words = lines.flatMap(fmf);

		// 4. 각 요소가 key가 되고 value가 1이 되는 Tuple들로 바꾸기
        PairFunction<String, String, Integer> pf = new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) { // Tuple 반환
                return new Tuple2(s, 1);
            }
        };
        JavaPairRDD<String, Integer> ones = words.mapToPair(pf);

		// 5. 같은 key를 가지는 요소를 aggregation 하기
        Function2<Integer, Integer, Integer> f2 = new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) { // 교환, 결합 법칙 성립
                return x + y;
            }
        };
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(f2);

		// 6. RDD를 파일로 저장
        counts.saveAsTextFile(args[1]);
        spark.stop();
    }
}