设为首页 收藏本站
查看: 1196|回复: 0

[经验分享] Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

[复制链接]

尚未签到

发表于 2017-12-18 12:18:59 | 显示全部楼层 |阅读模式
  不多说,直接上代码。
DSC0000.png

DSC0001.png

  天气记录数据库

Station> DSC0002.png   气象站数据库

Station> DSC0003.png   气象站和天气记录合并之后的示意图如下所示。

Station>   连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每一个节点之中, 则可以执行一个 MapReduce 作业,将各个气象站的天气记录放到一块,从而实现连接。mapper 或 reducer 根据各气象站>  连接操作如果由 mapper 执行,则称为 “map 端连接” ;如果由 reducer 执行,则称为 “reduce 端连接”。
  如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每个节点,我们仍然可以使用 MapReduce 来进行连接,至于到底采用 map 端连接还是 reduce 端连接,则取决于数据的组织方式。

map 端连接
  在两个大规模输入数据集之间的 map 端连接会在数据达到 map 函数之前就执行连接操作。为达到该目的,各 map 的输入数据必须先分区并且以特定方式排序。 各个输入数据集被划分成相同数量的分区,并且均按相同的键(连接键)排序。同一键的所有记录均会放在同一分区之中。听起来似乎要求非常严格,但这的确合乎 MapReduce 作业的输出。
  map 端连接操作可以连接多个作业的输出,只要这些作业的 reducer 数量相同、键相同并且输出文件是不可切分的(例如,小于一个 HDFS 块,或 gzip 压缩)。 在上面讲的天气例子中,如果气象站文件以气象站ID部分排序,记录文件也以气象站>  利用 org.apache.hadoop.mapreduce.join 包中的 CompositeInputFormat 类来运行一个 map 端连接。CompositeInputFormat 类的输入源和连接类型(内连接或外连接)可以通过一个连接表达式进行配置, 连接表达式的语法简单。详情与示例可参考包文档。此种方法不常用,了解即可,这里不再赘述。

reduce 端连接
  由于 reduce 端连接并不要求输入数据集符合特定结构,因而 reduce端连接比 map 端连接更为常用。但是,由于两个数据集均需经过 MapReduce 的 shuffle 过程, 所以 reduce 端连接的效率往往要低一些。基本思路是 mapper 为各个记录标记源,并且使用连接键作为 map 输出键,使键相同的记录放在同一 reducer 中。 我们通过下面两种技术实现 reduce 端连接。
  1、多输入
  数据集的输入源往往有多种格式,因此可以使用 MultipleInputs 类来方便地解析和标注各个数据源。MultipleInputs 的用法,在输入格式课程已经详细介绍,这里就不再赘述。
  2、二次排序
  如前所述,reducer 将两个数据源中选出键相同的记录并不介意这些记录是否已排好序。此外,为了更好地执行连接操作,先将某一个数据源传输到 reducer 会非常重要。 还以上面的天气数据连接为例,当天气记录发送到 reducer 的时候,与这些记录有相同键的气象站信息最好也已经放在 reducer ,使得 reducer 能够将气象站名称填到天气记录之中就马上输出。 虽然也可以不指定数据传输次序,并将待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中任何一组的记录数量可能非常庞大,远远超出 reducer 的可用内存容量。 因此我们用到二次排序技术,对 map 阶段输出的每个键的值进行排序,实现这一效果。
  我们使用 TextPair 类构建组合键,包括气象站>  JoinStationMapper 处理来自气象站数据,代码如下所示。
  

public>
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
  
String line = value.toString();
  
String[] arr = line.split("\\s+");//解析气象站数据
  
int length = arr.length;
  
if(length==2){//满足这种数据格式
  
//key=气象站id  value=气象站名称
  
context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
  
}
  
}
  
}
  

  JoinRecordMapper 处理来自天气记录数据,代码如下所示。
  

public>
protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
  
String line = value.toString();
  
String[] arr = line.split("\\s+",2);//解析天气记录数据
  
int length = arr.length;
  
if(length==2){
  
//key=气象站id  value=天气记录数据
  
context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
  
}  
  
}
  
}
  

  由于 TextPair 经过了二次排序,所以 reducer 会先接收到气象站数据。因此从中抽取气象站名称,并将其作为后续每条输出记录的一部分写到输出文件。JoinReducer 的代码如下所示。
  

public>
protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException{
  
Iterator< Text> iter = values.iterator();
  
Text stationName = new Text(iter.next());//气象站名称
  
while(iter.hasNext()){
  
Text record = iter.next();//天气记录的每条数据
  
Text outValue = new Text(stationName.toString()+"\t"+record.toString());
  
context.write(key.getFirst(),outValue);
  
}
  
}        
  
}
  


  上面 JoinReducer 里面的代码,假设天气记录的每个气象站>
  下面我们定义作业的驱动类 JoinRecordWithStationName,在该类中,关键在于根据组合键的第一个字段(即气象站>  

public>
public static>  
@overwrite
  
public int getPartition(TextPair key,Text value,int numPartitions){
  
return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
  
}
  
}
  

  
@overwrite
  
public int run(String[] args) throws Exception{
  
Configuration conf = new Configuration();// 读取配置文件
  

  
Job job = Job.getInstance();// 新建一个任务
  
job.setJarByClass(JoinRecordWithStationName.class);// 主类
  

  
Path recordInputPath = new Path(args[0]);//天气记录数据源
  
Path stationInputPath = new Path(args[1]);//气象站数据源
  
Path outputPath = new Path(args[2]);//输出路径
  

  
MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
  
MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
  
FileOutputFormat.setOutputPath(job,outputPath);
  

  
job.setPartitionerClass(KeyPartitioner.class);//自定义分区
  
job.setGroupingComparatorClass(TextPair.FirstComparator.class);//自定义分组
  

  
job.setMapOutputKeyClass(TextPair.class);
  

  
job.setReducerClass(JoinReducer.class);// Reducer
  

  
job.setOutputKeyClass(Text.class);
  

  
return job.waitForCompletion(true)?0:1;
  
}
  

  
public static void main(String[] args) throws Exception{
  
int exitCode = ToolRunner.run(new JoinRecordWithStationName(),args);
  
System.exit(exitCode);
  
}
  
}
  

  该样本数据上运行程序,获得以下输出结果。
  

011990-99999SIHCCAJAVRI1950051507000  
011990-99999SIHCCAJAVRI19500515120022
  
011990-99999SIHCCAJAVRI195005151800-11
  
012650-99999TYNSET-HANSMOEN194903241200111
  
012650-99999TYNSET-HANSMOEN19490324180078
  


分布式缓存
  当 MapReduce 处理大型数据集间的 join 操作时,此时如果一个数据集很大而另外一个集合很小,以至于可以分发到集群中的每个节点之中。 这种情况下,我们就用到了 Hadoop 的分布式缓存机制,它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。为了节约网络宽带,在每一个作业中, 各个文件通常只需要复制到一个节点一次。
  1、用法
  Hadoop 命令行选项中,有三个命令可以实现文件复制分发到任务的各个节点。
  1)用户可以使用 -files 选项指定待分发的文件,文件内包含以逗号隔开的 URL 列表。文件可以存放在本地文件系统、HDFS、或其它 Hadoop 可读文件系统之中。 如果尚未指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系统,这也是成立的。
  2)用户可以使用 -archives 选项向自己的任务中复制存档文件,比如JAR 文件、ZIP 文件、tar 文件和 gzipped tar文件,这些文件会被解档到任务节点。
  3)用户可以使用 -libjars 选项把 JAR 文件添加到 mapper 和 reducer 任务的类路径中。如果作业 JAR 文件并非包含很多库 JAR 文件,这点会很有用。
  2、工作机制
  当用户启动一个作业,Hadoop 会把由 -files、-archives、和 -libjars 等选项所指定的文件复制到分布式文件系统之中。接着,在任务运行之前, tasktracker 将文件从分布式文件系统复制到本地磁盘(缓存)使任务能够访问文件。此时,这些文件就被视为 “本地化” 了。从任务的角度来看, 这些文件就已经在那儿了,它并不关心这些文件是否来自 HDFS 。此外,有 -libjars 指定的文件会在任务启动前添加到任务的类路径(classpath)中。
  3、分布式缓存 API
  由于可以通过 Hadoop 命令行间接使用分布式缓存,大多数应用不需要使用分布式缓存 API。然而,一些应用程序需要用到分布式缓存的更高级的特性,这就需要直接使用 API 了。 API 包括两部分:将数据放到缓存中的方法,以及从缓存中读取数据的方法。
  1)首先掌握数据放到缓存中的方法,以下列举 Job 中可将数据放入到缓存中的相关方法:
  

public void addCacheFile(URI uri);  
public void addCacheArchive(URI uri);//以上两组方法将文件或存档添加到分布式缓存
  
public void setCacheFiles(URI[] files);
  
public void setCacheArchives(URI[] archives);//以上两组方法将一次性向分布式缓存中添加一组文件或存档
  
public void addFileToClassPath(Path file);
  
public void addArchiveToClassPath(Path archive);//以上两组方法将文件或存档添加到 MapReduce 任务的类路径
  
public void createSymlink();
  在缓存中可以存放两类对象:文件(files)和存档(achives)。文件被直接放置在任务节点上,而存档则会被解档之后再将具体文件放置在任务节点上。
  

  2)其次掌握在 map 或者 reduce 任务中,使用 API 从缓存中读取数据。
  

public Path[] getLocalCacheFiles() throws IOException;  
public Path[] getLocalCacheArchives() throws IOException;
  
public Path[] getFileClassPaths();
  
public Path[] getArchiveClassPaths();
  

  我们可以使用 getLocalCacheFiles()和getLocalCacheArchives()方法获取缓存中的文件或者存档的引用。 当处理存档时,将会返回一个包含解档文件的目的目录。相应的,用户可以通过 getFileClassPaths()和getArchivesClassPaths()方法获取被添加到任务的类路径下的文件和文档。
  下面我们仍然以前面的气象站数据和天气记录数据为例,使用分布式缓存 API,完成两个数据集的连接操作。完整的 MapReduce 程序如下所示。
  

import java.io.BufferedReader;  
import java.io.FileNotFoundException;
  
import java.io.IOException;
  
import java.io.InputStreamReader;
  
import java.net.URI;
  
import java.util.Hashtable;
  
import org.apache.hadoop.conf.Configuration;
  
import org.apache.hadoop.conf.Configured;
  
import org.apache.hadoop.fs.FSDataInputStream;
  
import org.apache.hadoop.fs.FileSystem;
  
import org.apache.hadoop.fs.Path;
  
import org.apache.hadoop.io.LongWritable;
  
import org.apache.hadoop.io.Text;
  
import org.apache.hadoop.mapreduce.Job;
  
import org.apache.hadoop.mapreduce.Mapper;
  
import org.apache.hadoop.mapreduce.Reducer;
  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
import org.apache.hadoop.util.Tool;
  
import org.apache.hadoop.util.ToolRunner;
  


  
public>  


  
public static>  
Mapper< LongWritable, Text, Text, Text> {
  
public void map(LongWritable key, Text value, Context context)
  
throws IOException, InterruptedException {
  
String[] arr = value.toString().split("\t", 2);
  
if (arr.length == 2) {
  
context.write(new Text(arr[0]), value);
  
}
  

  
}
  
}
  


  
public static>  
Reducer< Text, Text, Text, Text> {
  
private Hashtable< String, String> table = new Hashtable< String, String>();//定义Hashtable存放缓存数据
  

  
/**
  
* 获取分布式缓存文件
  
*/
  
protected void setup(Context context) throws IOException,
  
InterruptedException {
  
Path[] localPaths = (Path[]) context.getLocalCacheFiles();//返回本地文件路径
  
if (localPaths.length == 0) {
  
throw new FileNotFoundException(
  
"Distributed cache file not found.");
  
}
  
FileSystem fs = FileSystem.getLocal(context.getConfiguration());//获取本地 FileSystem 实例
  
FSDataInputStream in = null;
  

  
in = fs.open(new Path(localPaths[0].toString()));// 打开输入流
  
BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器
  
String infoAddr = null;
  
while (null != (infoAddr = br.readLine())) {// 按行读取并解析气象站数据
  
String[] records = infoAddr.split("\t");
  
table.put(records[0], records[1]);//key为stationID,value为stationName
  
}
  
}
  

  
public void reduce(Text key, Iterable< Text> values, Context context)
  
throws IOException, InterruptedException {
  
String stationName = table.get(key.toString());//天气记录根据stationId 获取stationName
  
for (Text value : values) {
  
context.write(new Text(stationName), value);
  
}
  
}
  
}
  

  
@Override
  
public int run(String[] args) throws Exception {
  
// TODO Auto-generated method stub
  
Configuration conf = new Configuration();
  

  
FileSystem hdfs = FileSystem.get(new URI(
  
"hdfs://single.hadoop.dajiangtai.com:9000"), conf);
  
Path out = new Path(args[1]);
  
if (hdfs.isDirectory(out)) {
  
hdfs.delete(out, true);
  
}
  

  
Job job = Job.getInstance();//获取一个job实例
  
job.setJarByClass(JoinRecordWithStationName.class);
  

  
FileInputFormat.addInputPath(job,
  
new org.apache.hadoop.fs.Path(args[0]));
  
FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(
  
args[1]));
  
//添加分布式缓存文件 station.txt
  
job.addCacheFile(new URI("hdfs://HadoopMaster:9000/middle/temperature/station.txt"));
  

  
job.setMapperClass(TemperatureMapper.class);
  
job.setReducerClass(TemperatureReducer.class);
  

  
job.setOutputKeyClass(Text.class);// 输出key类型
  
job.setOutputValueClass(Text.class);// 输出value类型
  

  
return job.waitForCompletion(true)?0:1;
  
}
  

  
public static void main(String[] args) throws Exception {
  
String[] arg = {
  
"hdfs://HadoopMaster:9000/middle/temperature/records.txt",
  
"hdfs://HadoopMaster:9000/middle/temperature/out/" };
  
int ec = ToolRunner.run(new Configuration(),
  
new JoinRecordWithStationName(), arg);
  
System.exit(ec);
  
}
  
}
  

  添加分布式缓存文件相对简单,只需使用job.addCacheFile(new URI(cacheFilePath))方法添加缓存文件即可。需要注意的是,在获取获取缓存文件时,文件将以 “本地的” Path 对象的形式返回。为了读取文件,用户需要首先使用 getLocal()方法获得一个 Hadoop 本地 FileSystem 实例。本程序中,我们在 Reduce 的 setup() 方法中获取缓存文件。
  以下是示例数据集的输出结果,达到我们预期的效果。
  

SIHCCAJAVRI011990-99999195005151800-11  
SIHCCAJAVRI011990-9999919500515120022
  
SIHCCAJAVRI011990-999991950051507000
  
TRNSET-HANSMOEN012650-9999919490324180078
  
TRNSET-HANSMOEN012650-99999194903241200111
  

DSC0004.png

DSC0005.png

DSC0006.png

DSC0007.png

DSC0008.png

DSC0009.png

DSC00010.png

DSC00011.png

  得到结果
  

SIHCCAJAVRI011990-99999195005151800-11  
SIHCCAJAVRI011990-9999919500515120022
  
SIHCCAJAVRI011990-999991950051507000
  
TRNSET-HANSMOEN012650-9999919490324180078
  
TRNSET-HANSMOEN012650-99999194903241200111
  

  代码版本1
  package zhouls.bigdata.myMapReduce.Join;
  import java.util.Set;
  import java.io.*;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;

  public>  {
  private Text first; //Text 类型的实例变量first
  private Text second;//Text 类型的实例变量second
  public TextPair() //无参构造方法
  {
  set(new Text(),new Text());
  }
  public TextPair(String first,String second)  // Sting类型参数的构造方法
  {
  set(new Text(first),new Text(second));
  }
  public TextPair(Text first,Text second)  // Text类型参数的构造方法
  {
  set(first,second);
  }
  public void set(Text first,Text second) //set方法
  {
  this.first=first;
  this.second=second;
  }
  public Text getFirst() //getFirst方法
  {
  return first;
  }
  public Text getSecond() //getSecond方法
  {
  return second;
  }
  //将对象转换为字节流并写入到输出流out中
  public void write(DataOutput out) throws IOException //write方法
  {
  first.write(out);
  second.write(out);
  }
  //从输入流in中读取字节流反序列化为对象
  public void readFields(DataInput in) throws IOException //readFields方法
  {
  first.readFields(in);
  second.readFields(in);
  }
  @Override
  public int hashCode() //在mapreduce中,通过hashCode来选择reduce分区
  {
  return first.hashCode() *163+second.hashCode();
  }
  @Override
  public boolean equals(Object o) //equals方法,这里是两个对象的内容之间比较
  {
  if (o instanceof TextPair)
  {
  TextPair tp=(TextPair) o;
  return first.equals(tp.first) && second.equals(tp.second);
  }
  return false;
  }
  @Override
  public String toString() //toString方法
  {
  return first +"\t"+ second;
  }
  public int compareTo(TextPair o)
  {
  // TODO Auto-generated method stub
  if(!first.equals(o.first))
  {
  return first.compareTo(o.first);
  }
  else if(!second.equals(o.second))
  {
  return second.compareTo(o.second);
  }
  return 0;
  }
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;
  import zhouls.bigdata.myMapReduce.Join.TextPair;

  public>  {  
  protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
  {
  String line = value.toString();
  String[] arr = line.split("\\s+");//解析气象站数据
  int length = arr.length;
  if(length==2)
  {//满足这种数据格式
  //key=气象站id  value=气象站名称
  System.out.println("station="+arr[0]+"0");
  context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
  }
  }
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.IOException;
  import java.util.Iterator;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public>  {
  protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException
  {
  Iterator< Text> iter = values.iterator();
  Text stationName = new Text(iter.next());//气象站名称
  while(iter.hasNext()){
  Text record = iter.next();//天气记录的每条数据
  Text outValue = new Text(stationName.toString()+"\t"+record.toString());
  context.write(key.getFirst(),outValue);
  }
  }        
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public>  {
  protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
  {
  String line = value.toString();
  String[] arr = line.split("\\s+",2);//解析天气记录数据
  int length = arr.length;
  if(length==2){
  //key=气象站id  value=天气记录数据
  context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
  }  
  }
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.BufferedReader;
  import java.io.FileNotFoundException;
  import java.io.IOException;
  import java.io.InputStreamReader;
  import java.net.URI;
  import java.util.Hashtable;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  public>  {

  public static>  {
  public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
  {
  String[] arr = value.toString().split("\t", 2);
  if (arr.length == 2)
  {
  context.write(new Text(arr[0]), value);
  }
  }
  }

  public static>  {
  private Hashtable< String, String> table = new Hashtable< String, String>();//定义Hashtable存放缓存数据
  /**
  * 获取分布式缓存文件
  */
  protected void setup(Context context) throws IOException,
  InterruptedException
  {
  Path[] localPaths = (Path[]) context.getLocalCacheFiles();//返回本地文件路径
  if (localPaths.length == 0)
  {
  throw new FileNotFoundException("Distributed cache file not found.");
  }
  FileSystem fs = FileSystem.getLocal(context.getConfiguration());//获取本地 FileSystem 实例
  FSDataInputStream in = null;
  in = fs.open(new Path(localPaths[0].toString()));// 打开输入流
  BufferedReader br = new BufferedReader(new InputStreamReader(in));// 创建BufferedReader读取器
  String infoAddr = null;
  while (null != (infoAddr = br.readLine()))
  {// 按行读取并解析气象站数据
  String[] records = infoAddr.split("\t");
  table.put(records[0], records[1]);//key为stationID,value为stationName
  }
  }
  public void reduce(Text key, Iterable< Text> values, Context context)
  throws IOException, InterruptedException
  {
  String stationName = table.get(key.toString());//天气记录根据stationId 获取stationName
  for (Text value : values)
  {
  context.write(new Text(stationName), value);
  }
  }
  }
  public int run(String[] args) throws Exception
  {
  // TODO Auto-generated method stub
  Configuration conf = new Configuration();
  //FileSystem hdfs = FileSystem.get(new URI("hdfs://HadoopMaster:9000"), conf);
  //Path out = new Path(args[1]);
  //if (hdfs.isDirectory(out))
  //{
  //hdfs.delete(out, true);
  //}
  Job job = Job.getInstance();//获取一个job实例
  job.setJarByClass(JoinRecordWithStationName.class);
  //FileInputFormat.addInputPath(job,
  //new org.apache.hadoop.fs.Path(args[0]));
  //FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path(args[1]));
  FileInputFormat.addInputPath(job,
  new org.apache.hadoop.fs.Path("./data/join/station.txt"));
  FileOutputFormat.setOutputPath(job, new org.apache.hadoop.fs.Path("./out/join/"));
  //添加分布式缓存文件 station.txt
  //job.addCacheFile(new URI("hdfs://HadoopMaster:9000/join/station.txt"));
  job.addCacheFile(new URI("./data/join/station.txt"));
  job.setMapperClass(TemperatureMapper.class);
  job.setReducerClass(TemperatureReducer.class);
  job.setOutputKeyClass(Text.class);// 输出key类型
  job.setOutputValueClass(Text.class);// 输出value类型
  return job.waitForCompletion(true)?0:1;
  }
  public static void main(String[] args) throws Exception
  {
  //String[] arg = {
  //"hdfs://HadoopMaster:9000/join/records.txt",
  //"hdfs://HadoopMaster:9000/join/out/" };
  //
  String[] arg = {
  "./data/join/records.txt",
  "./out/join/" };
  int ec = ToolRunner.run(new Configuration(),new JoinRecordWithStationName(), arg);
  System.exit(ec);
  }
  }
  package zhouls.bigdata.myMapReduce.Join;

  public>  {
  /**
  * @param args
  */
  public static void main(String[] args)
  {
  // TODO Auto-generated method stub
  }
  }
  代码版本2
  package zhouls.bigdata.myMapReduce.Join;
  import java.util.Set;
  import java.io.*;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;

  public>  {
  private Text first; //Text 类型的实例变量first
  private Text second;//Text 类型的实例变量second
  public TextPair() //无参构造方法
  {
  set(new Text(),new Text());
  }
  public TextPair(String first,String second)  // Sting类型参数的构造方法
  {
  set(new Text(first),new Text(second));
  }
  public TextPair(Text first,Text second)  // Text类型参数的构造方法
  {
  set(first,second);
  }
  public void set(Text first,Text second) //set方法
  {
  this.first=first;
  this.second=second;
  }
  public Text getFirst() //getFirst方法
  {
  return first;
  }
  public Text getSecond() //getSecond方法
  {
  return second;
  }
  //将对象转换为字节流并写入到输出流out中
  public void write(DataOutput out) throws IOException //write方法
  {
  first.write(out);
  second.write(out);
  }
  //从输入流in中读取字节流反序列化为对象
  public void readFields(DataInput in) throws IOException //readFields方法
  {
  first.readFields(in);
  second.readFields(in);
  }
  @Override
  public int hashCode() //在mapreduce中,通过hashCode来选择reduce分区
  {
  return first.hashCode() *163+second.hashCode();
  }
  @Override
  public boolean equals(Object o) //equals方法,这里是两个对象的内容之间比较
  {
  if (o instanceof TextPair)
  {
  TextPair tp=(TextPair) o;
  return first.equals(tp.first) && second.equals(tp.second);
  }
  return false;
  }
  @Override
  public String toString() //toString方法
  {
  return first +"\t"+ second;
  }
  public int compareTo(TextPair o)
  {
  // TODO Auto-generated method stub
  if(!first.equals(o.first))
  {
  return first.compareTo(o.first);
  }
  else if(!second.equals(o.second))
  {
  return second.compareTo(o.second);
  }
  return 0;
  }
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;
  import zhouls.bigdata.myMapReduce.Join.TextPair;

  public>  {  
  protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
  {
  String line = value.toString();
  String[] arr = line.split("\\s+");//解析气象站数据
  int length = arr.length;
  if(length==2)
  {//满足这种数据格式
  //key=气象站id  value=气象站名称
  System.out.println("station="+arr[0]+"0");
  context.write(new TextPair(arr[0],"0"),new Text(arr[1]));
  }
  }
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.IOException;
  import java.util.Iterator;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Reducer;

  public>  {
  protected void reduce(TextPair key, Iterable< Text> values,Context context) throws IOException,InterruptedException
  {
  Iterator< Text> iter = values.iterator();
  Text stationName = new Text(iter.next());//气象站名称
  while(iter.hasNext()){
  Text record = iter.next();//天气记录的每条数据
  Text outValue = new Text(stationName.toString()+"\t"+record.toString());
  context.write(key.getFirst(),outValue);
  }
  }        
  }
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.IOException;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Mapper;

  public>  {
  protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException
  {
  String line = value.toString();
  String[] arr = line.split("\\s+",2);//解析天气记录数据
  int length = arr.length;
  if(length==2){
  //key=气象站id  value=天气记录数据
  context.write(new TextPair(arr[0],"1"),new Text(arr[1]));
  }  
  }
  }
  //版本2
  package zhouls.bigdata.myMapReduce.Join;
  import java.io.InputStream;
  import org.apache.hadoop.util.Tool;
  import java.io.OutputStream;
  import java.util.Set;
  import javax.lang.model.SourceVersion;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableComparable;
  import org.apache.hadoop.io.WritableComparator;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Partitioner;
  import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
  import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  import org.apache.hadoop.util.ToolRunner;

  public>  {

  public static>  {
  public int getPartition(TextPair key,Text value,int numPartitions)
  {
  return (key.getFirst().hashCode()&Integer.MAX_VALUE) % numPartitions;
  }
  }

  public static>  {
  protected GroupingComparator()
  {
  super(TextPair.class,true);
  }
  @Override
  public int compare(WritableComparable w1,WritableComparable w2)
  {
  TextPair ip1=(TextPair) w1;
  TextPair ip2=(TextPair) w2;
  Text l=ip1.getFirst();
  Text r=ip2.getFirst();
  return l.compareTo(r);
  }
  }
  public int run(String[] args) throws Exception
  {
  Configuration conf = new Configuration();// 读取配置文件
  Path mypath=new Path(args[2]);
  FileSystem hdfs=mypath.getFileSystem(conf);
  if (hdfs.isDirectory(mypath))
  {
  hdfs.delete(mypath,true);
  }
  Job job = Job.getInstance(conf,"join");// 新建一个任务
  job.setJarByClass(JoinRecordWithStationName.class);// 主类
  Path recordInputPath = new Path(args[0]);//天气记录数据源,这里是牵扯到多路径输入和多路径输出的问题。默认是从args[0]开始
  Path stationInputPath = new Path(args[1]);//气象站数据源
  Path outputPath = new Path(args[2]);//输出路径
  //若只有一个输入和一个输出,则输入是args[0],输出是args[1]。
  //若有两个输入和一个输出,则输入是args[0]和args[1],输出是args[2]
  MultipleInputs.addInputPath(job,recordInputPath,TextInputFormat.class,JoinRecordMapper.class);//读取天气记录Mapper
  MultipleInputs.addInputPath(job,stationInputPath,TextInputFormat.class,JoinStationMapper.class);//读取气象站Mapper
  FileOutputFormat.setOutputPath(job,outputPath);
  job.setReducerClass(JoinReducer.class);// Reducer
  job.setNumReduceTasks(2);
  job.setPartitionerClass(KeyPartitioner.class);//自定义分区
  job.setGroupingComparatorClass(GroupingComparator.class);//自定义分组
  job.setMapOutputKeyClass(TextPair.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(Text.class);
  return job.waitForCompletion(true)?0:1;
  }
  public static void main(String[] args) throws Exception
  {
  String[] args0={"hdfs://HadoopMaster:9000/join/records.txt"
  ,"hdfs://HadoopMaster:9000/join/station.txt"
  ,"hdfs://HadoopMaster:9000/join/out"
  };
  int exitCode=ToolRunner.run( new JoinRecordWithStationName(), args0);
  System.exit(exitCode);
  }
  }
  package zhouls.bigdata.myMapReduce.Join;

  public>  {
  /**
  * @param args
  */
  public static void main(String[] args)
  {
  // TODO Auto-generated method stub
  }
  }

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-425359-1-1.html 上篇帖子: Hadoop的ChainMapper和ChainReducer使用案例(链式处理)(四) 下篇帖子: hadoop配置core-site.xml,hdfs-site.xml,mapred-site.xml
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表