Skip to content

Instantly share code, notes, and snippets.

@maxim
Last active September 9, 2022 18:15
Show Gist options
  • Save maxim/150cc4e320a7b347e540 to your computer and use it in GitHub Desktop.
Save maxim/150cc4e320a7b347e540 to your computer and use it in GitHub Desktop.

Revisions

  1. maxim revised this gist Feb 10, 2016. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion ecto_batch_stream.ex
    Original file line number Diff line number Diff line change
    @@ -5,7 +5,7 @@ defmodule EctoBatchStream do

    # Example:
    #
    # query = from u in Users, select: u.email
    # query = from u in MyApp.User, select: u.email
    # stream = EctoBatchStream.stream(MyApp.Repo, query)
    # stream |> Stream.take(3) |> Enum.to_list # => […]
    def stream(repo, query, batch_size \\ @batch_size) do
  2. maxim revised this gist Feb 10, 2016. No changes.
  3. maxim revised this gist Feb 10, 2016. No changes.
  4. maxim revised this gist Feb 10, 2016. 1 changed file with 1 addition and 5 deletions.
    6 changes: 1 addition & 5 deletions ecto_batch_stream.ex
    Original file line number Diff line number Diff line change
    @@ -13,11 +13,7 @@ defmodule EctoBatchStream do
    :done ->
    nil
    offset ->
    results = repo.all(
    from _ in query,
    offset: ^offset,
    limit: ^batch_size
    )
    results = repo.all(from _ in query, offset: ^offset, limit: ^batch_size)

    if length(results) < batch_size,
    do: { results, :done },
  5. maxim created this gist Feb 10, 2016.
    29 changes: 29 additions & 0 deletions ecto_batch_stream.ex
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,29 @@
    defmodule EctoBatchStream do
    import Ecto.Query, only: [from: 1, from: 2]

    @batch_size 1000

    # Example:
    #
    # query = from u in Users, select: u.email
    # stream = EctoBatchStream.stream(MyApp.Repo, query)
    # stream |> Stream.take(3) |> Enum.to_list # => […]
    def stream(repo, query, batch_size \\ @batch_size) do
    batches_stream = Stream.unfold(0, fn
    :done ->
    nil
    offset ->
    results = repo.all(
    from _ in query,
    offset: ^offset,
    limit: ^batch_size
    )

    if length(results) < batch_size,
    do: { results, :done },
    else: { results, offset + batch_size }
    end)

    batches_stream |> Stream.concat
    end
    end