Backend Helpers | Automation and Software Development for Cloud Applications
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure, and the Akka team is developing an implementation of this standard. At the time of writing (June 2015) the API is still experimental but functional. This post shows how to extract information from Twitter using Scala, the Akka toolkit and Reactive Streams.
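To make "non-blocking back pressure" a little more concrete, here is a minimal, library-free sketch of the demand-signalling idea behind Reactive Streams: the subscriber says how many elements it can take (in the real specification, via `Subscription.request(n)`), and the publisher never emits more than that. `DemandPublisher` and `BackPressureDemo` are hypothetical names; this is single-threaded bookkeeping only, not the Reactive Streams interfaces or Akka's implementation.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, single-threaded illustration of demand-driven delivery.
// The publisher tracks outstanding demand and only emits as many
// elements as the subscriber has requested so far.
final class DemandPublisher[A](source: Iterator[A], onNext: A => Unit) {
  private var demand: Long = 0L

  // Called by the subscriber to signal that it can accept n more elements.
  def request(n: Long): Unit = {
    require(n > 0, "demand must be positive")
    demand += n
    // Emit while there is both demand and data; never exceed the demand.
    while (demand > 0 && source.hasNext) {
      demand -= 1
      onNext(source.next())
    }
  }

  def isExhausted: Boolean = !source.hasNext
}

object BackPressureDemo {
  // Records what a slow subscriber receives when it asks for 2 elements at a time.
  def run(): List[List[Int]] = {
    val received = ListBuffer.empty[Int]
    val publisher = new DemandPublisher[Int](Iterator.range(1, 6), x => received += x)
    val batches = ListBuffer.empty[List[Int]]
    while (!publisher.isExhausted) {
      received.clear()
      publisher.request(2) // the subscriber, not the publisher, sets the pace
      batches += received.toList
    }
    batches.toList
  }
}
```

Here the consumer receives `1, 2`, then `3, 4`, then `5`: the producer is never allowed to run ahead of what was requested, which is the property Akka Streams enforces across asynchronous stages.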
There is no official Twitter client for Scala, but we can use Twitter4j, an unofficial Java client that gets the job done easily. Below is an example of how to use Twitter4j from Scala:
    import twitter4j.TwitterFactory
    import twitter4j.Twitter
    import twitter4j.conf.ConfigurationBuilder
    ...
    ...
    val cb = new ConfigurationBuilder()
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey("YOUR KEY HERE")
      .setOAuthConsumerSecret("YOUR SECRET HERE")
      .setOAuthAccessToken("YOUR ACCESS TOKEN")
      .setOAuthAccessTokenSecret("YOUR ACCESS TOKEN SECRET")
    val tf = new TwitterFactory(cb.build())
    val twitter = tf.getInstance()
We will build a command-line application that searches for tweets and hashtags matching a term and processes the results.
At the time of publication (June 2015), Akka Streams is still experimental. To use it, we need to add the experimental module as a dependency in our sbt build file:
    libraryDependencies += "com.typesafe.akka" % "akka-stream-experimental_2.11" % "1.0-RC3"
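A few basic concepts from the experimental Akka Streams documentation are very important: a Source is a processing stage with exactly one output, a Sink is a stage with exactly one input, a Flow has exactly one input and one output, and a graph connects several stages, including fan-in and fan-out junctions.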
A stream needs at least one input Source and one output Sink. In the example below, we create a Source from the iterator obtained by splitting a string on commas. We remove surrounding whitespace with trim and then convert every line to uppercase. A Source by itself does nothing; a Flow or a Sink is required to continue the computation. Here the strings emitted by the Source are sent to a Sink with the runForeach method:
    import akka.actor.ActorSystem
    import akka.stream.ActorFlowMaterializer
    import akka.stream.scaladsl.Source

    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorFlowMaterializer()

    val text = """I saw the best minds of my generation destroyed by madness, starving hysterical naked, dragging themselves through the negro streets at dawn looking for an angry fix, angel headed hipsters burning for the ancient heavenly connection to the starry dynamo in the machinery of night"""

    // build a source from an iterator
    Source(() => text.split(",").iterator)
      // transform every line
      .map(s => s.trim())
      .map(_.toUpperCase)
      // send every line to a sink
      .runForeach(s => println(s + "\n"))
      // shut down when finished
      .onComplete(_ => system.shutdown())
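If you want to check the transformation logic without starting an ActorSystem, the same Source, map, Sink shape can be mimicked with a plain Scala `Iterator`. This is only an analogy, not Akka Streams: an iterator is pull-based and synchronous, with no materializer and no asynchronous boundaries. `PlainPipeline` and its methods are hypothetical names.

```scala
object PlainPipeline {
  // "Source": a lazily evaluated iterator over the comma-separated fragments.
  def source(text: String): Iterator[String] = text.split(",").iterator

  // "Flow": the same two transformations as the Akka example, trim then uppercase.
  def flow(in: Iterator[String]): Iterator[String] =
    in.map(_.trim).map(_.toUpperCase)

  // "Sink": consume every element; here we collect them into a list instead of printing.
  def runToList(text: String): List[String] = flow(source(text)).toList
}
```

For example, `PlainPipeline.runToList(text).foreach(println)` prints the same uppercase fragments as the Akka version, which makes it easy to unit-test the `map` steps separately from the stream wiring.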
Graphs are useful for any kind of fan-in ("multiple inputs") or fan-out ("multiple outputs") operation. In the code below we reuse the source we created before and send every line both to a text file and to the console:
    import java.io.File
    import scala.util.{Failure, Success}
    import akka.actor.ActorSystem
    import akka.stream.ActorFlowMaterializer
    import akka.stream.io.SynchronousFileSink
    import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph, Sink, Source}
    import akka.util.ByteString

    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorFlowMaterializer()

    val text = """I saw the best minds of my generation destroyed by madness, starving hysterical naked, dragging themselves through the negro streets at dawn looking for an angry fix, angel headed hipsters burning for the ancient heavenly connection to the starry dynamo in the machinery of night"""

    // take a ByteString as input and append it to a file on disk
    val fileSink = SynchronousFileSink(new File("/tmp/outscala.txt"), true)
    // take a String as input and print it to the console
    val consoleSink = Sink.foreach[String](println)
    // take a String and convert it to a newline-terminated ByteString
    val flowConverter = Flow[String].map(s => ByteString.fromString(s + "\n"))

    val g = FlowGraph.closed(fileSink, consoleSink)((_, cons) => cons) { implicit builder =>
      (f, console) =>
        import FlowGraph.Implicits._
        // build a source from an iterator, as in the previous example
        val in = Source(() => text.split(",").iterator)
          // transform every line
          .map(s => s.trim())
          .map(_.toUpperCase)
        // Broadcast takes 1 input and sends every element to 2 outputs
        val bcast = builder.add(Broadcast[String](2))
        in ~> bcast
        bcast ~> flowConverter ~> f
        bcast ~> console
    }.run()

    g.onComplete {
      case Success(_) => system.shutdown()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.shutdown()
    }
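The Broadcast junction hands a copy of every element to each of its outputs. A rough, synchronous analogy in plain Scala is to pass each element to several consumers in turn. The `FanOut.broadcast` helper below is hypothetical and, unlike Akka's Broadcast (which only emits when all of its outputs signal demand), it applies no back pressure at all.

```scala
import scala.collection.mutable.ListBuffer

object FanOut {
  // Deliver every element of `in` to each consumer, in order.
  def broadcast[A](in: Iterator[A], outputs: Seq[A => Unit]): Unit =
    in.foreach(a => outputs.foreach(out => out(a)))

  // Mirrors the graph above: one branch produces newline-terminated "file" lines,
  // the other keeps the plain strings for the "console".
  def demo(text: String): (List[String], List[String]) = {
    val fileLines = ListBuffer.empty[String]
    val consoleLines = ListBuffer.empty[String]
    val lines = text.split(",").iterator.map(_.trim.toUpperCase)
    broadcast[String](lines, Seq[String => Unit](
      s => fileLines += s + "\n",
      s => consoleLines += s
    ))
    (fileLines.toList, consoleLines.toList)
  }
}
```

Both consumers see every element, and the file branch does its own conversion, just as `flowConverter` only sits on one branch of the graph.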
Our goal is to build an Akka application that uses Reactive Streams to get tweets from Twitter and classify them by author and content. We will create a tweets Source by querying Twitter and maintain 3 indexes: tweets, authors and hashtags. A simplified diagram of the process looks like this:
                                 +----------+
                        +------->| Authors  |
                        |        +----------+
                        |
    +----------------+  |        +----------+
    | Twitter Source |--+------->|  Tweets  |
    +----------------+  |        +----------+
                        |
                        |        +----------+
                        +------->| Hashtags |
                                 +----------+
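In this post the three indexes are files on disk. As a rough in-memory illustration of the same classification, the sketch below groups tweet ids by author and by hashtag. `SimpleTweet`, `Indexes` and `Indexer` are hypothetical; `SimpleTweet` stands in for Twitter4j's `Status` and keeps only the fields the example needs.

```scala
// Hypothetical stand-in for twitter4j.Status with just the fields used here.
final case class SimpleTweet(id: Long, authorScreenName: String, text: String, hashtags: Seq[String])

final case class Indexes(
  tweets: Map[Long, String],        // tweet id -> text
  byAuthor: Map[String, Seq[Long]], // screen name -> tweet ids
  byHashtag: Map[String, Seq[Long]] // hashtag -> tweet ids
)

object Indexer {
  def build(tweets: Seq[SimpleTweet]): Indexes = {
    val byId = tweets.map(t => t.id -> t.text).toMap
    val byAuthor = tweets.groupBy(_.authorScreenName).map { case (a, ts) => a -> ts.map(_.id) }
    // One (hashtag, id) pair per hashtag occurrence, then group the pairs by hashtag.
    val pairs = for (t <- tweets; h <- t.hashtags) yield h -> t.id
    val byHashtag = pairs.groupBy(_._1).map { case (h, ps) => h -> ps.map(_._2) }
    Indexes(byId, byAuthor, byHashtag)
  }
}
```

Tweets without hashtags simply contribute no entries to `byHashtag`, which matches the filter on the hashtag branch of the graph.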
We read the access keys and tokens from environment variables:
    import twitter4j.{Twitter, TwitterFactory}
    import twitter4j.conf.ConfigurationBuilder

    def getClient(): Twitter = {
      // credentials come from environment variables (see below)
      val apiKey = sys.env("REACTIVE_TWITTER_API_KEY")
      val apiSecret = sys.env("REACTIVE_TWITTER_API_SECRET")
      val accessToken = sys.env("REACTIVE_TWITTER_ACCESS_TOKEN")
      val accessTokenSecret = sys.env("REACTIVE_TWITTER_ACCESS_TOKEN_SECRET")
      val cb = new ConfigurationBuilder()
      cb.setDebugEnabled(true)
        .setOAuthConsumerKey(apiKey)
        .setOAuthConsumerSecret(apiSecret)
        .setOAuthAccessToken(accessToken)
        .setOAuthAccessTokenSecret(accessTokenSecret)
      val tf = new TwitterFactory(cb.build())
      tf.getInstance()
    }
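`sys.env("...")` throws a `NoSuchElementException` as soon as one variable is missing, which only reports the first problem. A small helper can check all four variables up front and report every missing one at once. `Credentials.load` is a hypothetical helper; it takes the environment as a parameter so it is easy to test, and in the application you would call it with `sys.env`.

```scala
object Credentials {
  val Required: Seq[String] = Seq(
    "REACTIVE_TWITTER_API_KEY",
    "REACTIVE_TWITTER_API_SECRET",
    "REACTIVE_TWITTER_ACCESS_TOKEN",
    "REACTIVE_TWITTER_ACCESS_TOKEN_SECRET"
  )

  // Left: names of missing (or empty) variables; Right: the four values keyed by name.
  def load(env: Map[String, String]): Either[Seq[String], Map[String, String]] = {
    val missing = Required.filterNot(name => env.get(name).exists(_.nonEmpty))
    if (missing.nonEmpty) Left(missing)
    else Right(Required.map(name => name -> env(name)).toMap)
  }
}
```

A caller could then fail fast with something like `Credentials.load(sys.env) match { case Left(m) => sys.error("missing: " + m.mkString(", ")); case Right(c) => ... }` before building the `ConfigurationBuilder`.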
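Next, we build a Source from the results of a search query:
    import scala.collection.JavaConverters._
    import akka.stream.scaladsl.Source
    import twitter4j.Query

    val client = getClient()
    // `term` is the search term passed on the command line
    val query = new Query(term)
    // run the search and expose the returned tweets as a Scala iterator
    val tweets = Source(() => client.search(query).getTweets.asScala.iterator)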
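`client.search(query)` performs a single request, so the Source above only emits one page of results. To stream more, the iterator has to fetch further pages lazily; in Twitter4j, `QueryResult.hasNext()` and `nextQuery()` provide the query for the next page. The sketch below shows the idea generically in plain Scala, with a hypothetical `fetch` function in place of the Twitter call and a `maxPages` cap (an assumption, to stay within rate limits). The resulting iterator could be wrapped with `Source(() => ...)` just like the one above.

```scala
object Paging {
  // A hypothetical page of results plus an optional token for the next page.
  final case class Page[A, T](items: Seq[A], next: Option[T])

  // Lazily walks pages: a page is fetched only when the previous one is used up,
  // and at most `maxPages` pages are requested in total.
  def pagedIterator[A, T](first: T, maxPages: Int)(fetch: T => Page[A, T]): Iterator[A] =
    new Iterator[A] {
      private var nextToken: Option[T] = Some(first)
      private var pagesFetched = 0
      private var buffer: Iterator[A] = Iterator.empty

      def hasNext: Boolean = {
        // Keep fetching while the buffer is empty and more pages are available and allowed.
        while (!buffer.hasNext && nextToken.isDefined && pagesFetched < maxPages) {
          val page = fetch(nextToken.get)
          pagesFetched += 1
          nextToken = page.next
          buffer = page.items.iterator
        }
        buffer.hasNext
      }

      def next(): A =
        if (hasNext) buffer.next()
        else throw new NoSuchElementException("no more pages")
    }
}
```

Because nothing is fetched until the stream pulls an element, the Source stays lazy: Akka only asks the iterator for more tweets when downstream stages signal demand.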
Finally, we connect the tweets Source to three file sinks through a Broadcast junction:
    import java.io.File
    import java.nio.file.Paths
    import akka.stream.io.SynchronousFileSink
    import akka.stream.scaladsl.{Broadcast, Flow, FlowGraph}
    import akka.util.ByteString
    import twitter4j.Status

    // `dataDir` comes from the command line and `tweets` is the Source defined above;
    // the implicit ActorSystem and ActorFlowMaterializer from the earlier examples must be in scope.
    // One output file per index, opened in append mode.
    val tweetPath = Paths.get(dataDir, "tweets.txt")
    val tweetSink = SynchronousFileSink(new File(tweetPath.toString), true)
    val authorPath = Paths.get(dataDir, "authors.txt")
    val authorSink = SynchronousFileSink(new File(authorPath.toString), true)
    val hashtagPath = Paths.get(dataDir, "hashtags.txt")
    val hashtagSink = SynchronousFileSink(new File(hashtagPath.toString), true)

    val tweetFlow = Flow[Status]

    val twitterGraph = FlowGraph.closed(tweets, tweetSink)((_, tSink) => tSink) { implicit builder =>
      (in, tSink) =>
        import FlowGraph.Implicits._
        val DELIMITER = "==="
        // Broadcast takes 1 input and sends every tweet to 3 outputs
        val bcast = builder.add(Broadcast[Status](3))
        in ~> bcast
        // tweets index: tweet id, text and client source
        bcast ~> tweetFlow
          .map(t => ByteString.fromString(
            t.getId + DELIMITER + t.getText + DELIMITER + t.getSource + "\n")) ~> tSink
        // authors index: tweet id, user id, screen name and display name
        bcast ~> tweetFlow
          .map(t => ByteString.fromString(
            t.getId + DELIMITER + t.getUser.getId + DELIMITER +
              t.getUser.getScreenName + DELIMITER + t.getUser.getName + "\n")) ~> authorSink
        // hashtags index: tweet id followed by its hashtags; tweets without hashtags are skipped
        bcast ~> tweetFlow
          .filter(t => t.getHashtagEntities.length > 0)
          .map { t =>
            val hashtags = t.getHashtagEntities.map(h => h.getText + DELIMITER).mkString
            ByteString.fromString(t.getId + DELIMITER + hashtags + "\n")
          } ~> hashtagSink
    }.run()
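The three `map` stages only turn a tweet into a line of text, so that logic can be pulled out into plain functions and tested without Twitter or Akka. In this sketch `TweetRecord` is a hypothetical stand-in for Twitter4j's `Status`, and `TweetLines` is a hypothetical helper; the line formats follow the graph above (`===` as delimiter, one record per line).

```scala
// Hypothetical stand-in for the twitter4j.Status fields used by the graph.
final case class TweetRecord(
  id: Long,
  text: String,
  source: String,
  userId: Long,
  screenName: String,
  userName: String,
  hashtags: Seq[String]
)

object TweetLines {
  val Delimiter = "==="

  // tweets.txt: tweet id, text and client source
  def tweetLine(t: TweetRecord): String =
    Seq(t.id.toString, t.text, t.source).mkString(Delimiter) + "\n"

  // authors.txt: tweet id, user id, screen name and display name
  def authorLine(t: TweetRecord): String =
    Seq(t.id.toString, t.userId.toString, t.screenName, t.userName).mkString(Delimiter) + "\n"

  // hashtags.txt: tweet id followed by each hashtag, every field terminated by the delimiter;
  // tweets without hashtags produce no line (the graph filters them out).
  def hashtagLine(t: TweetRecord): Option[String] =
    if (t.hashtags.isEmpty) None
    else Some((t.id.toString +: t.hashtags).map(_ + Delimiter).mkString + "\n")
}
```

One caveat that applies to this sketch and to the graph alike: tweet text can itself contain newlines or `===`, which would break the one-record-per-line format unless it is escaped first.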
To run the application, first set the environment variables with your Twitter access values:
    export REACTIVE_TWITTER_API_KEY=API_KEY
    export REACTIVE_TWITTER_API_SECRET=API_SECRET
    export REACTIVE_TWITTER_ACCESS_TOKEN=ACCESS_TOKEN
    export REACTIVE_TWITTER_ACCESS_TOKEN_SECRET=TOKEN_SECRET
Then you can run the application with sbt:
sbt clean "run --dataDir /tmp --query internet&colombia"
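The post does not show how `--dataDir` and `--query` are parsed. One simple option, sketched here as an assumption rather than the author's code, is to read the arguments as `--name value` pairs. `CliArgs.parse` is a hypothetical helper.

```scala
object CliArgs {
  // Turns e.g. Seq("--dataDir", "/tmp", "--query", "internet&colombia")
  // into Map("dataDir" -> "/tmp", "query" -> "internet&colombia").
  // Returns Left with an error message for malformed input.
  def parse(args: Seq[String]): Either[String, Map[String, String]] = {
    @annotation.tailrec
    def loop(rest: List[String], acc: Map[String, String]): Either[String, Map[String, String]] =
      rest match {
        case Nil => Right(acc)
        case flag :: value :: tail if flag.startsWith("--") && !value.startsWith("--") =>
          loop(tail, acc + (flag.drop(2) -> value))
        case flag :: _ => Left(s"expected '--name value', got '$flag'")
      }
    loop(args.toList, Map.empty)
  }
}
```

In `main`, the result could be checked for the `dataDir` and `query` keys before building the graph, so that a missing argument fails with a clear message instead of an exception deep inside the stream setup.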
Source code can be found at this link