There are three message-processing semantics in stream computing:

at-least-once: messages are never lost, but may be processed more than once.
at-most-once: messages may be lost, but each message is processed at most once.
exactly-once: every message is processed, and processed exactly once.

To achieve exactly-once in a streaming job, it is generally enough to satisfy three requirements:

1. The source supports replay (Kafka satisfies this).
2. The compute engine itself guarantees at-least-once processing (Spark satisfies this).
3. The sink supports idempotent writes or transactional updates.

Idempotent writes
Idempotence is originally a mathematical concept: f(f(x)) = f(x). An idempotent write means that writing the same data many times produces exactly the same result as writing it once, which automatically turns at-least-once into exactly-once. It suits data that carries a natural primary key (or key group), such as log records or the MySQL binlog, and is simple to implement, but it requires the processing logic to be map-only, i.e. only transformations, filters and the like, with no shuffle or aggregation. When the requirements are stricter than that, transactional writes are the only option. A sketch of an idempotent write follows.
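A minimal sketch of an idempotent write with scalikejdbc, in the same style as the full example later in this article. The table my_table and its columns are illustrative assumptions, not part of the original example; the point is only that re-running the statement with the same key leaves the table in the same state:

import scalikejdbc._

object IdempotentWriteSketch {
  // Hypothetical table: my_table(id varchar primary key, payload varchar).
  // Running this once or many times with the same (id, payload) pair leaves
  // the table in exactly the same state, so replays are harmless.
  def upsertRecord(id: String, payload: String)(implicit session: DBSession): Unit = {
    SQL("replace into my_table(id, payload) values (?, ?)")
      .bind(id, payload)
      .update()
      .apply()
  }
}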
Transactional updates
Transaction here means essentially the same thing as in a DBMS: a logical unit made up of a series of reads and updates that either all take effect or none do. In Spark Streaming this usually means wrapping the write of the output data and the update of the Kafka offsets in a single transaction: if the job fails between the two steps, the whole transaction rolls back, and on restart the batch is simply replayed from the last committed offsets. The example below implements exactly-once word count this way, with Kafka as the source and both the results and the offsets stored in MySQL.
Create the tables
-- table for storing Kafka offsets
CREATE TABLE `kafka_offset` (
  `topic` varchar(64) NOT NULL,
  `group_id` varchar(64) NOT NULL,
  `partition_id` int(11) NOT NULL,
  `begin_offset` bigint(20) NOT NULL,
  `end_offset` bigint(20) NOT NULL,
  PRIMARY KEY (`topic`, `group_id`, `partition_id`) USING BTREE
) ENGINE = InnoDB;

-- table for storing the word count result
CREATE TABLE `wordcount_result` (
  `word` varchar(255) NOT NULL,
  `count` int(11) NOT NULL,
  PRIMARY KEY (`word`) USING BTREE
) ENGINE = InnoDB;
Add the Maven dependencies
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc_2.11</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>org.scalikejdbc</groupId>
    <artifactId>scalikejdbc-config_2.11</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
Configure the MySQL connection (place application.conf under the project's resources directory)
# application.conf
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://192.168.61.137:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="123456@Abc"
dataSourceClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource
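A quick way to confirm that scalikejdbc-config actually picks this file up, sketched under the assumption that the application.conf above is on the classpath and the MySQL instance is reachable:

import scalikejdbc._
import scalikejdbc.config._

object ConnectionCheck {
  def main(args: Array[String]): Unit = {
    DBs.setup() // loads the db.default.* settings from application.conf
    val ok = DB.readOnly { implicit session =>
      SQL("select 1").map(_.int(1)).single().apply()
    }
    println(s"MySQL connection ok: ${ok.contains(1)}")
    DBs.closeAll()
  }
}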
Code
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc._
import scalikejdbc.config._

object KafkaOffsetMysql {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("kafka-mysql").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    val brokers = "192.168.61.136:9092"
    val topics = "test1"
    val groupId = "consumer1"
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    DBs.setup()

    // Read the stored offsets from MySQL
    val fromOffset = DB.readOnly { implicit session =>
      SQL("select * from kafka_offset").map { rs =>
        new TopicPartition(rs.string("topic"), rs.int("partition_id")) -> rs.long("end_offset")
      }.list().apply()
    }.toMap
    println(fromOffset)

    // If MySQL holds no offsets yet, start consuming according to auto.offset.reset;
    // otherwise resume from the stored offsets
    val messages: InputDStream[ConsumerRecord[String, String]] = if (fromOffset.isEmpty) {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
      )
    } else {
      KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Assign[String, String](fromOffset.keys.toList, kafkaParams, fromOffset)
      )
    }

    messages.foreachRDD { rdd =>
      if (!rdd.isEmpty()) {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        val result = rdd.map(_.value()).flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _).collect()

        // Write this batch's results and its offsets in a single MySQL transaction
        DB.localTx { implicit session =>
          result.foreach { row =>
            // Insert the result into MySQL; replace keeps the write idempotent
            println(s"${row._1} -> ${row._2}")
            SQL("replace into wordcount_result(word, count) values (?, ?)")
              .bind(row._1, row._2)
              .update()
              .apply()
          }
          // Commit the offsets in the same transaction
          for (o <- offsetRanges) {
            SQL("replace into kafka_offset(topic, group_id, partition_id, begin_offset, end_offset) values (?, ?, ?, ?, ?)")
              .bind(o.topic, groupId, o.partition.toInt, o.fromOffset.toLong, o.untilOffset.toLong)
              .update()
              .apply()
          }
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
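One design note on the sink: the replace into above overwrites each word's count with the value from the current batch rather than accumulating across batches. If a running total is wanted, an accumulating upsert can be used instead; because it still commits inside the same DB.localTx as the offset update, a restarted job never applies the same batch twice, so exactly-once is preserved. A sketch of the alternative statement, as a drop-in replacement for the replace into wordcount_result line above:

// Inside the same DB.localTx block, instead of "replace into wordcount_result ...":
SQL("insert into wordcount_result(word, count) values (?, ?) " +
    "on duplicate key update count = count + values(count)")
  .bind(row._1, row._2)
  .update()
  .apply()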
Create the topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 3 --topic test1
Start a producer and send messages to the topic
./bin/kafka-console-producer.sh --broker-list 192.168.61.136:9092 --topic test1
>hello,world
Run the job and query the result:
select * from kafka_offset;
select * from wordcount_result;
If the only input so far is the line hello,world above, wordcount_result should now hold the rows (hello, 1) and (world, 1), and kafka_offset one row per partition of test1 for group consumer1. The results have been written to MySQL successfully.

Reposted from: http://lrcmb.baihongyu.com/