[Data Engineering] Spark - Relational Processing
📌 목차
- Relatoinal Processing
🍀 Relatoinal Processing
Spark에서 join, left outer join, right outer join, full outer join이 어떻게 동작하고 코드로 어떻게 작성하는지 본다. 테이블의 데이터 사이즈가 굉장히 크고 분석하고자 하는 응용에 적합하다. 결제, 웹 브라우징에는 적합하지 않다. 큰 데이터를 한 번에 읽어서 원하는 정보를 가공하는 것에 적합한 것이다.
하둡으로 relational processing 하려면 map, partition, sort, .. , reduce를 하고 이 경우는 너무 복잡하고 번거롭다.
🔅 Join in Spark
RDD에 기본적으로 join 연산을 제공한다. join(inner join), left outer join, right outer join, full outer join 연산 또한 제공한다. RDD 객체가 있으면 join 호출 시 파라미터로 넣는 other 객체를 넣을 수 있는데, 호출 객체와 피호출 객체를 join 해서 새로운 RDD를 만들어주는 게 join 메서드이다. 이 RDD는 JavaPariRDD여야 한다. 그냥 RDD는 큰 array 같은 것이고, 파티션으로 나뉘어져 있다. 요소가 array 형태이기만 하면 된다. JavaPariRDD의 요소는 key-value 형태인 튜플이어야 한다. join 같은 경우는 key를 중심으로 join 하기 때문에 파라미터로 들어오는 other 객체와 this 객체는 모두 JavaPariRDD여야 하는 것이다.
🔅 Relational Data
앞서 사용한 데이터를 확장했고, 기존에 없던 code J가 물품 테이블에 생겼다. 코드 테이블에는 존재하지 않고 이는 outer join을 위한 것이다. 코드 테이블에도 C, T가 추가되었는데 이는 물품 테이블에 없다.
🔅 Join
정확히 일치하는 값들이 있는 데이터 레코드만 출력된다.
J는 코드 테이블에 존재하지 않는데, 이 경우는 빈 값을 null로 출력한다. 확장의 개념이다.
위와 반대로, 코드 테이블을 확장하여 빈 값을 null로 출력한다.
위 두개를 합쳐 모두 표시한다.
🔅 Join 흐름
- 최초 relation_a 파일 읽어오기 -> RDD (텍스트 파일 한 줄씩을 요소로 하는 array)
- RDD를 join key와 relation_a 객체(나머지 속성들)로 이루어진 Tuple로 바꾸기
- relation_b 파일 읽어오기
- RDD를 join key와 relation_b 객(나머지 속성들)로 이루어진 Tuple로 바꾸기
- relation_a와 relation_b의 join 연산하기
- 결과를 텍스트 파일로 저장하기
파일을 RDD로 바꾸게 되면, 한줄을 요소로 하는 거대한 array가 생성된다. 그리고 mapToPair로 join key, 나머지 속성들을 value로 하는 튜플 array를 만든다. PairRDD.
a, b 비슷한 작업을 해준다.
a를 나타내는 RDD에서 join을 호출하는데 이때 파라미터로 b를 나타내는 RDD를 넣으면 join이 완성된다.
🔅 Join 메서드
public <W> JavaPairRDD<K, Tuple2<V,W>> join(JavaPairRDD<K, W> other)
JavaPairRDD에서 제공하는 연산으로, 파라미터로 다른 JavaPairRDD를 넣으면 join 연산 수행한다. 지정된 K로 join 연산한다.
public <W> JavaPairRDD<K, Tuple2<V, Optional<W>>> leftOuterJoin(JavaPairRDD<K, W> other)
파라미터로 들어온 JavaPairRDD 중에서, 메서드를 호출한 객체의 key에 맞는 JavaPairRDD가 존재하지 않으면 Optional.empty 객체를 넣어서 반환한다.
public <W> JavaPairRDD<K, Tuple2<Optional<V>, W>> rightOuterJoin(JavaPairRDD<K, W> other)
메서드를 호출한 객체 중에서, 파라미터로 들어온 JavaPairRDD의 key에 맞는 JavaPairRDD가 존재하지 않으면 Optional.empty 객체를 넣어서 반환한다.
public <W> JavaPairRDD<K, Tuple2<Optional<V>, Optional<W>>> fullOuterJoin(JavaPairRDD<K, W> other)
위 두 가지를 합친 내용이다.
🔅 Pseudo code
SparkSession을 만들고, 파일 읽어서 RDD로 변환한다. 두 테이블에 대해 각각 mapToPair를 호출해 JavaPairRDD를 만든다. JavaPairRDD에 대해 각각 join, left outer join, right outer join, full outer join을 각각 호출할 수 있고 JavaPairRDD를 반환 받을 수 있다. 결과를 파일에 저장하면 원하는 결과를 볼 수 있다.