
[Experience] Developing a Hadoop Program in Eclipse and Submitting It to the Cluster



Posted on 2015-07-14 10:28
  This article assumes a Hadoop cluster is already up and running. On that basis it explains how to develop a Hadoop program in Eclipse, submit it to the cluster, and watch the job run through the Hadoop web UI. The Eclipse distribution used here is Spring Tool Suite (STS), the IDE published by the Spring team, which can be downloaded from the Spring website.
  
  I. Configuring Eclipse to connect to HDFS
  1. Copy the Hadoop Eclipse plugin, hadoop-eclipse-plugin-1.0.3.jar, into Eclipse's plugin directory, sts-3.2.0.RELEASE\plugins\.
  2. After restarting Eclipse, a new Map/Reduce perspective appears in the IDE.
  3. Open the Map/Reduce Locations view.
  4. Create a new Hadoop location.
  5. Configure the connection properties.
  6. Once the configuration is saved, the new connection appears in the Project Explorer view, from which you can browse the HDFS files of the remote Hadoop cluster. The Eclipse side is now set up; a small programmatic check is sketched below.
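  Besides browsing in Project Explorer, the connection can also be verified from code. The following is a minimal sketch (the class name is made up for illustration, and the NameNode address is the one used in the configuration files later in this post); it simply lists the HDFS root directory:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ListHdfsRoot {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumes the NameNode listens on hdfs://namenode:9000/, as in core-site.xml below.
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000/"), conf);
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            System.out.println(status.getPath());
        }
        fs.close();
    }
}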
  
  II. Submitting a job to the Hadoop cluster from Eclipse
  1. Edit the hosts file on the local Windows machine
  (1) Open Notepad as administrator.
  (2) In Notepad, open the file C:\Windows\System32\drivers\etc\hosts.
  (3) Edit the file and add the Hadoop cluster's host-to-IP mappings at the top, in the following form (a quick way to verify the entries is sketched after the list):
  192.168.100.1 namenode
  192.168.100.2 datanode1
  192.168.100.3 datanode2
  192.168.100.4 datanode3
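  To confirm that the mappings are picked up, a minimal check such as the following can be run (the class name is made up, and the host names are the example ones above):

import java.net.InetAddress;

public class CheckHosts {
    public static void main(String[] args) throws Exception {
        // Each line should print the IP configured in the hosts file, e.g. namenode -> 192.168.100.1.
        for (String host : new String[] { "namenode", "datanode1", "datanode2", "datanode3" }) {
            System.out.println(host + " -> " + InetAddress.getByName(host).getHostAddress());
        }
    }
}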
  
2. Create an ordinary Java project in Eclipse and add three folders: src, conf, and lib.
  3. Copy all jar files from the Hadoop distribution's lib directory (hadoop-1.0.3\lib) into the project's lib folder and add them to the build path.
  4. Copy core-site.xml, hdfs-site.xml, and mapred-site.xml from the Hadoop distribution's conf directory (hadoop-1.0.3\conf) into the project's conf folder, and make sure the conf folder is on the project's classpath so Hadoop can load these files. (A small check that the files are picked up is sketched after the three listings.)
  (1) core-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://namenode:9000/</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/usr/hadoop/tmp/</value>
  </property>
</configuration>

  (2) hdfs-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>dfs.replication</name>
    <value>3</value>
  </property>
  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.support.append</name>
    <value>true</value>
  </property>
</configuration>

  (3) mapred-site.xml:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>namenode:9001</value>
  </property>
  <property>
    <name>mapred.map.tasks</name>
    <value>16</value>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>5</value>
  </property>
  <property>
    <name>mapred.tasktracker.map.tasks.maximum</name>
    <value>20</value>
  </property>
  <property>
    <name>mapred.tasktracker.reduce.tasks.maximum</name>
    <value>20</value>
  </property>
</configuration>

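  Once the three files are on the project classpath, a Configuration created in the client code picks up core-site.xml automatically, and a JobConf additionally loads mapred-site.xml. A minimal sanity check (the class name is made up for illustration) that prints the values configured above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

public class PrintClusterConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Expected: hdfs://namenode:9000/ (from core-site.xml)
        System.out.println("fs.default.name    = " + conf.get("fs.default.name"));
        JobConf jobConf = new JobConf();
        // Expected: namenode:9001 (from mapred-site.xml)
        System.out.println("mapred.job.tracker = " + jobConf.get("mapred.job.tracker"));
    }
}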
  5. Create a com package under src and put the following two classes in it.
  (1)EJob.java
package com;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Array;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
public class EJob {
private static ArrayList<URL> classPath = new ArrayList<URL>();
/** Unpack a jar file into a directory. */
public static void unJar(File jarFile, File toDir) throws IOException {
JarFile jar = new JarFile(jarFile);
try {
Enumeration entries = jar.entries();
while (entries.hasMoreElements()) {
JarEntry entry = (JarEntry) entries.nextElement();
if (!entry.isDirectory()) {
InputStream in = jar.getInputStream(entry);
try {
File file = new File(toDir, entry.getName());
if (!file.getParentFile().mkdirs()) {
if (!file.getParentFile().isDirectory()) {
throw new IOException("Mkdirs failed to create " + file.getParentFile().toString());
}
}
OutputStream out = new FileOutputStream(file);
try {
byte[] buffer = new byte[8192];
int i;
while ((i = in.read(buffer)) != -1) {
out.write(buffer, 0, i);
}
} finally {
out.close();
}
} finally {
in.close();
}
}
}
} finally {
jar.close();
}
}
/**
* Run a Hadoop job jar. If the main class is not in the jar's manifest,
* then it must be provided on the command line.
*/
public static void runJar(String[] args) throws Throwable {
String usage = "jarFile [mainClass] args...";
if (args.length < 1) {
System.err.println(usage);
System.exit(-1);
}
int firstArg = 0;
String fileName = args[firstArg++];
File file = new File(fileName);
String mainClassName = null;
JarFile jarFile;
try {
jarFile = new JarFile(fileName);
} catch (IOException io) {
throw new IOException("Error opening job jar: " + fileName).initCause(io);
}
Manifest manifest = jarFile.getManifest();
if (manifest != null) {
mainClassName = manifest.getMainAttributes().getValue("Main-Class");
}
jarFile.close();
if (mainClassName == null) {
if (args.length < 2) {
System.err.println(usage);
System.exit(-1);
}
mainClassName = args[firstArg++];
}
mainClassName = mainClassName.replaceAll("/", ".");
File tmpDir = new File(System.getProperty("java.io.tmpdir"));
tmpDir.mkdirs();
if (!tmpDir.isDirectory()) {
System.err.println("Mkdirs failed to create " + tmpDir);
System.exit(-1);
}
final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir);
workDir.delete();
workDir.mkdirs();
if (!workDir.isDirectory()) {
System.err.println("Mkdirs failed to create " + workDir);
System.exit(-1);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
fullyDelete(workDir);
} catch (IOException e) {
}
}
});
unJar(file, workDir);
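// Put the unpacked working directory, the jar itself, its classes/ directory, and every jar under lib/ on the classpath.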
classPath.add(new File(workDir + "/").toURL());
classPath.add(file.toURL());
classPath.add(new File(workDir, "classes/").toURL());
File[] libs = new File(workDir, "lib").listFiles();
if (libs != null) {
for (int i = 0; i < libs.length; i++) {
classPath.add(libs[i].toURL());
}
}
ClassLoader loader = new URLClassLoader(classPath.toArray(new URL[0]));
Thread.currentThread().setContextClassLoader(loader);
Class mainClass = Class.forName(mainClassName, true, loader);
Method main = mainClass.getMethod("main", new Class[] { Array.newInstance(String.class, 0).getClass() });
String[] newArgs = Arrays.asList(args).subList(firstArg, args.length).toArray(new String[0]);
try {
main.invoke(null, new Object[] { newArgs });
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
/**
* Delete a directory and all its contents. If we return false, the
* directory may be partially-deleted.
*/
public static boolean fullyDelete(File dir) throws IOException {
File contents[] = dir.listFiles();
if (contents != null) {
for (int i = 0; i < contents.length; i++) {
if (contents[i].isFile()) {
if (!contents[i].delete()) {
return false;
}
} else {
// try deleting the directory
// this might be a symlink
boolean b = false;
b = contents[i].delete();
if (b) {
// this was indeed a symlink or an empty directory
continue;
}
// if not an empty directory or symlink let
// fullydelete handle it.
if (!fullyDelete(contents[i])) {
return false;
}
}
}
}
return dir.delete();
}
/**
* Add a directory or file to classpath.
*
* @param component
*/
public static void addClasspath(String component) {
if ((component != null) && (component.length() > 0)) {
try {
File f = new File(component);
if (f.exists()) {
URL key = f.getCanonicalFile().toURL();
if (!classPath.contains(key)) {
classPath.add(key);
}
}
} catch (IOException e) {
}
}
}
/**
* Add default classpath listed in bin/hadoop bash.
*
* @param hadoopHome
*/
public static void addDefaultClasspath(String hadoopHome) {
// Classpath initially contains conf dir.
addClasspath(hadoopHome + "/conf");
// For developers, add Hadoop classes to classpath.
addClasspath(hadoopHome + "/build/classes");
if (new File(hadoopHome + "/build/webapps").exists()) {
addClasspath(hadoopHome + "/build");
}
addClasspath(hadoopHome + "/build/test/classes");
addClasspath(hadoopHome + "/build/tools");
// For releases, add core hadoop jar & webapps to classpath.
if (new File(hadoopHome + "/webapps").exists()) {
addClasspath(hadoopHome);
}
addJarsInDir(hadoopHome);
addJarsInDir(hadoopHome + "/build");
// Add libs to classpath.
addJarsInDir(hadoopHome + "/lib");
addJarsInDir(hadoopHome + "/lib/jsp-2.1");
addJarsInDir(hadoopHome + "/build/ivy/lib/Hadoop/common");
}
/**
* Add all jars in directory to classpath, sub-directory is excluded.
*
* @param dirPath
*/
public static void addJarsInDir(String dirPath) {
File dir = new File(dirPath);
if (!dir.exists()) {
return;
}
File[] files = dir.listFiles();
if (files == null) {
return;
}
for (int i = 0; i < files.length; i++) {
if (files[i].isDirectory()) {
continue;
} else {
addClasspath(files[i].getAbsolutePath());
}
}
}
/**
* Create a temp jar file in "java.io.tmpdir".
*
* @param root
* @return
* @throws IOException
*/
public static File createTempJar(String root) throws IOException {
if (!new File(root).exists()) {
return null;
}
Manifest manifest = new Manifest();
manifest.getMainAttributes().putValue("Manifest-Version", "1.0");
final File jarFile = File.createTempFile("EJob-", ".jar", new File(System.getProperty("java.io.tmpdir")));
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
jarFile.delete();
}
});
JarOutputStream out = new JarOutputStream(new FileOutputStream(jarFile), manifest);
createTempJarInner(out, new File(root), "");
out.flush();
out.close();
return jarFile;
}
private static void createTempJarInner(JarOutputStream out, File f, String base) throws IOException {
if (f.isDirectory()) {
File[] fl = f.listFiles();
if (base.length() > 0) {
base = base + "/";
}
for (int i = 0; i < fl.length; i++) {
createTempJarInner(out, fl[i], base + fl[i].getName());
}
} else {
out.putNextEntry(new JarEntry(base));
FileInputStream in = new FileInputStream(f);
byte[] buffer = new byte[1024];
int n = in.read(buffer);
while (n != -1) {
out.write(buffer, 0, n);
n = in.read(buffer);
}
in.close();
}
}
/**
* Return a classloader based on user-specified classpath and parent
* classloader.
*
* @return
*/
public static ClassLoader getClassLoader() {
ClassLoader parent = Thread.currentThread().getContextClassLoader();
if (parent == null) {
parent = EJob.class.getClassLoader();
}
if (parent == null) {
parent = ClassLoader.getSystemClassLoader();
}
return new URLClassLoader(classPath.toArray(new URL[0]), parent);
}
}
  
  (2)WordCount.java
package com;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class WordCount {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
output.collect(word, one);
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
int sum = 0;
while (values.hasNext()) {
sum += values.next().get();
}
output.collect(key, new IntWritable(sum));
}
}
public static void main(String[] args) throws Exception {
File jarFile = EJob.createTempJar(WordCount.class.getClassLoader().getResource("").getPath());
ClassLoader classLoader = EJob.getClassLoader();
Thread.currentThread().setContextClassLoader(classLoader);
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("WordCount");
// Ship the temporary jar built above so the cluster nodes can load our classes.
conf.setJar(jarFile.toString());
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.100.1:9000/trade"));
FileOutputFormat.setOutputPath(conf, new Path("hdfs://192.168.100.1:9000/temp"));
JobClient.runJob(conf);
}
}
  6. Run WordCount.java as a Java application; the submitted job can be seen on the JobTracker page at http://namenode:50030. A quick way to read the job's output is sketched below.
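  Once the job completes, its output can be read back from HDFS without leaving Eclipse. A minimal sketch (the class name is made up; the /temp output directory and the NameNode address match the example job above):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PrintWordCountOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.100.1:9000/"), conf);
        // The job writes its results to /temp as part-00000, part-00001, ...
        for (FileStatus status : fs.listStatus(new Path("/temp"))) {
            if (!status.getPath().getName().startsWith("part-")) {
                continue; // skip _logs, _SUCCESS and similar entries
            }
            BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(status.getPath())));
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
            reader.close();
        }
        fs.close();
    }
}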
  
  Note: if the build reports an error in FileUtil.java (a common issue when submitting from Windows, typically in the file-permission check), copy the error message into a search engine to find a patched FileUtil class to drop into your project.
  Default Hadoop HTTP ports and addresses

Module      Daemon              Default port    Configuration parameter
HDFS        NameNode            50070           dfs.http.address
HDFS        SecondaryNameNode   50090           dfs.secondary.http.address
HDFS        DataNode            50075           dfs.datanode.http.address
MapReduce   JobTracker          50030           mapred.job.tracker.http.address
MapReduce   TaskTracker         50060           mapred.task.tracker.http.address
