From fa4008ee9c3f6b9499dfdceca1427b76a665420c Mon Sep 17 00:00:00 2001 From: Shekhar Sharma <72765053+shekhars-li@users.noreply.github.com> Date: Wed, 23 Aug 2023 15:45:23 -0700 Subject: [PATCH] Ignore file owners comparison on restore when config is set (#1684) * Ignore file owners comparison on restore when config is set * Minor updates * Style fix in tests --------- Co-authored-by: Shekhar Sharma --- .../apache/samza/config/BlobStoreConfig.java | 8 ++ .../blobstore/BlobStoreBackupManager.java | 2 +- .../blobstore/BlobStoreRestoreManager.java | 21 +++-- .../storage/blobstore/util/DirDiffUtil.java | 29 ++++--- .../blobstore/TestBlobStoreBackupManager.java | 4 +- .../TestBlobStoreRestoreManager.java | 20 ++--- .../blobstore/util/TestBlobStoreUtil.java | 80 ++++++++++++++++--- .../util/TestDirDiffUtilAreSameFile.java | 18 ++++- 8 files changed, 138 insertions(+), 44 deletions(-) diff --git a/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java b/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java index c9a7ee1b0c..9ed2d51d61 100644 --- a/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/BlobStoreConfig.java @@ -47,6 +47,10 @@ public class BlobStoreConfig extends MapConfig { public static final String RETRY_POLICY_JITTER_FACTOR = RETRY_POLICY_PREFIX + "jitter.factor"; // random retry delay between -0.1*retry-delay to 0.1*retry-delay public static final double DEFAULT_RETRY_POLICY_JITTER_FACTOR = 0.1; + // Set whether to compare file owners after restoring blobs from remote store. Useful when the job is started on a new + // machine with new gid/uid or if gid/uid changes due to host migration + public static final String COMPARE_FILE_OWNERS_ON_RESTORE = PREFIX + "compare.file.owners.on.restore"; + public static final boolean DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE = true; public BlobStoreConfig(Config config) { super(config); @@ -70,4 +74,8 @@ public RetryPolicyConfig getRetryPolicyConfig() { getInt(RETRY_POLICY_BACKOFF_DELAY_FACTOR, DEFAULT_RETRY_POLICY_BACKOFF_DELAY_FACTOR), ChronoUnit.MILLIS); return retryPolicyConfig; } + + public boolean shouldCompareFileOwnersOnRestore() { + return getBoolean(COMPARE_FILE_OWNERS_ON_RESTORE, DEFAULT_COMPARE_FILE_OWNERS_ON_RESTORE); + } } diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java index 5f87de95bb..997c5e6ca4 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreBackupManager.java @@ -207,7 +207,7 @@ public CompletableFuture> upload(CheckpointId checkpointId, long dirDiffStartTime = System.nanoTime(); // get the diff between previous and current store directories - DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, DirDiffUtil.areSameFile(false)); + DirDiff dirDiff = DirDiffUtil.getDirDiff(checkpointDir, prevDirIndex, DirDiffUtil.areSameFile(false, true)); metrics.storeDirDiffNs.get(storeName).update(System.nanoTime() - dirDiffStartTime); DirDiff.Stats stats = DirDiff.getStats(dirDiff); diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java index c370c62dfd..a62e2812ca 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/BlobStoreRestoreManager.java @@ -164,7 +164,8 @@ public void init(Checkpoint checkpoint, boolean getDeleted) { @Override public CompletableFuture restore() { return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir, - storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, false); + storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, false, + blobStoreConfig.shouldCompareFileOwnersOnRestore()); } /** @@ -174,7 +175,8 @@ public CompletableFuture restore() { */ public CompletableFuture restore(boolean restoreDeleted) { return restoreStores(jobName, jobId, taskModel.getTaskName(), storesToRestore, prevStoreSnapshotIndexes, - loggedBaseDir, storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, restoreDeleted); + loggedBaseDir, storageConfig, metrics, storageManagerUtil, blobStoreUtil, dirDiffUtil, executor, restoreDeleted, + blobStoreConfig.shouldCompareFileOwnersOnRestore()); } @Override @@ -227,7 +229,7 @@ static CompletableFuture restoreStores(String jobName, String jobId, TaskN Map> prevStoreSnapshotIndexes, File loggedBaseDir, StorageConfig storageConfig, BlobStoreRestoreManagerMetrics metrics, StorageManagerUtil storageManagerUtil, BlobStoreUtil blobStoreUtil, DirDiffUtil dirDiffUtil, - ExecutorService executor, boolean getDeleted) { + ExecutorService executor, boolean getDeleted, boolean compareFileOwners) { long restoreStartTime = System.nanoTime(); List> restoreFutures = new ArrayList<>(); LOG.debug("Starting restore for task: {} stores: {}", taskName, storesToRestore); @@ -288,7 +290,7 @@ static CompletableFuture restoreStores(String jobName, String jobId, TaskN metrics.storePreRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime); enqueueRestore(jobName, jobId, taskName.toString(), storeName, storeDir, dirIndex, storeRestoreStartTime, - restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, getDeleted); + restoreFutures, blobStoreUtil, dirDiffUtil, metrics, executor, getDeleted, compareFileOwners); } else { LOG.debug("Renaming store checkpoint directory: {} to store directory: {} since its contents are identical " + "to the remote snapshot.", storeCheckpointDir, storeDir); @@ -320,7 +322,7 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde LOG.debug("Restoring task: {} store: {} from remote snapshot since the store is configured to be " + "restored on each restart.", taskName, storeName); restoreStore = true; - } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false).test(storeCheckpointDir.toFile(), dirIndex)) { + } else if (dirDiffUtil.areSameDir(FILES_TO_IGNORE, false, true).test(storeCheckpointDir.toFile(), dirIndex)) { restoreStore = false; // no restore required for this store. } else { // we don't optimize for the case when the local host doesn't contain the most recent store checkpoint @@ -349,7 +351,8 @@ static boolean shouldRestore(String taskName, String storeName, DirIndex dirInde @VisibleForTesting static void enqueueRestore(String jobName, String jobId, String taskName, String storeName, File storeDir, DirIndex dirIndex, long storeRestoreStartTime, List> restoreFutures, BlobStoreUtil blobStoreUtil, - DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor, boolean getDeleted) { + DirDiffUtil dirDiffUtil, BlobStoreRestoreManagerMetrics metrics, ExecutorService executor, boolean getDeleted, + boolean compareFileOwners) { Metadata requestMetadata = new Metadata(storeDir.getAbsolutePath(), Optional.empty(), jobName, jobId, taskName, storeName); CompletableFuture restoreFuture = @@ -357,8 +360,10 @@ static void enqueueRestore(String jobName, String jobId, String taskName, String metrics.storeRestoreNs.get(storeName).set(System.nanoTime() - storeRestoreStartTime); long postRestoreStartTime = System.nanoTime(); - LOG.trace("Comparing restored store directory: {} and remote directory to verify restore.", storeDir); - if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true).test(storeDir, dirIndex)) { + LOG.trace( + "Comparing restored store directory: {} and remote directory to verify restore with compareFileOwners set to: {}", + storeDir, compareFileOwners); + if (!dirDiffUtil.areSameDir(FILES_TO_IGNORE, true, compareFileOwners).test(storeDir, dirIndex)) { metrics.storePostRestoreNs.get(storeName).set(System.nanoTime() - postRestoreStartTime); throw new SamzaException( String.format("Restored store directory: %s contents " + "are not the same as the remote snapshot.", diff --git a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java index 6c2a70694a..b89f42a31e 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java +++ b/samza-core/src/main/java/org/apache/samza/storage/blobstore/util/DirDiffUtil.java @@ -64,22 +64,23 @@ public class DirDiffUtil { /** * Checks if a local directory and a remote directory are identical. Local and remote directories are identical iff: * 1. The local directory has exactly the same set of files as the remote directory, and the files are themselves - * identical, as determined by {@link #areSameFile(boolean)}, except for those allowed to differ according to + * identical, as determined by {@link #areSameFile(boolean, boolean)}, except for those allowed to differ according to * {@code filesToIgnore}. * 2. The local directory has exactly the same set of sub-directories as the remote directory. * * @param filesToIgnore a set of file names to ignore during the directory comparisons * (does not exclude directory names) * @param compareLargeFileChecksums whether to compare checksums for large files (> 1 MB). + * @param compareFileOwners whether to compare file owners * @return boolean indicating whether the local and remote directory are identical. */ // TODO HIGH shesharm add unit tests - public BiPredicate areSameDir(Set filesToIgnore, boolean compareLargeFileChecksums) { + public BiPredicate areSameDir(Set filesToIgnore, boolean compareLargeFileChecksums, boolean compareFileOwners) { return (localDir, remoteDir) -> { String remoteDirName = remoteDir.getDirName().equals(DirIndex.ROOT_DIR_NAME) ? "root" : remoteDir.getDirName(); LOG.debug("Creating diff between local dir: {} and remote dir: {} for comparison.", localDir.getAbsolutePath(), remoteDirName); - DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir, DirDiffUtil.areSameFile(compareLargeFileChecksums)); + DirDiff dirDiff = DirDiffUtil.getDirDiff(localDir, remoteDir, DirDiffUtil.areSameFile(compareLargeFileChecksums, compareFileOwners)); boolean areSameDir = true; List filesRemoved = dirDiff.getFilesRemoved().stream() @@ -129,7 +130,7 @@ public BiPredicate areSameDir(Set filesToIgnore, boolean String localSubDirName = subDirRetained.getDirName(); File localSubDirFile = Paths.get(localDir.getAbsolutePath(), localSubDirName).toFile(); DirIndex remoteSubDir = remoteSubDirs.get(localSubDirName); - boolean areSameSubDir = areSameDir(filesToIgnore, false).test(localSubDirFile, remoteSubDir); + boolean areSameSubDir = areSameDir(filesToIgnore, false, compareFileOwners).test(localSubDirFile, remoteSubDir); if (!areSameSubDir) { LOG.debug("Local sub-dir: {} and remote sub-dir: {} are not same.", localSubDirFile.getAbsolutePath(), remoteSubDir.getDirName()); @@ -148,9 +149,11 @@ public BiPredicate areSameDir(Set filesToIgnore, boolean * the same file. Files with same attributes as well as content are same file. A SST file in a special case. They are * immutable, so we only compare their attributes but not the content. * @param compareLargeFileChecksums whether to compare checksums for large files (> 1 MB). + * @param compareFileOwners whether to compare owners of the files. Useful when migrating an exiting job to new machine(s) + * that may have different owner ids. * @return BiPredicate to test similarity of local and remote files */ - public static BiPredicate areSameFile(boolean compareLargeFileChecksums) { + public static BiPredicate areSameFile(boolean compareLargeFileChecksums, boolean compareFileOwners) { // Cache owner/group names to reduce calls to sun.nio.fs.UnixFileAttributes.group Cache groupCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build(); @@ -168,11 +171,17 @@ public static BiPredicate areSameFile(boolean compareLargeFileC // Don't compare file timestamps. The ctime of a local file just restored will be different than the // remote file, and will cause the file to be uploaded again during the first commit after restore. - areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize() && - groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:gid")), - () -> localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) && - ownerCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:uid")), - () -> localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner()); + areSameFiles = localFileAttrs.size() == remoteFileMetadata.getSize(); + // In case a job is moved to a new cluster/machine, the owners (gid/uid) may be different than the one present + // in the remote snapshot. This flag indicates if we should compare it at all. + if (compareFileOwners) { + LOG.trace("Comparing owners of remote and local copy of file {}", localFile.getAbsolutePath()); + areSameFiles = + areSameFiles && groupCache.get(String.valueOf(Files.getAttribute(localFile.toPath(), "unix:gid")), + () -> localFileAttrs.group().getName()).equals(remoteFileMetadata.getGroup()) && ownerCache.get( + String.valueOf(Files.getAttribute(localFile.toPath(), "unix:uid")), + () -> localFileAttrs.owner().getName()).equals(remoteFileMetadata.getOwner()); + } } catch (IOException | ExecutionException e) { LOG.error("Error reading attributes for file: {}", localFile.getAbsolutePath()); diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java index 4f35521dd4..b481665463 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreBackupManager.java @@ -261,7 +261,7 @@ public String answer(InvocationOnMock invocation) throws Throwable { File localCheckpointDir = new File(localRemoteSnapshotPair.getLeft() + "-" + checkpointId.serialize()); DirIndex dirIndex = new DirIndex(localCheckpointDir.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList()); - return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex, DirDiffUtil.areSameFile(false)); + return DirDiffUtil.getDirDiff(localCheckpointDir, dirIndex, DirDiffUtil.areSameFile(false, true)); }).collect(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(DirDiff::getDirName)))); // assert - asset all DirDiff are put to blob store @@ -369,7 +369,7 @@ public String answer(InvocationOnMock invocation) throws Throwable { .stream() .map(localRemoteSnapshotPair -> DirDiffUtil.getDirDiff(new File(localRemoteSnapshotPair.getLeft() + "-" + checkpointId.serialize()), - localRemoteSnapshotPair.getRight().getDirIndex(), DirDiffUtil.areSameFile(false))) + localRemoteSnapshotPair.getRight().getDirIndex(), DirDiffUtil.areSameFile(false, true))) .collect(Collectors.toCollection(() -> new TreeSet<>(Comparator.comparing(DirDiff::getDirName)))); // assert - asset all DirDiff are put to blob store diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java index 67d62759ea..6caa69b6e5 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/TestBlobStoreRestoreManager.java @@ -141,7 +141,7 @@ public void testShouldRestoreIfCheckpointDirNotIdenticalToRemoteSnapshot() throw StorageConfig storageConfig = mock(StorageConfig.class); when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false); DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class); - when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> false); + when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> false); boolean shouldRestore = BlobStoreRestoreManager.shouldRestore( taskName, storeName, dirIndex, storeCheckpointDir, storageConfig, dirDiffUtil); @@ -158,12 +158,12 @@ public void testShouldNotRestoreIfPreviousCheckpointDirIdenticalToRemoteSnapshot StorageConfig storageConfig = mock(StorageConfig.class); when(storageConfig.cleanLoggedStoreDirsOnStart(anyString())).thenReturn(false); DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class); - when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true); // are same dir + when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true); // are same dir boolean shouldRestore = BlobStoreRestoreManager.shouldRestore( taskName, storeName, dirIndex, storeCheckpointDir, storageConfig, dirDiffUtil); - verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean()); + verify(dirDiffUtil, times(1)).areSameDir(anySet(), anyBoolean(), anyBoolean()); assertFalse(shouldRestore); // should not restore, should retain checkpoint dir instead } @@ -200,11 +200,11 @@ public void testRestoreDeletesStoreDir() throws IOException { // return immediately without restoring. when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(null)); - when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true); + when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir.toFile(), storageConfig, metrics, - storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false); + storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true); // verify that the store directory restore was called and skipped (i.e. shouldRestore == true) verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean()); @@ -249,14 +249,14 @@ public void testRestoreDeletesCheckpointDirsIfRestoring() throws IOException { BlobStoreUtil blobStoreUtil = mock(BlobStoreUtil.class); DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class); - when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true); + when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true); // return immediately without restoring. when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(null)); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir.toFile(), storageConfig, metrics, - storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false); + storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true); // verify that the store directory restore was called and skipped (i.e. shouldRestore == true) verify(blobStoreUtil, times(1)).restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean()); @@ -305,14 +305,14 @@ public void testRestoreRetainsCheckpointDirsIfValid() throws IOException { DirDiffUtil dirDiffUtil = mock(DirDiffUtil.class); // ensures shouldRestore is not called - when(dirDiffUtil.areSameDir(anySet(), anyBoolean())).thenReturn((arg1, arg2) -> true); + when(dirDiffUtil.areSameDir(anySet(), anyBoolean(), anyBoolean())).thenReturn((arg1, arg2) -> true); // return immediately without restoring. when(blobStoreUtil.restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean())) .thenReturn(CompletableFuture.completedFuture(null)); BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir.toFile(), storageConfig, metrics, - storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false); + storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true); // verify that the store directory restore was not called (should have restored from checkpoint dir) verify(blobStoreUtil, times(0)).restoreDir(eq(storeDir.toFile()), eq(dirIndex), any(Metadata.class), anyBoolean()); @@ -349,7 +349,7 @@ public void testRestoreSkipsStoresWithMissingCheckpointSCM() { BlobStoreRestoreManager.restoreStores(jobName, jobId, taskName, storesToRestore, prevStoreSnapshotIndexes, loggedBaseDir.toFile(), storageConfig, metrics, - storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false); + storageManagerUtil, blobStoreUtil, dirDiffUtil, EXECUTOR, false, true); // verify that we checked the previously checkpointed SCMs. verify(prevStoreSnapshotIndexes, times(1)).containsKey(eq("newStoreName")); diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java index 5b79315b67..a44f86e644 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestBlobStoreUtil.java @@ -540,12 +540,12 @@ public void testAreSameFile() throws IOException { // checksum should be ignored for sst file. Set any dummy value FileIndex sstFileIndex = new FileIndex(sstFile.getFileName().toString(), Collections.emptyList(), sstFileMetadata, 0L); - assertTrue(DirDiffUtil.areSameFile(false).test(sstFile.toFile(), sstFileIndex)); + assertTrue(DirDiffUtil.areSameFile(false, true).test(sstFile.toFile(), sstFileIndex)); // 2. test with sst file with different timestamps // Update last modified time Files.setLastModifiedTime(sstFile, FileTime.fromMillis(System.currentTimeMillis() + 1000L)); - assertTrue(DirDiffUtil.areSameFile(false).test(sstFile.toFile(), sstFileIndex)); + assertTrue(DirDiffUtil.areSameFile(false, true).test(sstFile.toFile(), sstFileIndex)); // 3. test with non-sst files with same metadata and content Path tmpFile = Files.createTempFile("samza-testAreSameFiles-", ".tmp"); @@ -559,18 +559,18 @@ public void testAreSameFile() throws IOException { FileIndex tmpFileIndex = new FileIndex(tmpFile.getFileName().toString(), Collections.emptyList(), tmpFileMetadata, FileUtils.checksumCRC32(tmpFile.toFile())); - assertTrue(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(), tmpFileIndex)); + assertTrue(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(), tmpFileIndex)); // 4. test with non-sst files with different attributes // change lastModifiedTime of local file FileTime prevLastModified = tmpFileAttribs.lastModifiedTime(); Files.setLastModifiedTime(tmpFile, FileTime.fromMillis(System.currentTimeMillis() + 1000L)); - assertTrue(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(), tmpFileIndex)); + assertTrue(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(), tmpFileIndex)); // change content/checksum of local file Files.setLastModifiedTime(tmpFile, prevLastModified); // reset attributes to match with remote file fileUtil.writeToTextFile(tmpFile.toFile(), RandomStringUtils.random(1000), false); //new content - assertFalse(DirDiffUtil.areSameFile(false).test(tmpFile.toFile(), tmpFileIndex)); + assertFalse(DirDiffUtil.areSameFile(false, true).test(tmpFile.toFile(), tmpFileIndex)); } @Test @@ -626,7 +626,7 @@ public void testRestoreDirRestoresMultiPartFilesCorrectly() throws IOException { blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, metadata, false).join(); assertTrue( - new DirDiffUtil().areSameDir(Collections.emptySet(), false).test(restoreDirBasePath.toFile(), mockDirIndex)); + new DirDiffUtil().areSameDir(Collections.emptySet(), false, true).test(restoreDirBasePath.toFile(), mockDirIndex)); } @Test @@ -684,7 +684,7 @@ public void testRestoreDirRetriesFileRestoreOnRetriableExceptions() throws IOExc blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, metadata, false).join(); assertTrue( - new DirDiffUtil().areSameDir(Collections.emptySet(), false).test(restoreDirBasePath.toFile(), mockDirIndex)); + new DirDiffUtil().areSameDir(Collections.emptySet(), false, true).test(restoreDirBasePath.toFile(), mockDirIndex)); } @Test @@ -742,6 +742,66 @@ public void testRestoreDirFailsRestoreOnNonRetriableExceptions() throws IOExcept } } + + @Test + public void testRestoreIgnoresDifferentFileOwnersOnConfig() throws IOException { + Path restoreDirBasePath = Files.createTempDirectory(BlobStoreTestUtil.TEMP_DIR_PREFIX); + + // remote file == 26 blobs, blob ids from a to z, blob contents from a to z, offsets 0 to 25. + DirIndex mockDirIndex = mock(DirIndex.class); + when(mockDirIndex.getDirName()).thenReturn(DirIndex.ROOT_DIR_NAME); + FileIndex mockFileIndex = mock(FileIndex.class); + when(mockFileIndex.getFileName()).thenReturn("1.sst"); + + // setup mock file attributes. create a temp file to get current user/group/permissions so that they + // match with restored files. + File tmpFile = Paths.get(restoreDirBasePath.toString(), "tempfile-" + new Random().nextInt()).toFile(); + tmpFile.createNewFile(); + PosixFileAttributes attrs = Files.readAttributes(tmpFile.toPath(), PosixFileAttributes.class); + // create remote file with different owner than local file + FileMetadata fileMetadata = new FileMetadata(1234L, 1243L, 26, // ctime mtime does not matter. size == 26 + attrs.owner().getName() + "_different", attrs.group().getName(), PosixFilePermissions.toString(attrs.permissions())); + when(mockFileIndex.getFileMetadata()).thenReturn(fileMetadata); + Files.delete(tmpFile.toPath()); // delete so that it doesn't show up in restored dir contents. + + List mockFileBlobs = new ArrayList<>(); + StringBuilder fileContents = new StringBuilder(); + for (int i = 0; i < 26; i++) { + FileBlob mockFileBlob = mock(FileBlob.class); + char c = (char) ('a' + i); + fileContents.append(c); // blob contents == blobId + when(mockFileBlob.getBlobId()).thenReturn(String.valueOf(c)); + when(mockFileBlob.getOffset()).thenReturn(i); + mockFileBlobs.add(mockFileBlob); + } + when(mockFileIndex.getBlobs()).thenReturn(mockFileBlobs); + CRC32 checksum = new CRC32(); + checksum.update(fileContents.toString().getBytes()); + when(mockFileIndex.getChecksum()).thenReturn(checksum.getValue()); + when(mockDirIndex.getFilesPresent()).thenReturn(ImmutableList.of(mockFileIndex)); + + BlobStoreManager mockBlobStoreManager = mock(BlobStoreManager.class); + when(mockBlobStoreManager.get(anyString(), any(OutputStream.class), any(Metadata.class), any(Boolean.class))).thenAnswer( + (Answer>) invocationOnMock -> { + String blobId = invocationOnMock.getArgumentAt(0, String.class); + OutputStream outputStream = invocationOnMock.getArgumentAt(1, OutputStream.class); + // blob contents = blob id + outputStream.write(blobId.getBytes()); + + // force flush so that the checksum calculation later uses the full file contents. + ((FileOutputStream) outputStream).getFD().sync(); + return CompletableFuture.completedFuture(null); + }); + + BlobStoreConfig config = mock(BlobStoreConfig.class); + when(config.shouldCompareFileOwnersOnRestore()).thenReturn(false); + BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, blobStoreConfig, null, null); + blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), mockDirIndex, metadata, false).join(); + + assertTrue( + new DirDiffUtil().areSameDir(Collections.emptySet(), false, config.shouldCompareFileOwnersOnRestore()).test(restoreDirBasePath.toFile(), mockDirIndex)); + } + @Test @Ignore // TODO remove public void testRestoreDirRecreatesEmptyFilesAndDirs() throws IOException { @@ -758,7 +818,7 @@ public void testRestoreDirRecreatesEmptyFilesAndDirs() throws IOException { outputStream.write(blobId.getBytes()); return CompletableFuture.completedFuture(null); }); - boolean result = new DirDiffUtil().areSameDir(new TreeSet<>(), false).test(localSnapshot.toFile(), dirIndex); + boolean result = new DirDiffUtil().areSameDir(new TreeSet<>(), false, true).test(localSnapshot.toFile(), dirIndex); assertFalse(result); //ToDo complete } @@ -788,7 +848,7 @@ public void testRestoreDirCreatesCorrectDirectoryStructure() throws IOException BlobStoreUtil blobStoreUtil = new BlobStoreUtil(mockBlobStoreManager, EXECUTOR, blobStoreConfig, null, null); blobStoreUtil.restoreDir(restoreDirBasePath.toFile(), dirIndex, metadata, false).join(); - assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), false).test(restoreDirBasePath.toFile(), dirIndex)); + assertTrue(new DirDiffUtil().areSameDir(Collections.emptySet(), false, true).test(restoreDirBasePath.toFile(), dirIndex)); } /** @@ -950,7 +1010,7 @@ public void testGetCheckpointIndexIgnoresStoresNotInStoresToBackupRestoreSet() { * This test verifies that a retriable exception is retried more than 3 times (default retry is limited to 3 attempts) */ @Test - public void testPutFileRetriedMorethanThreeTimes() throws Exception { + public void testPutFileRetriedMoreThanThreeTimes() throws Exception { SnapshotMetadata snapshotMetadata = new SnapshotMetadata(checkpointId, jobName, jobId, taskName, storeName); Path path = Files.createTempFile("samza-testPutFileChecksum-", ".tmp"); FileUtil fileUtil = new FileUtil(); diff --git a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java index 98f75122aa..1bd2ee3932 100644 --- a/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java +++ b/samza-core/src/test/java/org/apache/samza/storage/blobstore/util/TestDirDiffUtilAreSameFile.java @@ -1,4 +1,4 @@ -/* +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -60,7 +60,7 @@ public class TestDirDiffUtilAreSameFile { @Before public void testSetup() throws Exception { - areSameFile = DirDiffUtil.areSameFile(false); + areSameFile = DirDiffUtil.areSameFile(false, true); createFile(SMALL_FILE); } @@ -129,6 +129,18 @@ public void testAreSameFile_DifferentOwner() { Assert.assertFalse(areSameFile.test(localFile, remoteFile)); } + @Test + public void testAreSameFile_IgnoreDifferentOwner() { + areSameFile = DirDiffUtil.areSameFile(false, false); + remoteFileMetadata = new FileMetadata(0, 0, + localContentLength, + localFileAttrs.owner().getName() + "_different", + localFileAttrs.group().getName(), + PosixFilePermissions.toString(localFileAttrs.permissions())); + remoteFile = new FileIndex(localFile.getName(), new ArrayList<>(), remoteFileMetadata, localChecksum); + Assert.assertTrue(areSameFile.test(localFile, remoteFile)); + } + @Test public void testAreSameFile_DifferentGroup() { remoteFileMetadata = new FileMetadata(0, 0, @@ -197,7 +209,7 @@ public void testAreSameFile_Cache() throws Exception { createFile(LARGE_FILE); for (int i = 0; i < 5; i++) { - BiPredicate areSameFile = DirDiffUtil.areSameFile(false); + BiPredicate areSameFile = DirDiffUtil.areSameFile(false, true); for (int j = 0; j < 20; j++) { localFile = mock(File.class); when(localFile.getName()).thenReturn("name");