Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding basic FeedRanges API #17570

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
21e25e7
Initial draft of FeedRange artifacts
FabianMeiswinkel Nov 4, 2020
6e69300
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 4, 2020
40ef415
Iterating on FeedRange Apis
FabianMeiswinkel Nov 5, 2020
54bed2e
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 5, 2020
58f98e7
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 11, 2020
46662e0
Adding public surface area
FabianMeiswinkel Nov 12, 2020
03617a4
Adding FeedRange unit tests
FabianMeiswinkel Nov 12, 2020
b7de6b3
Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Refresh
FabianMeiswinkel Nov 13, 2020
4095bc5
Adding test FeedRangePKRangeId_GetEffectiveRangesAsync_Null
FabianMeiswinkel Nov 13, 2020
8e864be
Adding test feedRangeEPK_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
4ebd7cb
Adding test feedRangePK_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
2e6eb79
Adding test feedRangePKRangeId_getPartitionKeyRangesAsync
FabianMeiswinkel Nov 13, 2020
dc4c66c
Adding request visitor unit tests
FabianMeiswinkel Nov 13, 2020
0ef3170
Finishing FeedRange tests
FabianMeiswinkel Nov 13, 2020
ae9dd96
Cleanup and prettifying
FabianMeiswinkel Nov 13, 2020
4811e50
Prettifying feed range tests
FabianMeiswinkel Nov 13, 2020
6515ff5
Fixes and new test for Conatiner.getFeedRanges()
FabianMeiswinkel Nov 13, 2020
30e162e
Addressing some SpotBug violations
FabianMeiswinkel Nov 13, 2020
b08488a
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 16, 2020
9a4be27
Merge branch 'master' of https://github.com/Azure/azure-sdk-for-java …
FabianMeiswinkel Nov 17, 2020
8ac208a
Reacting to code review feedback
FabianMeiswinkel Nov 17, 2020
ee60826
Update sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/impleme…
FabianMeiswinkel Nov 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3682,7 +3682,7 @@ private static List<FeedRange> toFeedRanges(
throw new IllegalStateException("PartitionKeyRange list cannot be null");
}

List<FeedRange> feedRanges = new ArrayList<FeedRange>();
List<FeedRange> feedRanges = new ArrayList<FeedRange>(partitionKeyRangeList.size());
partitionKeyRangeList.forEach(pkRange -> {
feedRanges.add(toFeedRange(pkRange));
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved

import java.time.Duration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
import reactor.core.publisher.Mono;

abstract class FeedRangeAsyncVisitor<TResult> {
public abstract Mono<TResult> visitAsync(FeedRangePartitionKeyImpl feedRange);
public abstract Mono<TResult> visit(FeedRangePartitionKeyImpl feedRange);

public abstract Mono<TResult> visitAsync(FeedRangePartitionKeyRangeImpl feedRange);
public abstract Mono<TResult> visit(FeedRangePartitionKeyRangeImpl feedRange);

public abstract Mono<TResult> visitAsync(FeedRangeEpkImpl feedRange);
public abstract Mono<TResult> visit(FeedRangeEpkImpl feedRange);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import reactor.core.publisher.Mono;

abstract class FeedRangeAsyncVisitorWithArg<TResult, TArg> {
public abstract Mono<TResult> visitAsync(FeedRangePartitionKeyImpl feedRange, TArg argument);
public abstract Mono<TResult> visit(FeedRangePartitionKeyImpl feedRange, TArg argument);

public abstract Mono<TResult> visitAsync(FeedRangePartitionKeyRangeImpl feedRange,
TArg argument);
public abstract Mono<TResult> visit(FeedRangePartitionKeyRangeImpl feedRange,
TArg argument);

public abstract Mono<TResult> visitAsync(FeedRangeEpkImpl feedRange, TArg argument);
public abstract Mono<TResult> visit(FeedRangeEpkImpl feedRange, TArg argument);
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.directconnectivity.GatewayAddressCache;
import com.azure.cosmos.implementation.query.CompositeContinuationToken;
import com.azure.cosmos.implementation.routing.Range;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.io.IOException;
Expand All @@ -32,6 +35,7 @@
*/
final class FeedRangeCompositeContinuationImpl extends FeedRangeContinuation {

private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeCompositeContinuationImpl.class);
private final static ShouldRetryResult NO_RETRY = ShouldRetryResult.noRetry();
private final static ShouldRetryResult RETRY = ShouldRetryResult.retryAfter(Duration.ZERO);
private final Queue<CompositeContinuationToken> compositeContinuationTokens;
Expand Down Expand Up @@ -139,10 +143,7 @@ public void validateContainer(final String containerRid) throws IllegalArgumentE

@Override
public ShouldRetryResult handleChangeFeedNotModified(final RxDocumentServiceResponse response) {
if (response == null) {
throw new NullPointerException("response");
}

checkNotNull(response, "Argument 'response' must not be null");
final int statusCode = response.getStatusCode();
if (statusCode >= HttpConstants.StatusCodes.MINIMUM_SUCCESS_STATUSCODE
&& statusCode <= HttpConstants.StatusCodes.MAXIMUM_SUCCESS_STATUSCODE) {
Expand Down Expand Up @@ -171,16 +172,11 @@ public ShouldRetryResult handleChangeFeedNotModified(final RxDocumentServiceResp
}

@Override
public Mono<ShouldRetryResult> handleSplitAsync(final RxDocumentClientImpl client,
final RxDocumentServiceResponse response) {
public Mono<ShouldRetryResult> handleSplit(final RxDocumentClientImpl client,
final RxDocumentServiceResponse response) {

if (client == null) {
throw new NullPointerException("client");
}

if (response == null) {
throw new NullPointerException("response");
}
checkNotNull(client, "Argument 'client' must not be null");
checkNotNull(response, "Argument 'response' must not be null");

Integer nSubStatus = 0;
final String valueSubStatus =
Expand All @@ -200,7 +196,7 @@ public Mono<ShouldRetryResult> handleSplitAsync(final RxDocumentClientImpl clien

final RxPartitionKeyRangeCache partitionKeyRangeCache = client.getPartitionKeyRangeCache();
final Mono<Utils.ValueHolder<List<PartitionKeyRange>>> resolvedRangesTask =
this.tryGetOverlappingRangesAsync(
this.tryGetOverlappingRanges(
partitionKeyRangeCache, this.currentToken.getRange().getMin(),
this.currentToken.getRange().getMax(),
true);
Expand All @@ -216,10 +212,7 @@ public Mono<ShouldRetryResult> handleSplitAsync(final RxDocumentClientImpl clien

@Override
public void accept(final FeedRangeContinuationVisitor visitor) {
if (visitor == null) {
throw new NullPointerException("visitor");
}

checkNotNull(visitor, "Argument 'visitor' must not be null");
visitor.visit(this);
}

Expand Down Expand Up @@ -248,12 +241,8 @@ public static FeedRangeCompositeContinuationImpl createFromDeserializedTokens(
}

public static FeedRangeContinuation parse(final String jsonString) throws IOException {
if (jsonString == null) {
throw new NullPointerException("jsonString");
}

checkNotNull(jsonString, "Argument 'jsonString' must not be null");
final ObjectMapper mapper = Utils.getSimpleObjectMapper();

return mapper.readValue(jsonString, FeedRangeCompositeContinuationImpl.class);
}

Expand Down Expand Up @@ -330,7 +319,7 @@ private void moveToNextToken() {
}
}

private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRangesAsync(
private Mono<Utils.ValueHolder<List<PartitionKeyRange>>> tryGetOverlappingRanges(
final RxPartitionKeyRangeCache partitionKeyRangeCache, final String min, final String max,
final Boolean forceRefresh) {

Expand Down Expand Up @@ -360,6 +349,10 @@ private static CompositeContinuationToken tryParseAsCompositeContinuationToken(

return null;
} catch (final IOException ioError) {
LOGGER.debug(
"Failed to parse as composite continuation token JSON ",
providedContinuation,
ioError);
return null;
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static FeedRangeContinuation tryParse(String toStringValue) {
public abstract ShouldRetryResult handleChangeFeedNotModified(
RxDocumentServiceResponse responseMessage);

public abstract Mono<ShouldRetryResult> handleSplitAsync(
public abstract Mono<ShouldRetryResult> handleSplit(
RxDocumentClientImpl client,
RxDocumentServiceResponse responseMessage);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Objects;

import static com.azure.cosmos.BridgeInternal.setProperty;
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

final class FeedRangeEpkImpl extends FeedRangeInternal {
private static final FeedRangeEpkImpl fullRangeEPK =
Expand All @@ -27,10 +28,7 @@ final class FeedRangeEpkImpl extends FeedRangeInternal {
private final UnmodifiableList<Range<String>> rangeList;

public FeedRangeEpkImpl(final Range<String> range) {
if (range == null) {
throw new NullPointerException("range");
}

checkNotNull(range, "Argument 'range' must not be null");
this.range = range;
final ArrayList<Range<String>> temp = new ArrayList<>();
temp.add(range);
Expand All @@ -48,33 +46,24 @@ public static FeedRangeEpkImpl forFullRange() {

@Override
public void accept(final FeedRangeVisitor visitor) {
if (visitor == null) {
throw new NullPointerException("visitor");
}

checkNotNull(visitor, "Argument 'visitor' must not be null");
visitor.visit(this);
}

@Override
public <TInput> void accept(GenericFeedRangeVisitor<TInput> visitor, TInput input) {
if (visitor == null) {
throw new NullPointerException("visitor");
}

checkNotNull(visitor, "Argument 'visitor' must not be null");
visitor.visit(this, input);
}

@Override
public <T> Mono<T> acceptAsync(final FeedRangeAsyncVisitor<T> visitor) {
if (visitor == null) {
throw new NullPointerException("visitor");
}

return visitor.visitAsync(this);
public <T> Mono<T> accept(final FeedRangeAsyncVisitor<T> visitor) {
checkNotNull(visitor, "Argument 'visitor' must not be null");
return visitor.visit(this);
}

@Override
public Mono<UnmodifiableList<Range<String>>> getEffectiveRangesAsync(
public Mono<UnmodifiableList<Range<String>>> getEffectiveRanges(
final IRoutingMapProvider routingMapProvider,
final String containerRid,
final PartitionKeyDefinition partitionKeyDefinition) {
Expand All @@ -83,7 +72,7 @@ public Mono<UnmodifiableList<Range<String>>> getEffectiveRangesAsync(
}

@Override
public Mono<UnmodifiableList<String>> getPartitionKeyRangesAsync(
public Mono<UnmodifiableList<String>> getPartitionKeyRanges(
final IRoutingMapProvider routingMapProvider,
final String containerRid,
final PartitionKeyDefinition partitionKeyDefinition) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,25 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.io.IOException;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

public abstract class FeedRangeInternal extends JsonSerializable implements FeedRange {
private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeInternal.class);

public abstract void accept(FeedRangeVisitor visitor);

public abstract <TInput> void accept(GenericFeedRangeVisitor<TInput> visitor, TInput input);

public abstract <T> Mono<T> acceptAsync(FeedRangeAsyncVisitor<T> visitor);
public abstract <T> Mono<T> accept(FeedRangeAsyncVisitor<T> visitor);

public static FeedRangeInternal convert(final FeedRange feedRange) {
if (feedRange == null) {
throw new NullPointerException("feedRange");
}

checkNotNull(feedRange, "Argument 'feedRange' must not be null");
if (feedRange instanceof FeedRangeInternal) {
return (FeedRangeInternal)feedRange;
}
Expand All @@ -47,14 +49,7 @@ public static FeedRangeInternal convert(final FeedRange feedRange) {
* @return A feed range
*/
public static FeedRangeInternal fromJsonString(String json) {
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
FeedRangeInternal parsedRange;

try {
parsedRange = FeedRangeInternal.tryParse(json);
} catch (IOException e) {
throw new IllegalArgumentException(
String.format("Unable to parse JSON %s", json), e);
}
FeedRangeInternal parsedRange = FeedRangeInternal.tryParse(json);

if (parsedRange == null) {
throw new IllegalArgumentException(
Expand All @@ -66,12 +61,12 @@ public static FeedRangeInternal fromJsonString(String json) {
return parsedRange;
}

public abstract Mono<UnmodifiableList<Range<String>>> getEffectiveRangesAsync(
public abstract Mono<UnmodifiableList<Range<String>>> getEffectiveRanges(
IRoutingMapProvider routingMapProvider,
String containerRid,
PartitionKeyDefinition partitionKeyDefinition);

public abstract Mono<UnmodifiableList<String>> getPartitionKeyRangesAsync(
public abstract Mono<UnmodifiableList<String>> getPartitionKeyRanges(
IRoutingMapProvider routingMapProvider,
String containerRid,
PartitionKeyDefinition partitionKeyDefinition);
Expand All @@ -88,32 +83,36 @@ public String toJsonString() {
return this.toJson();
}

public static FeedRangeInternal tryParse(final String jsonString) throws IOException {
if (jsonString == null) {
throw new NullPointerException("jsonString");
}

public static FeedRangeInternal tryParse(final String jsonString) {
checkNotNull(jsonString, "Argument 'jsonString' must not be null");
final ObjectMapper mapper = Utils.getSimpleObjectMapper();
JsonNode rootNode = mapper.readTree(jsonString);

JsonNode rangeNode = rootNode.get(Constants.Properties.RANGE);
if (rangeNode != null && rangeNode.isObject()) {
Range<String> range = new Range<>((ObjectNode)rangeNode);
return new FeedRangeEpkImpl(range);
}

JsonNode pkNode = rootNode.get(Constants.Properties.FEED_RANGE_PARTITION_KEY);
if (pkNode != null && pkNode.isArray()) {
PartitionKeyInternal pk = mapper.convertValue(pkNode, PartitionKeyInternal.class);
return new FeedRangePartitionKeyImpl(pk);
}

JsonNode pkRangeIdNode =
rootNode.get(Constants.Properties.FEED_RANGE_PARTITION_KEY_RANGE_ID);
if (pkRangeIdNode != null && pkRangeIdNode.isTextual()) {
return new FeedRangePartitionKeyRangeImpl(pkRangeIdNode.asText());
try {
JsonNode rootNode = mapper.readTree(jsonString);

JsonNode rangeNode = rootNode.get(Constants.Properties.RANGE);
if (rangeNode != null && rangeNode.isObject()) {
Range<String> range = new Range<>((ObjectNode)rangeNode);
return new FeedRangeEpkImpl(range);
}

JsonNode pkNode = rootNode.get(Constants.Properties.FEED_RANGE_PARTITION_KEY);
if (pkNode != null && pkNode.isArray()) {
PartitionKeyInternal pk = mapper.convertValue(pkNode, PartitionKeyInternal.class);
return new FeedRangePartitionKeyImpl(pk);
}

JsonNode pkRangeIdNode =
rootNode.get(Constants.Properties.FEED_RANGE_PARTITION_KEY_RANGE_ID);
if (pkRangeIdNode != null && pkRangeIdNode.isTextual()) {
return new FeedRangePartitionKeyRangeImpl(pkRangeIdNode.asText());
}

return null;

} catch (final IOException ioError) {
LOGGER.debug("Failed to parse feed range JSON %s", jsonString, ioError);
FabianMeiswinkel marked this conversation as resolved.
Show resolved Hide resolved
return null;
}

return null;
}
}
Loading