Guangjing Wang Assistant Professor @ USF

Notes for Spark (2)

2017-07-17

In this post, I mainly make notes about Spark with DataFrame and DStream. It is mainly based on the open tutorial from here.

HBase

HBase is an open-source implementation of Google's BigTable. It is a highly reliable, high-performance, column-oriented, scalable distributed database, mainly used to store loosely structured data (unstructured and semi-structured).

hbase>  create 'student','info' # no need to create a database first; just create the table
hbase>  list  # list the tables that already exist in HBase
hbase>  describe 'student' # show basic information about the 'student' table
hbase>  put 'student','1','info:name','Xueqian' # put adds data, one cell at a time
hbase>  get 'student','1' # view one row at a time
hbase>  scan 'student' # view all data in the table

Writing data into HBase from a Spark program:

host = 'localhost'
table = 'student'
keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
conf = {
    "hbase.zookeeper.quorum": host,
    "hbase.mapred.outputtable": table,
    "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
    "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"
}

# each record is "rowkey,columnFamily,qualifier,value"
rawData = ['3,info,name,Rongcheng','4,info,name,Guanhua']
sc.parallelize(rawData) \
  .map(lambda x: (x.split(',')[0], x.split(','))) \
  .saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
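
After the job finishes, the new rows can be checked from the HBase shell with the same commands as above:

hbase>  get 'student','3'
hbase>  get 'student','4'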

Reading data from HBase in a Spark program:

host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf)
hbase_rdd.cache()  # cache before running multiple actions on the same RDD
count = hbase_rdd.count()
output = hbase_rdd.collect()
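
The pairs collected above can be inspected directly. A minimal sketch (the value is the string produced by HBaseResultToStringConverter; its exact layout depends on that converter):

# sketch: print each (row key, value string) pair returned from HBase
for (rowkey, value) in output:
    print(rowkey, value)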

DataFrame

An RDD is a distributed collection of Java objects. For example, RDD[Person] takes Person as its type parameter, but the internal structure of the Person class is opaque to the RDD. A DataFrame is a distributed dataset built on top of RDDs: a distributed collection of Row objects (each Row represents one record) that carries detailed structural information, i.e. a schema, so Spark SQL knows exactly which columns the dataset contains and each column's name and type.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df.show()
df.printSchema()  # print the schema
df.select(df.name, df.age + 1).show()
df.filter(df.age > 20).show()
df.groupBy("age").count().show()
df.sort(df.age.desc()).show()
df.select(df.name.alias("username"), df.age).show()
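
These column expressions can also be chained. A small sketch combining the operations above on the same sample DataFrame:

# sketch: chain filter, groupBy, and orderBy on the sample people.json data
df.filter(df.age > 20).groupBy("age").count().orderBy("age").show()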

RDD2DataFrame

There are two ways to convert an RDD into a DataFrame. The first uses reflection to infer the schema of an RDD containing objects of a specific type; it suits RDDs whose structure is already known. The second uses the programmatic interface to construct a schema and apply it to an existing RDD.

>>> from pyspark.sql import Row
>>> def f(x):
...     rel = {}
...     rel['name'] = x[0]
...     rel['age'] = x[1]
...     return rel
>>> peopleDF = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()
>>> peopleDF.createOrReplaceTempView("people")  # must be registered as a temp view before it can be queried below
>>> personsDF = spark.sql("select * from people")
>>> personsDF.rdd.map(lambda p: "Name:" + p.name + "," + "Age:" + p.age).foreach(print)

The second method builds the schema explicitly and applies it to a Row RDD:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType

peopleRDD = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
schemaString = "name age"
fields = [StructField(fieldName, StringType(), nullable=True) for fieldName in schemaString.split(" ")]
schema = StructType(fields)
rowRDD = peopleRDD.map(lambda line: line.split(',')).map(lambda attributes: Row(attributes[0], attributes[1]))
peopleDF = spark.createDataFrame(rowRDD, schema)
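
Either way, the resulting DataFrame can be registered and queried with SQL, same as before. A short sketch reusing the peopleDF just built:

# sketch: query the programmatically-built DataFrame with SQL
peopleDF.createOrReplaceTempView("people")
spark.sql("select name, age from people").show()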

Dump RDD

peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
# write.format("csv").save() produces a directory of part files, not a single file
peopleDF.select("name", "age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")

# or dump the underlying RDD as plain text
peopleDF.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")
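
To verify the dump, the CSV directory written above can be loaded back into a DataFrame. A quick sketch using the same path:

# sketch: reload the CSV directory written by the previous step
reloaded = spark.read.format("csv").load("file:///usr/local/spark/mycode/newpeople.csv")
reloaded.show()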
