CS/Data Engineering
[Data Engineering] Inverted Index Java
하노정
2023. 4. 17. 14:56
📌 목차
- Inverted Index
🍀 Inverted Index
문제 : "IMF"는 어디에 나오는지?
예 : results : (Doc1, 11), (Doc2, 11)
문서의 시작부터 얼만큼 떨어져있는지 offset까지 출력한다. Financial은 Doc1, 2, 4에 등장하고 각각 시작부터 첫 바이트에 나온다는 것이다. 오른쪽 이 Inverted Index의 결과이다.
Mapper는 시작에서부터 몇 바이트 떨어져있는지 value로 만들어 output으로 보낸다. 각 단어를 key로 하고 문서 ID와 offset 정보를 만들어서 하둡에 전달한다.
Shuffle은 같은 key를 가지는 value들을 모아서 array로 만든다.
Reducer는 하나의 entry를 받아서 처리하는데, Inverted Index는 별다른 처리를 하지 않고, input의 key를 output의 key로 활용하고, input의 value를 string으로 이어서 하나의 string으로 만들고 reducer의 output의 value로 만든다.
Inverted Index를 이해하기 위해 2가지 알아야 할 게 있다.
- How to get filename in the map function?
- WordCount에서 처음 input의 key는 그 한 줄이 파일에서 얼마나 떨어져 있는지이고 value는 한 줄의 내용이었다. key-value으로는 filename을 얻어올 수 없다. 근데 Inverted Index에서는 filename이 출력되어야 하고, map에서 알아내야 한다.
- 각 map/reduce에서는 setsup이라는 메소드를 제공해준다. map 함수가 최초 불리기 전에 mapper 클래스에 대한 object가 만들어지면, 그때 최초로 한 번 하둡이 setup 메소드를 호출한다.
- setup 메소드를 오버라이드해서 구현하면 된다. 접근 제안자가 protected이고, parameter는 Context이다. map의 전체적인 환경정보를 담은 객체가 parameter로 넘어간다. parameter context에 getInputSplit()을 호출하면, 현재 mapper가 실행하는 부분에 대한 정보가 담긴다. HDFS 상의 path 정보를 getPath()로 얻어오고, 그 중에서도 filename만 getName()으로 가져올 수 있다. 전역변수로 filename으로 받고, 이를 map 함수에서 사용하면 된다.
- 정리하면, 하둡이 mapper를 띄우고 mapper에 대한 setup을 호출한다. 처음 호출할 때 filename을 얻어와서 전역변수에 넣어서 그대로 사용하면 된다.
- How to calculate the position of each word in the map function?
- map의 key를 이용해서 offset을 계산할 수 있어야 한다.
- map 함수의 input key가 해당 라인의 offset이다. key에 대해 get()하면 한 줄에 대한 byte offset을 구할 수 있음.
- Tokenizer로 단어를 찢고, map의 output을 만들 때 key는 매 단어가 된다. value는 filename과 offset이 된다. offset은 위에서 구한 offset을 그대로 쓰면 안 되고, 단어가 한 줄 처음부터 얼마나 떨어져 있는지를 더해서 map의 output value로 만들어야 한다.
map 함수에서 단어의 offset 구하는 부분 위주로 보면 된다.
import java.io.IOException;
import java.util.*;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;
public class InvertedIndex
{
public static class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>
{
private String filename;
private Text word = new Text(); // output key
private Text pos = new Text(); // output value
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
long byte_offset = key.get();
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens())
{
String token = itr.nextToken();
word.set(token.trim()); // 매 단어가 output의 key
if (! token.equals(" ")) // token이 공백이 아니면 = 단어
{
pos.set(filename + ":" + byte_offset); // 위에서 구한 한 줄의 offset
context.write(word, pos); // output 출력
}
byte_offset += token.length() + 1; // 한 줄의 offset에 단어 길이만큼 더하고 1을 더해줌 ! (다음 단어는 공백 하나 뒤가 offset)
}
}
protected void setup(Context contex) throws IOException, InterruptedException
{
filename = ((FileSplit) contex.getInputSplit()).getPath().getName();
}
}
public static class InvertedIndexReducer extends Reducer<Text,Text,Text,Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
Text result = new Text();
StringBuffer values_buffer = new StringBuffer();
for (Text val : values)
{
values_buffer.append( val.toString() );
values_buffer.append(" ");
}
result.set(values_buffer.toString());
context.write(key, result);
}
}
public static void main(String[] args) throws Exception
{
Configuration conf = new Configuration();
Job job = new Job(conf, "inverted index");
job.setJarByClass(InvertedIndex.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
FileSystem.get(job.getConfiguration()).delete( new Path(args[1]), true);
job.waitForCompletion(true);
}
}