设为首页 收藏本站
查看: 1386|回复: 0

[经验分享] Spark JDBC To MySQL

[复制链接]

尚未签到

发表于 2017-12-12 10:32:47 | 显示全部楼层 |阅读模式
  mysql jdbc driver下载地址
  https://dev.mysql.com/downloads/connector/j/
  在spark中使用jdbc
  1.在 spark-env.sh 文件中加入:
  export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.42.jar
  2.任务提交时加入:
  --jars /path/mysql-connector-java-5.1.42.jar
  从Spark Shell连接到MySQL:
  spark-shell --jars "/path/mysql-connector-java-5.1.42.jar
  可以使用Data Sources API将来自远程数据库的表作为DataFrame或Spark SQL临时视图加载。用户可以在数据源选项中指定JDBC连接属性。
  可以使用Data Sources API将来自远程数据库的表作为DataFrame或Spark SQL临时视图加载。用户可以在数据源选项中指定JDBC连接属性。 user并且password通常作为用于登录数据源的连接属性提供。除了连接属性外,Spark还支持以下不区分大小写的选项:
  JDBC connection properties
  属性名称和含义
  url:要连接的JDBC URL。列如:jdbc:mysql://ip:3306
  dbtable:应该读取的JDBC表。可以使用括号中的子查询代替完整表。
  driver:用于连接到此URL的JDBC驱动程序的类名,列如:com.mysql.jdbc.Driver
  partitionColumn, lowerBound, upperBound, numPartitions
  这些options仅适用于read数据。这些options必须同时被指定。他们描述,如何从多个workers并行读取数据时,分割表。
  partitionColumn:必须是表中的数字列。
  lowerBound和upperBound仅用于决定分区的大小,而不是用于过滤表中的行。
  表中的所有行将被分割并返回。
  fetchsize:仅适用于read数据。JDBC提取大小,用于确定每次获取的行数。这可以帮助JDBC驱动程序调优性能,这些驱动程序默认具有较低的提取大小(例如,Oracle每次提取10行)。
  batchsize:仅适用于write数据。JDBC批量大小,用于确定每次insert的行数。
  这可以帮助JDBC驱动程序调优性能。默认为1000。
  isolationLevel:仅适用于write数据。事务隔离级别,适用于当前连接。它可以是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的连接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。请参阅文档java.sql.Connection。
  truncate:仅适用于write数据。当SaveMode.Overwrite启用时,此选项会truncate在MySQL中的表,而不是删除,再重建其现有的表。这可以更有效,并且防止表元数据(例如,索引)被去除。但是,在某些情况下,例如当新数据具有不同的模式时,它将无法工作。它默认为false。
  createTableOptions:仅适用于write数据。此选项允许在创建表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)时设置特定的数据库表和分区选项。
  spark jdbc read MySQL

  

val jdbcDF11 = spark.read.format("jdbc")  .option("driver", "com.mysql.jdbc.Driver")
  .option("url", "jdbc:mysql://ip:3306")
  .option("dbtable", "db.user_test")
  .option("user", "test")
  .option("password", "123456")
  .option("fetchsize", "3")
  .load()
  
jdbcDF11.show
  

  
val jdbcDF12 = spark.read.format("jdbc").options(
  Map(
  "driver" -> "com.mysql.jdbc.Driver",
  "url" -> "jdbc:mysql://ip:3306",
  "dbtable" -> "db.user_test",
  "user" -> "test",
  "password" -> "123456",
  "fetchsize" -> "3")).load()
  
jdbcDF12.show
  

  

  jdbc(url: String, table: String, properties: Properties): DataFrame
  

//-----------------------------------  

  
import java.util.Properties
  

  
// jdbc(url: String, table: String, properties: Properties): DataFrame
  

  
val readConnProperties1 = new Properties()
  
readConnProperties1.put("driver", "com.mysql.jdbc.Driver")
  
readConnProperties1.put("user", "test")
  
readConnProperties1.put("password", "123456")
  
readConnProperties1.put("fetchsize", "3")
  

  
val jdbcDF1 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "db.user_test",
  readConnProperties1)
  

  
jdbcDF1.show
  
+---+------+---+
  
|uid|gender|age|
  
+---+------+---+
  
|  2|     2| 20|
  
|  3|     1| 30|
  
|  4|     2| 40|
  
|  5|     1| 50|
  
|  6|     2| 60|
  
|  7|     1| 25|
  
|  8|     2| 35|
  
|  9|     1| 70|
  
| 10|     2| 80|
  
|  1|     1| 18|
  
+---+------+---+
  

  

  
//默认并行度为1
  
jdbcDF1.rdd.partitions.size
  
Int = 1
  

  
//-------------------------
  
// jdbc(url: String, table: String, properties: Properties): DataFrame
  

  
val readConnProperties4 = new Properties()
  
readConnProperties4.put("driver", "com.mysql.jdbc.Driver")
  
readConnProperties4.put("user", "test")
  
readConnProperties4.put("password", "123456")
  
readConnProperties4.put("fetchsize", "3")
  

  

  
val jdbcDF4 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "(select * from db.user_test where gender=1) t",  // 注意括号和表别名,必须得有,这里可以过滤数据
  readConnProperties4)
  
jdbcDF4.show
  
+---+------+---+
  
|uid|gender|age|
  
+---+------+---+
  
|  3|     1| 30|
  
|  5|     1| 50|
  
|  7|     1| 25|
  
|  9|     1| 70|
  
|  1|     1| 18|
  
+---+------+---+
  

  

  jdbc(url: String, table: String,
  columnName: String, lowerBound: Long, upperBound: Long, numPartitions: Int,
  connectionProperties: Properties): DataFrame
  

  
import java.util.Properties
  

  
val readConnProperties2 = new Properties()
  
readConnProperties2.put("driver", "com.mysql.jdbc.Driver")
  
readConnProperties2.put("user", "test")
  
readConnProperties2.put("password", "123456")
  
readConnProperties2.put("fetchsize", "2")
  

  
val columnName = "uid"
  
val lowerBound = 1
  
val upperBound = 6
  
val numPartitions = 3
  

  
val jdbcDF2 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "db.user_test",
  columnName,
  lowerBound,
  upperBound,
  numPartitions,
  readConnProperties2)
  

  
jdbcDF2.show
  
+---+------+---+
  
|uid|gender|age|
  
+---+------+---+
  
|  2|     2| 20|
  
|  1|     1| 18|
  
|  3|     1| 30|
  
|  4|     2| 40|
  
|  5|     1| 50|
  
|  6|     2| 60|
  
|  7|     1| 25|
  
|  8|     2| 35|
  
|  9|     1| 70|
  
| 10|     2| 80|
  
+---+------+---+
  

  
// 并行度为3,对应于numPartitions
  
jdbcDF2.rdd.partitions.size
  
Int = 3
  

  

  jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame
  predicates: Condition in the WHERE clause for each partition.
  

import java.util.Properties  

  
val readConnProperties3 = new Properties()
  
readConnProperties3.put("driver", "com.mysql.jdbc.Driver")
  
readConnProperties3.put("user", "test")
  
readConnProperties3.put("password", "123456")
  
readConnProperties3.put("fetchsize", "2")
  

  
val arr = Array(
  (1, 50),
  (2, 60))
  

  
// 此处的条件,既可以分割数据用作并行度,也可以过滤数据
  
val predicates = arr.map {
  case (gender, age) =>
  s" gender = $gender " + s" AND age < $age "
  
}
  

  
val predicates1 =
  Array(
  "2017-05-01" -> "2017-05-20",
  "2017-06-01" -> "2017-06-05").map {
  case (start, end) =>
  s"cast(create_time as date) >= date '$start' " + s"AND cast(create_time as date) <= date '$end'"
  }
  

  
val jdbcDF3 = spark.read.jdbc(
  "jdbc:mysql://ip:3306",
  "db.user_test",
  predicates,
  readConnProperties3)
  

  

  

  
jdbcDF3.show
  
+---+------+---+
  
|uid|gender|age|
  
+---+------+---+
  
|  3|     1| 30|
  
|  7|     1| 25|
  
|  1|     1| 18|
  
|  2|     2| 20|
  
|  4|     2| 40|
  
|  8|     2| 35|
  
+---+------+---+
  

  
// 并行度为2,对应arr数组中元素的个数
  
jdbcDF3.rdd.partitions.size
  
Int = 2
  

  

  spark jdbc write MySQL
  

// For implicit conversions like converting RDDs to DataFrames  
import spark.implicits._
  

  
val dataList: List[(Double, String, Double, Double, String, Double, Double, Double, Double)] = List(
  (0, "male", 37, 10, "no", 3, 18, 7, 4),
  (0, "female", 27, 4, "no", 4, 14, 6, 4),
  (0, "female", 32, 15, "yes", 1, 12, 1, 4),
  (0, "male", 57, 15, "yes", 5, 18, 6, 5),
  (0, "male", 22, 0.75, "no", 2, 17, 6, 3),
  (0, "female", 32, 1.5, "no", 2, 17, 5, 5),
  (0, "female", 22, 0.75, "no", 2, 12, 1, 3),
  (0, "male", 57, 15, "yes", 2, 14, 4, 4),
  (0, "female", 32, 15, "yes", 4, 16, 1, 2))
  

  
val colArray: Array[String] = Array("affairs", "gender", "age", "yearsmarried", "children", "religiousness", "education", "occupation", "rating")
  

  
val df = dataList.toDF(colArray: _*)
  

  
df.write.mode("overwrite").format("jdbc").options(
  Map(
  "driver" -> "com.mysql.jdbc.Driver",
  "url" -> "jdbc:mysql://ip:3306",
  "dbtable" -> "db.affairs",
  "user" -> "test",
  "password" -> "123456",
  "batchsize" -> "1000",
  "truncate" -> "true")).save()
  

  

运维网声明 1、欢迎大家加入本站运维交流群:群②:261659950 群⑤:202807635 群⑦870801961 群⑧679858003
2、本站所有主题由该帖子作者发表,该帖子作者与运维网享有帖子相关版权
3、所有作品的著作权均归原作者享有,请您和我们一样尊重他人的著作权等合法权益。如果您对作品感到满意,请购买正版
4、禁止制作、复制、发布和传播具有反动、淫秽、色情、暴力、凶杀等内容的信息,一经发现立即删除。若您因此触犯法律,一切后果自负,我们对此不承担任何责任
5、所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其内容的准确性、可靠性、正当性、安全性、合法性等负责,亦不承担任何法律责任
6、所有作品仅供您个人学习、研究或欣赏,不得用于商业或者其他用途,否则,一切后果均由您自己承担,我们对此不承担任何法律责任
7、如涉及侵犯版权等问题,请您及时通知我们,我们将立即采取措施予以解决
8、联系人Email:admin@iyunv.com 网址:www.yunweiku.com

所有资源均系网友上传或者通过网络收集,我们仅提供一个展示、介绍、观摩学习的平台,我们不对其承担任何法律责任,如涉及侵犯版权等问题,请您及时通知我们,我们将立即处理,联系人Email:kefu@iyunv.com,QQ:1061981298 本贴地址:https://www.yunweiku.com/thread-423269-1-1.html 上篇帖子: Mysql主从同步(1) 下篇帖子: 分布式MySql
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

扫码加入运维网微信交流群X

扫码加入运维网微信交流群

扫描二维码加入运维网微信交流群,最新一手资源尽在官方微信交流群!快快加入我们吧...

扫描微信二维码查看详情

客服E-mail:kefu@iyunv.com 客服QQ:1061981298


QQ群⑦:运维网交流群⑦ QQ群⑧:运维网交流群⑧ k8s群:运维网kubernetes交流群


提醒:禁止发布任何违反国家法律、法规的言论与图片等内容;本站内容均来自个人观点与网络等信息,非本站认同之观点.


本站大部分资源是网友从网上搜集分享而来,其版权均归原作者及其网站所有,我们尊重他人的合法权益,如有内容侵犯您的合法权益,请及时与我们联系进行核实删除!



合作伙伴: 青云cloud

快速回复 返回顶部 返回列表