package com.zx.dao

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

/**
 * Spark Streaming demo: watches a local directory for new text files,
 * word-counts each 1-second micro-batch, and writes one result directory
 * per source file.
 */
object SparkStreamingTest {

  def main(args: Array[String]): Unit = {
    // Configure the Spark job. NOTE(review): executor/core settings have no
    // effect in local[*] mode; kept for parity with a cluster deployment.
    val sparkConf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("sparkStreamingTest")
      .set("spark.executor.memory", "8g")
      .set("spark.driver.memory", "4g")
      .set("spark.executor.cores", "4")
      .set("spark.cores.max", "4")
      .set("spark.default.parallelism", "12")
      .set("spark.num.executors", "4")

    // Streaming context with a 1-second micro-batch interval.
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))

    // Alternative source: listen on a TCP socket instead of a directory.
    // val client = streamingContext.socketTextStream("localhost", 9999)

    // Monitor the directory for newly created text files.
    val client = streamingContext.textFileStream("C:/Users/zixiang/Desktop/ss")

    // Print up to 100 elements of each batch for debugging.
    client.print(100)

    // Process every micro-batch RDD.
    client.foreachRDD { rdd =>
      // BUG FIX: the original mapped each whole *line* to (line, 1), so the
      // result was a line count despite the `word` naming. Split lines into
      // whitespace-separated tokens first so this is a real word count;
      // drop empty tokens produced by leading/trailing whitespace.
      val wordCounts = rdd
        .flatMap(line => line.split("\\s+"))
        .filter(_.nonEmpty)
        .map(word => (word, 1))
        .reduceByKey(_ + _)

      // Only write output when the batch actually contained data.
      if (rdd.count() > 0) {
        // Recover the source file name so each input file gets its own
        // result directory.
        val fileName = GetFileNameFromStream.getFileName(rdd)
        wordCounts
          .repartition(1)
          .saveAsTextFile("C:/Users/zixiang/Desktop/result/" + fileName)
      }
    }

    // client.repartition(1).saveAsTextFiles("hdfs://master:9000/test")

    // Start the streaming job and block until it is terminated.
    streamingContext.start()
    streamingContext.awaitTermination()
    // streamingContext.awaitTerminationOrTimeout(timeoutMs)
    // streamingContext.stop()
  }

  object GetFileNameFromStream extends java.io.Serializable {

    /**
     * Extracts the source file name from an RDD produced by textFileStream.
     *
     * NOTE(review): this parses `toDebugString`, which is debug output with
     * no stability guarantee — it assumes the file path is the third
     * space-separated token of the third '|'-separated segment. Confirm it
     * still holds on the deployed Spark version, or switch to inspecting
     * the underlying HadoopRDD input splits for production use.
     *
     * @param file RDD whose lineage contains exactly one source file path
     * @return the last path segment (the bare file name)
     */
    def getFileName(file: RDD[String]): String = {
      val path = file.toDebugString.split('|')(2).split(' ')(2)
      // Last '/'-separated segment of the path is the file name.
      path.split('/').last
    }
  }
}