>>分享SPSS,Hadoop等大数据处理技术,以及分布式架构以及集群系统的构建 书籍支持  卫琴直播  品书摘要  在线测试  资源下载  联系我们
发表一个新主题 开启一个新投票 回复文章 您是本文章第 20586 个阅读者 刷新本主题
 * 贴子主题:  MapReduce自定义分区实现 回复文章 点赞(0)  收藏  
作者:flybird    发表时间:2020-03-11 20:17:37     消息  查看  搜索  好友  邮件  复制  引用

  
MapReduce自带的分区器是HashPartitioner

原理:先对map输出的key求hash值,再模上reduce task个数,根据结果,决定此输出kv对,被匹配的reduce任务取走。

自定义分分区需要继承 Partitioner,复写 getpariton()方法

注意:map的输出是<K,V>键值对
其中
int partitionIndex = dict.get(text.toString())

,partitionIndex是获取K的值

     附:被计算的的文本    

Dear Dear Bear Bear River Car Dear Dear  Bear Rive
Dear Dear Bear Bear River Car Dear Dear  Bear Rive

自定义分区类:    

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import java.util.HashMap;
public class CustomPartitioner extends Partitioner<Text, IntWritable> {
    public static HashMap<String, Integer> dict = new HashMap<String, Integer>();
    //Text代表着map阶段输出的key,IntWritable代表着输出的值
    static{
        dict.put("Dear", 0);
        dict.put("Bear", 1);
        dict.put("River", 2);
        dict.put("Car", 3);
    }
    public int getPartition(Text text, IntWritable intWritable, int i) {
        //
        int partitionIndex = dict.get(text.toString());
        return partitionIndex;
    }
}

   注意:map的输出结果是键值对<K,V>,

int partitionIndex = dict.get(text.toString());

中的

partitionIndex

是map输出键值对中的键的值,也就是K的值。
Maper类:    

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

   Reducer类:    

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] words = value.toString().split("\t");
        for (String word : words) {
            // 每个单词出现1次,作为中间结果输出
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

   main函数:    

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountMain {
    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        if (args.length != 2 || args == null) {
            System.out.println("please input Path!");
            System.exit(0);
        }
        Configuration configuration = new Configuration();
        configuration.set("mapreduce.job.jar","/home/bruce/project/kkbhdp01/target/com.kaikeba.hadoop-1.0-SNAPSHOT.jar");
        Job job = Job.getInstance(configuration, WordCountMain.class.getSimpleName());
        // 打jar包
        job.setJarByClass(WordCountMain.class);
        // 通过job设置输入/输出格式
        //job.setInputFormatClass(TextInputFormat.class);
        //job.setOutputFormatClass(TextOutputFormat.class);
        // 设置输入/输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 设置处理Map/Reduce阶段的类
        job.setMapperClass(WordCountMap.class);
        //map combine
        //job.setCombinerClass(WordCountReduce.class);
        job.setReducerClass(WordCountReduce.class);
        //如果map、reduce的输出的kv对类型一致,直接设置reduce的输出的kv对就行;如果不一样,需要分别设置map, reduce的输出的kv类型
        //job.setMapOutputKeyClass(.class)
        // 设置最终输出key/value的类型m
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setPartitionerClass(CustomPartitioner.class);
        job.setNumReduceTasks(4);
        // 提交作业
        job.waitForCompletion(true);

    }
}

----------------------------
原文链接:https://blog.51cto.com/10312890/2462756

程序猿的技术大观园:www.javathinker.net



[这个贴子最后由 flybird 在 2020-03-12 13:55:02 重新编辑]
  Java面向对象编程-->多线程(上)
  JavaWeb开发-->Servlet技术详解(Ⅲ)
  JSP与Hibernate开发-->数据库事务的并发问题的解决方案
  Java网络编程-->RMI框架
  精通Spring-->通过Axios访问服务器
  Vue3开发-->绑定表单
  一套可复用的方法论!从0-1搭建数据团队,看这篇就够了
  demo2 Kafka+Spark Streaming+Redis实时计算整合实践 foreac...
  使用Ambari搭建Hadoop集群
  深入玩转K8S之使用kubeadm安装Kubernetes v1.10以及常见问题...
  酒店评论数据分析和挖掘-展现数据分析全流程:报告展示篇
  Hadoop中文词频统计
  Apacheの日志分割
  大数据分布式平台Hadoop2.7.7 + Spark2.2.2搭建
  Spark RDD持久化、广播变量和累加器
  大数据虚拟混算平台Moonbox配置指南
  大数据平台CDH搭建
  大数据的学习方向
  大数据系统发展的技术路线
  深入剖析Hadoop HBase
  Hadoop中文版使用文档
  更多...
 IPIP: 已设置保密
树形列表:   
1页 0条记录 当前第1
发表一个新主题 开启一个新投票 回复文章


中文版权所有: JavaThinker技术网站 Copyright 2016-2026 沪ICP备16029593号-2
荟萃Java程序员智慧的结晶,分享交流Java前沿技术。  联系我们
如有技术文章涉及侵权,请与本站管理员联系。