Friday, December 05, 2014

Analyse Tweets using Flume, Hadoop and Hive



In this post we will collect Tweets using Flume and save them into HDFS for later analysis. Twitter exposes an API to fetch Tweets. The service is free, but requires you to register an application. We will quickly summarize how to get the data into HDFS using Flume and then start doing some analytics using Hive.



1. Twitter API

You need to create a Twitter app to obtain a consumer key, consumer secret, access token, and access token secret.

2. Configure Flume

Assuming that Hadoop, Hive and Flume have already been installed and configured (see previous posts), download the flume-sources-1.0-SNAPSHOT.jar.

From the command line (assuming flume-sources-1.0-SNAPSHOT.jar is in your home directory):

$ sudo cp ~/flume-sources-1.0-SNAPSHOT.jar /usr/lib/flume

Add it to the Flume classpath by adding the following line to the conf/flume-env.sh file:

FLUME_CLASSPATH="/usr/lib/flume/flume-sources-1.0-SNAPSHOT.jar"

The JAR contains the Java classes that pull the Tweets and save them into HDFS.
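
To confirm that the JAR really contains the custom source class that Flume will load in the next step, you can list its contents:

$ jar tf /usr/lib/flume/flume-sources-1.0-SNAPSHOT.jar | grep TwitterSource

This should print com/cloudera/flume/source/TwitterSource.class.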

3. Configure the Agent

The conf/flume.conf file should define the agent's source (Twitter), channel (memory) and sink (HDFS) as below:


TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = <consumerKey>
TwitterAgent.sources.Twitter.consumerSecret = <consumerSecret>
TwitterAgent.sources.Twitter.accessToken = <accessToken>
TwitterAgent.sources.Twitter.accessTokenSecret = <accessTokenSecret>

TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:54310/user/flume/tweets/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

Replace the consumerKey, consumerSecret, accessToken and accessTokenSecret placeholders with the values obtained in step 1. TwitterAgent.sinks.HDFS.hdfs.path should point to your NameNode and to the HDFS location where the tweets will be written.
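
The HDFS sink will create the target directory if it is missing, but it is worth pre-creating it and confirming that the NameNode URI from the config is reachable (a quick check, assuming the Hadoop binaries are on your PATH; on Hadoop 2.x use -mkdir -p to create parent directories):

$ hadoop fs -mkdir /user/flume/tweets
$ hadoop fs -ls hdfs://localhost:54310/user/flume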

4. Start Flume with the following command

$ bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

You may see an error similar to this one:

AM ERROR org.apache.flume.lifecycle.LifecycleSupervisor
Unable to start EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:IDLE} } - Exception follows.
java.lang.NoSuchMethodError: twitter4j.FilterQuery.setIncludeEntities(Z)Ltwitter4j/FilterQuery;
at com.cloudera.flume.source.TwitterSource.start(TwitterSource.java:139)
at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
1:08:39.826 AM WARN org.apache.flume.lifecycle.LifecycleSupervisor
Component EventDrivenSourceRunner: { source:com.cloudera.flume.source.TwitterSource{name:Twitter,state:STOP} } stopped, since it could not besuccessfully started due to missing dependencies


If that is the case, do the following:

You need to recompile flume-sources-1.0-SNAPSHOT.jar from the sources at https://github.com/cloudera/cdh-twitter-example

Install Maven, then clone the cdh-twitter-example repository and build the sources:

$ git clone https://github.com/cloudera/cdh-twitter-example.git

$ cd cdh-twitter-example/flume-sources

$ mvn package

$ cd ..

Copy the freshly built flume-sources/target/flume-sources-1.0-SNAPSHOT.jar to /usr/lib/flume, replacing the old one.

This problem appeared when twitter4j was updated from 2.2.6 to 3.x: the method setIncludeEntities was removed, so the prebuilt JAR, compiled against the old API, is out of date.
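
If you want to double-check which twitter4j version Flume actually has on its classpath before rebuilding, a quick check (assuming Flume keeps its bundled libraries under /usr/lib/flume/lib) is:

$ ls /usr/lib/flume/lib | grep twitter4j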

By default, the NameNode web interface (HDFS layer) is available at http://localhost:50070/. There you can browse to /user/flume/tweets and see the collected tweets.
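
You can also inspect the files from the command line; the HDFS sink names them with the default FlumeData prefix, so something like this prints one raw JSON tweet (the exact file names will differ):

$ hadoop fs -ls /user/flume/tweets
$ hadoop fs -cat '/user/flume/tweets/FlumeData.*' | head -1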


5. Configure Hive


$ cd /home/hduser/hive/

Modify conf/hive-site.xml to include the locations of the NameNode and the JobTracker, as below:

<configuration>
     <property>
         <name>fs.default.name</name>
         <value>hdfs://localhost:54310</value>
     </property>
     <property>
         <name>mapred.job.tracker</name>
         <value>localhost:54311</value>
     </property>
</configuration>

Download hive-serdes-1.0-SNAPSHOT.jar into Hive's lib directory. Twitter returns Tweets as JSON, and this SerDe library lets Hive parse that format.

Next, register the hive-serdes-1.0-SNAPSHOT.jar file you just downloaded so that the Hive shell can use it. One option is to edit the file conf/hive-env.sh and add:

export HIVE_AUX_JARS_PATH="/home/hduser/hive/lib/hive-serdes-1.0-SNAPSHOT.jar"

Or you can add the JAR directly from the Hive shell (this registers it only for the current session):

hive> ADD JAR /home/hduser/hive/lib/hive-serdes-1.0-SNAPSHOT.jar;
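
If you used ADD JAR, you can confirm it is registered for the session:

hive> LIST JARS;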

6. Now, create the tweets table in Hive

CREATE EXTERNAL TABLE tweets (
   id BIGINT,
   created_at STRING,
   source STRING,
   favorited BOOLEAN,
   retweet_count INT,
   retweeted_status STRUCT<
      text:STRING,
      user:STRUCT<screen_name:STRING,name:STRING>>,
   entities STRUCT<
      urls:ARRAY<STRUCT<expanded_url:STRING>>,
      user_mentions:ARRAY<STRUCT<screen_name:STRING,name:STRING>>,
      hashtags:ARRAY<STRUCT<text:STRING>>>,
   text STRING,
   user STRUCT<
      screen_name:STRING,
      name:STRING,
      friends_count:INT,
      followers_count:INT,
      statuses_count:INT,
      verified:BOOLEAN,
      utc_offset:INT,
      time_zone:STRING>,
   in_reply_to_screen_name STRING
)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/flume/tweets';
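
Once Flume has delivered a few files, a quick sanity check that the SerDe parses them (run in a session where the JAR has been registered):

hive> SELECT id, text FROM tweets LIMIT 5;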

7. Playing with Hive

Now that we have the data in HDFS and the table created in Hive, let's run some queries.

One way to determine who is the most influential person in a particular field is to figure out whose tweets are retweeted the most.

$ hive
hive>

Give Flume enough time to collect Tweets into HDFS, then run the query below to determine the most influential person.

hive> SELECT t.retweeted_screen_name, SUM(retweets) AS total_retweets, COUNT(*) AS tweet_count
    > FROM (SELECT retweeted_status.user.screen_name AS retweeted_screen_name,
    >              retweeted_status.text,
    >              MAX(retweet_count) AS retweets
    >       FROM tweets
    >       GROUP BY retweeted_status.user.screen_name, retweeted_status.text) t
    > GROUP BY t.retweeted_screen_name
    > ORDER BY total_retweets DESC
    > LIMIT 10;


Similarly, to see which users have the most followers, the query below helps (ordering descending and limiting the output; note it returns one row per collected tweet):

hive> SELECT user.screen_name, user.followers_count c FROM tweets ORDER BY c DESC LIMIT 10;
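
Since the table also exposes entities.hashtags, here is a sketch of counting the most frequent hashtags in the collected stream (assuming a Hive version with LATERAL VIEW support; the aliases ex and ht are arbitrary):

hive> SELECT LOWER(ht.text) AS hashtag, COUNT(*) AS total
    > FROM tweets LATERAL VIEW EXPLODE(entities.hashtags) ex AS ht
    > GROUP BY LOWER(ht.text)
    > ORDER BY total DESC
    > LIMIT 10;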

