Overview
This document walks through coding a word-count job in Scala, building the package with sbt, and submitting it as a Spark Streaming task. It covers several representative scenarios: reading and writing the file system and HDFS, reading and writing TCP sockets, reading and writing Kafka, plus stateless, stateful, sliding-window, and fault-recovery variants.
Test environment
ssh: root@10.2.35.117 password 123456
ssh: hadoop@10.2.35.117 password 123456
Spark install directory: /usr/local/spark/
Run Spark jobs as the hadoop account (recommended)
Spark project directory: /data/scala-test/hw
Run sbt packaging as the root account (recommended)
Note: avoid passing the Spark master address via setMaster in code. Pass it with spark-submit --master spark://10.2.35.117:7077 instead, including when restarting from a checkpoint.
Installing sbt
- Download sbt from the official site. As a project-management tool, sbt manages a specific Scala version per project (Spark, for instance, builds against Scala 2.11 by default) and provides compile, test, and package functionality.
- After unpacking, add sbt to your PATH.
- Create an hw directory and run sbt inside it.
- touch build.sbt to create the project definition file by hand; at minimum it needs the two fields name := "helloworld" and version := "0.0.1-tag".
- mkdir -p src/main/scala/
- touch hw.scala inside that directory. The resulting layout:
```
.
├── build.sbt
├── project
├── src
│   └── main
│       └── scala
│           └── hw.scala
└── target
    └── scala-2.11
        ├── classes
        └── hello_2.11-0.0.1-SNAPSHOT.jar
```

hw.scala:

```scala
object Hi { def main(args: Array[String]) = println("Hi!") }
```
From the project root, run sbt to enter the sbt console, then use run, compile, test, or package; several subcommands can also be chained in one shell invocation: sbt compile test package
In the sbt console, ~compile watches the source files and recompiles automatically on every save.
- sbt clean compile "testOnly TestA TestB": here testOnly takes two arguments, TestA and TestB; the commands run in order (clean, compile, then testOnly).
- Common commands: https://www.scala-sbt.org/release/docs/zh-cn/Running.html
First steps with Spark + Scala
- API doc: https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
- Spark streaming programming guide (Chinese): https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/quick-start.html
```scala
val textFile = spark.read.textFile("README.md")                      // read a local file with SparkSession's read method, producing a Dataset
val linesWithSpark = textFile.filter(line => line.contains("Spark")) // derive a new Dataset; this is a "transformation", evaluated lazily
linesWithSpark.count()                                               // number of lines; this is an "action", which triggers the computation
```
A standalone Scala program
- Set up the project directory as above.
- Edit build.sbt:

```
name := "hello"              // project name
version := "0.0.1-SNAPSHOT"  // version
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.2.1"
```
```scala
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "hdfs://10.2.35.117:9000/user/hadoop/hello.go" // read from HDFS
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("func")).count()
    val numBs = logData.filter(line => line.contains("string")).count()
    println(s"Lines with func: $numAs, Lines with string: $numBs")
    spark.stop()
  }
}
```
- To get around the firewall, switch to a domestic mirror in ~/.sbt/repositories:

```
[repositories]
  local
  aliyun-nexus: http://maven.aliyun.com/nexus/content/groups/public/
  typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
  sonatype-oss-releases
  maven-central
  sonatype-oss-snapshots
```

- sbt package builds the jar: target/scala-<scalaVersion>/<name>_<scalaVersion>-<version>.jar
- Submit the job package with spark-submit: ./spark-submit --class SimpleApp --master local[4] /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar // compute locally
- ./spark-submit --class SimpleApp --master spark://10.2.35.117:7077 /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar // submit to the Spark cluster; mind the environment differences between the root and hadoop accounts
- $SPARK_HOME/examples/ contains many examples: ./bin/run-example SparkPi
- spark-submit usage guide: https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/deploy-guide/submitting-applications.html
- spark-submit parameter tuning: https://www.cnblogs.com/camilla/p/8301750.html
- https://stackoverflow.com/questions/24622108/apache-spark-the-number-of-cores-vs-the-number-of-executors
- --num-executors: total number of executors the job runs
- --executor-cores: threads per executor (this may interact with the open-file-descriptor limit; the configurations found in practice use a higher executor count with a lower executor-cores value, which raises the total number of open handles)
- --executor-memory: memory per executor; roughly (machine memory / executors) minus 10%
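Putting the three flags together, a submission might look like the sketch below. The numbers are placeholders for a hypothetical small cluster, not measured recommendations; tune them per the links above.

```shell
# Hypothetical tuning sketch: 4 executors x 2 cores, 4 GB each.
# Values are illustrative only; adjust to your cluster's memory and core counts.
./spark-submit \
  --class SimpleApp \
  --master spark://10.2.35.117:7077 \
  --num-executors 4 \
  --executor-cores 2 \
  --executor-memory 4g \
  /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
```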
Streaming: real-time analysis of a data stream
The directory layout is the same as above.
- build.sbt

```
name := "hello"              // project name
version := "0.0.1-SNAPSHOT"  // version
scalaVersion := "2.11.8"     // must match the Scala bundled with your sbt install; check with: sbt scalaVersion
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1" // 2.4.1 must match the installed Spark; verify with: ll $SPARK_HOME/jars/ | grep streaming (pulls spark-streaming_2.11-2.4.1); 2.2.1+ supports Kafka
```
hw.scala

```scala
import org.apache.spark._
import org.apache.spark.streaming._

object SimpleApp {
  def main(args: Array[String]) {
    // Create a local StreamingContext with two worker threads and a 1-second batch interval.
    // The master needs at least 2 CPU cores, or tasks will starve.
    val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1)) // the batch interval is workload-dependent and strongly affects Spark's performance
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    // Create a DStream connected to hostname:port
    val lines = ssc.socketTextStream("10.2.35.117", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    // If the source DStream holds (K, V) pairs, this operator returns a new (K, V) DStream where each V is aggregated by func.
    // https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/streaming-guide.html
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()            // start the streaming computation
    ssc.awaitTermination() // wait until the computation terminates
  }
}
```
- Start the TCP stream server first: [root@xhb-master hw]# nc -lkv 9999
- Launch the Spark Streaming job: ./spark-submit --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar spark://10.2.35.117:7077 // the Spark master address is optional; without it the job runs locally
- The job reads data from the stream server and produces word counts once per second.
- Job status UI: http://10.2.35.117:4040, cluster status: http://10.2.35.117:8080/
Streaming with global state
build.sbt stays unchanged.
hw.scala
```scala
import org.apache.spark._
import org.apache.spark.streaming._

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(if (args.length >= 1) "hdfs://10.2.35.117:9000/spark" else "./spark-cp")
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    val lines = ssc.socketTextStream("10.2.35.117", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
    val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
    stateCounter.checkpoint(Seconds(60)) // seems to have no effect
    stateCounter.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
ssc.checkpoint(if(args.length >= 1) "hdfs://10.2.35.117:9000/spark" else "./spark"): checkpointing exists for fault recovery and for persisting long transformation lineages. Stateful transformations such as mapWithState require a checkpoint directory, and it must be a path reachable from every node (an NFS mount or an HDFS URL). https://blog.csdn.net/Anbang713/article/details/82047980
https://spark-reference-doc-cn.readthedocs.io/zh_CN/latest/programming-guide/streaming-guide.html#id13
- The job above reports, cumulatively from startup, running totals for the words received so far.
- Browse the HDFS directory at http://10.2.35.117:50070/explorer.html
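The per-key accumulation that mappingFunc performs can be sketched without Spark, using a plain mutable Map to stand in for the keyed state store. This is an illustration only, not Spark's actual implementation (real mapWithState state is partitioned and checkpointed):

```scala
// Plain-Scala sketch of what mapWithState's mappingFunc does. `state` stands in
// for Spark's keyed state store and is seeded the way initialRDD seeds it above.
object StateSketch {
  val state = scala.collection.mutable.Map("hello" -> 1, "world" -> 1)

  // Mirrors mappingFunc: add this batch's count for `word` to the stored total,
  // store the new total, and emit (word, total) downstream.
  def mappingFunc(word: String, one: Option[Int]): (String, Int) = {
    val sum = one.getOrElse(0) + state.getOrElse(word, 0)
    state(word) = sum
    (word, sum)
  }
}
```

Feeding it ("hello", Some(2)) after the seed yields ("hello", 3): the new batch's count plus the stored total, which is exactly the running total the job prints every batch.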
Streaming with a sliding window https://blog.csdn.net/legotime/article/details/51836040
hw.scala
```scala
import org.apache.spark._
import org.apache.spark.streaming._

object SimpleApp {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster(if (args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(if (args.length >= 1) "hdfs://10.2.35.117:9000/spark" else "./spark-checkpoint")
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    val lines = ssc.socketTextStream("10.2.35.117", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val last_10_counter = pairs.reduceByKeyAndWindow(_ + _, Seconds(10)) // the last 10 seconds
    last_10_counter.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```
- Counts the occurrences within the last 10 seconds. The window duration must be a multiple of the streaming context's batch interval.
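What reduceByKeyAndWindow computes can be sketched with plain collections: keep the last N batches and reduce the per-key counts over their union. This is a simplified full recomputation; Spark also supports an incremental variant that takes an inverse function:

```scala
// Plain-collections sketch of reduceByKeyAndWindow(_ + _, Seconds(10)).
object WindowSketch {
  type Batch = List[String] // the words received during one batch interval

  // windowBatches = window duration / batch interval
  def windowCounts(batches: List[Batch], windowBatches: Int): Map[String, Int] =
    batches.takeRight(windowBatches) // only the batches inside the window
      .flatten
      .groupBy(identity)
      .map { case (w, ws) => (w, ws.size) }
}
```

With a 1-second batch interval, Seconds(10) corresponds to windowBatches = 10, which is also why the window must divide evenly into batch intervals.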
rdd -> dataframe -> dataset
Streaming output
- DStream.print() prints on the driver side of the job
- DStream.saveAsTextFiles(prefix, suffix) creates a prefix-TIMEINMS.suffix directory per batch; the path may be on HDFS. With a local path and a cluster master, the driver node locally only gets the job's success marker while the data stays on the worker nodes. The output is discrete: each part-000* file is one partition of the RDD, and the partition count depends on the DStream operations. http://10.2.35.117:50070/explorer.html#/spark/20190531143358/test-1559284470000
- foreachRDD writes the data inside the DStream's RDDs out to an external system, e.g. over TCP
Fault recovery
hw.scala
```scala
import org.apache.spark._
import org.apache.spark.streaming._

object SimpleApp {
  val checkpointDir = "./spark-cp"

  def functionToCreateContext(): StreamingContext = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    val ssc = new StreamingContext(conf, Seconds(2)) // new context
    val sc = ssc.sparkContext
    sc.setLogLevel("WARN")
    ssc.checkpoint(checkpointDir)
    val lines = ssc.socketTextStream("10.2.35.117", 9999) // create DStreams
    lines.checkpoint(Seconds(10))
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // a hand-made RDD; handy for testing without reading external data
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
    val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
    stateCounter.print()
    ssc
  }

  def main(args: Array[String]) {
    val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```
- After the driver process exits and restarts, it recovers to its pre-crash state. https://blog.csdn.net/yjgithub/article/details/78792616
- Currently, after recovering from a checkpoint the log level reverts to the default.
- Note: for a cluster job, restarting requires passing the cluster address again: --master spark://10.2.35.117
Sending data through a connection pool, currently the most efficient approach
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericObjectPool}
import org.apache.commons.pool2.{ObjectPool, PooledObject, BasePooledObjectFactory}
import java.text.SimpleDateFormat
import java.util.Date
import java.io.PrintStream
import java.net.Socket
class ManagedPrintStream(private val pool: ObjectPool[PrintStream], val printStream: PrintStream) {
def release() = pool.returnObject(printStream)
}
object PrintStreamPool {
var hostPortPool: Map[(String, Int), ObjectPool[PrintStream]] = Map()
sys.addShutdownHook {
hostPortPool.values.foreach { pool => pool.close() }
}
// factory method
def apply(host: String, port: Int): ManagedPrintStream = {
val pool = hostPortPool.getOrElse((host, port), {
val p = new GenericObjectPool[PrintStream](new SocketStreamFactory(host, port))
hostPortPool += (host, port) -> p
p
})
new ManagedPrintStream(pool, pool.borrowObject())
}
}
class SocketStreamFactory(host: String, port: Int) extends BasePooledObjectFactory[PrintStream] {
override def create() = new PrintStream(new Socket(host, port).getOutputStream)
override def wrap(stream: PrintStream) = new DefaultPooledObject[PrintStream](stream)
override def validateObject(po: PooledObject[PrintStream]) = ! po.getObject.checkError()
override def destroyObject(po: PooledObject[PrintStream]) = po.getObject.close()
override def passivateObject(po: PooledObject[PrintStream]) = po.getObject.flush()
}
object TimeHelper {
def strTimeNow():String={
val now = new Date()
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
dateFormat.format(now)
}
}
object SimpleApp {
val checkpointDir = "./checkpoint"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(2)) // new context
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
ssc.checkpoint(checkpointDir)
val lines = ssc.socketTextStream("10.2.35.117", 9999) // create DStreams
lines.checkpoint(Seconds(10))
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
// foreachPartition uses one connection per RDD partition. The partition count depends on several factors (input source, transformations).
// One case: 20K messages/sec, 64 partitions, a 2-second batch interval; with naive per-batch connections the service would be
// connected to and disconnected from 64 times every two seconds, each connection sending roughly 600 records.
// With a pool, each JVM may end up connecting only once for its lifetime.
stateCounter.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = PrintStreamPool("10.2.35.118", 10000) //
partitionOfRecords.foreach(record => connection.printStream.println(record))
connection.release()
}
}
ssc
}
def main(args: Array[String]) {
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
ssc.start()
ssc.awaitTermination()
}
}
Start another TCP server to simulate the receiving end: nc -lkv 10000
build.sbt
name := "hello" // project name
version := "0.0.1-SNAPSHOT" // version
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
libraryDependencies += "org.apache.commons" % "commons-pool2" % "2.6.2"
Submit command: add the dependency jars with --jars; this can be avoided by building a fat jar with sbt assembly.
./spark-submit --jars /root/.ivy2/cache/org.apache.commons/commons-pool2/jars/commons-pool2-2.6.2.jar --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
sbt assembly bundles the dependencies into the main jar
- touch project/assembly.sbt
- echo "addSbtPlugin(\"com.eed3si9n\" % \"sbt-assembly\" % \"0.14.9\")" > project/assembly.sbt
- Add merge strategies in build.sbt to prevent duplicate-definition errors

```
assemblyMergeStrategy in assembly := {
  case PathList("org", "aopalliance", xs @ _*) => MergeStrategy.last
  case PathList("javax", "inject", xs @ _*) => MergeStrategy.last
  case PathList("javax", "servlet", xs @ _*) => MergeStrategy.last
  case PathList("javax", "activation", xs @ _*) => MergeStrategy.last
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case PathList("com", "google", xs @ _*) => MergeStrategy.last
  case PathList("com", "esotericsoftware", xs @ _*) => MergeStrategy.last
  case PathList("com", "codahale", xs @ _*) => MergeStrategy.last
  case PathList("com", "yammer", xs @ _*) => MergeStrategy.last
  case "about.html" => MergeStrategy.rename
  case "META-INF/ECLIPSEF.RSA" => MergeStrategy.last
  case "META-INF/mailcap" => MergeStrategy.last
  case "META-INF/mimetypes.default" => MergeStrategy.last
  case "plugin.properties" => MergeStrategy.last
  case "log4j.properties" => MergeStrategy.last
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
```
- sbt assembly produces the fat jar, 80+ MB, roughly 50x the size of the previous package
- ./spark-submit --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello-assembly-0.0.1-SNAPSHOT.jar
Reading from Kafka with a direct stream
Kafka installation and common operations are covered in kafka.md
https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html
build.sbt
name := "hello" // project name
version := "0.0.1-SNAPSHOT" // version
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.1"
// note: 2.4.1 must stay consistent with the current Spark version
./spark-submit --jars /root/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar,/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-10_2.11/jars/spark-streaming-kafka-0-10_2.11-2.4.1.jar --class SimpleApp --master spark://10.2.35.117:7077 /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
// sbt package downloads the dependencies into ~/.ivy2/cache
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
object SimpleApp {
def main(args: Array[String]) {
val checkpointDir = "hdfs://10.2.35.117:9000/spark-cp"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(2)) // new context
ssc.checkpoint(checkpointDir)
val brokers = "10.2.35.117:9092"
val groupId = "gtest"
val topics = "test31"
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc)/*.initialState(initialRDD)*/)
stateCounter.print
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
ssc.start()
ssc.awaitTermination()
}
}
Writing to Kafka
Read from test31, process, and write into the topic test31-out. If the broker is not configured to auto-create topics, create it manually first:
[root@xhb-master kafka_2.11-2.2.1]# bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic test31-out
build.sbt
name := "hello" // project name
version := "0.0.1-SNAPSHOT" // version
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.4.1"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.4.1"
hw.scala
import java.util.Properties
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer,StringSerializer}
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSink {
def apply(config: Properties): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
}
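The trick above is that what gets serialized and shipped to executors is not the KafkaProducer (which is not serializable) but the () => KafkaProducer factory closure; lazy val then builds the producer at most once per executor JVM, on first use. The same pattern in isolation, with a hypothetical generic LazySink standing in for KafkaSink:

```scala
// Generic form of the KafkaSink trick: serialize a factory, not the resource.
// `resource` is constructed lazily, once per JVM, on first access.
class LazySink[T](create: () => T) extends Serializable {
  lazy val resource: T = create()
}
```

new LazySink(() => new KafkaProducer[String, String](config)) would behave like KafkaSink above: shipping the sink to an executor costs only the closure, and the actual connection is opened where it is used.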
object SimpleApp {
def main(args: Array[String]) {
val checkpointDir = if(args.length >= 1) "hdfs://10.2.35.117:9000/spark-cp" else "./spark-cp"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setMaster(if(args.length >= 1) args(0) else "local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(2)) // new context
ssc.checkpoint(checkpointDir)
val brokers = "localhost:9092"
val groupId = "gtest"
val topics = "test31"
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc).initialState(initialRDD))
val serializer = "org.apache.kafka.common.serialization.StringSerializer"
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", serializer)
props.put("value.serializer", serializer)
val kafkaSink = ssc.sparkContext.broadcast(KafkaSink(props))
stateCounter.foreachRDD { rdd =>
rdd.foreach { message =>
val (word, num) = message
kafkaSink.value.send("test31-out", f"$word%s->$num%d")
}
}
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
ssc.start()
ssc.awaitTermination()
}
}
Currently every rebuild invalidates the previous version's checkpoint (compatibility is possible by deleting some files inside the directory); for convenience, just delete the whole directory.
./spark-submit --jars /root/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar,/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-10_2.11/jars/spark-streaming-kafka-0-10_2.11-2.4.1.jar --class SimpleApp /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
Start a producer and insert new messages
[root@xhb-master kafka_2.11-2.2.1]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test31
>hello
>a
>a
>a
>hello
Start a consumer to inspect the new topic that the Scala job writes to
[root@xhb-master kafka_2.11-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test31-out --from-beginning
hello->2
a->1
a->2
a->3
hello->3
Restarting the job above fails with "java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to KafkaSink". The cause is that Streaming checkpointing doesn't support Accumulators and Broadcast values, so following the official example https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
it is rewritten as:
import java.util.Properties
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer,StringSerializer}
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSinkInstance {
@volatile private var instance: Broadcast[KafkaSink] = null
private def conf(config: Properties): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
def getInstance(sc: SparkContext, config: Properties): Broadcast[KafkaSink] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(conf(config))
}
}
}
instance
}
}
object SimpleApp {
def main(args: Array[String]) {
val checkpointDir = "hdfs://10.2.35.117:9000/spark-cp"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(2)) // new context
ssc.checkpoint(checkpointDir)
val brokers = "10.2.35.117:9092"
val groupId = "gtest"
val topics = "test31"
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Subscribe takes three parameters: the topic list, the consumer config, and optional topic+partition starting offsets (fromOffsets). If fromOffsets is non-empty, consumption starts there; otherwise Kafka's auto.offset.reset setting applies (default: latest).
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc)/*.initialState(initialRDD)*/)
val serializer = "org.apache.kafka.common.serialization.StringSerializer"
val props = new Properties()
props.put("bootstrap.servers", "10.2.35.117:9092")
props.put("key.serializer", serializer)
props.put("value.serializer", serializer)
stateCounter.foreachRDD { rdd =>
val kafkaSink = KafkaSinkInstance.getInstance(rdd.sparkContext, props)
rdd.foreach { message =>
val (word, num) = message
kafkaSink.value.send("test31-out", f"$word%s->$num%d")
}
}
ssc
}
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
ssc.start()
ssc.awaitTermination()
}
}
Question: under abnormal conditions, how do we avoid Kafka message backlog, loss, and duplication?
- Avoid backlog: Spark's consumption rate should exceed the Kafka producers' rate; tune the spark-submit performance parameters
- On first start, or on restart after a failure, there may be a large backlog of unconsumed messages, possibly more than fits in memory, crashing the job. Enable backpressure: SparkConf.set("spark.streaming.backpressure.enabled", "true")
- Choose a sensible batch interval: val ssc = new StreamingContext(conf, Seconds(2)) https://www.jianshu.com/p/5c20e5bc402c
- Cap Spark's consumption rate: SparkConf.set("spark.streaming.kafka.maxRatePerPartition", ...)
- https://www.jianshu.com/p/c0b724137416
- Read by Kafka's timestamp index (e.g. after a restart, consume only messages from the last half hour) https://blog.csdn.net/weixin_34379433/article/details/87253473
- https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
- Offsets can be stored in the checkpoint (which may produce duplicates of up to one streaming batch interval), in Kafka itself (the default; it has no transactional tie to the streaming logic, so duplicates are possible as well), or in an external system (managing the transaction yourself can achieve exactly-once).
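The backpressure and rate-cap settings mentioned above, gathered into one SparkConf as a sketch; the numeric value is an illustrative placeholder, not a tuned recommendation:

```scala
import org.apache.spark.SparkConf

// Illustrative rate-control settings; the cap value is a placeholder.
val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.streaming.backpressure.enabled", "true")      // adapt the ingest rate to processing speed
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // hard cap: records/sec per Kafka partition
```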
A related note on .initialState(initialRDD): "(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758." This is why initialState is commented out in the checkpoint-recovered examples above.
sbt + JNI + C++
- ./project/plugins.sbt
addSbtPlugin("ch.jodersky" % "sbt-jni" % "1.3.2")
- sbt compile // downloads the dependencies
- sbt javah // generates the header files
- implement the headers
```
#!/bin/sh
g++ -fPIC -shared -O3 \
    -I/usr/include \
    -I$JAVA_HOME/include \
    -I$JAVA_HOME/include/linux \
    CppCls.cpp -o libCppCls.so
```
- Deploy the .so to every worker node, adding its directory to LD_LIBRARY_PATH or copying it into /usr/lib, /usr/lib64, etc.
Run the Spark job as usual:
```
./spark-submit --jars /root/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar,/root/.ivy2/cache/org.apache.spark/spark-streaming-kafka-0-10_2.11/jars/spark-streaming-kafka-0-10_2.11-2.4.1.jar --class SimpleApp --master spark://10.2.35.117:7077 /data/scala-test/hw/target/scala-2.11/hello_2.11-0.0.1-SNAPSHOT.jar
```
import java.util.Properties
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.broadcast.Broadcast
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{StringDeserializer,StringSerializer}
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer}
class CppCls {
// --- Native methods
@native def intMethod(n: Int): Int
@native def booleanMethod(b: Boolean): Boolean
@native def stringMethod(s: String): String
@native def intArrayMethod(a: Array[Int]): Int
}
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSinkInstance {
@volatile private var instance: Broadcast[KafkaSink] = null
private def conf(config: Properties): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
def getInstance(sc: SparkContext, config: Properties): Broadcast[KafkaSink] = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.broadcast(conf(config))
}
}
}
instance
}
}
/*
object LibraryLoader {
lazy val load = System.load(SparkFiles.get("libCppCls.so"))
}
*/
object SimpleApp {
def main(args: Array[String]) {
val checkpointDir = "hdfs://10.2.35.117:9000/spark-cp"
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(2)) // new context
ssc.checkpoint(checkpointDir)
val brokers = "10.2.35.117:9092"
val groupId = "gtest"
val topics = "test31"
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,
ConsumerConfig.GROUP_ID_CONFIG -> groupId,
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer])
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)) // Subscribe takes three parameters: the topic list, the consumer config, and optional topic+partition starting offsets (fromOffsets). If fromOffsets is non-empty, consumption starts there; otherwise Kafka's auto.offset.reset setting applies (default: latest).
val lines = messages.map(_.value)
val words = lines.flatMap(_.split(" "))
val pairs = words.map((_, 1))
val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1)))
val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
val output = (word, sum)
state.update(sum)
output
}
val stateCounter = pairs.mapWithState(StateSpec.function(mappingFunc)/*.initialState(initialRDD)*/)
// run in driver.
val sample = new CppCls
val square = sample.intMethod(5)
val bool = sample.booleanMethod(true)
val sum = sample.intArrayMethod(Array(1, 1, 2, 3, 5, 8, 13))
println(s"intMethod: $square")
println(s"booleanMethod: $bool")
println(s"intArrayMethod: $sum")
val serializer = "org.apache.kafka.common.serialization.StringSerializer"
val props = new Properties()
props.put("bootstrap.servers", "10.2.35.117:9092")
props.put("key.serializer", serializer)
props.put("value.serializer", serializer)
stateCounter.foreachRDD { rdd =>
val kafkaSink = KafkaSinkInstance.getInstance(rdd.sparkContext, props)
rdd.foreach { message =>
val (word, num) = message
System.loadLibrary("CppCls")
// LibraryLoader.load
val remote = new CppCls
kafkaSink.value.send("test31-out", remote.stringMethod(f"$word%s->$num%d"))
}
}
ssc
}
System.loadLibrary("CppCls")
val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext _)
val sc = ssc.sparkContext
sc.setLogLevel("WARN")
ssc.start()
ssc.awaitTermination()
}
}