@@ -179,60 +179,7 @@ func (c *Controller[request]) Start(ctx context.Context) error {
179179 // NB(directxman12): launch the sources *before* trying to wait for the
180180 // caches to sync so that they have a chance to register their intended
181181 // caches.
182- errGroup := & errgroup.Group {}
183- for _ , watch := range c .startWatches {
184- log := c .LogConstructor (nil )
185- _ , ok := watch .(interface {
186- String () string
187- })
188-
189- if ! ok {
190- log = log .WithValues ("source" , fmt .Sprintf ("%T" , watch ))
191- } else {
192- log = log .WithValues ("source" , fmt .Sprintf ("%s" , watch ))
193- }
194- didStartSyncingSource := & atomic.Bool {}
195- errGroup .Go (func () error {
196- // Use a timeout for starting and syncing the source to avoid silently
197- // blocking startup indefinitely if it doesn't come up.
198- sourceStartCtx , cancel := context .WithTimeout (ctx , c .CacheSyncTimeout )
199- defer cancel ()
200-
201- sourceStartErrChan := make (chan error , 1 ) // Buffer chan to not leak goroutine if we time out
202- go func () {
203- defer close (sourceStartErrChan )
204- log .Info ("Starting EventSource" )
205- if err := watch .Start (ctx , c .Queue ); err != nil {
206- sourceStartErrChan <- err
207- return
208- }
209- syncingSource , ok := watch .(source.TypedSyncingSource [request ])
210- if ! ok {
211- return
212- }
213- didStartSyncingSource .Store (true )
214- if err := syncingSource .WaitForSync (sourceStartCtx ); err != nil {
215- err := fmt .Errorf ("failed to wait for %s caches to sync %v: %w" , c .Name , syncingSource , err )
216- log .Error (err , "Could not wait for Cache to sync" )
217- sourceStartErrChan <- err
218- }
219- }()
220-
221- select {
222- case err := <- sourceStartErrChan :
223- return err
224- case <- sourceStartCtx .Done ():
225- if didStartSyncingSource .Load () { // We are racing with WaitForSync, wait for it to let it tell us what happened
226- return <- sourceStartErrChan
227- }
228- if ctx .Err () != nil { // Don't return an error if the root context got cancelled
229- return nil
230- }
231- return fmt .Errorf ("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking" , watch )
232- }
233- })
234- }
235- if err := errGroup .Wait (); err != nil {
182+ if err := c .startEventSources (ctx ); err != nil {
236183 return err
237184 }
238185
@@ -271,6 +218,65 @@ func (c *Controller[request]) Start(ctx context.Context) error {
271218 return nil
272219}
273220
221+ // startEventSources launches all the sources registered with this controller and waits
222+ // for them to sync. It returns an error if any of the sources fail to start or sync.
223+ func (c * Controller [request ]) startEventSources (ctx context.Context ) error {
224+ errGroup := & errgroup.Group {}
225+ for _ , watch := range c .startWatches {
226+ log := c .LogConstructor (nil )
227+ _ , ok := watch .(interface {
228+ String () string
229+ })
230+
231+ if ! ok {
232+ log = log .WithValues ("source" , fmt .Sprintf ("%T" , watch ))
233+ } else {
234+ log = log .WithValues ("source" , fmt .Sprintf ("%s" , watch ))
235+ }
236+ didStartSyncingSource := & atomic.Bool {}
237+ errGroup .Go (func () error {
238+ // Use a timeout for starting and syncing the source to avoid silently
239+ // blocking startup indefinitely if it doesn't come up.
240+ sourceStartCtx , cancel := context .WithTimeout (ctx , c .CacheSyncTimeout )
241+ defer cancel ()
242+
243+ sourceStartErrChan := make (chan error , 1 ) // Buffer chan to not leak goroutine if we time out
244+ go func () {
245+ defer close (sourceStartErrChan )
246+ log .Info ("Starting EventSource" )
247+ if err := watch .Start (ctx , c .Queue ); err != nil {
248+ sourceStartErrChan <- err
249+ return
250+ }
251+ syncingSource , ok := watch .(source.TypedSyncingSource [request ])
252+ if ! ok {
253+ return
254+ }
255+ didStartSyncingSource .Store (true )
256+ if err := syncingSource .WaitForSync (sourceStartCtx ); err != nil {
257+ err := fmt .Errorf ("failed to wait for %s caches to sync %v: %w" , c .Name , syncingSource , err )
258+ log .Error (err , "Could not wait for Cache to sync" )
259+ sourceStartErrChan <- err
260+ }
261+ }()
262+
263+ select {
264+ case err := <- sourceStartErrChan :
265+ return err
266+ case <- sourceStartCtx .Done ():
267+ if didStartSyncingSource .Load () { // We are racing with WaitForSync, wait for it to let it tell us what happened
268+ return <- sourceStartErrChan
269+ }
270+ if ctx .Err () != nil { // Don't return an error if the root context got cancelled
271+ return nil
272+ }
273+ return fmt .Errorf ("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking" , watch )
274+ }
275+ })
276+ }
277+ return errGroup .Wait ()
278+ }
279+
274280// processNextWorkItem will read a single work item off the workqueue and
275281// attempt to process it, by calling the reconcileHandler.
276282func (c * Controller [request ]) processNextWorkItem (ctx context.Context ) bool {
0 commit comments