Skip to content

Commit a418614

Browse files
parallelisation Add support for execution groups with priorities (#717)
<!-- Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors. All rights reserved. SPDX-License-Identifier: Apache-2.0 --> ### Description <!-- Please add any detail or context that would be useful to a reviewer. --> Add support for execution groups with priorities. ### Test Coverage <!-- Please put an `x` in the correct box e.g. `[x]` to indicate the testing coverage of this change. --> - [x] This change is covered by existing or additional automated tests. - [ ] Manual testing has been performed (and evidence provided) as automated testing was not feasible. - [ ] Additional tests are not required for this change (e.g. documentation update).
1 parent 4fd2a6f commit a418614

File tree

4 files changed

+516
-0
lines changed

4 files changed

+516
-0
lines changed

changes/20251003121340.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
:sparkles: `parallelisation` Add support for execution groups with priorities

utils/parallelisation/contextual.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ func NewContextualGroup(options ...StoreOption) *ContextualFunctionGroup {
2626
}
2727
}
2828

29+
// NewContextualGroupWithPriority returns a group executing contextual functions that will be run in priority order.
30+
func NewPriorityContextualGroup(options ...StoreOption) *PriorityExecutionGroup[ContextualFunc] {
31+
return newPriorityExecutionGroup[ContextualFunc](
32+
func(options ...StoreOption) IExecutionGroup[ContextualFunc] {
33+
return NewExecutionGroup[ContextualFunc](func(ctx context.Context, f ContextualFunc) error {
34+
return f(ctx)
35+
}, options...)
36+
},
37+
options...,
38+
)
39+
}
40+
2941
// ForEach executes all the contextual functions according to the store options and returns an error if one occurred.
3042
func ForEach(ctx context.Context, executionOptions *StoreOptions, contextualFunc ...ContextualFunc) error {
3143
group := NewContextualGroup(ExecuteAll(executionOptions).Options()...)
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package parallelisation
2+
3+
import (
4+
"context"
5+
"maps"
6+
"slices"
7+
8+
"github.com/sasha-s/go-deadlock"
9+
10+
"github.com/ARM-software/golang-utils/utils/commonerrors"
11+
)
12+
13+
var _ IExecutionGroup[IExecutor] = &PriorityExecutionGroup[IExecutor]{}
14+
15+
type PriorityExecutionGroup[T any] struct {
16+
mu deadlock.RWMutex
17+
groups map[uint]IExecutionGroup[T]
18+
options []StoreOption
19+
newGroup func(...StoreOption) IExecutionGroup[T]
20+
}
21+
22+
func newPriorityExecutionGroup[T any](newGroup func(...StoreOption) IExecutionGroup[T], options ...StoreOption) *PriorityExecutionGroup[T] {
23+
return &PriorityExecutionGroup[T]{
24+
mu: deadlock.RWMutex{},
25+
groups: make(map[uint]IExecutionGroup[T]),
26+
options: options,
27+
newGroup: newGroup,
28+
}
29+
}
30+
31+
// NewPriorityExecutionGroup returns an execution group that can execute functions in order according to priority rules.
32+
// Parallel commands with differing priorities will be executed in groups according to their priority.
33+
// Sequential commands will be executed in order of their priority, no guarantees are made about the order of when
34+
// the priority is the same as another command.
35+
func NewPriorityExecutionGroup(options ...StoreOption) *PriorityExecutionGroup[IExecutor] {
36+
return newPriorityExecutionGroup[IExecutor](
37+
func(options ...StoreOption) IExecutionGroup[IExecutor] {
38+
return NewExecutionGroup[IExecutor](func(ctx context.Context, e IExecutor) error {
39+
return e.Execute(ctx)
40+
}, options...)
41+
},
42+
options...,
43+
)
44+
}
45+
46+
func (g *PriorityExecutionGroup[T]) check() {
47+
g.mu.Lock()
48+
defer g.mu.Unlock()
49+
50+
if g.groups == nil {
51+
g.groups = make(map[uint]IExecutionGroup[T])
52+
}
53+
if g.options == nil {
54+
g.options = DefaultOptions().Options()
55+
}
56+
if g.newGroup == nil {
57+
g.newGroup = func(options ...StoreOption) IExecutionGroup[T] {
58+
// since none of the methods return errors directly we inject executors that will force the error and reveal it to the consumer
59+
return NewExecutionGroup[T](func(context.Context, T) error {
60+
return commonerrors.UndefinedVariableWithMessage("g.newGroup", "priority execution group has not been initialised correctly")
61+
})
62+
}
63+
}
64+
}
65+
66+
// RegisterExecutor registers executors with a specific priority (lower values indidcate higher priority)
67+
func (g *PriorityExecutionGroup[T]) RegisterFunctionWithPriority(priority uint, function ...T) {
68+
g.check()
69+
70+
g.mu.Lock()
71+
defer g.mu.Unlock()
72+
73+
if g.groups[priority] == nil {
74+
g.groups[priority] = g.newGroup(g.options...)
75+
}
76+
g.groups[priority].RegisterFunction(function...)
77+
}
78+
79+
// RegisterExecutor registers executors with a priority of zero (highest priority)
80+
func (g *PriorityExecutionGroup[T]) RegisterFunction(function ...T) {
81+
g.RegisterFunctionWithPriority(0, function...)
82+
}
83+
84+
func (g *PriorityExecutionGroup[T]) Len() (n int) {
85+
g.mu.RLock()
86+
defer g.mu.RUnlock()
87+
88+
for _, group := range g.groups {
89+
n += group.Len()
90+
}
91+
return
92+
}
93+
94+
func (g *PriorityExecutionGroup[T]) executors() (executor *CompoundExecutionGroup) {
95+
g.mu.RLock()
96+
defer g.mu.RUnlock()
97+
98+
executor = NewCompoundExecutionGroup(DefaultOptions().MergeWithOptions(Sequential).Options()...)
99+
for _, key := range slices.Sorted(maps.Keys(g.groups)) {
100+
executor.RegisterExecutor(g.groups[key])
101+
}
102+
return
103+
}
104+
105+
// Execute will execute all the groups according to the priorities of the functions
106+
func (g *PriorityExecutionGroup[T]) Execute(ctx context.Context) error {
107+
return g.executors().Execute(ctx)
108+
}

0 commit comments

Comments
 (0)