This is an example of using Storm to filter tweets in real time. Storm is an open-source platform designed for real-time streaming applications. The storm-starter project is an excellent place to start when building your own project, and Maven's project page is also useful.
Storm Overview
Storm applications are defined using a "topology" model composed of spouts and bolts. A spout defines the topology's entry point and data source. A bolt defines a component that performs a mutation, filter, or aggregation on a data stream. Storm has multi-language support for bolts; in this example I use a Python bolt. Spouts and bolts consume and emit tuples, which are object representations of the messages that pass through the topology.
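To make the spout/bolt/tuple contract concrete, here is a minimal sketch of a plain Java bolt (not part of Habakkuk; the class and field names are made up) that consumes one field from an incoming tuple and emits a transformed tuple:

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UppercaseBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Read a field from the incoming tuple and emit a transformed tuple.
        String text = tuple.getStringByField("text");
        collector.emit(new Values(text.toUpperCase()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Downstream bolts see a one-field tuple named "text".
        declarer.declare(new Fields("text"));
    }
}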
Advantages
The topology design has at least two advantages, IMHO. First, the modular design makes components interchangeable, so major design changes can be made by simply swapping out bolts. Second, the platform is designed to scale horizontally (like Hadoop), so one should be able to test on a single system and then deploy to a large cluster with minimal changes. That's pretty awesome!
Introducing Habakkuk
Habakkuk is an application for filtering tweets containing Christian Bible references. The goal is to capture the book name, chapter number, verse number and tweet text for further analysis.
Storm Topology
The Storm topology consists of two components. The TwitterSampleSpout initiates a Twitter streaming connection using a username and password supplied on the command line. The ScriptureFilterBolt is a bolt written in Python that applies a regex to match Bible references. The code snippet below shows how the TwitterSampleSpout and ScriptureFilterBolt are connected using Storm's TopologyBuilder class. The topology is run with the LocalCluster class, which is intended for single-host testing.
public static void main(String[] args) {
    String username = args[0];
    String pwd = args[1];

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("twitter", new TwitterSampleSpout(username, pwd), 1);
    builder.setBolt("filter", new ScriptureFilterBolt(), 1)
           .shuffleGrouping("twitter");

    Config conf = new Config();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("test", conf, builder.createTopology());
}
Twitter Spout
The TwitterSampleSpout is copied directly from the storm-starter project. The only change is in nextTuple(), where the username and status text are copied into a HashMap. I ran into serialization issues trying to emit the Twitter4j Status object to a ShellBolt. There's probably a reasonable workaround for this, but my java-fu wasn't up to the task. 🙂
The snippet below is the modified nextTuple() from TwitterSampleSpout.java. It simply puts the name and text in a HashMap and emits the HashMap.
@Override
public void nextTuple() {
    Status ret = queue.poll();
    if (ret == null) {
        Utils.sleep(50);
    } else {
        // Copy the fields we need into a plain HashMap, since the
        // Twitter4j Status object doesn't serialize cleanly to a ShellBolt.
        Map<String, String> data = new HashMap<String, String>();
        data.put("username", ret.getUser().getName());
        data.put("text", ret.getText());
        _collector.emit(new Values(data));
    }
}
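For context, the spout's declareOutputFields() is unchanged from storm-starter and isn't shown in the snippet above. Roughly, it declares a single output field (the exact field name may differ in the actual source), which is why the HashMap arrives in the Python bolt as tup.values[0]:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // One field per tuple; the value is the HashMap emitted in nextTuple().
    declarer.declare(new Fields("tweet"));
}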
Python Bolt
The ScriptureFilterBolt is basically copied from the storm-starter project and modified to load my Python script. The Python module ScriptureFilterBolt.py simply applies a Python regex to the status text and looks for matches. Right now a match is printed to the console, but in the future the tuple will be emitted to another bolt (see the sketch after the Python bolt code below).
The snippet below shows how to define a ShellBolt that invokes a Python module.
package technicalelvis.habakkuk.bolt;

import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.util.Map;

public class ScriptureFilterBolt extends ShellBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    public ScriptureFilterBolt() {
        super("python", "ScriptureFilterBolt.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("book"));
    }

    public Map getComponentConfiguration() {
        return null;
    }
}
The snippet below shows the Python bolt implementation.
import storm
from find_all_scriptures import find_all_scriptures, filtergroupdict
import jsonlib2 as json

class ScriptureParserBolt(storm.BasicBolt):
    def process(self, tup):
        txt = tup.values[0]['text']
        matches = find_all_scriptures(txt)
        for ma in matches:
            storm.log("%s" % tup.values[0])
            ret = filtergroupdict(ma)
            # get matched string
            matext = ma.string[ma.start():ma.end()].replace('\r\n', '')
            storm.log("Match %s %s {STRING:'%s'}" %
                      (ret['book'], ret['verse'], matext))

ScriptureParserBolt().run()
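As a rough sketch of that future step (my own guess, not code from the project; it assumes the bolt's output fields are declared to match), the match could be forwarded to a downstream bolt with storm.emit instead of only being logged:

# Inside process(), after a match is found:
# emit a tuple for a downstream bolt instead of only logging it.
storm.emit([ret['book'], ret['verse'], matext])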
A snippet of the Bible reference regex is shown below. The full source is on GitHub.
import re

find_all_scriptures = (
    re.compile("""
    (
      (?P<ge>ge\w{0,5}\.?)     # genesis
      |(?P<ex>ex\w{0,4}\.?)    # exodus
      |(?P<le>le\w{0,7}\.?)    # leviticus
      # other bible books
    )
    \s+(?P<verse>\d{1,3}\s*:\s*\d{1,3})
    """, re.VERBOSE | re.MULTILINE).finditer
)
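As a quick illustration, here's how the finditer wrapper above could be used on a sample string (a hypothetical usage example; the group names follow the snippet above, and the full regex on GitHub covers many more books):

# Hypothetical usage of the finditer wrapper defined above.
for ma in find_all_scriptures("reading gen 1:1 tonight"):
    groups = ma.groupdict()
    # e.g. {'ge': 'gen', 'ex': None, 'le': None, 'verse': '1:1'}
    print(groups['verse'])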
Build and Run
Please refer to the Maven documentation for pom.xml info. Execute the following commands to build the jar and start the topology.
$ git clone git@github.com:telvis07/habakkuk.git
$ cd habakkuk/java/habakkuk-core
$ mvn compile
$ mvn package
$ storm jar target/habakkuk-core-0.0.1-SNAPSHOT-jar-with-dependencies.jar technicalelvis.habakkuk.MainTopology habakkuk.properties
You should see output like the following.
143736 [Thread-27] INFO backtype.storm.task.ShellBolt - Shell msg: {u'username': u"someuser", u'text': u'I like John 3:16'}
143736 [Thread-27] INFO backtype.storm.task.ShellBolt - Shell msg: Match John 3:16 {STRING: 'John 3:16'}
Summary
Storm is a platform for developing real-time streaming, filtering, and aggregation apps. A Storm app is represented as a topology composed of spouts and bolts. Bolts have multi-language support, which allows Python code to be integrated into a topology.
Fork me on GitHub
This code will evolve over time. Find the complete codebase on GitHub at https://github.com/telvis07/habakkuk. The develop branch has the latest stuff.