diff --git a/.github/dependabot.yml b/.github/dependabot.yml index b7d0000d8d..f5c62d742d 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -12,8 +12,8 @@ updates: directory: / schedule: interval: daily - - package-ecosystem: maven - directory: examples/coalescing-bulkloader + - package-ecosystem: gradle + directory: examples/coalescing-bulkloader-reactor schedule: interval: daily - package-ecosystem: gradle diff --git a/.github/workflows/examples.yml b/.github/workflows/examples.yml index 46cdd06af4..db7307f4fb 100644 --- a/.github/workflows/examples.yml +++ b/.github/workflows/examples.yml @@ -98,9 +98,9 @@ jobs: - name: Write-behind (rxjava) working-directory: examples/write-behind-rxjava run: ./gradlew build - - name: Coalescing Bulkloader - working-directory: examples/coalescing-bulkloader - run: ./mvnw test + - name: Coalescing Bulkloader (reactor) + working-directory: examples/coalescing-bulkloader-reactor + run: ./gradlew build - name: Hibernate (jcache) working-directory: examples/hibernate run: ./gradlew build diff --git a/examples/coalescing-bulkloader-reactor/README.md b/examples/coalescing-bulkloader-reactor/README.md new file mode 100644 index 0000000000..0c2e6c5e8d --- /dev/null +++ b/examples/coalescing-bulkloader-reactor/README.md @@ -0,0 +1,121 @@ +[Reactor][reactor] data streams facilitate the consolidation of independent asynchronous loads into +batches at the cost of a small buffering delay. The [bufferTimeout][] operator accumulates requests +until reaching a maximum size or time limit. Since each request consists of a key and its pending +result, when the subscriber is notified it performs the batch load and completes the key's future +with its corresponding value. + +In some scenarios it may be desirable to only aggregate cache refreshes rather than imposing delays +on callers awaiting explicit loads. An automated reload initiated by `refreshAfterWrite` will occur +on the first stale request for an entry. 
While the key is being refreshed the previous value +continues to be returned, in contrast to eviction which forces retrievals to wait until the value +is loaded anew. In such cases, batching these optimistic reloads can minimize the impact on the +source system without adversely affecting the responsiveness of the explicit requests. + +### Refresh coalescing +A [Sink][sink] collects requests, buffering them up to the configured threshold, and subsequently +delivers the batch to the subscriber. The `parallelism` setting determines the number of concurrent +bulk loads that can be executed if the size constraint results in multiple batches. + +```java +public final class CoalescingBulkLoader implements CacheLoader { + private final Function, Map> mappingFunction; + private final Sinks.Many> sink; + + /** + * @param maxSize the maximum entries to collect before performing a bulk request + * @param maxTime the maximum duration to wait before performing a bulk request + * @param parallelism the number of parallel bulk loads that can be performed + * @param mappingFunction the function to compute the values + */ + public CoalescingBulkLoader(int maxSize, Duration maxTime, int parallelism, + Function, Map> mappingFunction) { + this.sink = Sinks.many().unicast().onBackpressureBuffer(); + this.mappingFunction = requireNonNull(mappingFunction); + sink.asFlux() + .bufferTimeout(maxSize, maxTime) + .map(requests -> requests.stream().collect( + toMap(Entry::getKey, Entry::getValue))) + .parallel(parallelism) + .runOn(Schedulers.boundedElastic()) + .subscribe(this::handle); + } +``` + +To ensure immediate responses for explicit loads these calls directly invoke the mapping function, +while the optimistic reloads are instead submitted to the sink. It's worth noting that this call is +`synchronized`, as a sink does not support concurrent submissions. 
+ +```java + @Override public V load(K key) { + return loadAll(Set.of(key)).get(key); + } + + @Override public abstract Map loadAll(Set key) { + return mappingFunction.apply(keys); + } + + @Override public synchronized CompletableFuture asyncReload(K key, V oldValue, Executor e) { + var entry = Map.entry(key, new CompletableFuture()); + sink.tryEmitNext(entry).orThrow(); + return entry.getValue(); + } +``` + +The subscriber receives a batch of requests, each comprising a key and a pending future result. +It performs the synchronous load and then either completes the key's future with the corresponding +value or, if a failure occurs, completes it exceptionally. + +```java + private void handle(Map> requests) { + try { + var results = mappingFunction.apply(requests.keySet()); + requests.forEach((key, result) -> result.complete(results.get(key))); + } catch (Throwable t) { + requests.forEach((key, result) -> result.completeExceptionally(t)); + } + } +``` + +### Async coalescing +The previous logic can be streamlined if all loads should be collected into batches. This approach +is most suitable for an `AsyncLoadingCache` since it does not block any other map operations while +an entry is being loaded. 
+ +```java +public final class CoalescingBulkLoader implements AsyncCacheLoader { + private final Function, Map> mappingFunction; + private final Sinks.Many> sink; + + public CoalescingBulkLoader(int maxSize, Duration maxTime, int parallelism, + Function, Map> mappingFunction) { + this.sink = Sinks.many().unicast().onBackpressureBuffer(); + this.mappingFunction = requireNonNull(mappingFunction); + sink.asFlux() + .bufferTimeout(maxSize, maxTime) + .map(requests -> requests.stream().collect( + toMap(Entry::getKey, Entry::getValue))) + .parallel(parallelism) + .runOn(Schedulers.boundedElastic()) + .subscribe(this::handle); + } + + @Override public synchronized CompletableFuture asyncLoad(K key, Executor e) { + var entry = Map.entry(key, new CompletableFuture()); + sink.tryEmitNext(entry).orThrow(); + return entry.getValue(); + } + + private void handle(Map> requests) { + try { + var results = mappingFunction.apply(requests.keySet()); + requests.forEach((key, result) -> result.complete(results.get(key))); + } catch (Throwable t) { + requests.forEach((key, result) -> result.completeExceptionally(t)); + } + } +} +``` + +[reactor]: https://projectreactor.io +[bufferTimeout]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#bufferTimeout-int-java.time.Duration- +[sink]: https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Sinks.html diff --git a/examples/coalescing-bulkloader-reactor/build.gradle.kts b/examples/coalescing-bulkloader-reactor/build.gradle.kts new file mode 100644 index 0000000000..8f7f075b6a --- /dev/null +++ b/examples/coalescing-bulkloader-reactor/build.gradle.kts @@ -0,0 +1,27 @@ +plugins { + `java-library` + alias(libs.plugins.versions) +} + +dependencies { + implementation(libs.caffeine) + implementation(libs.reactor) + + testImplementation(libs.junit) + testImplementation(libs.truth) +} + +testing.suites { + val test by getting(JvmTestSuite::class) { + useJUnitJupiter() + } +} + 
+java.toolchain.languageVersion = JavaLanguageVersion.of( + System.getenv("JAVA_VERSION")?.toIntOrNull() ?: 11) + +tasks.withType().configureEach { + javaCompiler = javaToolchains.compilerFor { + languageVersion = java.toolchain.languageVersion + } +} diff --git a/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml b/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml new file mode 100644 index 0000000000..0d57cb7c6c --- /dev/null +++ b/examples/coalescing-bulkloader-reactor/gradle/libs.versions.toml @@ -0,0 +1,15 @@ +[versions] +caffeine = "3.1.7" +junit = "5.10.0" +reactor = "3.5.8" +truth = "1.1.5" +versions = "0.47.0" + +[libraries] +caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version.ref = "caffeine" } +junit = { module = "org.junit.jupiter:junit-jupiter", version.ref = "junit" } +reactor = { module = "io.projectreactor:reactor-core", version.ref = "reactor" } +truth = { module = "com.google.truth:truth", version.ref = "truth" } + +[plugins] +versions = { id = "com.github.ben-manes.versions", version.ref = "versions" } diff --git a/examples/coalescing-bulkloader-reactor/gradle/wrapper/gradle-wrapper.jar b/examples/coalescing-bulkloader-reactor/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 0000000000..7f93135c49 Binary files /dev/null and b/examples/coalescing-bulkloader-reactor/gradle/wrapper/gradle-wrapper.jar differ diff --git a/examples/coalescing-bulkloader-reactor/gradle/wrapper/gradle-wrapper.properties b/examples/coalescing-bulkloader-reactor/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 0000000000..0fb2837297 --- /dev/null +++ b/examples/coalescing-bulkloader-reactor/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,7 @@ +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-rc-3-bin.zip +networkTimeout=10000 +validateDistributionUrl=true +zipStoreBase=GRADLE_USER_HOME 
+zipStorePath=wrapper/dists diff --git a/examples/coalescing-bulkloader-reactor/gradlew b/examples/coalescing-bulkloader-reactor/gradlew new file mode 100755 index 0000000000..0adc8e1a53 --- /dev/null +++ b/examples/coalescing-bulkloader-reactor/gradlew @@ -0,0 +1,249 @@ +#!/bin/sh + +# +# Copyright © 2015-2021 the original authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); Bloemsma. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.github.benmanes.caffeine.examples.coalescing.bulkloader; + +import static java.util.Objects.requireNonNull; +import static java.util.stream.Collectors.toMap; + +import java.time.Duration; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.function.Function; + +import com.github.benmanes.caffeine.cache.AsyncCacheLoader; +import com.google.errorprone.annotations.CanIgnoreReturnValue; + +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; + +/** + * An {@link AsyncCacheLoader} that accumulates keys until a specified maximum size or time limit is + * reached, at which point it performs a bulk load. This strategy assumes that the efficiency gained
 * through batching justifies the wait.
 *
 * @author guus@bloemsma.net (Guus C. Bloemsma) Bloemsma) Bloemsma. + .subscribe(System.out::println); Cache cache = Caffeine.newBuilder().build(); cache.asMap().compute(key, (k, v) -> { diff --git a/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriter.java b/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriter.java index 49e8b07eef..f0a3e8201e 100644 --- a/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriter.java +++ b/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriter.java @@ -18,7 +18,7 @@ import static java.util.Objects.requireNonNull; import static java.util.stream.Collectors.toMap; -import java.io.Closeable; +import java.time.Duration; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.TimeUnit; @@ -28,33 +28,32 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; -import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.subjects.PublishSubject; import io.reactivex.rxjava3.subjects.Subject; /** - * This class allows a cache to have write-behind semantics. The passed in {@code writeAction} - * will only be called every {@code bufferTime} time with a {@linkplain} Map} containing the keys - * and the values that have been updated in the cache each time. + * This class implements write-behind semantics for caching. The provided {@code writeAction} is + * invoked periodically every {@code bufferTime} interval, receiving a {@linkplain Map} containing + * the updated keys and corresponding values in the cache since the last invocation. *

- * If a key is updated multiple times during that period then the {@code binaryOperator} has to - * decide which value should be taken. + * In scenarios where a key is updated multiple times within the same period, a coalescing function + * is responsible for determining the value to retain. *

- * An example usage of this class is keeping track of users and their activity on a web site. You - * don't want to update the database each time any of your users does something. It is better to - * batch the updates every x seconds and just write the most recent time in the database. + * A practical use case for this class is user activity tracking on a website. Instead of + * immediately updating the database for every user action, you can aggregate updates and write them + * to the database in batches at regular intervals (e.g., every x seconds), recording only the most + * recent timestamp. * * @param the type of the key in the cache * @param the type of the value in the cache * @author wim.deblauwe@gmail.com (Wim Deblauwe) */ -public final class WriteBehindCacheWriter implements BiConsumer, Closeable { - final Subject> subject; - final Disposable subscription; +public final class WriteBehindCacheWriter implements BiConsumer { + private final Subject> subject; private WriteBehindCacheWriter(Builder builder) { subject = PublishSubject.>create().toSerialized(); - subscription = subject.buffer(builder.bufferTimeNanos, TimeUnit.NANOSECONDS) + subject.buffer(builder.bufferTime.toNanos(), TimeUnit.NANOSECONDS) .map(entries -> entries.stream().collect( toMap(Entry::getKey, Entry::getValue, builder.coalescer))) .subscribe(builder.writeAction::accept); @@ -64,44 +63,40 @@ private WriteBehindCacheWriter(Builder builder) { subject.onNext(Map.entry(key, value)); } - @Override public void close() { - subscription.dispose(); - } - public static final class Builder { private Consumer> writeAction; private BinaryOperator coalescer; - private long bufferTimeNanos; + private Duration bufferTime; /** * The duration that the calls to the cache should be buffered before calling the * {@code writeAction}. 
*/ @CanIgnoreReturnValue - public Builder bufferTime(long duration, TimeUnit unit) { - this.bufferTimeNanos = TimeUnit.NANOSECONDS.convert(duration, unit); + public Builder bufferTime(Duration duration) { + this.bufferTime = requireNonNull(duration); return this; } - /** The callback to perform the writing to the database or repository. */ + /** The callback to perform the batch write. */ @CanIgnoreReturnValue public Builder writeAction(Consumer> writeAction) { this.writeAction = requireNonNull(writeAction); return this; } - /** The action that decides which value to take in case a key was updated multiple times. */ + /** The strategy that decides which value to take in case a key was updated multiple times. */ @CanIgnoreReturnValue public Builder coalesce(BinaryOperator coalescer) { this.coalescer = requireNonNull(coalescer); return this; } - /** Returns a writer that batches changes to the system of record. */ + /** Returns a writer that batches changes to the data store. */ public WriteBehindCacheWriter build() { requireNonNull(coalescer); + requireNonNull(bufferTime); requireNonNull(writeAction); - requireNonNull(bufferTimeNanos); return new WriteBehindCacheWriter<>(this); } } diff --git a/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/package-info.java b/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/package-info.java deleted file mode 100644 index ed1e9c2aaf..0000000000 --- a/examples/write-behind-rxjava/src/main/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -@CheckReturnValue -package com.github.benmanes.caffeine.examples.writebehind.rxjava; - -import com.google.errorprone.annotations.CheckReturnValue; diff --git a/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriterTest.java 
b/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriterTest.java index 16cff98324..07f82aa857 100644 --- a/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriterTest.java +++ b/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/WriteBehindCacheWriterTest.java @@ -19,8 +19,8 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.is; +import java.time.Duration; import java.time.ZonedDateTime; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -43,23 +43,22 @@ public void singleKey() { var writerCalled = new AtomicBoolean(); // Given this cache... - try (var writer = new WriteBehindCacheWriter.Builder() - .bufferTime(1, TimeUnit.SECONDS) - .writeAction(entries -> writerCalled.set(true)) + var writer = new WriteBehindCacheWriter.Builder() .coalesce(BinaryOperator.maxBy(ZonedDateTime::compareTo)) - .build()) { - Cache cache = Caffeine.newBuilder().build(); - - // When this cache update happens... - cache.asMap().computeIfAbsent(1, key -> { - var value = ZonedDateTime.now(); - writer.accept(key, value); - return value; - }); - - // Then the write behind action is called - await().untilTrue(writerCalled); - } + .writeAction(entries -> writerCalled.set(true)) + .bufferTime(Duration.ofSeconds(1)) + .build(); + Cache cache = Caffeine.newBuilder().build(); + + // When this cache update happens... 
+ cache.asMap().computeIfAbsent(1, key -> { + var value = ZonedDateTime.now(); + writer.accept(key, value); + return value; + }); + + // Then the write behind action is called + await().untilTrue(writerCalled); } @Test @@ -67,25 +66,24 @@ public void multipleKeys() { var numberOfEntries = new AtomicInteger(); // Given this cache... - try (var writer = new WriteBehindCacheWriter.Builder() - .bufferTime(1, TimeUnit.SECONDS) - .coalesce(BinaryOperator.maxBy(ZonedDateTime::compareTo)) + var writer = new WriteBehindCacheWriter.Builder() .writeAction(entries -> numberOfEntries.addAndGet(entries.size())) - .build()) { - Cache cache = Caffeine.newBuilder().build(); - - // When these cache updates happen... - for (int i = 1; i <= 3; i++) { - cache.asMap().computeIfAbsent(i, key -> { - var value = ZonedDateTime.now(); - writer.accept(key, value); - return value; - }); - } - - // Then the write behind action gets 3 entries to write - await().untilAtomic(numberOfEntries, is(3)); + .coalesce(BinaryOperator.maxBy(ZonedDateTime::compareTo)) + .bufferTime(Duration.ofSeconds(1)) + .build(); + Cache cache = Caffeine.newBuilder().build(); + + // When these cache updates happen... + for (int i = 1; i <= 3; i++) { + cache.asMap().computeIfAbsent(i, key -> { + var value = ZonedDateTime.now(); + writer.accept(key, value); + return value; + }); } + + // Then the write behind action gets 3 entries to write + await().untilAtomic(numberOfEntries, is(3)); } @Test @@ -94,9 +92,9 @@ public void singleKey_mostRecent() { var numberOfEntries = new AtomicInteger(); // Given this cache... 
- try (var writer = new WriteBehindCacheWriter.Builder() - .bufferTime(1, TimeUnit.SECONDS) + var writer = new WriteBehindCacheWriter.Builder() .coalesce(BinaryOperator.maxBy(ZonedDateTime::compareTo)) + .bufferTime(Duration.ofSeconds(1)) .writeAction(entries -> { // We might get here before the cache has been written to, // so just wait for the next time we are called @@ -107,24 +105,23 @@ public void singleKey_mostRecent() { var zonedDateTime = entries.values().iterator().next(); timeInWriteBehind.set(zonedDateTime); numberOfEntries.set(entries.size()); - }).build()) { - Cache cache = Caffeine.newBuilder().build(); - - // When these cache updates happen... - var latest = ZonedDateTime.now().truncatedTo(DAYS); - for (int i = 0; i < 4; i++) { - latest = latest.plusNanos(200); - - var value = latest; - cache.asMap().compute(1L, (key, oldValue) -> { - writer.accept(key, value); - return value; - }); - } - - // Then the write behind action gets 1 entry to write with the most recent time - await().untilAtomic(numberOfEntries, is(1)); - await().untilAtomic(timeInWriteBehind, is(latest)); + }).build(); + Cache cache = Caffeine.newBuilder().build(); + + // When these cache updates happen... 
+ var latest = ZonedDateTime.now().truncatedTo(DAYS); + for (int i = 0; i < 4; i++) { + latest = latest.plusNanos(200); + + var value = latest; + cache.asMap().compute(1L, (key, oldValue) -> { + writer.accept(key, value); + return value; + }); } + + // Then the write behind action gets 1 entry to write with the most recent time + await().untilAtomic(numberOfEntries, is(1)); + await().untilAtomic(timeInWriteBehind, is(latest)); } } diff --git a/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/package-info.java b/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/package-info.java deleted file mode 100644 index ed1e9c2aaf..0000000000 --- a/examples/write-behind-rxjava/src/test/java/com/github/benmanes/caffeine/examples/writebehind/rxjava/package-info.java +++ /dev/null @@ -1,4 +0,0 @@ -@CheckReturnValue -package com.github.benmanes.caffeine.examples.writebehind.rxjava; - -import com.google.errorprone.annotations.CheckReturnValue;