diff --git a/src/main/java/com/instaclustr/esop/azure/AzureBackuper.java b/src/main/java/com/instaclustr/esop/azure/AzureBackuper.java index 76e44b23..b20710d2 100644 --- a/src/main/java/com/instaclustr/esop/azure/AzureBackuper.java +++ b/src/main/java/com/instaclustr/esop/azure/AzureBackuper.java @@ -68,7 +68,7 @@ protected void cleanup() throws Exception { } @Override - public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception { + public RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception { final CloudBlockBlob blob = ((AzureRemoteObjectReference) object).blob; final Instant now = Instant.now(); @@ -78,16 +78,16 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final Remo blob.getMetadata().put(DATE_TIME_METADATA_KEY, now.toString()); blob.uploadMetadata(); - return FreshenResult.FRESHENED; + return new RefreshingOutcome(FreshenResult.FRESHENED, null); } else { - return blob.exists() ? FreshenResult.FRESHENED : FreshenResult.UPLOAD_REQUIRED; + return blob.exists() ? new RefreshingOutcome(FreshenResult.FRESHENED, null) : new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } } catch (final StorageException e) { if (e.getHttpStatusCode() != 404) { throw e; } - return FreshenResult.UPLOAD_REQUIRED; + return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } } diff --git a/src/main/java/com/instaclustr/esop/gcp/GCPBackuper.java b/src/main/java/com/instaclustr/esop/gcp/GCPBackuper.java index d5acb1c7..ef03b207 100644 --- a/src/main/java/com/instaclustr/esop/gcp/GCPBackuper.java +++ b/src/main/java/com/instaclustr/esop/gcp/GCPBackuper.java @@ -53,7 +53,7 @@ public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path obje } @Override - public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) { + public RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) { final BlobId blobId = ((GCPRemoteObjectReference) object).blobId; try { @@ -63,13 +63,13 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final Remo .setTarget(BlobInfo.newBuilder(blobId).build(), Storage.BlobTargetOption.predefinedAcl(BUCKET_OWNER_FULL_CONTROL)) .build()); - return FreshenResult.FRESHENED; + return new RefreshingOutcome(FreshenResult.FRESHENED, null); } else { final Blob blob = storage.get(blobId); if (blob == null || !blob.exists()) { - return FreshenResult.UPLOAD_REQUIRED; + return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } else { - return FreshenResult.FRESHENED; + return new RefreshingOutcome(FreshenResult.FRESHENED, null); } } } catch (final StorageException e) { @@ -77,7 +77,7 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final Remo throw e; } - return FreshenResult.UPLOAD_REQUIRED; + return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } } diff --git a/src/main/java/com/instaclustr/esop/impl/AbstractTracker.java b/src/main/java/com/instaclustr/esop/impl/AbstractTracker.java index 21e340ff..962b9a25 100644 --- a/src/main/java/com/instaclustr/esop/impl/AbstractTracker.java +++ b/src/main/java/com/instaclustr/esop/impl/AbstractTracker.java @@ -19,6 +19,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; +import com.instaclustr.esop.impl.hash.HashService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ public abstract class AbstractTracker units = Collections.synchronizedList(new ArrayList<>()); protected final Set> sessions = Collections.synchronizedSet(new HashSet<>()); @@ -58,10 +59,10 @@ public abstract class AbstractTracker constructSession(); @@ -131,7 +132,7 @@ public synchronized Session submit(final INTERACTOR interactor, } if (alreadySubmitted == null) { - final UNIT unit = constructUnitToSubmit(interactor, entry, operation.getShouldCancel(), snapshotTag, hashSpec); + final UNIT unit = constructUnitToSubmit(interactor, entry, operation.getShouldCancel(), snapshotTag, hashService); units.add(unit); futures.put(executorService.submit(unit), unit); @@ -226,7 +227,7 @@ public static abstract class Unit implements java.util.concurrent.Callable @JsonIgnore protected String snapshotTag; @JsonIgnore - protected HashSpec hashSpec; + protected HashService hashService; protected final ManifestEntry manifestEntry; protected volatile State state = NOT_STARTED; protected Throwable throwable = null; @@ -235,10 +236,10 @@ public static abstract class Unit implements java.util.concurrent.Callable public Unit(final ManifestEntry manifestEntry, final AtomicBoolean shouldCancel, - final HashSpec hashSpec) { + final HashService hashService) { this.manifestEntry = manifestEntry; this.shouldCancel = shouldCancel; - this.hashSpec = hashSpec; + this.hashService = hashService; } public enum State { diff --git a/src/main/java/com/instaclustr/esop/impl/SSTableUtils.java b/src/main/java/com/instaclustr/esop/impl/SSTableUtils.java index 9a28a989..13d873b9 100644 --- a/src/main/java/com/instaclustr/esop/impl/SSTableUtils.java +++ b/src/main/java/com/instaclustr/esop/impl/SSTableUtils.java @@ -44,7 +44,6 @@ public class SSTableUtils { private static final int SSTABLE_PREFIX_IDX = 1; private static final int SSTABLE_GENERATION_IDX = 2; private static final Pattern CHECKSUM_RE = Pattern.compile("^([a-zA-Z0-9]+).*"); - private static final HashService hashService = new HashServiceImpl(new HashSpec()); public static String sstableHash(Path path) throws IOException { final Matcher matcher = SSTABLE_RE.matcher(path.getFileName().toString()); @@ -105,14 +104,11 @@ public static String calculateChecksum(final Path filePath) throws IOException { public static Map> getSSTables(String keyspace, String table, Path snapshotDirectory, - Path tableBackupPath, - HashSpec hashSpec) throws IOException { + Path tableBackupPath) throws IOException { if (!Files.exists(snapshotDirectory)) { return Collections.emptyMap(); } - final HashService hashService = new HashServiceImpl(hashSpec); - return Files.list(snapshotDirectory) .flatMap(path -> { if (isCassandra22SecIndex(path)) { @@ -151,12 +147,11 @@ public static Map> getSSTables(String keyspace, } backupPath = backupPath.resolve(hash).resolve(manifestComponentFileName.getFileName()); - final String hashOfFile = hashService.hash(sstableComponent); entries.add(new ManifestEntry(backupPath, sstableComponent, ManifestEntry.Type.FILE, - hashOfFile, + null, // don't hash on listing, make it faster new KeyspaceTable(keyspace, table), null)); } diff --git a/src/main/java/com/instaclustr/esop/impl/Snapshots.java b/src/main/java/com/instaclustr/esop/impl/Snapshots.java index b698cc16..2ba484dd 100644 --- a/src/main/java/com/instaclustr/esop/impl/Snapshots.java +++ b/src/main/java/com/instaclustr/esop/impl/Snapshots.java @@ -571,7 +571,7 @@ public static Table parse(final String keyspace, final String table, final List< final Path tablePath = Paths.get("data").resolve(Paths.get(keyspace, table)); for (final Path path : value) { - tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath, Snapshots.hashSpec)); + tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath)); } final Optional schemaPath = value.stream().map(p -> p.resolve("schema.cql")).filter(Files::exists).findFirst(); diff --git a/src/main/java/com/instaclustr/esop/impl/backup/Backuper.java b/src/main/java/com/instaclustr/esop/impl/backup/Backuper.java index 74fe108d..124fc2a2 100644 --- a/src/main/java/com/instaclustr/esop/impl/backup/Backuper.java +++ b/src/main/java/com/instaclustr/esop/impl/backup/Backuper.java @@ -19,12 +19,24 @@ protected Backuper(final BaseBackupOperationRequest request) { this.retrier = RetrierFactory.getRetrier(request.retry); } + public static class RefreshingOutcome + { + public FreshenResult result; + public String hash; + + public RefreshingOutcome(FreshenResult result, String hash) + { + this.result = result; + this.hash = hash; + } + } + public enum FreshenResult { FRESHENED, UPLOAD_REQUIRED } - public abstract FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception; + public abstract RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception; public abstract void uploadFile(final ManifestEntry manifestEntry, final InputStream localFileStream, diff --git a/src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java b/src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java index ec0fda26..f7d8ddb8 100644 --- a/src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java +++ b/src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java @@ -12,6 +12,8 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.RateLimiter; +import com.instaclustr.esop.impl.hash.HashService; +import com.instaclustr.esop.impl.hash.HashServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,8 +47,8 @@ public class UploadTracker extends AbstractTracker condition = () -> { + Callable condition = () -> { try { - return backuper.freshenRemoteObject(manifestEntry, ref) == FRESHENED; + + return backuper.freshenRemoteObject(manifestEntry, ref); } catch (final Exception ex) { throw new RetriableException("Failed to refresh remote object" + ref.objectKey, ex); } }; - if (manifestEntry.type != MANIFEST_FILE && getRetrier(backuper.request.retry).submit(condition)) { - logger.info("{}skipping the upload of alredy uploaded file {}", + Backuper.RefreshingOutcome refreshmentOutcome = getRetrier(backuper.request.retry).submit(condition); + + if (manifestEntry.type != MANIFEST_FILE && refreshmentOutcome.result == FRESHENED) { + logger.info("{}skipping the upload of already uploaded file {}", snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "", ref.canonicalPath); @@ -135,6 +140,9 @@ public Void call() { return null; } + if (refreshmentOutcome.hash != null) + manifestEntry.hash = refreshmentOutcome.hash; + // do the upload getRetrier(backuper.request.retry).submit(() -> { try (final InputStream fileStream = new BufferedInputStream(new FileInputStream(manifestEntry.localFile.toFile()))) { @@ -144,6 +152,10 @@ public Void call() { snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "", manifestEntry.objectKey, DataSize.bytesToHumanReadable(manifestEntry.size))); + + if (manifestEntry.hash == null) + manifestEntry.hash = hashService.hash(manifestEntry.localFile); + // never encrypt manifest if (manifestEntry.type == MANIFEST_FILE) { backuper.uploadFile(manifestEntry, rateLimitedStream, ref); diff --git a/src/main/java/com/instaclustr/esop/impl/hash/HashServiceImpl.java b/src/main/java/com/instaclustr/esop/impl/hash/HashServiceImpl.java index edcdce81..54b51b2d 100644 --- a/src/main/java/com/instaclustr/esop/impl/hash/HashServiceImpl.java +++ b/src/main/java/com/instaclustr/esop/impl/hash/HashServiceImpl.java @@ -69,6 +69,9 @@ public void verify(final Path path, final String expectedHash) throws HashVerifi final String hashOfFile = getHash(path.toAbsolutePath().toFile()); + if (hashOfFile == null) + return; + if (!hashOfFile.equals(expectedHash)) { throw new HashVerificationException(format("hash of %s (%s) does not match with expected hash %s", path, diff --git a/src/main/java/com/instaclustr/esop/impl/restore/DownloadTracker.java b/src/main/java/com/instaclustr/esop/impl/restore/DownloadTracker.java index 62e5673e..c6f4adc7 100644 --- a/src/main/java/com/instaclustr/esop/impl/restore/DownloadTracker.java +++ b/src/main/java/com/instaclustr/esop/impl/restore/DownloadTracker.java @@ -7,6 +7,7 @@ import com.google.common.util.concurrent.ListeningExecutorService; +import com.instaclustr.esop.impl.hash.HashService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,8 +18,6 @@ import com.instaclustr.esop.impl.ManifestEntry.Type; import com.instaclustr.esop.impl.RemoteObjectReference; import com.instaclustr.esop.impl.hash.HashService.HashVerificationException; -import com.instaclustr.esop.impl.hash.HashServiceImpl; -import com.instaclustr.esop.impl.hash.HashSpec; import com.instaclustr.esop.impl.restore.DownloadTracker.DownloadSession; import com.instaclustr.esop.impl.restore.DownloadTracker.DownloadUnit; import com.instaclustr.esop.impl.restore.RestoreModules.DownloadingFinisher; @@ -36,8 +35,8 @@ public class DownloadTracker extends AbstractTracker manifestEntries) { multipartAbortionService.abortOrphanedMultiparts(manifestEntries, request); } + private String extractHash(List tags) + { + if (tags == null || tags.isEmpty()) + return null; + + for (Tag tag : tags) + { + if (tag.key().equals("fullObjectChecksum")) + { + return tag.value(); + } + } + + return null; + } + @Override - public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObjectReference object) { + public RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, RemoteObjectReference object) { List tags; + String hash; try { tags = s3Clients.getNonEncryptingClient() .getObjectTagging(GetObjectTaggingRequest.builder() .bucket(request.storageLocation.bucket) .key(object.canonicalPath) .build()).tagSet(); + + hash = extractHash(tags); } catch (S3Exception ex) { - return FreshenResult.UPLOAD_REQUIRED; + return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } // If kms key was specified, it means we want to encrypt @@ -125,28 +144,35 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, RemoteObje String kmsKey = s3Clients.getKMSKeyOfEncryptedClient().get(); Tag kmsKeyTag = Tag.builder().key("kmsKey").value(kmsKey).build(); if (!tags.contains(kmsKeyTag)) { - return FreshenResult.UPLOAD_REQUIRED; + return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } // However, if we have not set kmsKey as we do not want to encrypt // but remote tag contains kmsKey, then we need to basically re-upload // a file, but it will not be encrypted. } else if (!tags.isEmpty()) { if (tags.stream().anyMatch(t -> t.key().equals("kmsKey"))) { - return FreshenResult.UPLOAD_REQUIRED; + return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null); } } - return FreshenResult.FRESHENED; + return new RefreshingOutcome(FreshenResult.FRESHENED, hash); } @Override public void uploadFile(ManifestEntry manifestEntry, InputStream localFileStream, RemoteObjectReference objectReference) { logger.info("Uploading {}", objectReference.canonicalPath); + + Tag.Builder tagBuilder = Tag.builder(); + + if (manifestEntry.hash != null) { + tagBuilder.key("fullObjectChecksum").value(manifestEntry.hash); + } + uploadFile(s3Clients.getNonEncryptingClient(), manifestEntry, localFileStream, objectReference, - Tagging.builder().build()); + Tagging.builder().tagSet(tagBuilder.build()).build()); } @Override @@ -161,16 +187,16 @@ public void uploadEncryptedFile(ManifestEntry manifestEntry, InputStream localFi assert s3Clients.getEncryptingClient().isPresent() : "encrypting client is not present!"; assert s3Clients.getKMSKeyOfEncryptedClient().isPresent() : "kms key is not present!"; + Tag.Builder tagBuilder = Tag.builder().key("kmsKey").value(s3Clients.getKMSKeyOfEncryptedClient().get()); + if (manifestEntry.hash != null) { + tagBuilder.key("fullObjectChecksum").value(manifestEntry.hash); + } + uploadFile(s3Clients.getEncryptingClient().get(), manifestEntry, localFileStream, objectReference, - Tagging.builder() - .tagSet(Tag.builder() - .key("kmsKey") - .value(s3Clients.getKMSKeyOfEncryptedClient().get()) - .build()) - .build()); + Tagging.builder().tagSet(tagBuilder.build()).build()); manifestEntry.kmsKeyId = s3Clients.getKMSKeyOfEncryptedClient().get(); @@ -235,8 +261,6 @@ private void uploadFile(S3Client s3Client, String uploadId = multipartUploadResponse.uploadId(); - MessageDigest sha256 = prepareMessageDigest(); - try { long partSize = Long.parseLong(System.getProperty("upload.max.part.size", Long.toString(100 * 1024 * 1024))); @@ -270,8 +294,6 @@ private void uploadFile(S3Client s3Client, .eTag(partResponse.eTag()) .checksumSHA256(partResponse.checksumSHA256()) .build()); - - sha256.update(byteBuffer); } // Complete the multipart upload @@ -302,20 +324,19 @@ private void uploadFile(S3Client s3Client, logger.debug("Object under key " + objectReference.canonicalPath + " exists"); - Tag checksumTag = Tag.builder() - .key("fullObjectChecksum") - .value(HashSpec.HashAlgorithm.SHA_256.getHasher().getHash(sha256.digest())) - .build(); + if (manifestEntry.hash != null) { + Tag checksumTag = Tag.builder().key("fullObjectChecksum").value(manifestEntry.hash).build(); - PutObjectTaggingResponse putObjectTaggingResponse = s3Client.putObjectTagging(PutObjectTaggingRequest.builder() - .bucket(request.storageLocation.bucket) - .key(objectReference.canonicalPath) - .tagging(Tagging.builder().tagSet(checksumTag).build()).build()); + PutObjectTaggingResponse putObjectTaggingResponse = s3Client.putObjectTagging(PutObjectTaggingRequest.builder() + .bucket(request.storageLocation.bucket) + .key(objectReference.canonicalPath) + .tagging(Tagging.builder().tagSet(checksumTag).build()).build()); - if (!putObjectTaggingResponse.sdkHttpResponse().isSuccessful()) { - throw new RuntimeException(String.format("Unsuccessful tagging of %s with checksum, upload id %s", objectReference.canonicalPath, uploadId)); - } else { - logger.debug("Tagged {} with {}", objectReference.canonicalPath, checksumTag.toString()); + if (!putObjectTaggingResponse.sdkHttpResponse().isSuccessful()) { + throw new RuntimeException(String.format("Unsuccessful tagging of %s with checksum, upload id %s", objectReference.canonicalPath, uploadId)); + } else { + logger.debug("Tagged {} with {}", objectReference.canonicalPath, checksumTag.toString()); + } } if (s3Clients.hasEncryptingClient()) { @@ -329,7 +350,6 @@ private void uploadFile(S3Client s3Client, .build()); manifestEntry.size = objectAttributes.objectSize(); - manifestEntry.hash = Base64.getEncoder().encodeToString(sha256.digest()); } catch (Throwable t) { logger.warn("Unable to get attribute {} for key {} by GetObjectAttributes request. Please check your permissions.", diff --git a/src/test/java/com/instaclustr/esop/backup/BackupRestoreTest.java b/src/test/java/com/instaclustr/esop/backup/BackupRestoreTest.java index b5ee470b..deaa5837 100644 --- a/src/test/java/com/instaclustr/esop/backup/BackupRestoreTest.java +++ b/src/test/java/com/instaclustr/esop/backup/BackupRestoreTest.java @@ -17,7 +17,6 @@ import com.instaclustr.cassandra.CassandraVersion; import com.instaclustr.esop.impl.ManifestEntry; import com.instaclustr.esop.impl.SSTableUtils; -import com.instaclustr.esop.impl.hash.HashSpec; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeTest; @@ -85,11 +84,11 @@ public void testSSTableLister() throws Exception { final String keyspace = "keyspace1"; final String table1 = "table1"; final Path table1Path = tempDirs.get(testFileConfig.cassandraVersion.toString()).resolve("data/" + keyspace + "/" + table1); - Map> sstables = SSTableUtils.getSSTables(keyspace, table1, table1Path, backupRoot.resolve(table1Path.getFileName()), new HashSpec()); + Map> sstables = SSTableUtils.getSSTables(keyspace, table1, table1Path, backupRoot.resolve(table1Path.getFileName())); final String table2 = "table2"; final Path table2Path = tempDirs.get(testFileConfig.cassandraVersion.toString()).resolve("data/" + keyspace + "/" + table2); - sstables.putAll(SSTableUtils.getSSTables(keyspace, table2, table2Path, backupRoot.resolve(table2Path.getFileName()), new HashSpec())); + sstables.putAll(SSTableUtils.getSSTables(keyspace, table2, table2Path, backupRoot.resolve(table2Path.getFileName()))); Map manifestMap = new HashMap<>(); for (ManifestEntry e : sstables.values().stream().flatMap(Collection::stream).collect(Collectors.toList())) { diff --git a/src/test/java/com/instaclustr/esop/backup/embedded/UploadTrackerTest.java b/src/test/java/com/instaclustr/esop/backup/embedded/UploadTrackerTest.java index 52014e88..2f9b06db 100644 --- a/src/test/java/com/instaclustr/esop/backup/embedded/UploadTrackerTest.java +++ b/src/test/java/com/instaclustr/esop/backup/embedded/UploadTrackerTest.java @@ -37,6 +37,8 @@ import com.instaclustr.esop.impl.backup.UploadTracker; import com.instaclustr.esop.impl.backup.coordination.ClearSnapshotOperation; import com.instaclustr.esop.impl.backup.coordination.TakeSnapshotOperation; +import com.instaclustr.esop.impl.hash.HashService; +import com.instaclustr.esop.impl.hash.HashServiceImpl; import com.instaclustr.esop.impl.hash.HashSpec; import com.instaclustr.esop.local.LocalFileBackuper; import com.instaclustr.esop.local.LocalFileModule; @@ -122,15 +124,15 @@ public void testUploadTracker() throws Exception { final ListeningExecutorService finisher = new Executors.FixedTasksExecutorSupplier().get(10); - uploadTracker = new UploadTracker(finisher, operationsService, new HashSpec()) { + uploadTracker = new UploadTracker(finisher, operationsService, new HashServiceImpl(new HashSpec())) { // override for testing purposes @Override public UploadUnit constructUnitToSubmit(final Backuper backuper, final ManifestEntry manifestEntry, final AtomicBoolean shouldCancel, final String snapshotTag, - final HashSpec hashSpec) { - return new TestingUploadUnit(wait, backuper, manifestEntry, shouldCancel, snapshotTag, hashSpec); + final HashService hashService) { + return new TestingUploadUnit(wait, backuper, manifestEntry, shouldCancel, snapshotTag, hashService); } }; @@ -277,8 +279,8 @@ public TestingUploadUnit(final AtomicBoolean wait, final ManifestEntry manifestEntry, final AtomicBoolean shouldCancel, final String snapshotTag, - final HashSpec hashSpec) { - super(backuper, manifestEntry, shouldCancel, snapshotTag, hashSpec); + final HashService hashService) { + super(backuper, manifestEntry, shouldCancel, snapshotTag, hashService); this.wait = wait; }