89 changes: 89 additions & 0 deletions command/monitor.go
@@ -0,0 +1,89 @@
package command

import (
"strings"

"github.com/jrasell/levant/levant"
"github.com/jrasell/levant/levant/structs"
"github.com/jrasell/levant/logging"
)

// MonitorCommand is the command implementation that allows users to monitor
// a job until it completes
type MonitorCommand struct {
Meta
}

// Help provides the help information for the monitor command.
func (c *MonitorCommand) Help() string {
helpText := `
Usage: levant monitor [options] [EVAL_ID]

Monitor a Nomad job until it completes

Arguments:

EVAL_ID  The ID of the Nomad job evaluation to monitor
Review comment from a repository Member:
More of a design question and I am happy to be wrong, but would it not be better if this was the JobID and then Levant would discover the allocs from this, rather than the eval ID?

Reply from @dansteen (Contributor Author), Apr 27, 2018:
I completely agree - that would be both better and more user friendly. The trouble is that if you are just monitoring the job via the job id, you aren't really guaranteed to be monitoring a particular invocation of levant deploy ....

As an example, if you run levant deploy job_1, and then a different user runs levant deploy job_1 after you do and before you run the monitor command, then, if you run levant monitor job_1 and we use the job_id to pull the most recent evaluation, you will wind up monitoring the other user's invocation of the job rather than your own. I agree this is somewhat unlikely; however, I can certainly see automation tools running into this. It seemed like the need to guarantee that the command monitors what you think it is monitoring was more important.

Having said that, I do think that you are right and many people will not care. We can put a ticket in for that, and I can add that in as an additional feature if that works for you.
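
For illustration, here is a minimal sketch of the job-ID approach discussed above, assuming the standard Nomad Go API (Jobs().Evaluations); the latestEvalForJob helper and the job_1 name are hypothetical and not part of this change. The race lives between the deploy and this lookup: if someone else deploys the same job in that window, their evaluation becomes the "latest" one.

package main

import (
	"fmt"
	"log"

	nomad "github.com/hashicorp/nomad/api"
)

// latestEvalForJob resolves a job ID to its most recently created evaluation.
// This helper is illustrative only; it is not part of this PR.
func latestEvalForJob(client *nomad.Client, jobID string) (string, error) {
	// List every evaluation recorded for the job.
	evals, _, err := client.Jobs().Evaluations(jobID, nil)
	if err != nil {
		return "", err
	}
	if len(evals) == 0 {
		return "", fmt.Errorf("no evaluations found for job %s", jobID)
	}

	// The newest evaluation is the one with the highest create index; if a
	// second deploy landed after ours, this picks up their evaluation instead.
	latest := evals[0]
	for _, e := range evals[1:] {
		if e.CreateIndex > latest.CreateIndex {
			latest = e
		}
	}
	return latest.ID, nil
}

func main() {
	client, err := nomad.NewClient(nomad.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	evalID, err := latestEvalForJob(client, "job_1")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("monitoring evaluation", evalID)
}

Monitoring by evaluation ID, as this change does, pins the monitor to the exact deploy that produced it and avoids that window entirely.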


General Options:

-timeout=<int>
Number of seconds to allow before exiting with an error.

-log-level=<level>
Specify the verbosity level of Levant's logs. Valid values include DEBUG,
INFO, and WARN, in decreasing order of verbosity. The default is INFO.
`
return strings.TrimSpace(helpText)
}

// Synopsis provides a brief summary of the monitor command.
func (c *MonitorCommand) Synopsis() string {
return "Monitor a Nomad job until it completes"
}

// Run triggers a run of the Levant monitor function.
func (c *MonitorCommand) Run(args []string) int {

var timeout uint64
var evalID string
var err error
config := &structs.Config{}

flags := c.Meta.FlagSet("monitor", FlagSetVars)
flags.Usage = func() { c.UI.Output(c.Help()) }

flags.Uint64Var(&timeout, "timeout", 0, "")
flags.StringVar(&config.LogLevel, "log-level", "INFO", "")

if err = flags.Parse(args); err != nil {
return 1
}

args = flags.Args()

if err = logging.SetupLogger(config.LogLevel, config.LogFormat); err != nil {
c.UI.Error(err.Error())
return 1
}

if len(args) == 1 {
evalID = args[0]
} else if len(args) == 0 {
c.UI.Error(c.Help())
c.UI.Error("\nERROR: Please provide an evaluation ID to monitor.")
return 1
} else {
c.UI.Error(c.Help())
return 1
}

// trigger our monitor
err = levant.StartMonitor(config, &evalID, timeout, &c.Meta.flagVars)
if err != nil {
// we have already reported the errors so we don't need to do it again here.
return 1
}

return 0
}
5 changes: 5 additions & 0 deletions commands.go
@@ -36,6 +36,11 @@ func Commands(metaPtr *command.Meta) map[string]cli.CommandFactory {
Meta: meta,
}, nil
},
"monitor": func() (cli.Command, error) {
return &command.MonitorCommand{
Meta: meta,
}, nil
},
"render": func() (cli.Command, error) {
return &command.RenderCommand{
Meta: meta,
185 changes: 185 additions & 0 deletions levant/monitor.go
@@ -0,0 +1,185 @@
package levant

import (
"errors"
"sync"
"time"

nomad "github.com/hashicorp/nomad/api"
nomadStructs "github.com/hashicorp/nomad/nomad/structs"
"github.com/jrasell/levant/levant/structs"
"github.com/rs/zerolog/log"
)

// StartMonitor will start monitoring a job
func StartMonitor(config *structs.Config, evalID *string, timeoutSeconds uint64, flagVars *map[string]string) error {
// Create our new deployment object.
levantDep, err := newLevantDeployment(config)
if err != nil {
log.Error().Err(err).Msgf("levant/monitor: unable to setup Levant deployment: %v", err)
return err
}

// start monitoring
err = levantDep.monitor(evalID, timeoutSeconds)
if err != nil {
// we have already reported the error so we don't need to report it again here
return err
}
return nil
}

// monitor follows a job until it completes.
func (l *levantDeployment) monitor(evalID *string, timeoutSeconds uint64) error {

// set our timeout
timeout := time.Tick(time.Second * time.Duration(timeoutSeconds))

// close this to stop the job monitor
finish := make(chan bool)
defer close(finish)
// set up some communication channels
jobChan := make(chan *nomad.Job, 1)
errChan := make(chan error, 1)

// get the evaluation
eval, _, err := l.nomad.Evaluations().Info(*evalID, nil)
if err != nil {
log.Error().Err(err).Msgf("levant/monitor: unable to get evaluation %v: %v", *evalID, err)
return err
}

// get the ID of the job this evaluation belongs to
jobName := eval.JobID

// start our monitor
go l.monitorJobInfo(jobName, jobChan, errChan, finish)

for {
select {
case <-timeout:
log.Error().Err(err).Msgf("levant/monitor: timeout reached while monitoring job %s", jobName)
// run the allocation inspector
var allocIDS []string
// get some additional information about the exit of the job
allocs, _, err := l.nomad.Evaluations().Allocations(*evalID, nil)
if err != nil {
log.Error().Err(err).Msgf("levant/monitor: unable to get allocations from evaluation %s: %v", evalID, err)
return err
}
// check to see if any of our allocations failed
for _, alloc := range allocs {
for _, task := range alloc.TaskStates {
// flag allocations whose tasks have not successfully started
if task.State != nomadStructs.TaskStateRunning {
allocIDS = append(allocIDS, alloc.ID)
// once we add the allocation we don't need to add it again
break
}
}
}

l.inspectAllocs(allocIDS)
return errors.New("timeout reached")
case err = <-errChan:
log.Error().Err(err).Msgf("levant/monitor: unable to query job %s: %v", jobName, err)
log.Error().Err(err).Msg("Retrying...")

case job := <-jobChan:
// depending on the state of the job we do different things
switch *job.Status {
// if the job is stopped then we take some action depending on why it stopped
case nomadStructs.JobStatusDead:
var allocIDS []string
// get some additional information about the exit of the job
allocs, _, err := l.nomad.Evaluations().Allocations(*evalID, nil)
if err != nil {
log.Error().Err(err).Msgf("levant/monitor: unable to get allocations from evaluation %s: %v", evalID, err)
return err
}
// check to see if any of our allocations failed
for _, alloc := range allocs {
for _, task := range alloc.TaskStates {
if task.Failed {
allocIDS = append(allocIDS, alloc.ID)
}
}
}
if len(allocIDS) > 0 {
l.inspectAllocs(allocIDS)
return errors.New("Some or all allocations failed")
}
// otherwise we print a message and just return no error
log.Info().Msgf("levant/monitor: job %s has status %s", jobName, *job.Status)
return nil
case nomadStructs.JobStatusRunning:
log.Info().Msgf("levant/monitor: job %s has status %s", jobName, *job.Status)
// if it's a parameterized or periodic job we stop here
if job.IsParameterized() {
log.Info().Msgf("levant/monitor: job %s is parameterized. Running is its final state.", jobName)
return nil
}
if job.IsPeriodic() {
log.Info().Msgf("levant/monitor: job %s is periodic. Running is its final state.", jobName)
return nil
}
default:
log.Debug().Msgf("levant/monitor: got job state %s. Don't know what to do with that.", *job.Status)
}
}
}
}

// monitorJobInfo retrieves information on a job from Nomad and sends it on the provided channels
// whenever it changes
func (l *levantDeployment) monitorJobInfo(jobName string, jobChan chan<- *nomad.Job, errChan chan<- error, done chan bool) {

// Setup the Nomad QueryOptions to allow blocking query and a timeout.
q := &nomad.QueryOptions{WaitIndex: 0, WaitTime: time.Second * 10}

for {
select {
case <-done:
// allow us to exit on demand (technically, it will still need to wait for the nomad query to return)
return
default:
// get our job info
job, meta, err := l.nomad.Jobs().Info(jobName, q)
if err != nil {
errChan <- err
// sleep a bit before retrying
time.Sleep(time.Second * 5)
continue
}
// only take action if the information has changed
if meta.LastIndex > q.WaitIndex {
q.WaitIndex = meta.LastIndex
jobChan <- job
} else {
// log a debug message
log.Debug().Msgf("levant/monitor: job %s currently has status %s", jobName, *job.Status)
}
}
}
}

// inspectAllocs is a helper that runs the allocInspector for each of the provided allocations
// and returns once they have all completed
func (l *levantDeployment) inspectAllocs(allocs []string) {
// if we have failed allocations then we print information about them
if len(allocs) > 0 {
// we want to run through and get messages for all of our failed allocations in parallel
var wg sync.WaitGroup
wg.Add(len(allocs))

// Inspect each allocation.
for _, id := range allocs {
log.Debug().Msgf("levant/monitor: launching allocation inspector for alloc %v", id)
go l.allocInspector(id, &wg)
}

// wait until our allocations have printed messages
wg.Wait()
return
}
}