Skip to content

[DO NOT MERGE] Testing #1500 #1518

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions dev.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
name: gh-ost

env:
TESTCONTAINERS_DOCKER_SOCKET_OVERRIDE: /var/run/docker.sock
TESTCONTAINERS_RYUK_DISABLED: "true"

up:
- go:
version: "1.22.12"
- podman
- custom:
name: Go Dependencies
met?: go mod download
Copy link
Preview

Copilot AI Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key 'met?' is inconsistent with the key 'meet' on the following line, which may lead to configuration errors. Both keys should be identical if they are intended to represent the same check.

Copilot uses AI. Check for mistakes.

meet: echo 'go mod failed to download dependencies'; false

commands:
test:
desc: Run all the tests.
run: |
export DOCKER_HOST=unix://$(podman machine inspect --format '{{.ConnectionInfo.PodmanSocket.Path}}')
script/test
2 changes: 2 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type MigrationContext struct {
HooksHintOwner string
HooksHintToken string
HooksStatusIntervalSec int64
PanicOnWarnings bool

DropServeSocket bool
ServeSocketFile string
Expand Down Expand Up @@ -231,6 +232,7 @@ type MigrationContext struct {
ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList
MigrationLastInsertSQLWarnings []string
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func main() {
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
flag.BoolVar(&migrationContext.PanicOnWarnings, "panic-on-warnings", false, "Panic when SQL warnings are encountered when copying a batch indicating data loss")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout) or attempting instant DDL")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")

Expand Down
51 changes: 42 additions & 9 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
Expand All @@ -683,32 +683,36 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}

rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}
hasFurtherRange = true

expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])

hasFurtherRange = expectedRowCount > 0
}
if err = rows.Err(); err != nil {
return hasFurtherRange, err
return hasFurtherRange, expectedRowCount, err
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
return hasFurtherRange, expectedRowCount, nil
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
return hasFurtherRange, expectedRowCount, nil
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
Expand Down Expand Up @@ -753,6 +757,35 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
if err != nil {
return nil, err
}

if this.migrationContext.PanicOnWarnings {
//nolint:execinquery
rows, err := tx.Query("SHOW WARNINGS")
if err != nil {
return nil, err
}
defer rows.Close()
if err = rows.Err(); err != nil {
return nil, err
}

var sqlWarnings []string
for rows.Next() {
var level, message string
var code int
if err := rows.Scan(&level, &code, &message); err != nil {
this.migrationContext.Log.Warningf("Failed to read SHOW WARNINGS row")
continue
}
migrationUniqueKeySuffix := fmt.Sprintf("for key '%s.%s'", this.migrationContext.GetGhostTableName(), this.migrationContext.UniqueKey.Name)
if strings.HasPrefix(message, "Duplicate entry") && strings.HasSuffix(message, migrationUniqueKeySuffix) {
continue
}
sqlWarnings = append(sqlWarnings, fmt.Sprintf("%s: %s (%d)", level, message, code))
}
this.migrationContext.MigrationLastInsertSQLWarnings = sqlWarnings
}

if err := tx.Commit(); err != nil {
return nil, err
}
Expand Down
92 changes: 92 additions & 0 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,98 @@ func (suite *ApplierTestSuite) TestCreateGhostTable() {
suite.Require().Equal("CREATE TABLE `_testing_gho` (\n `id` int DEFAULT NULL,\n `item_id` int DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci", createDDL)
}

func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySucceedsWithUniqueKeyWarningInsertedByDMLEvent() {
ctx := context.Background()

var err error

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id INT, item_id INT, UNIQUE KEY (item_id));")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, item_id INT, UNIQUE KEY (item_id));")
suite.Require().NoError(err)

connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.SkipPortValidation = true
migrationContext.OriginalTableName = "testing"
migrationContext.SetConnectionConfig("innodb")

migrationContext.PanicOnWarnings = true

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "item_id"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "item_id",
Columns: *sql.NewColumnList([]string{"item_id"}),
}

applier := NewApplier(migrationContext)
suite.Require().NoError(applier.prepareQueries())
defer applier.Teardown()

err = applier.InitDBConnections()
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, item_id) VALUES (123456, 42);")
suite.Require().NoError(err)

dmlEvents := []*binlog.BinlogDMLEvent{
{
DatabaseName: "test",
TableName: "testing",
DML: binlog.InsertDML,
NewColumnValues: sql.ToColumnValues([]interface{}{123456, 42}),
},
}
err = applier.ApplyDMLEventQueries(dmlEvents)
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
suite.Require().NoError(err)
err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)
suite.Require().Equal(int64(1), expectedRangeSize)

_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
suite.Require().NoError(err)
suite.Require().Equal(int64(0), rowsAffected)

// Ensure Duplicate entry '42' for key '_testing_gho.item_id' is ignored correctly
suite.Require().Empty(applier.migrationContext.MigrationLastInsertSQLWarnings)

// Check that the row was inserted
rows, err := suite.db.Query("SELECT * FROM test._testing_gho")
suite.Require().NoError(err)
defer rows.Close()

var count, id, item_id int
for rows.Next() {
err = rows.Scan(&id, &item_id)
suite.Require().NoError(err)
count += 1
}
suite.Require().NoError(rows.Err())

suite.Require().Equal(1, count)
suite.Require().Equal(123456, id)
suite.Require().Equal(42, item_id)

suite.Require().
Equal(int64(1), migrationContext.TotalDMLEventsApplied)
suite.Require().
Equal(int64(0), migrationContext.RowsDeltaEstimate)
}

func TestApplier(t *testing.T) {
suite.Run(t, new(ApplierTestSuite))
}
16 changes: 15 additions & 1 deletion go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,8 +1238,9 @@ func (this *Migrator) iterateChunks() error {
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever

hasFurtherRange := false
expectedRangeSize := int64(0)
if err := this.retryOperation(func() (e error) {
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
return e
}); err != nil {
return terminateRowIteration(err)
Expand All @@ -1265,6 +1266,19 @@ func (this *Migrator) iterateChunks() error {
if err != nil {
return err // wrapping call will retry
}

if this.migrationContext.PanicOnWarnings {
if len(this.migrationContext.MigrationLastInsertSQLWarnings) > 0 {
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
}
if expectedRangeSize != rowsAffected {
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
Copy link
Preview

Copilot AI Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The call to 'terminateRowIteration' is not returned, so its effect on the control flow is lost. Prepend 'return' to properly terminate the iteration when SQL warnings occur.

Suggested change
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
return terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))

Copilot uses AI. Check for mistakes.

}
}
}

atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil
Expand Down
68 changes: 54 additions & 14 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string

uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames)) // TODO unused variable
Copy link
Preview

Copilot AI Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The variable 'uniqueKeyColumnDescending' is declared but never used. Remove it if it is not needed to avoid confusion.

Suggested change
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames)) // TODO unused variable

Copilot uses AI. Check for mistakes.

for i, column := range uniqueKeyColumns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnType {
Expand All @@ -286,25 +286,46 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s
from
%s.%s
where
%s and %s
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk)
from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk
order by
%s
limit 1
offset %d`,
databaseName, tableName, hint,
strings.Join(uniqueKeyColumnNames, ", "),
joinedColumnNames, joinedColumnNames,
databaseName, tableName,
rangeStartComparison, rangeEndComparison,
rangeStartComparison, rangeEndComparison, chunkSize,
joinedColumnNames,
databaseName, tableName,
rangeStartComparison, rangeEndComparison, chunkSize,
strings.Join(uniqueKeyColumnAscending, ", "),
(chunkSize - 1),
)
return result, explodedArgs, nil
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
}

func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
Expand Down Expand Up @@ -342,8 +363,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}

joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */ %s
select /* gh-ost %s.%s %s */
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
order by
%s
limit %d
) select_osc_chunk)
from (
select
%s
Expand All @@ -353,17 +388,22 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
%s and %s
order by
%s
limit %d) select_osc_chunk
limit %d
) select_osc_chunk
order by
%s
limit 1`,
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
databaseName, tableName, hint, joinedColumnNames,
joinedColumnNames, databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
joinedColumnNames, databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
strings.Join(uniqueKeyColumnDescending, ", "),
)
return result, explodedArgs, nil
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
}

func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {
Expand Down
Loading
Loading