Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions crates/core/src/kernel/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,19 +618,34 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
let commit_or_bytes = this.commit_or_bytes;

if this.table_data.is_none() {
this.log_store
match this
.log_store
.write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
.await?;
return Ok(PostCommit {
version: 0,
data: this.data,
create_checkpoint: false,
cleanup_expired_logs: None,
log_store: this.log_store,
table_data: None,
custom_execute_handler: this.post_commit_hook_handler,
metrics: CommitMetrics { num_retries: 0 },
});
.await
{
Ok(_) => {
return Ok(PostCommit {
version: 0,
data: this.data,
create_checkpoint: false,
cleanup_expired_logs: None,
log_store: this.log_store,
table_data: None,
custom_execute_handler: this.post_commit_hook_handler,
metrics: CommitMetrics { num_retries: 0 },
})
}
Err(TransactionError::VersionAlreadyExists(0)) => {
// this can happen if the table has been created by another writer since the `this.table_data.is_none()` check above
// therefore, we need to re-download the table state
this.table_data = this
.log_store
.read_commit_entry(this.log_store.get_latest_version(0).await?)
.await?;
Ok(())
}
Err(e) => Err(e),
}?;
}

// unwrap() is safe here due to the above check
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to also load the snapshot now when VersionAlreadyExists was thrown the first time in it's current state this will panic.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhh right. Done.
Can we test this? Seems tricky since we'd have to perfectly time writing the 0 version from another writer.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also initialize attempt_number to 2 in this case?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be easier to test in Rust

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a test for this please? There are some concurrency tests, somewhere in the rust codebase which you can use as template

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll look into that. A more direct pointer would be appreciated

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delta-rs/crates/test/src/concurrent.rs here you can find some related stuff

Expand Down
Loading