Skip to content

Commit

Permalink
applyAsync on a root resource should create a dependency resource onl…
Browse files Browse the repository at this point in the history
…y if it is not already created and root resource always needs to be updated in this case
  • Loading branch information
anuchandy committed Sep 5, 2016
1 parent a55439f commit 0a85d84
Showing 1 changed file with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,31 @@ public Observable<T> executeAsync() {
final List<Observable<T>> observables = new ArrayList<>();
while (nextNode != null) {
final DAGNode<U> thisNode = nextNode;
observables.add(nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
T cachedResult = nextNode.data().result();
if (cachedResult != null && !this.dag().isRootNode(nextNode)) {
observables.add(Observable.just(cachedResult)
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
return executeAsync();
}
}
}));
})
);
} else {
observables.add(nextNode.data().executeAsync()
.flatMap(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(T t) {
dag().reportedCompleted(thisNode);
if (dag().isRootNode(thisNode)) {
return Observable.just(t);
} else {
return executeAsync();
}
}
}));
}
nextNode = dag.getNext();
}
return Observable.merge(observables);
Expand Down

0 comments on commit 0a85d84

Please sign in to comment.