[Data Engineering] MapReduce Programming - WordCount, Combiner Java
📌 목차
- MapReduce
- WordCount
- WordCount JAVA CODE
- Combiner
WordCount 프로그램으로 MapReduce에서 사용하고 있는 data type, map/reduce class 상속, MapReduce를 구동시키기 위한 driver 프로그램에 대해 다룬다. 간단한 프로그램인데 MapReduce 이해하기 좋다. Combiner 라는 optimize하는 기법도 있다.
🍀 MapReduce
함수형 프로그래밍에서 시작했다. MapReduce의 핵심은 프로그래머는 map/reduce 함수를 구현해야 한다는 것이다. MapReduce를 실행할 수 있는 driver 코드를 작성하게 되면 데이터 처리를 할 수 있다.
key-value들의 변화를 갖고 row data에서 가치 있는 information을 추출하게 된다.
- map 함수는 key-value를 input으로 받고, 한 개 이상의 새로운 key-value를 output으로 낸다.
- reduce 함수는 map의 output에서 같은 key를 가지는 value들의 array를 input으로 받아서 새로운 key-value를 output으로 낸다.
하둡의 전체적인 동작방식을 나타낸다. 분산되어 있는 서버에 빅데이터가 저장되어 있다. 빅데이터를 각각의 map 함수의 input으로 주면, 그 결과를 버퍼에 담아두게 되고 같은 key를 가지는 value들을 묶은 후 reduce 전달한다.
🍀WordCount
문제 : 주어진 text에서 각 단어가 몇 번 등장하는지
예 : data : to be or not to be / results : (to, 2), (be, 2), (or, 1), (not, 1)
만약 text에서 cancer라는 단어가 다른 단어에 비해 굉장히 많이 등장하면, 그 문서는 cancer라는 주제에 굉장히 가까운 문서라고 판단할 수 있다. 각 문서에서 각 단어가 몇 번 나오는지 미리 계산하는 과정이므로 매우 중요한 프로그램이다.
처음 input으로 들어오는 key는 text 파일 시작부터 value 문장이 얼마나 떨어져 있는지 값이다. 지금은 offset이 중요한 정보가 아니고 실제 value 내용이 중요하므로 넘어간다. 어쨌든 value는 text 한 줄이다. 하둡이 text 파일 한 줄 읽고 map 함수를 호출한다.
Mapper에서는 value 문장(한 줄)에 대해서 단어별로 찢는다.(Tokenize value) to 1번, be 1번, or 1번, not 1번 .. 이런 식으로 진행된다. 단어 하나씩 찢을 때마다 각 단어가 1번 발생한다. 각각 key-value list를 map의 output으로 낸다. 각 단어별로 6개가 나온다. (emit) 하둡 엔진으로 보낸다.
Map의 output을 갖고 하둡이 해주는 일이 있다. "Suffle" : 같은 key를 갖는 key-value entry를 묶어준다. ("to", [1,1]) value들을 array로 묶어주고, 묶은 그 값도 key-value가 된다. 이 output이 reducer의 input이 되고 각 쌍마다 한 번씩 reduce 함수를 하둡이 호출된다.
Reducer의 input이 ("to", [1,1]) 이렇게 들어오면, 결과는 ("to", 2)가 된다.
하둡, 빅데이터 시스템의 기본 철학은 데이터를 분산시키고, 코드를 데이터 쪽으로 보낸다는 것이다.
map의 input, reduce의 output 모두 HDFS에 저장이 되거나 저장되어 있다. 프로그램 실행 전에 HDFS 안에 map의 input 파일을 올려야 하고, 결과 생성되면 다시 HDFS에서 다운로드 해서 확인해야 한다.
🍀WordCount JAVA CODE
🔅 main 함수
MapReduce를 구동시켜주는 Driver 코드이다. 세팅을 여러가지 하는데, job에 계속 세팅해서 담아주고, job은 configuration에 들어간다. job의 configuration을 하나하나 잡아가는 코드이다.
- map/reduce 함수가 있는 클래스 지정
- map input 파일 위치, reduce output 저장 디렉토리 지정(HDFS)
- 입출력 포맷 지정(text)
- 최종 reduce의 output key-value 타입
- map의 출력 key-value 타입
각각 다 세팅하고 마지막에 Job을 시작하라고 호출한다. 하둡에서 이 내용들을 읽어서 각각 HDFS 파일 위치에 map/reduce 코드를 전달해서 프로그램을 시작할 수 있도록 한다. 경로는 커맨드 라인으로 args를 받는다. 그리고 job을 시작한다. main이 끝나면 커맨드 라인 상 입력한 args[1]을 열면 결과를 확인할 수 있다.
🔅Map class
Mapper라는 부모 클래스를 상속 받는다. generic 타입 4개가 있는데 왼쪽부터 map의 input key, map의 input value, map의 output key, map의 ouput value의 타입이다.
map이 한 번 호출되려면 하둡이 한 줄 읽어서 호줄된다. 한 줄이 파일에서 얼마나 떨어져 있는지 offset이 map의 input key가 된다. Integer를 하둡에서 나타내는 LongWritable 타입을 적는다.
map 함수에서 해주는 일은 input을 parameter로 받아서 output을 내는 것이다. map의 첫번째 두번째 파라미터 타입은 generic과 동일하다. 세번째 파라미터 타입은 Context인데 이건 하둡 엔진을 나타낸다고 생각하면 된다. 파라미터 중 key, value Context는 동일하고 key, value의 타입이 달라질 수 있다.
- word가 output의 key, one이 ouput의 value이다. WordCount는 value가 다 1이라 1로 초기화한다.
- text 한 줄인 value에서 담긴 문자열을 추출한다. toString()
- 한 줄을 StringTokenizer로 단어별로 찢는다.
- hasMoreTokens()가 false이면 return한다.
- tokenizer.nextToken()을 호출하면 다음 단어를 ouput의 key로 만들어줄 부분에 세팅한다.
- output으로 나가는 것이 context.write(word, one)이다. 하둡에 써주는 것이다. 이때 Map class의 generic 3,4번째 타입과 word, one의 타입이 일치해야 한다. output의 key, value 타입 !
🔅 하둡 기본 타입
왜 별도로 타입을 지정했는지 알아야한다. 하둡은 기본적으로 분산시스템이다. 데이터들이 노드 사이로 주고 받는다는 뜻이다. 주고 받는다는 것은 네트워크를 통해서 이루어진다. 그냥 Integer, Long 타입을 쓰면 안되고 네트워크 전달을 위해서 사용하기 편리한 타입을 정해놓는다. 그것이 java의 well-known 타입과 매핑되도록 제공한다. 그것이 Writable 타입이다.
- 밸류 타입
- MapReduce에서 밸류로 사용되는 타입은 반드시 Writable 인터페이스를 지원
- 이는 하둡 노드들간의 RPC(Remote Procedure Call. 결론은 네트워크로 전송한다는 의미) 시 parameter나 return value 부분의 serialization과 deserialization을 위함.
- Writable에는 두 개의 메소드가 존재 - write(output), readFiedls(input)
- 키 타입
- MapReduce에서 키로 사용되는 타입은 반드시 WritableComparable 인터페이스를 지원
- 이는 하둡 노드들간의 RPC 시 밸류 부분의 serialiation과 deserialization과 shuffling/sorting 시 키 값의 비교를 위함
- WritableComparable에는 Writable 인터페이스 이외에 Java의 Comparable 인터페이스를 지원 - write(output), readFiedls(input), compareTo(o)
🔅Reduce class
Reducer 클래스 상속 받는다. generic 순서대로 reduce의 input key, value, reduce의 output key, value 타입이다. 1,2번째 타입인 reduce의 input 타입은 map의 output 타입과 일치해야한다.
reduce 함수는 같은 key를 가지는 value들을 array로 넘겨준다. 그게 Iterable이다.
- for문으로 values에 대해 호출해서 각 list value 값이 val에 담긴다. val은 LongWritable 타입이라서 실제 안에 있는 값을 얻어 오기 위해서는 get()을 호출해서 sum에 누적으로 더한다.
- reduce의 key, value를 다시 써줘야 한다. reduce의 input key가 그대로 reduce의 output key가 된다. sum이 새로운 value가 된다.
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount
{
public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable>
{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException
{
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>
{
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
{
int sum = 0;
for (IntWritable val : values)
{
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2)
{
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setCombinerClass(WordCountReducer.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
FileSystem.get(job.getConfiguration()).delete( new Path(otherArgs[1]), true);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
🍀Combiner
WordCounter에서 Combiner를 사용해서 프로그램 성능을 올릴 수 있다.
Combiner는 map, reduce 사이에 들어가는 중간 계산 단계라고 생각하면 된다.
- map에서 어떤 계산을 하게 되면, 하둡이 한 줄을 읽고 map 함수를 실행해서 결과를 내는 것을 반복한다. 다른 노드에서도 마찬가지이다. 이 결과는 shuffle이란 단계를 거쳐서 다음 단계가 진행된다. suffle은 map의 output key가 같은 value들을 다 모아서 array로 만들어주는 것이다.
- 근본적인 모양 자체는 reduce와 같다. map의 ouput의 key들이 같은 value들을 묶어서 Combiner에게 넣어준다. reduce로 들어가기 전 글로벌 shuffle을 했다면, 여기서는 로컬하게 컴퓨터 한 대 내에서만 suffle을 하고 Combiner를 호출해 한 번 더 모아서 계산을 해주는 것이다. (machine 내에서 결과)
map의 output의 일부를 모아 처리하기 때문에 map의 result 크기를 감소시킨다. 일반적으로 reduce 함수와 비슷한 일을 map이 수행하고 있는 machine에서 수행하고 suffle 오버헤드를 줄여준다. reduce가 수행되기 전에, 각 key에 대해서 map/combiner에서 생성한 value들이 list로 모인다.
WordCount에서는 reduce에서 하는 작업이 Combiner에서 하는 일과 정확히 같다. 따라서 Combiner를 별도의 코드로 작성하는 것이 아니라 Combiner class를 main에서 지정만 하면 된다. WordCount 코드에서 추가적으로 main 함수에 한 줄만 추가하면 된다. 그럼 더 성능이 좋은 프로그램을 얻게 된다.
job.setCombinerClass(Reduce.class);