Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/com/instaclustr/esop/azure/AzureBackuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected void cleanup() throws Exception {
}

@Override
public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception {
public RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception {
final CloudBlockBlob blob = ((AzureRemoteObjectReference) object).blob;

final Instant now = Instant.now();
Expand All @@ -78,16 +78,16 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final Remo
blob.getMetadata().put(DATE_TIME_METADATA_KEY, now.toString());
blob.uploadMetadata();

return FreshenResult.FRESHENED;
return new RefreshingOutcome(FreshenResult.FRESHENED, null);
} else {
return blob.exists() ? FreshenResult.FRESHENED : FreshenResult.UPLOAD_REQUIRED;
return blob.exists() ? new RefreshingOutcome(FreshenResult.FRESHENED, null) : new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null);
}
} catch (final StorageException e) {
if (e.getHttpStatusCode() != 404) {
throw e;
}

return FreshenResult.UPLOAD_REQUIRED;
return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/instaclustr/esop/gcp/GCPBackuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public RemoteObjectReference objectKeyToNodeAwareRemoteReference(final Path obje
}

@Override
public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) {
public RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) {
final BlobId blobId = ((GCPRemoteObjectReference) object).blobId;

try {
Expand All @@ -63,21 +63,21 @@ public FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final Remo
.setTarget(BlobInfo.newBuilder(blobId).build(), Storage.BlobTargetOption.predefinedAcl(BUCKET_OWNER_FULL_CONTROL))
.build());

return FreshenResult.FRESHENED;
return new RefreshingOutcome(FreshenResult.FRESHENED, null);
} else {
final Blob blob = storage.get(blobId);
if (blob == null || !blob.exists()) {
return FreshenResult.UPLOAD_REQUIRED;
return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null);
} else {
return FreshenResult.FRESHENED;
return new RefreshingOutcome(FreshenResult.FRESHENED, null);
}
}
} catch (final StorageException e) {
if (e.getCode() != 404) {
throw e;
}

return FreshenResult.UPLOAD_REQUIRED;
return new RefreshingOutcome(FreshenResult.UPLOAD_REQUIRED, null);
}
}

Expand Down
17 changes: 9 additions & 8 deletions src/main/java/com/instaclustr/esop/impl/AbstractTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;

import com.instaclustr.esop.impl.hash.HashService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,7 +49,7 @@ public abstract class AbstractTracker<UNIT extends Unit, SESSION extends Session

protected final ListeningExecutorService finisherExecutorService;
protected final OperationsService operationsService;
protected final HashSpec hashSpec;
protected final HashService hashService;

protected final List<UNIT> units = Collections.synchronizedList(new ArrayList<>());
protected final Set<Session<UNIT>> sessions = Collections.synchronizedSet(new HashSet<>());
Expand All @@ -58,10 +59,10 @@ public abstract class AbstractTracker<UNIT extends Unit, SESSION extends Session

public AbstractTracker(final ListeningExecutorService finisherExecutorService,
final OperationsService operationsService,
final HashSpec hashSpec) {
final HashService hashService) {
this.finisherExecutorService = finisherExecutorService;
this.operationsService = operationsService;
this.hashSpec = hashSpec;
this.hashService = hashService;

}

Expand Down Expand Up @@ -90,7 +91,7 @@ public abstract UNIT constructUnitToSubmit(final INTERACTOR interactor,
final ManifestEntry manifestEntry,
final AtomicBoolean shouldCancel,
final String snapshotTag,
final HashSpec hashSpec);
final HashService hashService);

public abstract Session<UNIT> constructSession();

Expand Down Expand Up @@ -131,7 +132,7 @@ public synchronized Session<UNIT> submit(final INTERACTOR interactor,
}

if (alreadySubmitted == null) {
final UNIT unit = constructUnitToSubmit(interactor, entry, operation.getShouldCancel(), snapshotTag, hashSpec);
final UNIT unit = constructUnitToSubmit(interactor, entry, operation.getShouldCancel(), snapshotTag, hashService);

units.add(unit);
futures.put(executorService.submit(unit), unit);
Expand Down Expand Up @@ -226,7 +227,7 @@ public static abstract class Unit implements java.util.concurrent.Callable<Void>
@JsonIgnore
protected String snapshotTag;
@JsonIgnore
protected HashSpec hashSpec;
protected HashService hashService;
protected final ManifestEntry manifestEntry;
protected volatile State state = NOT_STARTED;
protected Throwable throwable = null;
Expand All @@ -235,10 +236,10 @@ public static abstract class Unit implements java.util.concurrent.Callable<Void>

public Unit(final ManifestEntry manifestEntry,
final AtomicBoolean shouldCancel,
final HashSpec hashSpec) {
final HashService hashService) {
this.manifestEntry = manifestEntry;
this.shouldCancel = shouldCancel;
this.hashSpec = hashSpec;
this.hashService = hashService;
}

public enum State {
Expand Down
9 changes: 2 additions & 7 deletions src/main/java/com/instaclustr/esop/impl/SSTableUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public class SSTableUtils {
private static final int SSTABLE_PREFIX_IDX = 1;
private static final int SSTABLE_GENERATION_IDX = 2;
private static final Pattern CHECKSUM_RE = Pattern.compile("^([a-zA-Z0-9]+).*");
private static final HashService hashService = new HashServiceImpl(new HashSpec());

public static String sstableHash(Path path) throws IOException {
final Matcher matcher = SSTABLE_RE.matcher(path.getFileName().toString());
Expand Down Expand Up @@ -105,14 +104,11 @@ public static String calculateChecksum(final Path filePath) throws IOException {
public static Map<String, List<ManifestEntry>> getSSTables(String keyspace,
String table,
Path snapshotDirectory,
Path tableBackupPath,
HashSpec hashSpec) throws IOException {
Path tableBackupPath) throws IOException {
if (!Files.exists(snapshotDirectory)) {
return Collections.emptyMap();
}

final HashService hashService = new HashServiceImpl(hashSpec);

return Files.list(snapshotDirectory)
.flatMap(path -> {
if (isCassandra22SecIndex(path)) {
Expand Down Expand Up @@ -151,12 +147,11 @@ public static Map<String, List<ManifestEntry>> getSSTables(String keyspace,
}

backupPath = backupPath.resolve(hash).resolve(manifestComponentFileName.getFileName());
final String hashOfFile = hashService.hash(sstableComponent);

entries.add(new ManifestEntry(backupPath,
sstableComponent,
ManifestEntry.Type.FILE,
hashOfFile,
null, // don't hash on listing, make it faster
new KeyspaceTable(keyspace, table),
null));
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/instaclustr/esop/impl/Snapshots.java
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public static Table parse(final String keyspace, final String table, final List<
final Path tablePath = Paths.get("data").resolve(Paths.get(keyspace, table));

for (final Path path : value) {
tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath, Snapshots.hashSpec));
tb.sstables.putAll(SSTableUtils.getSSTables(keyspace, table, path, tablePath));
}

final Optional<Path> schemaPath = value.stream().map(p -> p.resolve("schema.cql")).filter(Files::exists).findFirst();
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/com/instaclustr/esop/impl/backup/Backuper.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,24 @@ protected Backuper(final BaseBackupOperationRequest request) {
this.retrier = RetrierFactory.getRetrier(request.retry);
}

public static class RefreshingOutcome
{
public FreshenResult result;
public String hash;

public RefreshingOutcome(FreshenResult result, String hash)
{
this.result = result;
this.hash = hash;
}
}

public enum FreshenResult {
FRESHENED,
UPLOAD_REQUIRED
}

public abstract FreshenResult freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception;
public abstract RefreshingOutcome freshenRemoteObject(ManifestEntry manifestEntry, final RemoteObjectReference object) throws Exception;

public abstract void uploadFile(final ManifestEntry manifestEntry,
final InputStream localFileStream,
Expand Down
32 changes: 22 additions & 10 deletions src/main/java/com/instaclustr/esop/impl/backup/UploadTracker.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.RateLimiter;

import com.instaclustr.esop.impl.hash.HashService;
import com.instaclustr.esop.impl.hash.HashServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -45,17 +47,17 @@ public class UploadTracker extends AbstractTracker<UploadUnit, UploadSession, Ba
@Inject
public UploadTracker(final @UploadingFinisher ListeningExecutorService finisherExecutorService,
final OperationsService operationsService,
final HashSpec hashSpec) {
super(finisherExecutorService, operationsService, hashSpec);
final HashService hashService) {
super(finisherExecutorService, operationsService, hashService);
}

@Override
public UploadUnit constructUnitToSubmit(final Backuper backuper,
final ManifestEntry manifestEntry,
final AtomicBoolean shouldCancel,
final String snapshotTag,
final HashSpec hashSpec) {
return new UploadUnit(backuper, manifestEntry, shouldCancel, snapshotTag, hashSpec);
final HashService hashService) {
return new UploadUnit(backuper, manifestEntry, shouldCancel, snapshotTag, hashService);
}

@Override
Expand Down Expand Up @@ -96,8 +98,8 @@ public UploadUnit(final Backuper backuper,
final ManifestEntry manifestEntry,
final AtomicBoolean shouldCancel,
final String snapshotTag,
final HashSpec hashSpec) {
super(manifestEntry, shouldCancel, hashSpec);
final HashService hashService) {
super(manifestEntry, shouldCancel, hashService);
this.backuper = backuper;
this.snapshotTag = snapshotTag;
}
Expand All @@ -118,23 +120,29 @@ public Void call() {
final RemoteObjectReference ref = getRemoteObjectReference(manifestEntry.objectKey);

// try to refresh object / decide if it is required to upload it
Callable<Boolean> condition = () -> {
Callable<Backuper.RefreshingOutcome> condition = () -> {
try {
return backuper.freshenRemoteObject(manifestEntry, ref) == FRESHENED;

return backuper.freshenRemoteObject(manifestEntry, ref);
} catch (final Exception ex) {
throw new RetriableException("Failed to refresh remote object" + ref.objectKey, ex);
}
};

if (manifestEntry.type != MANIFEST_FILE && getRetrier(backuper.request.retry).submit(condition)) {
logger.info("{}skipping the upload of alredy uploaded file {}",
Backuper.RefreshingOutcome refreshmentOutcome = getRetrier(backuper.request.retry).submit(condition);

if (manifestEntry.type != MANIFEST_FILE && refreshmentOutcome.result == FRESHENED) {
logger.info("{}skipping the upload of already uploaded file {}",
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
ref.canonicalPath);

state = State.FINISHED;
return null;
}

if (refreshmentOutcome.hash != null)
manifestEntry.hash = refreshmentOutcome.hash;

// do the upload
getRetrier(backuper.request.retry).submit(() -> {
try (final InputStream fileStream = new BufferedInputStream(new FileInputStream(manifestEntry.localFile.toFile()))) {
Expand All @@ -144,6 +152,10 @@ public Void call() {
snapshotTag != null ? "Snapshot " + snapshotTag + " - " : "",
manifestEntry.objectKey,
DataSize.bytesToHumanReadable(manifestEntry.size)));

if (manifestEntry.hash == null)
manifestEntry.hash = hashService.hash(manifestEntry.localFile);

// never encrypt manifest
if (manifestEntry.type == MANIFEST_FILE) {
backuper.uploadFile(manifestEntry, rateLimitedStream, ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ public void verify(final Path path, final String expectedHash) throws HashVerifi

final String hashOfFile = getHash(path.toAbsolutePath().toFile());

if (hashOfFile == null)
return;

if (!hashOfFile.equals(expectedHash)) {
throw new HashVerificationException(format("hash of %s (%s) does not match with expected hash %s",
path,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.common.util.concurrent.ListeningExecutorService;

import com.instaclustr.esop.impl.hash.HashService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -17,8 +18,6 @@
import com.instaclustr.esop.impl.ManifestEntry.Type;
import com.instaclustr.esop.impl.RemoteObjectReference;
import com.instaclustr.esop.impl.hash.HashService.HashVerificationException;
import com.instaclustr.esop.impl.hash.HashServiceImpl;
import com.instaclustr.esop.impl.hash.HashSpec;
import com.instaclustr.esop.impl.restore.DownloadTracker.DownloadSession;
import com.instaclustr.esop.impl.restore.DownloadTracker.DownloadUnit;
import com.instaclustr.esop.impl.restore.RestoreModules.DownloadingFinisher;
Expand All @@ -36,17 +35,17 @@ public class DownloadTracker extends AbstractTracker<DownloadUnit, DownloadSessi
@Inject
public DownloadTracker(final @DownloadingFinisher ListeningExecutorService finisherExecutorService,
final OperationsService operationsService,
final HashSpec hashSpec) {
super(finisherExecutorService, operationsService, hashSpec);
final HashService hashService) {
super(finisherExecutorService, operationsService, hashService);
}

@Override
public DownloadUnit constructUnitToSubmit(final Restorer restorer,
final ManifestEntry manifestEntry,
final AtomicBoolean shouldCancel,
final String snapshotTag,
final HashSpec hashSpec) {
return new DownloadUnit(restorer, manifestEntry, shouldCancel, snapshotTag, hashSpec);
final HashService hashService) {
return new DownloadUnit(restorer, manifestEntry, shouldCancel, snapshotTag, hashService);
}

@Override
Expand Down Expand Up @@ -80,8 +79,8 @@ public DownloadUnit(final Restorer restorer,
final ManifestEntry manifestEntry,
final AtomicBoolean shouldCancel,
final String snapshotTag,
final HashSpec hashSpec) {
super(manifestEntry, shouldCancel, hashSpec);
final HashService hashService) {
super(manifestEntry, shouldCancel, hashService);
this.restorer = restorer;
super.snapshotTag = snapshotTag;
}
Expand All @@ -106,7 +105,7 @@ public Void call() {
// hash upon downloading
try {
if (manifestEntry.type == Type.FILE) {
new HashServiceImpl(hashSpec).verify(localPath, manifestEntry.hash);
hashService.verify(localPath, manifestEntry.hash);
}
} catch (final HashVerificationException ex) {
// delete it if has is wrong so on the next try, it will be missing and we will download it again
Expand All @@ -123,7 +122,7 @@ public Void call() {
logger.info(String.format("Skipping download of file %s to %s, file already exists locally.",
remoteObjectReference.getObjectKey(), manifestEntry.localFile));
// if it exists, verify its hash to be sure it was not altered
new HashServiceImpl(hashSpec).verify(localPath, manifestEntry.hash);
hashService.verify(localPath, manifestEntry.hash);
state = FINISHED;
} else {
// if it exists and manifest does not have hash field, consider it to be finished without any check
Expand Down
Loading