CS/Data Engineering

[Data Engineering] PageRank - MapReduce Java

하노정 2023. 4. 18. 21:48

📌 목차

  • PageRank
  • MapReduce 기반 PageRank
  • MapReduce 기반 PageRank Psuedo Code
  • MapReduce 기반 PageRank Real Code

 

🍀 PageRank

구글에서 시작된 알고리즘이다. 구글 창업자 2명이 평소 인터넷에 수많은 정보가 무질서하게 나열되어 있는 것에 불만을 가져 직접 검색 엔진을 개발했다. 당시 대중적인 검색 엔진보다 늦게 시작했는데, 검색 품질에서 뛰어 넘을 수 있었던 중요 요인 중 하나는 PageRank 때문이었다. PageRank구글이 검색 결과의 순위를 매기는 데 이용한 혁신적인 알고리즘이다. 

🔅 하이퍼링크 트릭

웹의 중요한 성질은 웹페이지 A가 웹페이지 B를 가리킨다는 것이다. a 태그의 href 속성으로 가능하다. 이게 하이퍼링크이고, 하이퍼링크는 클릭 시 다른 웹페이지로 연결하는 구문이다. 검색 엔진이 랭킹을 구하고자 하는 데 이용하는 가장 중요한 툴 중의 하나이고 구글 PageRank 기술의 근본이다.

하이퍼링크의 핵심적인 아이디어는 웹페이지 A를 가리키는 웹페이지의 개수가 1개이고, 웹페이지 B를 가리키는 웹페이지의 개수가 3개일 때, 웹페이지 A, B에 연결된 페이지 수를 세어 각 페이지에 있는 인커밍 링크(incoming link)의 수에 따라 레시피 순위를 매기는 방법으로 접한다는 것이다.

🔅 권위 트릭

여기서 일반인의 추천보다 전문가의 추천이 더 가치가 있어야 한다. 웹페이지 A, B를 가리키는 웹페이지 개수가 각 1개일 때, 높은 권위가 있는 페이지의 링크가 낮은 권위의 페이지의 링크보다 더 높은 순위 결과여야 한다.

🔅 하이퍼링크 트릭 + 권위 트릭

더 내려가서, 일반적으로는 일반인의 웹페이지를 가리키는 웹페이지 수전문가의 웹페이지를 가리키는 웹페이지 수보다는 적을 것이다. 당연히 중요도는 전문가의 웹페이지가 높은 것을 확인할 수 있다. 권위라는 점수를 계산하기 어렵지만 웹의 전반적인 모습을 보면 권위 역시도 하이퍼링크로 계산할 수 있다. 최종적으로는 이 모델이 반영되어야 한다.

모든 웹페이지는 최초 1점의 권위 점수로 시작하고, 예를 들어 웹페이지 a에 인커밍 링크가 생기면 (b가 a 클릭) 클릭하는 순간 a의 점수는 2점이 된다. 100번 클릭 받는 c가 있다면 c의 권위 점수는 100점이다. 그럼 당연하게 a와 c가 단일하게 d,e를 가리키게 되면, 권위 점수대로 d,e의 랭킹 점수도 2점, 100점이 될 확률이 높다.

🔅 사이클

근데 이 모델도 문제가 있다. 웹은 사이클을 형성할 수 있다. 참조관계 형성이 사이클이 될 수 있기에 권위 점수에 문제를 가져다 주고, 권위 점수가 유효하지 않고 계속 무한히 증가 되는 형태가 될 수 있다. 

🔅 무작위 서퍼 트릭 (Random surfer trick)

이 부분을 해소하기 위해서 도입된 모델이 랜덤 서퍼 모델이다. 이게 실제 구글 PageRank의 핵심 아이디어다. 하이퍼링크 및 권위 트릭과 전혀 비슷한 점이 없다.

무작위로 선정한 웹페이지 -> 하이퍼링크를 골라 선택 -> 점수 증가 -> 새로운 페이지 검토 후 안의 이 하이퍼링크 하나를 무작위로 선택 : 이러한 과정을 반복

랜덤 서퍼 모델은 2가지 연산이 있다. 

  1. 현재의 페이지에 만족하지 못하고 클릭을 통해 연결된 페이지를 방문할 확률 85% (damping factor)
    • 엣지가 3개이면, 각 링크로 나갈 수 있는 확률은 1/n(3)이다.
    • 각 1/3 * 0.85 = 약 28.3% 확률로 방문한다.
  2. 클릭을 멈추고 랜덤하게 다른 페이지에서 시작할 확률 15%
    • 엣지가 전체 웹페이지이면, 각 링크로 나갈 수 있는 확률이 1/n(전체웹페이지수) 이다.

랜덤 서퍼 모델에서 수백만번을 방문하는 시뮬레이션 돌린 후 서퍼 권위 점수를 산출한다. 시뮬레이션으로 계산한 백분율은 정확한 페이지의 권위(랭킹)를 측정하는 데 필요한 수치이다. 권위 점수는 랜덤 웹페이지를 가리키는 사용자가 이 페이지를 방문해서 보낸 시간의 백분율로 정의된다.

랜덤 서퍼 모델의 권위 점수는 웹페이지의 중요성 순위를 정하는 앞선 두 트릭을 모두 포함한다. 많은 인커밍 링크가 있는 페이지가 더 높은 순위를 받아야 한다. 높은 권위를 가진 페이지로부터의 인커밍 링크낮은 것보다 페이지 순위를 높여준다. 사이클이 있더라도 적절한 점수를 계산하는 데 문제가 없다. 이 정보를 PageRank라고 생각하면 된다. 

하지만 실제 웹페이지 수가 굉장히 많아서 시뮬레이션 하는 것은 어렵다. 따라서 위와 같은 값을 계산해주는 다른 방법을 MapReduce로 수행한다.

 

🍀 MapReduce 기반 PageRank

기본 가정이 랜덤 서퍼 모델(페이지를 랜덤하게 방문하여 탐색하는 모델)이다.

  • damping factor (d) 
    • 특정 페이지를 만족하지 못하고 연결된 다른 페이지로 넘어갈 확률
    • d = 0, 첫 번째 방문한 페이지에 항상 만족 -> 이 경우 모든 PageRank가 1로 수렴
    • d = 1, 만족하지 못하고 랜덤하게 클릭
    • 일반적으로 d 값은 0.85.
    • 85% 확률로 다른 페이지로 넘어가고, 15% 확률로 멈추고 다시 랜덤하게 페이지 방문함
    • 특정 페이지에 n개의 링크가 있으면, 각 링크를 클릭할 확률은 1/n

T1이 A 방문할 확률
A의 PageRank에 기여도

이 과정을 계속 반복한다. 이전 PageRank 값과 비교해서 큰 차이가 없을 때까지!

 

🍀 MapReduce 기반 PageRank Pseudo code

Map의 input value로는 src와 src가 링크 걸고 있는 target_id가 들어온다.

Map의 output key로는 target_id를 하나씩 내보낸다.

Map의 output value로는 해당 target_id의 src로부터의 새 기여분을 계산한 pr 값을 내보낸다.

하둡이 suffle 과정을 진행한다. 같은 key끼리 모아준다. target_id 마다 pr 값을 모아 list로 만들어준다.

Reduce의 input key로는 page_id가 들어온다.

Reduece의 input value로는 하둡이 계산한 list 값인 각 페이지별 pr 값들이 들어온다.

Reduce의 output key로는 page_id를 내보낸다.

Reduce의 output value로는 해당 page_id 마다 input value로 들어온 pr list를 바탕으로 계산한 새 pr 값을 내보낸다.

Map
Shuffle
Reduce

 

🍀 MapReduce 기반 PageRank Real code

실제 실습에서는 필요한 추가 기능이 있다. 전체 웹 페이지 개수, 페이지들의 pr 값이 필요하다. 하둡의 기능을 활용하여 코딩을 해본다. 

코드 짤 때 항상 만드는 Configuration을 활용한다. conf 파일을 이용해서 필요한 정보를 보낸다. main method에서 필요한 정보를 conf 파일에 set() 하는 작업을 한다. mapper/reducer에서 필요한 값을 get() 한다.

map/reduce input/output src, taget 을 이해하고, main의 도움을 이해하면 된다 !

import java.io.IOException;
import java.util.*;
import java.io.*;

import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank 
{

	public static class PageRankMapper extends Mapper<LongWritable, Text, IntWritable, DoubleWritable>
	{
		private IntWritable one_key = new IntWritable();
		private DoubleWritable one_value = new DoubleWritable();
		private int n_pages;
		private double[] pagerank;

		// value로 0 1 2 3 이런 식으로 들어옴. 맨 앞이 src, 나머지가 target
		public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
		{
			StringTokenizer itr = new StringTokenizer(value.toString());
			int n_links = itr.countTokens() -1; // outgoing edge 수는 src 빼고 세야함. -1
			if (n_links == 0) return; // outgoing edge가 0이면 끝냄

			int src_id = Integer.parseInt(itr.nextToken().trim());
			double pr = pagerank[src_id] / (double)n_links; // src의 pr 값/src의 outgoing edge 수
			one_value.set(pr); // target에 src의 pr 값 배분
            
			while (itr.hasMoreTokens())
			{
				int target_id = Integer.parseInt(itr.nextToken().trim());
				one_key.set(target_id); // src 제외 target
				context.write(one_key, one_value);
			}

		}

		protected void setup(Context context) throws IOException, InterruptedException
		{
			Configuration conf = context.getConfiguration(); // context 이용해 conf 읽어옴
			n_pages = conf.getInt("n_pages", -1); // 총 page 개수 읽어옴
			pagerank = new double[n_pages]; // 총 page 개수만큼 배열 생성
			
			for (int i=0; i<n_pages; i++)
			{
				pagerank[i] = conf.getFloat("pagerank" + i, 0); // 현재 pr 값. main에 setFloat() 있어야 함
			}
		}
	}

	public static class PageRankReducer extends Reducer<IntWritable,DoubleWritable,IntWritable,DoubleWritable> 
	{
		private DoubleWritable result = new DoubleWritable();
		private double damping_factor = 0.85;
		private int n_pages;

		public void reduce(IntWritable key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException 
		{
			double agg_val = 0;

			// 새 pr 값 구하기
			for (DoubleWritable val : values) 
			{
				agg_val += val.get();
			}
			agg_val *= damping_factor;
			agg_val += 0.15/n_pages;

			result.set(agg_val);
			context.write(key, result);
		}

		protected void setup(Context context) throws IOException, InterruptedException
		{
			Configuration conf = context.getConfiguration();
			n_pages = conf.getInt("n_pages", -1); // 웹 페이지 수 얻어오기 -> reduce가 쓸 수 있음
		}

	}

	public static void main(String[] args) throws Exception 
	{
		int n_pages = 4;
		int n_iter = 3;
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

		if (otherArgs.length != 2)
		{
			System.err.println("Usage: PageRank");
			System.exit(2);
		}

		initPageRank(conf, n_pages); // 최소 한 번은 총 페이지 수를 저장

		for (int i=0; i<n_iter; i++)
		{
			Job job = new Job(conf, "page rank");
			job.setJarByClass(PageRank.class);
			job.setMapperClass(PageRankMapper.class);
			job.setReducerClass(PageRankReducer.class);

			job.setOutputKeyClass(IntWritable.class);
			job.setOutputValueClass(DoubleWritable.class);
			job.setInputFormatClass(TextInputFormat.class);

			FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
			FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
			FileSystem.get(job.getConfiguration()).delete( new Path(args[1]), true);
			job.waitForCompletion(true);
			updatePageRank(conf, n_pages); //새로 계산된 pr로 업뎃
		}
	}

	public static void initPageRank(Configuration conf, int n_pages)
	{
		conf.setInt("n_pages", n_pages); // 총 page 개수를 mapper와 reducer에 전달
		for (int i=0; i<n_pages; i++)
		{
			conf.setFloat("pagerank" + i, (float)(1.0/(double)n_pages)); // 최초 pr set (1/n)
		}
	}

	public static void updatePageRank(Configuration conf, int n_pages) throws Exception
	{
		FileSystem dfs = FileSystem.get(conf);
		Path filenamePath = new Path("/user/bigdata/output3/part-r-00000"); // output 파일의 절대경로
		FSDataInputStream in = dfs.open(filenamePath);
		BufferedReader reader = new BufferedReader(new InputStreamReader(in));

		String line = reader.readLine();
		while(line != null) // 반복
		{
			StringTokenizer itr = new StringTokenizer(new String(line));
			int src_id = Integer.parseInt(itr.nextToken().trim());
			double pr = Double.parseDouble(itr.nextToken().trim());
			conf.setFloat("pagerank" + src_id, (float)pr);
			line = reader.readLine();
		}
	}
}