#!/usr/bin/env python
from time import time
from pyspark.sql import SQLContext, Row
from pyspark import SparkConf, SparkContext
## fixed-size resources: dynamic allocation disabled, 10 executors,
## 2 cores and 500 MB of memory per executor
conf = (SparkConf()
        .setAppName("spark_sql_random_lookup")
        .set("spark.executor.instances", "10")
        .set("spark.executor.cores", "2")
        .set("spark.dynamicAllocation.enabled", "false")
        .set("spark.shuffle.service.enabled", "false")
        .set("spark.executor.memory", "500MB"))
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
t0 = time()
path = "/data/customer_orders*"lines = sc.textFile(path)
## create data frame
orders_df = sqlContext.createDataFrame(
    lines.map(lambda l: l.split("|"))
         .map(lambda p: Row(cust_id=int(p[0]), order_id=int(p[1]),
                            email_hash=p[2], ssn_hash=p[3],
                            product_id=int(p[4]), product_desc=p[5],
                            country=p[6], state=p[7],
                            shipping_carrier=p[8], shipping_type=p[9],
                            shipping_class=p[10])))
## register data frame as a temporary table
orders_df.registerTempTable("orders")
## look up the row whose order_id, the second field, equals 96922894
print(sqlContext.sql("SELECT * FROM orders WHERE order_id = 96922894").collect())
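## the same lookup via the DataFrame API (a sketch, equivalent to the SQL
## above; left commented out so it does not affect the timing below):
# print(orders_df.filter(orders_df.order_id == 96922894).collect())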
tt = str(time() - t0)
print("SparkSQL performed in " + tt + " seconds")
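## optional cleanup (not part of the original timing): release the executors
sc.stop()

## to run, a sketch assuming the script is saved as spark_sql_random_lookup.py:
##   spark-submit spark_sql_random_lookup.py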