Search Results for 'grouping'

1 POSTS

  1. 2014|01 MR - 수치요약(최댓값, 최솟값, 총계)


# MapReduce - 수치요약(최댓값, 최솟값, 총계)


그룹핑 작업 후 리듀서는 단순히 그룹과 연관된 모든 값을 반복 처리하고 최솟값과 최댓값을 구한다.
또한 키 그룹 내 구성원의 개수를 센다.
결합과 가환 속성으로 인해 리듀서로 셔플될 중간 키/값 쌍의 개수를 크게 줄이는 데 콤바이너를 사용할 수 있다.
맞게 구현되었다면 리듀서 코드와 콤바이너 코드는 동일할 수 있다.

문제 : 주어진 사용자 코멘트 목록에 대해 코멘트의 첫 시간과 마지막 시간 그리고 해당 사용자 코멘트의 총 개수를 알아내라.


# 샘플 데이터


# 출력 결과(기본 키에 대한 오름차순 정렬)
634110    2011-06-04T09:50:25.043    2011-07-04T09:55:25.043    3
634150    2011-08-05T09:50:25.043    2011-08-06T09:50:25.043    2
634152    2011-08-15T09:50:25.043    2011-08-25T09:50:25.043    2
743184    2011-07-04T09:50:25.043    2011-08-05T09:50:25.043    4

# MRDPUtils.java
package hadoop.mr.group;

import java.util.HashMap;
import java.util.Map;

public class MRDPUtils {

    // 이 헬퍼 함수는 스택오버플로우 데이터를 맵 자료 구조로 구문 분석한다.
    public static Map<String, String> transformXmlToMap(String xml) {
        Map<String, String> map = new HashMap<String, String>();
        try {

            // 따옴표로 데이터를 분할 할 수 있다는 점을 이용한다.
            String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
            for (int i = 0; i < tokens.length - 1; i += 2) {
                String key = tokens[i].trim();
                String val = tokens[i + 1];
                map.put(key.substring(0, key.length() - 1), val);
            }

        } catch (StringIndexOutOfBoundsException e) {
            System.err.println(xml);
        }
        return map;
    }

    /**
     * # 로그 파일 형태의 로우를 맵 형식(키,값)으로 리턴
     */
    public static Map<String, String> transformLogToMap(String line){
        Map<String, String> map = new HashMap<String, String>();
        String[] menu = new String[]{"seq", "country", "createDate", "userId",
                    "etcCode", "comment", "device", "uniqueCode", "modifyDate"    };

        String[] tokens = line.trim().replaceAll("\"", "").split(","); // 로그 패턴에 따라 변경

        for(int i=0; i<tokens.length; i++){
            map.put(menu[i], tokens[i]);
        }
        return map;
    }
}


# MinMaxCountTuple.java
package hadoop.mr.group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MinMaxCountTuple implements Writable {
    private String min = "";
    private String max = "";
    private Long count = 0L;
    
    public String getMin() {
        return min;
    }

    public void setMin(String min) {
        this.min = min;
    }

    public String getMax() {
        return max;
    }

    public void setMax(String max) {
        this.max = max;
    }

    public long getCount() {
        return count;
    }
    
    public void setCount(long count) {
        this.count = count;
    }
    
    public void readFields(DataInput in) throws IOException {
        // 쓰여진 순서대로 데이터를 읽는다.
        // 유닉스 timestamp에서 신규 Date 객체를 생성한다.
//        min = new String(in);
//        max = new String(in.readLine());
        min = WritableUtils.readString(in);
        max = WritableUtils.readString(in);
        count = in.readLong();
    }
    
    public void write(DataOutput out) throws IOException {
        // 읽을 순서대로 데이터를 쓴다.
        // Date를 나타내기 위해 유닉스 timestamp를 사용한다.
//        out.writeBytes(min);
//        out.writeBytes(max);
        WritableUtils.writeString(out, min);
        WritableUtils.writeString(out, max);
        out.writeLong(count);
    }
    
    public String toString() {
        return min+ "\t" + max + "\t" + count;
    }

    public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple> {
        
        // 출력 키와 값 Writables
        private Text outUserId = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();
        
        // 생성일 문자열을 Date 객체로 표맷팅하는 객체
//        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            System.out.println("### parsed :"+parsed);
            
            // 최솟값, 최댓값을 찾기 위해 'CreationDate' 필드를 사용
            String strDate = parsed.get("CreationDate");
            System.out.println("### strDate :"+strDate);
            
            // 분류하기 위해 UserId 필드를 사용
            String userId = parsed.get("OwnerUserId");
            System.out.println("### userId :"+userId);
            
            // 문자열을 Date 객체로 파싱
/*            Date creationDate;
            try {
                creationDate = frmt.parse(strDate);
            
                // creationDate로 최솟값, 최댓값 설정
            
            } catch (ParseException e) {
                e.printStackTrace();
            }*/
            
            outTuple.setMin(strDate);
            outTuple.setMax(strDate);
            
            // 코멘트 갯수를 1로 설정
            outTuple.setCount(1);
            
            // 사용자 ID를 출력 키로 설정
            outUserId.set(userId);
            
            // 시간과 평균 코멘트 길이를 씀
            context.write(new Text(outUserId), outTuple);
        }
    }    

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        
        // 출력 값 Writable
        private MinMaxCountTuple result = new MinMaxCountTuple();
        
        public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException {
            
            // 결과 초기화
            result.setMin(null);
            result.setMax(null);
            result.setCount(0);
            int sum = 0;
            
            // 해당 키에 대한 모든 값을 순환
            for (MinMaxCountTuple val : values) {

                // 입력의 최솟값이 결과의 최솟값보다 작으면, 입력의 최솟값을 결과의 최솟값으로 설정
                if (result.getMin() == null || val.getMin().compareTo(result.getMin()) < 0) {
                    result.setMin(val.getMin());
                }
                
                // 입력의 최댓값이 결과의 최댓값보다 크다면, 입력의 최댓값을 결과의 최댓값으로 설정
                if (result.getMax() == null ||
                        val.getMax().compareTo(result.getMax()) > 0) {
                    result.setMax(val.getMax());
                }
                
                // 값에 대한 개수를 합한다.
                sum += val.getCount();
            }
            
            // 입력 값들의 숫자의 개수를 설정
            result.setCount(sum);
            context.write(key, result);
        }
    }    
}




# MinMaxCountTupleDriver.java
package hadoop.mr.group;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinMaxCountTupleDriver {
    
    public void runJob(String inputPath, String outputPath) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "MinMaxCountTupleDriver driver");
        job.setJarByClass(MinMaxCountTupleDriver.class);
        
/*        job.setPartitionerClass(GroupKeyPartitioner.class);
        job.setGroupingComparatorClass(GroupKeyComparator.class);
        
        job.setSortComparatorClass(DateKeyComparator.class);*/

        // the keys are words (strings)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MinMaxCountTuple.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MinMaxCountTuple.class);

        job.setMapperClass(MinMaxCountTuple.MinMaxCountMapper.class);
        job.setReducerClass(MinMaxCountTuple.MinMaxCountReducer.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
    public static void main(String[] args) throws Exception {
        (new MinMaxCountTupleDriver()).runJob("/user/sunshiny/sample_comment.txt"
            , "/user/sunshiny/sample_out");
    }
}


※ 위 내용은, 여러 자료를 참고하거나 제가 주관적으로 정리한 것입니다.
   잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
01 12, 2014 14:15 01 12, 2014 14:15


Trackback URL : http://develop.sunshiny.co.kr/trackback/981

Leave a comment


Recent Posts

  1. HDFS - Python Encoding 오류 처리
  2. HP - Vertica ROS Container 관련 오류...
  3. HDFS - Hive 실행시 System Time 오류
  4. HP - Vertica 사용자 쿼리 이력 테이블...
  5. Client에서 HDFS 환경의 데이터 처리시...

Recent Comments

  1. 안녕하세요^^ 배그핵
  2. 안녕하세요^^ 도움이 되셨다니, 저... sunshiny
  3. 정말 큰 도움이 되었습니다.. 감사합... 사랑은
  4. 네, 안녕하세요. 댓글 남겨 주셔서... sunshiny
  5. 감사합니다 많은 도움 되었습니다!ㅎㅎ 프리시퀸스

Recent Trackbacks

  1. church building construction church building construction %M
  2. wireless clocks transmitter wireless clocks transmitter %M
  3. how to build a metal building how to build a metal building %M
  4. builder builder %M
  5. social media management company social media management company %M

Calendar

«   12 2019   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31        

Bookmarks

  1. 위키피디아
  2. MysqlKorea
  3. 오라클 클럽
  4. API - Java
  5. Apache Hadoop API
  6. Apache Software Foundation
  7. HDFS 생태계 솔루션
  8. DNSBL - Spam Database Lookup
  9. Ready System
  10. Solaris Freeware
  11. Linux-Site
  12. 윈디하나의 솔라나라

Site Stats

TOTAL 2781594 HIT
TODAY 1177 HIT
YESTERDAY 1360 HIT