@x
Created August 1, 2021 22:03

Revisions

  1. x created this gist Aug 1, 2021.
    93 changes: 93 additions & 0 deletions windows.java
    @@ -0,0 +1,93 @@
    package io.oden.laser.common.transforms;

    import org.apache.beam.sdk.transforms.windowing.*;
    import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
    import org.joda.time.Duration;

    public class Windows {
    /*
    * The window described here aims to fire promptly without retriggering needlessly.
    * It is designed to account for the following cases:
    * <ol>
    * <li> All data is arriving on time. The watermark at any given time is roughly the
    * current time.
    * <li> Data is being backfilled for some subset of metrics and the watermark is
    * ahead of the event time of the windows for those metrics.
    * <li> Data is being backfilled for some subset of metrics but the watermark is
    * stuck earlier than the event time for most metrics.
    * <li> Any of cases 1, 2, or 3, but where late data has arrived due to some
    * uncontrollable situation (e.g. a single metric for a pane gets stuck in
    * Pub/Sub for days and is then released).
    * </ol>
    *
    * Additional reading:
    * - https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
    * - https://beam.apache.org/documentation/programming-guide/#windowing
    * - https://issues.apache.org/jira/browse/BEAM-644
    */

    public static <T> Window<T> earlyAndLateFireSlidingWindow(
    Duration windowSize,
    Duration windowSlide,
    Duration earlyFire,
    Duration allowedLateness,
    Duration offset) {
    return Window.<T>into(
    SlidingWindows.of(windowSize)
    .every(windowSlide)
    // Sliding windows with a configurable window size, plus a buffer
    // (default 0) on the end to provide space for calculating the
    // last deltasum value (rollups). We add an offset (default 0),
    // which moves the window forward to [start+offset, end+offset)
    // to align with Heroic's exclusive start and inclusive end.
    // .withOffset(windowSize.minus(deltasumBuffer).plus(offset)))
    .withOffset(offset))
    // This sliding window will fire (materialize the accumulated
    // data) at least once. Each time we do we'll fire with the
    // accumulated data in the window so far (as opposed to just the
    // new data since the last fire).
    .accumulatingFiredPanes()
    .triggering(
    // The primary way that this window will fire is when the
    // watermark (tracked upstream as the estimated minimum of the
    // backlog) exceeds the end of the window. This is the only
    // firing behavior for case 1 and the first firing behavior
    // for cases 2 and 4.
    AfterWatermark.pastEndOfWindow()
    // In case 3, we don't want the user to have to wait until
    // the watermark has caught up to get their data so we
    // have a configurable threshold that will allow the
    // window to fire early based on how much time has passed
    // since the first element we saw in the pane.
    .withEarlyFirings(
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFire))
    // In case 2, all elements are considered "late", and we
    // don't want to fire once for every element that gets
    // added to the pane (e.g. 300 times for a 5 minute
    // window). Instead, we only late fire once new elements
    // have entered and a full window size has elapsed in
    // processing time since the first of them. The assumption
    // here is that backfilling a pane is typically faster than
    // on-time filling. This introduces a small but acceptable
    // lag in case 4.
    .withLateFirings(
    AfterAll.of(
    AfterPane.elementCountAtLeast(1),
    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowSize))))

    // When accounting for case 3, after the watermark has caught
    // up, the default behavior would be to fire the window again.
    // This changes that behavior to only fire if any new data has
    // arrived between the early fire and the on-time fire.
    .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY)
    // This sets how long, in event time, we retain the panes and
    // accept late data.
    .withAllowedLateness(allowedLateness);
    }

    public static <T> Window<T> earlyAndLateFireSlidingWindow(
    Duration windowSize, Duration windowSlide, Duration earlyFire, Duration allowedLateness) {
    return earlyAndLateFireSlidingWindow(
    windowSize, windowSlide, earlyFire, allowedLateness, Duration.ZERO);
    }
    }
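
The offset comment in `earlyAndLateFireSlidingWindow` is easier to follow with concrete numbers. Below is a minimal, dependency-free sketch of how sliding windows with a size, a slide period, and an offset could be assigned to a timestamp. It reflects my understanding of what `SlidingWindows.of(size).every(period).withOffset(offset)` does, not Beam's actual code: `SlidingWindowSketch` and `assign` are hypothetical names, and the `0 <= offset < period` check is an assumption about the constraint Beam enforces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Beam or of Windows.java.
final class SlidingWindowSketch {

  /**
   * Returns the [start, end) bounds, in milliseconds, of every sliding window
   * that contains timestampMs, ordered from the latest window start to the earliest.
   */
  static List<long[]> assign(long timestampMs, long sizeMs, long periodMs, long offsetMs) {
    // Assumed constraint: the offset only shifts the grid within one period.
    if (sizeMs <= 0 || periodMs <= 0 || offsetMs < 0 || offsetMs >= periodMs) {
      throw new IllegalArgumentException("expected size > 0, period > 0, 0 <= offset < period");
    }
    // Window starts lie on the grid offset + k * period. Pick the latest grid
    // point at or before the timestamp (floorDiv keeps this correct for
    // timestamps smaller than the offset).
    long lastStart = offsetMs + Math.floorDiv(timestampMs - offsetMs, periodMs) * periodMs;

    List<long[]> windows = new ArrayList<>();
    // Walk backwards one period at a time while the window still covers the timestamp.
    for (long start = lastStart; start > timestampMs - sizeMs; start -= periodMs) {
      windows.add(new long[] {start, start + sizeMs});
    }
    return windows;
  }

  public static void main(String[] args) {
    // 5 minute windows sliding every 1 minute, shifted forward by 10 seconds.
    for (long[] w : assign(125_000L, 300_000L, 60_000L, 10_000L)) {
      System.out.println("[" + w[0] + ", " + w[1] + ")");
    }
  }
}
```

With these inputs the element at 125 s lands in five windows (size divided by period), and every window start sits 10 s past a whole minute. With an offset of zero the starts would fall on whole minutes instead.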