设为首页 收藏本站
查看: 1144|回复: 0

[经验分享] spark2.x由浅入深深到底系列七之py4j在spark中python api的使用

[复制链接]

尚未签到

发表于 2019-1-30 12:10:53 | 显示全部楼层 |阅读模式
  学习spark的任何技术前请先正确理解spark,可以参考: 正确理解Spark
  

  我们知道spark的RDD支持scala apijava api以及python api,我们分别对scala api与java api做了详细的介绍,本文我们将探讨rdd python api是怎么使用py4j来调用scala/java的api的,从而来实现python api的功能。首先我们先介绍下py4j。
  

  一、py4j
  py4j是一个使得python可以调用jvm中的对象的类库。看一个py4j官网上的例子:

  

  首先编写一个java程序
package com.twq.javaapi;
import py4j.GatewayServer;
/**
* Created by tangweiqun on 2017/9/22.
*/
public class AdditionApplication {
    public int addition(int first, int second) {
        return first + second;
    }
    public static void main(String[] args) {
        AdditionApplication app = new AdditionApplication();
        // app is now the gateway.entry_point
        //启动一个py4j的服务端,python可以连接到这个服务监听的端口,然后调用java的对象及其方法
        GatewayServer server = new GatewayServer(app);
        server.start();
    }
}  上面的java代码依赖一个jar包,我们可以通过maven引进来,如下:

    net.sf.py4j
    py4j
    0.10.4
  我们可以先在ide中启动上面的类AdditionApplication的main方法,将GatewayServer启动起来
  

  然后,我们打开python解释器,执行下面的代码:
>>> from py4j.java_gateway import JavaGateway>>> gateway = JavaGateway()                   # connect to the JVM>>> random = gateway.jvm.java.util.Random()   # create a java.util.Random instance>>> number1 = random.nextInt(10)              # call the Random.nextInt method>>> number2 = random.nextInt(10)>>> print(number1,number2)(2, 7)>>> addition_app = gateway.entry_point        # get the AdditionApplication instance>>> addition_app.addition(number1,number2)    # call the addition method9  上面的python代码依赖py4j,我们可以根据http://www.py4j.org/install.html#install-instructions的方法来安装py4j。从上面可以看出,我们在python中可以很简单的调用jvm中的Random以及AdditionApplication对象的方法
  

  二、py4j在spark中实现python api调用java/scala api
  首先,我们编写一个很简单的python版本的spark应用,如下:
if __name__ == "__main__":
    conf = SparkConf().setAppName("appName")
    sc = SparkContext(conf=conf)
    sourceDataRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt")
    wordsRDD = sourceDataRDD.flatMap(lambda line: line.split())
    keyValueWordsRDD = wordsRDD.map(lambda s: (s, 1))
    wordCountRDD = keyValueWordsRDD.reduceByKey(lambda a, b: a + b)
    wordCountRDD.saveAsTextFile("hdfs://master:9999" + output_path_service.get_output_path())
    print utils.get_rdd_result("wordCountRDD", wordCountRDD)  上面是一个很简单的python版的spark wordcount应用,我们通过下面的spark-submit命令,提交到spark集群中执行:
spark-submit \
--name "PythonWordCount" \
--master yarn \
--deploy-mode client \
--driver-memory 512m \
--executor-memory 512m \
--num-executors 2 \
--executor-cores 1 \
--py-files word_count_python.zip \
/home/hadoop-twq/spark-course/spark_word_count.py  (对于spark-submit每一个参数的含义以及spark-submit的原理是怎么样的,可以参考:正确提交spark应用)
  提交到集群运行后,会在driver端程序启动一个org.apache.spark.deploy.PythonRunner的类,这个里面做了两件事情
  1、初始化并启动GatewayServer,如下代码:
// Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such
//用于python的代码访问当前jvm中的对象的
val gatewayServer = new py4j.GatewayServer(null, 0)
val thread = new Thread(new Runnable() {
  override def run(): Unit = Utils.logUncaughtExceptions {
    gatewayServer.start()
  }
})
thread.setName("py4j-gateway-init")
thread.setDaemon(true)
thread.start()  2、利用ProcessBuilder来启动执行上面的spark_word_count.py python文件,如下:
// Launch Python process
val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava)
val env = builder.environment()
env.put("PYTHONPATH", pythonPath)
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
// pass conf spark.pyspark.python to python process, the only way to pass info to
// python process is through environment variable.
sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
sys.env.get("PYTHONHASHSEED").foreach(env.put("PYTHONHASHSEED", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize  这边需要注意的是PYSPARK_GATEWAY_PORT这个环境变量,这个环境变量的值是第一步启动起来的GatewayServer的监听端口,我们将这个端口以环境变量的方式传递给启动的python进程。
  

  然后,当上面的第2步启动了spark_word_count.py python进程后,开始执行spark_word_count.py中的内容,当执行到sc=SparkContext(conf),即初始化SparkContext,这个时候在SparkContext初始化的时候,会启动一个py4j的Gateway来和上面启动的GatewayServer进行通讯,如下代码(在context.py文件中):
SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)def _ensure_initialized(cls, instance=None, gateway=None, conf=None):
    """
    Checks whether a SparkContext is initialized or not.
    Throws error if a SparkContext is already running.
    """
    with SparkContext._lock:
        if not SparkContext._gateway:
            #这里是启动一个Gateway,并且将Gateway的jvm赋值给成员变量_jvm,这样我们就可以通过这个_jvm变量来访问jvm中的java对象及其方法
            SparkContext._gateway = gateway or launch_gateway(conf)
            SparkContext._jvm = SparkContext._gateway.jvm
        if instance:
            if (SparkContext._active_spark_context and
                    SparkContext._active_spark_context != instance):
                currentMaster = SparkContext._active_spark_context.master
                currentAppName = SparkContext._active_spark_context.appName
                callsite = SparkContext._active_spark_context._callsite
                # Raise error if there is already a running Spark context
                raise ValueError(
                    "Cannot run multiple SparkContexts at once; "
                    "existing SparkContext(app=%s, master=%s)"
                    " created by %s at %s:%s "
                    % (currentAppName, currentMaster,
                        callsite.function, callsite.file, callsite.linenum))
            else:
                SparkContext._active_spark_context = instance  在launch_gateway(conf)(源代码在java_gateway.py中)方法中会初始化一个Gateway,如下:

#从环境变量中拿到环境变量PYSPARK_GATEWAY_PORT,这个就是我们在PythonRunner中设置的环境变量
if "PYSPARK_GATEWAY_PORT" in os.environ:
    gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
# Connect to the gateway
# 启动一个JavaGateway同GatewayServer进行通讯
gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=True)
#将python api中需要的java/scala的类引入引进来
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
java_import(gateway.jvm, "org.apache.spark.ml.python.*")
java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
# TODO(davies): move into sql
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")
return gateway  

  这样,python中的SparkContext就可以访问RDD java api了,如下是在python文件context.py中访问java api的JavaSparkContext:
def _initialize_context(self, jconf):
    """
    Initialize SparkContext in function to allow subclass specific initialization
    """
    return self._jvm.JavaSparkContext(jconf)  





运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-669603-1-1.html 上篇帖子: spark支持lzo 下篇帖子: 遍历Spark的RDD
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表