source: http://en.wikipedia.org/wiki/Cosine_similarity
前陣子在 「台灣區 Hadoop 使用者社群會議」看到 Image Selection for Large-Scale Flickr Photos using Hadoop 中,使用 Cosine Similarity 計算向量的相似程度,才想起原來 MapReduce 的架構也可以用這啊!第一次聽到 Cosine Similarity 可能是大一的線性代數或是微積分,但第一次專題用到的實做,則是在大四 Datamining 的課程上,當時我們正在作一個找出相似遊戲玩家的專題,因此使用 Vector 跟 Cosine 來計算相似程度,但那次並不是我實做這類東西,我只負責用 Perl 去撈別人家的資料庫,哈。
最近讀 paper 有點悶,就先自己想了一下 Cosine Similarity 實作方式,發現有些地方卡卡的,另外,覺得做出來的還是要負擔一次把所有資料讀進記憶體的花費,這應該不是好的解法,所以就使用 "Cosine Similarity" 跟 "Hadoop" 或 "MapReduce" 關鍵字來搜尋一下,就找到以下幾篇:
- 2008-10-06 Computing Pairwise Document Similarity in Large Collections: A MapReduce Perspective
- 2009-11-10 Image Selection for Large-Scale Flickr Photos using Hadoop
當我了解實做上採用的演算法時,不禁感嘆這演算法的美好,所以決定來實做一下。另外,有個插曲是我請教強者同事關於連續做兩個有相依性的 job 問題時,發現他也實做過這個演算法 :D 還好我也先實做完才問,不然應該會很偷懶吧,哈
- Pairwise Document Similarity in Large Collections with MapReduce
- Hadoop Map-Reduce Tutorial r0.15.2 - JobControl
- org.apache.hadoop.mapred.jobcontrol
然後我有點龜毛地跟同事閒聊為什摩上頭提到的 Document Similarity 的計算都沒有完整做完 Cosine Similarity 的動作,就只是作向量的內積部份( Dot product , Inner product space ),後來討論時也才想到,其實 Document Similarity 重點在於 keyword 是否有 match 到,因此向量內積的算法,恰巧可以用這樣個觀點上,只是我還是覺得些不妥,因為呈現的數值沒有正規劃,假設有兩組向量的比較
- ( 1 , 1 ) , ( 1 , 100 )
- ( 1 , 1 ) , ( 1 , 1 )
以單純向量內積結果,第一組數值是 101 , 第二組是 2 ,但我覺得相似程度應該是第二組最好,完全 match 啊,以 Cosine Similarity 的作法,就會變成 1 ~ -1 之間,即 0.714142143 和 1 ,就可以很輕易看出第二組最好囉。只是實用上,可能計算量大而不適用!但我還是打算實做完 Cosine Similarity 啦,若單純想用 Document Similarity 可以到 Pairwise Document Similarity in Large Collections with MapReduce 逛逛,上頭有完整的 Java 程式囉!另外,若又想要降低計算量,又想計算 Cosine Similarity ,那可以先將向量正規化,讓他們的長度都為 1 ,接著再用 Document Similarity 算,就可以是正解啦!
對於 Cosine Similarity 的實做,採用 Sparse Matrix/Vector 的方式紀錄各物件的屬性,流程如下:
物件 (Source) -> 取出特性 (1st Map) -> 依此特性進行收集 (1st Reduce) => 以特性為主體,決定有多少物件要計算內積 (2nd Map) -> 以內積項目進行收集與累積數值 (2nd Reduce)
第一次是以屬性(feature)作為丟給 Reducer 的分群,第二次則以兩內積作為丟給 Reducer 的分群依據。
透過這樣的架構,一開始在對物件取資料時,就可以用多個 Mapper 同時取資料做事,至於缺點部份,則是第二次 Map 在決定有多少項目要作內積相乘時,必須一次將資料讀完才行,以 MapReduce 的架構,就是指一列資料。這是因為在作兩兩向量內積時,需要產生 N 階乘的項目,必須一口氣先得知有幾個項目才行,假設共有一千萬的項目,結果共有 900,000,000 的項目同樣擁有某個屬性時(feature),這將導致第一次 Reduce 產生會有一列擁有 900,000,000 的資料,並且在第二次 Map 時,記憶體一開始得讀入九百萬的項目,接著在產生 900,000,000! 等待相乘計算內積的數量。
只是,如果資料真的是這樣時,可能要去想想那個 Feature 是不是要捨棄掉,或是反向去建新的屬性,以此例來說,乾脆去考慮為剩下的一百萬項目建立一個屬性,這樣要作內積的計算也就從 900,000,000 ! 降到 100,000,000! 囉。
以下是我自己寫得粗略程式碼 XD
Makefile (太習慣寫 C 了, 這也是當初比賽前弄的架構, 只是最後改用 streaming 實做啦):
CC = javac
HADOOP = /usr/local/hadoop/bin/hadoop
CFLAGS =
CURR_DIR = $(.CURDIR)
SRC_BASE = .
OUT_BASE = .
PATH_CLASSPATH = /usr/local/hadoop
PATH_SRC = .
PATH_BIN = .
OBJ_DIR = $(OUT_BASE)/out
EXE = out.jar
LIB = $(PATH_CLASSPATH)/hadoop-0.20.1-core.jar
SRC = CosineSimilarity.java
OBJ = $(SRC:.java=.class)
HADOOP_INPUT = tmp_input
HADOOP_TEMP = tmp_tmp
HADOOP_OUTPUT = tmp_output
.SUFFIXES: .java .class
.java.class:
$(CC) -classpath $(LIB) -d $(OBJ_DIR) $(PATH_SRC)/$<
all: $(OBJ_DIR) $(OBJ)
jar: $(OBJ_DIR) $(OBJ) $(EXE)
$(OBJ_DIR):
@rm -rf $(OBJ_DIR) ;
@mkdir $(OBJ_DIR) ;
hadoop: jar
$(HADOOP) dfs -mkdir $(HADOOP_INPUT)
$(HADOOP) dfs -put test.data $(HADOOP_INPUT)/
$(HADOOP) jar $(EXE) org.changyy.CosineSimilarity $(HADOOP_INPUT) $(HADOOP_TEMP) $(HADOOP_OUTPUT)
$(HADOOP) dfs -get $(HADOOP_OUTPUT)/part-* . ;
$(EXE):
jar -cvf $(EXE) -C $(OBJ_DIR) $(OUT_BASE) ;
clean:
@rm -rf $(OBJ_DIR) ;
@rm -rf $(OUT_BASE)/$(EXE) ;
@rm -rf part-*;
@$(HADOOP) dfs -rmr $(HADOOP_INPUT) $(HADOOP_TEMP) $(HADOOP_OUTPUT) || echo "Pass : $(HADOOP) dfs -rmr $(HADOOP_INPUT) $(HADOOP_TEMP) $(HADOOP_OUTPUT)"
程式碼(其中在 main 裡連續作兩個 job 我是照抄同事的程式碼 XD Pairwise Document Similarity in Large Collections with MapReduce ):
package org.changyy;
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.*;
import java.lang.Math;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
public class CosineSimilarity extends Configured implements Tool
{
public static class ExtractFeature_Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
// Src: node_name(str) feature(str) value(str) ..
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
String tmp_key = null;
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
String tmp_feature = null , tmp_value = null;
List<String> feature_list = new ArrayList<String>();
List<String> value_list = new ArrayList<String>();
boolean switch_flag = true;
double length_calculate = 0 , tmp_double = 0;
if( tokenizer.hasMoreTokens() )
tmp_key = tokenizer.nextToken();
while (tokenizer.hasMoreTokens())
{
if( switch_flag )
{
tmp_feature = tokenizer.nextToken();
switch_flag = false;
}
else
{
tmp_value = tokenizer.nextToken();
value.set( tmp_value );
switch_flag = true;
try
{
tmp_double = Double.parseDouble( tmp_value );
}
catch( Exception e )
{
tmp_double = 0;
}
// to list
if( tmp_double != 0 )
{
length_calculate += tmp_double*tmp_double;
feature_list.add( tmp_feature );
value_list.add( ( new Double( tmp_double ) ).toString() );
}
}
}
if( length_calculate > 0 )
{
Text out_key = new Text();
Text out_value = new Text();
String length = ( new Double( Math.sqrt( length_calculate ) ) ).toString();
Iterator iterator_feature = feature_list.iterator();
Iterator iterator_value = value_list.iterator();
while( iterator_feature.hasNext() ) // && iterator_value.hasNext() ) // for performance
{
out_key.set( (String) iterator_feature.next() );
out_value.set( (String) iterator_value.next() + "\t" + tmp_key + "\t" + length );
// Out: Feature \t Value \t Node \t Length
output.collect( out_key , out_value );
}
}
}
}
public static class ExtractFeature_Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{
// Out: Feature \t Value \t Node \t Length
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
Text out_value = new Text();
String tmp_value = new String();
if( values.hasNext() )
tmp_value += values.next();
while (values.hasNext())
tmp_value += "\t" + values.next();
out_value.set( tmp_value );
output.collect( key, out_value );
//int sum = 0;
//while (values.hasNext())
// sum += values.next().get();
//output.collect(key, new IntWritable(sum));
}
}
public static class InnerProduct_Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
{
// Src: feature \t value \t node1 \t length \t value \t node2 \t lenght ...
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
StringTokenizer tokenizer = new StringTokenizer( (String) value.toString() );
List<String> node_list = new ArrayList<String>();
List<Double> value_list = new ArrayList<Double>();
List<Double> length_list = new ArrayList<Double>();
int switch_flag = 0;
String tmp_string ;
if( tokenizer.hasMoreTokens() )
tmp_string = tokenizer.nextToken(); // skip the first element
double tmp_double = 0;
while (tokenizer.hasMoreTokens())
{
switch( switch_flag )
{
case 0:
try
{
tmp_double = Double.parseDouble( (String) tokenizer.nextToken() );
}
catch( Exception e )
{
tmp_double = 1;
}
value_list.add( new Double( tmp_double ) );
switch_flag = 1;
break;
case 1:
node_list.add( tokenizer.nextToken() );
switch_flag = 2;
break;
case 2:
try
{
tmp_double = Double.parseDouble( (String) tokenizer.nextToken() );
}
catch( Exception e )
{
tmp_double = 1;
}
length_list.add( new Double( tmp_double ) );
switch_flag = 0;
break;
}
}
Text out_key = new Text();
Text out_value = new Text();
String tmp_string_2 = null;
for( int i=0 , j=0 , size=node_list.size() ; i<size ; ++i )
{
for( j=i+1; j<size ; ++j )
{
tmp_string = node_list.get(i);
tmp_string_2 = node_list.get(j);
if( tmp_string.compareTo( tmp_string_2 ) < 0)
tmp_string += "," + tmp_string_2;
else
tmp_string = tmp_string_2 + "," + tmp_string;
out_value.set( ""+ ( value_list.get(i)*value_list.get(j) / ( length_list.get(i)*length_list.get(j) ) ) );
out_key.set( "("+tmp_string+")" );
// Out: Node1_Node2 \t Value
output.collect( out_key , out_value );
}
}
}
}
public static class InnerProduct_Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
{
// Out: (Node1,Node2) \t Value
public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
{
double tmp_value=0 ;
while (values.hasNext())
tmp_value += Double.parseDouble( (String) values.next().toString() );
Text out_value = new Text();
out_value.set( ( new Double( tmp_value ) ).toString() );
output.collect( key, out_value );
}
}
public int run(String[] args) throws Exception
{
//JobConf conf = new JobConf(CosineSimilarity.class);
JobConf conf = new JobConf( getConf() , CosineSimilarity.class);
conf.setJobName("CosineSimilarity_ExtractFeature");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
conf.setMapperClass(ExtractFeature_Map.class);
conf.setCombinerClass(ExtractFeature_Reduce.class);
conf.setReducerClass(ExtractFeature_Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
Job job = new Job(conf);
//JobConf conf2 = new JobConf(CosineSimilarity.class);
JobConf conf2 = new JobConf( getConf() , CosineSimilarity.class);
conf2.setJobName("CosineSimilarity_InnerProduct");
conf2.setOutputKeyClass(Text.class);
conf2.setOutputValueClass(Text.class);
conf2.setMapperClass(InnerProduct_Map.class);
conf2.setCombinerClass(InnerProduct_Reduce.class);
conf2.setReducerClass(InnerProduct_Reduce.class);
conf2.setInputFormat(TextInputFormat.class);
conf2.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf2, new Path(args[1]+"/part*"));
FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
Job job2 = new Job(conf2);
job2.addDependingJob(job);
JobControl controller = new JobControl("CosineSimilarity");
controller.addJob(job);
controller.addJob(job2);
new Thread(controller).start();
while (!controller.allFinished())
{
System.out.println("Jobs in waiting state: "+ controller.getWaitingJobs().size());
System.out.println("Jobs in ready state: "+ controller.getReadyJobs().size());
System.out.println("Jobs in running state: "+ controller.getRunningJobs().size());
System.out.println("Jobs in success state: "+ controller.getSuccessfulJobs().size());
System.out.println("Jobs in failed state: "+ controller.getFailedJobs().size());
System.out.println();
try
{
Thread.sleep(20000);
} catch (Exception e)
{
e.printStackTrace();
}
}
//JobClient.runJob(conf2);
return 0;
}
public static void main(String[] args) throws Exception
{
int status = ToolRunner.run(new Configuration(), new CosineSimilarity(), args);
System.exit( status );
}
}
測資(test.data):
node1 f1 10 f2 1 f3 4 f5 2
node2 f2 50 f4 30 f5 10
node3 f1 50 f3 10
node4 f5 20 f1 10
node5 f6 1 f3 10
node6 f6 20
node7 f6 20
執行:
# make clean
# make hadoop
其中會使用三個目錄 tmp_input , tmp_output , tmp_tmp ,第一步就是先清掉,接著 make hadoop 就是編譯成 jar 檔以及執行, 最後再從 tmp_output 撈 part* 出來
觀看結果
# cat part*
原先也想寫個 Writable 等等的東西,但寫好了卻編譯有問題,就是找不到 Orz 可能是我對 jar 檔架構不熟,因此最後就改用字串啦 :P 就像寫 streaming 一樣,全部都用字串來處理。有機會再來慢慢熟悉 jar 環境了!
沒有留言:
張貼留言