MR - 수치요약(최댓값, 최솟값, 총계)
Posted 01 12, 2014 14:15, Filed under: BigData/MapReduce# MapReduce - 수치요약(최댓값, 최솟값, 총계)
그룹핑 작업 후 리듀서는 단순히 그룹과 연관된 모든 값을 반복 처리하고 최솟값과 최댓값을 구한다.
또한 키 그룹 내 구성원의 개수를 센다.
결합과 가환 속성으로 인해 리듀서로 셔플될 중간 키/값 쌍의 개수를 크게 줄이는 데 콤바이너를 사용할 수 있다.
맞게 구현되었다면 리듀서 코드와 콤바이너 코드는 동일할 수 있다.
문제 : 주어진 사용자 코멘트 목록에 대해 코멘트의 첫 시간과 마지막 시간 그리고 해당 사용자 코멘트의 총 개수를 알아내라.
# 샘플 데이터

# 출력 결과(기본 키에 대한 오름차순 정렬)
634110 2011-06-04T09:50:25.043 2011-07-04T09:55:25.043 3
634150 2011-08-05T09:50:25.043 2011-08-06T09:50:25.043 2
634152 2011-08-15T09:50:25.043 2011-08-25T09:50:25.043 2
743184 2011-07-04T09:50:25.043 2011-08-05T09:50:25.043 4
634150 2011-08-05T09:50:25.043 2011-08-06T09:50:25.043 2
634152 2011-08-15T09:50:25.043 2011-08-25T09:50:25.043 2
743184 2011-07-04T09:50:25.043 2011-08-05T09:50:25.043 4
# MRDPUtils.java
package hadoop.mr.group; import java.util.HashMap; import java.util.Map; public class MRDPUtils { // 이 헬퍼 함수는 스택오버플로우 데이터를 맵 자료 구조로 구문 분석한다. public static Map<String, String> transformXmlToMap(String xml) { Map<String, String> map = new HashMap<String, String>(); try { // 따옴표로 데이터를 분할 할 수 있다는 점을 이용한다. String[] tokens = xml.trim().substring(5, xml.trim().length() - 3).split("\""); for (int i = 0; i < tokens.length - 1; i += 2) { String key = tokens[i].trim(); String val = tokens[i + 1]; map.put(key.substring(0, key.length() - 1), val); } } catch (StringIndexOutOfBoundsException e) { System.err.println(xml); } return map; } /** * # 로그 파일 형태의 로우를 맵 형식(키,값)으로 리턴 */ public static Map<String, String> transformLogToMap(String line){ Map<String, String> map = new HashMap<String, String>(); String[] menu = new String[]{"seq", "country", "createDate", "userId", "etcCode", "comment", "device", "uniqueCode", "modifyDate" }; String[] tokens = line.trim().replaceAll("\"", "").split(","); // 로그 패턴에 따라 변경 for(int i=0; i<tokens.length; i++){ map.put(menu[i], tokens[i]); } return map; } }
# MinMaxCountTuple.java
package hadoop.mr.group; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.util.Map; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; public class MinMaxCountTuple implements Writable { private String min = ""; private String max = ""; private Long count = 0L; public String getMin() { return min; } public void setMin(String min) { this.min = min; } public String getMax() { return max; } public void setMax(String max) { this.max = max; } public long getCount() { return count; } public void setCount(long count) { this.count = count; } public void readFields(DataInput in) throws IOException { // 쓰여진 순서대로 데이터를 읽는다. // 유닉스 timestamp에서 신규 Date 객체를 생성한다. // min = new String(in); // max = new String(in.readLine()); min = WritableUtils.readString(in); max = WritableUtils.readString(in); count = in.readLong(); } public void write(DataOutput out) throws IOException { // 읽을 순서대로 데이터를 쓴다. // Date를 나타내기 위해 유닉스 timestamp를 사용한다. // out.writeBytes(min); // out.writeBytes(max); WritableUtils.writeString(out, min); WritableUtils.writeString(out, max); out.writeLong(count); } public String toString() { return min+ "\t" + max + "\t" + count; } public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple> { // 출력 키와 값 Writables private Text outUserId = new Text(); private MinMaxCountTuple outTuple = new MinMaxCountTuple(); // 생성일 문자열을 Date 객체로 표맷팅하는 객체 // private final static SimpleDateFormat frmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS"); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { Map<String, String> parsed = MRDPUtils.transformXmlToMap(value.toString()); System.out.println("### parsed :"+parsed); // 최솟값, 최댓값을 찾기 위해 'CreationDate' 필드를 사용 String strDate = parsed.get("CreationDate"); System.out.println("### strDate :"+strDate); // 분류하기 위해 UserId 필드를 사용 String userId = parsed.get("OwnerUserId"); System.out.println("### userId :"+userId); // 문자열을 Date 객체로 파싱 /* Date creationDate; try { creationDate = frmt.parse(strDate); // creationDate로 최솟값, 최댓값 설정 } catch (ParseException e) { e.printStackTrace(); }*/ outTuple.setMin(strDate); outTuple.setMax(strDate); // 코멘트 갯수를 1로 설정 outTuple.setCount(1); // 사용자 ID를 출력 키로 설정 outUserId.set(userId); // 시간과 평균 코멘트 길이를 씀 context.write(new Text(outUserId), outTuple); } } public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> { // 출력 값 Writable private MinMaxCountTuple result = new MinMaxCountTuple(); public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) throws IOException, InterruptedException { // 결과 초기화 result.setMin(null); result.setMax(null); result.setCount(0); int sum = 0; // 해당 키에 대한 모든 값을 순환 for (MinMaxCountTuple val : values) { // 입력의 최솟값이 결과의 최솟값보다 작으면, 입력의 최솟값을 결과의 최솟값으로 설정 if (result.getMin() == null || val.getMin().compareTo(result.getMin()) < 0) { result.setMin(val.getMin()); } // 입력의 최댓값이 결과의 최댓값보다 크다면, 입력의 최댓값을 결과의 최댓값으로 설정 if (result.getMax() == null || val.getMax().compareTo(result.getMax()) > 0) { result.setMax(val.getMax()); } // 값에 대한 개수를 합한다. sum += val.getCount(); } // 입력 값들의 숫자의 개수를 설정 result.setCount(sum); context.write(key, result); } } }
# MinMaxCountTupleDriver.java
package hadoop.mr.group; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class MinMaxCountTupleDriver { public void runJob(String inputPath, String outputPath) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "MinMaxCountTupleDriver driver"); job.setJarByClass(MinMaxCountTupleDriver.class); /* job.setPartitionerClass(GroupKeyPartitioner.class); job.setGroupingComparatorClass(GroupKeyComparator.class); job.setSortComparatorClass(DateKeyComparator.class);*/ // the keys are words (strings) job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(MinMaxCountTuple.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(MinMaxCountTuple.class); job.setMapperClass(MinMaxCountTuple.MinMaxCountMapper.class); job.setReducerClass(MinMaxCountTuple.MinMaxCountReducer.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); System.exit(job.waitForCompletion(true) ? 0 : 1); } public static void main(String[] args) throws Exception { (new MinMaxCountTupleDriver()).runJob("/user/sunshiny/sample_comment.txt" , "/user/sunshiny/sample_out"); } }
※ 위 내용은, 여러 자료를 참고하거나 제가 주관적으로 정리한 것입니다.
잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
"BigData / MapReduce" 분류의 다른 글
MR - 체인매퍼(ChainMapper), 체인리듀서(ChainReducer) (0) | 2014/01/19 |
MR - MultipleOutputs(구분값에 따른 다수 파일 출력) (0) | 2014/01/19 |
MR - 사용자 정의 옵션, 매퍼 구현 (0) | 2014/01/19 |
MR - 사용자 정의 옵션(GenericOptionsParser, Tool, ToolRunner) (0) | 2014/01/19 |
MR - 로컬 MapReduce 잡 구동 설정(Windows, Eclipse, Debug) (0) | 2014/01/17 |
MR - MRUnit 샘플, EclEmma Java Code Coverage (0) | 2014/01/17 |
MR - 총 건수 추출 (0) | 2014/01/14 |
MR - Count Trigram(연속된 단어 추출) (0) | 2014/01/12 |
MR - TopN (빈도수 추출) (0) | 2014/01/12 |
MapReduce - 정렬 구현 (0) | 2013/05/18 |
Trackback URL : http://develop.sunshiny.co.kr/trackback/981