# MapReduce - 수치요약(최댓값, 최솟값, 총계)


그룹핑 작업 후 리듀서는 단순히 그룹과 연관된 모든 값을 반복 처리하고 최솟값과 최댓값을 구한다.
또한 키 그룹 내 구성원의 개수를 센다.
결합과 가환 속성으로 인해 리듀서로 셔플될 중간 키/값 쌍의 개수를 크게 줄이는 데 콤바이너를 사용할 수 있다.
맞게 구현되었다면 리듀서 코드와 콤바이너 코드는 동일할 수 있다.

문제 : 주어진 사용자 코멘트 목록에 대해 코멘트의 첫 시간과 마지막 시간 그리고 해당 사용자 코멘트의 총 개수를 알아내라.


# 샘플 데이터


# 출력 결과(기본 키에 대한 오름차순 정렬)
634110    2011-06-04T09:50:25.043    2011-07-04T09:55:25.043    3
634150    2011-08-05T09:50:25.043    2011-08-06T09:50:25.043    2
634152    2011-08-15T09:50:25.043    2011-08-25T09:50:25.043    2
743184    2011-07-04T09:50:25.043    2011-08-05T09:50:25.043    4

# MRDPUtils.java
package hadoop.mr.group;

import java.util.HashMap;
import java.util.Map;

public class MRDPUtils {

    // 이 헬퍼 함수는 스택오버플로우 데이터를 맵 자료 구조로 구문 분석한다.
    public static Map<String, String> transformXmlToMap(String xml) {
        Map<String, String> map = new HashMap<String, String>();
        try {

            // 따옴표로 데이터를 분할 할 수 있다는 점을 이용한다.
            String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\"");
            for (int i = 0; i < tokens.length - 1; i += 2) {
                String key = tokens[i].trim();
                String val = tokens[i + 1];
                map.put(key.substring(0, key.length() - 1), val);
            }

        } catch (StringIndexOutOfBoundsException e) {
            System.err.println(xml);
        }
        return map;
    }

    /**
     * # 로그 파일 형태의 로우를 맵 형식(키,값)으로 리턴
     */
    public static Map<String, String> transformLogToMap(String line){
        Map<String, String> map = new HashMap<String, String>();
        String[] menu = new String[]{"seq", "country", "createDate", "userId",
                    "etcCode", "comment", "device", "uniqueCode", "modifyDate"    };

        String[] tokens = line.trim().replaceAll("\"", "").split(","); // 로그 패턴에 따라 변경

        for(int i=0; i<tokens.length; i++){
            map.put(menu[i], tokens[i]);
        }
        return map;
    }
}


# MinMaxCountTuple.java
package hadoop.mr.group;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MinMaxCountTuple implements Writable {
    private String min = "";
    private String max = "";
    private Long count = 0L;
    
    public String getMin() {
        return min;
    }

    public void setMin(String min) {
        this.min = min;
    }

    public String getMax() {
        return max;
    }

    public void setMax(String max) {
        this.max = max;
    }

    public long getCount() {
        return count;
    }
    
    public void setCount(long count) {
        this.count = count;
    }
    
    public void readFields(DataInput in) throws IOException {
        // 쓰여진 순서대로 데이터를 읽는다.
        // 유닉스 timestamp에서 신규 Date 객체를 생성한다.
//        min = new String(in);
//        max = new String(in.readLine());
        min = WritableUtils.readString(in);
        max = WritableUtils.readString(in);
        count = in.readLong();
    }
    
    public void write(DataOutput out) throws IOException {
        // 읽을 순서대로 데이터를 쓴다.
        // Date를 나타내기 위해 유닉스 timestamp를 사용한다.
//        out.writeBytes(min);
//        out.writeBytes(max);
        WritableUtils.writeString(out, min);
        WritableUtils.writeString(out, max);
        out.writeLong(count);
    }
    
    public String toString() {
        return min+ "\t" + max + "\t" + count;
    }

    public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple> {
        
        // 출력 키와 값 Writables
        private Text outUserId = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();
        
        // 생성일 문자열을 Date 객체로 표맷팅하는 객체
//        private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS");
        
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString());
            System.out.println("### parsed :"+parsed);
            
            // 최솟값, 최댓값을 찾기 위해 'CreationDate' 필드를 사용
            String strDate = parsed.get("CreationDate");
            System.out.println("### strDate :"+strDate);
            
            // 분류하기 위해 UserId 필드를 사용
            String userId = parsed.get("OwnerUserId");
            System.out.println("### userId :"+userId);
            
            // 문자열을 Date 객체로 파싱
/*            Date creationDate;
            try {
                creationDate = frmt.parse(strDate);
            
                // creationDate로 최솟값, 최댓값 설정
            
            } catch (ParseException e) {
                e.printStackTrace();
            }*/
            
            outTuple.setMin(strDate);
            outTuple.setMax(strDate);
            
            // 코멘트 갯수를 1로 설정
            outTuple.setCount(1);
            
            // 사용자 ID를 출력 키로 설정
            outUserId.set(userId);
            
            // 시간과 평균 코멘트 길이를 씀
            context.write(new Text(outUserId), outTuple);
        }
    }    

    public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        
        // 출력 값 Writable
        private MinMaxCountTuple result = new MinMaxCountTuple();
        
        public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException {
            
            // 결과 초기화
            result.setMin(null);
            result.setMax(null);
            result.setCount(0);
            int sum = 0;
            
            // 해당 키에 대한 모든 값을 순환
            for (MinMaxCountTuple val : values) {

                // 입력의 최솟값이 결과의 최솟값보다 작으면, 입력의 최솟값을 결과의 최솟값으로 설정
                if (result.getMin() == null || val.getMin().compareTo(result.getMin()) < 0) {
                    result.setMin(val.getMin());
                }
                
                // 입력의 최댓값이 결과의 최댓값보다 크다면, 입력의 최댓값을 결과의 최댓값으로 설정
                if (result.getMax() == null ||
                        val.getMax().compareTo(result.getMax()) > 0) {
                    result.setMax(val.getMax());
                }
                
                // 값에 대한 개수를 합한다.
                sum += val.getCount();
            }
            
            // 입력 값들의 숫자의 개수를 설정
            result.setCount(sum);
            context.write(key, result);
        }
    }    
}




# MinMaxCountTupleDriver.java
package hadoop.mr.group;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinMaxCountTupleDriver {
    
    public void runJob(String inputPath, String outputPath) throws Exception {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "MinMaxCountTupleDriver driver");
        job.setJarByClass(MinMaxCountTupleDriver.class);
        
/*        job.setPartitionerClass(GroupKeyPartitioner.class);
        job.setGroupingComparatorClass(GroupKeyComparator.class);
        
        job.setSortComparatorClass(DateKeyComparator.class);*/

        // the keys are words (strings)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(MinMaxCountTuple.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MinMaxCountTuple.class);

        job.setMapperClass(MinMaxCountTuple.MinMaxCountMapper.class);
        job.setReducerClass(MinMaxCountTuple.MinMaxCountReducer.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
    
    public static void main(String[] args) throws Exception {
        (new MinMaxCountTupleDriver()).runJob("/user/sunshiny/sample_comment.txt"
            , "/user/sunshiny/sample_out");
    }
}


※ 위 내용은, 여러 자료를 참고하거나 제가 주관적으로 정리한 것입니다.
   잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
01 12, 2014 14:15 01 12, 2014 14:15


Trackback URL : http://develop.sunshiny.co.kr/trackback/981

Leave a comment

« Previous : 1 : ... 61 : 62 : 63 : 64 : 65 : 66 : 67 : 68 : 69 : ... 648 : Next »

Recent Posts

  1. HDFS - Python Encoding 오류 처리
  2. HP - Vertica ROS Container 관련 오류...
  3. HDFS - Hive 실행시 System Time 오류
  4. HP - Vertica 사용자 쿼리 이력 테이블...
  5. Client에서 HDFS 환경의 데이터 처리시...

Recent Comments

  1. Hi, I do think this is a great blo... 룸싸롱 01시 43분
  2. Thanks in favor of sharing such a... 리니지 프리서버 01 20,
  3. 안녕하세요^^ 배그핵
  4. 안녕하세요^^ 도움이 되셨다니, 저... sunshiny
  5. 정말 큰 도움이 되었습니다.. 감사합... 사랑은

Recent Trackbacks

  1. invoice printing and mailing invoice printing and mailing 20 01
  2. cabo san lucas packages cabo san lucas packages 20 01
  3. london relocation services fees london relocation services fees 20 01
  4. printing and mailing companies printing and mailing companies 20 01
  5. Web Site Web Site 19 01

Calendar

«   01 2020   »
      1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31  

Bookmarks

  1. 위키피디아
  2. MysqlKorea
  3. 오라클 클럽
  4. API - Java
  5. Apache Hadoop API
  6. Apache Software Foundation
  7. HDFS 생태계 솔루션
  8. DNSBL - Spam Database Lookup
  9. Ready System
  10. Solaris Freeware
  11. Linux-Site
  12. 윈디하나의 솔라나라

Site Stats

TOTAL 2819267 HIT
TODAY 58 HIT
YESTERDAY 1318 HIT