@AtlasPilotPuppy
Last active August 29, 2015 14:08

Revisions

  1. AtlasPilotPuppy revised this gist Oct 23, 2014. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion LogAnalysis.scala
    @@ -30,4 +30,8 @@ schema_rows.registerAsTable("logs")
     // Traffic generating ip counts
     val ip_access_direct = schema_rows.map(row => (row.ip, 1)).reduceByKey(_+_).map(
     _.swap).sortByKey(ascending=false).map(_.swap).collect()
    -println(ip_access_direct.deep.mkString("\n"))
    +println(ip_access_direct.deep.mkString("\n"))
    +
    +val url_access = sqlContext.sql("SELECT url, count(*) as counts FROM logs GROUP BY url ORDER BY counts DESC LIMIT 10").collect()
    +
    +val ip_access = sqlContext.sql("SELECT ip, count(*) as counts FROM logs GROUP BY ip ORDER BY counts DESC LIMIT 10").collect()
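
    The two queries added in this revision collect the top-10 URL and IP counts as an Array[Row]. A minimal sketch of printing those collected results, assuming the Spark 1.x Row API (row(i) indexing); the tab-separated output format is just an illustration:

    // Each Row holds (url-or-ip, counts); print as tab-separated lines
    url_access.foreach(row => println(s"${row(0)}\t${row(1)}"))
    ip_access.foreach(row => println(s"${row(0)}\t${row(1)}"))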
  2. AtlasPilotPuppy revised this gist Oct 23, 2014. 1 changed file with 4 additions and 2 deletions.
    6 changes: 4 additions & 2 deletions LogAnalysis.scala
    @@ -27,5 +27,7 @@ val schema_from_row = (row: List[String]) => new LogSchema(row(0), row(1).replac
     val rows = log_file.map(tokenize_row)
     val schema_rows = rows.map(schema_from_row)
     schema_rows.registerAsTable("logs")
    -
    -val ip_access_direct = schema_rows.map(row => (row.ip, 1)).reduceByKey(_+_).map(_.swap).sortByKey(ascending=false).map(_.swap).collect()
    +// Traffic generating ip counts
    +val ip_access_direct = schema_rows.map(row => (row.ip, 1)).reduceByKey(_+_).map(
    +_.swap).sortByKey(ascending=false).map(_.swap).collect()
    +println(ip_access_direct.deep.mkString("\n"))
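
    The swap/sortByKey/swap chain reformatted in this revision is the standard RDD idiom for ordering a pair RDD by its values, since sortByKey only sorts on keys. A self-contained sketch of the same idiom on made-up pairs:

    // Flip (ip, count) to (count, ip), sort on the count key, flip back
    val toy = sc.parallelize(Seq(("10.0.0.1", 3), ("10.0.0.2", 7), ("10.0.0.3", 1)))
    val byCount = toy.map(_.swap).sortByKey(ascending = false).map(_.swap)
    byCount.collect().foreach(println) // (10.0.0.2,7) (10.0.0.1,3) (10.0.0.3,1)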
  3. AtlasPilotPuppy revised this gist Oct 23, 2014. 1 changed file with 7 additions and 0 deletions.
    7 changes: 7 additions & 0 deletions LogAnalysis.scala
    @@ -2,9 +2,12 @@
     // log file can be found at ftp://ita.ee.lbl.gov/traces/epa-http.txt.Z
     
     import org.apache.spark.SparkContext
    +import org.apache.spark.sql._
     import java.util.regex.Pattern
     
     val sc = new SparkContext("spark://master:7077", "Log Analysis")
    +val sqlContext = new SQLContext(sc)
    +import sqlContext.createSchemaRDD
     
     val log_file = sc.textFile("hdfs://master:9000/user/hdfs/log_file.log")
     val pattern = Pattern.compile("([^\"]\\S*|\".+?\")\\s*")
    @@ -22,3 +25,7 @@ val schema_from_row = (row: List[String]) => new LogSchema(row(0), row(1).replac
     row(2).split(" ")(1).split("\\?")(0), row(3), row(4))
     
     val rows = log_file.map(tokenize_row)
    +val schema_rows = rows.map(schema_from_row)
    +schema_rows.registerAsTable("logs")
    +
    +val ip_access_direct = schema_rows.map(row => (row.ip, 1)).reduceByKey(_+_).map(_.swap).sortByKey(ascending=false).map(_.swap).collect()
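
    The import sqlContext.createSchemaRDD added above brings in the implicit that turns an RDD of LogSchema case-class instances into a SchemaRDD, which is what makes registerAsTable("logs") compile. A small sketch of querying the registered table, assuming the same Spark 1.x SQL API; the status-count query itself is only an illustration:

    // Group the registered logs table by HTTP status code
    val status_counts = sqlContext.sql("SELECT status, count(*) as counts FROM logs GROUP BY status ORDER BY counts DESC").collect()
    status_counts.foreach(println)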
  4. AtlasPilotPuppy created this gist Oct 23, 2014.
    24 changes: 24 additions & 0 deletions LogAnalysis.scala
    @@ -0,0 +1,24 @@
    +// Log file contains the first 200 lines from http://ita.ee.lbl.gov/html/contrib/EPA-HTTP.html
    +// log file can be found at ftp://ita.ee.lbl.gov/traces/epa-http.txt.Z
    +
    +import org.apache.spark.SparkContext
    +import java.util.regex.Pattern
    +
    +val sc = new SparkContext("spark://master:7077", "Log Analysis")
    +
    +val log_file = sc.textFile("hdfs://master:9000/user/hdfs/log_file.log")
    +val pattern = Pattern.compile("([^\"]\\S*|\".+?\")\\s*")
    +case class LogSchema(ip: String, date: String, url: String, status: String, time: String)
    +
    +val tokenize_row = (row: String) => {
    +  val matches = pattern.matcher(row)
    +  var values = List[String]()
    +  while(matches.find)
    +    values = values :+ matches.group(1)
    +  values
    +}
    +
    +val schema_from_row = (row: List[String]) => new LogSchema(row(0), row(1).replace("[", "").replace("]", ""),
    +  row(2).split(" ")(1).split("\\?")(0), row(3), row(4))
    +
    +val rows = log_file.map(tokenize_row)
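
    The regex keeps quoted request strings intact while splitting everything else on whitespace, so each EPA log line tokenizes into the five fields of the LogSchema case class. A quick sketch of tokenize_row on a made-up line in that format:

    // Hypothetical sample line, for illustration only
    val sample = "141.243.1.172 [29:23:53:25] \"GET /Software.html HTTP/1.0\" 200 1497"
    println(tokenize_row(sample))
    // List(141.243.1.172, [29:23:53:25], "GET /Software.html HTTP/1.0", 200, 1497)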