package io.oden.laser.common.transforms; import org.apache.beam.sdk.transforms.windowing.*; import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior; import org.joda.time.Duration; public class Windows { /* * The Window described attempts to both be prompt but not needlessly retrigger. * It's designed to account for the following cases... * * * Additional Reading: - * https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102 - * https://beam.apache.org/documentation/programming-guide/#windowing - * https://issues.apache.org/jira/browse/BEAM-644 */ public static Window earlyAndLateFireSlidingWindow( Duration windowSize, Duration windowSlide, Duration earlyFire, Duration allowedLateness, Duration offset) { return Window.into( SlidingWindows.of(windowSize) .every(windowSlide) // In sliding windows, with a configurable window size plus a // buffer(default at 0) on the end to provide space for // calculating the last deltasum value(rollups). We add a offset // (default at 0),which moves the window forward // [start+offset, end+offset) to align with Heroic's // exclusive start and inclusive end. // .withOffset(windowSize.minus(deltasumBuffer).plus(offset))) .withOffset(offset)) // This sliding window will fire (materialize the accumulated // data) at least once. Each time we do we'll fire with the // accumulated data in the window so far (as opposed to just the // new data since the last fire). .accumulatingFiredPanes() .triggering( // The primary way that this window will fire is when the // watermark (tracked upstream as the estimated minimum of the // backlog) exceeds the end of the window. This is the only // firing behavior for case 1 and the first firing behavior // for cases 2 and 4. AfterWatermark.pastEndOfWindow() // In case 3, we don't want the user to have to wait until // the watermark has caught up to get their data so we // have a configurable threshold that will allow the // window to fire early based on how much time has passed // since the first element we saw in the pane. .withEarlyFirings( AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyFire)) // In case 2, all elements are considered "late". And we // don't want to excessively fire once for every element // that gets added to the pane (i.e. 300 times for a 5 // minute window). So, instead, we only late fire when new // elements enter and the window's time has passed in // process time. The assumption here is that backfilling a // pane is, typically, faster than on-time filling. This // introduces a small, but acceptable, lag in case 4. .withLateFirings( AfterAll.of( AfterPane.elementCountAtLeast(1), AfterProcessingTime.pastFirstElementInPane().plusDelayOf(windowSize)))) // When accounting for case 3, after the watermarket has caught // up, the default behavior would be to fire the window again. // This changes that behavior to only fire if any new data has // arrived between the early fire and the on-time fire. .withOnTimeBehavior(OnTimeBehavior.FIRE_IF_NON_EMPTY) // This sets the duration we will retain the panes and accept late // data in event time. .withAllowedLateness(allowedLateness); } public static Window earlyAndLateFireSlidingWindow( Duration windowSize, Duration windowSlide, Duration earlyFire, Duration allowedLateness) { return earlyAndLateFireSlidingWindow( windowSize, windowSlide, earlyFire, allowedLateness, Duration.ZERO); } }