当前位置:首页 > 竞技风云 > 正文内容

combinebykey 《大数据处理技术Spark》--林子雨

admin1周前 (06-15)竞技风云10

combinebykey 《大数据处理技术Spark》--林子雨

从林子雨老师的网课上学到的东西,林老师讲的特别清晰,记录一下,防止忘记。

以下是资料的链接:

其他资料:

1. 概述

关系型数据库和非关系型数据库

五种主流的大数据计算框架:

大数据关键技术 分布式处理: 大数据计算模式:

代表性大数据技术之

Hive:会将sql语句转成底层的任务

:帮助选择主节点等

HBase:存储关系数据

Sqoop:完成从关系型数据库和数据之间的导入导出

:可视化的部署等等都归它管理

:可以将程序分发到不同的机器上(计算向数据靠拢)

- 缺点:任务必须要等待所有的map任务完成之后才能进行

- 缺点:每次数据都要写磁盘

Yarn:资源调度和管理框架,帮助调动底层cpu和内存资源用的

代表性大数据技术之Spark

最热门的主流技术。可以和兼容:可以读取HDFS,Hive/兼容;可以和noSQl兼容

spark克服了的操作的缺陷:

spark在的基础上的改进:

代表性大数据技术之Flink

flink性能好,为什么没有spark火?“既生瑜何生亮”+_+

代表性大数据技术之Beam

Beam提供了统一的编程接口起来,可以帮助转换成spark//flink

伪分布实例

伪分布式读取的则是 HDFS 上的数据。要使用 HDFS,首先需要在 HDFS 中创建用户目录:

hdfs dfs -mkdir -p /user/hadoop  # 已经将hadoop中的bin加入到环境变量中

将本地的word.txt复制到分布式文件系统的/user//input中

hdfs dfs -mkdir input # 因为现在使用的是hadoop用户,因此可以使用相对路径
hdfs dfs -put ./word.txt input # put
hdfs dfs -ls input # 可以查看文件列表

将的运行结果取回到本地

hdfs dfs -get output ./output # get
cat ./output/* # cat查看

这里hdfs dfs可以换成:

2. scala 语法

写到了scala 中

5. RDD 5.1 RDD编程

rdd编程-林子雨老师

5.1.1 RDD创建 5.1.1.1 从文件系统中加载数据创建RDD 5.1.1.2 通过并行集合(数组)创建RDD 5.1.2 RDD操作 5.1.2.1 转换操作 5.1.2.2 行动操作 5.1.2.3 惰性机制

只有动作类型的操作才能真正触发计算

5.1.2.4 实例

找出文本文件中单行文本所包含的单词数量的最大值

val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
lines.map(line => line.split(" ").size).reduce((a,b) => if (a>b) a else b)

5.1.3 持久化

在Spark中,RDD采用惰性求值的机制combinebykey 《大数据处理技术Spark》--林子雨,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。

()的圆括号中包含的是持久化级别参数:

val list = List("Hadoop","Spark","Hive")
val rdd = sc.parallelize(list)
rdd.cache() 
println(rdd.count()) // 第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
println(rdd.collect().mkString(",")) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

5.1.4 分区

RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。

为什么要分区?(1)增加并行度 (2)减少通信开销

手动设置分区:

自定义分区

import org.apache.spark.{Partitioner, SparkContext, SparkConf}
//自定义分区类,需继承Partitioner类
class UsridPartitioner(numParts:Int) extends Partitioner{
 //覆盖分区数
 override def numPartitions: Int = numParts
 //覆盖分区号获取函数
 override def getPartition(key: Any): Int = {

combinebykey 《大数据处理技术Spark》--林子雨 第1张

key.toString.toInt } } object Test { def main(args: Array[String]) { val conf=new SparkConf() val sc=new SparkContext(conf) //模拟5个分区的数据 val data=sc.parallelize(1 to 10,5) //根据尾号转变为10个分区,分写到10个文件 data.map((_,1)).partitionBy(new UsridPartitioner(10)).map(_._1).saveAsTextFile("/chenm/partition") //占位符 _ } }

5.1.5 打印元素

概括下来是:

5.2 Pair RDD 5.2.1 Pair RDD的创建 第一种创建方式:从文件中加载第二种创建方式:通过并行集合(数组)创建RDD 5.2.2 常用的Pair RDD转换操作

(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)

keys:

pairRDD.keys
pairRDD.keys.foreach(println)

d1.collect
// res16: Array[(String, Int)] = Array((c,8), (b,25), (c,17), (a,42), (b,4), (d,9), (e,17), (c,2), (f,29), (g,21), (b,9))
d1.mapValues(_+1).collect
// res17: Array[(String, Int)] = Array((c,9), (b,26), (c,18), (a,43), (b,5), (d,10), (e,18), (c,3), (f,30), (g,22), (b,10))

5.2.3 一个综合实例

给定一组键值对(“spark”,2),(“”,6),(“”,4),(“spark”,6)combinebykey 《大数据处理技术Spark》--林子雨,键值 对的key表示图书名称,value表示某天图书销量combinebykey,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd.mapValues(x => (x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect()

5.3 共享变量

广播变量( )

累加器()

5.4 数据读写 5.4.1 文件数据读写 5.4.1.1 本地文件系统的数据读写

val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word123.txt")
textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/writeback.txt")

5.4.1.2 分布式文件系统HDFS的数据读写

val textFile = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
// val textFile = sc.textFile("/user/hadoop/word.txt")
// val textFile = sc.textFile("word.txt")
textFile.first()

5.4.1.3 JSON文件的数据读写

import scala.util.parsing.json.JSON
val jsonStr = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
// jsonStr.foreach(println)
val result = jsonStrs.map(s => JSON.parseFull(s))
result.foreach( {r => r match {
  case Some(map: Map[String, Any]) => println(map)
  case None => println("Parsing failed")
  case other => println("Unknown data structure: " + other)

剩下的部分直接参考林老师的ppt吧

5.4.2 读写HBase数据 5.5 程序解析 5.6 综合案例 6 Spark SQL 6.1 简介

hive on spark == Sharkcombinebykey,hive将SQL语句转为MR;Shark将SQL转为Spark的应用程序代码;Shark建立在hive上combinebykey,受限与hive,但是效率提升了10-100倍;MR是进程级别的并行,Shark是线程级别的并行,存在线程安全的保证,因此之后停止了更新Spark SQL。

spark SQL在兼容Hive基础上,只是借鉴了Hive的语法解析

6.2 和RDD区别

spark SQL采用的不是RDD,而是。是结构化的对象,查询效率更高。

6.3 的创建 6.4 从RDD到DF 6.4.1 利用反射机制推断RDD模式

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoder
import spark.implicits._ //导入包,支持把一个RDD隐式转换为一个DataFrame
case class Person(name: String, age: Long) // 定义一个case class
val peopleDF = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt").map(_.split(",")).map(attributes => Person(attributes(0), attributes(1).trim.toInt)).toDF()
peopleDF.createOrReplaceTempView(“people”) // 必须注册为临时表才能供下面的查询使用
val personsRDD = spark.sql("select name,age from people where age > 20") // 最终生成一个DataFrame
personsRDD.map(t => “Name:+t(0)+,+“Age:+t(1)).show() // DataFrame中的每个元素都是一行记录,包含name和age两个字段,分别用t(0)和t(1)来获取值

combinebykey 《大数据处理技术Spark》--林子雨 第2张

6.4.2 使用编程方式定义RDD模式

当无法提前定义case class时,就需要采用编程方式定义RDD模式

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
val peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/examples/src/main/resources/people.txt")
//定义一个模式字符串
val schemaString = "name age"
//根据模式字符串生成模式
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
//从上面打印的信息可以看出,schema描述了模式信息,模式中包含name和age两个字段
//对peopleRDD 这个RDD中的每一行元素都进行解析
val rowRDD = peopleRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1).trim))
val peopleDF = spark.createDataFrame(rowRDD, schema)
//必须注册为临时表才能供下面查询使用
peopleDF.createOrReplaceTempView("people")
val results = spark.sql("SELECT name,age FROM people")
results.map(attributes => "name: " + attributes(0)+","+"age:"+attributes(1)).show()

6.4.3 把RDD保存成文件

val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
peopleDF.select("name","age").write.format("csv").save("file:///usr/local/spark/mycode/newpeople.csv")
val textFile = sc.textFile("file:///usr/local/spark/mycode/newpeople.csv")

write.()支持输出 json,, jdbc, orc, , csv, text等格式文件

val peopleDF = spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json")
df.rdd.saveAsTextFile("file:///usr/local/spark/mycode/newpeople.txt")

6.5 读取和保存数据 6.5.1 读写 6.5.2 通过JDBC连接数据库 安装MySQL及常用操作

mysql的jdbc驱动程序,下载地址

6.5.3 连接Hive读写数据 6.5.3.1 Hive简介和安装

《安装hive,并配置mysql作为元数据库》

6.5.3.2 让Spark包含Hive支持

测试spark版本是否支持Hive

import org.apache.spark.sql.hive.HiveContext
// 支持的输出:import org.apache.spark.sql.hive.HiveContext 

6.5.3.3 在Hive中创建数据库和表 启动: start-all.sh(已经将的路径加入到环境变量中)启动Hive:hive, 添加数据表

// hive脚本下执行
create database if not exists sparktest;//创建数据库sparktest
show databases; 
create table if not exists sparktest.student(id int,name string, gender string, age int);
use sparktest; //切换到sparktest
show tables; //显示sparktest数据库下面有哪些表
insert into student values(1,'Xueqian','F',23); //插入一条记录
insert into student values(2,'Weiliang','M',24); //再插入一条记录
select * from student; //显示student表中的记录

6.5.3.4 连接Hive读写数据 在spark-shell(包含Hive支持)中执行以下命令从Hive中读取数据

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
case class Record(key: Int, value: String)
val warehouseLocation = "spark-warehouse”
val spark = SparkSession.builder().appName("Spark Hive Example").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate()
import spark.implicits._
import spark.sql
sql("SELECT * FROM sparktest.student").show()

编写程序向Hive数据库的.表中插入两条数据

// 准备两条数据
val studentRDD = spark.sparkContext.parallelize(Array("3 Rongcheng M 26","4 Guanhua M 27")).map(_.split(" "))
// 设置模式信息
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true)))
// 下面创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = studentRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).trim,p(3).toInt))
// 建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val studentDF = spark.createDataFrame(rowRDD, schema)
// 查看studentDF
studentDF.show()
// 注册临时表
studentDF.registerTempTable("tempTable")
// 插入
sql("insert into sparktest.student select * from tempTable")

加入微信交流群:************ ,请猛戳这里→点击入群

扫描二维码推送至手机访问。

版权声明:本文由前沿科技娱乐汇发布,如需转载请注明出处。

本文链接:https://www.kejiyl.com/post/5228.html

分享给朋友:

“combinebykey 《大数据处理技术Spark》--林子雨” 的相关文章

国际局势风云变幻:美俄乌利雅得 “穿梭外交”,和平曙光能否降临?

国际局势风云变幻:美俄乌利雅得 “穿梭外交”,和平曙光能否降临?

在当今复杂多变的国际舞台上,美俄乌之间的互动一直是全球关注的焦点。近期,美俄乌在利雅得展开的“穿梭外交”更是引发了广泛的讨论和期待,和平的曙光似乎在这动荡的局势中若隐若现,其最终能否真正降临,仍充满着诸多不确定性。美国作为全球超级大国,在美俄乌关系中一直扮演着重要的角色。其在俄乌冲突中采取的一系列行...

老旧小区改造刻不容缓

老旧小区改造刻不容缓

在当今社会,老旧小区改造工程正以其坚定的步伐持续推进,犹如一股温暖的春风,悄然吹进了千家万户,给居民的生活环境带来了翻天覆地的变化。老旧小区,曾经是城市发展进程中的一个特殊群体,它们承载着许多居民的记忆和情感,却也面临着诸多问题。设施老化、环境脏乱、居住条件差等问题日益凸显,严重影响了居民的生活质量...

羽毛球汤尤杯赛程

羽毛球汤尤杯赛程

在体育的浩瀚星空中,有这样一支队伍,始终如同一颗璀璨的巨星,散发着无与伦比的光芒,那便是中国羽毛球队。在 2023 年汤尤杯的赛场上,他们再次书写了辉煌的篇章,包揽了两项冠军,王者归来的气势令人震撼。汤姆斯杯,被誉为世界男子羽毛球团体锦标赛的巅峰之战,每一届都吸引着全球羽毛球爱好者的目光。这一次,中...

全球疫情新警报

全球疫情新警报

在人类历史的长河中,病毒如幽灵般时常悄然降临,给全球带来巨大的冲击和考验。如今,新病毒来袭,那如阴云般密布的阴霾再次笼罩了世界,全球防疫警报也随之再次拉响。这新病毒的出现,仿佛是大自然对人类的一次严厉警示。它以迅雷不及掩耳之势在全球范围内迅速蔓延,从繁华的都市到偏远的乡村,从发达国家到发展中国家,无...

处于行业垄断地位的公司

处于行业垄断地位的公司

在当今的商业世界中,我们常常看到一些行业出现巨头垄断的现象,这些巨头凭借着其强大的资源、技术和品牌优势,几乎占据了整个市场的主导地位,而众多中小企业则在这种垄断的阴影下艰难求生,面临着诸多难以逾越的困境。行业巨头的垄断行为往往体现在多个方面。在市场份额上,它们以压倒性的优势占据了绝大部分的市场份额,...

国际航班降落哪些城市

国际航班降落哪些城市

在航空领域,安全始终是重中之重。不幸的是,一起突发的国际航班严重故障事件,让人们再次深刻认识到航空安全的严峻性和不确定性。这架国际航班原本在浩瀚的天空中平稳飞行,承载着众多乘客的梦想与期待。突如其来的故障仿佛一颗定时炸弹,瞬间打破了这份宁静与祥和。机组人员凭借着丰富的经验和临危不乱的精神,迅速察觉到...