Category Archives: tech

Finding religious tweets using storm

This is an example of using Storm to filter tweets in real time. Storm is an open-source platform designed for real-time streaming applications. The storm-starter project is an excellent starting point for building your own project, and Maven's project page is also useful.

Storm Overview

Storm applications are defined using a "topology" model composed of spouts and bolts. A spout defines the topology's entry point and data source. A bolt defines a component that performs a mutation, filter, or aggregation on a data stream. Storm has multi-language support for bolts - in this example I use a Python bolt. Spouts and bolts consume and emit tuples - object representations of the messages that pass through the topology.

Advantages

The topology design has at least two advantages, IMHO. First, the modular design makes components interchangeable, so major design changes are as simple as swapping out bolts. Second, the platform is designed to scale horizontally (like Hadoop), so you should be able to test on a single system and then deploy to a large cluster with minimal change. That's pretty awesome!

Introducing Habakkuk

Habakkuk is an application for filtering tweets containing Christian Bible references. The goal is to capture the book name, chapter number, verse number and tweet text for further analysis.

Storm Topology

The Storm topology consists of two components. The TwitterSampleSpout initiates a Twitter streaming connection using the username and password from the command line. The ScriptureFilterBolt is a bolt written in Python that applies a regex to match Bible references. The code snippet below shows how the TwitterSampleSpout and ScriptureFilterBolt are connected using Storm's TopologyBuilder class. The topology is run using the LocalCluster class, which is intended for single-host testing.

public static void main(String[] args) {
   String username = args[0];
   String pwd = args[1];
   TopologyBuilder builder = new TopologyBuilder();

   builder.setSpout("twitter",
                    new TwitterSampleSpout(username, pwd),
                    1);
   builder.setBolt("filter",
                   new ScriptureFilterBolt(), 1)
                   .shuffleGrouping("twitter");

   Config conf = new Config();
   LocalCluster cluster = new LocalCluster();
   cluster.submitTopology("test", conf, builder.createTopology());
 }

Twitter Spout

The TwitterSampleSpout is copied directly from the storm-starter project. The only change is in nextTuple(), where the username and status text are copied to a HashMap. I ran into serialization issues trying to emit the Twitter4J Status object to a ShellBolt. There's probably a reasonable workaround for this, but my java-fu wasn't up to the task. 🙂

The snippet below is the modified nextTuple() from TwitterSampleSpout.java. It simply puts the username and text in a HashMap and emits it.

   @Override
    public void nextTuple() {
        Status ret = queue.poll();
        if(ret==null) {
            Utils.sleep(50);
        } else {
            Map data = new HashMap();
            data.put("username", ret.getUser().getName());
            data.put("text", ret.getText());
            _collector.emit(new Values(data));
        }
    }

Python Bolt

The ScriptureFilterBolt is basically copied from the storm-starter project and modified to load my Python script. The Python module ScriptureFilterBolt.py applies a regex to the status text and looks for Bible references. Right now, each match is logged to the console, but in the future the tuple will be emitted to another bolt.

The snippet below shows how to define a ShellBolt to invoke a python module.

package technicalelvis.habakkuk.bolt;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import java.util.Map;

public class ScriptureFilterBolt extends ShellBolt implements IRichBolt {
    private static final long serialVersionUID = 1L;

    public ScriptureFilterBolt() {
        super("python", "ScriptureFilterBolt.py");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("book"));
    }

    public Map getComponentConfiguration() {
        return null;
    }
}

The snippet below shows the python bolt implementation.

import storm
from find_all_scriptures import find_all_scriptures, filtergroupdict
import jsonlib2 as json
class ScriptureParserBolt(storm.BasicBolt):
    def process(self,tup):
        txt = tup.values[0]['text']
        matches = find_all_scriptures(txt)
        for ma in matches:
            storm.log("%s"%tup.values[0])
            ret = filtergroupdict(ma)
            #get matched string
            matext = \
              ma.string[ma.start():ma.end()]\
                .replace('\r\n','')
            storm.log("Match %s %s {STRING:'%s'}"%\
                      (ret['book'],ret['verse'], matext))
ScriptureParserBolt().run()

A snippet of the bible reference regex is shown below. The full source is on github.

import re

find_all_scriptures = re.compile("""
(
  (?P<genesis>ge\w{0,5}\.?)     # genesis
   |(?P<exodus>ex\w{0,4}\.?)    # exodus
   |(?P<leviticus>le\w{0,7}\.?) # leviticus
   # other bible books
  )
  \s+(?P<verse>\d{1,3}\s*:\s*\d{1,3})
""", re.VERBOSE|re.MULTILINE).finditer
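
The bolt's call to filtergroupdict() pulls the book and verse out of a match. The real implementation lives in find_all_scriptures.py on github; the snippet below is only a minimal sketch of the idea, assuming the named groups shown above (one group per book plus a 'verse' group).

def filtergroupdict(ma):
    """Return the book and verse for a regex match object (sketch only)."""
    groups = dict((k, v) for k, v in ma.groupdict().items() if v is not None)
    verse = groups.pop('verse', None)
    # whichever book group matched identifies the book; a real implementation
    # would map the group name to a canonical book name (e.g. 'genesis' -> 'Genesis')
    book = groups.keys()[0] if groups else None
    return {'book': book, 'verse': verse}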

Build and Run

Please refer to the Maven documentation for pom.xml details. Execute the following commands to build the jar and start the topology.

$ git clone git@github.com:telvis07/habakkuk.git
$ cd habakkuk/java/habakkuk-core
$ mvn compile
$ mvn package
$ storm jar target/habakkuk-core-0.0.1-SNAPSHOT-jar-with-dependencies.jar technicalelvis.habakkuk.MainTopology habakkuk.properties

You should see output like the following.

143736 [Thread-27] INFO  backtype.storm.task.ShellBolt  - Shell msg: {u'username': u"someuser", u'text': u'I like John 3:16'}
143736 [Thread-27] INFO  backtype.storm.task.ShellBolt  - Shell msg: Match John 3:16 {STRING: 'John 3:16'}

Summary

Storm is a platform for developing real-time streaming, filtering, and aggregation apps. A Storm app is represented as a topology composed of spouts and bolts. Bolts have multi-language support, which allows Python code to be integrated into a topology.

Fork me on github

This code will evolve over time. Find the complete codebase on github at: https://github.com/telvis07/habakkuk. The develop branch has the latest stuff.

twitter mining: count hashtags per day

We can use CouchDB views to count Twitter hashtags per day. I've used two views. The first view uses a mapper to map each hashtag to a [YEAR, MONTH, DAY] key. The view can then be queried for the hashtags used on a given date.

import couchdb
from couchdb.design import ViewDefinition
import sys

# connect to the tweet database given on the command line
server = couchdb.Server('http://localhost:5984')
db = server[sys.argv[1]]

def time_hashtag_mapper(doc):
    """Hash tag by timestamp"""
    from datetime import datetime
    if doc.get('created_at'):
        _date = doc['created_at']
    else:
        _date = 0 # Jan 1 1970

    if doc.get('entities') and doc['entities'].get('hashtags'):
        dt = datetime.fromtimestamp(_date).utctimetuple()
        for hashtag in (doc['entities']['hashtags']):
            yield([dt.tm_year, dt.tm_mon, dt.tm_mday], 
                   hashtag['text'].lower())

view = ViewDefinition('index',
                      'time_hashtags',
                      time_hashtag_mapper,
                      language='python')
view.sync(db)

The second view maps each tweet to a [YEAR, MONTH, DAY, HASHTAG] key. A reducer then counts the tweets matching each key.

import couchdb
from couchdb.design import ViewDefinition

def date_hashtag_mapper(doc):
    """tweet by date+hashtag"""
    from datetime import datetime
    if doc.get('created_at'):
        _date = doc['created_at']
    else:
        _date = 0 # Jan 1 1970

    dt = datetime.fromtimestamp(_date).utctimetuple()
    if doc.get('entities') and doc['entities'].get('hashtags'):
        for hashtag in (doc['entities']['hashtags']):
            yield ([dt.tm_year, dt.tm_mon, dt.tm_mday, 
                    hashtag['text'].lower()], 
                   doc['_id'])

def sumreducer(keys, values, rereduce):
    """count then sum"""
    if rereduce:
        return sum(values)
    else:
        return len(values)

view = ViewDefinition('index',
                      'daily_tagcount',
                      date_hashtag_mapper,
                      reduce_fun=sumreducer,
                      language='python')
view.sync(db)

Finally, query the first view to find tags for the day and then query the second view for tweet counts per tag for the day.

import sys
import couchdb
import time
from datetime import date, datetime

server = couchdb.Server('http://localhost:5984')
dbname = sys.argv[1]
db = server[dbname]

_date  = sys.argv[2]
dt = datetime.strptime(_date,"%Y-%m-%d").utctimetuple()

# get tags for this time interval
_key = [dt.tm_year, dt.tm_mon, dt.tm_mday]
tags = [row.value for row in db.view('index/time_hashtags', key=_key)]
tags = list(set(tags))
print "Tags today",len(tags)
print ""

# get count for date and hashtag
for tag in sorted(tags):
    _key = [dt.tm_year, dt.tm_mon, dt.tm_mday, tag]
    tag_count = \
      [ (row.value) for row in db.view('index/daily_tagcount', key=_key) ]
    print "Found %d %s on %s-%s-%s "%\
      (tag_count[0],tag,_key[0],_key[1],_key[2])
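
Assuming the snippet above is saved as a script (the filename and database name below are just examples), run it with a database name and a date:

$ python count_hashtags_per_day.py tweetdb 2012-03-05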

This code will evolve over time.
Find the complete codebase on github at: https://github.com/telvis07/twitter_mining. The develop branch has the latest stuff.

twitter mining by geolocation

Twitter's streaming API permits filtering tweets by geolocation. According to the API documentation, only tweets created using the Geotagging API can be filtered. The code below uses tweepy to filter tweets for the San Francisco area.

#!/usr/bin/env python
import tweepy
import ConfigParser
import os, sys

class Listener(tweepy.StreamListener):
    def on_status(self, status):
        print "screen_name='%s' tweet='%s'"%(status.author.screen_name, status.text)

def login(config):
    """Tweepy oauth dance
    The config file should contain:

    [auth]
    CONSUMER_KEY = ...
    CONSUMER_SECRET = ...
    ACCESS_TOKEN = ...
    ACCESS_TOKEN_SECRET = ...
    """     
    CONSUMER_KEY = config.get('auth','CONSUMER_KEY')
    CONSUMER_SECRET = config.get('auth','CONSUMER_SECRET')
    ACCESS_TOKEN = config.get('auth','ACCESS_TOKEN')
    ACCESS_TOKEN_SECRET = config.get('auth','ACCESS_TOKEN_SECRET')
    
    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    return auth


fn=sys.argv[1]
config = ConfigParser.RawConfigParser()
config.read(fn)
try:
    auth = login(config)
    streaming_api = tweepy.streaming.Stream(auth, Listener(), timeout=60)
    # San Francisco area bounding box: SW longitude, SW latitude, NE longitude, NE latitude
    streaming_api.filter(follow=None, locations=[-122.75,36.8,-121.75,37.8])
except KeyboardInterrupt:
    print "got keyboardinterrupt"

Find the complete codebase on github at: https://github.com/telvis07/twitter_mining

twitter mining: top tweets with links

It's useful to filter out "conversational" tweets and look for tweets with links to another page or picture, etc.

We create a view that maps only tweets with link entities.

import couchdb
from couchdb.design import ViewDefinition
import sys

# connect to the tweet database given on the command line
server = couchdb.Server('http://localhost:5984')
db = server[sys.argv[1]]

def url_tweets_by_created_at(doc):
    if doc.get('created_at'):
        _date = doc['created_at']
    else:
        _date = 0 # Jan 1 1970

    if doc.get('entities') and doc['entities'].get('urls') \
      and len(doc['entities']['urls']):
        if doc.get('user'):
            yield (_date, doc)

view = ViewDefinition('index', 'daily_url_tweets', 
                      url_tweets_by_created_at, language='python')
view.sync(db)

Next we create an app that reads from this view and displays the results.

import couchdb
import time
import sys
from datetime import datetime

def run(db, date, limit=10):
    """Query a couchdb view for tweets. Sort in memory by follower count.
    Return the top 10 tweeters and their tweets"""
    print "Finding top %d tweeters"%limit

    dt = datetime.strptime(date,"%Y-%m-%d")
    stime=int(time.mktime(dt.timetuple()))
    etime=stime+86400-1
    tweeters = {}
    tweets = {}
    # get screen_name, follower_counts and tweet ids for looking up later
    for row in db.view('index/daily_url_tweets', startkey=stime, endkey=etime):
        status = row.value
        screen_name = status['user']['screen_name']
        followers_count = status['user']['followers_count']
        tweeters[screen_name] = int(followers_count)
        if not tweets.has_key(screen_name):
            tweets[screen_name] = []
        tweets[screen_name].append(status['id_str'])

    # sort
    print len(tweeters.keys())
    di = tweeters.items()
    di.sort(key=lambda x: x[1], reverse=True)
    out = {}
    # guard against having fewer tweeters than the limit
    for i in range(min(limit, len(di))):
        screen_name = di[i][0]
        followers_count = di[i][1]
        out[screen_name] = {}
        out[screen_name]['follower_count'] = followers_count
        out[screen_name]['tweets'] = {}
        # print i,screen_name,followers_count
        for tweetid in tweets[screen_name]:
            status = db[tweetid]
            text = status['orig_text']
            # print tweetid,orig_text
            urls = status['entities']['urls']
            #name = status['user']['name']
            for url in urls:
                text = text.replace(url['url'],url['expanded_url'])
            out[screen_name]['tweets'][tweetid] = text

    return out

server = couchdb.Server('http://localhost:5984')
dbname = sys.argv[1]  # database name passed on the command line
db = server[dbname]
date = '2012-03-05'
output = run(db, date)
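
A short loop over the returned dictionary can display the results; this is only a sketch based on the structure run() builds above.

# print each tweeter, their follower count, and their tweets with expanded urls
for screen_name, info in sorted(output.items(),
                                key=lambda x: x[1]['follower_count'],
                                reverse=True):
    print screen_name, info['follower_count']
    for tweetid, text in info['tweets'].items():
        print "  %s: %s" % (tweetid, text)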

Find the complete codebase on github at: https://github.com/telvis07/twitter_mining

twitter mining: top tweets by follower count

We can find interesting tweets using the author's follower count and the tweet timestamp. We store tweets in CouchDB and collect them with tweepy streaming. With these tools we can find the top N tweets per day. The code below uses the couchpy view server to write a view in Python. To set up couchpy, install it and add a query_servers entry to /etc/couchdb/local.ini, as shown below.

Install couchpy and couchdb-python with the following command.

pip install couchdb

Verify that couchpy is installed.

$ which couchpy
/usr/bin/couchpy

Edit /etc/couchdb/local.ini

[query_servers]
python=/usr/bin/couchpy

This is a simple view mapper that maps each tweet to its timestamp so we can query by start and end time.

import couchdb
from couchdb.design import ViewDefinition
import sys

server = couchdb.Server('http://localhost:5984')
db = sys.argv[1]
db = server[db]

def tweets_by_created_at(doc):
    if doc.get('created_at'):
        _date = doc['created_at']
    else:
        _date = 0 # Jan 1 1970
    
    if doc.get('user'):
        yield (_date, doc) 
        
view = ViewDefinition('index', 'daily_tweets', tweets_by_created_at, language='python')
view.sync(db)

The code below queries the view for all tweets within a date range. Then we sort in memory by the follower count.

import couchdb
import time
import sys
from datetime import datetime

def run(db, date, limit=10):
    """Query a couchdb view for tweets. Sort in memory by follower count.
    Return the top 10 tweeters and their tweets"""
    print "Finding top %d tweeters"%limit
        
    dt = datetime.strptime(date,"%Y-%m-%d")
    stime=int(time.mktime(dt.timetuple()))
    etime=stime+86400-1
    tweeters = {}
    tweets = {}
    for row in db.view('index/daily_tweets', startkey=stime, endkey=etime):
        status = row.value
        screen_name = status['user']['screen_name']
        followers_count = status['user']['followers_count']
        tweeters[screen_name] = int(followers_count)
        if not tweets.has_key(screen_name):
            tweets[screen_name] = []
        tweets[screen_name].append(status['id_str'])
        
    # sort
    di = tweeters.items() 
    di.sort(key=lambda x: x[1], reverse=True)
    out = {}
    # guard against having fewer tweeters than the limit
    for i in range(min(limit, len(di))):
        screen_name = di[i][0]
        followers_count = di[i][1]
        out[screen_name] = {}
        out[screen_name]['follower_count'] = followers_count
        out[screen_name]['tweets'] = {}
        # print i,screen_name,followers_count
        for tweetid in tweets[screen_name]:
            orig_text = db[tweetid]['orig_text']
            # print tweetid,orig_text
            out[screen_name]['tweets'][tweetid] = orig_text

    return out

server = couchdb.Server('http://localhost:5984')
dbname = sys.argv[1]  # database name passed on the command line
db = server[dbname]
date = '2012-03-05'
output = run(db, date)

Find the complete codebase on github at: https://github.com/telvis07/twitter_mining

twitter_mining: oauth with tweepy

Tweepy provides a module to authenticate with Twitter using OAuth. The example below retrieves the auth credentials from a config file and creates a filter stream for the terms 'technical' and 'elvis'. You can get a CONSUMER_KEY and CONSUMER_SECRET by creating a Twitter dev account at http://dev.twitter.com/apps/new. The access token and access token secret can be found under the "My Applications" link in your account.
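
The config file read by login() below might look like this (the values are placeholders):

[auth]
CONSUMER_KEY = your_consumer_key
CONSUMER_SECRET = your_consumer_secret
ACCESS_TOKEN = your_access_token
ACCESS_TOKEN_SECRET = your_access_token_secret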

import tweepy
import ConfigParser
import os

class Listener(tweepy.StreamListener):
    def on_status(self, status):
        print "screen_name='%s' tweet='%s'"%(status.author.screen_name, status.text)

def login():
    config = ConfigParser.RawConfigParser()
    fn = os.path.join(os.environ['HOME'],'conf', 'twitter_mining.cfg')
    config.read(fn)

    CONSUMER_KEY = config.get('auth','CONSUMER_KEY')
    CONSUMER_SECRET = config.get('auth','CONSUMER_SECRET')
    ACCESS_TOKEN = config.get('auth','ACCESS_TOKEN')
    ACCESS_TOKEN_SECRET = config.get('auth','ACCESS_TOKEN_SECRET')

    auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
    auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
    return auth

try:
    auth = login()
    streaming_api = tweepy.streaming.Stream(auth, Listener(), timeout=60)
    streaming_api.filter(follow=None, track=['technical','elvis'])
except KeyboardInterrupt:
    print "got keyboardinterrupt"

Find the complete codebase on github at: https://github.com/telvis07/twitter_mining

new project: twitter mining

I've started a new twitter mining project in an effort to blog regularly and play with interesting tech. The initial code is on github at https://github.com/telvis07/twitter_mining. The code will be updated often and this blog will be updated weekly (hopefully). The goal of this project is to develop novel ways to find the MOST meaningful tweets and tweeters over a given interval. I will blog about the current code and any additions I make.

This project is inspired by my awesome wife Sharon, who wants info relevant to her site MidtownSweets. It's also inspired by the book Mining The Social Web by Matthew A. Russell, which is a great book.