Skip to content

Commit

Permalink
Ignore file owners comparison on restore when config is set (#1684)
Browse files Browse the repository at this point in the history
* Ignore file owners comparison on restore when config is set

* Minor updates

* Style fix in tests

---------

Co-authored-by: Shekhar Sharma <shekhar.sharmaa@gmail.com>
  • Loading branch information
shekhars-li and Shekhars authored Aug 23, 2023
1 parent 0e8ac2e commit fa4008e
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ public class BlobStoreConfig extends MapConfig {
public static final String RETRY_POLICY_JITTER_FACTOR = RETRY_POLICY_PREFIX + "jitter.factor";
// random retry delay between -0.1*retry-delay to 0.1*retry-delay
public static final double DEFAULT_RETRY_POLICY_JITTER_FACTOR = 0.1;
// Set whether to compare file owners after restoring blobs from remote store. Useful when the job is started on a new
// machine with new gid/uid or if gid/uid changes due to host migration
public static final String COMPARE_FILE_OWNERS_ON_RESTORE = PREFIX + "compare.file.owners.on.restore";
public static final boolean DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE = true;

public BlobStoreConfig(Config config) {
super(config);
Expand All @@ -70,4 +74,8 @@ public RetryPolicyConfig getRetryPolicyConfig() {
getInt(RETRY_POLICY_BACKOFF_DELAY_FACTOR, DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR), ChronoUnit.MILLIS);
return retryPolicyConfig;
}

public boolean shouldCompareFileOwnersOnRestore() {
return getBoolean(COMPARE_FILE_OWNERS_ON_RESTORE, DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public CompletableFuture<Map<String, String>> upload(CheckpointId checkpointId,

long dirDiffStartTime = System.nanoTime();
// get the diff between previous and current store directories
DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, DirDiffUtil.areSameFile(false));
DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, DirDiffUtil.areSameFile(false, true));
metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - dirDiffStartTime);

DirDiff.Stats stats = DirDiff.getStats(dirDiff);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ public void init(Checkpoint checkpoint, boolean getDeleted) {
@Override
public CompletableFuture<Void> restore() {
return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir,
storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, false);
storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, false,
blobStoreConfig.shouldCompareFileOwnersOnRestore());
}

/**
Expand All @@ -174,7 +175,8 @@ public CompletableFuture<Void> restore() {
*/
public CompletableFuture<Void> restore(boolean restoreDeleted) {
return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir, storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, restoreDeleted);
loggedBaseDir, storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, restoreDeleted,
blobStoreConfig.shouldCompareFileOwnersOnRestore());
}

@Override
Expand Down Expand Up @@ -227,7 +229,7 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN
Map<String, Pair<String, SnapshotIndex>> prevStoreSnapshotIndexes,
File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics,
StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil,
ExecutorService executor, boolean getDeleted) {
ExecutorService executor, boolean getDeleted, boolean compareFileOwners) {
long restoreStartTime = System.nanoTime();
List<CompletionStage<Void>> restoreFutures = new ArrayList<>();
LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore);
Expand Down Expand Up @@ -288,7 +290,7 @@ static CompletableFuture<Void> restoreStores(String jobName, String jobId, TaskN

metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);
enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime,
restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, getDeleted);
restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, getDeleted, compareFileOwners);
} else {
LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " +
"to the remote snapshot.", storeCheckpointDir, storeDir);
Expand Down Expand Up @@ -320,7 +322,7 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde
LOG.debug("Restoring task: {} store: {} from remote snapshot since the store is configured to be " +
"restored on each restart.", taskName, storeName);
restoreStore = true;
} else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false).test(storeCheckpointDir.toFile(), dirIndex)) {
} else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false, true).test(storeCheckpointDir.toFile(), dirIndex)) {
restoreStore = false; // no restore required for this store.
} else {
// we don't optimize for the case when the local host doesn't contain the most recent store checkpoint
Expand Down Expand Up @@ -349,16 +351,19 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde
@VisibleForTesting
static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex,
long storeRestoreStartTime, List<CompletionStage<Void>> restoreFutures, BlobStoreUtil blobStoreUtil,
DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor, boolean getDeleted) {
DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor, boolean getDeleted,
boolean compareFileOwners) {

Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), Optional.empty(), jobName, jobId, taskName, storeName);
CompletableFuture<Void> restoreFuture =
blobStoreUtil.restoreDir(storeDir, dirIndex, requestMetadata, getDeleted).thenRunAsync(() -> {
metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime);

long postRestoreStartTime = System.nanoTime();
LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir);
if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) {
LOG.trace(
"Comparing restored store directory: {} and remote directory to verify restore with compareFileOwners set to: {}",
storeDir, compareFileOwners);
if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true, compareFileOwners).test(storeDir, dirIndex)) {
metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime);
throw new SamzaException(
String.format("Restored store directory: %s contents " + "are not the same as the remote snapshot.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,23 @@ public class DirDiffUtil {
/**
* Checks if a local directory and a remote directory are identical. Local and remote directories are identical iff:
* 1. The local directory has exactly the same set of files as the remote directory, and the files are themselves
* identical, as determined by {@link #areSameFile(boolean)}, except for those allowed to differ according to
* identical, as determined by {@link #areSameFile(boolean, boolean)}, except for those allowed to differ according to
* {@code filesToIgnore}.
* 2. The local directory has exactly the same set of sub-directories as the remote directory.
*
* @param filesToIgnore a set of file names to ignore during the directory comparisons
* (does not exclude directory names)
* @param compareLargeFileChecksums whether to compare checksums for large files (&gt; 1 MB).
* @param compareFileOwners whether to compare file owners
* @return boolean indicating whether the local and remote directory are identical.
*/
// TODO HIGH shesharm add unit tests
public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore, boolean compareLargeFileChecksums) {
public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore, boolean compareLargeFileChecksums, boolean compareFileOwners) {
return (localDir, remoteDir) -> {
String remoteDirName = remoteDir.getDirName().equals(DirIndex.ROOT_DIR_NAME) ? "root" : remoteDir.getDirName();
LOG.debug("Creating diff between local dir: {} and remote dir: {} for comparison.",
localDir.getAbsolutePath(), remoteDirName);
DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir, DirDiffUtil.areSameFile(compareLargeFileChecksums));
DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir, DirDiffUtil.areSameFile(compareLargeFileChecksums, compareFileOwners));

boolean areSameDir = true;
List<String> filesRemoved = dirDiff.getFilesRemoved().stream()
Expand Down Expand Up @@ -129,7 +130,7 @@ public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore, boolean
String localSubDirName = subDirRetained.getDirName();
File localSubDirFile = Paths.get(localDir.getAbsolutePath(), localSubDirName).toFile();
DirIndex remoteSubDir = remoteSubDirs.get(localSubDirName);
boolean areSameSubDir = areSameDir(filesToIgnore, false).test(localSubDirFile, remoteSubDir);
boolean areSameSubDir = areSameDir(filesToIgnore, false, compareFileOwners).test(localSubDirFile, remoteSubDir);
if (!areSameSubDir) {
LOG.debug("Local sub-dir: {} and remote sub-dir: {} are not same.",
localSubDirFile.getAbsolutePath(), remoteSubDir.getDirName());
Expand All @@ -148,9 +149,11 @@ public BiPredicate<File, DirIndex> areSameDir(Set<String> filesToIgnore, boolean
* the same file. Files with same attributes as well as content are same file. A SST file in a special case. They are
* immutable, so we only compare their attributes but not the content.
* @param compareLargeFileChecksums whether to compare checksums for large files (&gt; 1 MB).
* @param compareFileOwners whether to compare owners of the files. Useful when migrating an exiting job to new machine(s)
* that may have different owner ids.
* @return BiPredicate to test similarity of local and remote files
*/
public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileChecksums) {
public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileChecksums, boolean compareFileOwners) {

// Cache owner/group names to reduce calls to sun.nio.fs.UnixFileAttributes.group
Cache<String, String> groupCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
Expand All @@ -168,11 +171,17 @@ public static BiPredicate<File, FileIndex> areSameFile(boolean compareLargeFileC

// Don't compare file timestamps. The ctime of a local file just restored will be different than the
// remote file, and will cause the file to be uploaded again during the first commit after restore.
areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize() &&
groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:gid")),
() -> localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) &&
ownerCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:uid")),
() -> localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize();
// In case a job is moved to a new cluster/machine, the owners (gid/uid) may be different than the one present
// in the remote snapshot. This flag indicates if we should compare it at all.
if (compareFileOwners) {
LOG.trace("Comparing owners of remote and local copy of file {}", localFile.getAbsolutePath());
areSameFiles =
areSameFiles && groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:gid")),
() -> localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) && ownerCache.get(
String.valueOf(Files.getAttribute(localFile.toPath(), "unix:uid")),
() -> localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner());
}

} catch (IOException | ExecutionException e) {
LOG.error("Error reading attributes for file: {}", localFile.getAbsolutePath());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ public String answer(InvocationOnMock invocation) throws Throwable {
File localCheckpointDir = new File(localRemoteSnapshotPair.getLeft() + "-" + checkpointId.serialize());
DirIndex dirIndex = new DirIndex(localCheckpointDir.getName(), Collections.emptyList(), Collections.emptyList(),
Collections.emptyList(), Collections.emptyList());
return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex, DirDiffUtil.areSameFile(false));
return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex, DirDiffUtil.areSameFile(false, true));
}).collect(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(DirDiff::getDirName))));

// assert - asset all DirDiff are put to blob store
Expand Down Expand Up @@ -369,7 +369,7 @@ public String answer(InvocationOnMock invocation) throws Throwable {
.stream()
.map(localRemoteSnapshotPair ->
DirDiffUtil.getDirDiff(new File(localRemoteSnapshotPair.getLeft() + "-" + checkpointId.serialize()),
localRemoteSnapshotPair.getRight().getDirIndex(), DirDiffUtil.areSameFile(false)))
localRemoteSnapshotPair.getRight().getDirIndex(), DirDiffUtil.areSameFile(false, true)))
.collect(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(DirDiff::getDirName))));

// assert - asset all DirDiff are put to blob store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void testShouldRestoreIfCheckpointDirNotIdenticalToRemoteSnapshot() throw
StorageConfig storageConfig = mock(StorageConfig.class);
when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false);
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> false);
when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> false);

boolean shouldRestore = BlobStoreRestoreManager.shouldRestore(
taskName, storeName, dirIndex, storeCheckpointDir, storageConfig, dirDiffUtil);
Expand All @@ -158,12 +158,12 @@ public void testShouldNotRestoreIfPreviousCheckpointDirIdenticalToRemoteSnapshot
StorageConfig storageConfig = mock(StorageConfig.class);
when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false);
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true); // are same dir
when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true); // are same dir

boolean shouldRestore = BlobStoreRestoreManager.shouldRestore(
taskName, storeName, dirIndex, storeCheckpointDir, storageConfig, dirDiffUtil);

verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean());
verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean(), anyBoolean());
assertFalse(shouldRestore); // should not restore, should retain checkpoint dir instead
}

Expand Down Expand Up @@ -200,11 +200,11 @@ public void testRestoreDeletesStoreDir() throws IOException {
// return immediately without restoring.
when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true);
when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true);

BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);

// verify that the store directory restore was called and skipped (i.e. shouldRestore == true)
verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean());
Expand Down Expand Up @@ -249,14 +249,14 @@ public void testRestoreDeletesCheckpointDirsIfRestoring() throws IOException {
BlobStoreUtil blobStoreUtil = mock(BlobStoreUtil.class);
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);

when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true);
when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true);
// return immediately without restoring.
when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));

BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);

// verify that the store directory restore was called and skipped (i.e. shouldRestore == true)
verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean());
Expand Down Expand Up @@ -305,14 +305,14 @@ public void testRestoreRetainsCheckpointDirsIfValid() throws IOException {
DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class);

// ensures shouldRestore is not called
when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true);
when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true);
// return immediately without restoring.
when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));

BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);

// verify that the store directory restore was not called (should have restored from checkpoint dir)
verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean());
Expand Down Expand Up @@ -349,7 +349,7 @@ public void testRestoreSkipsStoresWithMissingCheckpointSCM() {

BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes,
loggedBaseDir.toFile(), storageConfig, metrics,
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false);
storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true);

// verify that we checked the previously checkpointed SCMs.
verify(prevStoreSnapshotIndexes, times(1)).containsKey(eq("newStoreName"));
Expand Down
Loading

0 comments on commit fa4008e

Please sign in to comment.