CS/Data Engineering

[Data Engineering] Relational Data Processing(Join)

하노정 2023. 6. 8. 23:44

📌 목차

  • ReduceSideJoin
  • Optimizing ReduceSideJoin
  • MapSideJoin

 

🍀 ReduceSideJoin

DB에서 사용하는 join을 MR에서 어떻게 구현하는지 살펴본다. Relational Operator 중에서 join이다. 같은 속성을 갖는 테이블 두 개이상이 있을 때 합치는 연산이다. 

물건 테이블과 코드 테이블을 합칠 건데 코드 테이블을 기준으로 합쳐서 id, price, des를 출력하는 것이다. 

 

데이터 파일은 테이블마다 하나씩 생긴다. relation_a, relation_b 파일로 저장한다. map은 파일 별로 mapper가 생성되기 때문에 각각 하나씩 mapper가 뜬다. mapper 속 map 함수는 어떤 테이블을 대상으로 수행하는지는 파일 이름으로 구별한다. 

두 개의 테이블에서 나오는 데이터를 합쳐야 하는데, 합쳐야 하는 키를 join key라고 하는데 이를 기준으로 합쳐서 reduce에 보여준다. relation_a를 처리하든 relation_b를 처리하든 join key를 map output key로 사용한다. reduce에서는 join key를 기준으로 a,b 데이터를 합친다. 

join key를 key로 하고, 나머지 데이터를 value로 모은다. 같은 join key를 갖는 데이터가 a에서도 올 수 있고 b에서도 올 수 있다. 따라서 어디서 왔는지 표시하기 위해 value에 a면 a, b면 b 표시를 추가해준다. 

shuffle 결과 value는 순서 상관없이 모아주기만 하는 것이다. B 파일에서 온 값이 어느 위치에 있는지는 랜덤이다. 이 값을 먼저 끄집어내야 output으로 만들 수 있다. reduce가 더 복잡하다. 

 

map함수는 join key와 나머지 레코드에 file a, b 어떤 것을 처리하고 있는지에 따라 문자를 붙인다. 

reduce 함수는 B 파일에서 온 값이 아직 des에 안 들어갔으면, 처리하지 않고 버퍼에 넣는다. B 파일에서 온 값이 나오면 des에 값을 넣으면 된다. des가 빈 문자열이 아니면 join된 레코드를 만들어서 reduce output으로 만들 수 있다. 아직 처리 안 한 첫번째 데이터는 reduce가 끝날 때 버퍼를 다시 확인해서 처리한다. 이때 des와 합쳐서 join된 레코드를 만든다. 

 

🍀 Optimizing ReduceSideJoin

join을 optimize하는 방법이다. map에서는 join key를 갖고 map의 output key로 사용하고, 나머지 속성들을 map의 output value로 만들었다. join 했을 때 어느 relation에서 왔는지 확인하기 위해 식별자 같은 것을 앞에 붙였다. 파일 이름을 보고 어떤 relation을 처리하는지 구별했다.

앞서 살펴본 방법에는 문제가 하나 있다.

  • reduce에서 relation_b의 데이터가 value list 가장 끝에 있게 되면, relation_a의 데이터는 모두 버퍼에 들어가고 다시 버퍼를 돌게 된다. 이 경우는 기존 value list를 두 번 iterate한 것과 같게 된다.
  • relation_b의 데이터가 value list에서 가장 앞에 있다면 버퍼가 필요없고 계산이 가장 효율적일 것이다.

 

relation_b의 데이터를 value list에서 가장 앞에 보내게 하는 것을 학습하게 한다. 이것을 가능하게 해주는 솔루션은 secondary sorting을 composite key와 함께 사용하는 것이다. 

  • join key가 같으면 같은 reducer로 가는 것 -> Partitioner 구현
  • join key가 같으면 같은 value list에 가는 것 -> GroupComparator 구현
  • join key와 table 식별자(역순)으로 정렬 -> WritableComparator 구현

Composite Key는 복수 개의 필드를 MR의 key로 사용하는 방식이다. 기존 했던 것들은 int, text, long 등과 같은 하나의 값들을 갖고 map, reduce의 key로 사용했는데 Composite Key는 두 개의 값 (예를 들어 <1, "hello">) 을 key로 사용하는 것을 말한다. key가 두 개가 되는 것이다. joinKey, table 구별자 (예를 들어 <"K", "B">) 를 사용한다.

이제는 table 식별자까지 map의 output key로 사용한다. 그런데 이렇게 사용하면 shuffle에서 원하는 대로 모이지 않는다. B,A 만 묶이는데 이것 뿐만 아니라 A,B도 같이 묶고 싶은 것이다. Adult, 1, 100, 6, 150. 근데 composite key만 사용하게 되면 3개의 entry가 같이 묶이지 않는다. 이건 여러 의미가 있는데 reduce가 여러 개 뜰 수 있는데, 같은 list에 묶인다는 것은 3개의 entry가 같은 reduce로 일단 모이고 그 안에서 같은 value list로 묶여야 한다. value list 하나를 sorting할 때 table 식별자를 기준으로 sorting을 해야 한다는 것이다. 

이걸 가능하게 해주는 것이 secondary sorting이다. composite key에 대해서 join key만 갖고 shuffle을 하고 그 후에는 value list에 들어가는 순서를 table 식별자의 역순으로 넣게 하는 것이다.

먼저 join key가 같으면 같은 reducer로 가게 해야 한다. 그걸 partitioner가 해준다. default partitioner 대신 customer partitioner를 구현해서 MR 실행 시 끼워 넣어야 한다. partitioner의 역할은 join key가 같으면 같은 reducer에 보내는 것이다. shuffle 단계에서 일어나는 것이다. 

그 후 join key가 같으면 같은 value list에 들어가도록 해야 한다. 이를 위해선 GroupComparator를 구현해야 한다. 

WritableComparator를 구현하면 table 식별자 역순으로 정렬되도록 value list를 만든다. 

 

추가 해야 할 것

  • Composite Key -> DoubleString
  • Partitioner -> FirstPartitioner
  • GroupComparator -> FirstGroupingComparator
  • WritableComparator -> CompositeKeyComparator

 

DoubleString은 MR의 key, value로 오는 것 중 key로 움직이는 것이다. DoubleString은 key가 되는 것이고, 이걸 선언하는 순간 custom key를 만드는 것이다. 이를 위해선 WritableComparable을 구현해야 한다. 기본적으로 MR의 데이터는 네트워크를 타고 움직이기에 포맷에 맞게 read, write 해주는 메서드가 필요한데 Writable를 구현해야 해당 메서드 구현이 가능하다. readFields()는 네트워크로부터 데이터를 읽어서 DoubleString 타입의 객체를 만들어낼 때, write()는 현재 객체를 네트워크에 쓸 때 어떻게 할지 기술하는 것이다.

Comparable은 순서를 정의할 때 필요하다. key이기 때문에 key는 shuffle 단계에서 정렬을 할 수 있도록 DoubleString 객체 두 개의 순서를 정할 수 있는 메서드가 필요하고 그게 compareTo()이다. 

Writable 인터페이스 구현은 MR의 value를 구현할 때 쓴다. 지금은 WritableComparable을 구현하고 join key, table 식별자를 key로 사용한다. write()에 join key, table 식별자를 쓴 순서대로 readField()에 써야 한다. 받는 쪽에서 그 순서대로 읽어갈 수 있도록 한다. compareTo()는 두 객체의 순서를 정하는 것이기 때문에 join key에 대해서 먼저 비교를 하고 같으면 table 식별자로 비교해 순서를 정한다.

 

Partitioner 구현은 getPartition()을 구현해야 한다. 3번째 파라미터가 현재 실행하고 있는 MR의 reduce의 개수이다. numPartition = num of reduce tasks. join key가 같으면 같은 reducer로 보내줘야 하기 때문에 join key의 해시 코드에 따라서 일관되게 하나의 reducer process로 보내게 한다. 나머지 연산자를 써서 join key가 같으면 항상 같은 값을 반환하게 한다. join key가 같으면 같은 reducer로 보내지고, 그 후 같은 value list로 묶이게 한다.

compare() 메서드를 구현해야 하고 객체 2개를 받는다. DoubleString이라는 composite key의 객체를 받아서 비교한 결과를 반환한다.

같은 value list에 들어가서 table 식별자의 역순으로 정렬하도록 연산자 클래스를 제공해야 하는데 그때 사용되는 연산자가 WritableComparator이다. compare()를 구현해야 하고, DoubleString 객체 받아서 순서를 비교한다. join key가 같으면 table 식별자의 역순으로 정렬한다. -1이 붙는다는 것 주의. 같게 되면 해당 값 자체를 반환한다.

 

MR에서 map의 output key가 DoubleString이 되고, reduce의 input key가 DoubleString이 된다. 나머지 부분 다 똑같고 main이 좀 달라진다. output key 타입이 바뀌고, partitioner, groupingComparator, setSortComparatorClass를 세팅해야 한다. reduce에서 버퍼 수행 부분이 수행 안 되기에 버퍼 필요 없어진다. 

 

🍀 MapSideJoin

지금까지는 reduce에서 join이 일어났던 ReduceSideJoin이었다. 이 부분은 join이 map에서 일어난다. 

핵심 아이디어는 두 relation data 중에서 하나가 매우 작다면(작아서 메모리에 올릴 정도로) 사이즈가 작은 relation data를 map에 인메모리와 같은 것으로 올려두고 join을 mapper에서 수행할 수 있다는 것이다. 

물건 테이블에 비해 코드 테이블의 크기가 일반적으로 작다고 하고, 코드 테이블을 mapper의 메모리에 미리 올려두고 물건 테이블 처리할 때 붙여서 사용한다. mapper의 코드가 달라진다. 코드 테이블을 처리하는 것이 아니라 mapper in-memory에 올려두고, relation_a와 join하는 것이다. 

  • relation_b를 map을 수행하기 전에 미리 읽고
  • mapper에서는 relation_a만 처리
  • 미리 읽혀진 relation_b의 데이터를 이용하여 map에서 join
  • shuffle, reduce가 없음

 

Distributed Cache

  • 작은 사이즈의 읽기 전용 파일을 하나 만들고, 이는 task tracker의 working directory에 복사해주는 메커니즘을 말한다. 파일을 복사해주고 인메모리에 올려주는 것을 MR에서는 분산 캐시라고 한다.
  • 사용될 파일은 먼저 hdfs에 복사되고 그 위치가 사용되어야 한다.
  • default 최대 크기는 10GB인데 실질적으로 의미있는 최대 크기는 100MB 정도이다.
  • 어느 파일을 Distributed Cache로 사용할 것인가?
    • 물건 테이블보다 코드 테이블의 크기가 작음
    • 크기가 작은 파일은 hdfs에 반드시 올라가 있어야 함
  • 프로그램의 인자로 이 Distributed Cache 파일의 hdfs 상의 위치를 넘긴다.
  • main 함수에서 이 위치를 단순히 Distributed Cache로 등록한다.
  • mapper 실행 시 setup 메서드에서 이 위치를 참조해 이 파일을 연다.
  • 올려둔 파일은 task 실행 전에 hdfs에서 로컬 파일 시스템으로 복사되기 때문에 그냥 로컬 텍스트 파일을 읽는 코드를 작성한다. 
  • mapper의 map 메서드에서는 읽혀지는 물건 종류 코드에 대해서 setup에서 만들어진 해시테이블에 존재하는지 확인해야 한다. 적합한 자료 구조가 해시테이블이다. key를 K로 하면 Kitchen이 되돌아 오게.
    • 존재하면 key를 물건 id로 하고, value는 가격 + "\t" + 물건 종류 이름으로 해서 output으로 내보낸다. 
  • 이 경우 reducer는 딱히 할 일이 없어서 사용하지 않는다.

 

map의 setup에서 캐시 파일을 읽어오면 된다. 하나만 쓰면 배열 중 0번을 보면 필요한 값을 읽을 수 있다. main에서 addCacheFile 하게 되면 저장이 되고, 파라미터 경로는 로컬 상이 아닌 hdfs 상 경로이다. map 의 setup 실행 전에 hdfs 상 파일을 로컬로 옮기게 되고, mapper는 읽으면서 해시 테이블에 저장한다. join key를 key로 하고 des를 value로 하여 해시 테이블로 만든다. 그럼 relation_a를 처리할 때는 join key를 보고 해당 데이터가 있는지 확인하고, 있으면 이어 붙이면 된다.