2009年12月24日 星期四

[Java] Pairwise Vector Similarity by Cosine Similarity @ Hadoop 0.20.1

Source: http://en.wikipedia.org/wiki/Cosine_similarity

source: http://en.wikipedia.org/wiki/Cosine_similarity


前陣子在 「台灣區 Hadoop 使用者社群會議」看到 Image Selection for Large-Scale Flickr Photos using Hadoop 中,使用 Cosine Similarity 計算向量的相似程度,才想起原來 MapReduce 的架構也可以用這啊!第一次聽到 Cosine Similarity 可能是大一的線性代數或是微積分,但第一次專題用到的實做,則是在大四 Datamining 的課程上,當時我們正在作一個找出相似遊戲玩家的專題,因此使用 Vector 跟 Cosine 來計算相似程度,但那次並不是我實做這類東西,我只負責用 Perl 去撈別人家的資料庫,哈。


最近讀 paper 有點悶,就先自己想了一下 Cosine Similarity 實作方式,發現有些地方卡卡的,另外,覺得做出來的還是要負擔一次把所有資料讀進記憶體的花費,這應該不是好的解法,所以就使用 "Cosine Similarity" 跟 "Hadoop" 或 "MapReduce" 關鍵字來搜尋一下,就找到以下幾篇:



當我了解實做上採用的演算法時,不禁感嘆這演算法的美好,所以決定來實做一下。另外,有個插曲是我請教強者同事關於連續做兩個有相依性的 job 問題時,發現他也實做過這個演算法 :D 還好我也先實做完才問,不然應該會很偷懶吧,哈



然後我有點龜毛地跟同事閒聊為什摩上頭提到的 Document Similarity 的計算都沒有完整做完 Cosine Similarity 的動作,就只是作向量的內積部份( Dot product , Inner product space ),後來討論時也才想到,其實 Document Similarity 重點在於 keyword 是否有 match 到,因此向量內積的算法,恰巧可以用這樣個觀點上,只是我還是覺得些不妥,因為呈現的數值沒有正規劃,假設有兩組向量的比較



  1. ( 1 , 1 ) , ( 1 , 100 )

  2. ( 1 , 1 ) , ( 1 , 1 )


以單純向量內積結果,第一組數值是 101 , 第二組是 2 ,但我覺得相似程度應該是第二組最好,完全 match 啊,以 Cosine Similarity 的作法,就會變成 1 ~ -1 之間,即 0.714142143 和 1 ,就可以很輕易看出第二組最好囉。只是實用上,可能計算量大而不適用!但我還是打算實做完 Cosine Similarity 啦,若單純想用 Document Similarity 可以到 Pairwise Document Similarity in Large Collections with MapReduce 逛逛,上頭有完整的 Java 程式囉!另外,若又想要降低計算量,又想計算 Cosine Similarity ,那可以先將向量正規化,讓他們的長度都為 1 ,接著再用 Document Similarity 算,就可以是正解啦!


對於 Cosine Similarity 的實做,採用 Sparse Matrix/Vector 的方式紀錄各物件的屬性,流程如下:


物件 (Source) -> 取出特性 (1st Map) -> 依此特性進行收集 (1st Reduce) => 以特性為主體,決定有多少物件要計算內積 (2nd Map) -> 以內積項目進行收集與累積數值 (2nd Reduce)


第一次是以屬性(feature)作為丟給 Reducer 的分群,第二次則以兩內積作為丟給 Reducer 的分群依據。


透過這樣的架構,一開始在對物件取資料時,就可以用多個 Mapper 同時取資料做事,至於缺點部份,則是第二次 Map 在決定有多少項目要作內積相乘時,必須一次將資料讀完才行,以 MapReduce 的架構,就是指一列資料。這是因為在作兩兩向量內積時,需要產生 N 階乘的項目,必須一口氣先得知有幾個項目才行,假設共有一千萬的項目,結果共有 900,000,000 的項目同樣擁有某個屬性時(feature),這將導致第一次 Reduce 產生會有一列擁有 900,000,000 的資料,並且在第二次 Map 時,記憶體一開始得讀入九百萬的項目,接著在產生 900,000,000! 等待相乘計算內積的數量。


只是,如果資料真的是這樣時,可能要去想想那個 Feature 是不是要捨棄掉,或是反向去建新的屬性,以此例來說,乾脆去考慮為剩下的一百萬項目建立一個屬性,這樣要作內積的計算也就從 900,000,000 ! 降到 100,000,000! 囉。


以下是我自己寫得粗略程式碼 XD


Makefile (太習慣寫 C 了, 這也是當初比賽前弄的架構, 只是最後改用 streaming 實做啦):


CC = javac
HADOOP = /usr/local/hadoop/bin/hadoop
CFLAGS =
CURR_DIR = $(.CURDIR)
SRC_BASE = .
OUT_BASE = .

PATH_CLASSPATH = /usr/local/hadoop
PATH_SRC = .
PATH_BIN = .

OBJ_DIR = $(OUT_BASE)/out
EXE = out.jar
LIB = $(PATH_CLASSPATH)/hadoop-0.20.1-core.jar
SRC = CosineSimilarity.java
OBJ = $(SRC:.java=.class)

HADOOP_INPUT = tmp_input
HADOOP_TEMP = tmp_tmp
HADOOP_OUTPUT = tmp_output

.SUFFIXES: .java .class

.java.class:
        $(CC) -classpath $(LIB) -d $(OBJ_DIR) $(PATH_SRC)/$<

all: $(OBJ_DIR) $(OBJ)

jar: $(OBJ_DIR) $(OBJ) $(EXE)

$(OBJ_DIR):
        @rm -rf $(OBJ_DIR) ;
        @mkdir $(OBJ_DIR) ;

hadoop: jar
        $(HADOOP) dfs -mkdir $(HADOOP_INPUT)
        $(HADOOP) dfs -put test.data $(HADOOP_INPUT)/
        $(HADOOP) jar $(EXE) org.changyy.CosineSimilarity $(HADOOP_INPUT) $(HADOOP_TEMP) $(HADOOP_OUTPUT)
        $(HADOOP) dfs -get $(HADOOP_OUTPUT)/part-* . ;

$(EXE):
        jar -cvf $(EXE) -C $(OBJ_DIR) $(OUT_BASE) ;

clean:
        @rm -rf $(OBJ_DIR) ;
        @rm -rf $(OUT_BASE)/$(EXE) ;
        @rm -rf part-*;
        @$(HADOOP) dfs -rmr $(HADOOP_INPUT) $(HADOOP_TEMP) $(HADOOP_OUTPUT) || echo "Pass : $(HADOOP) dfs -rmr $(HADOOP_INPUT) $(HADOOP_TEMP) $(HADOOP_OUTPUT)"


程式碼(其中在 main 裡連續作兩個 job 我是照抄同事的程式碼 XD Pairwise Document Similarity in Large Collections with MapReduce ):


package org.changyy;
        
import java.io.IOException;
import java.io.DataInput;
import java.io.DataOutput;
import java.util.*;
import java.lang.Math;
        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;


public class CosineSimilarity extends Configured implements Tool
{
    public static class ExtractFeature_Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
    {
        // Src:    node_name(str)    feature(str)    value(str)    ..
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            String tmp_key = null;
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            String tmp_feature = null , tmp_value = null;

            List<String> feature_list = new ArrayList<String>();
            List<String> value_list = new ArrayList<String>();

            boolean switch_flag = true;
            double length_calculate = 0 , tmp_double = 0;
            if( tokenizer.hasMoreTokens() )
                tmp_key = tokenizer.nextToken();
            while (tokenizer.hasMoreTokens())
            {
                if( switch_flag )
                {
                    tmp_feature = tokenizer.nextToken();
                    switch_flag = false;
                }
                else
                {
                    tmp_value = tokenizer.nextToken();
                    value.set( tmp_value );
                    switch_flag = true;
                    try
                    {
                        tmp_double = Double.parseDouble( tmp_value );
                    }
                    catch( Exception e )
                    {
                        tmp_double = 0;
                    }
                    // to list
                    if( tmp_double != 0 )
                    {
                        length_calculate += tmp_double*tmp_double;
                        feature_list.add( tmp_feature );
                        value_list.add( ( new Double( tmp_double ) ).toString() );
                    }
                }
            }
            if( length_calculate > 0 )
            {
                Text out_key = new Text();
                Text out_value = new Text();
                String length = ( new Double( Math.sqrt( length_calculate ) ) ).toString();
                Iterator iterator_feature = feature_list.iterator();
                Iterator iterator_value = value_list.iterator();
                while( iterator_feature.hasNext() ) // && iterator_value.hasNext() ) // for performance
                {
                    out_key.set( (String) iterator_feature.next() );
                    out_value.set( (String) iterator_value.next() + "\t" + tmp_key + "\t" + length  );

                    // Out: Feature \t Value \t Node \t Length
                    output.collect( out_key , out_value );
                }
            }
        }
    }
        
    public static class ExtractFeature_Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        // Out: Feature \t Value \t Node \t Length
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            Text out_value = new Text();
            String tmp_value = new String();
            if( values.hasNext() )
                tmp_value += values.next();
            while (values.hasNext())
                tmp_value += "\t" + values.next();

            out_value.set( tmp_value );
            output.collect( key, out_value );
            //int sum = 0;
            //while (values.hasNext())
            //    sum += values.next().get();
            //output.collect(key, new IntWritable(sum));
        }
    }

    public static class InnerProduct_Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text>
    {
        // Src:    feature \t value \t node1 \t length \t value \t node2 \t lenght ...
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            StringTokenizer tokenizer = new StringTokenizer( (String) value.toString() );

            List<String> node_list = new ArrayList<String>();
            List<Double> value_list = new ArrayList<Double>();
            List<Double> length_list = new ArrayList<Double>();

            int switch_flag = 0;
            String tmp_string ;
            if( tokenizer.hasMoreTokens() )
                 tmp_string = tokenizer.nextToken();     // skip the first element

            double tmp_double = 0;
            while (tokenizer.hasMoreTokens())
            {
                switch( switch_flag )
                {
                    case 0:
                        try
                        {
                            tmp_double = Double.parseDouble( (String) tokenizer.nextToken() );
                        }
                        catch( Exception e )
                        {
                            tmp_double = 1;
                        }
                        value_list.add( new Double( tmp_double ) );
                        switch_flag = 1;
                        break;
                    case 1:
                        node_list.add( tokenizer.nextToken() );
                        switch_flag = 2;
                        break;
                    case 2:
                        try
                        {
                            tmp_double = Double.parseDouble( (String) tokenizer.nextToken() );
                        }
                        catch( Exception e )
                        {
                            tmp_double = 1;
                        }
                        length_list.add( new Double( tmp_double ) );
                        switch_flag = 0;
                        break;
                }
            }
            Text out_key = new Text();
            Text out_value = new Text();
            String tmp_string_2 = null;
            for( int i=0 , j=0 , size=node_list.size() ; i<size ; ++i )
            {
                for( j=i+1; j<size ; ++j )
                {
                    tmp_string = node_list.get(i);
                    tmp_string_2 = node_list.get(j);
                    
                    if( tmp_string.compareTo( tmp_string_2 ) < 0)
                        tmp_string += "," + tmp_string_2;
                    else
                        tmp_string = tmp_string_2 + "," + tmp_string;

                    out_value.set( ""+ ( value_list.get(i)*value_list.get(j) / ( length_list.get(i)*length_list.get(j) ) ) );
                    out_key.set( "("+tmp_string+")" );

                    // Out: Node1_Node2 \t Value
                    output.collect( out_key , out_value );
                }
            }
        }
    }
        
    public static class InnerProduct_Reduce extends MapReduceBase implements Reducer<Text, Text, Text, Text>
    {
        // Out: (Node1,Node2) \t Value
        public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException
        {
            double tmp_value=0 ;
            while (values.hasNext())
                tmp_value += Double.parseDouble( (String) values.next().toString() );

            Text out_value = new Text();
            out_value.set( ( new Double( tmp_value ) ).toString() );
            output.collect( key, out_value );
        }
    }

    public int run(String[] args) throws Exception
    {
        //JobConf conf = new JobConf(CosineSimilarity.class);
        JobConf conf = new JobConf( getConf() , CosineSimilarity.class);
        conf.setJobName("CosineSimilarity_ExtractFeature");
        
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        
        conf.setMapperClass(ExtractFeature_Map.class);
        conf.setCombinerClass(ExtractFeature_Reduce.class);
        conf.setReducerClass(ExtractFeature_Reduce.class);
        
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        
        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        
        Job job = new Job(conf);

        //JobConf conf2 = new JobConf(CosineSimilarity.class);
        JobConf conf2 = new JobConf( getConf() , CosineSimilarity.class);
        conf2.setJobName("CosineSimilarity_InnerProduct");
        
        conf2.setOutputKeyClass(Text.class);
        conf2.setOutputValueClass(Text.class);
        
        conf2.setMapperClass(InnerProduct_Map.class);
        conf2.setCombinerClass(InnerProduct_Reduce.class);
        conf2.setReducerClass(InnerProduct_Reduce.class);
        
        conf2.setInputFormat(TextInputFormat.class);
        conf2.setOutputFormat(TextOutputFormat.class);
        
        FileInputFormat.setInputPaths(conf2, new Path(args[1]+"/part*"));
        FileOutputFormat.setOutputPath(conf2, new Path(args[2]));
        
        Job job2 = new Job(conf2);

        job2.addDependingJob(job);
        JobControl controller = new JobControl("CosineSimilarity");
        controller.addJob(job);
        controller.addJob(job2);

        
        new Thread(controller).start();

        while (!controller.allFinished())
        {
            System.out.println("Jobs in waiting state: "+ controller.getWaitingJobs().size());
            System.out.println("Jobs in ready state: "+ controller.getReadyJobs().size());
            System.out.println("Jobs in running state: "+ controller.getRunningJobs().size());
            System.out.println("Jobs in success state: "+ controller.getSuccessfulJobs().size());
            System.out.println("Jobs in failed state: "+ controller.getFailedJobs().size());
            System.out.println();

            try
            {
                Thread.sleep(20000);
            } catch (Exception e)
            {
                e.printStackTrace();
            }
        }

        //JobClient.runJob(conf2);
        return 0;
    }
    public static void main(String[] args) throws Exception
    {
        int status = ToolRunner.run(new Configuration(), new CosineSimilarity(), args);
        System.exit( status );
    }
}


測資(test.data):


node1   f1      10      f2      1       f3      4       f5      2
node2   f2      50      f4      30      f5      10
node3   f1      50      f3      10
node4   f5      20      f1      10
node5   f6      1       f3      10
node6   f6      20
node7   f6      20


執行:


# make clean
# make hadoop

其中會使用三個目錄 tmp_input , tmp_output , tmp_tmp ,第一步就是先清掉,接著 make hadoop 就是編譯成 jar 檔以及執行, 最後再從 tmp_output 撈 part* 出來

觀看結果
# cat part*


原先也想寫個 Writable 等等的東西,但寫好了卻編譯有問題,就是找不到 Orz 可能是我對 jar 檔架構不熟,因此最後就改用字串啦 :P 就像寫 streaming 一樣,全部都用字串來處理。有機會再來慢慢熟悉 jar 環境了!


沒有留言:

張貼留言