File tree Expand file tree Collapse file tree 1 file changed +14
-6
lines changed Expand file tree Collapse file tree 1 file changed +14
-6
lines changed Original file line number Diff line number Diff line change 11package demux
22
3+ import "sync"
4+
35// Dynamic creates dynamic demultiplexer that routes items from 'in' based on keys returned by 'keyFunc'.
46// For each unique key, a new goroutine is spawned running 'consumeFunc'.
57// Each consumeFunc receives a channel that delivers values matching its key.
@@ -9,11 +11,7 @@ func Dynamic[T any, K comparable](
911 consumeFunc func (K , <- chan T ),
1012) {
1113 outChans := make (map [K ]chan T )
12- defer func () {
13- for _ , ch := range outChans {
14- close (ch )
15- }
16- }()
14+ var wg sync.WaitGroup
1715
1816 for t := range in {
1917 key := keyFunc (t )
@@ -22,8 +20,18 @@ func Dynamic[T any, K comparable](
2220 ch = make (chan T )
2321 outChans [key ] = ch
2422
25- go consumeFunc (key , ch )
23+ wg .Add (1 )
24+ go func () {
25+ defer wg .Done ()
26+ consumeFunc (key , ch )
27+ }()
2628 }
2729 ch <- t
2830 }
31+
32+ for _ , ch := range outChans {
33+ close (ch )
34+ }
35+
36+ wg .Wait ()
2937}
You can’t perform that action at this time.
0 commit comments