Log analyzer example using Spark and Scala

It has been a long time since I wrote anything technical on Big Data, but believe me, the wait was worth it. It's been a couple of months now since I started reading and writing Scala and Spark, and I am finally confident enough to share the knowledge I have gained. As I said before, learning Scala is worth the effort, but it does have its difficulties. For a Java programmer, coding in a functional style is a little hard at first, but once you learn Scala it is real fun, believe me.

In this post I won't go into the details of Spark and Scala concepts; instead I will demonstrate an example that I think is easy to start with. Once you see an end-to-end working program, you will be able to relate it to MapReduce or any other paradigm, and from there you can learn on your own by picking up individual topics.

I am using the log analyzer example that I explained earlier in my blog post using MapReduce. The input and output in both examples are the same; the only changes are Scala instead of Java and Spark instead of MapReduce.

Problem statement: I run a very busy website and need to pull it down for an hour in order to apply some patches and perform maintenance on the back-end servers, which means the website will be completely unavailable for that hour. The primary concern is that the outage should affect the least number of users. The game starts here: we need to identify at which hour of the day web traffic is lowest, so that the maintenance activity can be scheduled for that time.
There is an Apache web server log for each day which records the activity happening on the website, but these are huge files, up to 5 GB each.

Sample from log file:

64.242.88.10,[07/Mar/2004:16:05:49 -0800],"GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
64.242.88.10,[07/Mar/2004:16:06:51 -0800],"GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523
64.242.88.10,[07/Mar/2004:16:10:02 -0800],"GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
64.242.88.10,[07/Mar/2004:17:06:51 -0800],"GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523
64.242.88.10,[07/Mar/2004:18:06:51 -0800],"GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523
64.242.88.10,[07/Mar/2004:16:06:51 -0800],"GET /twiki/bin/rdiff/TWiki/NewUserTemplate?rev1=1.3&rev2=1.2 HTTP/1.1" 200 4523

We are interested only in the datetime field, e.g. [07/Mar/2004:16:05:49 -0800].
Solution: I need to consume one month of log files and run code that calculates the total number of hits for each hour of the day. The hour with the least number of hits is the perfect time for the downtime. It is as simple as that!


package com.demo.loganalyzer

/**
 * Created by Sreejith Pillai.
 */

import org.apache.spark.Logging
import org.apache.spark.rdd.RDD

// Schema of a parsed log line. After parsing, the datetime field holds
// only the hour of the day (see mapRawLine below).
case class LogSchema(address: String,
                     datetime: String,
                     action: Option[String])

class TransformMapper extends Logging {

  // Counts the number of hits per hour and writes the result to HDFS.
  def transform(events: RDD[LogSchema]) = {
    val e = events.map(x => (x.datetime, 1)).reduceByKey(_ + _)
    e.saveAsTextFile("/user/sreejith/loganalyzer/logoutput/")
  }
}

object MapRawData extends Serializable with Logging {

  // Parses one raw log line into a LogSchema; returns None for malformed lines.
  def mapRawLine(line: String): Option[LogSchema] = {
    try {
      val fields = line.split(",", -1).map(_.trim)
      Some(
        LogSchema(
          address = fields(0),
          // Indices 13-14 of "[07/Mar/2004:16:05:49 -0800]" hold the hour.
          datetime = fields(1).substring(13, 15),
          action = if (fields(2).length > 2) Some(fields(2)) else None
        )
      )
    }
    catch {
      case e: Exception =>
        log.warn(s"Unable to parse line: $line")
        None
    }
  }
}

LogSchema.scala


package com.demo.loganalyzer

import org.apache.spark._
import org.joda.time.DateTime

/**
 * Created by Sreejith Pillai.
 */

object RunMainJob extends TransformMapper with Logging {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("spark-loganalyzer")
    val sc = new SparkContext(conf)
    val startTimeJob = new DateTime(sc.startTime)
    val applicationId = sc.applicationId
    log.info("Application launched with id : " + applicationId)

    // Read the raw Apache access log from HDFS.
    val rawData = sc.textFile("/user/sreejith/testlog")
    val numberOfRawLines = rawData.count()
    log.info("Number of lines to parse : " + numberOfRawLines)

    // Parse each line; malformed lines come back as None and are dropped by flatMap.
    val mapRawData = MapRawData
    val parseData = rawData.flatMap(x => mapRawData.mapRawLine(x))
    log.info("Number of lines after parsing: " + parseData.count())

    // Count hits per hour and write the result out.
    transform(parseData)
  }
}

RunJob.scala

Explanation of the Spark/Scala code:

Most of the code is self-explanatory. I have used a Scala case class to model the log schema, so it can be reused at any point in the code. The function mapRawLine maps each raw line into that case class; this is the step where you can perform any cleaning or transformation of the raw data according to your needs. A quick check of what it produces is sketched below.
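
To see what the parser produces, here is a minimal sketch that could be run in spark-shell, assuming the classes above are on the classpath. Note how substring(13, 15) picks the hour out of the bracketed timestamp.

// Illustration only: parse a single sample line.
// In "[07/Mar/2004:16:05:49 -0800]" the characters at indices 13 and 14 are the hour.
val sample = "64.242.88.10,[07/Mar/2004:16:05:49 -0800],\"GET /mailman/listinfo/hsdivision HTTP/1.1\" 200 6291"
val parsed = com.demo.loganalyzer.MapRawData.mapRawLine(sample)
// parsed should be Some(LogSchema(64.242.88.10, 16, Some("GET ... HTTP/1.1" 200 6291)))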

The transform method contains the main logic. It takes an RDD of the case class, maps each record to the hour stored in its datetime field, and counts the records per hour, exactly like a word count.

def transform(events: RDD[LogSchema]) = {
   val e = events.map(x => (x.datetime, 1)).reduceByKey (_ + _)
   e.saveAsTextFile("/user/sreejith/loganalyzer/logoutput/")
 }

The second line in the above code can also be rewritten as

val e = events.map(x => (x.datetime, 1)).reduceByKey { case (x, y) => x + y }

(_ + _) is Scala shorthand for an anonymous function that takes two parameters and returns the result of applying the + operator to the first with the second as its argument. reduceByKey applies this function pair by pair to all the values that share the same key, much like an accumulation loop you would write in Java or C. The same call can also be written out in full, as shown below.
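
Written out without the underscore shorthand, the same line could equally be spelled as follows; this is purely a different spelling of the same function, not a change in behaviour.

val e = events.map(x => (x.datetime, 1)).reduceByKey((a: Int, b: Int) => a + b)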

The code takes the hour from the datetime field, assigns a 1 to each record, and then reduceByKey performs the reduce step per key, summing up all the 1's for each hour.
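
To make that concrete, here is what the same transformation does to the six sample lines, traced with plain Scala collections instead of an RDD (illustration only):

// The map step emits one (hour, 1) pair per line:
// ("16",1), ("16",1), ("16",1), ("17",1), ("18",1), ("16",1)
// reduceByKey then sums the 1's per hour; the same reduction on a local list:
val pairs = List(("16", 1), ("16", 1), ("16", 1), ("17", 1), ("18", 1), ("16", 1))
val counts = pairs.groupBy(_._1).mapValues(_.map(_._2).sum)
// counts: Map(16 -> 4, 17 -> 1, 18 -> 1)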

RunMainJob (in RunJob.scala) is the entry point where the job starts.

To run the Spark job:

spark-submit --master yarn-client --class com.demo.loganalyzer.RunMainJob spark-loganalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar
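
The jar passed to spark-submit is the assembly ("fat") jar produced by Maven; with the project's build setup it can be created beforehand with:

mvn clean package assembly:single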

The output of the job on the sample lines above is:

(17,1)
(18,1)
(16,4)
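
On a full month of logs the counts would be much larger, and the least busy hour can simply be read off the output. If you would rather compute it inside the job, a small addition to the transform method could look like this (a sketch only, reusing the counts built above):

// Hypothetical extension: find the hour with the fewest hits directly in the job.
val counts = events.map(x => (x.datetime, 1)).reduceByKey(_ + _)
val (quietestHour, hits) = counts.reduce((a, b) => if (a._2 <= b._2) a else b)
log.info(s"Least busy hour: $quietestHour with $hits hits")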

If you compare the number of lines needed to achieve the same thing in MapReduce with Java, the Spark/Scala version is roughly a tenth of the code, which justifies the effort you put into learning these technologies.

The entire source code, as a Maven project, can be found on my GitHub.

Happy Coding

Cheers !

3 comments

  1. Hi Sreejith Pillai, thank you for knowledge sharing! Unfortunately, I am very new to Spark so I can’t get it working.

    Let’s say I save LogSchema.scala code as: “spark-loganalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar” file name.
    When I am trying to run the spark job – “spark-submit –master yarn-client –class com.demo.loganalyzer.RunMainJob spark-loganalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar”, but getting error:
    java.lang.ClassNotFoundException: com.demo.loganalyzer.RunMainJob

    What I am missing?

    Thank you,
    Phil


  2. Hi,

    How are you building your jar? If you are using Maven, make sure your project's pom.xml configures the maven-assembly-plugin with the jar-with-dependencies descriptor and sets the mainClass in the jar manifest to com.demo.loganalyzer.RunMainJob. This adds a manifest to the jar declaring which class is the main class.


  3. Hi Sreejith,
    could you help me with below error
    Error: Cannot load main class from JAR file:/home/essujit8176/–master

    build github project with mvn clean package assembly:single

    executed in my cluster (cloudxlab.com)

    $ spark-submit –master yarn-client –class com.demo.loganalyzer.RunMainJob \
    > /home/essujit8176/spark-loganalyzer-1.0-SNAPSHOT-jar-with-dependencies.jar

    SPARK_MAJOR_VERSION is set to 2, using Spark2

    Error: Cannot load main class from JAR file:/home/essujit8176/–master
    Run with –help for usage help or –verbose for debug output

