博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Streaming整合kafka实现ExactlyOnce语义
阅读量:2426 次
发布时间:2019-05-10

本文共 5188 字,大约阅读时间需要 17 分钟。

流式系统消息处理语义

流式计算中消息处理的语义有三种:

at-least-once(至少一次):消息不会丢失,但可能被处理多次。
at-most-once(至多一次):消息可能丢失也可能被处理,但最多只会被处理一次。
exactly-once(精准一次):消息被处理且只会被处理一次。

流式计算实现exactly-once,一般满足以下三个要求即可:

1.source支持replay(kafka满足)
2.计算引擎本身处理能保证at-least-once(spark满足)
3.sink支持幂等性写入或事务更新

幂等性写入

幂等性原来是数学里的概念,即f(f(x))=f(x)。

幂等写入就是写入多次与写入一次的结果完全相同,可以自动将at-least-once转化为exactly once。这对于自带主键或主键组的业务比较合适(如:各类日志、MySQL binlog),并且实现起来简单但是它要求处理逻辑是map-only的,也就是只能包含转换、过滤等操作,不能包含shuffle、聚合等操作。如果条件更严格,就只能采用事务性写入方法

事务更新

这里的事务和DBMS中的事务含义基本相同,就是对数据进行一系列访问与更新操作所组成的逻辑块。在spark streaming中一般是数据写入与kafka offset的更新组成一个事务。

2.spark streaming实现exactly-once

下面以wordcount为例实现exactly-once处理语义,数据源为kafka。处理结果和offset都存储在mysql中。

建表

// 存储kafka_offsetCREATE TABLE `kafka_offset`  (  `topic` varchar(64) NOT NULL,  `group_id` varchar(64) NOT NULL,  `partition_id` int(11) NOT NULL,  `begin_offset` bigint(20) NOT NULL,  `end_offset` bigint(20) NOT NULL,  PRIMARY KEY (`topic`, `group_id`, `partition_id`) USING BTREE) ENGINE = InnoDB ;// 存储wordcount的结果CREATE TABLE `wordcount_result`  (  `word` varchar(255)  NOT NULL,  `count` int(11)  NOT NULL,  PRIMARY KEY (`word`) USING BTREE) ENGINE = InnoDB

添加maven依赖

org.apache.spark
spark-streaming_2.11
2.3.0
org.apache.spark
spark-streaming-kafka-0-10_2.11
2.3.0
org.scalikejdbc
scalikejdbc_2.11
3.1.0
org.scalikejdbc
scalikejdbc-config_2.11
3.1.0
mysql
mysql-connector-java
5.1.47

配置mysql数据库连接信息(application.conf放入到项目resources目录下)

application.conf

db.default.driver="com.mysql.jdbc.Driver"db.default.url="jdbc:mysql://192.168.61.137:3306/test?characterEncoding=utf-8"db.default.user="root"db.default.password="123456@Abc"dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource

代码

import org.apache.kafka.clients.consumer.ConsumerRecordimport org.apache.kafka.common.TopicPartitionimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.streaming.dstream.InputDStreamimport org.apache.spark.streaming.kafka010._import org.apache.spark.streaming.{
Seconds, StreamingContext}import org.apache.spark.{
SparkConf, TaskContext}import scalikejdbc.{
SQL, _}import scalikejdbc.config._object KafkaOffsetMysql {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("kafka-mysql").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(5)) val brokers = "192.168.61.136:9092" val topics = "test1" val groupId = "consumer1" val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, Object]( "bootstrap.servers" -> brokers, "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> groupId, "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean) ) DBs.setup() // 从mysql读取offset信息 val fromOffset = DB.readOnly(implicit session => {
SQL("select * from kafka_offset").map(rs => {
new TopicPartition(rs.string("topic"), rs.int("partition_id")) -> rs.long("end_offset") }).list().apply() }).toMap println(fromOffset) //如果MySQL表中没有offset信息,就从0开始消费;如果有,就从已经存在的offset开始消费 val messages:InputDStream[ConsumerRecord[String, String]] = if (fromOffset.isEmpty) {
KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams) ) } else {
KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](fromOffset.keys.toList, kafkaParams, fromOffset) ) } messages.foreachRDD(rdd => {
if (!rdd.isEmpty()){
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges val result = rdd.map(_.value()).flatMap(_.split(",")).map((_,1)).reduceByKey(_ + _).collect() DB.localTx(implicit session=>{
result.foreach( row => {
// 将结果数据插入mysql print(row._1, row._2) SQL("replace into wordcount_result(word, count) values (?, ?)").bind( row._1, row._2).update.apply() } ) // 提交offset for (o <- offsetRanges) {
SQL("replace into kafka_offset(topic, group_id, partition_id, begin_offset, end_offset) values ( ?,?,?,?,?)") .bind(o.topic, "consumer1", o.partition.toInt, o.fromOffset.toLong, o.untilOffset.toLong) .update().apply() } }) } }) ssc.start() ssc.awaitTermination() }}

创建topic

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test1

启动生产者向topic发送消息

./bin/kafka-console-producer.sh --broker-list 192.168.61.136:9092 --topic test1>hello,world

运行结果:

select * from kafka_offset
在这里插入图片描述

slect * from wordcount_result

在这里插入图片描述
结果已经成功写入到mysql中。

转载地址:http://lrcmb.baihongyu.com/

你可能感兴趣的文章
Redis运维和开发学习笔记(5) 主从复制和sentinel哨兵模式
查看>>
Redis运维和开发学习笔记(6) 监控Redis工作状态-info命令
查看>>
Redis运维和开发学习笔记(7) 内存管理和过期策略
查看>>
Redis源码分析(零)学习路径笔记
查看>>
Redis源码分析(一)redis.c //redis-server.c
查看>>
Redis源码分析(二)redis-cli.c
查看>>
redis源码剖析(三)——基础数据结构
查看>>
redis源码剖析(四)跳表
查看>>
redis源码剖析(五)—— 字符串,列表,哈希,集合,有序集合
查看>>
redis源码剖析(六)—— Redis 数据库、键过期的实现
查看>>
redis源码剖析(七)—— Redis 数据结构dict.c
查看>>
redis源码剖析(八)—— 当你启动Redis的时候,Redis做了什么
查看>>
redis源码剖析(九)—— Redis双链表实现
查看>>
redis源码剖析(十一)—— Redis字符串相关函数实现
查看>>
事务隔离级别动图演示
查看>>
mysql row_id为什么是6字节?为什么是8字节
查看>>
伪随机数和真随机数
查看>>
ps -ef和ps aux
查看>>
Linux中screen的用法
查看>>
linux查看硬盘是不是ssd固态硬盘
查看>>