# MR - Count Trigram (Extracting Consecutive Words)

Posted 01 12, 2014 15:50, Filed under: BigData/MapReduce


WordCount simply counts how often each individual word occurs; a trigram count instead tallies how often each sequence of N consecutive words occurs (here N = 3).
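
For example, the line "the quick brown fox jumps" yields the trigrams "the quick brown", "quick brown fox", and "brown fox jumps". A minimal standalone sketch of the sliding-window logic used in the mapper below (plain Java, no Hadoop; the class name and sample sentence are illustrative):

import java.util.StringTokenizer;

public class TrigramDemo {
    public static void main(String[] args) {
        StringTokenizer tokenizer = new StringTokenizer("the quick brown fox jumps");

        // Only lines with at least 3 tokens produce trigrams.
        if (tokenizer.countTokens() >= 3) {
            String first = tokenizer.nextToken().toUpperCase();
            String second = tokenizer.nextToken().toUpperCase();

            while (tokenizer.hasMoreTokens()) {
                String third = tokenizer.nextToken().toUpperCase();
                System.out.println(first + " " + second + " " + third);

                // Slide the window one token to the right.
                first = second;
                second = third;
            }
        }
    }
}

// Prints:
// THE QUICK BROWN
// QUICK BROWN FOX
// BROWN FOX JUMPS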



# CountTrigram.java
package hadoop.dev.comment;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class CountTrigram {

    public static class Map extends Mapper<Text, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text trigram = new Text();

        public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            
            try{
                // Strip punctuation and special characters, then tokenize on whitespace.
                String cleaned = line.replaceAll("[\r\n\"\'(){}!.,=~\\-\\|\\[\\]@#$%^&*_+/<>]", " ");
                StringTokenizer tokenizer = new StringTokenizer(cleaned);

                // Produce trigrams only when the line contains at least 3 tokens.
                if (tokenizer.countTokens() >= 3) {
                    String firstToken = tokenizer.nextToken().toUpperCase();
                    String secondToken = tokenizer.nextToken().toUpperCase();

                    while (tokenizer.hasMoreTokens()) {
                        String thirdToken = tokenizer.nextToken().toUpperCase();
                        trigram.set(firstToken + " " + secondToken + " " + thirdToken);
                        context.write(trigram, one);

                        firstToken = secondToken;
                        secondToken = thirdToken;
                    }
                }
            }catch(Exception e){
                // Defensive: skip a malformed line rather than failing the whole task.
                System.out.println(e.getMessage());
            }
        }
    } 

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        Job job = new Job(conf, "Count Trigram");
        
//        e.g. otherArgs[0] = "/user/var/mapred/comment_data/comm_ctnt_cmt.txt";
//             otherArgs[1] = "/user/var/mapred/comment_output/Data_Word3";

        job.setJarByClass(CountTrigram.class); 
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        if (!job.waitForCompletion(true)){
            return;
        }

        Configuration conf2 = new Configuration();
        Job job2 = new Job(conf2, "Top N");

        job2.setJarByClass(TopN.class);
        job2.setOutputKeyClass(Text.class);
        job2.setOutputValueClass(LongWritable.class);

        job2.setMapperClass(TopN.Map.class);
        job2.setReducerClass(TopN.Reduce.class);
        job2.setNumReduceTasks(1);

        job2.setInputFormatClass(KeyValueTextInputFormat.class);
        job2.setOutputFormatClass(TextOutputFormat.class);

        // The input of job2 is the output of job1.
        FileInputFormat.addInputPath(job2, new Path(otherArgs[1]));
        FileOutputFormat.setOutputPath(job2, new Path(otherArgs[1] + "/topN"));
        job2.getConfiguration().setInt("topN", 10);

        if (!job2.waitForCompletion(true)){
            return;
        }
    }

}
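
The driver chains two jobs: "Count Trigram" writes trigram<TAB>count lines to the output directory, and "Top N" reads that directory back with KeyValueTextInputFormat, which splits each line at the first tab into key and value, then writes the 10 most frequent trigrams under <output>/topN. Assuming the classes are packed into a jar (the jar name here is hypothetical), a run would look like:

hadoop jar count-trigram.jar hadoop.dev.comment.CountTrigram /user/var/mapred/comment_data/comm_ctnt_cmt.txt /user/var/mapred/comment_output/Data_Word3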



# TopN.java
package hadoop.dev.comment;

import java.io.IOException;
import java.util.*;
        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class TopN {

  public static void insert(PriorityQueue<ItemFreq> queue, String item, Long lValue, int topN) {
    ItemFreq head = queue.peek();

    // Insert when the queue holds fewer than topN items, or when the new
    // frequency beats the smallest frequency currently in the queue.
    if (queue.size() < topN || head.getFreq() < lValue) {
      ItemFreq itemFreq = new ItemFreq(item, lValue);

      // Add to the queue first,
      queue.add(itemFreq);
      // then, if the queue has grown past topN, evict the smallest element.
      if (queue.size() > topN) {
          queue.remove();
      }
    }
  }
      
  public static class ItemFreqComparator implements Comparator<ItemFreq> {

    @Override
    public int compare(ItemFreq x, ItemFreq y) {
      // Order by frequency only, smallest first (min-heap ordering).
      return x.getFreq().compareTo(y.getFreq());
    }
  }

  public static class Map extends Mapper<Text, Text, Text, LongWritable> {

    Comparator<ItemFreq> comparator = new ItemFreqComparator();
    PriorityQueue<ItemFreq> queue = new PriorityQueue<ItemFreq>(10, comparator);
    int topN = 10;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      topN = context.getConfiguration().getInt("topN", 10);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

      while (queue.size() != 0) {
        ItemFreq item = queue.remove();
        context.write(new Text(item.getItem()), new LongWritable(item.getFreq()));
      }

    }

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        Long lValue = Long.parseLong(value.toString());

        insert(queue, key.toString(), lValue, topN);
    }
  } 
        
  public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

    Comparator<ItemFreq> comparator = new ItemFreqComparator();
    PriorityQueue<ItemFreq> queue = new PriorityQueue<ItemFreq>(10, comparator);
    int topN = 10;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      topN = context.getConfiguration().getInt("topN", 10);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

      while (queue.size() != 0) {
        ItemFreq item = queue.remove();
        context.write(new Text(item.getItem()), new LongWritable(item.getFreq()));
      }

    }

    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }

        insert(queue, key.toString(), sum, topN);
    }
 }
        
 public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    Job job = new Job(conf, "TopN");

    job.setJarByClass(TopN.class);    
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
        
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setNumReduceTasks(1);

    job.setInputFormatClass(KeyValueTextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
        
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    job.getConfiguration().setInt("topN", Integer.parseInt(otherArgs[2]));
        
    job.waitForCompletion(true);
 }
        
}
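
The insert() helper implements a bounded min-heap: ItemFreqComparator keeps the smallest frequency at the head of the PriorityQueue, so once the queue holds topN items a new item enters only if it beats that minimum, and the head is then evicted to hold the size at topN. A minimal standalone sketch of the same technique (class name and sample data are made up for illustration; assumes it sits in the same package as TopN and ItemFreq):

import java.util.PriorityQueue;

public class TopNDemo {
    public static void main(String[] args) {
        int topN = 3;
        // Min-heap ordered by frequency, exactly as in TopN.
        PriorityQueue<ItemFreq> queue =
                new PriorityQueue<ItemFreq>(topN, new TopN.ItemFreqComparator());

        String[] items = {"A B C", "B C D", "C D E", "D E F", "E F G"};
        long[]   freqs = {5L, 2L, 9L, 7L, 1L};

        for (int i = 0; i < items.length; i++) {
            TopN.insert(queue, items[i], freqs[i], topN);
        }

        // The heap now holds the top 3 and drains smallest-first:
        // {A B C,5} {D E F,7} {C D E,9}
        while (!queue.isEmpty()) {
            System.out.println(queue.remove());
        }
    }
}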



# ItemFreq.java
package hadoop.dev.comment;

public class ItemFreq {
// An alternative design would implement Comparable<ItemFreq>; see the sketch after this class.

    private String item;
    private Long freq;

    /**
     * Constructor.
     */
    public ItemFreq() {
        this.item = "";
        this.freq = 0L;
    }

    /**
     * Constructor.
     */
    public ItemFreq(String item, long freq) {
        this.item = item;
        this.freq = freq;
    }

    @Override
    public String toString() {
        return (new StringBuilder())
                .append('{')
                .append(item)
                .append(',')
                .append(freq)
                .append('}')
                .toString();
    }

    public String getItem() {
        return item;
    }

    public void setItem(String item) {
        this.item = item;
    }

    public Long getFreq() {
        return freq;
    }

    public void setFreq(Long freq) {
        this.freq = freq;
    }
}
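
As noted in the class declaration, an alternative design would have ItemFreq implement Comparable<ItemFreq>, so a PriorityQueue<ItemFreq> could be built without a separate Comparator. A sketch of that variant (not the code used above):

public class ItemFreq implements Comparable<ItemFreq> {

    private String item;
    private Long freq;

    public ItemFreq(String item, long freq) {
        this.item = item;
        this.freq = freq;
    }

    @Override
    public int compareTo(ItemFreq other) {
        // Order by frequency, smallest first, matching ItemFreqComparator.
        return this.freq.compareTo(other.freq);
    }
}

// With this in place, the queue needs no Comparator argument:
// PriorityQueue<ItemFreq> queue = new PriorityQueue<ItemFreq>(10);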






※ The above was compiled from various sources together with my own subjective notes.
   If you spot incorrect information or anything that needs improvement, a comment or an email would be a great help.

