Hadoop实现数据库表关联,高级编程

作者: 关于计算机  发布:2019-11-09

Hadoop实现数据库表关联主要有两种方式:Map Side Join和Reduce Side Join。本文主要讨论Reduce Side Join的实现与优化。Reduce Side Join是一个完整的MapReduce Job。在Map阶段将来自不同源的原始数据进行区分,对来自不同表的记录构造关联键并将来自不同“表”的记录划分到同一个分组。在Reduce阶段,将来自不同的表记录进行关联。

在Reduce阶段,一般将所有记录进行划分(区分该记录来在于哪个数据库表),然后将区分的不同表进行关联。在这种情况下,所有的记录都需要读入到内存,严重的影响程序的效率。在这个阶段,我们可以将程序进行优化:将数据量最大的表排列到最后。在读入内存的时候,只需要将小表保存到内存即可。当遍历到大表时开始进行关联,并将结果进行输出。

Hadoop学习笔记(7)

为了在Reduce阶段能够区分该条记录是来自数据量较大的表还是来自数据量较小的表,我们需要在Reduce的输入<Key,Value>中进行标记。那么我们思考一下,Map输出的Key应该包含什么呢?其应该主要包含以下几个内容:

——高级编程

1.关联主键

从前面的学习中,我们了解到了MapReduce整个过程需要经过以下几个步骤:

2.来源标记,标记其来源于哪个数据库表

1.输入(input):将输入数据分成一个个split,并将split进一步拆成<key, value>。

3.大小标记,标记其是属于数据量比较大的表还是数据量比较小的表

2.映射(map):根据输入的<key, value>进生处理,

仅仅对Key进行修改,还不能达到我们所需要的优化效果。应该我们还需要对Reduce的输入按照小表的数据都在前面而大表的数据都在后面进行排序。只有这样才能够只需要缓存小表。为了达到这个效果我们需要实现两个Comparator对Key进行排序。一个是Sort Comparator,一个是Group Comparator。Group Comparator主要是对关联主键进行关联,将关联键相同的记录划分到一起。Sort Comparator负责对所有的记录进行排序,首先根据“关联键”进行排序;然后根据“大小标记”进行排序,来自小表的记录排在前面。这样,我们就可以实现对数据库表关联的优化。

3.合并(combiner):合并中间相两同的key值。

具体的代码如下面所示:(程序只是示例,写的不够严谨,仅作说明使用)

4.分区(Partition):将<key, value>分成N分,分别送到下一环节。

 

5.化简(Reduce):将中间结果合并,得到最终结果

 

6.输出(output):负责输入最终结果。

 

其中第3、4步又成洗牌(shuffle)过程。

*********************************************************************

 

public class TagedKey implements WritableComparable<TagedKey> {

从前面HelloWorld示例中,我们看到,我们只去个性化了Map和Reduce函数,那其他函数呢,是否可以个性化?答案当然是肯定的。下面我们就对每个环节的个性化进行介绍。

 

自定义输入格式

public Text key;//关联主键

输 入格式(InputFormat)用于描述整个MapReduce作业的数据输入规范。先对输入的文件进行格式规范检查,如输入路径,后缀等检查;然后对 数据文件进行输入分块(split);再对数据块逐一读出;最后转换成Map所需要的<key, value>健值对。

public IntWritable tag;//来源标记

系统中提供丰富的预置输入格式。最常用的以下两种:

public IntWritable size;//大小标记

TextInputFormat:系统默认的数据输入格式。将文件分块,并逐行读入,每一行记录行成一对<key, value>。其中,key值为当前行在整个文件中的偏移量,value值为这一行的文本内容。

.......

KeyValueTextInputFormat:这是另一个常用的数据输入格式,读入的文本文件内容要求是以<key, value>形式。读出的结果也就直接形成<key, value>送入map函数中。

}

 

*********************************************************************

如果选择输入格式呢?那就只要在job函数中调用

 

  1. job.setInputFormatClass(TextInputFormat.class);

*********************************************************************

在Hello中我们没有设定,系统默认选择了TextInputFormat。

public class GroupComparator extends WritableComparator {

一般情况够用了,但某些情况下,还是无法满足用户的需求,所以还是需要个性化。个性化则按下面的方式进行:

public GroupComparator() {

如果数据我们是来源于文件,则可以继承FileInputFormat:

// TODO Auto-generated constructor stub

  1. public class MyInputFormat extends FileInputFormat<Text,Text> {

super(TagedKey.class, true);

  1.    @Override

  2.    public RecordReader<Text, Text> createRecordReader(InputSplit split,

}

  1.          TaskAttemptContext context) throws IOException, InterruptedException {

  2.       // TODO Auto-generated method stub

  3.       return null;

 

  1.    }

  2. }

@Override

如果数据我们是来源于非文件,如关系数据,则继承

public int compare(WritableComparable a, WritableComparable b) {

  1. public class MyInputFormat extends InputFormat<Text,Text> {

  2.  

  3.    @Override

  4.    public RecordReader<Text, Text> createRecordReader(InputSplit arg0,

// TODO Auto-generated method stub

  1.          TaskAttemptContext arg1) throws IOException, InterruptedException {

  2.       // TODO Auto-generated method stub

  3.       return null;

TagedKey key1 = (TagedKey) a;

  1.    }

  2.  

  3.    @Override

  4.    public List<InputSplit> getSplits(JobContext arg0) throws IOException,

  5.          InterruptedException {

TagedKey key2 = (TagedKey) b;

  1.       // TODO Auto-generated method stub

  2.       return null;

return key1.key.compareTo(key2.key);

  1.    }

  2.  

  3. }

}

这里比较清晰了,下面个函数为拆分成split,上面个函数跟据split输出成Key,value。

}

 

*********************************************************************

自定义map处理

*********************************************************************

这个好理解,我们的HelloWorld程序中就自定义了map处理函数。然后在job中指定了我们的处理类:

public class OrderComparator extends WritableComparator {

  1. job.setMapperClass(TokenizerMapper.class);

public OrderComparator() {

能不能没有map呢? 可以的,如果没有map,也就是这与上面的这个setMapperClass,则系统自动指定一个null,这时处理是将输入的<key,value>值,不作任何修改,直接送到下一环节中。

// TODO Auto-generated constructor stub

个性化代码如下:

super(TagedKey.class, true);

  1. public static class TokenizerMapper

  2.        extends Mapper<Object, Text, Text, IntWritable>{

  3.  

  4.     public void map(Object key, Text value, Context context

  5.                     ) throws IOException, InterruptedException {

}

  1.  

  2.         context.write(key, value);

 

  1.     }

  2.   }

@Override

 

public int compare(WritableComparable a, WritableComparable b) {

自定义合并Combiner

// TODO Auto-generated method stub

自定义合并Combiner类,主要目的是减少Map阶段输出中间结果的数据量,降低数据的网络传输开销。

TagedKey key1 = (TagedKey) a;

Combine 过程,实际跟Reduce过程相似,只是执行不同,Reduce是在Reducer环节运行,而Combine是紧跟着Map之后,在同一台机器上预先将 结时进行一轮合并,以减少送到Reducer的数据量。所以在HelloWorld时,可以看到,Combiner和Reducer用的是同一个类:

TagedKey key2 = (TagedKey) b;

  1. job.setCombinerClass(IntSumReducer.class);

  2. job.setReducerClass(IntSumReducer.class);

 

如何个性化呢,这个跟Reducer差不多了:

if (key1.key.compareTo(key2.key) != 0) {

  1. public static class MyCombiner

  2.       extends Reducer<Text,IntWritable,Text,IntWritable> {

  3.  

  4.    public void reduce(Text key, Iterable<IntWritable> values,

  5.                       Context context

return key1.key.compareTo(key2.key);

  1.                       ) throws IOException, InterruptedException {

} else {

  1.  

  2.      context.write(key, new IntWritable(1));

  3.    }

  4.  }

return key1.size.compareTo(key2.size);

 

}

自定义分区Partitioner

}

在 MapReduce程序中,Partitioner决定着Map节点的输出将被分区到哪个Reduce节点。而默认的Partitioner是 HashPartitioner,它根据每条数据记录的主健值进行Hash操作,获得一个非负整数的Hash码,然后用当前作业的Reduce节点数取模 运算,有N个结点的话,就会平均分配置到N个节点上,一个隔一个依次。大多情况下这个平均分配是够用了,但也会有一些特殊情况,比如某个文件的,不能被拆 开到两个结点中,这样就需要个性化了。

}

个性化方式如下:

*********************************************************************

  1. public static class MyPartitioner

  2.       extends HashPartitioner<K,V> {

*********************************************************************

  1.  

  2.    public void getPartition(K key, V value,int numReduceTasks) {

job.setSortComparatorClass(OrderComparator.class);

  1.  

  2.      super.getPartition(key,value,numReduceTasks);

job.setGroupingComparatorClass(GroupComparator.class);

  1.    }

  2.  }

*********************************************************************

方式其实就是在执行之前可以改变一下key,来欺骗这个hash表。

更多Hadoop相关信息见Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

 

图片 1

自定义化简(Reducer)

这一块是将Map送来的结果进行化简处理,并形成最终的输出值。与前面map一样,在HelloWorld中我们就见到过了。通过下面代码可以设置其值:

  1. job.setReducerClass(IntSumReducer.class);

同样,也可以这样类可以不设置,如果不设置的话,就是把前面送来的值,直接送向输出格式器中。

如果要个性化,则如下:

  1.   public static class IntSumReducer

  2.      extends Reducer<Text,IntWritable,Text,IntWritable> {

  3.  

  4.   public void reduce(Text key, Iterable<IntWritable> values,

  5.                      Context context

  1.                      ) throws IOException, InterruptedException {
  1.     context.write(key, result);
  1.   }

  2. }

 

自定义输出格式

数 据输出格式(OutPutFormat)用于描述MapReduce作业的数据输出规范。Hadoop提供了丰富的内置数据输出格式。最常的数据输出格式 是TextOutputFormat,也是系统默认的数据输出格式,将结果以"key+t+value"的形式逐行输出到文本文件中。还有其它的, 如:DBOutputFormat,FileOutputFormat,FilterOutputFormat,IndexUpdataOutputFormat,LazyOutputFormat,MapFileOutputFormat, 等等。

如果要个性化,则按下面方式进行:

  1. public class MyOutputFormat extends OutputFormat<Text,Text> {

  2.  

  3.    @Override

  4.    public void checkOutputSpecs(JobContext arg0) throws IOException,

  5.          InterruptedException {

  1.       // TODO Auto-generated method stub

  2.  

  3.    }

  4.  

  5.    @Override

  6.    public OutputCommitter getOutputCommitter(TaskAttemptContext arg0)

  1.          throws IOException, InterruptedException {

  2.       // TODO Auto-generated method stub

  3.       return null;

  1.    }

  2.  

  3.    @Override

  4.    public RecordWriter<Text, Text> getRecordWriter(TaskAttemptContext arg0)

  5.          throws IOException, InterruptedException {

  6.       // TODO Auto-generated method stub

  7.       return null;

  1.    }

  2.  

  3. }

 

复合健——用户自定义类型。

从前面的整个过程中可以看到,都是采用key-value的方式进行传入传出,而这些类型大多是单一的字符串,和整型。如果我的key中需要包含多个信息怎么办?用字符串直接拼接么? 太不方便了,最好能够自己定义一个类,作为这个key,这样就方便了。

如果定义一个类作为key 或value的类型? 有什么要求?就是这个类型必须要继承WritableComparable<T>这个类,所以如果要自定义一个类型则可以这么实现:

  1. public class MyType implements WritableComparable<MyType> {

  2.  

  3.    private float x,y;

  4.    public float GetX(){return x;}

  5.    public float GetY(){return y;}

  6.  

  7.       @Override

  8.       public void readFields(DataInput in) throws IOException {

  9.          x = in.readFloat();

  10.          y = in.readFloat();

  11.       }

  12.  

  13.       @Override

  14.       public void write(DataOutput out) throws IOException {

  15.          out.writeFloat(x);

  16.          out.writeFloat(y);

  17.       }

  18.  

  19.       @Override

  20.       public int compareTo(MyType arg0) {

  21.          //输入:-1(小于) 0(等于) 1(大于)

  1.          return 0;

  2.       }

  3.    }

这个示例中,我们添加了两个float变量:x,y 。 这个信息能过int 和out按次序进行输入输出。最后,再实现一个比较函数即可。

 

Job任务的创建

  1. Job job = new Job(conf, "word count");

  2.    job.setJarByClass(WordCount.class);

  3.    job.setInputFormatClass(MyInputFormat.class);

  4.    job.setMapperClass(TokenizerMapper.class);

  5.    job.setCombinerClass(IntSumReducer.class);

  6.    job.setPartitionerClass(MyPartitioner.class);

  7.    job.setReducerClass(IntSumReducer.class);

  8.    job.setOutputFormatClass(TextOutputFormat.class);

  9.    job.setOutputKeyClass(Text.class);

  10.    job.setOutputValueClass(IntWritable.class);

  11.    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

  12.    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

任务创建比较容易,其实就是new一个实例,然后把上面描述的过程类设置好,然后加上第2行中,jar包的主类,第10、11行的输入输出路径。这样就完事了。

 

Job任务的执行

单个任务的执行,没有什么问题,可以用这个:

  1. job.waitForCompletion(true);

但多个任务呢? 多个任务的话,就会形成其组织方式,有串行,有并行,有无关,有组合的,如下图:

图片 2

图中,Job2和Job3将会等Job1执行完了再执行,且可以同时开始,而Job4必须等Job2和Job3同时结束后才结束。

这个组合,就可以采用这样的代码来实现:

  1. Configuration conf = new Configuration();

  2.       Job job1 = new Job(conf, "job1");

  3.       //.. config Job1

  4.       Job job2 = new Job(conf, "job2");

  5.       //.. config Job2

  6.       Job job3 = new Job(conf, "job3");

  7.       //.. config Job3

  8.       Job job4 = new Job(conf, "job4");

  9.       //.. config Job4

  10.  

  11.       //添加依赖关系

  12.       job2.addDependingJob(job1);

  1.       job3.addDependingJob(job1);
  1.       job4.addDependingJob(job2);
  1.       job4.addDependingJob(job3);
  1.  

  2.       JobControl jc = new JobControl("jbo name");

  3.       jc.addJob(job1);

  4.       jc.addJob(job2);

  5.       jc.addJob(job3);

  6.       jc.addJob(job4);

  7.       jc.run();

 

总述

现在回头看看,其实整个hadoop编程,也就是这几块内容了,要实现某个功能,我们就往上面这些步骤上套,然后联起来执行,达到我们的目的。

本文由今晚开什么码发布于关于计算机,转载请注明出处:Hadoop实现数据库表关联,高级编程

关键词:

上一篇:过程使用
下一篇:没有了