MR - TopN (빈도수 추출)

Posted 01 12, 2014 15:10, Filed under: BigData/MapReduce


# MR - TopN (빈도수 추출)


우선순위 큐(PriorityQueue)를 이용해서 모든 레코드들을 다 저장하지 않고 현재까지 처리된 레코드들 중에 빈도수가 가장 큰 N개만 유지하는 형태로 메모리 사용을 최적화 하는 방법



# TopNDataKey.java

package hadoop.mr.group.topn;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

public class TopNDataKey  implements WritableComparable<TopNDataKey>{
    
    private String dateKey;
    private Long day;
    
    public TopNDataKey() {
          this.dateKey = "";
          this.day = 0L;
    }

    public TopNDataKey(String dateKey, Long day) {
        this.dateKey = dateKey;
        this.day = day;
    }

    public String getDateKey() {
        return dateKey;
    }

    public void setDateKey(String dateKey) {
        this.dateKey = dateKey;
    }

    public Long getDay() {
        return day;
    }

    public void setDay(Long day) {
        this.day = day;
    }

    @Override
    public String toString() {
//        return (new StringBuilder()).append(dataKey).append(",").append(value).toString();
        return (new StringBuilder()).append(dateKey).append(",").append(day).toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // 쓰여진 순서대로 데이터를 읽음
        dateKey = WritableUtils.readString(in);
        day = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // 읽을 순서대로 데이터를 씀.
        WritableUtils.writeString(out, dateKey);
        out.writeLong(day); // IMEIKey
    }

    @Override
    public int compareTo(TopNDataKey key) {
        int result = dateKey.compareTo(key.dateKey);
        if (0 == result) {
            result = day.compareTo(key.day);
        }
        return result;
    }        
    
}


# TopNMapper.java

package hadoop.mr.group.topn;

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TopNMapper extends Mapper<Text, Text, Text, Text> {

    Comparator<TopNDataKey> comparator = new DataKeyComparator();

    //(우선순위 큐 초기 사이즈, 큐 내에서 정렬을 위해 비교할 때 사용할 Comparator 인스턴스)
    PriorityQueue<TopNDataKey> queue = new PriorityQueue<TopNDataKey>(10, comparator);
    int topN = 10;

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException {
        Long lValue = (long)Integer.parseInt(value.toString());

        insert(queue, key.toString(), lValue, topN);
        // 입력 레코드가 모두 소진되는 시점 cleanup() 시에 context.write() 호출
    }  

    /* 
     * # 매퍼 실행시 초기 파라미터값 추출
     */  
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 10);
    }

    /* 
     * # 매퍼 종료전(입력 레코드 모두 소진 시점)에 출력 처리
     */      
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 큐의 크기가 0이 될때까지, 큐에서 가장 작은 객체를 빼내어 출력 레코드로 만듬.
        while (queue.size() != 0) {
            TopNDataKey item = (TopNDataKey)queue.remove();

            String[] dateKeyArr = item.getDateKey().split(",");
            String IMEIKey = dateKeyArr[1];
            item.setDateKey(dateKeyArr[0]);
            item.setDay(new Long(dateKeyArr[0].substring(6)));

            context.write(new Text(item.getDateKey()), new Text(IMEIKey+","+dateKeyArr[2]));
        }
    }

    /**
     * # 우선순위 큐에 TopNDataKey 객체를 넣음
     * 
     *  - 우선순위 큐의 크기가 topN으로 지정된 값보다 작을 경우
     *  - 우선순위 큐의 객체 중 가장 작은 빈도수보다 입력으로 지정된 IValue의 값이 더 큰 경우
     */      
    public static void insert(PriorityQueue queue, String item, Long lValue, int topN) {
        TopNDataKey head = (TopNDataKey)queue.peek();

        // 큐의 원소수가 topN보다 작거나 지금 들어온 빈도수가 큐내의 최소 빈도수보다 크면
        if (queue.size() < topN || head.getDay() < lValue) {
            TopNDataKey dataKey = new TopNDataKey(item, lValue);

            // 큐에 추가 
            queue.add(dataKey);

            // 큐의 원소수가 topN보다 크면 가장 작은 원소를 제거
            // if (queue.size() > topN && head != null && head.getFreq() < lValue) {
            if (queue.size() > topN) {
                queue.remove();
            }
        }
    }  


} 


# TopNReducer.java
package hadoop.mr.group.topn;

import java.io.IOException;
import java.util.Comparator;
import java.util.PriorityQueue;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopNReducer extends Reducer<Text, Text, Text, Text> {

    Comparator<TopNDataKey> comparator = new DataKeyComparator();

    // (우선순위 큐 초기 사이즈, 큐 내에서 정렬을 위해 비교할 때 사용할 Comparator 인스턴스)
    PriorityQueue<TopNDataKey> queue = new PriorityQueue<TopNDataKey>(10, comparator);
    int topN = 10;

    public void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable val : values) {
            sum += val.get();
        }

        insert(queue, key.toString(), sum, topN);
        // 입력 레코드가 모두 소진되는 시점 cleanup() 시에 context.write() 호출
    }

    /* 
     * # 리듀서 실행시 초기 파라미터값 추출
     */      
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        topN = context.getConfiguration().getInt("topN", 10);
    }

    /* 
     * # 리듀서 종료전(입력 레코드 모두 소진 시점)에 출력 처리
     */             
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {

        // 큐의 크기가 0이 될때까지, 큐에서 가장 작은 객체를 빼내어 출력 레코드로 만듬.
        while (queue.size() != 0) {
            TopNDataKey item = (TopNDataKey)queue.remove();

            String[] dateKeyArr = item.getDateKey().split(",");
            String IMEIKey = dateKeyArr[1];
            item.setDateKey(dateKeyArr[0]);
            item.setDay(new Long(dateKeyArr[0].substring(6)));          

            context.write(new Text(item.getDateKey()), new Text(IMEIKey+","+dateKeyArr[2]));
        }
    }

    /**
     * # 우선순위 큐에 TopNDataKey 객체를 넣음
     * 
     *  - 우선순위 큐의 크기가 topN으로 지정된 값보다 작을 경우
     *  - 우선순위 큐의 객체 중 가장 작은 빈도수보다 입력으로 지정된 IValue의 값이 더 큰 경우
     */    
    public static void insert(PriorityQueue queue, String item, Long lValue, int topN) {
        TopNDataKey head = (TopNDataKey)queue.peek();

        // 큐의 원소수가 topN보다 작거나 지금 들어온 빈도수가 큐내의 최소 빈도수보다 크면
        if (queue.size() < topN || head.getDay() < lValue) {
            TopNDataKey dataKey = new TopNDataKey(item, lValue);

            // 큐에 추가 
            queue.add(dataKey);

            // 큐의 원소수가 topN보다 크면 가장 작은 원소를 제거합니다.
            // if (queue.size() > topN && head != null && head.getFreq() < lValue) {
            if (queue.size() > topN) {
                queue.remove();
            }
        }
    }    
}


# TopNDriver.java
package hadoop.mr.group.topn;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TopNDriver {
    public void runJob(String inputPath, String outputPath) throws Exception {

        Configuration conf = new Configuration();

        Job job = new Job(conf, "TopN Driver");
        job.setJarByClass(TopNDriver.class);    

        job.setMapperClass(TopNMapper.class);
        job.setReducerClass(TopNReducer.class);
        job.setNumReduceTasks(1);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        job.getConfiguration().setInt("topN", 30);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
/**
 * #  일자별 정렬일 경우에는 잘 되지만, 디렉토리 전체를 대상으로 정렬할때 비정상
 *  > TopN 에서 전체 디렉토리 되는 상황확인, 내림차순 X
 */

    public static void main(String[] args) throws Exception {
        (new TopNDriver()).runJob(
                "/user/var/mapred/visit_data_output/one_day2/DataKeyValue-m-00000", 
                "/user/var/mapred/visit_data_output/one_day2/TopN_Data"
                );
    }
}



# DataKeyComparator.java
package hadoop.mr.group.topn;


import java.util.Comparator;

/**
 * # 우선순위 큐에서 원소들 간의 순서를 정함
 *  두개의 인스턴스를 받아서 getDay 값이 더 큰 것을 앞으로 세워줌. 
 *
 */
public class DataKeyComparator implements Comparator<TopNDataKey> {

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(TopNDataKey x, TopNDataKey y) {

        // 값을 비교한 정렬 : 날자를 정렬 대상 그룹 지정
        return x.getDay() == y.getDay() ? 0 : (x.getDay() > y.getDay() ? -1 : 1);
        
/*        if (x.getFreq() < y.getFreq()) {
            return -1;
          }
          if (x.getFreq() > y.getFreq()) {
            return 1;
          }
          return 0;    
*/    
    }
}

※ 위 내용은, 여러 자료를 참고하거나 제가 주관적으로 정리한 것입니다.
   잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
01 12, 2014 15:10 01 12, 2014 15:10


Trackback URL : http://develop.sunshiny.co.kr/trackback/982

Leave a comment

« Previous : 1 : ... 60 : 61 : 62 : 63 : 64 : 65 : 66 : 67 : 68 : ... 648 : Next »

Recent Posts

  1. HDFS - Python Encoding 오류 처리
  2. HP - Vertica ROS Container 관련 오류...
  3. HDFS - Hive 실행시 System Time 오류
  4. HP - Vertica 사용자 쿼리 이력 테이블...
  5. Client에서 HDFS 환경의 데이터 처리시...

Recent Comments

  1. 안녕하세요^^ 배그핵
  2. 안녕하세요^^ 도움이 되셨다니, 저... sunshiny
  3. 정말 큰 도움이 되었습니다.. 감사합... 사랑은
  4. 네, 안녕하세요. 댓글 남겨 주셔서... sunshiny
  5. 감사합니다 많은 도움 되었습니다!ㅎㅎ 프리시퀸스

Recent Trackbacks

  1. prefab steel buildings prefab steel buildings %M
  2. Mysql - mysql 설치후 Character set... 멀고 가까움이 다르기 때문 %M

Calendar

«   09 2019   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30          

Bookmarks

  1. 위키피디아
  2. MysqlKorea
  3. 오라클 클럽
  4. API - Java
  5. Apache Hadoop API
  6. Apache Software Foundation
  7. HDFS 생태계 솔루션
  8. DNSBL - Spam Database Lookup
  9. Ready System
  10. Solaris Freeware
  11. Linux-Site
  12. 윈디하나의 솔라나라

Site Stats

TOTAL 2683582 HIT
TODAY 416 HIT
YESTERDAY 438 HIT