Twitter Storm: storm的一些常见模式

这篇文章列举出了storm topology里面的一些常见模式:html

  • 流聚合(stream join)
  • 批处理(Batching)
  • BasicBolt
  • 内存内缓存 + fields grouping 组合
  • 计算top N
  • 用TimeCacheMap来高效地保存一个最近被更新的对象的缓存
  • 分布式RPC: CoordinatedBolt和KeyedFairBolt

 

流聚合(stream join)

流聚合把两个或者多个数据流聚合成一个数据流 — 基于一些共同的tuple字段。流聚合和SQL里面table join很像,只是table join的输入是有限的,而且join的语义是很是明确的。而流聚合的语义是不明确的而且输入流是无限的。java

流类型的聚合类型跟具体的应用是有关了。一些应用把两个流发出的全部的tuple都聚合起来 — 无论多长时间;而另一些应用则只会聚合一些特定的tuple。而另一些应用的聚合逻辑又可能彻底不同。而这些聚合类型里面最多见的类型是把全部的输入流进行同样的划分, 这个在storm里面用fields grouping在相同字段上进行grouping就能够了,好比:git

1
2
3
4
builder.setBolt( 5 , new MyJoiner(), parallelism)
     .fieldsGrouping( 1 , new Fields( "field1" , "field2" ))
     .fieldsGrouping( 2 , new Fields( "field1" , "field2" ))
     .fieldsGrouping( 3 , new Fields( "field1" , "field2" ));

固然,不一样的数据流的“相同”字段能够有不同的名字。github

 

批处理(Batching)

有时候为了性能或者一些别的缘由, 你可能想把一组tuple一块儿处理, 而不是一个个单独处理。好比,你可能想批量更新数据库。算法

若是你想让你的数据处理具备可靠性,正确的方式是保存这些tuple对象的引用直到bolt批量处理这些tuple了。一旦这个批量操做结束, 你能够批量的ack这些tuple。数据库

若是一个bolt发射tuple, 那么你可能想用multi-anchoring来保证可靠性。这一切都取决于具体的应用。关于storm的消息传递的工做原理能够看这篇: Twitter Storm如何保证消息不丢失缓存

 

BasicBolt

不少bolt有些相似的模式:数据结构

  1. 读一个输入tuple
  2. 根据这个输入tuple发射一个或者多个tuple
  3. 在execute的方法的最后ack那个输入tuple

遵循这类模式的bolt通常是函数或者是过滤器, 这种模式太常见,storm为这类模式单独封装了一个接口: IBasicBolt。更多的信息请看: Twitter Storm如何保证消息不丢失app

 

内存内缓存 + fields grouping 组合

在bolt的内存里面缓存一些东西很是常见。缓存在和fields grouping结合起来以后就更有用了。好比,你有一个bolt把短连接变成长连接(bit.ly, t.co之类的)。你能够把短连接到长连接的对应关系利用LRU算法缓存在内存里面以免重复计算。好比组件一发射短连接,组件二把短连接转化成长连接并缓存在内存里面。看一下下面两段代码有什么不同:分布式

1
2
builder.setBolt( 2 , new ExpandUrl(), parallelism)
   .shuffleGrouping( 1 );
1
2
builder.setBolt( 2 , new ExpandUrl(), parallelism)
   .fieldsGrouping( 1 , new Fields( "url" ));

第二种方式的缓存会比第一种方式的缓存的效率高不少,由于一样的短连接始终被发到同一个task。这会避免不一样的机器上有一样的缓存  — 浪费内存, 同时也使得一样的短域名更可能在内存里面找到缓存。

 

计算top N

storm的一个常见的持续计算的模式叫作: “streaming top N”。

好比你有一个bolt发射这样的tuple: ["value", "count"]而且你想一个bolt基于这些信息算出top N的tuple。最简单的办法是有一个bolt能够作一个全局的grouping的动做而且在内存里面保持这top N的值。

这个方式对于大数据量的流显然是没有扩展性的, 由于全部的数据会被发到同一台机器, 单机的处理能力始终是有极限的。一个更好的方法是在多台机器上面并行的计算这个流每一部分的top N, 而后再有一个bolt合并这些机器上面所算出来的top N以算出最后的top N(Map Reduce的思想), 代码大概是这样的:

1
2
3
4
builder.setBolt( 2 , new RankObjects(), parallellism)
   .fieldsGrouping( 1 , new Fields( "value" ));
builder.setBolt( 3 , new MergeObjects())
   .globalGrouping( 2 );

这个模式之因此可行是由于第一个bolt的fields grouping使得这种并行算法在语义上是正确的。

用TimeCacheMap来高效地保存一个最近被更新的对象的缓存

有时候你想在内存里面保存一些最近活跃的对象,以及让那些再也不活跃的对象自动过时(删除掉)。TimeCacheMap是一个很是高效的数据结构,它提供了一些callback函数使得咱们在对象再也不活跃的时候作一些事情。关于TimeCacheMap为何高效,能够看看这篇分析文章

 

分布式RPC: CoordinatedBolt和KeyedFairBolt

用storm作分布式RPC应用的时候有两种比较常见的模式:它们被封装在 CoordinatedBoltKeyedFairBolt里面。

CoordinatedBolt包装你的bolt,而且肯定何时你的bolt已经接收到全部的tuple。它主要使用Direct Stream来作这个。

KeyedFairBolt一样包装你的bolt而且保证你的topology同时处理多个DRPC调用,而不是串行地一次只执行一个。

更多有关分布式RPC的信息能够看这里