MR - TopN (빈도수 추출)

Posted 01 12, 2014 15:10, Filed under: BigData/MapReduce


# MR - TopN (빈도수 추출)


우선순위 큐(PriorityQueue)를 이용해서 모든 레코드들을 다 저장하지 않고 현재까지 처리된 레코드들 중에 빈도수가 가장 큰 N개만 유지하는 형태로 메모리 사용을 최적화 하는 방법



# TopNDataKey.java

package hadoop.mr.group.topn;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class TopNDataKey  implements WritableComparable<TopNDataKey>{
    
    private String dateKey;
    private Long day;
    
    public TopNDataKey() {
          this.dateKey = "";
          this.day = 0L;
    }

    public TopNDataKey(String dateKey, Long day) {
        this.dateKey = dateKey;
        this.day = day;
    }

    public String getDateKey() {
        return dateKey;
    }

    public void setDateKey(String dateKey) {
        this.dateKey = dateKey;
    }

    public Long getDay() {
        return day;
    }

    public void setDay(Long day) {
        this.day = day;
    }

    @Override
    public String toString() {
//        return (new StringBuilder()).append(dataKey).append(",").append(value).toString();
        return (new StringBuilder()).append(dateKey).append(",").append(day).toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // 쓰여진 순서대로 데이터를 읽음
        dateKey = WritableUtils.readString(in);
        day = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // 읽을 순서대로 데이터를 씀.
        WritableUtils.writeString(out, dateKey);
        out.writeLong(day); // IMEIKey
    }

    @Override
    public int compareTo(TopNDataKey key) {
        int result = dateKey.compareTo(key.dateKey);
        if (0 == result) {
            result = day.compareTo(key.day);
        }
        return result;
    }        
    
}


# TopNMapper.java

package hadoop.mr.group.topn;

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<Text, Text, Text, Text> {

    Comparator<TopNDataKey> comparator = new DataKeyComparator();

    //(우선순위 큐 초기 사이즈, 큐 내에서 정렬을 위해 비교할 때 사용할 Comparator 인스턴스)
    PriorityQueue<TopNDataKey> queue = new PriorityQueue<TopNDataKey>(10, comparator);
    int topN = 10;

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        Long lValue = (long)Integer.parseInt(value.toString());

        insert(queue, key.toString(), lValue, topN);
        // 입력 레코드가 모두 소진되는 시점 cleanup() 시에 context.write() 호출
    }  

    /* 
     * # 매퍼 실행시 초기 파라미터값 추출
     */  
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 10);
    }

    /* 
     * # 매퍼 종료전(입력 레코드 모두 소진 시점)에 출력 처리
     */      
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 큐의 크기가 0이 될때까지, 큐에서 가장 작은 객체를 빼내어 출력 레코드로 만듬.
        while (queue.size() != 0) {
            TopNDataKey item = (TopNDataKey)queue.remove();

            String[] dateKeyArr = item.getDateKey().split(",");
            String IMEIKey = dateKeyArr[1];
            item.setDateKey(dateKeyArr[0]);
            item.setDay(new Long(dateKeyArr[0].substring(6)));

            context.write(new Text(item.getDateKey()), new Text(IMEIKey+","+dateKeyArr[2]));
        }
    }

    /**
     * # 우선순위 큐에 TopNDataKey 객체를 넣음
     * 
     *  - 우선순위 큐의 크기가 topN으로 지정된 값보다 작을 경우
     *  - 우선순위 큐의 객체 중 가장 작은 빈도수보다 입력으로 지정된 IValue의 값이 더 큰 경우
     */      
    public static void insert(PriorityQueue queue, String item, Long lValue, int topN) {
        TopNDataKey head = (TopNDataKey)queue.peek();

        // 큐의 원소수가 topN보다 작거나 지금 들어온 빈도수가 큐내의 최소 빈도수보다 크면
        if (queue.size() < topN || head.getDay() < lValue) {
            TopNDataKey dataKey = new TopNDataKey(item, lValue);

            // 큐에 추가 
            queue.add(dataKey);

            // 큐의 원소수가 topN보다 크면 가장 작은 원소를 제거
            // if (queue.size() > topN && head != null && head.getFreq() < lValue) {
            if (queue.size() > topN) {
                queue.remove();
            }
        }
    }  


} 


# TopNReducer.java
package hadoop.mr.group.topn;

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<Text, Text, Text, Text> {

    Comparator<TopNDataKey> comparator = new DataKeyComparator();

    // (우선순위 큐 초기 사이즈, 큐 내에서 정렬을 위해 비교할 때 사용할 Comparator 인스턴스)
    PriorityQueue<TopNDataKey> queue = new PriorityQueue<TopNDataKey>(10, comparator);
    int topN = 10;

    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }

        insert(queue, key.toString(), sum, topN);
        // 입력 레코드가 모두 소진되는 시점 cleanup() 시에 context.write() 호출
    }

    /* 
     * # 리듀서 실행시 초기 파라미터값 추출
     */      
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 10);
    }

    /* 
     * # 리듀서 종료전(입력 레코드 모두 소진 시점)에 출력 처리
     */             
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 큐의 크기가 0이 될때까지, 큐에서 가장 작은 객체를 빼내어 출력 레코드로 만듬.
        while (queue.size() != 0) {
            TopNDataKey item = (TopNDataKey)queue.remove();

            String[] dateKeyArr = item.getDateKey().split(",");
            String IMEIKey = dateKeyArr[1];
            item.setDateKey(dateKeyArr[0]);
            item.setDay(new Long(dateKeyArr[0].substring(6)));          

            context.write(new Text(item.getDateKey()), new Text(IMEIKey+","+dateKeyArr[2]));
        }
    }

    /**
     * # 우선순위 큐에 TopNDataKey 객체를 넣음
     * 
     *  - 우선순위 큐의 크기가 topN으로 지정된 값보다 작을 경우
     *  - 우선순위 큐의 객체 중 가장 작은 빈도수보다 입력으로 지정된 IValue의 값이 더 큰 경우
     */    
    public static void insert(PriorityQueue queue, String item, Long lValue, int topN) {
        TopNDataKey head = (TopNDataKey)queue.peek();

        // 큐의 원소수가 topN보다 작거나 지금 들어온 빈도수가 큐내의 최소 빈도수보다 크면
        if (queue.size() < topN || head.getDay() < lValue) {
            TopNDataKey dataKey = new TopNDataKey(item, lValue);

            // 큐에 추가 
            queue.add(dataKey);

            // 큐의 원소수가 topN보다 크면 가장 작은 원소를 제거합니다.
            // if (queue.size() > topN && head != null && head.getFreq() < lValue) {
            if (queue.size() > topN) {
                queue.remove();
            }
        }
    }    
}


# TopNDriver.java
package hadoop.mr.group.topn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TopNDriver {
    public void runJob(String inputPath, String outputPath) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, "TopN Driver");
        job.setJarByClass(TopNDriver.class);    

        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.getConfiguration().setInt("topN", 30);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
/**
 * #  일자별 정렬일 경우에는 잘 되지만, 디렉토리 전체를 대상으로 정렬할때 비정상
 *  > TopN 에서 전체 디렉토리 되는 상황확인, 내림차순 X
 */

    public static void main(String[] args) throws Exception {
        (new TopNDriver()).runJob(
                "/user/var/mapred/visit_data_output/one_day2/DataKeyValue-m-00000", 
                "/user/var/mapred/visit_data_output/one_day2/TopN_Data"
                );
    }
}



# DataKeyComparator.java
package hadoop.mr.group.topn;


import java.util.Comparator;

/**
 * # 우선순위 큐에서 원소들 간의 순서를 정함
 *  두개의 인스턴스를 받아서 getDay 값이 더 큰 것을 앞으로 세워줌. 
 *
 */
public class DataKeyComparator implements Comparator<TopNDataKey> {

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(TopNDataKey x, TopNDataKey y) {

        // 값을 비교한 정렬 : 날자를 정렬 대상 그룹 지정
        return x.getDay() == y.getDay() ? 0 : (x.getDay() > y.getDay() ? -1 : 1);
        
/*        if (x.getFreq() < y.getFreq()) {
            return -1;
          }
          if (x.getFreq() > y.getFreq()) {
            return 1;
          }
          return 0;    
*/    
    }
}

※ 위 내용은, 여러 자료를 참고하거나 제가 주관적으로 정리한 것입니다.
   잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
01 12, 2014 15:10 01 12, 2014 15:10


Trackback URL : http://develop.sunshiny.co.kr/trackback/982

Leave a comment

« Previous : 1 : ... 60 : 61 : 62 : 63 : 64 : 65 : 66 : 67 : 68 : ... 648 : Next »

Recent Posts

  1. HDFS - Python Encoding 오류 처리
  2. HP - Vertica ROS Container 관련 오류...
  3. HDFS - Hive 실행시 System Time 오류
  4. HP - Vertica 사용자 쿼리 이력 테이블...
  5. Client에서 HDFS 환경의 데이터 처리시...

Recent Comments

  1. Hi, I do think this is a great blo... 룸싸롱 01시 43분
  2. Thanks in favor of sharing such a... 리니지 프리서버 01 20,
  3. 안녕하세요^^ 배그핵
  4. 안녕하세요^^ 도움이 되셨다니, 저... sunshiny
  5. 정말 큰 도움이 되었습니다.. 감사합... 사랑은

Recent Trackbacks

  1. invoice printing and mailing invoice printing and mailing 20 01
  2. cabo san lucas packages cabo san lucas packages 20 01
  3. london relocation services fees london relocation services fees 20 01
  4. printing and mailing companies printing and mailing companies 20 01
  5. Web Site Web Site 19 01

Calendar

«   01 2020   »
      1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31  

Bookmarks

  1. 위키피디아
  2. MysqlKorea
  3. 오라클 클럽
  4. API - Java
  5. Apache Hadoop API
  6. Apache Software Foundation
  7. HDFS 생태계 솔루션
  8. DNSBL - Spam Database Lookup
  9. Ready System
  10. Solaris Freeware
  11. Linux-Site
  12. 윈디하나의 솔라나라

Site Stats

TOTAL 2819317 HIT
TODAY 108 HIT
YESTERDAY 1318 HIT