Search Results for 'MultipleOutputs'

1 POSTS

  1. 2014|01 MR - MultipleOutputs(구분값에 따른 다수 파일 출력)


MultipleOutputs(구분값에 따른 다수 파일 출력)


패키지 : org.apache.hadoop.mapreduce.lib.output.MultipleOutputs

여러 개의 출력 데이터를 쉽게 생성하도록 돕는 기능
MultipleOutputs는 여러 개의 OutputCollectors를 만들고 각 OutputCollectors에 대한 출력 경로, 출력 포멧, 키와 값 유형을 설정.
MultipleOutputs에서 제공하는 static 메서드인 addNamedOutput 를 호출해서 설정.

MultipleOutputs에서 출력하는 데이터는 기존 맵리듀스 잡에서 생성하는 데이터와는 별개로 생성.
일반적으로 맵리듀스 잡이 종료되면 리듀스 단계에서 part-r-nnnnn이라는 출력 데이터를 생성함
그러나 리듀스 단계에서 MultipleOutputs를 이용해 myfile이라는 디렉터리에 데이터를 생성할 경우,
part-r-nnnnn 과 myfile-r-nnnnn 이 동시에 생성됨.



# 매퍼 구현
출력 데이터의 키값에 구분자(A, D)를 추가
public class TestMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // map 출력값
    private final static IntWritable outputValue = new IntWritable(1);

    // map 출력키
    private Text outputKey = new Text();

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        if (key.get() > 0) {
            // 콤마 구분자 분리
            String[] colums = value.toString().split(",");

            // AA값에 대한 출력
            if(!colums[1].equals("AA")){
                // 출력키 설정
                outputKey.set("A,"+colums[1]);

                // 출력 데이터 생성
                context.write(outputKey, outputValue);
            }

            // BB값에 대한 출력
            if(!colums[1].equals("BB")){
                // 출력키 설정
                outputKey.set("B,"+colums[1]);

                // 출력 데이터 생성
                context.write(outputKey, outputValue);
            }
        }
    }
}


# 리듀서 구현
리듀서는 매퍼의 출력 데이터에서 A와 B를 구분해서 각 수치를 합산해 개별 데이터 파일을 생성.
public class TestReducer extends Reducer<DateKey, IntWritable, Text, IntWritable> {

    // MultipleOutputs를 전역 변수로 선언
    private MultipleOutputs<Text, IntWritable> mos;

    // reduce 출력키
    private Text outputKey = new Text();

    // reduce 출력값
    private IntWritable result = new IntWritable();

    // MultipleOutputs 객체를 생성
    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        mos = new MultipleOutputs<Text, IntWritable>(context);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
            throws IOException, InterruptedException {

        // 콤마 구분자 분리
        String[] colums = key.toString().split(",");

        int sum = 0;

        // A 타입 출력값 설정
        if (colums[0].equals("A")) {
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputKey.set(colums[1]);
            result.set(sum);
            // MultipleOutputs 객체의 write 메서드 호출
            // write(디렉터리명, 키, 값)
            mos.write("A_Type", outputKey, result);
        }
        // B 타입 출력값 설정
        else if (colums[0].equals("B")) {
            for (IntWritable value : values) {
                sum += value.get();
            }
            outputKey.set(colums[1]);
            result.set(sum);
            // MultipleOutputs 객체의 write 메서드 호출
            // write(디렉터리명, 키, 값)            
            mos.write("B_Type", outputKey, result);
        }
    }

    // MultipleOutputs 객체 종료
    @Override
    public void cleanup(Context context) throws IOException,
    InterruptedException {
        mos.close();
    }    
}


# 드라이버 클래스 구현
public class TestDriver extends Configured implements Tool {

    // int 타입을 반환하는 run 메서드 구현
    public int run(String[] args) throws Exception {

        String[] otherArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();

        // 입출력 데이터 경로 확인
        if (otherArgs.length != 2) {
            System.err.println("Usage: TestDriver <in> <out>");
            System.exit(2);
        }

        // Job 이름 설정
        Job job = new Job(getConf(), "TestDriver");

        // 입출력 데이터 경로 설정
        FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));        
        .
        .
        .
        // MultipleOutputs 설정
        MultipleOutputs.addNamedOutput(job, "A_Type", TextOutputFormat.class, 
                                                      Text.class, IntWritable.class);
        MultipleOutputs.addNamedOutput(job, "B_Type", TextOutputFormat.class, 
                                                      Text.class, IntWritable.class);        
        
        job.waitForCompletion(true);
        return 0;
    }
    
    public static void main(String[] args) throws Exception {
        // Tool 인터페이스 실행 : run 메서드는 ToolRunner에서만 호출할 수 있음.
        int res = ToolRunner.run(new Configuration(), new TestDriver(), args);
        System.out.println("## RESULT:" + res);
    }    
}


# 드라이버 클래스 실행
$ bin/hadoop jar TestDriver.jar hadoop.test.TestDriver input output_mos

# 지정한 경로에 출력 데이터 확인
$ bin/hadoop fs -ls output_mos

_SUCCESS
A_Type-r-00000
B_Type-r-00000

part-r-00000



참고 : 시작하세요! 하둡 프로그래밍



※ 위 내용은, 여러 자료를 참고하거나 제가 주관적으로 정리한 것입니다.
   잘못된 정보나 보완이 필요한 부분을, 댓글 또는 메일로 보내주시면 많은 도움이 되겠습니다.
01 19, 2014 14:07 01 19, 2014 14:07


Trackback URL : http://develop.sunshiny.co.kr/trackback/993

Leave a comment


Recent Posts

  1. HDFS - Python Encoding 오류 처리
  2. HP - Vertica ROS Container 관련 오류...
  3. HDFS - Hive 실행시 System Time 오류
  4. HP - Vertica 사용자 쿼리 이력 테이블...
  5. Client에서 HDFS 환경의 데이터 처리시...

Recent Comments

  1. 안녕하세요^^ 배그핵
  2. 안녕하세요^^ 도움이 되셨다니, 저... sunshiny
  3. 정말 큰 도움이 되었습니다.. 감사합... 사랑은
  4. 네, 안녕하세요. 댓글 남겨 주셔서... sunshiny
  5. 감사합니다 많은 도움 되었습니다!ㅎㅎ 프리시퀸스

Recent Trackbacks

  1. Mysql - mysql 설치후 Character set... 멀고 가까움이 다르기 때문 %M

Calendar

«   10 2019   »
    1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30 31    

Bookmarks

  1. 위키피디아
  2. MysqlKorea
  3. 오라클 클럽
  4. API - Java
  5. Apache Hadoop API
  6. Apache Software Foundation
  7. HDFS 생태계 솔루션
  8. DNSBL - Spam Database Lookup
  9. Ready System
  10. Solaris Freeware
  11. Linux-Site
  12. 윈디하나의 솔라나라

Site Stats

TOTAL 2724069 HIT
TODAY 535 HIT
YESTERDAY 589 HIT