Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ services:
- `mail` to send mails
- `save` to save structured execution reports to a directory
- `slack` to send messages via a slack webhook
- `signal` to send messages via signal

These can be configured by setting the options listed below in the `[global]` section of your config.ini, or via docker labels on the `ofelia` container (regardless of where your job will actually be running).

Expand All @@ -164,6 +165,12 @@ These can be configured by setting the options listed below in the `[global]` se
- `slack-webhook` - URL of the slack webhook.
- `slack-only-on-error` - only send a slack message if the execution was not successful.

**Signal** - to send messages to Signal need to start and configure signal-cli-rest-api from here: https://github.com/bbernhard/signal-cli-rest-api
- `signal-url` - URL of the signal-sli-rest-api server like 'http://localhost:6001'
- `signal-number` - number FROM message will be sent (Must be already registered) like: '+48123456789'
- `signal-recipients` - list of recipients (could be a number or group Id like: 'group.TkpRc6pmK2ZwdVN2SCtSClFhMExwMWVMYkQ5RDNnSkl2UC9PMXVhV1FyUT0='
- `signal-only-on-error` - only send a signal message if the execution was not successful.

### Overlap
**Ofelia** can prevent that a job is run twice in parallel (e.g. if the first execution didn't complete before a second execution was scheduled. If a job has the option `no-overlap` set, it will not be run concurrently.

Expand Down
16 changes: 13 additions & 3 deletions cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ var IsDockerEnv bool
// Config contains the configuration
type Config struct {
Global struct {
middlewares.SlackConfig `mapstructure:",squash"`
middlewares.SaveConfig `mapstructure:",squash"`
middlewares.MailConfig `mapstructure:",squash"`
middlewares.SlackConfig `mapstructure:",squash"`
middlewares.SaveConfig `mapstructure:",squash"`
middlewares.MailConfig `mapstructure:",squash"`
middlewares.SignalConfig `mapstructure:",squash"`
}
ExecJobs map[string]*ExecJobConfig `gcfg:"job-exec" mapstructure:"job-exec,squash"`
RunJobs map[string]*RunJobConfig `gcfg:"job-run" mapstructure:"job-run,squash"`
Expand Down Expand Up @@ -146,6 +147,7 @@ func (c *Config) buildSchedulerMiddlewares(sh *core.Scheduler) {
sh.Use(middlewares.NewSlack(&c.Global.SlackConfig))
sh.Use(middlewares.NewSave(&c.Global.SaveConfig))
sh.Use(middlewares.NewMail(&c.Global.MailConfig))
sh.Use(middlewares.NewSignal(&c.Global.SignalConfig))
}

// ExecJobConfig contains all configuration params needed to build a ExecJob
Expand All @@ -155,13 +157,15 @@ type ExecJobConfig struct {
middlewares.SlackConfig `mapstructure:",squash"`
middlewares.SaveConfig `mapstructure:",squash"`
middlewares.MailConfig `mapstructure:",squash"`
middlewares.SignalConfig `mapstructure:",squash"`
}

func (c *ExecJobConfig) buildMiddlewares() {
c.ExecJob.Use(middlewares.NewOverlap(&c.OverlapConfig))
c.ExecJob.Use(middlewares.NewSlack(&c.SlackConfig))
c.ExecJob.Use(middlewares.NewSave(&c.SaveConfig))
c.ExecJob.Use(middlewares.NewMail(&c.MailConfig))
c.ExecJob.Use(middlewares.NewSignal(&c.SignalConfig))
}

// RunServiceConfig contains all configuration params needed to build a RunJob
Expand All @@ -171,6 +175,7 @@ type RunServiceConfig struct {
middlewares.SlackConfig `mapstructure:",squash"`
middlewares.SaveConfig `mapstructure:",squash"`
middlewares.MailConfig `mapstructure:",squash"`
middlewares.SignalConfig `mapstructure:",squash"`
}

type RunJobConfig struct {
Expand All @@ -179,13 +184,15 @@ type RunJobConfig struct {
middlewares.SlackConfig `mapstructure:",squash"`
middlewares.SaveConfig `mapstructure:",squash"`
middlewares.MailConfig `mapstructure:",squash"`
middlewares.SignalConfig `mapstructure:",squash"`
}

func (c *RunJobConfig) buildMiddlewares() {
c.RunJob.Use(middlewares.NewOverlap(&c.OverlapConfig))
c.RunJob.Use(middlewares.NewSlack(&c.SlackConfig))
c.RunJob.Use(middlewares.NewSave(&c.SaveConfig))
c.RunJob.Use(middlewares.NewMail(&c.MailConfig))
c.RunJob.Use(middlewares.NewSignal(&c.SignalConfig))
}

// LocalJobConfig contains all configuration params needed to build a RunJob
Expand All @@ -195,18 +202,21 @@ type LocalJobConfig struct {
middlewares.SlackConfig `mapstructure:",squash"`
middlewares.SaveConfig `mapstructure:",squash"`
middlewares.MailConfig `mapstructure:",squash"`
middlewares.SignalConfig `mapstructure:",squash"`
}

func (c *LocalJobConfig) buildMiddlewares() {
c.LocalJob.Use(middlewares.NewOverlap(&c.OverlapConfig))
c.LocalJob.Use(middlewares.NewSlack(&c.SlackConfig))
c.LocalJob.Use(middlewares.NewSave(&c.SaveConfig))
c.LocalJob.Use(middlewares.NewMail(&c.MailConfig))
c.LocalJob.Use(middlewares.NewSignal(&c.SignalConfig))
}

func (c *RunServiceConfig) buildMiddlewares() {
c.RunServiceJob.Use(middlewares.NewOverlap(&c.OverlapConfig))
c.RunServiceJob.Use(middlewares.NewSlack(&c.SlackConfig))
c.RunServiceJob.Use(middlewares.NewSave(&c.SaveConfig))
c.RunServiceJob.Use(middlewares.NewMail(&c.MailConfig))
c.RunServiceJob.Use(middlewares.NewSignal(&c.SignalConfig))
}
6 changes: 6 additions & 0 deletions config-example.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[global]
signal-url=http://127.0.0.1:6001
signal-number=+4812345678
signal-recipients=group.TkpFc4pmK2ZwdANdSCtSZlFhMEcwMWVUYkQ5RDNnSkl2UC9PMXVhV1FyUT0=
signal-recipients=+49012345689
signal-only-on-error=true
5 changes: 5 additions & 0 deletions core/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,18 @@ func (s *Scheduler) Start() error {
}

s.Logger.Debugf("Starting scheduler with %d jobs", len(s.Jobs))
s.logMiddlewares()

s.mergeMiddlewares()
s.isRunning = true
s.cron.Start()
return nil
}

func (s *Scheduler) logMiddlewares() {
s.Logger.Debugf("Configured %d Middleware(s): %g", len(s.middlewareContainer.m), s.middlewareContainer.m)
}

func (s *Scheduler) mergeMiddlewares() {
for _, j := range s.Jobs {
j.Use(s.Middlewares()...)
Expand Down
101 changes: 101 additions & 0 deletions middlewares/signal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package middlewares

import (
"bytes"
"encoding/json"
"fmt"
"net/http"

"github.com/mcuadros/ofelia/core"
)

type SignalConfig struct {
SignalURL string `gcfg:"signal-url" mapstructure:"signal-url" json:"-"`
SignalNumber string `gcfg:"signal-number" mapstructure:"signal-number"`
SignalRecipients []string `gcfg:"signal-recipients" mapstructure:"signal-recipients"`
SignalOnlyOnError bool `gcfg:"signal-only-on-error" mapstructure:"signal-only-on-error"`
}

func NewSignal(c *SignalConfig) core.Middleware {
var m core.Middleware
if !IsEmpty(c) {
m = &Signal{*c}
}

return m
}

type Signal struct {
SignalConfig
}

func (m *Signal) ContinueOnStop() bool {
return true
}

func (m *Signal) Run(ctx *core.Context) error {
err := ctx.Next()
ctx.Stop(err)

if ctx.Execution.Failed || !m.SignalOnlyOnError {
m.pushMessage(ctx)
}

return err
}

func (m *Signal) pushMessage(ctx *core.Context) {
endpoint := m.SignalURL + "/v2/send"
payload, _ := json.Marshal(m.buildMessage(ctx))
ctx.Logger.Noticef("Sending Signal message. Sender: %s, Recipients: %v", m.SignalNumber, m.SignalRecipients)

resp, err := http.Post(endpoint, "application/json", bytes.NewBuffer(payload))
if err != nil {
ctx.Logger.Errorf("Error sending Signal message: %v", err)
return
}

if resp.StatusCode != http.StatusCreated {
ctx.Logger.Errorf("Failed to send Signal message. Non-201 status code: %d, URL: %q", resp.StatusCode, endpoint)
}
}

func (m *Signal) buildMessage(ctx *core.Context) *signalMessage {
msg := &signalMessage{
Number: m.SignalNumber,
Recipients: m.SignalRecipients,
}

var errorDetails string
if ctx.Execution.Failed && ctx.Execution.Error != nil {
errorDetails = fmt.Sprintf(" Error: %s", ctx.Execution.Error.Error())
}

msg.Message = fmt.Sprintf(
"[%s] Job *%q* finished in *%s*, command `%s`.%s",
getExecutionStatus(ctx.Execution),
ctx.Job.GetName(),
ctx.Execution.Duration,
ctx.Job.GetCommand(),
errorDetails,
)

return msg
}

func getExecutionStatus(e *core.Execution) string {
switch {
case e.Failed:
return "FAILED"
case e.Skipped:
return "SKIPPED"
default:
return "SUCCESS"
}
}

type signalMessage struct {
Message string `json:"message"`
Number string `json:"number"`
Recipients []string `json:"recipients"`
}
103 changes: 103 additions & 0 deletions middlewares/signal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package middlewares

import (
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"strings"

. "gopkg.in/check.v1"
)

type SuiteSignal struct {
BaseSuite
}

var _ = Suite(&SuiteSignal{})

func (s *SuiteSignal) TestNewSignalEmpty(c *C) {
c.Assert(NewSignal(&SignalConfig{}), IsNil)
}

func (s *SuiteSignal) TestRunSuccess(c *C) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
c.Assert(err, IsNil)

var m signalMessage
err = json.Unmarshal(body, &m)
c.Assert(err, IsNil)
c.Assert(strings.Contains(m.Message, "[SUCCESS]"), Equals, true)
}))
defer ts.Close()

s.ctx.Start()
s.ctx.Stop(nil)

m := NewSignal(&SignalConfig{SignalURL: ts.URL})
c.Assert(m.Run(s.ctx), IsNil)
}

func (s *SuiteSignal) TestRunSuccessFailed(c *C) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
c.Assert(err, IsNil)

var m signalMessage
err = json.Unmarshal(body, &m)
c.Assert(err, IsNil)
c.Assert(strings.Contains(m.Message, "[FAILED]"), Equals, true)
}))
defer ts.Close()

s.ctx.Start()
s.ctx.Stop(errors.New("foo"))

m := NewSignal(&SignalConfig{SignalURL: ts.URL})
c.Assert(m.Run(s.ctx), IsNil)
}

func (s *SuiteSignal) TestRunSuccessOnError(c *C) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c.Assert(true, Equals, false)
}))

defer ts.Close()

s.ctx.Start()
s.ctx.Stop(nil)

m := NewSignal(&SignalConfig{SignalURL: ts.URL, SignalOnlyOnError: true})
c.Assert(m.Run(s.ctx), IsNil)
}

func (s *SuiteSignal) TestConfig(c *C) {
number := "223344"
recipients := []string{"reciepient1", "r2"}
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
c.Assert(err, IsNil)

var m signalMessage
err = json.Unmarshal(body, &m)
c.Assert(err, IsNil)
c.Assert(m.Number, Equals, number)
c.Assert(m.Recipients, DeepEquals, recipients)
}))

defer ts.Close()

s.ctx.Start()
s.ctx.Stop(nil)

m := NewSignal(&SignalConfig{SignalURL: ts.URL, SignalNumber: number, SignalRecipients: recipients})
c.Assert(m.Run(s.ctx), IsNil)
}