Skip to content

Commit b394fe6

Browse files
committed
internal/osbuildexecutor: forward progress to composer
Uses status scanner to monitor osbuild to generate progress updates. Progress updates are sent every 30 seconds. Only one level of subprogress needs to be supported. Because osbuild manifests consist of pipelines and stages, more than 2 levels of progress are not expected.
1 parent 8846450 commit b394fe6

File tree

5 files changed

+270
-37
lines changed

5 files changed

+270
-37
lines changed

cmd/osbuild-worker/jobimpl-osbuild.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,7 +527,7 @@ func (impl *OSBuildJobImpl) Run(job worker.Job) error {
527527
Stderr: os.Stderr,
528528
JSONOutput: true,
529529
}
530-
osbuildJobResult.OSBuildOutput, err = executor.RunOSBuild(jobArgs.Manifest, opts)
530+
osbuildJobResult.OSBuildOutput, err = executor.RunOSBuild(jobArgs.Manifest, logWithId, job, opts)
531531
// First handle the case when "running" osbuild failed
532532
if err != nil {
533533
osbuildJobResult.JobError = clienterrors.New(clienterrors.ErrorBuildJob, "osbuild build failed", err.Error())
Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,18 @@
11
package osbuildexecutor
22

33
import (
4+
"time"
5+
6+
"github.com/sirupsen/logrus"
7+
48
"github.com/osbuild/images/pkg/osbuild"
9+
"github.com/osbuild/osbuild-composer/internal/worker"
10+
)
11+
12+
const (
13+
MinTimeBetweenUpdates = time.Second * 30
514
)
615

716
type Executor interface {
8-
RunOSBuild(manifest []byte, opts *osbuild.OSBuildOptions) (*osbuild.Result, error)
17+
RunOSBuild(manifest []byte, logger logrus.FieldLogger, job worker.Job, opts *osbuild.OSBuildOptions) (*osbuild.Result, error)
918
}

internal/osbuildexecutor/runner-impl-aws-ec2.go

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,16 @@ import (
1111
"os/exec"
1212
"path/filepath"
1313
"strings"
14+
"sync"
1415
"time"
1516

17+
"github.com/sirupsen/logrus"
1618
"golang.org/x/exp/slices"
1719

1820
"github.com/osbuild/images/pkg/osbuild"
19-
"github.com/sirupsen/logrus"
2021

2122
"github.com/osbuild/osbuild-composer/internal/cloud/awscloud"
23+
"github.com/osbuild/osbuild-composer/internal/worker"
2224
)
2325

2426
type awsEC2Executor struct {
@@ -28,9 +30,9 @@ type awsEC2Executor struct {
2830
tmpDir string
2931
}
3032

31-
func prepareSources(manifest []byte, opts *osbuild.OSBuildOptions) error {
33+
func prepareSources(manifest []byte, logger logrus.FieldLogger, opts *osbuild.OSBuildOptions) error {
3234
hostExecutor := NewHostExecutor()
33-
_, err := hostExecutor.RunOSBuild(manifest, &osbuild.OSBuildOptions{
35+
_, err := hostExecutor.RunOSBuild(manifest, logger, nil, &osbuild.OSBuildOptions{
3436
StoreDir: opts.StoreDir,
3537
ExtraEnv: opts.ExtraEnv,
3638
Stderr: opts.Stderr,
@@ -123,42 +125,89 @@ func writeInputArchive(cacheDir, store string, exports []string, manifestData []
123125
return archive, nil
124126
}
125127

126-
func handleBuild(inputArchive, host string) (*osbuild.Result, error) {
128+
func handleBuild(inputArchive, host string, logger logrus.FieldLogger, job worker.Job) (*osbuild.Result, error) {
127129
client := http.Client{
128130
Timeout: time.Minute * 60,
129131
}
130132
inputFile, err := os.Open(inputArchive)
131133
if err != nil {
132-
return nil, fmt.Errorf("Unable to open inputArchive (%s): %w", inputArchive, err)
134+
return nil, fmt.Errorf("unable to open inputArchive (%s): %w", inputArchive, err)
133135
}
134136
defer inputFile.Close()
135137

136138
resp, err := client.Post(fmt.Sprintf("%s/api/v1/build", host), "application/x-tar", inputFile)
137139
if err != nil {
138-
return nil, fmt.Errorf("Unable to request build from executor instance: %w", err)
140+
return nil, fmt.Errorf("unable to request build from executor instance: %w", err)
139141
}
140142
defer resp.Body.Close()
141143
if resp.StatusCode != 201 {
142144
body, err := io.ReadAll(resp.Body)
143145
if err != nil {
144-
return nil, fmt.Errorf("Unable to read body waiting for build to run: %w, http status: %d", err, resp.StatusCode)
146+
return nil, fmt.Errorf("unable to read body waiting for build to run: %w, http status: %d", err, resp.StatusCode)
145147
}
146-
return nil, fmt.Errorf("Something went wrong during executor build: http status: %v, %d, %s", err, resp.StatusCode, body)
148+
return nil, fmt.Errorf("something went wrong during executor build: http status: %v, %d, %s", err, resp.StatusCode, body)
147149
}
148150

149-
var osbuildResult osbuild.Result
150-
151-
body, err := io.ReadAll(resp.Body)
152-
if err != nil {
153-
return nil, fmt.Errorf("Unable to read response body: %w", err)
154-
}
151+
osbuildStatus := osbuild.NewStatusScanner(resp.Body)
152+
var lastUpdated time.Time
153+
var wg sync.WaitGroup
154+
for {
155+
st, err := osbuildStatus.Status()
156+
if err != nil {
157+
return nil, fmt.Errorf(`error parsing osbuild status, please report a bug: %w`, err)
158+
}
159+
if st == nil {
160+
break
161+
}
155162

156-
err = json.Unmarshal(body, &osbuildResult)
157-
if err != nil {
158-
return nil, fmt.Errorf("Unable to decode response body %q into osbuild result: %w", body, err)
163+
var progress string
164+
if st.Progress != nil {
165+
var subProgress string
166+
if st.Progress.SubProgress != nil {
167+
subProgress = fmt.Sprintf(": %d/%d", st.Progress.SubProgress.Done, st.Progress.SubProgress.Total)
168+
}
169+
progress = fmt.Sprintf(" (step %d/%d%s)", st.Progress.Done, st.Progress.Total, subProgress)
170+
}
171+
if st.Message != "" {
172+
logger.Infof("OSBuild status: %s%s", st.Message, progress)
173+
if job != nil && time.Since(lastUpdated) > MinTimeBetweenUpdates {
174+
lastUpdated = time.Now()
175+
wg.Add(1)
176+
go func() {
177+
defer wg.Done()
178+
partial := worker.JobResult{
179+
Progress: &worker.JobProgress{
180+
Message: st.Message,
181+
},
182+
}
183+
if st.Progress != nil {
184+
partial.Progress.Done = st.Progress.Done
185+
partial.Progress.Total = st.Progress.Total
186+
}
187+
// more than 1 level of subprogress is not expected, just
188+
// pipelines and stages.
189+
if st.Progress.SubProgress != nil {
190+
partial.Progress.SubProgress = &worker.JobProgress{
191+
Message: st.Progress.SubProgress.Message,
192+
Done: st.Progress.SubProgress.Done,
193+
Total: st.Progress.SubProgress.Total,
194+
}
195+
}
196+
err := job.Update(partial)
197+
if err != nil {
198+
logger.Errorf("Unable to update job: %s", err.Error())
199+
}
200+
}()
201+
}
202+
}
203+
if st.Trace != "" {
204+
logger.Errorf("%s", st.Trace)
205+
}
159206
}
160207

161-
return &osbuildResult, nil
208+
// avoid a partial job updates after this function has returned
209+
wg.Wait()
210+
return osbuildStatus.Result()
162211
}
163212

164213
func fetchOutputArchive(cacheDir, host string) (string, error) {
@@ -248,8 +297,8 @@ func extractOutputArchive(outputDirectory, outputTar string) error {
248297

249298
}
250299

251-
func (ec2e *awsEC2Executor) RunOSBuild(manifest []byte, opts *osbuild.OSBuildOptions) (*osbuild.Result, error) {
252-
err := prepareSources(manifest, opts)
300+
func (ec2e *awsEC2Executor) RunOSBuild(manifest []byte, logger logrus.FieldLogger, job worker.Job, opts *osbuild.OSBuildOptions) (*osbuild.Result, error) {
301+
err := prepareSources(manifest, logger, opts)
253302
if err != nil {
254303
return nil, fmt.Errorf("Failed to prepare sources: %w", err)
255304
}
@@ -289,9 +338,9 @@ func (ec2e *awsEC2Executor) RunOSBuild(manifest []byte, opts *osbuild.OSBuildOpt
289338
return nil, err
290339
}
291340

292-
osbuildResult, err := handleBuild(inputArchive, executorHost)
341+
osbuildResult, err := handleBuild(inputArchive, executorHost, logger, job)
293342
if err != nil {
294-
logrus.Errorf("Something went wrong handling the executor's build: %v", err)
343+
logrus.Errorf("something went wrong handling the executor's build: %v", err)
295344
return nil, err
296345
}
297346
if !osbuildResult.Success {

internal/osbuildexecutor/runner-impl-aws-ec2_test.go

Lines changed: 103 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package osbuildexecutor_test
33
import (
44
"archive/tar"
55
"context"
6-
"encoding/json"
76
"io"
87
"net/http"
98
"net/http/httptest"
@@ -14,13 +13,21 @@ import (
1413
"testing"
1514
"time"
1615

17-
"github.com/osbuild/images/pkg/osbuild"
16+
"github.com/google/uuid"
17+
"github.com/sirupsen/logrus"
18+
"github.com/sirupsen/logrus/hooks/test"
1819
"github.com/stretchr/testify/assert"
1920
"github.com/stretchr/testify/require"
2021

2122
"github.com/osbuild/osbuild-composer/internal/osbuildexecutor"
23+
"github.com/osbuild/osbuild-composer/internal/worker"
2224
)
2325

26+
func makeMockEntry() (*logrus.Entry, *test.Hook) {
27+
logger, hook := test.NewNullLogger()
28+
return logger.WithField("test", "test"), hook
29+
}
30+
2431
func TestWaitForSI(t *testing.T) {
2532
server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
2633
w.WriteHeader(http.StatusOK)
@@ -66,29 +73,113 @@ func TestWriteInputArchive(t *testing.T) {
6673
}, strings.Split(string(out), "\n"))
6774
}
6875

76+
type testJob struct {
77+
PartialUpdates []interface{}
78+
}
79+
80+
func (j *testJob) Id() uuid.UUID {
81+
return uuid.Nil
82+
}
83+
84+
func (j *testJob) Type() string {
85+
return "test-job"
86+
}
87+
88+
func (j *testJob) Args(args interface{}) error {
89+
return nil
90+
}
91+
92+
func (j *testJob) DynamicArgs(i int, args interface{}) error {
93+
return nil
94+
}
95+
96+
func (j *testJob) NDynamicArgs() int {
97+
return 0
98+
}
99+
100+
func (j *testJob) Update(result interface{}) error {
101+
j.PartialUpdates = append(j.PartialUpdates, result)
102+
return nil
103+
}
104+
105+
func (j *testJob) Finish(result interface{}) error {
106+
return nil
107+
}
108+
109+
func (j *testJob) Canceled() (bool, error) {
110+
return false, nil
111+
}
112+
113+
func (j *testJob) UploadArtifact(name string, readSeeker io.ReadSeeker) error {
114+
return nil
115+
}
116+
69117
func TestHandleBuild(t *testing.T) {
70118
buildServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
71119
input, err := io.ReadAll(r.Body)
72120
require.NoError(t, err)
73121
require.Equal(t, []byte("test"), input)
74122

75123
w.WriteHeader(http.StatusCreated)
76-
osbuildResult := osbuild.Result{
77-
Success: true,
78-
}
79-
data, err := json.Marshal(osbuildResult)
80-
require.NoError(t, err)
81-
_, err = w.Write(data)
124+
_, err = w.Write([]byte(`{"message": "starting pipeline", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "source", "id": "pipeline-id", "stage": {}}, "id": "context-id"}, "progress": {"name": "pipelines/sources", "total": 1, "done": 0}}
125+
{"message": "stage in pipeline", "context": {"id": "context-id"}, "progress": {"name": "source", "total": 1, "done": 0, "progress": {"name": "source/stage", "total": 2, "done": 0}}}
126+
{"message": "finishing pipeline", "result": {"name": "source", "id": "pipeline-id", "success": true}, "context": {"id": "context-id"}, "progress": {"name": "source", "total": 1, "done": 1}}`))
82127
require.NoError(t, err)
83128
}))
84129

85130
cacheDir := t.TempDir()
86131
inputArchive := filepath.Join(cacheDir, "test.tar")
87132
require.NoError(t, os.WriteFile(inputArchive, []byte("test"), 0600))
88133

89-
osbuildResult, err := osbuildexecutor.HandleBuild(inputArchive, buildServer.URL)
134+
entry, hook := makeMockEntry()
135+
job := testJob{}
136+
osbuildResult, err := osbuildexecutor.HandleBuild(inputArchive, buildServer.URL, entry, &job)
90137
require.NoError(t, err)
91138
require.True(t, osbuildResult.Success)
139+
require.Len(t, hook.Entries, 3)
140+
require.Equal(t, "OSBuild status: starting pipeline (step 0/1)", hook.Entries[0].Message)
141+
require.Equal(t, "OSBuild status: stage in pipeline (step 0/1: 0/2)", hook.Entries[1].Message)
142+
require.Equal(t, "OSBuild status: finishing pipeline (step 1/1)", hook.Entries[2].Message)
143+
144+
// only 1 update every 30 seconds, so in a testing scenario only the first status should have been received
145+
require.Eventually(t, func() bool { return len(job.PartialUpdates) == 1 }, time.Second, time.Millisecond*10)
146+
partial, ok := job.PartialUpdates[0].(worker.JobResult)
147+
require.True(t, ok)
148+
require.Equal(t, worker.JobResult{
149+
Progress: &worker.JobProgress{
150+
Done: 0,
151+
Total: 1,
152+
Message: "starting pipeline",
153+
},
154+
}, partial)
155+
require.Len(t, job.PartialUpdates, 1)
156+
}
157+
158+
func TestHandleBuildTrace(t *testing.T) {
159+
buildServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
160+
input, err := io.ReadAll(r.Body)
161+
require.NoError(t, err)
162+
require.Equal(t, []byte("test"), input)
163+
164+
w.WriteHeader(http.StatusCreated)
165+
_, err = w.Write([]byte(`{"message": "starting pipeline", "context": {"origin": "osbuild.monitor", "pipeline": {"name": "source", "id": "pipeline-id", "stage": {}}, "id": "context-id"}, "progress": {"name": "pipelines/sources", "total": 1, "done": 0}}
166+
{"message": "no context, thus trace"}
167+
{"message": "failed pipeline", "result": {"name": "source", "id": "pipeline-id", "success": false}, "context": {"id": "context-id"}, "progress": {"name": "source", "total": 1, "done": 1}}`))
168+
require.NoError(t, err)
169+
}))
170+
171+
cacheDir := t.TempDir()
172+
inputArchive := filepath.Join(cacheDir, "test.tar")
173+
require.NoError(t, os.WriteFile(inputArchive, []byte("test"), 0600))
174+
175+
entry, hook := makeMockEntry()
176+
osbuildResult, err := osbuildexecutor.HandleBuild(inputArchive, buildServer.URL, entry, nil)
177+
require.NoError(t, err)
178+
require.False(t, osbuildResult.Success)
179+
require.Len(t, hook.Entries, 3)
180+
require.Equal(t, "OSBuild status: starting pipeline (step 0/1)", hook.Entries[0].Message)
181+
require.Equal(t, "no context, thus trace", hook.Entries[1].Message)
182+
require.Equal(t, "OSBuild status: failed pipeline (step 1/1)", hook.Entries[2].Message)
92183
}
93184

94185
func TestHandleBuildNoJSON(t *testing.T) {
@@ -105,8 +196,9 @@ func TestHandleBuildNoJSON(t *testing.T) {
105196
inputArchive := filepath.Join(cacheDir, "test.tar")
106197
require.NoError(t, os.WriteFile(inputArchive, []byte("test"), 0600))
107198

108-
_, err := osbuildexecutor.HandleBuild(inputArchive, buildServer.URL)
109-
require.ErrorContains(t, err, `Unable to decode response body "bad non-json text" into osbuild result:`)
199+
entry, _ := makeMockEntry()
200+
_, err := osbuildexecutor.HandleBuild(inputArchive, buildServer.URL, entry, nil)
201+
require.ErrorContains(t, err, `error parsing osbuild status, please report a bug: cannot scan line "bad non-json text": invalid character 'b' looking for beginning of value`)
110202
}
111203

112204
func TestHandleOutputArchive(t *testing.T) {

0 commit comments

Comments
 (0)