TIPS
1.资料来源 说明书以及 内部构造
2.学习技术就是不断解惑的过程,就kafka stream自问:是个什么技术,能干什么,怎么使用..
数据输入和数据输出都保存在kafka集群
的程序和微服务
构建的客户端类库,那么就不须要专门去搭建计算集群,方便快捷;concept | desc |
---|---|
stream processing application | 多个处理器造成的拓扑结构,包含有必定处理逻辑的应用程序 |
processor topology | 流处理器拓扑,是processor+...+processor 的形式,source和sink是特殊的processor |
Source Processor | 源头处理器,即上游没有其余的流处理器,从kafka的topic中消费数据产生数据流输送到下游 |
Sink Processor | 结果处理器,即下游没有其余的流处理器,将上游的数据输送到指定的kafka topic |
Time | 联想flink的时间语义,例如某某time1手机端购买某商品,产生了日志数据,而后time2这个日志数据被实时采集到Kafka持久化到topic,而后进入流式处理框架,在time3正式被计算,那么time123分别称为:event time,ingestion time,processing time |
states | 保存和查询数据状态的功能,能够定义流处理应用外的程序进行只读访问 |
processing guarantees | 消费是否丢失和是否重复的级别,好比exactly-once,at-least-once,at-most-once |
kafka stream的拓扑其实就是一个个processor链接起来的流程图,其中source和sink是比较特殊的processor,分别没有上游和下游处理器。拓扑建立方式是在建立下游processor的时候指定上游的processor名称进行链接java
// DSL转换算子生成新KStream是调用 void addGraphNode(final StreamsGraphNode parent,final StreamsGraphNode child) {} // 直接经过builder添加processor public synchronized Topology addProcessor(final String name,final ProcessorSupplier supplier,final String... parentNames) {}
使用上核心都是四个步骤:node
/* 1.props */ Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");//可做为consumer的group id props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");//kafka的地址,多个逗号分隔 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());// 序列化和反序列化,在读取和写出流的时候、在读取和写出state的时候都会用到 props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); /* 2.topology */ final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input");//source processor,传入参数可定义key,value的序列化方式,以及时间提取器等 source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))//KString<String,String> .groupBy((key, value) -> value)// KGroupedStream<String,String> .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))//KTable<String,String> .toStream()//KStream<String,Long> .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));//sink processor,指定输出key,value的数据类型 final Topology topology = builder.build(); /* 3.KafkaStreams实例 */ final KafkaStreams streams = new KafkaStreams(topology, props); // CountDownLatch用await()阻塞当前线程,countDown()记录完成线程的数量 // 当getCount()=0的时候继续执行await后续的代码 final CountDownLatch latch = new CountDownLatch(1); System.out.println(topology.describe());// 打印流处理拓扑 // 钩子函数 Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { // 4.执行 streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0);
# 生产者打印生产数据 langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input >hello hello hello hello >kafka kafka kafka kafka # 消费者打印消费数据 langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \ --topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer hello 4 kafka 4
这里能够看到有点相似于宽依赖的时候,拓扑会划分,中间会生成streams-wordcount-counts-store-repartition主题保存中间结果。git
Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [streams-plaintext-input]) --> KSTREAM-FLATMAPVALUES-0000000001 Processor: KSTREAM-FLATMAPVALUES-0000000001 (stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000 Processor: KSTREAM-KEY-SELECT-0000000002 (stores: []) --> counts-store-repartition-filter <-- KSTREAM-FLATMAPVALUES-0000000001 Processor: counts-store-repartition-filter (stores: []) --> counts-store-repartition-sink <-- KSTREAM-KEY-SELECT-0000000002 Sink: counts-store-repartition-sink (topic: counts-store-repartition) <-- counts-store-repartition-filter Sub-topology: 1 Source: counts-store-repartition-source (topics: [counts-store-repartition]) --> KSTREAM-AGGREGATE-0000000003 Processor: KSTREAM-AGGREGATE-0000000003 (stores: [counts-store]) --> KTABLE-TOSTREAM-0000000007 <-- counts-store-repartition-source Processor: KTABLE-TOSTREAM-0000000007 (stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003 Sink: KSTREAM-SINK-0000000008 (topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
DSL程序工做后会生成streams-wordcount-counts-store-changelog的主题,名称规则是:application_id+store_name+changelog
,是由于每次更新KTable,都会发送最新的键值记录到流处理内部的变动日志主题github
langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-topics.sh --zookeeper localhost:2181 --list streams-plaintext-input streams-wordcount-counts-store-changelog streams-wordcount-counts-store-repartition streams-wordcount-output
/* 1.应用配置参数 */ Properties props = new Properties(); // 可做为consumer的group id props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount1"); // kafka的地址,多个逗号分隔,目前只支持单集群 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // 序列化和反序列化,在读取和写出流的时候、在读取和写出state的时候都会用到 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); /* 2.拓扑 */ StoreBuilder<KeyValueStore<String, Long>> countStoreSupplier = Stores.keyValueStoreBuilder( Stores.persistentKeyValueStore("Counts"), Serdes.String(), Serdes.Long()) .withLoggingDisabled(); // disable backing up the store to a changelog topic Topology builder = new Topology(); // add the source processor node that takes Kafka topic "source-topic" as input builder.addSource("Source", "source-topic") // add the WordCountProcessor node which takes the source processor as its upstream processor .addProcessor("Process", WordCountProcessor::new, "Source") // add the count store associated with the WordCountProcessor processor .addStateStore(countStoreSupplier, "Process") // add the sink processor node that takes Kafka topic "sink-topic" as output // and the WordCountProcessor node as its upstream processor .addSink("Sink", "sink-topic", "Process"); /* 3.流处理客户端实例 */ KafkaStreams streams = new KafkaStreams(builder, props); /* 4.启动 */ streams.start();
这里自定义了单词统计的Processorshell
/** * @Description 经过实现Processor重写process方法自定义Processor * 自定义的时候,先看上层接口,而后找一个内置的实现类参考好比KStreamAggregateProcessor */ public static class WordCountProcessor implements Processor<String, String> { private ProcessorContext context; private KeyValueStore<String, Long> kvStore; @SuppressWarnings("unchecked") public void init(ProcessorContext context) { // keep the processor context locally because we need it in punctuate() and commit() this.context = context; // 获取名为Counts的状态 kvStore = (KeyValueStore) context.getStateStore("Counts"); } // 每接收到一条消息就会执行一次process,这里是将结果放回缓存中 public void process(String key, String value) { String[] words = value.toLowerCase().split(" "); for (String word : words) { // 获取以前这个单词统计的数量,以前没有统计就设置为1 Long preCount = this.kvStore.get(word); Long result = preCount == null ? 1 : preCount + 1; // 将结果写到缓存中 this.kvStore.put(word, result); // 将结果写到下游topic this.context.forward(word,result.toString()); System.out.println("process , key = " + word + ",and value = " + result); } } public void close() { // close any resources managed by this processor // Note: Do not close any StateStores as these are managed by the library } }
# 生产者生产数据 langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-producer.sh --broker-list localhost:9092 --topic source-topic >test test test # 消费者消费数据 langjiang@langs-MacBook-Pro kafka_2.11-2.1.0 % bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic sink-topic 1 2 3 # 注意到forward写出的key,value,实际仅打印保存了统计数据,即value
Topologies: Sub-topology: 0 Source: Source (topics: [source-topic]) --> Process Processor: Process (stores: [Counts]) --> Sink <-- Source Sink: Sink (topic: sink-topic) <-- Process