Spark Streaming: reading from Kafka and writing to Hive tables

Spark Streaming (Scala):

package hive
 
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
 
/**
  * Spark consumes data from multiple Kafka topics and writes it to different Hive tables
  */
object SparkToHive {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
    Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)
    // Point spark.sql.warehouse.dir directly at the HDFS warehouse path;
    // wrapping it in java.io.File would resolve it against the local filesystem and mangle the URI
    val warehouseLocation = "hdfs://cluster/hive/warehouse"
    @transient
    // spark.serializer must be set on the builder: setting it after the
    // SparkContext has been created (via spark.conf.set) has no effect
    val spark = SparkSession
      .builder()
      .appName("Spark SQL To Hive")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .enableHiveSupport()
      .getOrCreate()
 
    @transient
    val sc = spark.sparkContext
    val scc = new StreamingContext(sc, Seconds(1))
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "10.200.10.24:6667,10.200.10.26:6667,10.200.10.29:6667",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test_jason",
      "auto.offset.reset" -> "latest", // latest or earliest
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
 
    val topics = Array("test", "test1", "test2")

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      scc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )
 
    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Cache the raw values because they are filtered twice below
        val cache_rdd = rdd.map(_.value()).cache()
        // records for table a
        val a = cache_rdd.filter(_.contains("hello"))
        // records for table b
        val b = cache_rdd.filter(_.contains("jason"))
        // Both can be printed here for verification; the actual Hive-write code is omitted,
        // see the sketch after this listing or the separate post on writing to Hive
        a.foreach(println)
        b.foreach(println)
      }
    })
    scc.start()
    scc.awaitTermination()
  }
}
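
The Hive-write step that the listing above leaves out could look roughly like the following. This is a minimal sketch, assuming the raw Kafka values go into single-column Hive tables; the table name passed in (e.g. test_db.a_table) and the one-column schema are placeholders to adapt to your own parsing logic and DDL.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object HiveWriteSketch {
  // Turn a batch of raw Kafka values into a DataFrame and append it to a Hive table.
  // The target table must already exist, e.g.
  //   CREATE TABLE test_db.a_table (value STRING) STORED AS ORC
  def writeToHive(spark: SparkSession, lines: RDD[String], table: String): Unit = {
    import spark.implicits._
    val df = lines.toDF("value") // single-column schema; adjust to your data
    df.write.mode("append").insertInto(table)
  }
}

Inside the foreachRDD above this would be called per batch, for example writeToHive(spark, a, "test_db.a_table") and writeToHive(spark, b, "test_db.b_table").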