kafka + spark streaming (1)

I wrote a kafka + spark streaming test case a while back but never wrote it up at the time. This stuff matters enough that I need to find time to come back and fill it in.

There is a tutorial here, http://colobu.com/2015/01/05/k… , which is quite well written.

——————-

Update, 2018-11-12 21:26:25

Finally getting around to filling in this gap. Almost a year has flown by, seriously...

I installed kafka by following https://segmentfault.com/a/119… . We are on version 0.11. The main consideration is that kafka changed quite a lot once it crossed into the 1.0 line, so to stay grounded in the older way of doing things, and to better match what is actually running in practice, I will get familiar with the pre-1.0 versions first.

First I set up the single-node version. The plan is to get that running and, more importantly, to get the whole downstream spark streaming pipeline working end to end. Tackle the main problem first, then come back later to play with kafka's single-machine pseudo-distributed and multi-machine setups.

Creating a topic, producing, consuming, and listing topics from the command line all worked smoothly on the first try, which felt pretty good, haha.
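For the record, the stock 0.11 scripts for those steps are roughly the ones below. This is from memory rather than a saved transcript, so treat it as a sketch; localhost works because everything is still on one box, and test is the topic name I keep using later.

# start zookeeper and the broker from the kafka install directory
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

# create a topic, list topics, then produce and consume on it
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning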

Then I tried consuming from spark streaming, and that is where the trouble started: after adding the streaming dependency to pom.xml, maven update could no longer resolve anything. Some searching said that inside the company network you need to configure a proxy (a sample settings.xml proxy entry is sketched a little further below), but configuring one made no difference. I then suspected the settings had not been reloaded, and found this post, https://blog.csdn.net/hello5or… , which says:

In Preferences -> Maven -> User Settings, click Update Settings to load the changes we just made to settings.xml.

But following that did not help one bit either, so I went back and read the error log more carefully and spotted this message:

Description	Resource	Path	Location	Type
The container 'Maven Dependencies' references non existing library 'C:\Users\adenzhang\.m2\repository\org\apache\spark\spark-core_2.11\1.6.3\spark-core_2.11-1.6.3.jar'	test20181111		Build path	Build Path Problem

So I suspected that some leftover files from an earlier attempt were making the download fail. I deleted that whole directory, and this time it finally worked; pulling packages over the intranet is plenty fast.
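As an aside on the proxy step mentioned above: a proxy entry in the .m2 settings.xml is generally of the following shape. The id, host, and port here are placeholders, not the actual values I used.

<proxies>
    <proxy>
        <id>corp-proxy</id>
        <active>true</active>
        <protocol>http</protocol>
        <host>proxy.example.com</host>
        <port>8080</port>
    </proxy>
</proxies>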

Then, following https://blog.csdn.net/WinWill2… , I added the dependency:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.5.2</version>
</dependency>

Though the version I actually added was 1.6.3.

Once streaming could read from a socket without problems, I followed https://www.cnblogs.com/xlturi… and added the kafka dependency:

<dependency><!-- Spark Streaming Kafka -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

Then I started on the kafka consumer code, referring to https://www.ibm.com/developerw… . The formatting of the original got mangled, so I re-formatted it as follows:

package me.zrj.test.test20181111
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.HashPartitioner
import org.apache.spark.streaming.Duration

object WebPagePopularityValueCalculator {
  private val checkpointDir = "popularity-data-checkpoint"
  private val msgConsumerGroup = "user-behavior-topic-message-consumer-group"

  def main(args: Array[String]) {
    if (args.length < 2) {
      println("Usage:WebPagePopularityValueCalculator zkserver1:2181,zkserver2:2181,zkserver3:2181 consumeMsgDataTimeInterval(secs)")
      System.exit(1)
    }
    val Array(zkServers, processingInterval) = args
    val conf = new SparkConf().setAppName("Web Page Popularity Value Calculator")
    val ssc = new StreamingContext(conf, Seconds(processingInterval.toInt))
    //using updateStateByKey asks for enabling checkpoint
    ssc.checkpoint(checkpointDir)
    val kafkaStream = KafkaUtils.createStream(
      //Spark streaming context
      ssc,
      //zookeeper quorum. e.g zkserver1:2181,zkserver2:2181,...
      zkServers,
      //kafka message consumer group ID
      msgConsumerGroup,
      //Map of (topic_name -> numPartitions) to consume. Each partition is consumed in its own thread
      Map("user-behavior-topic" -> 3))
    val msgDataRDD = kafkaStream.map(_._2)
    //for debug use only
    //println("Coming data in this interval...")
    //msgDataRDD.print()
    // e.g page37|5|1.5119122|-1
    val popularityData = msgDataRDD.map { msgLine =>
      {
        val dataArr: Array[String] = msgLine.split("\\|")
        val pageID = dataArr(0)
        //calculate the popularity value
        val popValue: Double = dataArr(1).toFloat * 0.8 + dataArr(2).toFloat * 0.8 + dataArr(3).toFloat * 1
        (pageID, popValue)
      }
    }
    //sum the previous popularity value and current value
    val updatePopularityValue = (iterator: Iterator[(String, Seq[Double], Option[Double])]) => {
      iterator.flatMap(t => {
        val newValue: Double = t._2.sum
        val stateValue: Double = t._3.getOrElse(0);
        Some(newValue + stateValue)
      }.map(sumedValue => (t._1, sumedValue)))
    }
    val initialRDD = ssc.sparkContext.parallelize(List(("page1", 0.00)))
    val stateDstream = popularityData.updateStateByKey[Double](updatePopularityValue,
      new HashPartitioner(ssc.sparkContext.defaultParallelism), true, initialRDD)
    //set the checkpoint interval to avoid too frequently data checkpoint which may
    //may significantly reduce operation throughput
    stateDstream.checkpoint(Duration(8 * processingInterval.toInt * 1000))
    //after calculation, we need to sort the result and only show the top 10 hot pages
    stateDstream.foreachRDD { rdd =>
      {
        val sortedData = rdd.map { case (k, v) => (v, k) }.sortByKey(false)
        val topKData = sortedData.take(10).map { case (v, k) => (k, v) }
        topKData.foreach(x => {
          println(x)
        })
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

But the logic above is fairly involved; my version is much simpler:

  def kafkaStreaming() {
    // create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("kafka-streaming-1112"), Seconds(5))
    ssc.checkpoint("file:///D://spark-tmp")
    // one receiver thread on topic "test", connecting to kafka via zookeeper in the VM
    val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum = "192.168.56.101:2181",
        groupId = "test-topic-1112-consumer-group", topics = Map("test" -> 1))
    kafkaStream.print()                      // prints the raw (key, value) tuples
    val dstreamkafka = kafkaStream.map(_._2) // keep only the message value
    dstreamkafka.print()
    ssc.start()
    ssc.awaitTermination()
  }

Running it threw an exception:

18/11/12 20:27:12 INFO KafkaReceiver: Starting MessageHandler.
18/11/12 20:27:12 INFO VerifiableProperties: Verifying properties
18/11/12 20:27:12 INFO VerifiableProperties: Property client.id is overridden to test-topic-1112-consumer-group
18/11/12 20:27:12 INFO VerifiableProperties: Property metadata.broker.list is overridden to localhost.localdomain:9092
18/11/12 20:27:12 INFO VerifiableProperties: Property request.timeout.ms is overridden to 30000
18/11/12 20:27:12 INFO ClientUtils$: Fetching metadata from broker id:1,host:localhost.localdomain,port:9092 with correlation id 0 for 1 topic(s) Set(test)
18/11/12 20:27:12 INFO SyncProducer: Connected to localhost.localdomain:9092 for producing
18/11/12 20:27:12 INFO SyncProducer: Disconnecting from localhost.localdomain:9092
18/11/12 20:27:12 WARN ClientUtils$: Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [id:1,host:localhost.localdomain,port:9092] failed
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
	at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
18/11/12 20:27:12 INFO SyncProducer: Disconnecting from localhost.localdomain:9092
18/11/12 20:27:12 WARN ConsumerFetcherManager$LeaderFinderThread: [test-topic-1112-consumer-group_adenzhang-PC2-1542025622919-8e16cf3b-leader-finder-thread], Failed to find leader for Set([test,0])
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(id:1,host:localhost.localdomain,port:9092)] failed
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:72)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)
	at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
Caused by: java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
	at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
	at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
	at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
	at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
	... 3 more
18/11/12 20:27:12 INFO ConsumerFetcherManager: [ConsumerFetcherManager-1542025632303] Added fetcher for partitions ArrayBuffer()

This has to be a problem with the id:1,host:localhost.localdomain,port:9092 setting, since my kafka runs inside a VirtualBox VM while eclipse runs on the Windows host outside it. I first telnet-ed port 9092 to confirm it was open, then grepped the config files for the string localhost.localdomain. Surprisingly there was no match at all, so I split the string up and searched for the pieces separately.

The matches were scattered across these config files:

[adenzhang@localhost config]$ grep "localhost" *
connect-distributed.properties:bootstrap.servers=localhost:9092
connect-standalone.properties:bootstrap.servers=localhost:9092
producer.properties:bootstrap.servers=localhost:9092
server.properties:zookeeper.connect=localhost:2181

[adenzhang@localhost config]$ grep "localdomain" *

[adenzhang@localhost config]$ grep 9092 *
connect-distributed.properties:bootstrap.servers=localhost:9092
connect-standalone.properties:bootstrap.servers=localhost:9092
producer.properties:bootstrap.servers=localhost:9092
server.properties:#     listeners = PLAINTEXT://your.host.name:9092
server.properties:#listeners=PLAINTEXT://:9092
server.properties:#advertised.listeners=PLAINTEXT://your.host.name:9092

Oddly, localdomain does not appear anywhere. Is it something the code adds automatically?

Never mind that for now. I changed those settings to the IP address, restarted kafka, and launched spark streaming again. Still no luck, so time to search the web.

I found this, https://stackoverflow.com/ques… , which says:

The broker tells the client which hostname should be used to produce/consume messages. By default Kafka uses the hostname of the system it runs on. If this hostname can not be resolved by the client side you get this exception.

You can try setting advertised.host.name in the Kafka configuration to an hostname/address which the clients should use.

The problem is that the advertised.host.name setting does not seem to exist in my config file, not even in commented-out form. If a setting were that important you would expect some trace of it, so I started to suspect a version difference. The second highest voted answer is:

Here is my way to solve this problem:

1. run bin/kafka-server-stop.sh to stop running kafka server.
2. modify the properties file config/server.properties by adding a line: listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
3. restart kafka server.

Since without the listener setting, kafka will use java.net.InetAddress.getCanonicalHostName() to get the address which the socket server listens on.

A quick search shows this one does exist, right near the top of server.properties. The default section looks like this:

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

This looks a lot more plausible. The advertised.listeners entry below it does not actually need to be changed, because the comment says it falls back to the value of listeners if unset, so changing listeners alone is enough.
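Concretely that means uncommenting listeners in config/server.properties and pointing it at an address the Windows side can reach. In my case the VM's address is 192.168.56.101 (the same one the consumer code uses), so presumably something like:

listeners=PLAINTEXT://192.168.56.101:9092

The broker has to be restarted after the change for it to take effect.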

That did it, but then trying to produce data from the command line threw an error:

[2018-11-12 21:14:20,006] WARN Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

The command I was using was:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Changing it to:

bin/kafka-console-producer.sh --broker-list 192.168.56.101:9092 --topic test

made it work.

The log output looks like this:


18/11/12 21:16:25 INFO JobScheduler: Finished job streaming job 1542028585000 ms.0 from job set of time 1542028585000 ms
18/11/12 21:16:25 INFO JobScheduler: Starting job streaming job 1542028585000 ms.1 from job set of time 1542028585000 ms
18/11/12 21:16:25 INFO SparkContext: Starting job: print at SSTest20181111.scala:85
18/11/12 21:16:25 INFO DAGScheduler: Got job 7 (print at SSTest20181111.scala:85) with 1 output partitions
18/11/12 21:16:25 INFO DAGScheduler: Final stage: ResultStage 7 (print at SSTest20181111.scala:85)
18/11/12 21:16:25 INFO DAGScheduler: Parents of final stage: List()
18/11/12 21:16:25 INFO DAGScheduler: Missing parents: List()
18/11/12 21:16:25 INFO DAGScheduler: Submitting ResultStage 7 (MapPartitionsRDD[108] at map at SSTest20181111.scala:84), which has no missing parents
18/11/12 21:16:25 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 1648.0 B, free 2.4 GB)
18/11/12 21:16:25 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 1097.0 B, free 2.4 GB)
18/11/12 21:16:25 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:49533 (size: 1097.0 B, free: 2.4 GB)
18/11/12 21:16:25 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
18/11/12 21:16:25 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 7 (MapPartitionsRDD[108] at map at SSTest20181111.scala:84)
18/11/12 21:16:25 INFO TaskSchedulerImpl: Adding task set 7.0 with 1 tasks
18/11/12 21:16:25 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID 9, localhost, partition 0,NODE_LOCAL, 1936 bytes)
18/11/12 21:16:25 INFO Executor: Running task 0.0 in stage 7.0 (TID 9)
18/11/12 21:16:25 INFO BlockManager: Found block input-0-1542028583000 locally
18/11/12 21:16:25 INFO Executor: Finished task 0.0 in stage 7.0 (TID 9). 937 bytes result sent to driver
18/11/12 21:16:25 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 9) in 4 ms on localhost (1/1)
18/11/12 21:16:25 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool 
18/11/12 21:16:25 INFO DAGScheduler: ResultStage 7 (print at SSTest20181111.scala:85) finished in 0.004 s
18/11/12 21:16:25 INFO DAGScheduler: Job 7 finished: print at SSTest20181111.scala:85, took 0.011404 s
18/11/12 21:16:25 INFO SparkContext: Starting job: print at SSTest20181111.scala:85
18/11/12 21:16:25 INFO DAGScheduler: Got job 8 (print at SSTest20181111.scala:85) with 1 output partitions
18/11/12 21:16:25 INFO DAGScheduler: Final stage: ResultStage 8 (print at SSTest20181111.scala:85)
18/11/12 21:16:25 INFO DAGScheduler: Parents of final stage: List()
18/11/12 21:16:25 INFO DAGScheduler: Missing parents: List()
18/11/12 21:16:25 INFO DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[108] at map at SSTest20181111.scala:84), which has no missing parents
18/11/12 21:16:25 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 1648.0 B, free 2.4 GB)
18/11/12 21:16:25 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 1097.0 B, free 2.4 GB)
18/11/12 21:16:25 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:49533 (size: 1097.0 B, free: 2.4 GB)
18/11/12 21:16:25 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006
18/11/12 21:16:25 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (MapPartitionsRDD[108] at map at SSTest20181111.scala:84)
18/11/12 21:16:25 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks
18/11/12 21:16:25 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 10, localhost, partition 1,NODE_LOCAL, 1936 bytes)
18/11/12 21:16:25 INFO Executor: Running task 0.0 in stage 8.0 (TID 10)
18/11/12 21:16:25 INFO BlockManager: Found block input-0-1542028584400 locally
18/11/12 21:16:25 INFO Executor: Finished task 0.0 in stage 8.0 (TID 10). 937 bytes result sent to driver
18/11/12 21:16:25 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 10) in 3 ms on localhost (1/1)
18/11/12 21:16:25 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 
18/11/12 21:16:25 INFO DAGScheduler: ResultStage 8 (print at SSTest20181111.scala:85) finished in 0.003 s
18/11/12 21:16:25 INFO DAGScheduler: Job 8 finished: print at SSTest20181111.scala:85, took 0.007842 s
-------------------------------------------
Time: 1542028585000 ms
-------------------------------------------
2018-11-12 21:16:21
2018-11-12 21:16:23

18/11/12 21:16:25 INFO JobScheduler: Finished job streaming job 1542028585000 ms.1 from job set of time 1542028585000 ms
18/11/12 21:16:25 INFO BlockRDD: Removing RDD 105 from persistence list
18/11/12 21:16:25 INFO JobScheduler: Total delay: 0.094 s for time 1542028585000 ms (execution: 0.086 s)
18/11/12 21:16:25 INFO BlockManager: Removing RDD 105
18/11/12 21:16:25 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[105] at createStream at SSTest20181111.scala:81 of time 1542028585000 ms
18/11/12 21:16:25 INFO MapPartitionsRDD: Removing RDD 106 from persistence list
18/11/12 21:16:25 INFO JobGenerator: Checkpointing graph for time 1542028585000 ms
18/11/12 21:16:25 INFO DStreamGraph: Updating checkpoint data for time 1542028585000 ms
18/11/12 21:16:25 INFO DStreamGraph: Updated checkpoint data for time 1542028585000 ms
18/11/12 21:16:25 INFO CheckpointWriter: Submitted checkpoint of time 1542028585000 ms writer queue
18/11/12 21:16:25 INFO CheckpointWriter: Saving checkpoint for time 1542028585000 ms to file 'file:/D:/spark-tmp/checkpoint-1542028585000'
18/11/12 21:16:25 INFO BlockManager: Removing RDD 106
18/11/12 21:16:25 INFO BlockManagerInfo: Removed input-0-1542028579400 on localhost:49533 in memory (size: 92.0 B, free: 2.4 GB)
18/11/12 21:16:25 INFO BlockManagerInfo: Removed input-0-1542028576800 on localhost:49533 in memory (size: 92.0 B, free: 2.4 GB)
18/11/12 21:16:25 INFO BlockManagerInfo: Removed input-0-1542028574800 on localhost:49533 in memory (size: 76.0 B, free: 2.4 GB)
18/11/12 21:16:25 INFO CheckpointWriter: Deleting file:/D:/spark-tmp/checkpoint-1542028560000
18/11/12 21:16:25 INFO CheckpointWriter: Checkpoint for time 1542028585000 ms saved to file 'file:/D:/spark-tmp/checkpoint-1542028585000', took 3100 bytes and 7 ms
18/11/12 21:16:25 INFO DStreamGraph: Clearing checkpoint data for time 1542028585000 ms
18/11/12 21:16:25 INFO DStreamGraph: Cleared checkpoint data for time 1542028585000 ms
18/11/12 21:16:25 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer(1542028575000 ms)
18/11/12 21:16:25 INFO WriteAheadLogManager  for Thread: Attempting to clear 0 old log files in file:/D:/spark-tmp/receivedBlockMetadata older than 1542028580000: 
18/11/12 21:16:25 INFO InputInfoTracker: remove old batch metadata: 1542028575000 ms
18/11/12 21:16:26 INFO MemoryStore: Block input-0-1542028586200 stored as bytes in memory (estimated size 92.0 B, free 2.4 GB)
18/11/12 21:16:26 INFO BlockManagerInfo: Added input-0-1542028586200 in memory on localhost:49533 (size: 92.0 B, free: 2.4 GB)
18/11/12 21:16:26 WARN BlockManager: Block input-0-1542028586200 replicated to only 0 peer(s) instead of 1 peers
18/11/12 21:16:26 INFO BlockGenerator: Pushed block input-0-1542028586200
18/11/12 21:16:29 INFO MemoryStore: Block input-0-1542028588800 stored as bytes in memory (estimated size 92.0 B, free 2.4 GB)
18/11/12 21:16:29 INFO BlockManagerInfo: Added input-0-1542028588800 in memory on localhost:49533 (size: 92.0 B, free: 2.4 GB)
18/11/12 21:16:29 WARN BlockManager: Block input-0-1542028588800 replicated to only 0 peer(s) instead of 1 peers
18/11/12 21:16:29 INFO BlockGenerator: Pushed block input-0-1542028588800
18/11/12 21:16:30 INFO JobScheduler: Added jobs for time 1542028590000 ms
18/11/12 21:16:30 INFO JobGenerator: Checkpointing graph for time 1542028590000 ms
18/11/12 21:16:30 INFO DStreamGraph: Updating checkpoint data for time 1542028590000 ms
18/11/12 21:16:30 INFO DStreamGraph: Updated checkpoint data for time 1542028590000 ms
18/11/12 21:16:30 INFO JobScheduler: Starting job streaming job 1542028590000 ms.0 from job set of time 1542028590000 ms
18/11/12 21:16:30 INFO CheckpointWriter: Submitted checkpoint of time 1542028590000 ms writer queue
18/11/12 21:16:30 INFO CheckpointWriter: Saving checkpoint for time 1542028590000 ms to file 'file:/D:/spark-tmp/checkpoint-1542028590000'
18/11/12 21:16:30 INFO SparkContext: Starting job: print at SSTest20181111.scala:83
18/11/12 21:16:30 INFO DAGScheduler: Got job 9 (print at SSTest20181111.scala:83) with 1 output partitions
18/11/12 21:16:30 INFO DAGScheduler: Final stage: ResultStage 9 (print at SSTest20181111.scala:83)
18/11/12 21:16:30 INFO DAGScheduler: Parents of final stage: List()
18/11/12 21:16:30 INFO DAGScheduler: Missing parents: List()
18/11/12 21:16:30 INFO DAGScheduler: Submitting ResultStage 9 (BlockRDD[109] at createStream at SSTest20181111.scala:81), which has no missing parents
18/11/12 21:16:30 INFO CheckpointWriter: Deleting file:/D:/spark-tmp/checkpoint-1542028565000.bk
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 1128.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO CheckpointWriter: Checkpoint for time 1542028590000 ms saved to file 'file:/D:/spark-tmp/checkpoint-1542028590000', took 3104 bytes and 5 ms
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 757.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:49533 (size: 757.0 B, free: 2.4 GB)
18/11/12 21:16:30 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006
18/11/12 21:16:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 9 (BlockRDD[109] at createStream at SSTest20181111.scala:81)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Adding task set 9.0 with 1 tasks
18/11/12 21:16:30 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 11, localhost, partition 0,NODE_LOCAL, 1936 bytes)
18/11/12 21:16:30 INFO Executor: Running task 0.0 in stage 9.0 (TID 11)
18/11/12 21:16:30 INFO BlockManager: Found block input-0-1542028586200 locally
18/11/12 21:16:30 INFO Executor: Finished task 0.0 in stage 9.0 (TID 11). 999 bytes result sent to driver
18/11/12 21:16:30 INFO DAGScheduler: ResultStage 9 (print at SSTest20181111.scala:83) finished in 0.002 s
18/11/12 21:16:30 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 11) in 1 ms on localhost (1/1)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Removed TaskSet 9.0, whose tasks have all completed, from pool 
18/11/12 21:16:30 INFO DAGScheduler: Job 9 finished: print at SSTest20181111.scala:83, took 0.006363 s
18/11/12 21:16:30 INFO SparkContext: Starting job: print at SSTest20181111.scala:83
18/11/12 21:16:30 INFO DAGScheduler: Got job 10 (print at SSTest20181111.scala:83) with 1 output partitions
18/11/12 21:16:30 INFO DAGScheduler: Final stage: ResultStage 10 (print at SSTest20181111.scala:83)
18/11/12 21:16:30 INFO DAGScheduler: Parents of final stage: List()
18/11/12 21:16:30 INFO DAGScheduler: Missing parents: List()
18/11/12 21:16:30 INFO DAGScheduler: Submitting ResultStage 10 (BlockRDD[109] at createStream at SSTest20181111.scala:81), which has no missing parents
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 1128.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 757.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:49533 (size: 757.0 B, free: 2.4 GB)
18/11/12 21:16:30 INFO SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:1006
18/11/12 21:16:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 10 (BlockRDD[109] at createStream at SSTest20181111.scala:81)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Adding task set 10.0 with 1 tasks
18/11/12 21:16:30 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 12, localhost, partition 1,NODE_LOCAL, 1936 bytes)
18/11/12 21:16:30 INFO Executor: Running task 0.0 in stage 10.0 (TID 12)
18/11/12 21:16:30 INFO BlockManager: Found block input-0-1542028588800 locally
18/11/12 21:16:30 INFO Executor: Finished task 0.0 in stage 10.0 (TID 12). 999 bytes result sent to driver
18/11/12 21:16:30 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 12) in 1 ms on localhost (1/1)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks have all completed, from pool 
18/11/12 21:16:30 INFO DAGScheduler: ResultStage 10 (print at SSTest20181111.scala:83) finished in 0.002 s
18/11/12 21:16:30 INFO DAGScheduler: Job 10 finished: print at SSTest20181111.scala:83, took 0.005812 s
18/11/12 21:16:30 INFO JobScheduler: Finished job streaming job 1542028590000 ms.0 from job set of time 1542028590000 ms
18/11/12 21:16:30 INFO JobScheduler: Starting job streaming job 1542028590000 ms.1 from job set of time 1542028590000 ms
-------------------------------------------
Time: 1542028590000 ms
-------------------------------------------
(null,2018-11-12 21:16:24)
(null,2018-11-12 21:16:27)

18/11/12 21:16:30 INFO SparkContext: Starting job: print at SSTest20181111.scala:85
18/11/12 21:16:30 INFO DAGScheduler: Got job 11 (print at SSTest20181111.scala:85) with 1 output partitions
18/11/12 21:16:30 INFO DAGScheduler: Final stage: ResultStage 11 (print at SSTest20181111.scala:85)
18/11/12 21:16:30 INFO DAGScheduler: Parents of final stage: List()
18/11/12 21:16:30 INFO DAGScheduler: Missing parents: List()
18/11/12 21:16:30 INFO DAGScheduler: Submitting ResultStage 11 (MapPartitionsRDD[110] at map at SSTest20181111.scala:84), which has no missing parents
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 1648.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_11_piece0 stored as bytes in memory (estimated size 1097.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO BlockManagerInfo: Added broadcast_11_piece0 in memory on localhost:49533 (size: 1097.0 B, free: 2.4 GB)
18/11/12 21:16:30 INFO SparkContext: Created broadcast 11 from broadcast at DAGScheduler.scala:1006
18/11/12 21:16:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 11 (MapPartitionsRDD[110] at map at SSTest20181111.scala:84)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Adding task set 11.0 with 1 tasks
18/11/12 21:16:30 INFO TaskSetManager: Starting task 0.0 in stage 11.0 (TID 13, localhost, partition 0,NODE_LOCAL, 1936 bytes)
18/11/12 21:16:30 INFO Executor: Running task 0.0 in stage 11.0 (TID 13)
18/11/12 21:16:30 INFO BlockManager: Found block input-0-1542028586200 locally
18/11/12 21:16:30 INFO Executor: Finished task 0.0 in stage 11.0 (TID 13). 937 bytes result sent to driver
18/11/12 21:16:30 INFO TaskSetManager: Finished task 0.0 in stage 11.0 (TID 13) in 2 ms on localhost (1/1)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Removed TaskSet 11.0, whose tasks have all completed, from pool 
18/11/12 21:16:30 INFO DAGScheduler: ResultStage 11 (print at SSTest20181111.scala:85) finished in 0.002 s
18/11/12 21:16:30 INFO DAGScheduler: Job 11 finished: print at SSTest20181111.scala:85, took 0.008429 s
18/11/12 21:16:30 INFO SparkContext: Starting job: print at SSTest20181111.scala:85
18/11/12 21:16:30 INFO DAGScheduler: Got job 12 (print at SSTest20181111.scala:85) with 1 output partitions
18/11/12 21:16:30 INFO DAGScheduler: Final stage: ResultStage 12 (print at SSTest20181111.scala:85)
18/11/12 21:16:30 INFO DAGScheduler: Parents of final stage: List()
18/11/12 21:16:30 INFO DAGScheduler: Missing parents: List()
18/11/12 21:16:30 INFO DAGScheduler: Submitting ResultStage 12 (MapPartitionsRDD[110] at map at SSTest20181111.scala:84), which has no missing parents
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 1648.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO MemoryStore: Block broadcast_12_piece0 stored as bytes in memory (estimated size 1097.0 B, free 2.4 GB)
18/11/12 21:16:30 INFO BlockManagerInfo: Added broadcast_12_piece0 in memory on localhost:49533 (size: 1097.0 B, free: 2.4 GB)
18/11/12 21:16:30 INFO SparkContext: Created broadcast 12 from broadcast at DAGScheduler.scala:1006
18/11/12 21:16:30 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 12 (MapPartitionsRDD[110] at map at SSTest20181111.scala:84)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Adding task set 12.0 with 1 tasks
18/11/12 21:16:30 INFO TaskSetManager: Starting task 0.0 in stage 12.0 (TID 14, localhost, partition 1,NODE_LOCAL, 1936 bytes)
18/11/12 21:16:30 INFO Executor: Running task 0.0 in stage 12.0 (TID 14)
18/11/12 21:16:30 INFO BlockManager: Found block input-0-1542028588800 locally
18/11/12 21:16:30 INFO Executor: Finished task 0.0 in stage 12.0 (TID 14). 937 bytes result sent to driver
18/11/12 21:16:30 INFO TaskSetManager: Finished task 0.0 in stage 12.0 (TID 14) in 2 ms on localhost (1/1)
18/11/12 21:16:30 INFO TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool 
18/11/12 21:16:30 INFO DAGScheduler: ResultStage 12 (print at SSTest20181111.scala:85) finished in 0.002 s
18/11/12 21:16:30 INFO DAGScheduler: Job 12 finished: print at SSTest20181111.scala:85, took 0.006536 s
18/11/12 21:16:30 INFO JobScheduler: Finished job streaming job 1542028590000 ms.1 from job set of time 1542028590000 ms
-------------------------------------------
Time: 1542028590000 ms
-------------------------------------------
2018-11-12 21:16:24
2018-11-12 21:16:27

One last question: what kafkaStream.print() prints is a tuple whose first element is null, and kafkaStream is a ReceiverInputDStream[(String, String)], so what is that first element supposed to be?

I found this article, https://www.jianshu.com/p/3195… , which does not answer the question directly but describes streaming's data receiving flow in great detail and is worth coming back to. http://bit1129.iteye.com/blog/… also digs into some details of KafkaUtils.createStream and touches on the printing behavior, but not the null. After searching around without finding an answer I skipped it for now; it is not the main problem.
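My working assumption, which none of the links above confirm explicitly, is that the first element of the tuple is the kafka message key and the second is the message value, and the key comes back null because the console producer sends keyless messages. A minimal sketch for checking that against the same kafkaStream as above:

    // assumption: the tuple is (message key, message value); keys come back
    // null when messages are produced without an explicit key
    kafkaStream.map { case (key, value) =>
      s"key=${Option(key).getOrElse("<no key>")} value=$value"
    }.print()

If that is right, producing a message with an explicit key should make the first element show up as non-null.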

—————

A few other reference links:

https://www.ibm.com/developerw…

https://www.cnblogs.com/xlturi…

https://segmentfault.com/a/119…

https://colobu.com/2015/01/05/…
