Skip to content

Commit 52c9cfd

Browse files
committed
all job status
1 parent aaac737 commit 52c9cfd

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

pkg/service/http_service.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212

13+
"github.com/olekukonko/tablewriter"
1314
"github.com/prometheus/client_golang/prometheus/promhttp"
1415
"github.com/selectdb/ccr_syncer/pkg/ccr"
1516
"github.com/selectdb/ccr_syncer/pkg/ccr/base"
@@ -106,6 +107,51 @@ func (s *HttpService) versionHandler(w http.ResponseWriter, r *http.Request) {
106107
writeJson(w, result)
107108
}
108109

110+
func writeTable(w http.ResponseWriter, data interface{}) {
111+
if data == nil || (reflect.ValueOf(data).Kind() == reflect.Ptr && reflect.ValueOf(data).IsNil()) {
112+
return
113+
}
114+
115+
jsonData, err := json.Marshal(data)
116+
if err != nil {
117+
http.Error(w, err.Error(), http.StatusInternalServerError)
118+
return
119+
}
120+
121+
var jsonMap map[string]interface{}
122+
if err := json.Unmarshal(jsonData, &jsonMap); err != nil {
123+
http.Error(w, err.Error(), http.StatusInternalServerError)
124+
return
125+
}
126+
127+
table := tablewriter.NewWriter(w)
128+
table.SetHeader([]string{"Key", "Value"})
129+
table.SetBorder(true)
130+
table.SetCenterSeparator("+")
131+
table.SetColumnSeparator("|")
132+
table.SetRowSeparator("-")
133+
table.SetAlignment(tablewriter.ALIGN_LEFT)
134+
table.SetAutoWrapText(false)
135+
table.SetAutoFormatHeaders(true)
136+
table.SetHeaderAlignment(tablewriter.ALIGN_LEFT)
137+
table.SetAlignment(tablewriter.ALIGN_LEFT)
138+
table.SetHeaderLine(true)
139+
table.SetRowLine(true)
140+
141+
for key, value := range jsonMap {
142+
switch v := value.(type) {
143+
case []interface{}:
144+
for i, item := range v {
145+
table.Append([]string{fmt.Sprintf("%s[%d]", key, i), fmt.Sprintf("%v", item)})
146+
}
147+
default:
148+
table.Append([]string{key, fmt.Sprintf("%v", v)})
149+
}
150+
}
151+
152+
table.Render()
153+
}
154+
109155
// createCcr creates a new CCR job and adds it to the job manager.
110156
// It takes a CreateCcrRequest as input and returns an error if there was a problem creating the job or adding it to the job manager.
111157
func createCcr(request *CreateCcrRequest, db storage.DB, jobManager *ccr.JobManager) error {
@@ -512,6 +558,49 @@ func (s *HttpService) listJobsHandler(w http.ResponseWriter, r *http.Request) {
512558
}
513559
}
514560

561+
// ListJobsStatus service
562+
func (s *HttpService) listJobsStatusHandler(w http.ResponseWriter, r *http.Request) {
563+
log.Infof("list all jobs status")
564+
565+
type result struct {
566+
*defaultResult
567+
Jobs []string `json:"jobs,omitempty"`
568+
JobStatus []*ccr.JobStatus `json:"status,omitempty"`
569+
}
570+
571+
var allJobStatus *result
572+
defer func() { writeTable(w, allJobStatus) }()
573+
574+
// use GetAllData to get all jobs
575+
if ans, err := s.db.GetAllData(); err != nil {
576+
log.Warnf("when list jobs, get all data failed: %+v", err)
577+
578+
allJobStatus = &result{
579+
defaultResult: newErrorResult(err.Error()),
580+
}
581+
} else {
582+
var jobData []string
583+
jobData = ans["jobs"]
584+
allJobs := make([]string, 0)
585+
alljobStatus := make([]*ccr.JobStatus, 0)
586+
for _, eachJob := range jobData {
587+
jobName := strings.Trim(strings.Split(eachJob, ",")[0], " ")
588+
jobStatus, err := s.jobManager.GetJobStatus(jobName)
589+
allJobs = append(allJobs, jobName)
590+
if err != nil {
591+
log.Warnf("when list all jobs, get job status failed: %+v", err)
592+
}
593+
alljobStatus = append(alljobStatus, jobStatus)
594+
}
595+
596+
allJobStatus = &result{
597+
defaultResult: newSuccessResult(),
598+
Jobs: allJobs,
599+
JobStatus: alljobStatus,
600+
}
601+
}
602+
}
603+
515604
// get job progress
516605
func (s *HttpService) jobProgressHandler(w http.ResponseWriter, r *http.Request) {
517606
log.Infof("get job progress")
@@ -688,6 +777,7 @@ func (s *HttpService) RegisterHandlers() {
688777
s.mux.HandleFunc("/resume", s.resumeHandler)
689778
s.mux.HandleFunc("/delete", s.deleteHandler)
690779
s.mux.HandleFunc("/job_status", s.statusHandler)
780+
s.mux.HandleFunc("/all_job_status", s.listJobsStatusHandler)
691781
s.mux.HandleFunc("/desync", s.desyncHandler)
692782
s.mux.HandleFunc("/update_job", s.updateJobHandler)
693783
s.mux.HandleFunc("/list_jobs", s.listJobsHandler)

0 commit comments

Comments
 (0)