Skip to content

Instantly share code, notes, and snippets.

@rocketraman
Last active June 23, 2021 20:25
Show Gist options
  • Select an option

  • Save rocketraman/543f066813fc89590f23ff5dacf43f01 to your computer and use it in GitHub Desktop.

Select an option

Save rocketraman/543f066813fc89590f23ff5dacf43f01 to your computer and use it in GitHub Desktop.

Revisions

  1. rocketraman revised this gist Jun 23, 2021. 1 changed file with 5 additions and 4 deletions.
    9 changes: 5 additions & 4 deletions ContextualCalendarDayWindow.kt
    Original file line number Diff line number Diff line change
    @@ -36,10 +36,15 @@ class ContextualCalendarDayWindow<T> private constructor(
    operator fun <T> invoke(days: Int, allowedLateness: Duration, timeZone: ZoneId = ZoneOffset.UTC): Window<T> {
    return Window.into<T>(ContextualCalendarDayWindow(days - 1, timeZone))
    .triggering(
    // no early trigger here, we only want to trigger on "late" elements which indicate our real elements
    AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
    )
    .withAllowedLateness((allowedLateness).asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
    // LATEST is generally inefficient, but we need it here to make the watermark move in such a way that the
    // downstream windowing in DailyWindowsWithContext produces output in the correct panes -- it's not completely
    // clear why that is the case, because downstream of this, in `ContextWindowsWithTimestamps`, we skew the
    // timestamps forward anyway; however, despite that, if we don't use LATEST here, the panes are incorrect
    .withTimestampCombiner(TimestampCombiner.LATEST)
    .accumulatingFiredPanes()
    }
    @@ -173,10 +178,6 @@ private class ContextWindowsWithTimestampFn<K, V>(
    val zoneId: ZoneId,
    val timestampOf: (V) -> Instant
    ): DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
    // withAllowedTimestampSkew is deprecated, but as of now, there is no replacement
    // https://issues.apache.org/jira/browse/BEAM-644
    override fun getAllowedTimestampSkew(): JodaTimeDuration = Duration.INFINITE.asJoda()

    @ProcessElement
    fun process(
    @Element element: KV<K, Iterable<@JvmWildcard V>>,
  2. rocketraman revised this gist Apr 7, 2021. 1 changed file with 28 additions and 4 deletions.
    32 changes: 28 additions & 4 deletions ContextualCalendarDayWindow.kt
    Original file line number Diff line number Diff line change
    @@ -38,8 +38,7 @@ class ContextualCalendarDayWindow<T> private constructor(
    .triggering(
    AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
    )
    // allow one extra day for lateness as the window does not include the "current" day
    .withAllowedLateness((allowedLateness + 1.days).asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
    .withAllowedLateness((allowedLateness).asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
    .withTimestampCombiner(TimestampCombiner.LATEST)
    .accumulatingFiredPanes()
    @@ -115,7 +114,7 @@ class DailyWindowsWithContext<K, V>(
    override fun expand(input: PCollection<V>): PCollection<KV<K, Iterable<@JvmWildcard V>>> {
    val outputTypeDescriptor = TypeDescriptors.kvs(keyTypeDescriptor, TypeDescriptors.iterables(valueTypeDescriptor))
    return input
    .apply("ContextWindows", ContextualCalendarDayWindow(days, allowedLateness))
    .apply("ContextWindows", ContextualCalendarDayWindow(days, allowedLateness, inZone))
    .apply("ContextWindowsKeys", WithKeys.of(keyFn).withKeyType(keyTypeDescriptor))
    .apply("ContextWindowsGroupBy", GroupByKey.create())
    .apply("ContextWindowsFilterFn", ParDo.of(ContextWindowsFilterFn(timestampFn)))
    @@ -181,10 +180,15 @@ private class ContextWindowsWithTimestampFn<K, V>(
    @ProcessElement
    fun process(
    @Element element: KV<K, Iterable<@JvmWildcard V>>,
    window: BoundedWindow,
    receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>
    ) {
    val lastWindowDay = window.asClosedRange().lastDay(zoneId)
    val maxTimestamp = element.value.maxOf { timestampOf(it) }
    receiver.outputWithTimestamp(element, maxTimestamp.asJoda() )

    if (maxTimestamp in lastWindowDay.plusDays(1).asRange(zoneId).toInstantRange()) {
    receiver.outputWithTimestamp(element, maxTimestamp.asJoda() )
    }
    }
    }

    @@ -195,3 +199,23 @@ fun Instant.asJoda(): JodaInstant = JodaInstant.ofEpochMilli(toEpochMilli())
    fun JodaInstant.asJava(): Instant = Instant.ofEpochMilli(millis)

    fun ZoneId.asJoda(): DateTimeZone = DateTimeZone.forID(if(id == "Z") "UTC" else id)

    /**
     * Views this window as a range of java.time instants.
     *
     * Only [IntervalWindow] is supported: the range runs from the window's `start()` to its `end()`, both
     * converted via `asJava()`. Note that the window's `end()` becomes the range's `endInclusive`.
     *
     * @throws IllegalStateException if this window is not an [IntervalWindow]
     */
    fun BoundedWindow.asClosedRange(): ClosedRange<Instant> {
        return when (this) {
            is IntervalWindow -> start().asJava().rangeTo(end().asJava())
            // Interpolate the class name itself; a `::getCanonicalName` callable reference would render the
            // function reference in the message rather than the name of the unsupported window type.
            else -> error("Window type ${javaClass.canonicalName} not supported")
        }
    }

    /**
     * Returns the calendar date, in [inZone], of the instant one second before this range's end.
     */
    fun ClosedRange<Instant>.lastDay(inZone: ZoneId): LocalDate {
        // step back one second so an end that falls exactly on midnight resolves to the preceding day
        val justBeforeEnd = endInclusive.minusSeconds(1)
        return justBeforeEnd.atZone(inZone).toLocalDate()
    }

    /** Converts both bounds of this zoned date-time range to instants, keeping them as a closed range. */
    fun ClosedRange<ZonedDateTime>.toInstantRange(): ClosedRange<Instant> {
        val from = start.toInstant()
        val to = endInclusive.toInstant()
        return from.rangeTo(to)
    }

    /**
     * Returns the first valid moment of the day following this date in [zoneId].
     *
     * The next date is computed first and only then resolved to its start of day. Adding a day to this date's
     * start of day instead would carry over a shifted local time: when this date's midnight falls into a DST
     * gap its start of day is e.g. 01:00, and adding one day to that yields 01:00 on the next date rather than
     * that date's actual start.
     */
    fun LocalDate.atStartOfNextDay(zoneId: ZoneId): ZonedDateTime = plusDays(1).atStartOfDay(zoneId)

    /**
     * Returns the closed range from the start of this date to the start of the following date in [zoneId].
     * Both bounds are inclusive, so the first moment of the next day is part of the range.
     */
    fun LocalDate.asRange(zoneId: ZoneId): ClosedRange<ZonedDateTime> {
        val dayStart = atStartOfDay(zoneId)
        val nextDayStart = atStartOfNextDay(zoneId)
        return dayStart.rangeTo(nextDayStart)
    }

    /** Builds a Beam [TypeDescriptor] for the reified type [T] from its Java class. */
    inline fun <reified T: Any> typeDescriptor(): TypeDescriptor<T> {
        val javaType = T::class.java
        return TypeDescriptor.of(javaType)
    }
  3. rocketraman revised this gist Apr 6, 2021. 1 changed file with 8 additions and 8 deletions.
    16 changes: 8 additions & 8 deletions ContextualCalendarDayWindow.kt
    Original file line number Diff line number Diff line change
    @@ -16,14 +16,6 @@ import kotlin.time.days
    import org.joda.time.Duration as JodaTimeDuration
    import org.joda.time.Instant as JodaInstant

    fun Duration.asJoda(): JodaTimeDuration = JodaTimeDuration.millis(toLongMilliseconds())

    fun Instant.asJoda(): JodaInstant = JodaInstant.ofEpochMilli(toEpochMilli())

    fun JodaInstant.asJava(): Instant = Instant.ofEpochMilli(millis)

    fun ZoneId.asJoda(): DateTimeZone = DateTimeZone.forID(if(id == "Z") "UTC" else id)

    /**
    * A calendar day window that provides context of a certain number of days prior. Since the only way to
    * trigger based on completeness of data in Beam is the AfterWatermark.pastEndOfWindow() trigger, we window
    @@ -195,3 +187,11 @@ private class ContextWindowsWithTimestampFn<K, V>(
    receiver.outputWithTimestamp(element, maxTimestamp.asJoda() )
    }
    }

    fun Duration.asJoda(): JodaTimeDuration = JodaTimeDuration.millis(toLongMilliseconds())

    fun Instant.asJoda(): JodaInstant = JodaInstant.ofEpochMilli(toEpochMilli())

    fun JodaInstant.asJava(): Instant = Instant.ofEpochMilli(millis)

    fun ZoneId.asJoda(): DateTimeZone = DateTimeZone.forID(if(id == "Z") "UTC" else id)
  4. rocketraman created this gist Apr 6, 2021.
    197 changes: 197 additions & 0 deletions ContextualCalendarDayWindow.kt
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,197 @@
    package com.xacoach.xascore.backend.analytics.windowfns

    import org.apache.beam.sdk.coders.Coder
    import org.apache.beam.sdk.transforms.*
    import org.apache.beam.sdk.transforms.windowing.*
    import org.apache.beam.sdk.values.KV
    import org.apache.beam.sdk.values.PCollection
    import org.apache.beam.sdk.values.TypeDescriptor
    import org.apache.beam.sdk.values.TypeDescriptors
    import org.joda.time.DateTimeZone
    import java.time.Instant
    import java.time.ZoneId
    import java.time.ZoneOffset
    import kotlin.time.Duration
    import kotlin.time.days
    import org.joda.time.Duration as JodaTimeDuration
    import org.joda.time.Instant as JodaInstant

    /** Converts this Kotlin [Duration] to a Joda-Time duration holding the same number of whole milliseconds. */
    fun Duration.asJoda(): JodaTimeDuration {
        val millis = toLongMilliseconds()
        return JodaTimeDuration.millis(millis)
    }

    /** Converts this java.time [Instant] to a Joda-Time instant at the same epoch millisecond. */
    fun Instant.asJoda(): JodaInstant {
        val epochMillis = toEpochMilli()
        return JodaInstant.ofEpochMilli(epochMillis)
    }

    /** Converts this Joda-Time instant to a java.time [Instant] at the same epoch millisecond. */
    fun JodaInstant.asJava(): Instant {
        val epochMillis = millis
        return Instant.ofEpochMilli(epochMillis)
    }

    /**
     * Converts this java.time [ZoneId] to a Joda-Time [DateTimeZone], looked up by zone id.
     * The id "Z" is looked up as "UTC"; every other id is passed through unchanged.
     */
    fun ZoneId.asJoda(): DateTimeZone {
        val jodaId = when (id) {
            "Z" -> "UTC"
            else -> id
        }
        return DateTimeZone.forID(jodaId)
    }

    /**
     * A calendar day window that provides context of a certain number of days prior. Since the only way to
     * trigger based on completeness of data in Beam is the AfterWatermark.pastEndOfWindow() trigger, we window
     * based on the context period, and then add the "last day"'s elements as late inputs into those context
     * windows.
     *
     * In that way, the window will trigger only once all the context is available.
     *
     * Every timestamp is assigned to `contextDays + 1` windows, each spanning `contextDays` calendar days in
     * [timeZone]. The earliest of them ends at the start of the timestamp's own day, so it does not contain the
     * timestamp itself.
     *
     * Instances are created through the companion [invoke] operator, which also configures the trigger.
     *
     * @param contextDays number of calendar days covered by each window; must be positive
     * @param timeZone zone whose calendar-day boundaries delimit the windows
     */
    class ContextualCalendarDayWindow<T> private constructor(
        private val contextDays: Int,
        private val timeZone: ZoneId = ZoneOffset.UTC
    ): NonMergingWindowFn<T, IntervalWindow>() {
        companion object {
            /**
             * This window requires specific setup of the associated trigger function, so use an invoker to create the
             * window function, and configure it.
             *
             * @param days total number of days; the window function is created with `days - 1` context days
             * @param allowedLateness lateness to allow, on top of the one extra day that is always added
             * @param timeZone zone whose calendar-day boundaries delimit the windows
             * @throws IllegalArgumentException if `days - 1` is not positive
             */
            operator fun <T> invoke(days: Int, allowedLateness: Duration, timeZone: ZoneId = ZoneOffset.UTC): Window<T> {
                return Window.into<T>(ContextualCalendarDayWindow(days - 1, timeZone))
                    .triggering(
                        AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
                    )
                    // allow one extra day for lateness as the window does not include the "current" day
                    .withAllowedLateness((allowedLateness + 1.days).asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                    .withTimestampCombiner(TimestampCombiner.LATEST)
                    .accumulatingFiredPanes()
            }
        }

        init {
            // With zero context days every window would start and end at the same instant, so zero is rejected;
            // the message now states the condition that is actually enforced.
            require(contextDays > 0) {
                "The number of context days must be positive (i.e. days must be at least 2), but was $contextDays."
            }
        }

        override fun assignWindows(c: AssignContext): Collection<IntervalWindow> =
            assignWindows(c.timestamp().asJava())

        override fun isCompatible(other: WindowFn<*, *>): Boolean = equals(other)

        override fun windowCoder(): Coder<IntervalWindow> = IntervalWindow.getCoder()

        override fun getDefaultWindowMappingFn(): WindowMappingFn<IntervalWindow> {
            return object : WindowMappingFn<IntervalWindow>() {
                override fun getSideInputWindow(mainWindow: BoundedWindow): IntervalWindow {
                    require(mainWindow !is GlobalWindow) { "Attempted to get side input window for GlobalWindow from non-global WindowFn" }
                    // of the windows assigned to the main window's max timestamp, use the last (latest-starting) one
                    return assignWindows(mainWindow.maxTimestamp().asJava()).last()
                }
            }
        }

        /** Two window functions are equal when they use the same number of context days and the same time zone. */
        override fun equals(other: Any?): Boolean {
            val otherWindowFn = other as? ContextualCalendarDayWindow<*> ?: return false
            return otherWindowFn.contextDays == contextDays && otherWindowFn.timeZone == timeZone
        }

        override fun hashCode(): Int {
            var result = contextDays
            result = 31 * result + timeZone.hashCode()
            return result
        }

        /**
         * Builds the `contextDays + 1` windows for [timestamp], ordered by ascending start: the first begins
         * `contextDays` days before the timestamp's local date, the last begins on that date itself.
         */
        private fun assignWindows(timestamp: Instant): Collection<IntervalWindow> {
            return buildList {
                val localDate = timestamp.atZone(timeZone).toLocalDate()
                (0L..contextDays).forEach { offset ->
                    val intervalBegin = localDate.minusDays(contextDays - offset)
                    // windows are sized by 1 less day than usual
                    val intervalEnd = intervalBegin.plusDays(contextDays.toLong())
                    add(IntervalWindow(
                        intervalBegin.atStartOfDay(timeZone).toInstant().asJoda(),
                        intervalEnd.atStartOfDay(timeZone).toInstant().asJoda()
                    ))
                }
            }
        }
    }

    /**
     * Transforms inputs into daily windowed data, with a given number of days of contextual data attached. The input
     * to this transform is a PCollection of elements of type V, which are then windowed via [ContextualCalendarDayWindow],
     * and then keyed and grouped, and then windowed again into daily windows that contain that days elements, plus the
     * context elements.
     *
     * The key and value type descriptors are needed to avoid errors due to type reification. The outputCoder may also
     * be needed to avoid coder lookup errors for the output type, but is not required.
     *
     * @param days number of days handed to [ContextualCalendarDayWindow]
     * @param allowedLateness allowed lateness applied to both the context windows and the daily windows
     * @param inZone time zone used for both the context windows and the daily calendar windows
     * @param keyFn extracts the grouping key from an element
     * @param timestampFn extracts the timestamp of an element, used for filtering and for re-timestamping
     * @param keyTypeDescriptor type descriptor of the key type
     * @param valueTypeDescriptor type descriptor of the element type
     * @param outputCoder coder to set on the output collection, or null to leave the coder untouched
     */
    class DailyWindowsWithContext<K, V>(
        private val days: Int,
        private val allowedLateness: Duration,
        private val inZone: ZoneId = ZoneOffset.UTC,
        private val keyFn: SerializableFunction<V, K>,
        private val timestampFn: (V) -> Instant,
        private val keyTypeDescriptor: TypeDescriptor<K>,
        private val valueTypeDescriptor: TypeDescriptor<V>,
        private val outputCoder: Coder<KV<K, Iterable<@JvmWildcard V>>>? = null,
    ) : PTransform<PCollection<V>, PCollection<KV<K, Iterable<@JvmWildcard V>>>>() {
        override fun expand(input: PCollection<V>): PCollection<KV<K, Iterable<@JvmWildcard V>>> {
            val outputTypeDescriptor = TypeDescriptors.kvs(keyTypeDescriptor, TypeDescriptors.iterables(valueTypeDescriptor))
            return input
                // the context windows must be cut on the same zone's day boundaries as the daily windows below,
                // so pass inZone rather than relying on the UTC default
                .apply("ContextWindows", ContextualCalendarDayWindow(days, allowedLateness, inZone))
                .apply("ContextWindowsKeys", WithKeys.of(keyFn).withKeyType(keyTypeDescriptor))
                .apply("ContextWindowsGroupBy", GroupByKey.create())
                .apply("ContextWindowsFilterFn", ParDo.of(ContextWindowsFilterFn(timestampFn)))
                // move the timestamp forward so elements end up in the "right" daily windows, with context elements attached
                .apply("ContextWindowsWithTimestamps", ParDo.of(ContextWindowsWithTimestampFn(inZone, timestampFn)))
                .apply("DailyWindowsWithContext",
                    Window.into<KV<K, Iterable<@JvmWildcard V>>>(CalendarWindows.days(1).withTimeZone(inZone.asJoda()))
                        .triggering(
                            AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                                .withLateFirings(AfterPane.elementCountAtLeast(1))
                        )
                        .withAllowedLateness(allowedLateness.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                        .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                        .discardingFiredPanes()
                )
                .apply("DailyWindowsWithContextGroupBy", GroupByKey.create())
                // due to window + GBK twice, now we have an Iterable of Iterables, unwrap it
                .apply("DailyWindowsWithContextFlatten",
                    MapElements.into(outputTypeDescriptor).via(ProcessFunction {
                        KV.of(it.key, it.value.flatten())
                    })
                )
                // Kotlin scope function (not a Beam transform): optionally override the coder of the result
                .apply {
                    if (outputCoder != null) {
                        coder = outputCoder
                    }
                }
        }
    }

    /**
     * Filter applied after a [ContextualCalendarDayWindow]: drops grouped outputs that hold nothing but context,
     * since those are of no use downstream. A grouping is kept when at least one of its values has a timestamp at
     * or after the end of the window. Panes with LATE timing are always forwarded unchanged, because it cannot be
     * known here whether they matter downstream.
     */
    private class ContextWindowsFilterFn<K, V>(
        val timestampOf: (V) -> Instant
    ): DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
        @ProcessElement
        fun process(
            window: IntervalWindow,
            paneInfo: PaneInfo,
            @Element element: KV<K, Iterable<@JvmWildcard V>>,
            receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>
        ) {
            val isLatePane = paneInfo.timing == PaneInfo.Timing.LATE
            if (!isLatePane) {
                // only non-late panes are inspected: they need at least one value beyond the context period
                val windowEnd = window.end().asJava()
                val hasNonContextValue = element.value.any { value -> timestampOf(value) >= windowEnd }
                if (!hasNonContextValue) return
            }
            receiver.output(element)
        }
    }

    /**
     * Re-timestamps the grouped output of a [ContextualCalendarDayWindow]: each grouping is emitted with the
     * largest timestamp found among its values, which moves it forward into its "correct" window.
     */
    private class ContextWindowsWithTimestampFn<K, V>(
        val zoneId: ZoneId,
        val timestampOf: (V) -> Instant
    ): DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
        // Although withAllowedTimestampSkew is deprecated, no replacement exists yet, see
        // https://issues.apache.org/jira/browse/BEAM-644
        override fun getAllowedTimestampSkew(): JodaTimeDuration {
            return Duration.INFINITE.asJoda()
        }

        @ProcessElement
        fun process(
            @Element element: KV<K, Iterable<@JvmWildcard V>>,
            receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>
        ) {
            val latest = element.value.maxOf { value -> timestampOf(value) }
            val outputTimestamp = latest.asJoda()
            receiver.outputWithTimestamp(element, outputTimestamp)
        }
    }