Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created March 22, 2015 19:55
Show Gist options
  • Save djspiewak/d93a9c4983f63721c41c to your computer and use it in GitHub Desktop.
Save djspiewak/d93a9c4983f63721c41c to your computer and use it in GitHub Desktop.

Revisions

  1. djspiewak created this gist Mar 22, 2015.
    483 changes: 483 additions & 0 deletions streams-tutorial.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,483 @@
    # Introduction to scalaz-stream

    Every application ever written can be viewed as some sort of transformation on data. Data can come from different sources, such as a network or a file or user input or the Large Hadron Collider. It can come from many sources all at once to be merged and aggregated in interesting ways, and it can be produced *into* many different output sinks, such as a network or files or graphical user interfaces. You might produce your output all at once, as a big data dump at the end of the world (right before your program shuts down), or you might produce it more incrementally. *Every* application fits into this model.

    The scalaz-stream project is an attempt to make it easy to construct, test and scale programs that fit within this model (which is to say, everything). It does this by providing an abstraction around a "stream" of data, which is really just this notion of some number of data being sequentially pulled out of some unspecified data source. On top of this abstraction, scalaz-stream provides a large number of tools for manipulating, slicing, transforming, merging and outputting (totally a word). These tools, generally referred to as *combinators*, are clean and orthogonal, serving a minimal and targeted purpose with easy-to-understand rules and guarantees.

    Among these combinators are tools for *merging* streams in interesting ways, often involving concurrency. Scalaz-stream makes it very easy to model data transformations that are run in parallel, taking full advantage of multi-core and (we hope soon!) multi-server distributed evaluation. However, scalaz-stream makes no attempt to "guess" about what problems make sense to run concurrently or what sort of granularity seems best. If you want concurrency, scalaz-stream makes it easy to get, but you *must* ask for it. This is a central tenant of the whole library design, and it results in code which is clean, well-factored and has few surprises when it comes time to evaluate.

    This article will introduce the basic principles of scalaz-stream, with a particular focus on doing useful things *easily* and in a testable and scalable fashion. No knowledge of "advanced functional programming" (e.g. Scalaz) is assumed! You don't need to be well-versed in category theory to benefit from the power and cleanliness of scalaz-stream, and this article well help you get started.

    ## Prerequisites

    Scalaz-stream depends on the Scalaz library for technical reasons. At present, there are *two* outstanding and supported versions of Scalaz: 7.0.6 and 7.1.0. The latter is of course more contemporaneous and has numerous features that are absent in 7.0.6, but also some incompatibilities which forces certain projects to use the older version. If you have one of those projects, and you're stuck on Scalaz 7.0.6 for other reasons, you need to use the **0.7** version of scalaz-stream. Otherwise, if you don't already have a dependency (direct or indirect) on Scalaz or you're free to upgrade, use **0.7a**. Thus, your SBT configuration will look like the following:

    ```sbt
    resolvers += "Scalaz Bintray Repo" at "http://dl.bintray.com/scalaz/releases"

    libraryDependencies += "org.scalaz.stream" %% "scalaz-stream" % "0.7a"
    ```

    All of the scalaz-stream classes and modules are contained within the `scalaz.stream` package. By far, the most useful type in this package is `Process`, which represents a sequential stream of…things. We'll get more into what this means in a moment, but for now it is sufficient just to get the dependencies setup.

    ## Transforming and Running Processes

    The `scalaz.stream.Process` type represents a possibly infinite sequential stream of *stuff*. What that *stuff* is depends on the two type parameters that you give to `Process`. For example:

    ```scala
    def foo(p: Process[Nothing, String]) = ???
    ```

    In the above, it looks like `p` is a stream of `String` values. I know from the first parameter, `Nothing`, that there are no *effects* within the stream. Controlling the existence (or non-existence) of effects is a central design philosophy of scalaz-stream, and it's a significant part of what makes it such a powerful and useful tool. For now, think of an effect as something like "reading from a file", "receiving data from an AJAX request", or really anything that involves dealing with "the outside world".

    ```scala
    def foo(p: Process[Task, Request]) = ???
    ```

    Ah, now this is something more interesting! The `Nothing` has been replaced with `Task`, which means that somewhere inside of `p`, we're talking to the outside world. Or rather, we *will* talk to the outside world when we actually go to evaluate `p`. More on this in a bit!

    You'll also notice that the second type parameter is now `Request`. Just guessing, but it looks a bit like the above is a stream of external requests coming into some sort of server (us), possibly over HTTP or some other protocol. I would guess that the above stream is *unbounded*, since we are going to want to continue processing requests forever, or rather until someone asks us to stop. Unbounded, or infinite streams are tremendously useful tools. It's just so easy to say "here is a stream that represents *all* requests I will *ever* receive"! Having such a stream as a first class "thing" that we can manipulate and get our hands around is very very nice indeed.

    But I'm getting ahead of myself. Streams aren't very interesting if we can't *do* things with them, and really that's what scalaz-stream is all about: manipulation and transformation of data (and effects). So what can we do with a `Process`?

    Well, the answer is: nearly everything that we can do with a `Seq`! If you're familiar with Scala's collections library, you should feel very at home working with `Process`. The `map`, `flatMap`, `filter`, `take`, `drop`, `collect`, `zip` (and more!) functions are all available and work exactly as you would expect. As an example, let's create a simple process that has some hard-coded data and do a bit of trivial transformation:

    ```scala
    val p = Process(5, 4, 3, 2, 1)

    val results = p collect {
    case 1 => "one"
    case 2 => "two"
    case 3 => "three"
    } filter { _.length > 3 } map { _.toUpperCase }
    ```

    Pretty standard stuff. We're collecting on the input `Int`(s), filtering and then mapping. If `p` were a `Vector` or a `List`, we would expect `results` to contain something like `Vector("THREE")`. But while `Process` may *look* like a collection, it's not! The `results` variable is of type `Process[Nothing, String]` (no effects were harmed in the making of this stream), and it doesn't really "contain" anything. Rather, `results` knows how to *compute* the sequence of data that will result from our pipeline of operations, but it hasn't actually done the work yet. If we want to ask it to do the work, we need to put in a bit more effort.

    ### Running Streams

    ```scala
    results.toSource.runLog.run // => Vector("THREE")
    ```

    The `toSource` in the above is a bit weird, and not actually something you'll be doing very often. We need it here because scalaz-stream actually doesn't understand how to run a `Process` that *doesn't* have effects! `toSource` takes our effect-less stream and makes it *pretend* to have effects in it (even though there aren't any there) so that `runLog` can do its work.

    Now, `runLog.run` is obviously a very clumsy incantation. The essence of it is that `runLog` instructs `Process` to compile its pipeline down to a single atomic operation (in this case, a `Task`) which *when run* produces a `Seq[String]`. It would of course be a `Seq[Int]` if our input process (`results`) contained `Int` data, or `Seq[Request]` if it contained requests. As the second sentence of this paragraph implies though, `runLog` alone does not actually *run* the `Process` (this distinction can get even *weirder* if we use the `run` interpreter rather than `runLog`, since *neither* actually run the process). Instead, it produces a single atomic operation, which in our case is a `Task` (it is determined by the first type parameter given to `Process`) which *can be run* to produce the final output.

    There are some very good reasons for this convoluted dance, but they get into some more complex aspects of the framework. The good news is that, in practice, you really don't spend a lot of time running processes! After all, why would you run a process *to completion* when you can just compose it with another process? In practice, most programs have *one* instance of `runLog.run` (or more conventionally, `run.run`, which discards the individual results) right at the very end of the program. If you have more than one spot in your code (outside of tests, of course) where you say `p.run.run` on some process `p`, you're probably doing something wrong and you should look for a built-in combinator that allows you to compose your processes more effectively.

    ### Composition and Infinities

    Coming back to process composition, we also have the ability to merge streams together using standard combinators like `zip`. For example:

    ```scala
    val names = Process("one", "two", "three")
    val nums = Process(1, 2, 3)

    names zip nums // => Process(("one", 1), ("two", 2), ("three", 3))
    ```

    If you recall from earlier, `Process` represents a *possibly infinite* stream of data. We can exploit this functionality, just as with `scala.Stream` (which is otherwise very different from `Process`!) to do the above in a slightly "cooler" way:

    ```scala
    val names = Process("one", "two", "three")
    def nums(n: Int): Process[Nothing, Int] = Process(n) ++ nums(n + 1)

    names zip nums(1) // => Process(("one", 1), ("two", 2), ("three", 3))
    ```

    Obviously far less concise than the first version, but it just *feels* more awesome. The important point is that we're getting comfortable working with *infinite* streams of data. As I alluded to earlier with the "infinite stream of requests" example, infinite streams are enormously convenient as a tool. Just at a conceptual level, if you model your entire program as a composition of streams, and your program is some sort of server that is meant to run until killed, then you *must* have some sort of infinite stream (or streams plural!) inside of your program. This is the *rule*, not the exception to how scalaz-stream is used.

    Now of course, given that scalaz-stream allows the representation of infinite streams, some familiar collections operations just don't make sense:

    ```scala
    def nums(n: Int): Process[Nothing, Int] = Process(n) ++ nums(n + 1)

    nums(0).length // doesn't compile!
    ```

    You can't ask for the length of a `Process`. It's *possible* to implement the `length` function, but if you tried to use it on an infinite process, it would just hang forever. Note that it also isn't possible to "index" into a `Process`, to for example ask for the fourth element. With scalaz-stream, the *stream* is what is interesting, not really any one particular piece of data within that stream. So if you find yourself hunting around for tools to treat `Process` as if it were a `Vector` or even a `List`, you should probably adjust the way you're using the library, because you'll get *much* more out of it by embracing the stream paradigm!

    ### Finding More

    There are a *ton* of very very useful and yet simple transformations on `Process` that are very "collection-like". Most of the single-`Process` transformations (like `collect`) are implemented through a powerful abstraction called `Process1`, and then implicitly added to the `Process` type. You can find all of the built-in single-`Process` transformations in the `scalaz.stream.process1` object. Similarly, nearly all of the two-`Process` transformations (like `zip`) are implemented through a powerful abstraction called `Tee`, and similarly implicitly exposed. You can find all of the built-in two-`Process` transformations in the `scalaz.stream.tee` object. So if you're not sure whether or not a particular function exists on `Process`, you can go hunting through those modules to perhaps find what you're looking for.

    ## Performing Actions

    As I've been insisting all along, `Process` represents a (possibly infinite) sequence of *stuff*, where the stuff could be data and it could be effects. An effect is talking to the outside world in some way, or receiving data from the outside world, or otherwise doing useful things. Effects are encapsulated inside of the *functor* in which the `Process` is evaluating. This is a ton of terminology all at once, but the short way of looking at it is that your effects are all managed by the *first* of the two type parameters on `Process`. For example:

    - `Process[Nothing, Int]` - Cannot contain *any* effects at all!
    - `Process[Task, Int]` - Probably talks to the outside world in very complicated ways, and perhaps involves concurrency
    - `Process[Burrito, Int]` - Contains effects, but apparently only contains effects related to consumption of Chipotle

    Just as a function of convenience, *most* of the time you're going to be dealing with process types of the form `Process[Task, _]` (fill in the `_` with your data type). `Task` is the most general effect possible, and it's also the only effect which allows concurrent composition of streams. With that said, it is tremendously useful to be able to have more restricted effects, like `Burrito` in the above. After all, if the only effects your stream can perform is to eat a delicious meal, then your stream becomes *that much easier to test!* You don't have to worry about mocking a database or stubbing out a network client if the stream you're trying to test *cannot* possibly talk to a database or a network. `Task` effects can talk to databases and networks. `Burrito` effects cannot, and this is a secret magic protip for terrifyingly easy testing.

    But back to actions… Our program needs to, at some level, talk to a database or read from a network channel or launch the missiles, and this is where `Task` comes in. For example, here are a pair of actions which read from and write to stdout, respectively:

    ```scala
    def puts(ln: String): Task[Unit] = Task { println(ln) }
    val gets: Task[String] = Task { Console.readLine() }
    ```

    If you're reading someone else's code, you might also see actions constructed using `Task.delay`. Don't worry *too much* about the difference between `Task.delay` and `Task.apply` (what we're using above). If you're using scalaz-stream, the differences between these two constructors are mostly meaningless, so you may as well use the shorter one!

    So how do we put actions into a stream? Very simply, as it turns out:

    ```scala
    val p = Process eval puts("Hello, World!") // => Process(())
    ```

    Now, `p` in the above is a `Process[Task, Unit]`, so it doesn't really produce any *useful* values. What it does do though is perform an action, which is to say, run `println`! If we were to interpret `p` by calling `p.run.run`, we would get the classic results printed to our console.

    Processes that contain effects can be composed using the *exact same* tools that we've been using to play around with processes that do not contain effects. And *this* is the power of scalaz-stream! For example, we could write an echo terminal in the following way:

    ```scala
    val lines = Process repeatEval gets

    val p = lines flatMap { line =>
    Process eval puts(line)
    }
    ```

    We haven't seen `repeatEval` yet, but it's very much like `eval` except that it keeps evalling forever. So, `lines` is a `Process[Task, String]` that represents *every line the user will ever enter via stdin*. We then `flatMap` into this infinite stream of lines, and for each one, we `eval` the `puts` action on that line. Actually, this pattern of having some effectful "destination" for data is so common that scalaz-stream uses higher-order design to provide a first-class manifestation in the form of `Sink`.

    `Sink[Task, A]` is simply equivalent to `Process[Task, A => Task[Unit]]`, which is to say a stream of *functions*, where each function performs an action. It's a very simple idea, but it's powerful enough to represent everything that we would want from terminal points for our data processing.

    ```scala
    val lines = Process repeatEval gets
    val stdout = Process constant (puts _) toSource

    val p = lines to stdout
    ```

    This snippet does *exactly* the same thing as the previous example with `flatMap`. The only difference is that we're lifting `puts` into a sink (`stdout`) of type `Sink[Task, String]` and using the `to` combinator to connect `lines` into this sink. The end effect is the same, just substantially more concise.

    Incidentally, the `constant` and `toSource` bits aren't magic or in any way specific to `Sink` (in fact, we saw `toSource` before). The `constant` constructor creates a `Process[Nothing, A]` given a value of type `A` (in this case, `puts _`), where the process is an infinite stream of that one value, over and over and over. In this case, that is precisely what we want for a `stdout` sink: the way in which you print to stdout doesn't vary over time. However, some sinks, for example a network load balancer abstracting over several destinations, might want to have a different "write" function as the stream goes along. Since `Sink` is just a `Process`, we clearly have more than enough power to support this kind of thing, but we don't need it here.

    And as before, `toSource` is just a bit of faerie dust that converts our `Process[Nothing, A => Task[Unit]]` into a `Process[Task, A => Task[Unit]]` by *pretending* there are effects of type `Task`. Remember, the *value* `puts _` is not an effect, it's just a value! It may *produce* an effect (of type `Task[Unit]`), but it is not in and of itself an effect. This is why we can lift it into an infinite stream using `constant` rather than `repeatEval`.

    The `to` combinator is very similar to what we wrote earlier with `flatMap`. In fact, it's almost precisely the same thing! It repeatedly takes a value from the left (the source) and a function from the right (the sink) and applies the function to the value, `eval`ing the results into the stream. Thus, `p` has type `Process[Task, Unit]`. It's an infinite stream of effects, all of which produce `()` as a result. Of course, `()` isn't a very useful value, so clearly what we're interested in here is just the effects. For that reason, if we run `p`, we're probably not going to use `runLog` (which would give us a `Task[Seq[Unit]]`). Instead, we'll just use `run`, which produces a `Task[Unit]`, effectively discarding any data produced from the process and only keeping the effects.

    ```scala
    p.run.run
    ```

    The above will run until we hit <kbd>Ctrl-C</kbd>, reading lines one at a time from the user and immediately echoing them back out.

    ### Callbacks

    A very, *very* common situation in asynchronous code is to run into an API which is structured around a callback or listener system. For example, when you're using Java's NIO library, you accept connections on a server channel by passing the channel a *listener* that will be invoked once a connection is established. For example, something like this (simplified pseudocode):

    ```scala
    val channel = ...
    channel.accept(new ConnectionListener {
    def ready(connection: Connection): Unit = {
    // read data out of connection here!
    }

    def failed(t: Throwable) = {
    // maybe log?
    }
    })
    ```

    If scalaz-stream *didn't* provide a clean way of interacting with these sorts of APIs, then it wouldn't be a very useful framework for writing practical asynchronous applications! The magic to address this situation is wrapped up in the `Task.async` constructor.

    Thus far, all of our `Task` actions have been constructed using `Task.apply`, which takes a block of code that will be run at some future time, probably on a different thread. `Task.async` is a little different. This constructor takes a function which in turn receives a *callback*. When the callback is invoked, the `Task` is completed. So rather than waiting for a definite block of code to run to completion, it creates a `Task` which waits for a listener to be notified! We can modify the above example to produce a `Task[Connection]` using `Task.async`:

    ```scala
    val accept: Task[Connection] = Task async { cb =>
    channel.accept(new ConnectionListener {
    def ready(connection: Connection) = cb(right(connection))
    def failed(t: Throwable) = cb(left(t))
    })
    }
    ```

    So `accept` is an effect which *asynchronously* (and without blocking!) accepts an incoming `Connection` on `channel`. If and when a connection is received, it is used to complete the task by invoking `cb` with `right(connection)`. The "`right`" here comes from Scalaz's `Either` type, which is very similar to `scala.Either`. If we hit an error, we complete the task with that error by invoking `cb` with `left(t)`. It's as simple as that!

    I can even work with streams of asynchronous effects just as easily as I can work with streams of synchronous effects!

    ```scala
    val server: Process[Task, Connection] = Process repeatEval accept
    ```

    So, `server` is a stream of *every* connection that `channel` will ever receive. These connections will be received asynchronously, without blocking any threads, so we're not sacrificing any throughput to have this nice abstraction.

    Now you're starting to see where scalaz-stream can bring *real* tangible value to a real world application. Modeling a server as an infinite stream of connections makes it very very simple to write functions that work with those connections, filter out undesired ones, and so on. We don't have to worry about crazy asynchronous loops or thread forking or any of that madness. Everything just works, and it's very very simple and concise.

    ### Resource Safety

    One of the more useful features that scalaz-stream surrounding effects is the ability to guarantee resource safety, even in the face of interrupts, shutdown, errors, and the like. It wouldn't be very practically useful to provide a mechanism for opening a network channel without also providing some way of guaranteeing that channel is inevitably closed! This feature is satisfied by `onComplete`.

    Now, `onComplete` is a bit of a weird beast. It's a combinator on `Process` that takes another `Process` as its parameter. The parameter (on the right hand side) is *guaranteed* to run exactly once when the left hand side "completes", where by "completes" I mean runs to completion, hits an error, or even is interrupted by some other concurrent process. It's a bit like the `finally` in a `try/finally` block. The trick though is that an `onComplete` parameter *will not run* if the left hand side never runs in the first place! Thus, you can construct finalizers (using `onComplete`) that *seem* like they're always going to be hit, but in fact which sometimes do not run. The scalaz-stream project has some future plans to alleviate this confusion, but for now, just try to be careful to attach your `onComplete` to a "top level" process in a composition (i.e. *not* a process within the right side of an `++` or *inside* of a `flatMap`).

    Grabbing a resource and then releasing it with a finalizer is very straightforward, though we can't do it (safely) with `eval`. Instead, we're going to use the slightly lower-level `await`.

    ```scala
    val p: Process[Task, Array[Byte]] = Process.await(connect) { channel =>
    val reads = Process repeatEval (Task async channel.accept)
    reads onComplete (Process eval_ (Task { channel.close() }))
    }
    ```

    There are a few new things here. First off, `await` is a constructor on `Process` that performs an effect (in this case, `connect`) and passes the *results* of that effect to the body of the function it is given. This function then uses the results of the effect to compute a new `Process` (in our case, `reads`) which gets returned from the body. Any values produced by the process within the body of the `await` will be the values produced by the process *returned from* `await`: namely, `p` in our example. Thus, `await` is a way of computing a stream based on the results of a single effect. The `eval` constructor is simply defined in terms of `await`:

    ```scala
    def eval[F[_], A](effect: F[A]): Process[F, A] =
    await(effect) { a => Process(a) }
    ```

    Speaking of `eval`, the example from earlier contains a bit which reads `Process eval_`. This isn't a typo! The `eval` constructor comes in two flavors: with and without an underscore. The only difference is that `eval` produces the value which was computed by its effect, while `eval_` performs the effect (just like `eval`) and then *throws away the value*. This is really useful for a finalizer, since we just want to clean up some resources (which is an effect!) and we don't care about producing more values.

    The *really* interesting bit though is `onComplete`. This is where the magic happens for finalization. If and only if the `connect` effect *completes* (as in, it produces a `channel`), the finalizer (the right side of `onComplete`) is *guaranteed* to run exactly once. This is an invariant that the library will preserve regardless of errors or interrupts. If you get a `channel`, then you also get the chance to dispose of it with `onComplete`.

    Building a complex application with scalaz-stream necessitates a *lot* of resource allocation and deallocation. The `onComplete` combinator is the tool with which you tame this mess. It allows you to keep your resource management local and *guaranteed*, so that you never have to worry about resources leaking because of some weird concurrent exception or interrupt thing caused by another process way over on the other side of town. You can produce streams of data that require resource management to compute, and you can use those streams *safely* without having to worry about any of the details of when their resources are acquired and when they might (or might not be!) released.

    Naturally, this provides an excellent foundation for a file read/write API. For convenience, scalaz-stream does provide several functions within the `scalaz.stream.io` module which satisfy this use case. For example, the following process reads all of the lines from the "foo.txt" file, computes the length of each line, and then writes those lengths out to "foo-lengths.txt".

    ```scala
    val p: Process[Task, Unit] = io linesR "foo.txt" map { _.length } to (io linesW "foo-lengths.txt")
    ```

    All of this is resource safe *and* incremental. In other words, we're *not* reading the entire "foo.txt" file into memory before computing the lengths and then writing the whole thing out again. In fact, foo.txt could be *arbitrarily* large, and while the program would naturally take longer to complete, it wouldn't use any more memory. Try writing *that* by hand!

    Another example would be replicating the Unix `cat` utility using scalaz-stream.

    ```scala
    def cat(file: String): Unit = {
    def puts(ln: String): Task[Unit] = Task { println(ln) }
    val stdout = Process constant (puts _) toSource

    (io linesR file to stdout).run.run
    }
    ```

    As a side-effect of all of this (no pun intended), scalaz-stream is probably the easiest way to read or write a file in Scala. The equivalent code to the above using `scala.io.Source` is dramatically more verbose and easier to get wrong, in addition to being resource unsafe. I use the above combinators (and the others like them) all the time for really simple "script-ish" things where I just need to mash some data into a file and/or slurp it back out again.

    ## Merging and Concurrency

    Outside of really straightforward tricks with file reads and the like, basically all *useful* applications have multiple sources of data and multiple sinks that they need to populate, depending on data and program state. Scalaz-stream would be a truly useless library if it didn't provide *some* way of bringing data in from multiple sources, aggregating and transforming, feeding back out to various sinks, reading in from *new* sources, and so on. Fortunately, scalaz-stream goes well above and beyond the minimum feature checkbox here, and its combinators for merging streams are powerful, simple, and form the backbone of its concurrency support.

    This last bit is worth dwelling on for a moment. As discussed, a `Process` is a possibly infinite sequence of *stuff*, with the key word here being "sequence". When a `Process` containing the values `a`, `b` and `c` (in that order) is evaluated, we must *finish* computing `a` before we go back to start computing `b`, which in turn we must finish before we start computing `c`. `Process` does not pipeline operations in sequence (note that this is a significant difference from how Akka Flows work). This is very nice because it makes it very easy to reason about what effects are performed one-after-the-other, and furthermore makes it very easy to *enforce* a strong ordering between effects when one is needed (just put them in order in a `Process`!).

    The problem with a strongly sequential paradigm is that concurrency must be somehow achieved in other ways, and this is where the merge combinators become significant. If you have two strongly sequential streams, neither one of them will have any concurrency in its own computation. However, you can merge them together into a single strongly sequential stream, where the exact interleaving between the two streams is nondeterministic. This is to say that two streams, `Process(a, b, c)` and `Process(x, y, z)`, could be merged into a single stream `Process(a, b, x, c, y, z)`, or perhaps `Process(x, a, b, c, y, z)`, or `Process(x, y, z, a, b, c)`, or others. We will never see `b` *before* we see `a` in the output stream, nor will we see `z` before we see `x`, but the exact interleaving between the two stream values is allowed to float, and *this* is where concurrency comes from.

    Since we are defining the merge operation to interleave two streams without constraining mutual ordering, scalaz-stream is free to run that merge in parallel. In fact, what it does under the surface is run both streams *simultaneously* using a function called `stepAsync` (this function is technically part of the public API, but it is very low-level and unsafe, so not recommended for use). The merge operation will allow either side to "race ahead" of the other side, only constrained by scheduling fairness (i.e. it won't allow one side to hog *all* the threads). So in a sense, you can think of the merge operation as "run both processes as fast as they can go, producing results as they are available in the order they arrive".

    If you think about it, this is precisely the semantic that we want, not only for concurrency and multi-core throughput optimization, but *also* for combining multiple data sources into one! As an example, let's imagine you're writing some sort of social media analytics application, where you're grabbing data from various social media platforms, searching for your company's name, performing some sort of sentiment analysis on the contents and reporting the results to a data store as well as a real-time output feed. This is actually a relatively common thing that companies *actually* do, by the way, and it is very representative of how scalaz-stream can be used to write highly concurrent applications.

    ```scala
    def computeSentiment(text: String): Double = ??? // there are tools for this

    val facebook: Process[Task, Post] = ??? // talk to facebook somehow and get data in real-time
    val twitter: Process[Task, Tweet] = ??? // twitter streaming API ftw

    val fbText = facebook map { _.text }
    val twText = twitter map { _.contents }

    // merge the two feeds and process them together!
    val sentiment = fbText merge twText filter { _ contains "Evil Corp" } map computeSentiment

    val datastore: Sink[Task, Double] = ???
    val streaming: Sink[Task, Json] = ???

    sentiment observe datastore map { d =>
    json"""{"score": $d}"""
    } to streaming
    ```

    Don't blink or you'll miss it! The `merge` combinator in the above is what is doing all the magic here. It functions exactly the way we have described: taking data from both streams as fast as they can produce it, merging down into a single output stream. Presumably, both the `facebook` and the `twitter` streams involve some sort of network connection to an external API. In the above formulation, using `merge`, *neither* stream will "wait" for the other one. If Twitter is giving us data faster than Facebook is, then we will run the Twitter stream faster, pushing the results into the merged output process as they arrive.

    Like many of the combinators in scalaz-stream (e.g. `zip` or `filter`), `merge` is actually implemented through a very general abstraction called `Wye` (similar to `Tee`, except nondeterministic). There are a lot of other extremely useful combinators implemented via `Wye` in the `scalaz.stream.wye` module. In addition to `wye.merge`, two others that I use on a daily basis are `wye.mergeHaltBoth` (similar to `wye.merge`, except the resulting stream ends as soon as either input stream ends) and `wye.interrupt` (allows you to remotely "kill" a stream; very useful for testing).

    As a quick sidebar, the `observe` combinator in the above is also new. It works almost exactly like `to`, except it doesn't consume the values in the stream. This is really nice, because it allows you to wire up multiple sinks to the same stream, with each one seeing the same values. It also allows you to very easily debug your processes, since it's pretty trivial to just inject a sink that prints values to stdout as they flow through the stream, but without disrupting the stream itself. For example:

    ```scala
    val stuff: Process[Task, Value] = ???

    // without debugging
    stuff flatMap doThingsWithStuff filter throwStuffAway ...

    // with debugging (using the stdout sink from earlier)
    stuff observe stdout flatMap doThingsWithStuff filter throwStuffAway ...
    ```

    Very non-invasive, and very easy. I write logging sinks all the time and just drop them in using `observe`.

    Anyway, coming back to our analytics example... One of the things you may notice is that we're merging the Twitter and Facebook streams *before* we perform the sentiment analysis. This makes a lot of sense from a code structure standpoint, since our processed Twitter and Facebook streams are just producing text that we handle in a uniform way. However, even though the `merge` combinator is racing its two input streams against each other, the *output* of `merge` is once again strongly ordered! The `merge` combinator doesn't somehow produce a stream that magically runs itself in parallel, and this sets up an obvious point where we can optimize: why not run `computeSentiment` in parallel?

    Here's the implicit observation in the above: we don't really care about the *order* in which people tweet or post about Evil Corp, all we care about is that they posted and what sentiment it conveyed. Presumably, any analytics that we're performing over our "social media footprint" is going to be agnostic to whether or not we reordered one person's tweet relative to another person's unrelated Facebook post that happened at *almost* the same time. Thus, since we don't *really* care about the order of our output, at least not within some sort of reasonably small time window, we can inform scalaz-stream that it's ok to relax the sequentiality just a bit, gaining concurrency (and throughput!) as a result.

    To be clear, what we're trying to do here is get concurrency *within a single stream*. We're not talking about running two streams simultaneously; we're talking about running multiple transformations against elements of a *single* stream concurrently. This is directly in conflict with scalaz-stream's promise that `Process` is a *sequence*, so how do we achieve this (apparently) very desirable result?

    The answer is another combinator: `merge.mergeN`. This combinator is kind of the golden hammer of scalaz-stream's concurrency support, and it is a powerful hammer indeed. It has the following (simplified) type signature:

    ```scala
    def mergeN[A](ps: Process[Task, Process[Task, A]]): Process[Task, A]
    ```

    So it's a bit like flattening a `Seq`. It takes a "stream of streams" and runs the inner streams *concurrently*, producing output in a single flattened stream as fast as it becomes available. Of course, there's a very real risk here that the outer stream (the stream that contains other streams) might run *too* fast, giving us new work to do before we're ready for it and ultimately running us out of memory. In order to avoid this case, the real `mergeN` takes some tuning parameters (such as `maxOpen`) that can be used to reign in some of the concurrency as required.

    So how do we use `mergeN` to solve our asynchronous `computeSentiment` problem? After all, we don't really have a "stream of streams"; we just have a regular old stream. The answer is that we need to do a bit of juggling *before* we call `computeSentiment` in order to effectively "inform" scalaz-stream that we're trying to run things in parallel:

    ```scala
    val aboutUs: Process[Task, String] = fbText merge twText filter { _ contains "Evil Corp" }

    val ps: Process[Task, Process[Task, Double]] = aboutUs map { text =>
    Task { computeSentiment(text) }
    } map { t => Process eval t }

    val sentiment: Process[Task, Double] = merge.mergeN(ps)
    ```

    This is obviously a lot bulkier than before, but that sort of *makes sense* since we're doing something very specialized with `computeSentiment`. Remember, a core tenant of scalaz-stream is that you must be *explicit* whenever you want nondeterministic behavior. It's not going to just jump off a cliff based on a guess that you *might* want one thing or another. You need to tell it what you want in no uncertain terms.

    To unpack the above, what we're doing is *lifting* `computeSentiment(text)` into a `Task`. Now this might seem a bit weird, since of course `Task` usually represents an effect like talking to the filesystem or launching the missiles at Rival Corp. However, if you think about it, concurrency is *sort of* like an effect! Just like with talking to the filesystem, we want to be very clear that we *really* want to be asynchronous, otherwise we might accidentally read from and write to a file in the wrong order, or our test suite might need to have some sort of crazy timeout in order to properly check that we did things. Running things in parallel is an effect that needs to be controlled for the same reason as any other effect, and scalaz-stream makes this very explicit.

    After lifting `computeSentiment(text)` into a `Task`, we `eval` that `Task` into a `Process` (which only contains that one sentiment analysis!). Now we have an outer `Process` that represents our filtered Twitter and Facebook streams (merged), and a bunch of inner streams that just perform the sentiment analysis. We feed this to `mergeN` (which is in the `scalaz.stream.merge` module) and the result is a single stream, `sentiment`, that contains all of our sentiment analysis. This stream is now strongly ordered and sequential (it's not going to magically change ordering on us just because it *came from* `mergeN`!), but the order almost certainly doesn't line up with the `aboutUs` stream. This is because sentiment analysis on certain text fragments might take a bit longer than on other fragments, and the quicker fragments are going to "win the race" and produce their sentiment analysis faster. This is of course, within the bounds of fairness and scheduling (something that `mergeN` takes great pains to get right), but you get the idea.

    So when we *want* to relax the sequentiality of `Process` and run things in parallel, we *can* using `mergeN`. We have to work a bit in order to convince scalaz-stream that we're *really* sure about it, but we can absolutely get it done. One of the interesting things about this pattern is that applies uniformly to a lot of other natural use cases for parallel processing within a single stream. For example, an obvious way to model a server would be a stream of streams, where the inner streams represent the data coming from a particular client, and the outer is the stream of connections from different clients. A very, very common pattern with this sort of server-side programming is to use `mergeN` to collapse all of these streams together, handling connections in parallel (because that's what `mergeN` does!) within our `maxOpen` bound.

    ```scala
    val server: Process[Task, Process[Task, ByteVector]] = ???

    merge.mergeN(server) to (io fileChunkW "firehose.bin")
    ```

    The `ByteVector` type in the above is something you might see a fair bit floating around scalaz-stream. Just think of it like a functional, *fast* `Array[Byte]`. If you want to read more about it, you can look up the [scodec-bits project](https://github.com/scodec/scodec-bits).

    No need to explicitly fork off a thread to handle each client connection as we would need to do in a traditional server implementation! We just describe to scalaz-stream *semantically* what we want, which is to say handling all of the client connections in parallel, and it just goes off and does it for us! Composable combinators make this sort of thing very easy and very predictable.

    ## Coordination and Flipping

    Let's think about the following "real-world" sort of use case:

    ```scala
    // gets called from the outside world, perhaps by a server socket or a UI
    def receiveData(data: String): Unit = {
    // we're going to get called a lot, and we ideally want to process this stream of data with scalaz-stream
    // how can we get all of the data we receive *into* a Process?
    }
    ```

    This is a bit of a tricky question, since `Process` doesn't provide a mutable `insert` function. You can append two processes together using `++`, but that operation *returns* a new `Process` without modifying the old ones. How do we "inject" data into a stream?

    The answer to this is surprisingly straightforward and classical: we use a queue. Every time `receiveData` gets called, we take its value and dump it on a queue. Then, somewhere else, we have a `Process` that just sources its data from that queue, allowing us to process the firehose of `data` we receive using a clean and compositional framework.

    This is a pretty natural solution to the problem, and something that should seem familiar to anyone who has done server-side development. Given that we already understand how to safely create processes from mutable sources (`Task` ftw!), it shouldn't be too difficult to implement.

    ```scala
    val q = new ArrayBlockingQueue[String](10) // never, EVER have an unbounded queue!

    def receiveData(data: String): Unit =
    q.offer(data)

    val received: Process[Task, String] = Process repeatEval (Task { q.take() })
    ```

    So far, so good. However, you'll notice one really ugly thing in the middle here: `q.take()` is a *blocking* operation. Of course it does suspend the thread, so we're not eating up CPU resources, but we *are* eating up a member of our very finite thread pool! Do this sort of thing with enough processes simultaneously and you'll have a major thread starvation problem. Of course, we could use `q.poll()` and maybe employ some sort of timeout, but that's just taking a `Thread.sleep` and turning it into a busy-wait. Neither of these are particularly appealing solutions.

    The answer to the problem can be found inside of scalaz-stream. It turns out that the above use case is *so* common that scalaz-stream has a custom queue implementation, one which doesn't *ever* block! This works through the magic of `Task.async`, but we don't really need to care about that at a high level. From a usage standpoint, scalaz-stream's queue looks exactly the same as a regular queue, except that we never *ever* block a thread waiting for data, so our throughput is extremely good even with dozens (or hundreds!) of queues floating around our application.

    ```scala
    val q = async.boundedQueue[String](10) // still a bad idea to have an unbounded queue!

    def receiveData(data: String): Unit =
    q.enqueueOne(data).run // enqueueOne returns a Task, so we need to run it

    val received: Process[Task, String] = q.dequeue
    ```

    It's that simple! We enqueue data using either `enqueueOne`, which takes a single value and returns a `Task[Unit]`, or using the `enqueue` sink, which is a `Sink[Task, String]` (in our above example, anyway). Using the `enqueue` sink makes a lot of sense if we're trying to take one (or more!) processes and dump their data out into a queue for later processing. Using `enqueueOne` makes the most sense for use-cases like the above, where we're trying to interface with some sort of external notifier.

    Calling `q.dequeue` gives us a process that reads its values sequentially off of the queue. There's nothing magical about this resulting stream; it's still a `Process`! In fact, it will look very very much like the `repeatEval` process that we implemented in our first attempt on top of `ArrayBlockingQueue`. The only real difference is that the `q.dequeue` process is never going to *block* when the queue is empty. Instead it just…stops running. No threads are harmed in the stopping of this process. As soon as new data is available in the queue, `received` will immediately spring back to life and handle the data as it arrives. It's *perfect* for these sorts of uses, and in real world scalaz-stream applications, `async.boundedQueue` is one of the most commonly applied tools.

    Unfortunately, as cool as `async.boundedQueue` is, it doesn't solve *every* problem. The weakness of `async.boundedQueue` becomes apparent if you ever try to use more than one `q.dequeue` stream at the same time. As with a traditional queue, dequeuing from multiple threads simultaneously does *not* give all threads the same data! This may seem a bit obvious when said in this way, but the abstractions of scalaz-stream can make it *appear* a bit unintuitive.

    ```scala
    val q = async.boundedQueue[Int](10) // have I mentioned how important it is to specify a bound?

    val left = q.dequeue
    val right = q.dequeue

    (Process(1, 2, 3, 4, 5) to q.enqueue).run.run // put a bunch of data in the queue

    val both = (left map { i => s"left: $i" }) merge (right map { i => s"right: $i" })
    (both take 5).runLog.run // => uh...could be anything!
    ```

    The results at the end are going to be very non-deterministic. I ran this a few times and got the following results:

    - `Vector(left: 2, right: 1, right: 3, left: 4, left: 5)`
    - `Vector(left: 1, right: 2, right: 3, left: 4, left: 5)`
    - `Vector(right: 2, left: 1, right: 4, left: 3, left: 5)`
    - `Vector(left: 1, right: 2, right: 3, left: 4, left: 5)`

    You get the picture. The point is that, in the above, you never see a value which is received by *both* `left` and `right`. They're getting different data! In fact, not only are they getting different data, but they are *guaranteed* to get different data! When one process dequeues a value, it is an atomic operation and no other process is allowed to get the same data. This is an important property of queues, but it *can* be just a bit surprising.

    In any case, sometimes you really really do need to have a queue that can have multiple subscribers all receiving the same data. For example, if you're writing a group chat application, your chat server needs to have something inside of it that feeds incoming messages back out to *all* clients equally, not just one randomly-selected client! Scalaz-stream isn't blind to this use-case, and it provides a special construct called a *topic* which fits the bill. We can use this to build a very simple group chat server, just as I mentioned:

    ```scala
    val connections: Process[Task, Process[Task, Exchange[ByteVector, ByteVector]]] =
    Netty server (new InetSocketAddress("localhost", 9000)) map {
    case (_, client) => client
    }

    val room = async.topic[ByteVector] // a room is a shared multi-queue of messages, which are just raw bytes

    val server: Process[Task, Unit] = merge.mergeN(connections map { client =>
    client flatMap { ex =>
    val incoming = ex.read to room.publish
    val outgoing = room.subscribe to ex.write
    incoming merge outgoing
    }
    })
    ```

    There's a few new things in the above, but they're all really straightforward and built on what we've seen before. The weirdest one is this whole `Netty server` thing. This isn't something that is built into scalaz-stream, but it *is* built on top of scalaz-stream and available in the [scalaz-netty](https://github.com/RichRelevance/scalaz-netty) project. It's basically just a scalaz-stream wrapper around the [Netty NIO framework](http://netty.io) with a very simple API.

    We've already seen `ByteVector`, but we haven't yet seen `Exchange`. This *is* something that's built into scalaz-stream, and it's really just a pair of a source and a sink. The `read` side of the exchange is a `Process[Task, ByteVector]`, while the write side is a `Sink[Task, ByteVector]`. This very directly represents the bidirectional nature of a socket connection. Semantically, we want to be able to *read* data from the client and put it into the chat room, while simultaneously listening for new data in the chat room and *writing* it back down to the client.

    This is where `Topic` comes in. We create our chat room up towards the top using `async.topic[ByteVector]`. `Topic` is simply a "multi-queue", where the defining property is that we can `subscribe` to the topic as many times as we want, and each subscriber will get the *same* data. Note that the data backlog is *not* saved; new clients will not see old messages (so this is IRC, not HipChat). We `subscribe` to the `Topic` and dump its contents directly into the `write` side of the client `Exchange`, while simultaneously taking the `read` side of the client and dumping it into the `publish` for the `Topic` (which is really just an `enqueue` sink). We want to read from and write to the client simultaneously, so we `merge` both sides of the operation. Finally, we want to be able to handle multiple client connections simultaneously, so we run `mergeN` over the whole server to produce the final result. Very simple, declarative and resource-safe!

    ## Backpressure

    Backpressure is one of those things that most people don't really think about until the first time they work on a distributed application with multiple services, all threading data from one place to another. It's a problem of *scale* that just doesn't occur in a simple test bed, but it is nonetheless an enormously important question to answer. For anyone who *has* done distributed services at scale, backpressure is usually the very *first* worry that you have, and deservedly so. The good news is that scalaz-stream, by its very design, supports backpressure in an extremely elegant and seamless fashion…mostly.

    [Wikipedia's "article" on backpressure](https://en.wikipedia.org/wiki/Back_pressure#Back_pressure_in_information_technology) is uncharacteristically uninformative, especially given its importance in modern systems architecture. Simply put, backpressure comes from accepting the fact that all servers have finite resources. We can write a server that handles a thousand connections per second, but if we open up the faucet a bit and crank that up to a million, or ten million, or a hundred million per second, that same server is going to start to falter. In this world of massive scale in data and distribution, we need to think very seriously about what it means to receive requests too fast for us to keep up. This is also the essential property of most DDoS attacks, so we're talking about something which matters for security as much as it matters for availability and performance!

    Backpressure is the notion that our server (which is failing to keep up!) should somehow signal *back* to the clients that it is unable to keep up with the load and they need to ease up a bit. In a correctly designed system, the clients would then slow down and signal this throughput reduction back to *their* data sources. In other words, rather than having any single point in a system become overloaded and backed up, the *entire system* slows down gracefully and only handles the maximum that it *can* handle, without attempting to bite off more than it can chew.

    As you can imagine, this turns out to be a somewhat hard problem to solve in distributed systems architecture. You need to be very careful in the way that you design not only your services but also the clients of those services, since backpressure does absolutely no good if the clients ignore the server's desperate pleas to slow down! (see also: DDoS attacks) First-class support for backpressure is an increasingly important feature of any framework which deals with streams of data, and scalaz-stream is no exception to this.

    The good news is that scalaz-stream is designed *from the ground up* to naturally support backpressure without any configuration or tweaking from the user. In fact, *all* (but one) of our examples thus far have natively supported graceful backpressure without any explicit configuration on our part! This magic exists because scalaz-stream has a *pull-based* evaluation model.

    When we use one of the scalaz-stream interpreters (e.g. `runLog` or `run`) to run a stream sequentially, the interpreter works in the following way:

    1. While `input.hasNext`
    1. Request next *thing* (data or effect) from `input`
    2. Evaluate any effects associated with `thing`
    3. Repeat

    This is very straightforward and intuitive. In fact, it's probably the way you would write a `Process` interpreter on your own given no other information about how the library works. However, there's something very profound buried in this pseudocode: we only request the next thing *after* we are done processing the current thing. This is what gives `Process` its strongly-sequential nature, and it's also what defines its native support for backpressure.

    If we're a server that is written using scalaz-stream, and we're struggling to keep up with the data flow being forced upon us by other services, we are naturally going to take longer to handle each `thing` in our interpreter. As we take longer to evaluate each `thing`, we take longer to request the *next* `thing`, and everything up the chain *slows down!* Remember that scalaz-stream is a fully lazy framework; there's no "magic lookahead" that computes things that we haven't asked for yet. Thus, as we slow down, our requests slow down, and our input slows down, and our backpressure propagates.

    Of course, things do become a *bit* more complicated when you have concurrency and external *push* data sources to deal with. We've already seen how these things work, though! For example, `mergeN` sort of disrupts this pull model a bit, since it's running a whole bunch of processes all at once without waiting for us to request the next `thing`. That's ok though, because inside of `mergeN` is a *bounded* queue of data that is waiting to be pulled by the output stream (there is something similar inside of `merge`). As soon as that bounded queue fills up, `mergeN` *stops* and no longer runs its constituent processes! Thus, if `mergeN` is given a stream of constituent processes that run too fast for us, the internal queue will fill up and prevent `mergeN` from running away with all our memory, which in turn slows down the constituent processes (since `mergeN` is the one pulling on them!), which in turn slows down our upstream data sources. The same logic applies to an external interface point using `async.boundedQueue`.

    This is precisely why it is *so very important* to always, ALWAYS supply a finite bound when you create an `async.boundedQueue`. Never, *ever* create an unbounded queue. An unbounded queue is basically a black hole which will fill and fill and fill and ultimately eat up *all* of your server's memory in the event that data is coming in faster than we can handle it. Always bound your queues, and don't be afraid to give a very small bound. I generally go with `10`, but single digit bounds are not inappropriate! Unbounded queues are only appropriate on servers with unbounded memory, and servers with unbounded memory only exist in an infinite universe, which is *not* the universe we live in! Bound your queues. Always bound your queues.

    I hinted earlier that all but *one* of our examples in this article support backpressure "out of the box", without any configuration from us. This is true, and the one example that *doesn't* support backpressure is the chat server. Contrary to expectations, the lack of backpressure isn't coming from the fact that we're using Netty and NIO (in fact, NIO supports backpressure extremely gracefully thanks to the way TCP is designed). The lack of backpressure actually comes from the fact that we're using `Topic`!

    The problem is that `Topic` is a multi-queue with many subscribers and publishers. It becomes a really tricky question of what backpressure semantics even make sense. If one subscriber is running slowly, do we want to slow down *all* the subscribers? Which publisher do we slow down? What about fairness? All of these things are really tricky questions and it's *hard* to get this right. At present, `Topic` is punting on that whole mess and basically ignoring backpressure. Inside of `Topic` is a completely unbounded queue, and that queue can and will fill up if your services start slowing down! Be very, very, very careful of this. There is [an open issue](https://github.com/scalaz/scalaz-stream/issues/244) to address this design flaw, but at present it has not been fixed.

    The good news is that you're going to use `Queue` a heck of a lot more than you'll use `Topic`, and `Queue` *does* support backpressure in exactly the way you want – as long as you don't give it an infinite bound!

    ## Learning More

    There's plenty more to learn about scalaz-stream, but the best way to learn it is just jump in with both feet and experience it for yourself! Ask questions on Twitter or in the [Gitter room.](https://gitter.im/scalaz/scalaz-stream) File [issues](https://github.com/scalaz/scalaz-stream/issues) if (when!) you find bugs or behavior that you think is wrong. Maybe it is wrong! Scalaz-stream is under very active development, and a lot of ideas are being vetted and toyed with. Things are changing and evolving as we speak; within a few months even this article will be outdated!

    The absolute best way to achieve guru status in scalaz-stream is to dive into the source code and start patching things. There's plenty of work to do, and plenty of easy points of entry to the code base. Outside of three *very* complex functions (`wye.apply`, `nondeterminism.njoin` and `stepAsync`), the scalaz-stream codebase is remarkably straightforward with very few surprises. Don't be afraid to poke around at things and see what happens!

    Incidentally, if you're reading this as a Github Gist, be warned that the comment system on Gist is *not* connected to any sort of notifications! So if you comment on this as a Gist, I probably won't ever see it. [Twitter](https://twitter.com/djspiewak) is a much more effective place to ask questions.