[Experience Sharing] Playing with Big Data series: how to give Apache Pig a custom storage format (Part 4)

Posted on 2017-1-13 08:24:38
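Pig ships with a large number of built-in utility functions and also exposes many interfaces for developers. With UDFs we can easily implement features that Pig does not support directly or lacks altogether. For example, the earlier articles in this series showed how to store the results of a Pig analysis in all kinds of media rather than only in HDFS; of course, we can also store them in both.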

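So how do we implement our own storage UDF? To answer that, we have to talk about Pig's load and store functions. A load function loads data from some data source, usually HDFS, while a store function saves the analysis results, by default to HDFS. So to meet most of our needs we only have to extend StoreFunc, the base class for store functions, and override its methods. Once we understand this, we can store results anywhere: in a database, in an index file, in a local txt or Excel file, and so on.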

First, let's look at the source code of StoreFunc:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.PigStatusReporter;

/**
 * StoreFuncs take records from Pig's processing and store them into a data store.  Most frequently
 * this is an HDFS file, but it could also be an HBase instance, RDBMS, etc.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class StoreFunc implements StoreFuncInterface {

    /**
     * This method is called by the Pig runtime in the front end to convert the
     * output location to an absolute path if the location is relative. The
     * StoreFunc implementation is free to choose how it converts a relative
     * location to an absolute location since this may depend on what the location
     * string represent (hdfs path or some other data source).
     *
     * @param location location as provided in the "store" statement of the script
     * @param curDir the current working direction based on any "cd" statements
     * in the script before the "store" statement. If there are no "cd" statements
     * in the script, this would be the home directory -
     * <pre>/user/<username> </pre>
     * @return the absolute location based on the arguments passed
     * @throws IOException if the conversion is not possible
     */
    @Override
    public String relToAbsPathForStoreLocation(String location, Path curDir)
            throws IOException {
        return LoadFunc.getAbsolutePath(location, curDir);
    }

    /**
     * Return the OutputFormat associated with StoreFunc.  This will be called
     * on the front end during planning and on the backend during
     * execution.
     * @return the {@link OutputFormat} associated with StoreFunc
     * @throws IOException if an exception occurs while constructing the
     * OutputFormat
     *
     */
    public abstract OutputFormat getOutputFormat() throws IOException;

    /**
     * Communicate to the storer the location where the data needs to be stored.
     * The location string passed to the {@link StoreFunc} here is the
     * return value of {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * This method will be called in the frontend and backend multiple times. Implementations
     * should bear in mind that this method is called multiple times and should
     * ensure there are no inconsistent side effects due to the multiple calls.
     * {@link #checkSchema(ResourceSchema)} will be called before any call to
     * {@link #setStoreLocation(String, Job)}.
     *
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object
     * @throws IOException if the location is not valid.
     */
    public abstract void setStoreLocation(String location, Job job) throws IOException;

    /**
     * Set the schema for data to be stored.  This will be called on the
     * front end during planning if the store is associated with a schema.
     * A Store function should implement this function to
     * check that a given schema is acceptable to it.  For example, it
     * can check that the correct partition keys are included;
     * a storage function to be written directly to an OutputFormat can
     * make sure the schema will translate in a well defined way.  Default implementation
     * is a no-op.
     * @param s to be checked
     * @throws IOException if this schema is not acceptable.  It should include
     * a detailed error message indicating what is wrong with the schema.
     */
    @Override
    public void checkSchema(ResourceSchema s) throws IOException {
        // default implementation is a no-op
    }

    /**
     * Initialize StoreFunc to write data.  This will be called during
     * execution on the backend before the call to putNext.
     * @param writer RecordWriter to use.
     * @throws IOException if an exception occurs during initialization
     */
    public abstract void prepareToWrite(RecordWriter writer) throws IOException;

    /**
     * Write a tuple to the data store.
     *
     * @param t the tuple to store.
     * @throws IOException if an exception occurs during the write
     */
    public abstract void putNext(Tuple t) throws IOException;

    /**
     * This method will be called by Pig both in the front end and back end to
     * pass a unique signature to the {@link StoreFunc} which it can use to store
     * information in the {@link UDFContext} which it needs to store between
     * various method invocations in the front end and back end. This method
     * will be called before other methods in {@link StoreFunc}.  This is necessary
     * because in a Pig Latin script with multiple stores, the different
     * instances of store functions need to be able to find their (and only their)
     * data in the UDFContext object.  The default implementation is a no-op.
     * @param signature a unique signature to identify this StoreFunc
     */
    @Override
    public void setStoreFuncUDFContextSignature(String signature) {
        // default implementation is a no-op
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * fails. Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * The default implementation  deletes the output location if it
     * is a {@link FileSystem} location.
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.
     */
    @Override
    public void cleanupOnFailure(String location, Job job)
            throws IOException {
        cleanupOnFailureImpl(location, job);
    }

    /**
     * This method will be called by Pig if the job which contains this store
     * is successful, and some cleanup of intermediate resources is required.
     * Implementations can clean up output locations in this method to
     * ensure that no incorrect/incomplete results are left in the output location.
     * @param location Location returned by
     * {@link StoreFunc#relToAbsPathForStoreLocation(String, Path)}
     * @param job The {@link Job} object - this should be used only to obtain
     * cluster properties through {@link Job#getConfiguration()} and not to set/query
     * any runtime job information.
     */
    @Override
    public void cleanupOnSuccess(String location, Job job)
            throws IOException {
        // DEFAULT: DO NOTHING, user-defined overrides can
        // call cleanupOnFailureImpl(location, job) or ...?
    }

    /**
     * Default implementation for {@link #cleanupOnFailure(String, Job)}
     * and {@link #cleanupOnSuccess(String, Job)}.  This removes a file
     * from HDFS.
     * @param location file name (or URI) of file to remove
     * @param job Hadoop job, used to access the appropriate file system.
     * @throws IOException
     */
    public static void cleanupOnFailureImpl(String location, Job job)
            throws IOException {
        Path path = new Path(location);
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }

    /**
     * Issue a warning.  Warning messages are aggregated and reported to
     * the user.
     * @param msg String message of the warning
     * @param warningEnum type of warning
     */
    public final void warn(String msg, Enum warningEnum) {
        Counter counter = PigStatusReporter.getInstance().getCounter(warningEnum);
        counter.increment(1);
    }
}


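There are many methods here, but we do not need to redefine all of them. In general we only have to override the following abstract methods:
(1) getOutputFormat: corresponds to Hadoop's OutputFormat. The final output takes a different form depending on which format this method returns.
(2) setStoreLocation: defines the path where the output files are written. If the data is not stored on HDFS the location can be ignored, although the method must still be implemented because it is abstract.
(3) prepareToWrite: performs any initialization needed before data is written.
(4) putNext: receives from Pig each tuple that finally has to be stored.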
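
To make the division of labor among these four methods concrete, here is a minimal JDK-only sketch. It does not use Pig's API: `MiniStoreFunc`, `TabStore` and `StoreLifecycleDemo` are hypothetical stand-ins, a `java.io.Writer` plays the role of the RecordWriter, and `getOutputFormat` is collapsed so that it returns the writer directly. The call order shown is an assumption that is merely consistent with the Javadoc above (prepareToWrite before putNext).

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-in for Pig's StoreFunc (JDK types only). */
abstract class MiniStoreFunc {
    /** Collapsed: returns the writer itself instead of an OutputFormat that creates it. */
    abstract Writer getOutputFormat() throws IOException;
    abstract void setStoreLocation(String location) throws IOException;
    abstract void prepareToWrite(Writer writer) throws IOException;
    abstract void putNext(List<Object> tuple) throws IOException;
}

/** Stores every tuple as one tab-separated line. */
class TabStore extends MiniStoreFunc {
    private String location; // remembered only; this sketch writes to memory
    private Writer writer;

    @Override
    Writer getOutputFormat() { return new StringWriter(); }

    @Override
    void setStoreLocation(String location) { this.location = location; }

    @Override
    void prepareToWrite(Writer writer) { this.writer = writer; }

    @Override
    void putNext(List<Object> tuple) throws IOException {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < tuple.size(); i++) {
            if (i > 0) line.append('\t');
            line.append(tuple.get(i));
        }
        writer.write(line.append('\n').toString());
    }
}

class StoreLifecycleDemo {
    /** Drives the store the way a runtime might, then returns what was written. */
    static String run(MiniStoreFunc store, String location,
                      List<List<Object>> tuples) throws IOException {
        Writer writer = store.getOutputFormat();
        store.setStoreLocation(location);
        store.prepareToWrite(writer);      // once, before any putNext
        for (List<Object> t : tuples) {
            store.putNext(t);              // once per tuple
        }
        return writer.toString();
    }

    public static void main(String[] args) throws IOException {
        List<List<Object>> tuples = Arrays.asList(
                Arrays.<Object>asList(1, "foo"),
                Arrays.<Object>asList(2, "bar"));
        System.out.print(run(new TabStore(), "/tmp/out", tuples));
    }
}
```

In a real StoreFunc the same four roles exist, but the types are Hadoop's OutputFormat, Job and RecordWriter and Pig's Tuple, as in the source listed above.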


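From step (1) we know that we have to provide an OutputFormat class. To do so we extend one of Hadoop's OutputFormat base classes and override its getRecordWriter method. After that we may also need to extend RecordWriter to define our own output format, which could be a line of text, an object, an index collection, and so on. The following OutputFormat, for example, supports Lucene indexes: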
package com.pig.support.lucene;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.SerialMergeScheduler;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.Version;

/**
 * Extends FileOutputFormat to provide an OutputFormat that writes Lucene indexes.
 */
public class LuceneOutputFormat extends FileOutputFormat<Writable, Document> {

    String location;
    FileSystem fs;
    String taskid;
    FileOutputCommitter committer;
    AtomicInteger counter = new AtomicInteger();

    public LuceneOutputFormat(String location) {
        this.location = location;
    }

    @Override
    public RecordWriter<Writable, Document> getRecordWriter(
            TaskAttemptContext ctx) throws IOException, InterruptedException {
        Configuration conf = ctx.getConfiguration();
        fs = FileSystem.get(conf);
        // Build the index in a unique local temp directory first.
        File baseDir = new File(System.getProperty("java.io.tmpdir"));
        String baseName = System.currentTimeMillis() + "-";
        File tempDir = new File(baseDir, baseName + counter.getAndIncrement());
        tempDir.mkdirs();
        tempDir.deleteOnExit();
        return new LuceneRecordWriter(
                (FileOutputCommitter) getOutputCommitter(ctx), tempDir);
    }

    /**
     * Write out the LuceneIndex to a local temporary location.<br/>
     * On commit/close the index is copied to the hdfs output directory.<br/>
     */
    static class LuceneRecordWriter extends RecordWriter<Writable, Document> {

        final IndexWriter writer;
        final FileOutputCommitter committer;
        final File tmpdir;

        public LuceneRecordWriter(FileOutputCommitter committer, File tmpdir) {
            this.committer = committer;
            this.tmpdir = tmpdir;
            try {
                IndexWriterConfig config = new IndexWriterConfig(Version.LUCENE_4_10_2,
                        new StandardAnalyzer());
                LogByteSizeMergePolicy mergePolicy = new LogByteSizeMergePolicy();
                mergePolicy.setMergeFactor(10);
                //mergePolicy.setUseCompoundFile(false);
                config.setMergePolicy(mergePolicy);
                config.setMergeScheduler(new SerialMergeScheduler());
                writer = new IndexWriter(FSDirectory.open(tmpdir), config);
            } catch (IOException e) {
                RuntimeException exc = new RuntimeException(e.toString(), e);
                exc.setStackTrace(e.getStackTrace());
                throw exc;
            }
        }

        @Override
        public void close(final TaskAttemptContext ctx) throws IOException,
                InterruptedException {
            // Use a thread for status polling: keep reporting progress until
            // interrupted, so the task is not killed during a long merge/copy.
            final Thread th = new Thread() {
                public void run() {
                    while (!Thread.currentThread().isInterrupted()) {
                        ctx.progress();
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            };
            th.start();
            try {
                writer.forceMerge(1);
                writer.close();
                // move all files to part
                Configuration conf = ctx.getConfiguration();
                Path work = committer.getWorkPath();
                Path output = new Path(work, "index-"
                        + ctx.getTaskAttemptID().getTaskID().getId());
                FileSystem fs = FileSystem.get(conf);
                FileUtil.copy(tmpdir, fs, output, true, conf);
            } finally {
                th.interrupt();
            }
        }

        @Override
        public void write(Writable key, Document doc) throws IOException,
                InterruptedException {
            writer.addDocument(doc);
        }
    }
}
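
The key idea in LuceneRecordWriter is "write locally, publish on close": records go to a local temporary location, and only close() moves the finished result to its final output path. The following JDK-only sketch illustrates that pattern with plain files instead of Lucene and HDFS. `LocalThenPublishWriter` and `LocalThenPublishDemo` are hypothetical names, and `java.nio.file` stands in for Hadoop's FileSystem and FileUtil.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical analogue of LuceneRecordWriter: buffer locally, publish on close. */
class LocalThenPublishWriter implements AutoCloseable {
    private final Path tmpFile;    // local scratch file (like the temp index dir)
    private final Path finalFile;  // final destination (like the HDFS work path)
    private final BufferedWriter out;

    LocalThenPublishWriter(Path finalFile) throws IOException {
        this.finalFile = finalFile;
        this.tmpFile = Files.createTempFile("part-", ".tmp");
        this.out = Files.newBufferedWriter(tmpFile, StandardCharsets.UTF_8);
    }

    /** Like RecordWriter.write: only touches the local scratch file. */
    void write(String record) throws IOException {
        out.write(record);
        out.newLine();
    }

    /** Like RecordWriter.close: finish the local file, then move it into place. */
    @Override
    public void close() throws IOException {
        out.close();
        Files.createDirectories(finalFile.toAbsolutePath().getParent());
        Files.move(tmpFile, finalFile, StandardCopyOption.REPLACE_EXISTING);
    }
}

class LocalThenPublishDemo {
    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("out").resolve("index-0");
        try (LocalThenPublishWriter w = new LocalThenPublishWriter(target)) {
            w.write("doc-1");
            w.write("doc-2");
        }
        System.out.println(Files.readAllLines(target, StandardCharsets.UTF_8));
    }
}
```

Because nothing appears at the destination until close() succeeds, a task that fails halfway leaves no partial output there, which is the same reason the Lucene writer builds its index in a temp directory first.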


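To summarize, the steps for defining a custom output (storage) format are:
(1) Extend StoreFunc and override its methods.
(2) Extend an OutputFormat base class and write your own OutputFormat class.
(3) Extend RecordWriter and write your own write method.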

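Of course, not all of these steps are always required. When storing to a database, for example, we can simply take each tuple in putNext and keep it in a collection, then commit our data after the OutputCommitter has committed successfully; if saving fails, we can roll our data back in the committer's abort method.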
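
The buffer-then-commit idea for a database target can be sketched as follows. This is only an illustration under stated assumptions: `FakeTable` is a hypothetical in-memory stand-in for a real database, `BufferingStore` is a hypothetical class, and its `commit()` and `abort()` methods model the points at which an OutputCommitter would commit or abort. A real implementation would use a database transaction and would have to bound the memory used by the buffer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical in-memory "table" standing in for a real database. */
class FakeTable {
    private final List<String> rows = new ArrayList<>();

    void insertAll(List<String> batch) { rows.addAll(batch); }

    List<String> rows() { return Collections.unmodifiableList(rows); }
}

/** Collects rows in putNext and only makes them visible on commit. */
class BufferingStore {
    private final FakeTable table;
    private final List<String> pending = new ArrayList<>();

    BufferingStore(FakeTable table) { this.table = table; }

    /** Called once per record: buffer it, do not touch the table yet. */
    void putNext(String row) { pending.add(row); }

    /** Success path: flush the buffered rows to the table. */
    void commit() {
        table.insertAll(pending);
        pending.clear();
    }

    /** Failure path: discard the buffered rows so nothing partial is stored. */
    void abort() { pending.clear(); }
}

class BufferingStoreDemo {
    public static void main(String[] args) {
        FakeTable table = new FakeTable();
        BufferingStore store = new BufferingStore(table);
        for (String row : Arrays.asList("a", "b")) {
            store.putNext(row);
        }
        store.commit();
        System.out.println(table.rows());
    }
}
```

The point of the design is that a failed task never leaves half of its rows behind: either commit() publishes the whole batch or abort() drops it.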

This way, wherever we want to store our data, we can do it by following the steps above, which makes the approach very flexible.

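You are welcome to scan the QR code and follow the WeChat public account: 我是攻城师 (woshigcs)
The account shares content on search, big data technology and the Internet, and is also a friendly place for technical discussion. Feel free to leave a message with any questions; everyone is welcome!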


Original thread: https://www.yunweiku.com/thread-327696-1-1.html