trait IO { case class Chunk(length: Int, bytes: Array[Byte]) def rxReadFilePath(filePath: String, bufferSize: Int):Observable[Chunk] = Observable.create { obs => var continue = true val stream = fileContentStream(new FileInputStream(filePath), bufferSize) takeWhile { chunk => chunk.length > 0 && continue } stream filter { chunk => chunk.length > 0 && continue } foreach {chunk => obs.onNext(chunk) } Subscription { def unsubscribe = continue = false } } def fileContentStream(fileIn: FileInputStream, bufferSize: Int): Stream[Chunk] = { val bytes = Array.fill[Byte](bufferSize)(0) val length = fileIn.read(bytes) Chunk(length, bytes) #:: fileContentStream(fileIn, bufferSize) } } object RxReader extends IO