Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -2291,6 +2291,7 @@ func (w *worker) updateJobRowCount(taskKey string, jobID int64) {
w.getReorgCtx(jobID).setRowCount(rowCount)
}

<<<<<<< HEAD
// submitAndWaitTask submits a task and wait for it to finish.
func submitAndWaitTask(ctx context.Context, taskKey string, taskType proto.TaskType, concurrency int, targetScope string, taskMeta []byte) error {
task, err := handle.SubmitTask(ctx, taskKey, taskType, concurrency, targetScope, taskMeta)
Expand All @@ -2300,6 +2301,8 @@ func submitAndWaitTask(ctx context.Context, taskKey string, taskType proto.TaskT
return handle.WaitTaskDoneOrPaused(ctx, task.ID)
}

=======
>>>>>>> b08a92082fe (disttask: fix task metrics for distributed execute framework panel (#59940))
func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysicalTableID int64) (int64, kv.Key, kv.Key, error) {
pi := t.Meta().GetPartitionInfo()
if pi == nil {
Expand Down
1 change: 0 additions & 1 deletion pkg/disttask/framework/handle/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ go_library(
deps = [
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
"//pkg/metrics",
"//pkg/util/backoff",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/framework/handle/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/util/backoff"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
Expand Down Expand Up @@ -77,7 +76,6 @@ func SubmitTask(ctx context.Context, taskKey string, taskType proto.TaskType, co
if err != nil {
return nil, err
}
metrics.UpdateMetricsForAddTask(&task.TaskBase)

NotifyTaskChange()
return task, nil
Expand Down
15 changes: 15 additions & 0 deletions pkg/disttask/framework/mock/scheduler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion pkg/disttask/framework/scheduler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//pkg/domain/infosync",
"//pkg/kv",
"//pkg/lightning/log",
"//pkg/metrics",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/backoff",
Expand Down
44 changes: 42 additions & 2 deletions pkg/disttask/framework/scheduler/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

var subtaskCollector = newCollector()
var disttaskCollector = newCollector()

func init() {
prometheus.MustRegister(subtaskCollector)
prometheus.MustRegister(disttaskCollector)
}

// Because the exec_id of a subtask may change, after all tasks
Expand All @@ -37,13 +37,20 @@ func init() {
// Therefore, a custom collector is used.
type collector struct {
subtaskInfo atomic.Pointer[[]*proto.SubtaskBase]
taskInfo atomic.Pointer[[]*proto.TaskBase]

tasks *prometheus.Desc
subtasks *prometheus.Desc
subtaskDuration *prometheus.Desc
}

func newCollector() *collector {
return &collector{
tasks: prometheus.NewDesc(
"tidb_disttask_task_status",
"Number of tasks.",
[]string{"task_type", "status"}, nil,
),
subtasks: prometheus.NewDesc(
"tidb_disttask_subtasks",
"Number of subtasks.",
Expand All @@ -59,12 +66,45 @@ func newCollector() *collector {

// Describe implements the prometheus.Collector interface.
func (c *collector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.tasks
ch <- c.subtasks
ch <- c.subtaskDuration
}

// Collect implements the prometheus.Collector interface.
func (c *collector) Collect(ch chan<- prometheus.Metric) {
c.collectTasks(ch)
c.collectSubtasks(ch)
}

func (c *collector) collectTasks(ch chan<- prometheus.Metric) {
p := c.taskInfo.Load()
if p == nil {
return
}
tasks := *p
// task type => state => cnt
taskTypeStateCnt := make(map[string]map[string]int)
for _, task := range tasks {
tp := task.Type.String()
if _, ok := taskTypeStateCnt[tp]; !ok {
taskTypeStateCnt[tp] = make(map[string]int)
}
state := task.State.String()
taskTypeStateCnt[tp][state]++
}
for tp, stateCnt := range taskTypeStateCnt {
for state, cnt := range stateCnt {
ch <- prometheus.MustNewConstMetric(c.tasks, prometheus.GaugeValue,
float64(cnt),
tp,
state,
)
}
}
}

func (c *collector) collectSubtasks(ch chan<- prometheus.Metric) {
p := c.subtaskInfo.Load()
if p == nil {
return
Expand Down
2 changes: 2 additions & 0 deletions pkg/disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type TaskManager interface {
// to make sure lower rank tasks can be scheduled if resource is enough.
// The returned tasks are sorted by task order, see proto.Task.
GetTopUnfinishedTasks(ctx context.Context) ([]*proto.TaskBase, error)
// GetAllTasks gets all tasks with basic columns.
GetAllTasks(ctx context.Context) ([]*proto.TaskBase, error)
// GetAllSubtasks gets all subtasks with basic columns.
GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error)
GetTasksInStates(ctx context.Context, states ...any) (task []*proto.Task, err error)
Expand Down
13 changes: 6 additions & 7 deletions pkg/disttask/framework/scheduler/scheduler_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/pkg/disttask/framework/handle"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/metrics"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/syncutil"
Expand Down Expand Up @@ -284,8 +283,6 @@ func (sm *Manager) startSchedulers(schedulableTasks []*proto.TaskBase) error {
sm.logger.Info("start scheduler without allocating slots",
zap.Int64("task-id", task.ID), zap.Stringer("state", task.State))
}

metrics.UpdateMetricsForScheduleTask(task)
sm.startScheduler(task, allocateSlots, reservedExecID)
}
return nil
Expand Down Expand Up @@ -362,7 +359,6 @@ func (sm *Manager) startScheduler(basicTask *proto.TaskBase, allocateSlots bool,
handle.NotifyTaskChange()
sm.logger.Info("task scheduler exist", zap.Int64("task-id", task.ID))
}()
metrics.UpdateMetricsForRunTask(task)
scheduler.ScheduleTask()
sm.finishCh <- struct{}{}
})
Expand Down Expand Up @@ -436,7 +432,6 @@ func (sm *Manager) cleanupFinishedTasks(tasks []*proto.Task) error {
// if task doesn't register cleanup function, mark it as cleaned.
cleanedTasks = append(cleanedTasks, task)
}
metrics.UpdateMetricsForFinishTask(task)
}
if firstErr != nil {
sm.logger.Warn("cleanup routine failed", zap.Error(errors.Trace(firstErr)))
Expand Down Expand Up @@ -465,13 +460,17 @@ func (sm *Manager) collectLoop() {
}

func (sm *Manager) collect() {
tasks, err := sm.taskMgr.GetAllTasks(sm.ctx)
if err != nil {
sm.logger.Warn("get all tasks failed", zap.Error(err))
}
subtasks, err := sm.taskMgr.GetAllSubtasks(sm.ctx)
if err != nil {
sm.logger.Warn("get all subtasks failed", zap.Error(err))
return
}

subtaskCollector.subtaskInfo.Store(&subtasks)
disttaskCollector.taskInfo.Store(&tasks)
disttaskCollector.subtaskInfo.Store(&subtasks)
}

// MockScheduler mock one scheduler for one task, only used for tests.
Expand Down
16 changes: 16 additions & 0 deletions pkg/disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,22 @@ func (mgr *TaskManager) GetSubtasksWithHistory(ctx context.Context, taskID int64
return subtasks, nil
}

// GetAllTasks gets all tasks with basic columns.
func (mgr *TaskManager) GetAllTasks(ctx context.Context) ([]*proto.TaskBase, error) {
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicTaskColumns+` from mysql.tidb_global_task t`)
if err != nil {
return nil, err
}
if len(rs) == 0 {
return nil, nil
}
tasks := make([]*proto.TaskBase, 0, len(rs))
for _, r := range rs {
tasks = append(tasks, row2TaskBasic(r))
}
return tasks, nil
}

// GetAllSubtasks gets all subtasks with basic columns.
func (mgr *TaskManager) GetAllSubtasks(ctx context.Context) ([]*proto.SubtaskBase, error) {
rs, err := mgr.ExecuteSQLWithNewSession(ctx, `select `+basicSubtaskColumns+` from mysql.tidb_background_subtask`)
Expand Down
5 changes: 3 additions & 2 deletions pkg/disttask/importinto/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@ import (
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/kv"
<<<<<<< HEAD
"github.com/pingcap/tidb/pkg/lightning/checkpoints"
"github.com/pingcap/tidb/pkg/metrics"
=======
>>>>>>> b08a92082fe (disttask: fix task metrics for distributed execute framework panel (#59940))
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/util"
Expand Down Expand Up @@ -105,8 +108,6 @@ func doSubmitTask(ctx context.Context, plan *importer.Plan, stmt string, instanc
return 0, nil, err
}

metrics.UpdateMetricsForAddTask(task)

logutil.BgLogger().Info("job submitted to task queue",
zap.Int64("job-id", jobID),
zap.Int64("task-id", task.ID),
Expand Down
1 change: 0 additions & 1 deletion pkg/metrics/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/disttask/framework/proto",
"//pkg/lightning/metric",
"//pkg/parser/terror",
"//pkg/timer/metrics",
Expand Down
66 changes: 0 additions & 66 deletions pkg/metrics/disttask.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,16 @@
package metrics

import (
"fmt"
"time"

"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/prometheus/client_golang/prometheus"
)

const (
lblTaskStatus = "status"
lblTaskType = "task_type"
lblTaskID = "task_id"
lblSubTaskID = "subtask_id"
lblExecID = "exec_id"
)

// status for task
const (
SchedulingStatus = "scheduling"
WaitingStatus = "waiting"
RunningStatus = "running"
CompletedStatus = "completed"
)

var (
//DistTaskGauge is the gauge of dist task count.
DistTaskGauge *prometheus.GaugeVec
//DistTaskStartTimeGauge is the gauge of dist task count.
DistTaskStartTimeGauge *prometheus.GaugeVec
// DistTaskUsedSlotsGauge is the gauge of used slots on executor node.
DistTaskUsedSlotsGauge *prometheus.GaugeVec
)

// InitDistTaskMetrics initializes disttask metrics.
func InitDistTaskMetrics() {
DistTaskGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "task_status",
Help: "Gauge of disttask.",
}, []string{lblTaskType, lblTaskStatus})

DistTaskStartTimeGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Subsystem: "disttask",
Name: "start_time",
Help: "Gauge of start_time of disttask.",
}, []string{lblTaskType, lblTaskStatus, lblTaskID})
DistTaskUsedSlotsGauge = NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "tidb",
Expand All @@ -72,30 +33,3 @@ func InitDistTaskMetrics() {
Help: "Gauge of used slots on a executor node.",
}, []string{"service_scope"})
}

// UpdateMetricsForAddTask update metrics when a task is added
func UpdateMetricsForAddTask(task *proto.TaskBase) {
DistTaskGauge.WithLabelValues(task.Type.String(), WaitingStatus).Inc()
DistTaskStartTimeGauge.WithLabelValues(task.Type.String(), WaitingStatus, fmt.Sprint(task.ID)).Set(float64(time.Now().UnixMicro()))
}

// UpdateMetricsForScheduleTask update metrics when a task is added
func UpdateMetricsForScheduleTask(task *proto.TaskBase) {
DistTaskGauge.WithLabelValues(task.Type.String(), WaitingStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type.String(), SchedulingStatus).Inc()
DistTaskStartTimeGauge.DeleteLabelValues(task.Type.String(), WaitingStatus, fmt.Sprint(task.ID))
DistTaskStartTimeGauge.WithLabelValues(task.Type.String(), SchedulingStatus, fmt.Sprint(task.ID)).SetToCurrentTime()
}

// UpdateMetricsForRunTask update metrics when a task starts running
func UpdateMetricsForRunTask(task *proto.Task) {
DistTaskStartTimeGauge.DeleteLabelValues(task.Type.String(), SchedulingStatus, fmt.Sprint(task.ID))
DistTaskGauge.WithLabelValues(task.Type.String(), SchedulingStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type.String(), RunningStatus).Inc()
}

// UpdateMetricsForFinishTask update metrics when a task is finished
func UpdateMetricsForFinishTask(task *proto.Task) {
DistTaskGauge.WithLabelValues(task.Type.String(), RunningStatus).Dec()
DistTaskGauge.WithLabelValues(task.Type.String(), CompletedStatus).Inc()
}
4 changes: 3 additions & 1 deletion pkg/metrics/grafana/tidb.json
Original file line number Diff line number Diff line change
Expand Up @@ -15320,6 +15320,7 @@
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"decimals": 0,
"title": "Task Status",
"tooltip": {
"shared": true,
Expand All @@ -15341,7 +15342,8 @@
"logBase": 1,
"max": null,
"min": null,
"show": true
"show": true,
"decimals": 0
},
{
"format": "short",
Expand Down
2 changes: 0 additions & 2 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,6 @@ func RegisterMetrics() {
prometheus.MustRegister(PlanReplayerTaskCounter)
prometheus.MustRegister(PlanReplayerRegisterTaskGauge)

prometheus.MustRegister(DistTaskGauge)
prometheus.MustRegister(DistTaskStartTimeGauge)
prometheus.MustRegister(DistTaskUsedSlotsGauge)
prometheus.MustRegister(RunawayCheckerCounter)
prometheus.MustRegister(GlobalSortWriteToCloudStorageDuration)
Expand Down