Skip to content

Commit 7451a30

Browse files
authored
Merge pull request #17 from ncode/juliano/updates
updates
2 parents 054c8e3 + 802cb67 commit 7451a30

File tree

7 files changed

+123
-24
lines changed

7 files changed

+123
-24
lines changed

README.md

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,12 @@ Consul based leader election with tagging support and hooks
1111

1212
Consul lacks a built-in feature for leader election among registered services. This tool is designed to fill that gap. It functions by designating a leader among multiple services, marking the chosen leader with a specified tag. Additionally, it allows for the execution of a script whenever a leader election occurs.
1313

14-
### How do I test it?
14+
### How does it work?
15+
16+
Ballot uses Consul's session API to create a session for each service. The session is then used to create a lock on a key. The service that successfully creates the lock is elected as the leader. The leader is then tagged with a specified tag. The leader election is monitored and the leader is updated if the current leader is no longer healthy.
17+
More info about the sessions here [https://developer.hashicorp.com/consul/tutorials/developer-configuration/application-leader-elections](https://developer.hashicorp.com/consul/tutorials/developer-configuration/application-leader-elections).
18+
19+
### How do I use it?
1520

1621
1. Install Ballot
1722
```bash
@@ -53,10 +58,33 @@ $PORT # Port of the service
5358
$SESSIONID # Current SessionID of the elected master
5459
```
5560

61+
### Configuration
62+
63+
The configuration file is a yaml file with the following structure:
64+
65+
```yaml
66+
consul:
67+
token: # Consul token
68+
election:
69+
enabled:
70+
- my-service-name # Name of the service enabled for election
71+
services:
72+
my-service-name: # Name of the service
73+
id: my-service-name # ID of the service
74+
key: my-service-name # Key to be used for the lock in Consul, this should be the same across all nodes
75+
token: # Token to be used for the session in Consul
76+
serviceChecks: # List of checks to be used to determine the health of the service
77+
- ping # Name of the check
78+
primaryTag: primary # Tag to be used to mark the leader
79+
execOnPromote: '/bin/echo primary' # Command to be executed when the service is elected as leader
80+
execOnDemote: '/bin/echo secondary' # Command to be executed when the service is demoted as leader
81+
ttl: 10s # TTL for the session
82+
lockDelay: 5s # Lock delay for the session
83+
```
84+
5685
### TODO:
5786
58-
- Write tests
87+
- Write more tests
5988
- Add more examples
60-
- Re-enable the hooks on state change
6189
- Allow to pre-define the preferred leader
6290
- Update the docks with the lock delays and timeouts

examples/config/ballot1.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ election:
1111
serviceChecks:
1212
- service:election1
1313
primaryTag: primary
14-
execOnPromote: '/usr/bin/say I am da boss'
15-
execOnDemote: '/usr/bin/say I am no longer da boss'
14+
execOnPromote: '/usr/bin/say primary'
15+
execOnDemote: '/usr/bin/say secondary'

examples/config/ballot2.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ election:
1111
serviceChecks:
1212
- service:election2
1313
primaryTag: primary
14-
execOnPromote: '/usr/bin/say I am da boss'
15-
execOnDemote: '/usr/bin/say I am no longer da boss'
14+
execOnPromote: '/usr/bin/say primary'
15+
execOnDemote: '/usr/bin/say secondary'

examples/consul/my-service1.json

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,5 @@
1111
"Interval": "10s",
1212
"Timeout": "5s"
1313
},
14-
"EnableTagOverride": true,
15-
"Weights": {
16-
"Passing": 10,
17-
"Warning": 1
18-
}
14+
"EnableTagOverride": true
1915
}

examples/consul/my-service2.json

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,5 @@
1111
"Interval": "10s",
1212
"Timeout": "5s"
1313
},
14-
"EnableTagOverride": true,
15-
"Weights": {
16-
"Passing": 10,
17-
"Warning": 1
18-
}
14+
"EnableTagOverride": true
1915
}

internal/ballot/ballot.go

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,12 @@ type ElectionPayload struct {
3333
SessionID string
3434
}
3535

36+
type commandExecutor struct{}
37+
38+
func (c *commandExecutor) Command(name string, arg ...string) *exec.Cmd {
39+
return exec.Command(name, arg...)
40+
}
41+
3642
// New returns a new Ballot instance.
3743
func New(ctx context.Context, name string) (b *Ballot, err error) {
3844
if ctx == nil {
@@ -55,6 +61,7 @@ func New(ctx context.Context, name string) (b *Ballot, err error) {
5561
b.leader.Store(false)
5662
b.Token = consulConfig.Token
5763
b.ctx = ctx
64+
b.executor = &commandExecutor{}
5865

5966
b.Name = name
6067
if b.LockDelay == 0 {
@@ -84,7 +91,7 @@ type Ballot struct {
8491
leader atomic.Bool `mapstructure:"-"`
8592
client ConsulClient `mapstructure:"-"`
8693
ctx context.Context `mapstructure:"-"`
87-
exec CommandExecutor `mapstructure:"-"`
94+
executor CommandExecutor `mapstructure:"-"`
8895
}
8996

9097
// Copy *api.AgentService to *api.AgentServiceRegistration
@@ -154,15 +161,15 @@ func (b *Ballot) runCommand(command string, electionPayload *ElectionPayload) ([
154161
if err != nil {
155162
return nil, err
156163
}
157-
cmd := b.exec.Command(args[0], args[1:]...)
164+
cmd := b.executor.Command(args[0], args[1:]...)
158165
cmd.Env = append(cmd.Env, fmt.Sprintf("ADDRESS=%s", electionPayload.Address))
159166
cmd.Env = append(cmd.Env, fmt.Sprintf("PORT=%d", electionPayload.Port))
160167
cmd.Env = append(cmd.Env, fmt.Sprintf("SESSIONID=%s", electionPayload.SessionID))
161168
return cmd.Output()
162169
}
163170

164171
// updateServiceTags updates the service tags.
165-
func (b *Ballot) updateServiceTags() error {
172+
func (b *Ballot) updateServiceTags(isLeader bool) error {
166173
service, _, err := b.getService()
167174
if err != nil {
168175
return err
@@ -175,10 +182,10 @@ func (b *Ballot) updateServiceTags() error {
175182
hasPrimaryTag := slices.Contains(registration.Tags, b.PrimaryTag)
176183

177184
// Update tags based on leadership status
178-
if b.IsLeader() && !hasPrimaryTag {
185+
if isLeader && !hasPrimaryTag {
179186
// Add primary tag if not present and this node is the leader
180187
registration.Tags = append(registration.Tags, b.PrimaryTag)
181-
} else if !b.IsLeader() && hasPrimaryTag {
188+
} else if !isLeader && hasPrimaryTag {
182189
// Remove primary tag if present and this node is not the leader
183190
index := slices.Index(registration.Tags, b.PrimaryTag)
184191
registration.Tags = append(registration.Tags[:index], registration.Tags[index+1:]...)
@@ -187,6 +194,35 @@ func (b *Ballot) updateServiceTags() error {
187194
return nil
188195
}
189196

197+
// Run the command associated with the new leadership status
198+
var command string
199+
if isLeader {
200+
command = b.ExecOnPromote
201+
} else {
202+
command = b.ExecOnDemote
203+
}
204+
if command != "" && b.executor != nil {
205+
go func(isLeader bool, command string) {
206+
// Run the command in a separate goroutine
207+
ctx, cancel := context.WithTimeout(b.ctx, (b.TTL+b.LockDelay)*2)
208+
defer cancel()
209+
payload, err := b.waitForNextValidSessionData(ctx)
210+
output, err := b.runCommand(command, payload)
211+
if err != nil {
212+
log.WithFields(log.Fields{
213+
"caller": "updateLeadershipStatus",
214+
"isLeader": isLeader,
215+
"error": err,
216+
}).Error("failed to run command")
217+
}
218+
log.WithFields(log.Fields{
219+
"caller": "updateLeadershipStatus",
220+
"isLeader": isLeader,
221+
"output": string(output),
222+
}).Info("ran command")
223+
}(isLeader, command)
224+
}
225+
190226
// Log the updated tags
191227
log.WithFields(log.Fields{
192228
"caller": "updateServiceTags",
@@ -345,7 +381,7 @@ func (b *Ballot) updateLeadershipStatus(isLeader bool) error {
345381
b.leader.Store(isLeader)
346382

347383
// Update service tags based on leadership status
348-
err := b.updateServiceTags()
384+
err := b.updateServiceTags(isLeader)
349385
if err != nil {
350386
return err
351387
}
@@ -447,6 +483,25 @@ func (b *Ballot) IsLeader() bool {
447483
return b.leader.Load() && b.sessionID.Load() != nil
448484
}
449485

486+
func (b *Ballot) waitForNextValidSessionData(ctx context.Context) (data *ElectionPayload, err error) {
487+
ticker := time.NewTicker(1 * time.Second)
488+
defer ticker.Stop()
489+
for {
490+
select {
491+
case <-ticker.C:
492+
data, err := b.getSessionData()
493+
if err != nil {
494+
return data, err
495+
}
496+
if data != nil {
497+
return data, nil
498+
}
499+
case <-ctx.Done():
500+
return data, ctx.Err()
501+
}
502+
}
503+
}
504+
450505
func (b *Ballot) getSessionData() (data *ElectionPayload, err error) {
451506
sessionKey, _, err := b.client.KV().Get(b.Key, nil)
452507
if err != nil {

internal/ballot/ballot_test.go

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ func TestRunCommand(t *testing.T) {
103103

104104
// Create a Ballot instance with the mock executor
105105
b := &Ballot{
106-
exec: mockExecutor,
106+
executor: mockExecutor,
107107
}
108108

109109
// Define the command to run
@@ -178,3 +178,27 @@ func TestIsLeader(t *testing.T) {
178178
assert.False(t, b.IsLeader())
179179
})
180180
}
181+
182+
type MockConsulClient struct {
183+
mock.Mock
184+
}
185+
186+
func (m *MockConsulClient) Agent() *api.Agent {
187+
args := m.Called()
188+
return args.Get(0).(*api.Agent)
189+
}
190+
191+
func (m *MockConsulClient) Catalog() *api.Catalog {
192+
args := m.Called()
193+
return args.Get(0).(*api.Catalog)
194+
}
195+
196+
func (m *MockConsulClient) KV() *api.KV {
197+
args := m.Called()
198+
return args.Get(0).(*api.KV)
199+
}
200+
201+
func (m *MockConsulClient) Session() *api.Session {
202+
args := m.Called()
203+
return args.Get(0).(*api.Session)
204+
}

0 commit comments

Comments
 (0)