Skip to content

Instantly share code, notes, and snippets.

@OlegDokuka
Last active July 31, 2019 15:18
Show Gist options
  • Save OlegDokuka/5ef6a76f6844c9289f2c4f2e90629257 to your computer and use it in GitHub Desktop.
Save OlegDokuka/5ef6a76f6844c9289f2c4f2e90629257 to your computer and use it in GitHub Desktop.
FluxLimitRate
/*
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.util.Loggers;
/**
 * Operator that caps the demand signalled to the upstream {@link Flux}.
 * <p>
 * At most {@code prefetch} elements are kept outstanding towards the source. Every time
 * {@code limit} elements have been delivered downstream, the operator replenishes the
 * upstream with as much of the pending downstream demand as fits. The {@code limit} is
 * derived from {@code prefetch} and {@code lowTide} by
 * {@code Operators.unboundedOrLimit(prefetch, lowTide)}.
 * <p>
 * A {@code prefetch} of {@link Integer#MAX_VALUE} disables the windowing: downstream
 * demand is forwarded to the source as-is.
 *
 * @param <T> the element type
 */
public class FluxLimitRate<T> extends InternalFluxOperator<T, T> {

    /** Maximum number of elements kept outstanding towards the source. */
    final int prefetch;

    /** Replenishment hint handed to {@code Operators.unboundedOrLimit} together with {@link #prefetch}. */
    final int lowTide;

    /**
     * @param source   the upstream publisher
     * @param prefetch the maximum amount requested from {@code source} at once, must be positive
     * @param lowTide  the replenishment hint (see {@code Operators.unboundedOrLimit})
     * @throws IllegalArgumentException if {@code prefetch} is not strictly positive
     */
    FluxLimitRate(Flux<? extends T> source, int prefetch, int lowTide) {
        super(source);
        if (prefetch <= 0) {
            throw new IllegalArgumentException("prefetch > 0 required but it was " + prefetch);
        }
        this.prefetch = prefetch;
        this.lowTide = lowTide;
    }

    /**
     * Wraps the downstream subscriber into a rate-limiting subscriber.
     *
     * @param actual the downstream subscriber
     * @return the subscriber that must be subscribed to the source
     */
    @Override
    public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
        return new LimitRateSubscriber<>(actual, prefetch, lowTide);
    }

    /**
     * Subscriber/subscription pair doing the actual demand bookkeeping.
     * <p>
     * {@code externalRequested} and {@code pendingToFulfil} are shared between
     * {@link #request(long)} and {@link #onNext(Object)} and are only accessed while holding
     * the monitor of {@code this}. Upstream {@code request} calls are always issued outside
     * of the monitor.
     */
    private static final class LimitRateSubscriber<T> implements InnerOperator<T, T> {

        private final int prefetch;

        private final int limit;

        /** Downstream demand not yet forwarded upstream; guarded by {@code this}. */
        private long externalRequested;

        /**
         * Amount requested from upstream and not yet accounted as delivered; guarded by
         * {@code this}. Increased in {@link #request(long)}, adjusted in
         * {@link #onNext(Object)}.
         */
        private int pendingToFulfil;

        /**
         * Elements delivered since the last replenishment. Only read and written in
         * {@link #onNext(Object)}, hence not guarded.
         */
        private int deliveredElements;

        final CoreSubscriber<? super T> actual;

        Subscription s;

        private LimitRateSubscriber(CoreSubscriber<? super T> actual,
                int prefetch,
                int lowTide) {
            this.actual = actual;
            this.prefetch = prefetch;
            this.limit = Operators.unboundedOrLimit(prefetch, lowTide);
        }

        /**
         * Records {@code n} as additional downstream demand and forwards to the upstream
         * whatever part of the accumulated demand fits into the prefetch window.
         *
         * @param n the additional demand, must be strictly positive
         */
        @Override
        public void request(long n) {
            // Reject non-positive demand up-front so it never reaches the capped addition.
            if (!Operators.validate(n)) {
                return;
            }

            final long toRequest;
            final Subscription upstream;

            synchronized (this) {
                long demand = externalRequested;
                if (demand == Long.MAX_VALUE) {
                    // already unbounded, nothing more to account for
                    return;
                }

                upstream = this.s;
                if (upstream == null) {
                    return;
                }

                demand = Operators.addCap(n, demand);

                final int p = prefetch;
                if (p == Integer.MAX_VALUE) {
                    // No window in unbounded mode: forward the whole demand. onNext never
                    // replenishes in this mode, so nothing may be left behind here.
                    toRequest = demand;
                    externalRequested = (demand == Long.MAX_VALUE) ? Long.MAX_VALUE : 0L;
                }
                else {
                    final int inFlight = pendingToFulfil;
                    if (inFlight == p) {
                        // Window is full: remember the demand so that onNext can use it
                        // for the next replenishment instead of dropping it.
                        externalRequested = demand;
                        return;
                    }

                    toRequest = Math.min(p - inFlight, demand);
                    if (demand != Long.MAX_VALUE) {
                        demand -= toRequest;
                    }
                    // toRequest <= p - inFlight, so the narrowing cast is lossless
                    pendingToFulfil = inFlight + (int) toRequest;
                    externalRequested = demand;
                }
            }

            if (toRequest > 0) {
                upstream.request(toRequest);
            }
        }

        @Override
        public CoreSubscriber<? super T> actual() {
            return actual;
        }

        /** Cancels the upstream subscription. */
        @Override
        public void cancel() {
            s.cancel();
        }

        @Override
        public void onSubscribe(Subscription s) {
            if (Operators.validate(this.s, s)) {
                this.s = s;
                actual.onSubscribe(this);
            }
        }

        /**
         * Delivers {@code t} downstream and, once {@code limit} elements have been
         * delivered since the last replenishment, requests more from the upstream
         * according to the pending downstream demand.
         */
        @Override
        public void onNext(T t) {
            actual.onNext(t);

            if (prefetch == Integer.MAX_VALUE) {
                // unbounded mode: demand was forwarded as-is in request()
                return;
            }

            final int l = limit;
            final int delivered = deliveredElements + 1;
            if (delivered != l) {
                deliveredElements = delivered;
                return;
            }

            // Reset the counter BEFORE talking to the upstream: a source that emits
            // synchronously from within request() must observe the fresh value.
            deliveredElements = 0;

            final long replenish;
            final Subscription upstream;

            synchronized (this) {
                upstream = this.s;
                if (upstream == null) {
                    return;
                }

                long demand = externalRequested;
                if (demand >= l) {
                    // l delivered, l requested again: pendingToFulfil stays unchanged
                    if (demand != Long.MAX_VALUE) {
                        demand -= l;
                    }
                    replenish = l;
                }
                else {
                    // not enough demand for a full batch: hand over whatever is left
                    pendingToFulfil -= l;
                    if (demand > 0) {
                        replenish = demand;
                        pendingToFulfil += (int) demand; // demand < l, fits into an int
                        demand = 0;
                    }
                    else {
                        replenish = 0;
                    }
                }
                externalRequested = demand;
            }

            if (replenish > 0) {
                upstream.request(replenish);
            }
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }
    }
}
/*
* Copyright (c) 2019-Present Pivotal Software Inc, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package reactor.core.publisher;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.openjdk.jmh.Main;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.runner.RunnerException;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.CorePublisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxLimitRate;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;
/**
 * JMH benchmark comparing two ways of rate-limiting the same synchronous source:
 * a {@code FluxPublishOn} running on the immediate scheduler versus the dedicated
 * {@code FluxLimitRate} operator. Both pipelines are drained with {@code blockLast()}.
 *
 * @author Sergei Egorov
 */
@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class RateLimiterBenchmark {

    /** Number of integers emitted by the benchmarked source. */
    private static final int ELEMENT_COUNT = 1000000;

    /** Prefetch, low tide and queue capacity used by both benchmarked operators. */
    private static final int BATCH_SIZE = 256;

    Flux<Integer> source;

    // @Param({"5", "10", "20", "50", "75", "100"})
    // int operatorsCount;

    /** Builds the shared source once per trial. */
    @Setup(Level.Trial)
    public void setup() {
        final Flux<Integer> range = Flux.range(0, ELEMENT_COUNT);
        source = range.hide();
    }

    /** Rate limiting implemented with {@code FluxPublishOn} on the immediate scheduler. */
    @Benchmark
    public void publishOnRateLimiting() {
        final Flux<Integer> limited = new FluxPublishOn<>(source,
                Schedulers.immediate(),
                false,
                BATCH_SIZE,
                BATCH_SIZE,
                Queues.get(BATCH_SIZE));
        limited.blockLast();
    }

    /** Rate limiting implemented with the dedicated {@code FluxLimitRate} operator. */
    @Benchmark
    public void nativeRateLimiting() {
        final Flux<Integer> limited = new FluxLimitRate<>(source, BATCH_SIZE, BATCH_SIZE);
        limited.blockLast();
    }

    /** Delegates to the JMH command-line entry point. */
    public static void main(String[] args) throws IOException, RunnerException {
        Main.main(args);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment