Skip to content

Commit 4c9bda2

Browse files
authored
Merge add topic api (#292)
* add topic api
1 parent 0c5ece1 commit 4c9bda2

File tree

83 files changed

+10474
-8
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

83 files changed

+10474
-8
lines changed

.github/workflows/check-codegen.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010
strategy:
1111
matrix:
1212
os: [ ubuntu-latest ]
13-
go: [1.17.x]
13+
go: [1.18.x]
1414
runs-on: ${{ matrix.os }}
1515
steps:
1616
- name: Checkout

.github/workflows/tests.yml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,47 @@ jobs:
156156
file: ./scripting.txt
157157
flags: scripting,e2e,integration,${{ matrix.os }},${{ matrix.go-version }}
158158
name: scripting
159+
topic:
160+
strategy:
161+
matrix:
162+
go-version: [1.18.x]
163+
os: [ubuntu-latest]
164+
services:
165+
ydb:
166+
image: cr.yandex/yc/yandex-docker-local-ydb:latest
167+
ports:
168+
- 2135:2135
169+
- 2136:2136
170+
- 8765:8765
171+
volumes:
172+
- /tmp/ydb_certs:/ydb_certs
173+
env:
174+
YDB_LOCAL_SURVIVE_RESTART: true
175+
YDB_USE_IN_MEMORY_PDISKS: true
176+
options: '-h localhost'
177+
env:
178+
OS: ${{ matrix.os }}
179+
GO: ${{ matrix.go-version }}
180+
YDB_CONNECTION_STRING: grpcs://localhost:2135/local
181+
YDB_SSL_ROOT_CERTIFICATES_FILE: /tmp/ydb_certs/ca.pem
182+
runs-on: ${{ matrix.os }}
183+
steps:
184+
- name: Install Go
185+
uses: actions/setup-go@v3
186+
with:
187+
go-version: ${{ matrix.go-version }}
188+
- name: Checkout code
189+
uses: actions/checkout@v2
190+
- name: Wait database available
191+
run: bash ./.github/scripts/wait-ydb-container.sh
192+
- name: Test topic client
193+
run: go test -race -coverpkg=./... -coverprofile topic.txt -covermode atomic ./topic/client_e2e_test.go ./topic/reader_e2e_test.go
194+
- name: Upload coverage to Codecov
195+
uses: codecov/codecov-action@v2
196+
with:
197+
file: ./topic.txt
198+
flags: topic,e2e,integration,${{ matrix.os }},${{ matrix.go-version }}
199+
name: topic
159200
discovery:
160201
strategy:
161202
matrix:

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Extended the ydb.Connection interface with experimental db.Topic() client (control plane and reader API)
12
* Removed `ydb.RegisterParser()` function (was needed for `database/sql` driver outside `ydb-go-sdk` repository, necessity of `ydb.RegisterParser()` disappeared with implementation `database/sql` driver in same repository)
23
* Refactored `db.Table().CreateSession(ctx)` (maked retryable with internal create session timeout)
34
* Refactored `internal/table/client.createSession(ctx)` (got rid of unnecessary goroutine)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
[![Telegram](https://img.shields.io/badge/chat-on%20Telegram-2ba2d9.svg)](https://t.me/YDBPlatform)
1313
[![WebSite](https://img.shields.io/badge/website-ydb.tech-blue.svg)](https://ydb.tech)
1414

15-
Supports `table`, `discovery`, `coordination`, `ratelimiter`, `scheme` and `scripting` clients for `YDB`.
15+
Supports `table`, `discovery`, `coordination`, `ratelimiter`, `scheme`, `scripting` and `topic` clients for `YDB`.
1616
`YDB` is an open-source Distributed SQL Database that combines high availability and scalability with strict consistency and ACID transactions.
1717

1818
## Installation

connection.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88

99
"google.golang.org/grpc"
1010

11-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
12-
1311
"github.com/ydb-platform/ydb-go-sdk/v3/config"
1412
"github.com/ydb-platform/ydb-go-sdk/v3/coordination"
1513
"github.com/ydb-platform/ydb-go-sdk/v3/discovery"
@@ -26,12 +24,16 @@ import (
2624
scriptingConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/scripting/config"
2725
internalTable "github.com/ydb-platform/ydb-go-sdk/v3/internal/table"
2826
tableConfig "github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
27+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/topic/topicclientinternal"
2928
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
29+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
3030
"github.com/ydb-platform/ydb-go-sdk/v3/log"
3131
"github.com/ydb-platform/ydb-go-sdk/v3/ratelimiter"
3232
"github.com/ydb-platform/ydb-go-sdk/v3/scheme"
3333
"github.com/ydb-platform/ydb-go-sdk/v3/scripting"
3434
"github.com/ydb-platform/ydb-go-sdk/v3/table"
35+
"github.com/ydb-platform/ydb-go-sdk/v3/topic"
36+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
3537
"github.com/ydb-platform/ydb-go-sdk/v3/trace"
3638
)
3739

@@ -71,6 +73,9 @@ type Connection interface {
7173
// Scripting returns scripting client
7274
Scripting() scripting.Client
7375

76+
// Topic returns topic client
77+
Topic() topic.Client
78+
7479
// With makes child connection with the same options and another options
7580
With(ctx context.Context, opts ...Option) (Connection, error)
7681
}
@@ -104,6 +109,10 @@ type connection struct {
104109
ratelimiter *internalRatelimiter.Client
105110
ratelimiterOptions []ratelimiterConfig.Option
106111

112+
topicOnce initOnce
113+
topic *topicclientinternal.Client
114+
topicOptions []topicoptions.TopicOption
115+
107116
pool *conn.Pool
108117

109118
mtx sync.Mutex
@@ -141,6 +150,7 @@ func (c *connection) Close(ctx context.Context) error {
141150
c.schemeOnce.Close,
142151
c.scriptingOnce.Close,
143152
c.tableOnce.Close,
153+
c.topicOnce.Close,
144154
c.balancer.Close,
145155
c.pool.Release,
146156
)
@@ -305,6 +315,19 @@ func (c *connection) Scripting() scripting.Client {
305315
return c.scripting
306316
}
307317

318+
// Topic return topic client
319+
//
320+
// Experimental
321+
//
322+
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later release.
323+
func (c *connection) Topic() topic.Client {
324+
c.topicOnce.Init(func() closeFunc {
325+
c.topic = topicclientinternal.New(c.balancer, c.topicOptions...)
326+
return c.topic.Close
327+
})
328+
return c.topic
329+
}
330+
308331
// Open connects to database by DSN and return driver runtime holder
309332
//
310333
// DSN accept connection string like

example_test.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@ package ydb_test
33
import (
44
"context"
55
"fmt"
6+
"io/ioutil"
67
"log"
78

89
"github.com/ydb-platform/ydb-go-sdk/v3"
910
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
1011
"github.com/ydb-platform/ydb-go-sdk/v3/table"
1112
"github.com/ydb-platform/ydb-go-sdk/v3/table/result/named"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
1214
)
1315

1416
func Example_table() {
@@ -52,6 +54,37 @@ func Example_table() {
5254
}
5355
}
5456

57+
func Example_topic() {
58+
ctx := context.TODO()
59+
db, err := ydb.Open(ctx, "grpcs://localhost:2135/?database=/local")
60+
if err != nil {
61+
fmt.Printf("failed connect: %v", err)
62+
return
63+
}
64+
defer db.Close(ctx) // cleanup resources
65+
66+
reader, err := db.Topic().StartReader("consumer", topicoptions.ReadTopic("/topic/path"))
67+
if err != nil {
68+
fmt.Printf("failed start reader: %v", err)
69+
return
70+
}
71+
72+
for {
73+
mess, err := reader.ReadMessage(ctx)
74+
if err != nil {
75+
fmt.Printf("failed start reader: %v", err)
76+
return
77+
}
78+
79+
content, err := ioutil.ReadAll(mess)
80+
if err != nil {
81+
fmt.Printf("failed start reader: %v", err)
82+
return
83+
}
84+
fmt.Println(string(content))
85+
}
86+
}
87+
5588
func Example_scripting() {
5689
ctx := context.TODO()
5790
db, err := ydb.Open(ctx, "grpcs://localhost:2135/local")

go.mod

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ module github.com/ydb-platform/ydb-go-sdk/v3
33
go 1.18
44

55
require (
6+
github.com/golang/mock v1.6.0
67
github.com/jonboulle/clockwork v0.2.2
78
github.com/stretchr/testify v1.7.1
89
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f
@@ -14,8 +15,8 @@ require (
1415
github.com/davecgh/go-spew v1.1.0 // indirect
1516
github.com/golang/protobuf v1.5.2 // indirect
1617
github.com/pmezard/go-difflib v1.0.0 // indirect
17-
golang.org/x/net v0.0.0-20201021035429-f5854403a974 // indirect
18-
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
18+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
19+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect
1920
golang.org/x/text v0.3.3 // indirect
2021
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
2122
gopkg.in/yaml.v3 v3.0.0 // indirect

go.sum

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
2222
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
2323
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
2424
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
25+
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
26+
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
2527
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
2628
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
2729
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
@@ -59,35 +61,45 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT
5961
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
6062
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f h1:QmTU3AtCBOI8zarYB4N3q+VQQxpVxu8pRHJN5Fd8WKc=
6163
github.com/ydb-platform/ydb-go-genproto v0.0.0-20220801095836-cf975531fd1f/go.mod h1:Er+FePu1dNUieD+XTMDduGpQuCPssK5Q4BjF+IIXJ3I=
64+
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
6265
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
6366
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
67+
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
6468
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
6569
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
6670
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
6771
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
6872
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
73+
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
6974
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
7075
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
7176
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
7277
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
7378
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
7479
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
80+
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
7581
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
76-
golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI=
7782
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
83+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
84+
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
7885
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
7986
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
8087
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
8188
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
8289
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
8390
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
91+
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
8492
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
8593
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
8694
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
8795
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
8896
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
89-
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
97+
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
9098
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
99+
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
100+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE=
101+
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
102+
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
91103
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
92104
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
93105
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
@@ -96,6 +108,10 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
96108
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
97109
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
98110
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
111+
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
112+
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
113+
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
114+
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
99115
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
100116
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
101117
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

internal/background/worker.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package background
2+
3+
import (
4+
"context"
5+
"runtime/pprof"
6+
"sync"
7+
8+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
9+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
10+
)
11+
12+
// A Worker must not be copied after first use
13+
type Worker struct {
14+
ctx context.Context
15+
workers sync.WaitGroup
16+
17+
onceInit sync.Once
18+
19+
m xsync.Mutex
20+
stop xcontext.CancelErrFunc
21+
}
22+
23+
func NewWorker(parent context.Context) *Worker {
24+
w := Worker{}
25+
w.ctx, w.stop = xcontext.WithErrCancel(parent)
26+
27+
return &w
28+
}
29+
30+
func (b *Worker) Context() context.Context {
31+
b.init()
32+
33+
return b.ctx
34+
}
35+
36+
func (b *Worker) Start(name string, f func(ctx context.Context)) {
37+
b.init()
38+
39+
b.m.Lock()
40+
defer b.m.Unlock()
41+
42+
if b.ctx.Err() != nil {
43+
return
44+
}
45+
46+
b.workers.Add(1)
47+
go func() {
48+
defer b.workers.Done()
49+
50+
pprof.Do(b.ctx, pprof.Labels("background", name), f)
51+
}()
52+
}
53+
54+
func (b *Worker) Done() <-chan struct{} {
55+
b.init()
56+
57+
b.m.Lock()
58+
defer b.m.Unlock()
59+
60+
return b.ctx.Done()
61+
}
62+
63+
func (b *Worker) Close(ctx context.Context, err error) error {
64+
b.init()
65+
66+
b.stop(err)
67+
68+
waitCtx, waitCancel := context.WithCancel(ctx)
69+
70+
go func() {
71+
b.workers.Wait()
72+
waitCancel()
73+
}()
74+
75+
<-waitCtx.Done()
76+
return ctx.Err()
77+
}
78+
79+
func (b *Worker) init() {
80+
b.onceInit.Do(func() {
81+
if b.ctx == nil {
82+
b.ctx, b.stop = xcontext.WithErrCancel(context.Background())
83+
}
84+
})
85+
}

internal/empty/empty.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package empty
2+
3+
import "sync"
4+
5+
type (
6+
Chan chan struct{}
7+
Struct struct{}
8+
)
9+
10+
// DoNotCopy can be embedded in a struct to help prevent shallow copies.
11+
// This does not rely on a Go language feature, but rather a special case
12+
// within the vet checker.
13+
//
14+
// See https://golang.org/issues/8005.
15+
type DoNotCopy [0]sync.Mutex

0 commit comments

Comments
 (0)