Skip to content

Instantly share code, notes, and snippets.

@driquelme
Created May 18, 2015 12:05
Show Gist options
  • Save driquelme/9c3be40b70f61f849f57 to your computer and use it in GitHub Desktop.
Save driquelme/9c3be40b70f61f849f57 to your computer and use it in GitHub Desktop.
Reactively read file with RxScala
/**
 * File-reading helpers that expose a file's contents as fixed-size chunks,
 * either as a lazy `Stream` or pushed to an RxScala `Observable`.
 */
trait IO {

  /**
   * The result of one buffer read.
   *
   * @param length value returned by `FileInputStream.read` for this buffer
   *               (number of bytes read, or -1 once the end of the file is reached)
   * @param bytes  the buffer that was passed to `read`; it is always `bufferSize`
   *               long, so only the first `length` bytes carry file data
   */
  case class Chunk(length: Int, bytes: Array[Byte])

  /**
   * Reads the file at `filePath` in chunks of at most `bufferSize` bytes and
   * pushes every non-empty chunk to the subscribing observer.
   *
   * The observer receives `onCompleted` once the end of the file is reached and
   * `onError` if opening, reading or closing the file fails. The underlying
   * stream is always closed before either terminal notification is sent.
   *
   * @param filePath   path of the file to read
   * @param bufferSize size in bytes of the buffer used for each read
   * @return an observable emitting the file's chunks in order
   */
  def rxReadFilePath(filePath: String, bufferSize: Int): Observable[Chunk] = Observable.create { obs =>
    // Cleared by the returned Subscription to stop emission.
    // NOTE(review): emission runs synchronously inside this callback, before the
    // Subscription below is handed back, so presumably a subscriber cannot cancel
    // mid-read through it - confirm against the RxScala version in use.
    var subscribed = true
    try {
      val input = new FileInputStream(filePath)
      try {
        // Walk the stream through its iterator rather than binding it to a val,
        // so no reference to the head of the memoizing Stream is kept while
        // the file is being traversed.
        fileContentStream(input, bufferSize).iterator
          .takeWhile(chunk => chunk.length > 0 && subscribed)
          .foreach(chunk => obs.onNext(chunk))
      } finally {
        input.close()
      }
      // Only signal completion when the read ran to the end of the file.
      if (subscribed) obs.onCompleted()
    } catch {
      case scala.util.control.NonFatal(e) => obs.onError(e)
    }
    // The by-name body is what runs on unsubscribe; it must perform the
    // assignment itself rather than merely declare a method that does.
    Subscription { subscribed = false }
  }

  /**
   * Lazily reads `fileIn` as a stream of chunks.
   *
   * The first chunk is read eagerly when this method is called; each further
   * chunk is read when the stream's tail is forced. The stream never ends on
   * its own: after the end of the file every chunk has `length == -1`, so
   * callers are expected to stop on the first non-positive length.
   *
   * @param fileIn     open input stream to read from; it is not closed here
   * @param bufferSize size in bytes of the fresh buffer allocated for each read
   * @return a lazy stream of chunks
   */
  def fileContentStream(fileIn: FileInputStream, bufferSize: Int): Stream[Chunk] = {
    val bytes = Array.fill[Byte](bufferSize)(0)
    val length = fileIn.read(bytes)
    Chunk(length, bytes) #:: fileContentStream(fileIn, bufferSize)
  }
}
/**
 * Singleton that mixes in [[IO]], so its members can be used directly,
 * e.g. `RxReader.rxReadFilePath(path, bufferSize)`.
 */
object RxReader extends IO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment