CS/Data Engineering

[Data Engineering] Matrix Computation

하노정 2023. 6. 1. 16:11

📌 목차

  • Addition
  • Product
  • Product Real Java Code

 

🍀 Addition

행렬 덧셈은 행의 크기, 열의 크기가 정확하게 같아야 한다. 두 행렬의 같은 위치에 있는 값끼리 더해서 결과 행렬의 같은 위치에 배치하면 된다. 

Formal equation

Am*n + Bm*n = Cm*n

이런 행렬의 크기가 매우 클 때 덧셈을 어떻게 할 것인가? 거대한 행렬을 상상하기 쉽지 않은데, SNS에서 친구관계를 표현하는 것을 행렬로 나타낼 수 있다. 인덱스를 user의 id로 생각할 수 있다. 0번 user가 1번 user를 알게 되면 A01 = 1, A10 = 1로 표시한다. 거대한 행렬의 덧셈이나 곱셈을 하게 되면 한 대의 기계에서 데이터 처리를 할 수 없게 된다. 

 

MapReduce로 거대한 행렬 계산을 할 때, reduce에서는 suffle을 해는데 map의 output의 key를 보고 같은 key를 갖는 value들을 묶어준다. value들을 list로 묶어준다. 행렬의 덧셈은 reduce에서는 더해져야 하는 행렬 값 2개가 같은 key를 갖게 map의 output으로 만들면 된다. 덧셈을 위한 행렬의 인덱스를 그대로 행렬 계산의 output의 key로 사용하는 것이 핵심이다. 행렬의 인덱스의 행, 열, 값을 차례로 쓰는 format을 사용한다. 

Pseudo code

map의 input으로 행, 열 인덱스와 값이 들어오고 특별한 처리가 아닌, 행과 열의 인덱스를 결합해 map의 output으로 돌려준다. suffle 단계에서 같은 행과 열의 인덱스를 가진 값들이 다 list로 묶인다. 그 list 하나에 대해서 reduce 연산이 호출되고 reduce input 같은 경우는 하나의 행, 열 인덱스 쌍과 value들의 list가 들어온다. list의 value들을 합을 하는 연산을 해서 reduce의 output으로 만든다. 덧셈 연산 같은 경우는 쉽게 구할 수 있다. 

map
suffle
reduce

 

🍀 Product

1행 1열의 행렬의 곱 결과는 A 행렬의 1행과 B 행렬의 1열을 곱하면서 더한다.

결과 행렬의 첫 번째 행의 첫 번째 열 값은 A 행렬의 1행과 B 행렬의 1열의 값을 모두 곱해서 더해야 한다. Amk * Bkn = Cmn. a00 하나가 한 번만 map의 output으로 나가는 게 아니라, 결과 행렬의 1행에 모두 쓰인다. 총 n번만큼 map의 output으로 보내져야 한다. b00은 결과 행렬의 1열에 모두 쓰이고 총 m번만큼 map의 output으로 보내져야 한다. 

행렬의 곱은 MapReduce를 두 단계로 생각한다.

첫 번째 단계는 a의 i,x는 b의 x,j가 reduce에서 만나게 해야 한다. 그래야 곱할 수 있다. addition에서는 i, x, 10 이고 x, j, 5라고 적혀있을 것이다. 전자가 처리될 때 후자와 만날 수 있게 해야 하고 반대도 마찬가지로 되어야 한다. reduce에서 만나게 하려면 map의 output key를 동일하게 해야 한다. key를 i, j, x로 하겠다는 것이다. 전자를 처리할 때는 j가 없고, 후자를 처리할 때는 i가 없다. 이 부분 처리는 해줘야 한다. 아무튼 만나게 하기 위해서 map의 output key를 i, j, x로 하겠다는 것이다.

 

a의 i,x는 key를 i, 0, x 부터 i, n-1, x로 하여 총 n번 만들어야 한다. 그럼 a의 i,x는 B 행렬의 한 줄과 다 만나게 된다. b의 x,j는 key를 0, j, x 부터 m-1, j, x로 하여 총 m번 처리되어야 한다. 그럼 b의 j,x는 A 행렬의 한 줄과 다 만나게 된다. 행렬 A, B 처리 하는 게 map에서 다르게 처리된다. 

map의 결과를 suffle 한 결과에서, 같은 key를 가지는 값끼리 곱하면 된다. map의 output key가 reduce의 input key가 되고, reduce의 output key는 input key에서 마지막을 잘라서 보내면 된다. 

두 번째 단계는 map에서 딱히 할 일 없고 bypass 시킨다. reduce에서는 같은 key를 가지는 것끼리 모인다. 그것들을 다 더하게 되면 C 행렬 값을 모두 구하게 된다. 

 

🍀 Product Real Java Code

파일 포맷은 addition과 똑같다. 파일 이름을 넣은 이유는 한 줄을 읽고 한 줄이 행렬 A에서 왔는지, B에서 왔는지 알기 위함이다. 파일 이름을 얻어와서 비교한 후 어디 행렬에서 왔는지 알아낸다. 

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class MatrixMul 
{

	public static class MatrixMulMapper1 extends Mapper<Object, Text, Text, IntWritable>
	{
		private Text word = new Text();

		private int m_value; // 행렬 A의 크기 m*k
		private int k_value; // 행렬 B의 크기 k*n
		private int n_value; // main에서 전달

		private boolean isA = false; // 처리하고 있는 행렬이 A or B 인지 파일 이름으로 체크
		private boolean isB = false;

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException 
		{
			StringTokenizer itr = new StringTokenizer(value.toString());

			// 파일 한 줄 읽어서 행렬 index와 값을 추출
			int row_id = Integer.parseInt(itr.nextToken().trim());
			int col_id = Integer.parseInt(itr.nextToken().trim());
			int matrix_value = Integer.parseInt(itr.nextToken().trim());
            
			IntWritable i_value = new IntWritable();
			i_value.set(matrix_value); 

			if (isA) { // 지금 처리 중인 파일이 행렬 A인 경우, 총 n번 emit한다.
				for (int i=0; i<n_value; i++) {
					word.set(new byte[0]);
					word.set(row_id + "," + i + "," + col_id); // key를 잘 볼 것
					context.write(word, i_value);
				}
			} else if (isB) { // 지금 처리 중인 파일이 행렬 B인 경우, 총 m번 emit한다.
				for (int i=0; i<m_value; i++) {
					word.set(new byte[0]);
					word.set(i + "," + col_id + "," + row_id); // key를 잘 볼 것
					context.write(word, i_value);
				}
			}		
		}

		protected void setup(Context context) throws IOException, InterruptedException
		{
			Configuration conf = context.getConfiguration();

			m_value = conf.getInt("m", -1); // 행렬 크기를 configuration에서 읽어옴
			k_value = conf.getInt("k", -1);
			n_value = conf.getInt("n", -1);

			String filename = ((FileSplit) context.getInputSplit()).getPath().getName();

			if (filename.equals("matrix_a")) isA = true; // 행렬 A 처리 중으로 판단
			if (filename.equals("matrix_b")) isB = true; // 행렬 B 처리 중으로 판단
		}
	}

	public static class MatrixMulReducer1 extends Reducer<Text,IntWritable,Text,IntWritable> 
	{
		private IntWritable result = new IntWritable();
		private Text word = new Text();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
		{
			int mul = 1;
			StringTokenizer itr = new StringTokenizer( key.toString(), "," );
            
            // A,B 행렬의 인덱스 추출
			int row_id = Integer.parseInt(itr.nextToken().trim());
			int col_id = Integer.parseInt(itr.nextToken().trim());

			for (IntWritable val : values) 
			{
				mul *= val.get(); // 두 값을 곱한다.
			}
			word.set(row_id + "," + col_id);
			result.set(mul);
			context.write(word, result);
		}
	}

	// bypass 로직. 딱히 할 일 X
	public static class MatrixMulMapper2 extends Mapper<Object, Text, Text, IntWritable> {
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			String i_key = itr.nextToken().trim();
			int matrix_value = Integer.parseInt(itr.nextToken().trim());
			IntWritable i_value = new IntWritable();
			i_value.set(matrix_value);
			word.set(i_key);
			context.write(word, i_value);
		}
	}

	public static class MatrixMulReducer2 extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException 
		{
			int add = 0;
			for (IntWritable val : values)
			{
				add += val.get(); // 최종 행렬 값을 구하기 위해 모두 더한다.
			}
			result.set(add);
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception 
	{
		int m_value = 2;
		int k_value = 2;
		int n_value = 2;
		String first_phase_result = "/first_phase_result"; // 첫 번째 MR의 결과가 저장되는 위치

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length != 2) 
		{
			System.err.println("Usage: MatrixMul <in> <out>");
			System.exit(2);
		}
		
		conf.setInt("m", m_value); // 행렬 크기 setup
		conf.setInt("k", k_value);
		conf.setInt("n", n_value);

		Job job1 = new Job(conf, "matrix mult1");
		job1.setJarByClass(MatrixMul.class);
		job1.setMapperClass(MatrixMulMapper1.class);
		job1.setReducerClass(MatrixMulReducer1.class);
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job1, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job1, new Path(first_phase_result));
		FileSystem.get(job1.getConfiguration()).delete(new Path(first_phase_result), true);
		job1.waitForCompletion(true); // 첫 번째 MR 수행

		Job job2 = new Job(conf, "matrix mult2");
		job2.setJarByClass(MatrixMul.class);
		job2.setMapperClass(MatrixMulMapper2.class);
		job2.setReducerClass(MatrixMulReducer2.class);
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(IntWritable.class);

		FileInputFormat.addInputPath(job2, new Path(first_phase_result));
		FileOutputFormat.setOutputPath(job2, new Path(otherArgs[1]));
		FileSystem.get(job2.getConfiguration()).delete(new Path(otherArgs[1]), true);
		job2.waitForCompletion(true); // 두 번째 MR 수행
	}
}