From c36444c1e584cdecf1b5150bea92cc9a6415a438 Mon Sep 17 00:00:00 2001
From: ryan19929
Date: Thu, 12 Jun 2025 14:57:00 +0800
Subject: [PATCH] feat: support medium sync policy in CCR tasks

---
 pkg/ccr/base/spec.go                        |  27 +++
 pkg/ccr/base/specer.go                      |   1 +
 pkg/ccr/handle/create_table.go              | 179 +++++++++++++++++-
 pkg/ccr/handle/modify_partitions.go         |  61 ++++++
 pkg/ccr/job.go                              |  93 +++++++--
 pkg/ccr/job_manager.go                      |  11 ++
 pkg/ccr/record/modify_partition_property.go | 112 +++++++++++
 pkg/rpc/fe.go                               |  46 ++---
 .../frontendservice/FrontendService.go      | 109 +++++++++--
 .../frontendservice/k-FrontendService.go    |  51 +++++
 pkg/rpc/thrift/FrontendService.thrift       |   1 +
 pkg/service/http_service.go                 |  50 +++++
 12 files changed, 684 insertions(+), 57 deletions(-)
 create mode 100644 pkg/ccr/handle/modify_partitions.go
 create mode 100644 pkg/ccr/record/modify_partition_property.go

diff --git a/pkg/ccr/base/spec.go b/pkg/ccr/base/spec.go
index 0eb5f895..988a5bde 100644
--- a/pkg/ccr/base/spec.go
+++ b/pkg/ccr/base/spec.go
@@ -1555,6 +1555,33 @@ func (s *Spec) ModifyTableProperty(destTableName string, modifyProperty *record.
 	return s.Exec(sql)
 }
 
+func (s *Spec) ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error {
+	if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 {
+		log.Warnf("empty partition infos, skip modify partition property")
+		return nil
+	}
+
+	dbName := utils.FormatKeywordName(s.Database)
+	destTableName = utils.FormatKeywordName(destTableName)
+
+	for _, partitionInfo := range batchModifyPartitionsInfo.Infos {
+		if partitionInfo.DataProperty == nil || partitionInfo.DataProperty.StorageMedium == "" {
+			log.Warnf("partition %d has no storage medium, skip modify partition property", partitionInfo.PartitionId)
+			continue
+		}
+
+		sql := fmt.Sprintf("ALTER TABLE %s.%s MODIFY PARTITION %s SET (\"storage_medium\" = \"%s\")",
+			dbName, destTableName, utils.FormatKeywordName(partitionInfo.PartitionName), partitionInfo.DataProperty.StorageMedium)
+
+		log.Infof("modify partition property sql: %s", sql)
+		if err := s.Exec(sql); err != nil {
+			log.Warnf("modify partition %s property failed: %v", partitionInfo.PartitionName, err)
+		}
+	}
+
+	return nil
+}
+
 // Determine whether the error are network related, eg connection refused, connection reset, exposed from net packages.
func isNetworkRelated(err error) bool { msg := err.Error() diff --git a/pkg/ccr/base/specer.go b/pkg/ccr/base/specer.go index a64c7e5c..66176e1f 100644 --- a/pkg/ccr/base/specer.go +++ b/pkg/ccr/base/specer.go @@ -66,6 +66,7 @@ type Specer interface { AddPartition(destTableName string, addPartition *record.AddPartition) error DropPartition(destTableName string, dropPartition *record.DropPartition) error RenamePartition(destTableName, oldPartition, newPartition string) error + ModifyPartitionProperty(destTableName string, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error LightningIndexChange(tableAlias string, changes *record.ModifyTableAddOrDropInvertedIndices) error BuildIndex(tableAlias string, buildIndex *record.IndexChangeJob) error diff --git a/pkg/ccr/handle/create_table.go b/pkg/ccr/handle/create_table.go index 260c652e..9e7994cd 100644 --- a/pkg/ccr/handle/create_table.go +++ b/pkg/ccr/handle/create_table.go @@ -1,6 +1,8 @@ package handle import ( + "fmt" + "regexp" "strings" "github.com/selectdb/ccr_syncer/pkg/ccr" @@ -18,6 +20,176 @@ type CreateTableHandle struct { IdempotentJobHandle[*record.CreateTable] } +// Check if error message indicates storage medium or capacity related issues +func isStorageMediumError(errMsg string) bool { + log.Infof("STORAGE_MEDIUM_DEBUG: Analyzing error message: %s", errMsg) + + patterns := []string{ + "capExceedLimit", + "Failed to find enough backend", + "not enough backend", + "storage medium", + "storage_medium", + "avail capacity", + "disk space", + "not enough space", + "replication num", + "replication tag", + } + + for _, pattern := range patterns { + if strings.Contains(strings.ToLower(errMsg), strings.ToLower(pattern)) { + log.Infof("STORAGE_MEDIUM_DEBUG: Found storage/capacity related pattern '%s' in error message", pattern) + return true + } + } + + log.Infof("STORAGE_MEDIUM_DEBUG: No storage/capacity related patterns found in error message") + return false +} + +// Extract storage_medium from CREATE TABLE SQL +func extractStorageMediumFromCreateTableSql(createSql string) string { + pattern := `"storage_medium"\s*=\s*"([^"]*)"` + re := regexp.MustCompile(pattern) + matches := re.FindStringSubmatch(createSql) + if len(matches) >= 2 { + medium := strings.ToLower(matches[1]) + log.Infof("STORAGE_MEDIUM_DEBUG: Extracted storage medium: %s", medium) + return medium + } + log.Infof("STORAGE_MEDIUM_DEBUG: No storage medium found in SQL") + return "" +} + +// Switch storage medium between SSD and HDD +func switchStorageMedium(medium string) string { + switch strings.ToLower(medium) { + case "ssd": + return "hdd" + case "hdd": + return "ssd" + default: + // Default to hdd if not standard medium + return "hdd" + } +} + +// Set specific storage_medium in CREATE TABLE SQL +func setStorageMediumInCreateTableSql(createSql string, medium string) string { + // Remove existing storage_medium first + createSql = ccr.FilterStorageMediumFromCreateTableSql(createSql) + + // Check if PROPERTIES clause exists + propertiesPattern := `PROPERTIES\s*\(` + if matched, _ := regexp.MatchString(propertiesPattern, createSql); matched { + // Add storage_medium at the beginning of PROPERTIES + pattern := `(PROPERTIES\s*\(\s*)` + replacement := fmt.Sprintf(`${1}"storage_medium" = "%s", `, medium) + createSql = regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement) + } else { + // Add entire PROPERTIES clause + pattern := `(\s*)$` + replacement := fmt.Sprintf(` PROPERTIES ("storage_medium" = "%s")`, medium) + createSql = 
regexp.MustCompile(pattern).ReplaceAllString(createSql, replacement) + } + + return createSql +} + +// Process CREATE TABLE SQL according to medium sync policy +func processCreateTableSqlByMediumPolicy(j *ccr.Job, createTable *record.CreateTable) error { + // Note: We need to access Job's medium sync policy and feature flags + // For now, we'll implement basic logic based on what we know the Job should do + + // Check if medium sync policy feature is enabled (we assume it's enabled for new handler) + // This is a simplified version that handles the main cases + mediumPolicy := j.MediumSyncPolicy + + switch mediumPolicy { + case ccr.MediumSyncPolicySameWithUpstream: + // Keep upstream storage_medium unchanged + log.Infof("using same_with_upstream policy, keeping original storage_medium") + return nil + + case ccr.MediumSyncPolicyHDD: + // Force set to HDD + log.Infof("using hdd policy, setting storage_medium to hdd") + createTable.Sql = setStorageMediumInCreateTableSql(createTable.Sql, "hdd") + return nil + + default: + log.Warnf("unknown medium sync policy: %s, falling back to filter storage_medium", mediumPolicy) + if ccr.FeatureFilterStorageMedium { + createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql) + } + return nil + } +} + +// Create table with medium retry mechanism +func createTableWithMediumRetry(j *ccr.Job, createTable *record.CreateTable, srcDb string) error { + originalSql := createTable.Sql + log.Infof("STORAGE_MEDIUM_DEBUG: Starting create table with medium retry for table: %s", createTable.TableName) + + // Process SQL according to medium policy + if err := processCreateTableSqlByMediumPolicy(j, createTable); err != nil { + return err + } + + // First attempt + err := j.IDest.CreateTableOrView(createTable, srcDb) + if err == nil { + log.Infof("STORAGE_MEDIUM_DEBUG: Create table succeeded on first attempt") + return nil + } + + log.Warnf("STORAGE_MEDIUM_DEBUG: First attempt failed: %s", err.Error()) + + // Check if it's storage related error and should retry + if !isStorageMediumError(err.Error()) { + log.Infof("STORAGE_MEDIUM_DEBUG: Not a storage related error, no retry") + return err + } + + // Extract current medium and switch to the other one + currentMedium := extractStorageMediumFromCreateTableSql(createTable.Sql) + if currentMedium == "" { + currentMedium = "ssd" // default + } + + switchedMedium := switchStorageMedium(currentMedium) + log.Infof("STORAGE_MEDIUM_DEBUG: Switching from %s to %s", currentMedium, switchedMedium) + + createTable.Sql = setStorageMediumInCreateTableSql(originalSql, switchedMedium) + + // Second attempt with switched medium + err = j.IDest.CreateTableOrView(createTable, srcDb) + if err == nil { + log.Infof("STORAGE_MEDIUM_DEBUG: Create table succeeded after switching to %s", switchedMedium) + return nil + } + + log.Warnf("STORAGE_MEDIUM_DEBUG: Second attempt with %s also failed: %s", switchedMedium, err.Error()) + + // Final attempt: remove storage_medium if still storage related error + if isStorageMediumError(err.Error()) { + log.Infof("STORAGE_MEDIUM_DEBUG: Removing storage_medium for final attempt") + createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(originalSql) + + err = j.IDest.CreateTableOrView(createTable, srcDb) + if err == nil { + log.Infof("STORAGE_MEDIUM_DEBUG: Create table succeeded after removing storage_medium") + return nil + } + + log.Warnf("STORAGE_MEDIUM_DEBUG: Final attempt without storage_medium also failed: %s", err.Error()) + } + + log.Errorf("STORAGE_MEDIUM_DEBUG: All attempts failed, 
returning final error") + return err +} + func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *record.CreateTable) error { if j.SyncType != ccr.DBSync { return xerror.Errorf(xerror.Normal, "invalid sync type: %v", j.SyncType) @@ -68,12 +240,11 @@ func (h *CreateTableHandle) Handle(j *ccr.Job, commitSeq int64, createTable *rec } } - if ccr.FeatureFilterStorageMedium { - createTable.Sql = ccr.FilterStorageMediumFromCreateTableSql(createTable.Sql) - } + // Remove old storage_medium filtering logic, handled by new retry function createTable.Sql = ccr.FilterDynamicPartitionStoragePolicyFromCreateTableSql(createTable.Sql) - if err := j.IDest.CreateTableOrView(createTable, j.Src.Database); err != nil { + // Use new create table function with medium retry mechanism + if err := createTableWithMediumRetry(j, createTable, j.Src.Database); err != nil { errMsg := err.Error() if strings.Contains(errMsg, "Can not found function") { log.Warnf("skip creating table/view because the UDF function is not supported yet: %s", errMsg) diff --git a/pkg/ccr/handle/modify_partitions.go b/pkg/ccr/handle/modify_partitions.go new file mode 100644 index 00000000..1ad91984 --- /dev/null +++ b/pkg/ccr/handle/modify_partitions.go @@ -0,0 +1,61 @@ +package handle + +import ( + "github.com/selectdb/ccr_syncer/pkg/ccr" + "github.com/selectdb/ccr_syncer/pkg/ccr/record" + festruct "github.com/selectdb/ccr_syncer/pkg/rpc/kitex_gen/frontendservice" + "github.com/selectdb/ccr_syncer/pkg/xerror" + log "github.com/sirupsen/logrus" +) + +func init() { + ccr.RegisterJobHandle[*record.BatchModifyPartitionsInfo](festruct.TBinlogType_MODIFY_PARTITIONS, &ModifyPartitionsHandle{}) +} + +type ModifyPartitionsHandle struct { + // The modify partitions binlog is idempotent + IdempotentJobHandle[*record.BatchModifyPartitionsInfo] +} + +func (h *ModifyPartitionsHandle) Handle(j *ccr.Job, commitSeq int64, batchModifyPartitionsInfo *record.BatchModifyPartitionsInfo) error { + // TODO: custom by medium_sync_policy + if !ccr.FeatureMediumSyncPolicy || j.MediumSyncPolicy == "hdd" { + log.Warnf("skip modify partitions for FeatureMediumSyncPolicy off or medium_sync_policy is hdd") + return nil + } + + // Safety check: ensure we have partition infos to process + if batchModifyPartitionsInfo == nil || len(batchModifyPartitionsInfo.Infos) == 0 { + return xerror.Errorf(xerror.Normal, "batch modify partitions info is empty or nil") + } + + // Get table ID from the first partition info (all partitions should belong to the same table) + tableId := batchModifyPartitionsInfo.GetTableId() + if tableId <= 0 { + return xerror.Errorf(xerror.Normal, "invalid table ID: %d", tableId) + } + + // Check if it's a materialized view table + if isAsyncMv, err := j.IsMaterializedViewTable(tableId); err != nil { + return err + } else if isAsyncMv { + log.Warnf("skip modify partitions for materialized view table %d", tableId) + return nil + } + + // Get destination table name + destTableName, err := j.GetDestNameBySrcId(tableId) + if err != nil { + return err + } + + // Get the source cluster meta information and supplement the partition name information + srcMeta := j.GetSrcMeta() + if err := batchModifyPartitionsInfo.EnrichWithPartitionNames(srcMeta); err != nil { + log.Errorf("failed to enrich partition names from source meta: %v", err) + return err + } + + // Call spec layer method directly + return j.Dest.ModifyPartitionProperty(destTableName, batchModifyPartitionsInfo) +} diff --git a/pkg/ccr/job.go b/pkg/ccr/job.go index 69cd1a36..ef57708c 
100644 --- a/pkg/ccr/job.go +++ b/pkg/ccr/job.go @@ -77,6 +77,7 @@ var ( featureSkipCheckAsyncMvTable bool featurePipelineCommit bool featureSeperatedHandles bool + FeatureMediumSyncPolicy bool flagBinlogBatchSize int64 @@ -120,6 +121,8 @@ func init() { "enable pipeline commit for upsert binlogs") flag.BoolVar(&featureSeperatedHandles, "feature_seperated_handles", true, "enable the seperated handles (the refactor)") + flag.BoolVar(&FeatureMediumSyncPolicy, "feature_medium_sync_policy", true, + "enable medium sync policy support for sync job") flag.Int64Var(&flagBinlogBatchSize, "binlog_batch_size", 16, "the max num of binlogs to get in a batch") } @@ -131,6 +134,12 @@ const ( TableSync SyncType = 1 ) +// Medium sync policy constants +const ( + MediumSyncPolicyHDD = "hdd" + MediumSyncPolicySameWithUpstream = "same_with_upstream" +) + func (s SyncType) String() string { switch s { case DBSync: @@ -211,6 +220,8 @@ type Job struct { destMeta Metaer `json:"-"` State JobState `json:"state"` Extra JobExtra `json:"extra"` + // Medium sync policy for backup/restore operations: "hdd" or "same_with_upstream" + MediumSyncPolicy string `json:"medium_sync_policy"` factory *Factory `json:"-"` @@ -237,6 +248,7 @@ type JobContext struct { SkipError bool AllowTableExists bool ReuseBinlogLabel bool + MediumSyncPolicy string Factory *Factory } @@ -251,16 +263,30 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) { src := jobContext.Src dest := jobContext.Dest id := getJobId(name, src, dest) + // Set default medium sync policy if not specified + mediumSyncPolicy := jobContext.MediumSyncPolicy + log.Infof("NewJobFromService: received medium_sync_policy=%s from JobContext", mediumSyncPolicy) + if mediumSyncPolicy == "" { + mediumSyncPolicy = MediumSyncPolicyHDD + log.Infof("NewJobFromService: medium_sync_policy was empty, set to default=%s", mediumSyncPolicy) + } + // Validate medium sync policy + if mediumSyncPolicy != MediumSyncPolicyHDD && mediumSyncPolicy != MediumSyncPolicySameWithUpstream { + return nil, xerror.Errorf(xerror.Normal, "invalid medium sync policy: %s, must be %s or %s", + mediumSyncPolicy, MediumSyncPolicyHDD, MediumSyncPolicySameWithUpstream) + } + job := &Job{ - Name: name, - Id: id, - Src: src, - ISrc: factory.NewSpecer(&src), - srcMeta: factory.NewMeta(&jobContext.Src), - Dest: dest, - IDest: factory.NewSpecer(&dest), - destMeta: factory.NewMeta(&jobContext.Dest), - State: JobRunning, + Name: name, + Id: id, + Src: src, + ISrc: factory.NewSpecer(&src), + srcMeta: factory.NewMeta(&jobContext.Src), + Dest: dest, + IDest: factory.NewSpecer(&dest), + destMeta: factory.NewMeta(&jobContext.Dest), + State: JobRunning, + MediumSyncPolicy: mediumSyncPolicy, Extra: JobExtra{ allowTableExists: jobContext.AllowTableExists, @@ -277,6 +303,7 @@ func NewJobFromService(name string, ctx context.Context) (*Job, error) { concurrencyManager: rpc.NewConcurrencyManager(), } + log.Infof("NewJobFromService: Job created with MediumSyncPolicy=%s", job.MediumSyncPolicy) if err := job.valid(); err != nil { return nil, xerror.Wrap(err, xerror.Normal, "job is invalid") @@ -724,6 +751,14 @@ func (j *Job) partialSync() error { // resulting in different schema of upstream and downstream views. 
we need to force replace isForceReplace := featureRestoreReplaceDiffSchema && j.progress.PartialSyncData.IsView isAtomicRestore := featureAtomicRestore && isForceReplace + mediumSyncPolicy := "hdd" + log.Infof("partialSync: FeatureMediumSyncPolicy=%t, j.MediumSyncPolicy=%s", FeatureMediumSyncPolicy, j.MediumSyncPolicy) + if FeatureMediumSyncPolicy && j.MediumSyncPolicy != "" { + mediumSyncPolicy = j.MediumSyncPolicy + log.Infof("partialSync: using job medium_sync_policy=%s", mediumSyncPolicy) + } else { + log.Infof("partialSync: using default medium_sync_policy=%s", mediumSyncPolicy) + } restoreReq := rpc.RestoreSnapshotRequest{ TableRefs: tableRefs, @@ -731,11 +766,12 @@ func (j *Job) partialSync() error { SnapshotResult: snapshotResp, // DO NOT drop exists tables and partitions - CleanPartitions: false, - CleanTables: false, - AtomicRestore: isAtomicRestore, - Compress: false, - ForceReplace: isForceReplace, + CleanPartitions: false, + CleanTables: false, + AtomicRestore: isAtomicRestore, + Compress: false, + ForceReplace: isForceReplace, + MediumSyncPolicy: mediumSyncPolicy, } restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq) if err != nil { @@ -1179,6 +1215,12 @@ func (j *Job) fullSync() error { if featureRestoreReplaceDiffSchema { restoreReq.ForceReplace = true } + if FeatureMediumSyncPolicy { + restoreReq.MediumSyncPolicy = j.MediumSyncPolicy + log.Infof("fullSync: FeatureMediumSyncPolicy enabled, setting MediumSyncPolicy=%s", j.MediumSyncPolicy) + } else { + log.Infof("fullSync: FeatureMediumSyncPolicy disabled, not setting MediumSyncPolicy") + } restoreResp, err := destRpc.RestoreSnapshot(dest, &restoreReq) if err != nil { return err @@ -4595,3 +4637,26 @@ func getJobId(name string, src base.Spec, dest base.Spec) string { io.WriteString(h, dest.String()) return fmt.Sprintf("%x", h.Sum(nil)) } + +func (j *Job) UpdateMediumSyncPolicy(mediumSyncPolicy string) error { + defer j.raiseInterruptSignal()() + j.lock.Lock() + defer j.lock.Unlock() + + // Validate medium sync policy + if mediumSyncPolicy != MediumSyncPolicyHDD && mediumSyncPolicy != MediumSyncPolicySameWithUpstream { + return xerror.Errorf(xerror.Normal, "invalid medium sync policy: %s, must be %s or %s", + mediumSyncPolicy, MediumSyncPolicyHDD, MediumSyncPolicySameWithUpstream) + } + + oldMediumSyncPolicy := j.MediumSyncPolicy + j.MediumSyncPolicy = mediumSyncPolicy + + if err := j.persistJob(); err != nil { + j.MediumSyncPolicy = oldMediumSyncPolicy + return err + } + + log.Infof("update job %s medium sync policy from %s to %s", j.Name, oldMediumSyncPolicy, mediumSyncPolicy) + return nil +} diff --git a/pkg/ccr/job_manager.go b/pkg/ccr/job_manager.go index 174e4ee4..32ef571d 100644 --- a/pkg/ccr/job_manager.go +++ b/pkg/ccr/job_manager.go @@ -277,3 +277,14 @@ func (jm *JobManager) SkipBinlog(jobName string, params SkipBinlogParams) error return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) } } + +func (jm *JobManager) UpdateMediumSyncPolicy(jobName string, mediumSyncPolicy string) error { + jm.lock.Lock() + defer jm.lock.Unlock() + + if job, ok := jm.jobs[jobName]; ok { + return job.UpdateMediumSyncPolicy(mediumSyncPolicy) + } else { + return xerror.Errorf(xerror.Normal, "job not exist: %s", jobName) + } +} diff --git a/pkg/ccr/record/modify_partition_property.go b/pkg/ccr/record/modify_partition_property.go new file mode 100644 index 00000000..3d76bc81 --- /dev/null +++ b/pkg/ccr/record/modify_partition_property.go @@ -0,0 +1,112 @@ +package record + +import ( + "encoding/json" + "fmt" + + 
"github.com/selectdb/ccr_syncer/pkg/xerror" + log "github.com/sirupsen/logrus" +) + +type DataProperty struct { + StorageMedium string `json:"storageMedium"` + CooldownTimeMs int64 `json:"cooldownTimeMs"` + StoragePolicy string `json:"storagePolicy"` + IsMutable bool `json:"isMutable"` + StorageMediumSpecified bool `json:"storageMediumSpecified,omitempty"` +} + +type Tag struct { + Type string `json:"type"` + Value string `json:"value"` +} + +type ReplicaAllocation struct { + AllocMap map[string]int16 `json:"allocMap"` +} + +// ModifyPartitionInfo represents single partition modification info +type ModifyPartitionInfo struct { + DbId int64 `json:"dbId"` + TableId int64 `json:"tableId"` + PartitionId int64 `json:"partitionId"` + DataProperty *DataProperty `json:"dataProperty"` + ReplicationNum int16 `json:"replicationNum"` + IsInMemory bool `json:"isInMemory"` + ReplicaAlloc *ReplicaAllocation `json:"replicaAlloc"` + StoragePolicy string `json:"storagePolicy"` + TblProperties map[string]string `json:"tableProperties"` + // PartitionName is not from Binlog:ModifyPartitionInfo, it's used for cross-cluster partition mapping + PartitionName string `json:"partitionName,omitempty"` +} + +type BatchModifyPartitionsInfo struct { + Infos []*ModifyPartitionInfo `json:"infos"` +} + +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) Deserialize(data string) error { + err := json.Unmarshal([]byte(data), &batchModifyPartitionsInfo) + if err != nil { + return xerror.Wrap(err, xerror.Normal, "unmarshal batch modify partitions info error") + } + + if len(batchModifyPartitionsInfo.Infos) == 0 { + return xerror.Errorf(xerror.Normal, "modify partition infos is empty") + } + + return nil +} + +func NewBatchModifyPartitionsInfoFromJson(data string) (*BatchModifyPartitionsInfo, error) { + var batchModifyPartitionsInfo BatchModifyPartitionsInfo + if err := batchModifyPartitionsInfo.Deserialize(data); err != nil { + return nil, err + } + return &batchModifyPartitionsInfo, nil +} + +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) String() string { + return fmt.Sprintf("BatchModifyPartitionsInfo: Infos count: %d", len(batchModifyPartitionsInfo.Infos)) +} + +// GetTableId implements Record interface by returning the first table ID in the batch +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) GetTableId() int64 { + if len(batchModifyPartitionsInfo.Infos) == 0 { + // This should not happen, because the Infos should not be empty after successful deserialization + log.Warnf("BatchModifyPartitionsInfo.Infos is empty, this should not happen after successful deserialization") + return -1 // return -1 to indicate invalid TableId + } + return batchModifyPartitionsInfo.Infos[0].TableId +} + +// GetPartitionName returns the partition name for cross-cluster mapping +func (info *ModifyPartitionInfo) GetPartitionName() string { + return info.PartitionName +} + +// Metaer interface for accessing partition metadata +type Metaer interface { + GetPartitionName(tableId int64, partitionId int64) (string, error) +} + +// EnrichWithPartitionNames enriches the batch with partition name information from source cluster meta +func (batchModifyPartitionsInfo *BatchModifyPartitionsInfo) EnrichWithPartitionNames(srcMeta Metaer) error { + for _, partitionInfo := range batchModifyPartitionsInfo.Infos { + // If the partition name information is already in the binlog, skip + if partitionInfo.PartitionName != "" { + continue + } + + // Get the partition name from the source cluster meta + partitionName, err := 
srcMeta.GetPartitionName(partitionInfo.TableId, partitionInfo.PartitionId) + if err != nil { + return xerror.Wrapf(err, xerror.Normal, + "failed to get partition name for table %d partition %d", + partitionInfo.TableId, partitionInfo.PartitionId) + } + + partitionInfo.PartitionName = partitionName + } + + return nil +} diff --git a/pkg/rpc/fe.go b/pkg/rpc/fe.go index 2ab7b3f3..b46e23df 100644 --- a/pkg/rpc/fe.go +++ b/pkg/rpc/fe.go @@ -91,14 +91,15 @@ func canUseNextAddr(err error) bool { } type RestoreSnapshotRequest struct { - TableRefs []*festruct.TTableRef - SnapshotName string - SnapshotResult *festruct.TGetSnapshotResult_ - AtomicRestore bool - CleanPartitions bool - CleanTables bool - Compress bool - ForceReplace bool + TableRefs []*festruct.TTableRef + SnapshotName string + SnapshotResult *festruct.TGetSnapshotResult_ + AtomicRestore bool + CleanPartitions bool + CleanTables bool + Compress bool + ForceReplace bool + MediumSyncPolicy string } type IFeRpc interface { @@ -866,26 +867,27 @@ func (rpc *singleFeClient) RestoreSnapshot(spec *base.Spec, restoreReq *RestoreS } req := &festruct.TRestoreSnapshotRequest{ - Table: &spec.Table, - LabelName: &restoreReq.SnapshotName, - RepoName: &repoName, - TableRefs: restoreReq.TableRefs, - Properties: properties, - Meta: meta, - JobInfo: jobInfo, - CleanTables: &restoreReq.CleanTables, - CleanPartitions: &restoreReq.CleanPartitions, - AtomicRestore: &restoreReq.AtomicRestore, - Compressed: utils.ThriftValueWrapper(restoreReq.Compress), - ForceReplace: &restoreReq.ForceReplace, + Table: &spec.Table, + LabelName: &restoreReq.SnapshotName, + RepoName: &repoName, + TableRefs: restoreReq.TableRefs, + Properties: properties, + Meta: meta, + JobInfo: jobInfo, + CleanTables: &restoreReq.CleanTables, + CleanPartitions: &restoreReq.CleanPartitions, + AtomicRestore: &restoreReq.AtomicRestore, + Compressed: utils.ThriftValueWrapper(restoreReq.Compress), + ForceReplace: &restoreReq.ForceReplace, + MediumSyncPolicy: utils.ThriftValueWrapper(restoreReq.MediumSyncPolicy), } setAuthInfo(req, spec) // NOTE: ignore meta, because it's too large - log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t, compressed: %t, forceReplace: %t", + log.Debugf("RestoreSnapshotRequest user %s, db %s, table %s, label name %s, properties %v, clean tables: %t, clean partitions: %t, atomic restore: %t, compressed: %t, forceReplace: %t, mediumSyncPolicy: %s", req.GetUser(), req.GetDb(), req.GetTable(), req.GetLabelName(), properties, restoreReq.CleanTables, restoreReq.CleanPartitions, restoreReq.AtomicRestore, - req.GetCompressed(), restoreReq.ForceReplace) + req.GetCompressed(), restoreReq.ForceReplace, req.GetMediumSyncPolicy()) if resp, err := client.RestoreSnapshot(context.Background(), req); err != nil { return nil, xerror.Wrapf(err, xerror.RPC, "RestoreSnapshot failed") diff --git a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go index 09b13aca..ec932fbe 100644 --- a/pkg/rpc/kitex_gen/frontendservice/FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/FrontendService.go @@ -54064,23 +54064,24 @@ func (p *TTableRef) Field3DeepEqual(src *string) bool { } type TRestoreSnapshotRequest struct { - Cluster *string `thrift:"cluster,1,optional" frugal:"1,optional,string" json:"cluster,omitempty"` - User *string `thrift:"user,2,optional" frugal:"2,optional,string" json:"user,omitempty"` - Passwd *string 
`thrift:"passwd,3,optional" frugal:"3,optional,string" json:"passwd,omitempty"` - Db *string `thrift:"db,4,optional" frugal:"4,optional,string" json:"db,omitempty"` - Table *string `thrift:"table,5,optional" frugal:"5,optional,string" json:"table,omitempty"` - Token *string `thrift:"token,6,optional" frugal:"6,optional,string" json:"token,omitempty"` - LabelName *string `thrift:"label_name,7,optional" frugal:"7,optional,string" json:"label_name,omitempty"` - RepoName *string `thrift:"repo_name,8,optional" frugal:"8,optional,string" json:"repo_name,omitempty"` - TableRefs []*TTableRef `thrift:"table_refs,9,optional" frugal:"9,optional,list" json:"table_refs,omitempty"` - Properties map[string]string `thrift:"properties,10,optional" frugal:"10,optional,map" json:"properties,omitempty"` - Meta []byte `thrift:"meta,11,optional" frugal:"11,optional,binary" json:"meta,omitempty"` - JobInfo []byte `thrift:"job_info,12,optional" frugal:"12,optional,binary" json:"job_info,omitempty"` - CleanTables *bool `thrift:"clean_tables,13,optional" frugal:"13,optional,bool" json:"clean_tables,omitempty"` - CleanPartitions *bool `thrift:"clean_partitions,14,optional" frugal:"14,optional,bool" json:"clean_partitions,omitempty"` - AtomicRestore *bool `thrift:"atomic_restore,15,optional" frugal:"15,optional,bool" json:"atomic_restore,omitempty"` - Compressed *bool `thrift:"compressed,16,optional" frugal:"16,optional,bool" json:"compressed,omitempty"` - ForceReplace *bool `thrift:"force_replace,17,optional" frugal:"17,optional,bool" json:"force_replace,omitempty"` + Cluster *string `thrift:"cluster,1,optional" frugal:"1,optional,string" json:"cluster,omitempty"` + User *string `thrift:"user,2,optional" frugal:"2,optional,string" json:"user,omitempty"` + Passwd *string `thrift:"passwd,3,optional" frugal:"3,optional,string" json:"passwd,omitempty"` + Db *string `thrift:"db,4,optional" frugal:"4,optional,string" json:"db,omitempty"` + Table *string `thrift:"table,5,optional" frugal:"5,optional,string" json:"table,omitempty"` + Token *string `thrift:"token,6,optional" frugal:"6,optional,string" json:"token,omitempty"` + LabelName *string `thrift:"label_name,7,optional" frugal:"7,optional,string" json:"label_name,omitempty"` + RepoName *string `thrift:"repo_name,8,optional" frugal:"8,optional,string" json:"repo_name,omitempty"` + TableRefs []*TTableRef `thrift:"table_refs,9,optional" frugal:"9,optional,list" json:"table_refs,omitempty"` + Properties map[string]string `thrift:"properties,10,optional" frugal:"10,optional,map" json:"properties,omitempty"` + Meta []byte `thrift:"meta,11,optional" frugal:"11,optional,binary" json:"meta,omitempty"` + JobInfo []byte `thrift:"job_info,12,optional" frugal:"12,optional,binary" json:"job_info,omitempty"` + CleanTables *bool `thrift:"clean_tables,13,optional" frugal:"13,optional,bool" json:"clean_tables,omitempty"` + CleanPartitions *bool `thrift:"clean_partitions,14,optional" frugal:"14,optional,bool" json:"clean_partitions,omitempty"` + AtomicRestore *bool `thrift:"atomic_restore,15,optional" frugal:"15,optional,bool" json:"atomic_restore,omitempty"` + Compressed *bool `thrift:"compressed,16,optional" frugal:"16,optional,bool" json:"compressed,omitempty"` + ForceReplace *bool `thrift:"force_replace,17,optional" frugal:"17,optional,bool" json:"force_replace,omitempty"` + MediumSyncPolicy *string `thrift:"medium_sync_policy,18,optional" frugal:"18,optional,string" json:"medium_sync_policy,omitempty"` } func NewTRestoreSnapshotRequest() *TRestoreSnapshotRequest { @@ -54242,6 
+54243,15 @@ func (p *TRestoreSnapshotRequest) GetForceReplace() (v bool) { } return *p.ForceReplace } + +var TRestoreSnapshotRequest_MediumSyncPolicy_DEFAULT string + +func (p *TRestoreSnapshotRequest) GetMediumSyncPolicy() (v string) { + if !p.IsSetMediumSyncPolicy() { + return TRestoreSnapshotRequest_MediumSyncPolicy_DEFAULT + } + return *p.MediumSyncPolicy +} func (p *TRestoreSnapshotRequest) SetCluster(val *string) { p.Cluster = val } @@ -54293,6 +54303,9 @@ func (p *TRestoreSnapshotRequest) SetCompressed(val *bool) { func (p *TRestoreSnapshotRequest) SetForceReplace(val *bool) { p.ForceReplace = val } +func (p *TRestoreSnapshotRequest) SetMediumSyncPolicy(val *string) { + p.MediumSyncPolicy = val +} var fieldIDToName_TRestoreSnapshotRequest = map[int16]string{ 1: "cluster", @@ -54312,6 +54325,7 @@ var fieldIDToName_TRestoreSnapshotRequest = map[int16]string{ 15: "atomic_restore", 16: "compressed", 17: "force_replace", + 18: "medium_sync_policy", } func (p *TRestoreSnapshotRequest) IsSetCluster() bool { @@ -54382,6 +54396,10 @@ func (p *TRestoreSnapshotRequest) IsSetForceReplace() bool { return p.ForceReplace != nil } +func (p *TRestoreSnapshotRequest) IsSetMediumSyncPolicy() bool { + return p.MediumSyncPolicy != nil +} + func (p *TRestoreSnapshotRequest) Read(iprot thrift.TProtocol) (err error) { var fieldTypeId thrift.TType @@ -54537,6 +54555,14 @@ func (p *TRestoreSnapshotRequest) Read(iprot thrift.TProtocol) (err error) { } else if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError } + case 18: + if fieldTypeId == thrift.STRING { + if err = p.ReadField18(iprot); err != nil { + goto ReadFieldError + } + } else if err = iprot.Skip(fieldTypeId); err != nil { + goto SkipFieldError + } default: if err = iprot.Skip(fieldTypeId); err != nil { goto SkipFieldError @@ -54783,6 +54809,17 @@ func (p *TRestoreSnapshotRequest) ReadField17(iprot thrift.TProtocol) error { p.ForceReplace = _field return nil } +func (p *TRestoreSnapshotRequest) ReadField18(iprot thrift.TProtocol) error { + + var _field *string + if v, err := iprot.ReadString(); err != nil { + return err + } else { + _field = &v + } + p.MediumSyncPolicy = _field + return nil +} func (p *TRestoreSnapshotRequest) Write(oprot thrift.TProtocol) (err error) { var fieldId int16 @@ -54858,6 +54895,10 @@ func (p *TRestoreSnapshotRequest) Write(oprot thrift.TProtocol) (err error) { fieldId = 17 goto WriteFieldError } + if err = p.writeField18(oprot); err != nil { + fieldId = 18 + goto WriteFieldError + } } if err = oprot.WriteFieldStop(); err != nil { goto WriteFieldStopError @@ -55218,6 +55259,25 @@ WriteFieldEndError: return thrift.PrependError(fmt.Sprintf("%T write field 17 end error: ", p), err) } +func (p *TRestoreSnapshotRequest) writeField18(oprot thrift.TProtocol) (err error) { + if p.IsSetMediumSyncPolicy() { + if err = oprot.WriteFieldBegin("medium_sync_policy", thrift.STRING, 18); err != nil { + goto WriteFieldBeginError + } + if err := oprot.WriteString(*p.MediumSyncPolicy); err != nil { + return err + } + if err = oprot.WriteFieldEnd(); err != nil { + goto WriteFieldEndError + } + } + return nil +WriteFieldBeginError: + return thrift.PrependError(fmt.Sprintf("%T write field 18 begin error: ", p), err) +WriteFieldEndError: + return thrift.PrependError(fmt.Sprintf("%T write field 18 end error: ", p), err) +} + func (p *TRestoreSnapshotRequest) String() string { if p == nil { return "" @@ -55283,6 +55343,9 @@ func (p *TRestoreSnapshotRequest) DeepEqual(ano *TRestoreSnapshotRequest) bool { if 
!p.Field17DeepEqual(ano.ForceReplace) { return false } + if !p.Field18DeepEqual(ano.MediumSyncPolicy) { + return false + } return true } @@ -55482,6 +55545,18 @@ func (p *TRestoreSnapshotRequest) Field17DeepEqual(src *bool) bool { } return true } +func (p *TRestoreSnapshotRequest) Field18DeepEqual(src *string) bool { + + if p.MediumSyncPolicy == src { + return true + } else if p.MediumSyncPolicy == nil || src == nil { + return false + } + if strings.Compare(*p.MediumSyncPolicy, *src) != 0 { + return false + } + return true +} type TRestoreSnapshotResult_ struct { Status *status.TStatus `thrift:"status,1,optional" frugal:"1,optional,status.TStatus" json:"status,omitempty"` diff --git a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go index 9eae5c9f..48b92d4c 100644 --- a/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go +++ b/pkg/rpc/kitex_gen/frontendservice/k-FrontendService.go @@ -39360,6 +39360,20 @@ func (p *TRestoreSnapshotRequest) FastRead(buf []byte) (int, error) { goto SkipFieldError } } + case 18: + if fieldTypeId == thrift.STRING { + l, err = p.FastReadField18(buf[offset:]) + offset += l + if err != nil { + goto ReadFieldError + } + } else { + l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) + offset += l + if err != nil { + goto SkipFieldError + } + } default: l, err = bthrift.Binary.Skip(buf[offset:], fieldTypeId) offset += l @@ -39659,6 +39673,19 @@ func (p *TRestoreSnapshotRequest) FastReadField17(buf []byte) (int, error) { return offset, nil } +func (p *TRestoreSnapshotRequest) FastReadField18(buf []byte) (int, error) { + offset := 0 + + if v, l, err := bthrift.Binary.ReadString(buf[offset:]); err != nil { + return offset, err + } else { + offset += l + p.MediumSyncPolicy = &v + + } + return offset, nil +} + // for compatibility func (p *TRestoreSnapshotRequest) FastWrite(buf []byte) int { return 0 @@ -39685,6 +39712,7 @@ func (p *TRestoreSnapshotRequest) FastWriteNocopy(buf []byte, binaryWriter bthri offset += p.fastWriteField10(buf[offset:], binaryWriter) offset += p.fastWriteField11(buf[offset:], binaryWriter) offset += p.fastWriteField12(buf[offset:], binaryWriter) + offset += p.fastWriteField18(buf[offset:], binaryWriter) } offset += bthrift.Binary.WriteFieldStop(buf[offset:]) offset += bthrift.Binary.WriteStructEnd(buf[offset:]) @@ -39712,6 +39740,7 @@ func (p *TRestoreSnapshotRequest) BLength() int { l += p.field15Length() l += p.field16Length() l += p.field17Length() + l += p.field18Length() } l += bthrift.Binary.FieldStopLength() l += bthrift.Binary.StructEndLength() @@ -39923,6 +39952,17 @@ func (p *TRestoreSnapshotRequest) fastWriteField17(buf []byte, binaryWriter bthr return offset } +func (p *TRestoreSnapshotRequest) fastWriteField18(buf []byte, binaryWriter bthrift.BinaryWriter) int { + offset := 0 + if p.IsSetMediumSyncPolicy() { + offset += bthrift.Binary.WriteFieldBegin(buf[offset:], "medium_sync_policy", thrift.STRING, 18) + offset += bthrift.Binary.WriteStringNocopy(buf[offset:], binaryWriter, *p.MediumSyncPolicy) + + offset += bthrift.Binary.WriteFieldEnd(buf[offset:]) + } + return offset +} + func (p *TRestoreSnapshotRequest) field1Length() int { l := 0 if p.IsSetCluster() { @@ -40120,6 +40160,17 @@ func (p *TRestoreSnapshotRequest) field17Length() int { return l } +func (p *TRestoreSnapshotRequest) field18Length() int { + l := 0 + if p.IsSetMediumSyncPolicy() { + l += bthrift.Binary.FieldBeginLength("medium_sync_policy", thrift.STRING, 18) + l += 
bthrift.Binary.StringLengthNocopy(*p.MediumSyncPolicy) + + l += bthrift.Binary.FieldEndLength() + } + return l +} + func (p *TRestoreSnapshotResult_) FastRead(buf []byte) (int, error) { var err error var offset int diff --git a/pkg/rpc/thrift/FrontendService.thrift b/pkg/rpc/thrift/FrontendService.thrift index 20a6131e..8f732da6 100644 --- a/pkg/rpc/thrift/FrontendService.thrift +++ b/pkg/rpc/thrift/FrontendService.thrift @@ -1221,6 +1221,7 @@ struct TRestoreSnapshotRequest { 15: optional bool atomic_restore 16: optional bool compressed; 17: optional bool force_replace + 18: optional string medium_sync_policy } struct TRestoreSnapshotResult { diff --git a/pkg/service/http_service.go b/pkg/service/http_service.go index 58c65314..a16f9d5b 100644 --- a/pkg/service/http_service.go +++ b/pkg/service/http_service.go @@ -103,6 +103,8 @@ type CreateCcrRequest struct { // For table sync, allow to create ccr job even if the target table already exists. AllowTableExists bool `json:"allow_table_exists"` ReuseBinlogLabel bool `json:"reuse_binlog_label"` + // Medium sync policy for backup/restore operations: "hdd" or "same_with_upstream" + MediumSyncPolicy string `json:"medium_sync_policy"` } // Stringer @@ -139,9 +141,11 @@ func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobMana SkipError: request.SkipError, AllowTableExists: request.AllowTableExists, ReuseBinlogLabel: request.ReuseBinlogLabel, + MediumSyncPolicy: request.MediumSyncPolicy, Db: db, Factory: jobManager.GetFactory(), } + job, err := ccr.NewJobFromService(request.Name, ctx) if err != nil { return err @@ -1084,6 +1088,51 @@ func (s *HttpService) failpointHandler(w http.ResponseWriter, r *http.Request) { result = newSuccessResult() } +type UpdateMediumSyncPolicyRequest struct { + CcrCommonRequest + MediumSyncPolicy string `json:"medium_sync_policy"` +} + +func (s *HttpService) updateMediumSyncPolicyHandler(w http.ResponseWriter, r *http.Request) { + log.Infof("update medium sync policy") + + var result *defaultResult + defer func() { writeJson(w, result) }() + + // Parse the JSON request body + var request UpdateMediumSyncPolicyRequest + err := json.NewDecoder(r.Body).Decode(&request) + if err != nil { + log.Warnf("update medium sync policy failed: %+v", err) + result = newErrorResult(err.Error()) + return + } + + if request.Name == "" { + log.Warnf("update medium sync policy failed: name is empty") + result = newErrorResult("name is empty") + return + } + + if request.MediumSyncPolicy == "" { + log.Warnf("update medium sync policy failed: medium_sync_policy is empty") + result = newErrorResult("medium_sync_policy is empty") + return + } + + if s.redirect(request.Name, w, r) { + return + } + + log.Infof("update medium sync policy for job %s to %s", request.Name, request.MediumSyncPolicy) + if err := s.jobManager.UpdateMediumSyncPolicy(request.Name, request.MediumSyncPolicy); err != nil { + log.Warnf("update medium sync policy failed: %+v", err) + result = newErrorResult(err.Error()) + } else { + result = newSuccessResult() + } +} + func (s *HttpService) RegisterHandlers() { s.mux.HandleFunc("/version", s.versionHandler) s.mux.HandleFunc("/create_ccr", s.createHandler) @@ -1104,6 +1153,7 @@ func (s *HttpService) RegisterHandlers() { s.mux.Handle("/metrics", xmetrics.GetHttpHandler()) s.mux.HandleFunc("/sync", s.syncHandler) s.mux.HandleFunc("/view", s.showJobStateHandler) + s.mux.HandleFunc("/update_medium_sync_policy", s.updateMediumSyncPolicyHandler) } func (s *HttpService) Start() error {
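
Usage sketch (not part of the patch): the hunks above add a "medium_sync_policy" field ("hdd" or "same_with_upstream") to the create_ccr request body and register a new /update_medium_sync_policy endpoint that forwards to JobManager.UpdateMediumSyncPolicy. The Go snippet below shows one way a client might call the new endpoint; the syncer address, port, and job name are illustrative assumptions, while the endpoint path, the JSON field names, and the accepted policy values come from the patch itself.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Switch an existing CCR job to "same_with_upstream"; "hdd" is the other
	// accepted value (see MediumSyncPolicyHDD / MediumSyncPolicySameWithUpstream).
	body, _ := json.Marshal(map[string]string{
		"name":               "demo_job", // hypothetical job name
		"medium_sync_policy": "same_with_upstream",
	})

	// 127.0.0.1:9190 is an assumed syncer listen address, not taken from the patch.
	resp, err := http.Post("http://127.0.0.1:9190/update_medium_sync_policy",
		"application/json", bytes.NewReader(body))
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()
	fmt.Println("status:", resp.Status)
}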