CS/Data Engineering

[Data Engineering] Relational Data Processing (Projection/Selection), Top-k

하노정 2023. 6. 8. 13:33

📌 목차

  • Relational Data
  • Projection
  • Selection
  • Real Java Code (Projection + Selection)
  • Top-k

 

🍀 Relational Data

MapReduce를 활용해서 Relational Data를 처리한다. Relational Data는 DB Table에 저장되어 있는 형태를 말한다. entity, relation을 갖고 DB를 모델링한다고 학습했을 것이다. 예를 들어 entity는 학생, 과목, 교수이고 개별 table로 관리가 될 것이다. relation은 entity 사이의 관계를 말한다. 학생이 과목을 '수강'하고, 교수가 과목을 '강의'한다.  Relational DB는 entity와 relation을 모두 table 형태로 모델링한다.

DB에서 select, join, order by 등의 쿼리로 sql을 처리할텐데 Large data의 경우에는 싱글 DB machine에 저장이 안 된다. Large data의 경우 이러한 질의를 MapReduce로 어떻게 처리할 것인지 학습한다. 

  • 테이블과 같이 정형화되어 있는 데이터
  • MapReduce에 적합한 응용
    • 전체 혹은 많은 데이터를 읽어서 분석하는 응용
    • 예) decision support system (의사 결정 시스템)

insert, update, delete 이런 질의를 다루는 것이 아니라 분석이기에 select와 관련된 질의를 주로 다룬다. select 질의를 MapReduce로 어떻게 구현할 것인지 본다. 데이터 예시는 다음과 같다. 

Example

 

🍀 Projection

첫 연산은 레코드에서 특정 필드들만 추출하는 것이다. 예를 들어 위의 예시 데이터에서 사원 id와 salary 정보만 추출하는 것이다. 

Map의 input으로 들어오는 값에서 실제 원하는 필드만 추출해서 레코드를 만들고 output으로 던져준다. reduce는 할 연산이 없으므로 존재하지 않는다.

 

🍀 Selection

다음 연산은 특정 조건을 만족시키는 레코드들만 추출하는 것이다. 예를 들어 salary가 특정 값 이상인 레코드만 추출하는 것이다. 

Map의 input으로 들어오는 값에서 레코드가 우리가 생각한 조건을 만족하는지 확인 후 만족 시 레코드를 output으로 던져준다. reduce는 마찬가지로 할 일이 없으므로 존재하지 않는다.

 

🍀 Real Java Code (Projection + Selection)

예시로 salary가 3,500,000 이상인 레코드에서 사원 id와 salary 정보만 추출한다. 전자가 selection이고, 후자가 projection이다. 파일 포맷은 한 줄이 레코드 하나이고, 필드 구별할 때는 |로 구별한다고 가정하고 코드를 작성한다.

왼쪽과 같이 input으로 들어가면 오른쪽과 같이 output이 나와야 한다.

Map 함수에는 한 줄, 한 줄이 value로 들어온다. value가 tokenizer를 사용해 |로 구별해 추출한다. 차례로 사원 id, 부서 id, salary, emp_info를 추출하고 salary 조건을 걸어 map의 output을 만든다. 이 경우는 emp_id와 salary를 합친 값을 key로 만들었고, result는 비어있는 문자열을 value로 하여 result로 MapReduce의 output으로 만들었다. 실제 정보는 모두 key에 저장된다. 

Main 함수는 기존과 똑같은데 reduce가 없다. reduce의 task 개수를 0으로 하면, reduce를 사용하지 않는다. 

 

🍀 Top-k

어떤 조건을 만족시키는 것들 중에 특정 필드가 높거나 낮은 것 k개를 표현할 때 쓰는 operator이다. mysql의 경우는 select, order by, desc limit 10 의 조합으로 질의를 작성한다. 상위 10개만 출력된다는 것이다. DB에서 order by, desc/asc, limit을 이용하게 되면 Top-k를 쓸 수 있다. Large data에서 특별히 어떠한 필드가 높은 것 10개를 전체 distribute system에서 추려내는 operator를 본다.

Example

예를 들어 salary가 높은 상위 3명을 추출하는 것이다. MapReduce 프로그램에서는 order by를 만들기가 어렵다. 따라서 상위 3명만 추출하는 것인지, 금액에 대해 정렬해서 출력하라는 것은 아니다. 순서와 상관없이 3명을 출력하면 된다. 

Map은 여러 머신에서 수행이 된다. 예를 들어 파일의 크기가 굉장히 크다면 128MB 마다 Map이 하나씩 뜬다. 파일의 개수가 많아지게 되면 파일의 사이즈가 정해진 128MB 보다 작더라도 파일당 Map이 하나씩 뜬다.

핵심 아이디어는 개별적인 Map에서 각각 Top-k를 추출해서 MapReduce의 output으로 만들고, Reduce에서는 Map 머신이 3개가 떠있으면 총 3k개의 레코드를 받게 된다. 이 중 k개를 추출해서 최종 output을 만들어내는 것이다. Top-k를 실행하기 위해서 Map을 여러 개 띄우고 Reduce를 한 개만 띄워서 3k개의 output -> k개의 output을 만드는 것이다. Map에서 k개의 output을 MapReduce의 output으로 만들어내면 된다. 

 

Map은 한 줄을 처리한 후 output으로 만들거나 만들지 않거나 둘 중 하나를 결정했었는데, 이 경우는 Map 프로세스가 전체 레코드를 봐야지만, k개의 레코드를 식별할 수 있다. 가장 마지막에 있는 데이터까지 봐야지만 Top-k가 결정된다. 기존 프로그램 짰던 방식과 다른 부분이다.

따라서 이 경우에는 매 레코드마다 MapReduce의 output으로 던지는 것이 아니라, k개를 담을 수 있는 버퍼를 만들고 그 버퍼에 넣거나 빼거나 결정하는 것이다. 최초 버퍼는 비어있다. 버퍼 크기만큼의 레코드는 순서대로 다 담기게 된다. 3개까지 다 찼으면 4번 처리할 때는 버퍼 속 가장 salary가 낮은 3번보다도 적기 때문에 4번 레코드는 Top-k 버퍼에 진입하지 못한다. 5번은 버퍼 속 가장 salary가 낮은 3번보다 크기에 Top-k 버퍼 속 데이터가 빠지고 들어가게 된다. Top-k 버퍼를 유지하여 조건에 만족하는 레코드를 유지하고 있다가 Map이 전체 데이터를 다 처리하고 나면 Top-k 버퍼를 MapReduce의 output으로 만들면 되는 것이 핵심 아이디어다. 

실행 순서는 setup 호출 후에 map, map, map ..  이런 식이다. map을 다 호출하고 나면 cleanup 메서드를 호출하여 삭제해야 하거나 반환해야 하는 리소스가 있으면 cleanup 메서드에서 처리한다. cleanup이 호출되었다는 얘기는 map이 전부 호출되었다는 것이고, 이 메서드 속에서 Top-k 버퍼 속 내용을 MapReduce output으로 만들면 된다. 즉, map의 cleanup 메서드를 override 하면 되고 여기서 k개를 내보낸다. 

 

Reduce 함수는 map과 비슷하게 버퍼를 이용해 k개를 한 번에 output으로 만든다. 

 

🍀 Java Real Code (Top-k)

Emp class 종업원 정보를 나타내는 클래스이다. Top-k 버퍼 속 종업원 정보를 객체로 유지하기 위해 이 클래스를 만든다. output으로 만들기 위한 getString()을 추가한다.

public class Emp { // 종업원 정보를 나타내기 위한 클래스 
	public int id;
	public int salary;
	public String dept_id;
	public String emp_info;

	public Emp(int _id, int _salary, String _dept_id, String _emp_info) {
		this.id = _id;
		this.salary = _salary;
		this.dept_id = _dept_id;
		this.emp_info = _emp_info;
	}

	public String getString() // output을 위한 메서드
	{
		return id + "|" + dept_id + "|" + salary + "|" + emp_info;
	}
}

 

Buffer는 우선순위 큐를 이용해서 구현한다. 큐 안에 들어가는 자료구조는 Emp class이다. 큐의 크기는 10인데 객체가 10개 들어간다는 것이고 비교를 해주는 연산을 담당하는 클래스 객체도 추가로 들어간다. 우선순위를 정하는 것이기에 이 비교 연산자를 구현한 compare() 메서드를 가진 EmpComparator 클래스가 들어간다. 내림차순 정렬이면, 큐의 head에 salary가 가장 낮은 Emp가 위치한다는 것이다.

Mapper, Reducer에 insertEmp가 필요하다. 종업원 정보를 큐에 넣는 코드가 필요하다. 큐에서 가장 head 정보를 꺼내서 큐의 크기가 topk보다 작으면 그냥 add 한다. 큐에 있는 가장 작은 salary보다 지금 막 들어온 게 더 크면 큐에 들어와야 한다. add 한 후 큐의 크기가 topk보다 커지면, 큐 head에 있는 것을 지운다. 큐의 head만 보고 비교하면 된다. 

map은 결과를 던지는 것이 아니라, 버퍼에 insert 한다. insertEmp() 해도 insert가 안 될 수 있는데 해야 한다. setup에서는 main에서 k 받아와서 우선순위 큐를 생성한다. cleanup에서는 head에서 하나씩 뽑아내면서 map의 output으로 뽑아낸다. k가 3이라면, 모든 파일의 모든 줄에 대해서 map을 호출한 뒤에 cleanup이 호출되었기에 top-k 버퍼에는 3개의 객체만 유지될 것이고 유지된 것을 head부터 꺼내면서 MapReduce의 output으로 만들어주게끔 호출하면 된다.

reduce도 마찬가지이다. 여러 개의 mapper에서 top-k가 몰릴 것이기에 바로 output으로 처리하는 것이 아니라 top-k 버퍼에 넣는다. setup, cleanup도 마찬가지다.

Main은 topk를 정해준다. 우리는 파일이 작기에 reduce가 한 개인데, reduce가 여러 개 뜰 수 있다. 방지하기 위해서는 reduce의 클래스 개수를 1개임을 세팅한다. setNumReduceTasks(1)로 1개로 세팅해주면 1개의 reduce가 모든 mapper의 top-k 의 결과를 받고, 1개의 reduce가 다시 top-k를 만들기에 여러 개 뜨는 것도 방지할 수 있게 된다. 

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class Topk 
{
	public static class EmpComparator implements Comparator<Emp> { // 우선순위 큐 비교 연산자 클래스 (값이 젤 낮은 게 head에 위치)
		public int compare(Emp x, Emp y) {
			if (x.salary > y.salary) return 1;
			if (x.salary < y.salary) return -1;
			return 0;
		}
	}
	
    // 종업원 정보를 큐에 넣는 메서드 
	public static void insertEmp(PriorityQueue q, int id, int salary, String dept_id, String emp_info, int topK) {
		Emp emp_head = (Emp)q.peek();
		if (q.size() < topK || emp_head.salary < salary) 
		{
			Emp emp = new Emp(id, salary, dept_id, emp_info);
			q.add(emp);
			if (q.size() > topK) q.remove();
		}	
	}

	public static class TopkMapper extends Mapper<Object, Text, Text, NullWritable>
	{
		private PriorityQueue<Emp> queue;
		private Comparator<Emp> comp = new EmpComparator();
		private int topK;

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
		{
			StringTokenizer itr = new StringTokenizer(value.toString(), "|");

			int emp_id = Integer.parseInt(itr.nextToken().trim());
			String dept_id = itr.nextToken().trim();
			int salary = Integer.parseInt(itr.nextToken().trim());
			String emp_info = itr.nextToken().trim();
			insertEmp(queue, emp_id, salary, dept_id, emp_info, topK); // 실제 큐에 들어갈지는 모르지만 일단 호출 후 판단하게 함
		}

		protected void setup(Context context) throws IOException, InterruptedException
		{
			Configuration conf = context.getConfiguration();
			topK = conf.getInt("topK", -1);
			queue = new PriorityQueue<Emp>(topK, comp);
		}

		protected void cleanup(Context context) throws IOException, InterruptedException 
		{
			while (queue.size() != 0) {
				Emp emp = (Emp)queue.remove();
				context.write(new Text(emp.getString()), NullWritable.get()); // output 처리
			}
		}
	}

	public static class TopkReducer extends Reducer<Text,NullWritable,Text,NullWritable> 
	{
		private PriorityQueue<Emp> queue;
		private Comparator<Emp> comp = new EmpComparator();
		private int topK;

		public void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException 
		{
			StringTokenizer itr = new StringTokenizer( key.toString(), "|" );
			int emp_id = Integer.parseInt(itr.nextToken().trim());
			String dept_id = itr.nextToken().trim();
			int salary = Integer.parseInt(itr.nextToken().trim());
			String emp_info = itr.nextToken().trim();
			insertEmp(queue, emp_id, salary, dept_id, emp_info, topK);
		}

		protected void setup(Context context) throws IOException, InterruptedException {
			Configuration conf = context.getConfiguration();
			topK = conf.getInt("topK", -1);
			queue = new PriorityQueue<Emp>(topK, comp);
		}

		protected void cleanup(Context context) throws IOException, InterruptedException {
			while(queue.size() != 0) {
				Emp emp = (Emp) queue.remove();
				context.write(new Text(emp.getString()), NullWritable.get());
			}	
		}
	}


	public static void main(String[] args) throws Exception 
	{
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		int topK = 3;
		if (otherArgs.length != 2) 
		{
			System.err.println("Usage: TopK <in> <out>");
			System.exit(2);
		}
		
		conf.setInt("topK", topK);

		Job job = new Job(conf, "Topk");
		job.setJarByClass(Topk.class);
		job.setMapperClass(TopkMapper.class);
		job.setReducerClass(TopkReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		FileSystem.get(job.getConfiguration()).delete(new Path(otherArgs[1]), true);
		System.exit(job.waitForCompletion(true) ? 0:1);
	}
}