This repository was archived by the owner on Jun 27, 2025. It is now read-only.
added in a "monitor" subcommand to monitor jobs to completion #141

Open: dansteen wants to merge 5 commits into hashicorp:main from dansteen:monitor
Changes from all commits (5 commits):

- 7aef1ea (dansteen): added in a "monitor" subcommand to monitor jobs to completion
- 051bffc (dansteen): fixed formatting and comments
- 30b8a58 (dansteen): fixed command struct to remove unused args
- d3e3b1f (dansteen): updated for new logging
- 2717cbd (jrasell): Merge branch 'master' into monitor
New file, 89 lines added (`@@ -0,0 +1,89 @@`):

```go
package command

import (
	"strings"

	"github.com/jrasell/levant/levant"
	"github.com/jrasell/levant/levant/structs"
	"github.com/jrasell/levant/logging"
)

// MonitorCommand is the command implementation that allows users to monitor
// a job until it completes.
type MonitorCommand struct {
	Meta
}

// Help provides the help information for the monitor command.
func (c *MonitorCommand) Help() string {
	helpText := `
Usage: levant monitor [options] [EVAL_ID]

  Monitor a Nomad job until it completes.

Arguments:

  EVAL_ID  The ID of the Nomad evaluation to monitor.

General Options:

  -timeout=<int>
    Number of seconds to allow before exiting with an error.

  -log-level=<level>
    Specify the verbosity level of Levant's logs. Valid values include DEBUG,
    INFO, and WARN, in decreasing order of verbosity. The default is INFO.
`
	return strings.TrimSpace(helpText)
}

// Synopsis provides a brief summary of the monitor command.
func (c *MonitorCommand) Synopsis() string {
	return "Monitor a Nomad job until it completes"
}

// Run triggers a run of the Levant monitor function.
func (c *MonitorCommand) Run(args []string) int {

	var timeout uint64
	var evalID string
	var err error
	config := &structs.Config{}

	flags := c.Meta.FlagSet("monitor", FlagSetVars)
	flags.Usage = func() { c.UI.Output(c.Help()) }

	flags.Uint64Var(&timeout, "timeout", 0, "")
	flags.StringVar(&config.LogLevel, "log-level", "INFO", "")

	if err = flags.Parse(args); err != nil {
		return 1
	}

	args = flags.Args()

	if err = logging.SetupLogger(config.LogLevel, config.LogFormat); err != nil {
		c.UI.Error(err.Error())
		return 1
	}

	// Exactly one positional argument, the evaluation ID, is expected.
	if len(args) == 1 {
		evalID = args[0]
	} else if len(args) == 0 {
		c.UI.Error(c.Help())
		c.UI.Error("\nERROR: Please provide an evaluation ID to monitor.")
		return 1
	} else {
		c.UI.Error(c.Help())
		return 1
	}

	// Trigger our monitor.
	err = levant.StartMonitor(config, &evalID, timeout, &c.Meta.flagVars)
	if err != nil {
		// The monitor has already reported the error, so it is not repeated here.
		return 1
	}

	return 0
}
```
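For orientation, the sketch below shows how this subcommand could be wired into Levant's CLI, assuming the project uses the github.com/mitchellh/cli command-factory pattern that the Meta, FlagSet, and UI usage above suggests. The package path `github.com/jrasell/levant/command`, the exported `UI` field on `Meta`, and the surrounding `main` function are assumptions for illustration and are not part of this diff.

```go
package main

import (
	"log"
	"os"

	"github.com/jrasell/levant/command"
	"github.com/mitchellh/cli"
)

func main() {
	// Meta carries the UI and flag variables shared by Levant's subcommands;
	// initialising an exported UI field here is an assumption about its definition.
	meta := command.Meta{UI: &cli.BasicUi{Writer: os.Stdout, ErrorWriter: os.Stderr}}

	c := cli.NewCLI("levant", "dev")
	c.Args = os.Args[1:]
	c.Commands = map[string]cli.CommandFactory{
		// Map the "monitor" subcommand name to a factory returning the
		// MonitorCommand added in this PR.
		"monitor": func() (cli.Command, error) {
			return &command.MonitorCommand{Meta: meta}, nil
		},
	}

	exitCode, err := c.Run()
	if err != nil {
		log.Println(err)
	}
	os.Exit(exitCode)
}
```

With wiring along these lines, `levant monitor <eval-id> -timeout=300` would parse the flags defined above and hand the evaluation ID to `levant.StartMonitor`.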
New file, 185 lines added (`@@ -0,0 +1,185 @@`):

```go
package levant

import (
	"errors"
	"sync"
	"time"

	nomad "github.com/hashicorp/nomad/api"
	nomadStructs "github.com/hashicorp/nomad/nomad/structs"
	"github.com/jrasell/levant/levant/structs"
	"github.com/rs/zerolog/log"
)

// StartMonitor starts monitoring a job.
func StartMonitor(config *structs.Config, evalID *string, timeoutSeconds uint64, flagVars *map[string]string) error {
	// Create our new deployment object.
	levantDep, err := newLevantDeployment(config)
	if err != nil {
		log.Error().Err(err).Msgf("levant/monitor: unable to setup Levant deployment: %v", err)
		return err
	}

	// Start monitoring.
	err = levantDep.monitor(evalID, timeoutSeconds)
	if err != nil {
		// The error has already been reported, so it is not reported again here.
		return err
	}
	return nil
}

// monitor follows a job until it completes.
func (l *levantDeployment) monitor(evalID *string, timeoutSeconds uint64) error {

	// Set our timeout. A zero value yields a nil channel, i.e. no timeout.
	timeout := time.Tick(time.Second * time.Duration(timeoutSeconds))

	// Closing this channel stops the job monitor.
	finish := make(chan bool)
	defer close(finish)

	// Set up the communication channels.
	jobChan := make(chan *nomad.Job, 1)
	errChan := make(chan error, 1)

	// Get the evaluation.
	eval, _, err := l.nomad.Evaluations().Info(*evalID, nil)
	if err != nil {
		log.Error().Err(err).Msgf("levant/monitor: unable to get evaluation %v: %v", *evalID, err)
		return err
	}

	// Get the job name.
	jobName := eval.JobID

	// Start our monitor.
	go l.monitorJobInfo(jobName, jobChan, errChan, finish)

	for {
		select {
		case <-timeout:
			log.Error().Msgf("levant/monitor: timeout reached while monitoring job %s", jobName)

			// Run the allocation inspector to gather additional information
			// about the state of the job at the time of the timeout.
			var allocIDS []string
			allocs, _, err := l.nomad.Evaluations().Allocations(*evalID, nil)
			if err != nil {
				log.Error().Err(err).Msgf("levant/monitor: unable to get allocations from evaluation %s: %v", *evalID, err)
				return err
			}
			// Check to see if any of our allocations failed. We need to test
			// for success.
			for _, alloc := range allocs {
				for _, task := range alloc.TaskStates {
					if task.State != nomadStructs.TaskStarted {
						allocIDS = append(allocIDS, alloc.ID)
						// Once we add the allocation we don't need to add it again.
						break
					}
				}
			}

			l.inspectAllocs(allocIDS)
			return errors.New("timeout reached")

		case err = <-errChan:
			log.Error().Err(err).Msgf("levant/monitor: unable to query job %s: %v", jobName, err)
			log.Error().Err(err).Msg("Retrying...")

		case job := <-jobChan:
			// Depending on the state of the job we do different things.
			switch *job.Status {

			// If the job is stopped we take action depending on why it stopped.
			case nomadStructs.JobStatusDead:
				// Get some additional information about the exit of the job.
				var allocIDS []string
				allocs, _, err := l.nomad.Evaluations().Allocations(*evalID, nil)
				if err != nil {
					log.Error().Err(err).Msgf("levant/monitor: unable to get allocations from evaluation %s: %v", *evalID, err)
					return err
				}
				// Check to see if any of our allocations failed.
				for _, alloc := range allocs {
					for _, task := range alloc.TaskStates {
						if task.Failed {
							allocIDS = append(allocIDS, alloc.ID)
						}
					}
				}
				if len(allocIDS) > 0 {
					l.inspectAllocs(allocIDS)
					return errors.New("some or all allocations failed")
				}
				// Otherwise print a message and return no error.
				log.Info().Msgf("levant/monitor: job %s has status %s", jobName, *job.Status)
				return nil

			case nomadStructs.JobStatusRunning:
				log.Info().Msgf("levant/monitor: job %s has status %s", jobName, *job.Status)
				// If it is a parameterized or periodic job we stop here, since
				// running is its final state.
				if job.IsParameterized() {
					log.Info().Msgf("levant/monitor: job %s is parameterized. Running is its final state.", jobName)
					return nil
				}
				if job.IsPeriodic() {
					log.Info().Msgf("levant/monitor: job %s is periodic. Running is its final state.", jobName)
					return nil
				}

			default:
				log.Debug().Msgf("levant/monitor: got unexpected job state %s; nothing to do.", *job.Status)
			}
		}
	}
}

// monitorJobInfo gets information on a job from Nomad and sends it on the
// provided channels once it has been updated.
func (l *levantDeployment) monitorJobInfo(jobName string, jobChan chan<- *nomad.Job, errChan chan<- error, done chan bool) {

	// Set up the Nomad QueryOptions to allow a blocking query with a timeout.
	q := &nomad.QueryOptions{WaitIndex: 0, WaitTime: time.Second * 10}

	for {
		select {
		case <-done:
			// Allow us to exit on demand (technically, it will still need to
			// wait for the current Nomad query to return).
			return
		default:
			// Get our job info.
			job, meta, err := l.nomad.Jobs().Info(jobName, q)
			if err != nil {
				errChan <- err
				// Sleep a bit before retrying.
				time.Sleep(time.Second * 5)
				continue
			}
			// Only take action if the information has changed.
			if meta.LastIndex > q.WaitIndex {
				q.WaitIndex = meta.LastIndex
				jobChan <- job
			} else {
				log.Debug().Msgf("levant/monitor: job %s currently has status %s", jobName, *job.Status)
			}
		}
	}
}

// inspectAllocs is a helper that calls the allocInspector for each of the
// provided allocations and returns once they have all completed.
func (l *levantDeployment) inspectAllocs(allocs []string) {
	// If we have failed allocations, print information about them.
	if len(allocs) > 0 {
		// Run through and get messages for all of our failed allocations in
		// parallel.
		var wg sync.WaitGroup
		wg.Add(len(allocs))

		// Inspect each allocation.
		for _, id := range allocs {
			log.Debug().Msgf("levant/monitor: launching allocation inspector for alloc %v", id)
			go l.allocInspector(id, &wg)
		}

		// Wait until our allocations have printed their messages.
		wg.Wait()
	}
}
```
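The heart of `monitorJobInfo` is Nomad's blocking-query mechanism. The stand-alone sketch below shows that pattern in isolation, using only the public `github.com/hashicorp/nomad/api` client; the job name "example" and the environment-derived client configuration are placeholders for illustration, not values taken from this PR.

```go
package main

import (
	"fmt"
	"log"
	"time"

	nomad "github.com/hashicorp/nomad/api"
)

func main() {
	// Connect using the default config (NOMAD_ADDR etc. from the environment).
	client, err := nomad.NewClient(nomad.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	// WaitIndex and WaitTime turn Jobs().Info into a blocking query: Nomad
	// holds the request open until the job's modify index passes WaitIndex
	// or WaitTime elapses, whichever comes first.
	q := &nomad.QueryOptions{WaitIndex: 0, WaitTime: 10 * time.Second}

	for {
		job, meta, err := client.Jobs().Info("example", q)
		if err != nil {
			log.Printf("query failed, retrying: %v", err)
			time.Sleep(5 * time.Second)
			continue
		}

		// meta.LastIndex > q.WaitIndex means the job actually changed since
		// the last poll; otherwise the blocking query simply timed out.
		if meta.LastIndex > q.WaitIndex {
			q.WaitIndex = meta.LastIndex
			fmt.Printf("job %q changed, status now %s\n", *job.ID, *job.Status)
		}
	}
}
```

Because a real change and an idle timeout both return normally, the `meta.LastIndex > q.WaitIndex` check is what distinguishes them, which is exactly how the goroutine in the diff decides whether to push an update onto `jobChan`.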
More of a design question and I am happy to be wrong, but would it not be better if this was the JobID and then Levant would discover the allocs from this, rather than the eval ID?
I completely agree - that would be both better and more user friendly. The trouble is that if you are just monitoring the job via the job ID, you aren't really guaranteed to be monitoring a particular invocation of `levant deploy`. As an example, if you run `levant deploy job_1`, and then a different user runs `levant deploy job_1` after you do and before you run the `monitor` command, then, if you run `levant monitor job_1` and we use the job ID to pull the most recent evaluation, you will wind up monitoring the other user's invocation of the job rather than your own. I agree this is somewhat unlikely, however I can certainly see automation tools running into it. It seemed like the need to guarantee that the command monitors what you think it is monitoring was more important.

Having said that, I do think that you are right and many people will not care. We can put a ticket in for that, and I can add it as an additional feature if that works for you.
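For reference, here is a rough sketch of what the job-ID variant discussed above might look like against the public Nomad API. The package and helper name are hypothetical, not part of this PR, and the comments mark where the race described in this thread would sit.

```go
package monitorutil

import (
	"fmt"

	nomad "github.com/hashicorp/nomad/api"
)

// latestEvalForJob resolves the most recent evaluation ID for a job, which is
// what a job-ID-based monitor would have to do. The race described in the
// thread lives exactly here: another deploy of the same job between your
// submission and this lookup becomes the "most recent" evaluation.
func latestEvalForJob(client *nomad.Client, jobID string) (string, error) {
	evals, _, err := client.Jobs().Evaluations(jobID, nil)
	if err != nil {
		return "", err
	}
	if len(evals) == 0 {
		return "", fmt.Errorf("no evaluations found for job %s", jobID)
	}

	// Pick the evaluation with the highest create index, i.e. the latest
	// submission of the job, which may not be the caller's own submission.
	latest := evals[0]
	for _, e := range evals {
		if e.CreateIndex > latest.CreateIndex {
			latest = e
		}
	}
	return latest.ID, nil
}
```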