package io.angelortega

import org.apache.spark.rdd.RDD

object MapReduceSolution extends LocalSparkContext {

  def main(args: Array[String]): Unit = {
    val inputRdd: RDD[String] = sc.textFile(args.head)
    val deviceTypePoorRatios = calcPoorRatios(inputRdd)
    printOutput(deviceTypePoorRatios)
    sc.stop()
  }

  /**
   * Big-O(N), where N is the sum over stages i (0 until numStages) of
   * (partitionSize_i * parallelExecutionTime_i).
   **/
  def calcPoorRatios(inputRdd: RDD[String]): RDD[(String, Double)] = {
    /**
     * Read device inputs and key by (ip, deviceType); this is useful for reducing shuffling. The key gives us more
     * control over how data is emitted to block storage and partitions. A key of (ip, deviceType) can naturally
     * experience many collisions if one ip is more frequent than another. The result could be block spillage, since
     * keys are assigned by a single default hash function.
     *
     * Skew may be reduced if ips and deviceTypes are distributed evenly across block storage:
     * - Removing ip data skew can be achieved by salting our keys (see the saltedCounts sketch below).
     * - Salting our keys can help distribute the devices evenly across each partition; this works well here
     *   because our reduceByKey phase merges values prior to the shuffle stage.
     * - An alternative to salting is repartitioning, either by:
     *   a. Calling repartition to adjust numPartitions, optimizing so our block size stays within the 64 MB to 256 MB range.
     *   b. Creating our own HashPartitioner which can evenly distribute our keys (see the DeviceKeyPartitioner
     *      sketch at the end of this file).
     *   c. Reducing our partitions, so collisions are more likely, and increasing block size to prevent block spillage.
     * - Another alternative is to stream the output to a sink. Kafka would serve well, because we can adjust the
     *   partition count per key; a good example of this is Twitter increasing partitions for high-traffic users,
     *   where a continuous log must be delivered to many followers. The same can apply to an ip that pings more
     *   frequently.
     **/
    val DELIMITER = ','
    val devicePerformanceLog = inputRdd
      .map(_.split(DELIMITER))
      .flatMap {
        // Keep only well-formed rows: (ip, deviceType, score) with a numeric score
        case arr if arr.length == 3 && scala.util.Try(arr(2).toInt).isSuccess =>
          Some((arr(0), arr(1), arr(2).toInt))
        case _ => None
      }
      .map { case (id, deviceType, score) =>
        ((id, deviceType), (score, 1)) // K, V
      }
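
    /**
     * A minimal sketch of the salting idea described in the skew notes above, under illustrative assumptions:
     * the helper name (saltedCounts) and the salt range (numSalts, default 8) are hypothetical, and the
     * function is not wired into the pipeline. Each hot key is split into numSalts sub-keys, partially
     * aggregated, then the salt is stripped and the partials are merged in a second, much smaller reduce.
     **/
    def saltedCounts(rdd: RDD[((String, String), (Int, Int))],
                     numSalts: Int = 8): RDD[((String, String), (Int, Int))] = {
      rdd
        .map { case (key, value) => ((scala.util.Random.nextInt(numSalts), key), value) } // spread one hot key over numSalts buckets
        .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // first pass: per-salt partial sums
        .map { case ((_, key), value) => (key, value) } // drop the salt
        .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) } // second pass: merge the partials
    }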

    /**
     * Big-O(numWorkerThreads / numCores): when numCores is 1, i.e. a single-core machine, we no longer have
     * parallelization and this phase degrades to Big-O(N), linear time plus execution delay.
     *
     * In the best case, O(numWorkerThreads / numCores), we have many more cores than threads,
     * numCores >> numWorkerThreads. This assumes our executor environment carries this trait.
     * An example: say we have 4 threads and 8 cores, O(4/8) = O(1/2) = O(C), a constant rate: O(1).
     * This is what we mean by horizontal scalability: we get O(1) time, but our memory size is now
     * O(blockSize * numPartitions). Memory can grow at a linear rate to take advantage of parallelization
     * for better time complexity.
     *
     * In the best-case scenario we say our time complexity is O(1), with numCores >> numWorkerThreads,
     * because each map phase occurs in constant time under one parallel operation. To achieve O(1),
     * we require many cores and RAM storage: keeping our data in RAM gives us roughly 100ns-700ns latency
     * and avoids network latency.
     *
     * However, this is the ideal world. Overall we will be limited by the max execution cycle of one process,
     * and this will be our bottleneck. So a pending task may wait, staged for computation, until the execution
     * cycle finishes for that stage.
     *
     * The time complexity obeys Amdahl's law:
     * https://en.wikipedia.org/wiki/Amdahl%27s_law
     * http://15418.courses.cs.cmu.edu/spring2016content/lectures/19_heterogeneity/images/slide_006.jpg
     **/
    // The resulting RDD:
    // ((1.1.1.1,android),(20,1))
    // ((1.1.1.1,android),(100,1))
    // ((2.2.2.2,iphone),(10,1))
    // ((2.2.2.2,iphone),(20,1))
    // ((3.3.3.3,android),(10,1))
    // ((3.3.3.3,android),(40,1))
    // ((3.3.3.3,android),(10,1))
    // ((4.4.4.4,iphone),(10,1))

    // Big-O(N), where N is (partitionSize * parallelExecutionTime).
    // Aggregate with a (totalScore, totalPings) zero value, a per-device sequential function,
    // and a combiner that merges partial results across partitions.
    val (totalScore, totalPings) = (0, 0)
    val deviceAccumulatedMetrics = devicePerformanceLog.aggregateByKey((totalScore, totalPings))(
      (u, device) => {
        val (scoreAccumulator, devicePingAccumulator) = u
        val (deviceScore, devicePingCount) = device
        (scoreAccumulator + deviceScore, devicePingAccumulator + devicePingCount)
      },
      (u1, u2) => (u1._1 + u2._1, u1._2 + u2._2)
    )
    // ((2.2.2.2,iphone),(30,2))
    // ((3.3.3.3,android),(60,3))
    // ((1.1.1.1,android),(120,2))
    // ((4.4.4.4,iphone),(10,1))

    // AVG performance score and the isPoor boolean are derived from the accumulated metrics per device.
    // Note: the average uses double division so a device whose true average is, e.g., 50.5 is not
    // misclassified as poor by integer truncation.
    val devicePerformanceMetrics = deviceAccumulatedMetrics.map { case (deviceKey, (totalScore, totalPings)) =>
      val avgPerfScore = totalScore.toDouble / totalPings
      val isPoor = avgPerfScore <= 50
      (deviceKey, (totalScore, totalPings, avgPerfScore, isPoor))
    }

    val poorRatio = devicePerformanceMetrics.map {
      case ((id, deviceType), (totalScore, totalPings, avgPerfScore, isPoor)) =>
        (deviceType, (if (isPoor) 1 else 0, 1))
    }

    val isPoorAggByDevice = poorRatio.reduceByKey((u, v) => (u._1 + v._1, u._2 + v._2))

    val results = isPoorAggByDevice.map { case (deviceType, (isPoorTally, totalDeviceType)) =>
      (deviceType, isPoorTally.toDouble / totalDeviceType)
    }
    results
    /**
     * Big-O(N), where N is the sum over stages i (0 until numStages) of
     * (partitionSize_i * parallelExecutionTime_i).
     */
  }

  def printOutput(outputRdd: RDD[(String, Double)]): Unit = {
    outputRdd.collect().foreach { case (deviceType, poorRatio) =>
      println(s"$deviceType,$poorRatio")
    }
  }
}
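
/**
 * A minimal sketch of option (b) from the skew notes in calcPoorRatios: a custom Partitioner that hashes
 * both tuple fields with MurmurHash3 rather than relying on the default HashPartitioner's key.hashCode.
 * The class name and partition count are illustrative assumptions; the class is not wired into the
 * pipeline above.
 */
class DeviceKeyPartitioner(partitions: Int) extends org.apache.spark.Partitioner {
  require(partitions > 0, "partitions must be positive")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case (ip: String, deviceType: String) =>
      // MurmurHash3 mixes both fields more uniformly than the default tuple hashCode
      val h = scala.util.hashing.MurmurHash3.stringHash(s"$ip|$deviceType")
      ((h % partitions) + partitions) % partitions // guard against negative hashes
    case _ => 0
  }
}
// Hypothetical usage, passing the partitioner to the shuffle stage:
//   devicePerformanceLog.aggregateByKey((0, 0), new DeviceKeyPartitioner(8))(seqOp, combOp)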