Commit d9b4b6a

Batch messages poll endpoint (#65)
* support message poll rest endpoint
* replace mongo install action in the github actions
* returns 204 no-content if the message list is empty
1 parent 0224e63 commit d9b4b6a

11 files changed: +279 -37 lines changed


.github/workflows/go.yml

Lines changed: 17 additions & 17 deletions
@@ -40,6 +40,9 @@ jobs:
   build_test:
     name: build and test
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        mongodb-version: [4.2]
     steps:
       - name: Set up Go
         uses: actions/setup-go@v1
@@ -51,16 +54,13 @@ jobs:
         with:
           fetch-depth: 1
           path: go/src/github.com/kafkaesque-io/pulsar-beam
-      - name: Install MongoDB
-        run: |
-          echo $GITHUB_EVENT_NAME
-          echo $GITHUB_EVENT_PATH
-          sudo apt-get update
-          sudo apt-get install -y mongodb
+      - name: Start MongoDB v${{ matrix.mongodb-version }}
+        uses: supercharge/[email protected]
+        with:
+          mongodb-version: ${{ matrix.mongodb-version }}
       - name: Verify MongoDB Installation and Status
         run: |
-          ls /var/lib/mongodb
-          sudo systemctl status mongodb
+          sudo docker ps
       - name: Build Binary
         run: |
           go mod download
@@ -111,7 +111,10 @@ jobs:
   e2e_test:
     name: e2e_test
     needs: [analysis, build_test]
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        mongodb-version: [4.2]
     steps:
       - name: Check out code
         uses: actions/checkout@v1
@@ -123,16 +126,13 @@ jobs:
         run: |
           pwd
           go mod download
-      - name: Install MongoDB
-        run: |
-          echo $GITHUB_EVENT_NAME
-          echo $GITHUB_EVENT_PATH
-          sudo apt-get update
-          sudo apt-get install -y mongodb
+      - name: Start MongoDB v${{ matrix.mongodb-version }}
+        uses: supercharge/[email protected]
+        with:
+          mongodb-version: ${{ matrix.mongodb-version }}
       - name: Verify MongoDB Installation and Status
         run: |
-          ls /var/lib/mongodb
-          sudo systemctl status mongodb
+          sudo docker ps
       - name: Set up root certificate
         env:
           PULSAR_CLIENT_CERT: ${{ secrets.PULSAR_CLIENT_CERT }}

README.md

Lines changed: 17 additions & 1 deletion
@@ -15,6 +15,7 @@ Beam is an http based streaming and queueing system backed up by Apache Pulsar.
 - [x] A message can be pushed to a webhook or Cloud Function for consumption.
 - [x] A webhook or Cloud Function receives a message, process it and reply another message, in a response body, back to another Pulsar topic via Pulsar Beam.
 - [x] Messages can be streamed via HTTP Sever Sent Event, [SSE](https://www.html5rocks.com/en/tutorials/eventsource/basics/)
+- [x] Support HTTP polling of batch messages
 
 Opening an issue and PR are welcomed! Please email `[email protected]` for any inquiry or demo.
 
@@ -59,6 +60,21 @@ Query parameters
 2. SubscriptionInitialPosition -> supported type are `latest` as default and `earliest`
 3. SubscriptionName -> the length must be 5 characters or longer. An auto-generated name will be provided in absence. Only the auto-generated subscription will be unsubscribed.
 
+### Endpoint to poll batch messages
+Polls a batch of messages always from the earliest subscription position from a topic.
+```
+/v2/poll/{persistent}/{tenant}/{namespace}/{topic}
+```
+These HTTP headers may be required to map to Pulsar topic.
+1. Authorization -> Bearer token as Pulsar token
+2. PulsarUrl -> *optional* a fully qualified pulsar or pulsar+ssl URL where the message should be sent to. It is optional. The message will be sent to Pulsar URL specified under `PulsarBrokerURL` in the pulsar-beam.yml file if it is absent.
+
+Query parameters
+1. SubscriptionType -> Supported type strings are `exclusive` as default, `shared`, and `failover`
+2. SubscriptionName -> the length must be 5 characters or longer. An auto-generated name will be provided in absence. Only the auto-generated subscription will be unsubscribed.
+3. size -> The batch size. The default is 10.
+4. perMessageTimeoutMs -> is a wait time out for the next message to arrive. It is in milliseconds per message. The default is 300ms.
+
 ### Webhook registration
 Webhook registration is done via REST API backed by a database of your choice, such as MongoDB, in momery cache, and Pulsar itself. Yes, you can use a compacted Pulsar topic as a database table to perform CRUD. The configuration parameter is `"PbDbType": "inmemory",` in the `pulsar_beam.yml` file or the env variable `PbDbType`.
 
@@ -149,7 +165,7 @@ One end to end test is under `./src/e2e/e2etest.go`, that performs the following
 4. Verify the replied message on the sink topic
 5. Delete the topic and its webhook document via RESTful API
 
-Since the set up is non-trivial involving Pulsar Beam, a Cloud function or webhook, the test tool, and Pulsar itself with SSL, we recommend to take advantage of [the free plan at Kafkaesque.io](https://kafkaesque.io) as the Pulsar server and a Cloud Function that we have verified GCP Fcuntion, Azure Function or AWS Lambda will suffice in the e2e flow.
+Since the set up is non-trivial involving Pulsar Beam, a Cloud function or webhook, the test tool, and Pulsar itself with SSL, we recommend to take advantage of [the free plan at kesque.com](https://kesque.com) as the Pulsar server and a Cloud Function that we have verified GCP Fcuntion, Azure Function or AWS Lambda will suffice in the e2e flow.
 
 Step to perform unit test
 ```bash

config/pulsar_beam.yml

Lines changed: 2 additions & 1 deletion
@@ -7,4 +7,5 @@ PulsarPrivateKey: ./unit-test/example_private_key
 PbDbInterval: 10s
 DbConnectionStr: mongodb://localhost:27017
 DbName:
-DbPassword:
+DbPassword:
+TrustStore: "/etc/ssl/certs/ca-bundle.crt"

go.sum

Lines changed: 3 additions & 0 deletions
@@ -19,6 +19,7 @@ github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXD
 github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
 github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
@@ -79,6 +80,7 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
 github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
@@ -113,6 +115,7 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=

src/broker/sse-broker.go

Lines changed: 35 additions & 7 deletions
@@ -1,15 +1,13 @@
 package broker
 
 import (
+	"strings"
+	"time"
+
 	"github.com/apache/pulsar-client-go/pulsar"
+	"github.com/kafkaesque-io/pulsar-beam/src/model"
 	"github.com/kafkaesque-io/pulsar-beam/src/pulsardriver"
-)
-
-const (
-	// SSEBrokerMaxSize the max size of the number of HTTP SSE session is supported
-	SSEBrokerMaxSize = 200
-
-	// TODO add counters and max limit for SSEBroker
+	log "github.com/sirupsen/logrus"
 )
 
 // GetPulsarClientConsumer returns Puslar client and consumer interface objects
@@ -31,3 +29,33 @@ func GetPulsarClientConsumer(url, token, topic, subscriptionName string, subType
 
 	return client, consumer, nil
 }
+
+// PollBatchMessages polls a batch of consumer messages
+func PollBatchMessages(url, token, topic, subscriptionName string, subType pulsar.SubscriptionType, size, perMessageTimeoutMs int) (model.PulsarMessages, error) {
+	log.Infof("getbatchmessages called")
+	client, consumer, err := GetPulsarClientConsumer(url, token, topic, subscriptionName, subType, pulsar.SubscriptionPositionEarliest)
+	if err != nil {
+		return model.NewPulsarMessages(size), err
+	}
+	if strings.HasPrefix(subscriptionName, model.NonResumable) {
+		defer consumer.Unsubscribe()
+	}
+	defer consumer.Close()
+	defer client.Close()
+
+	messages := model.NewPulsarMessages(size)
+	consumChan := consumer.Chan()
+	for i := 0; i < size; i++ {
+		select {
+		case msg := <-consumChan:
+			// log.Infof("received message %s on topic %s", string(msg.Payload()), msg.Topic())
+			messages.AddPulsarMessage(msg)
+			consumer.Ack(msg)
+
+		case <-time.After(time.Duration(perMessageTimeoutMs) * time.Millisecond): //TODO: this should be configurable
+			i = size
+		}
+	}
+
+	return messages, nil
+}

src/model/message.go

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
+package model
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar"
+)
+
+// PulsarMessage is the Pulsar Message type
+type PulsarMessage struct {
+	Payload     []byte    `json:"payload"`
+	Topic       string    `json:"topic"`
+	EventTime   time.Time `json:"eventTime"`
+	PublishTime time.Time `json:"publishTime"`
+	MessageID   string    `json:"messageId"`
+	Key         string    `json:"key"`
+}
+
+// PulsarMessages encapsulates a list of messages to be returned to a client
+type PulsarMessages struct {
+	Limit    int             `json:"limit"`
+	Size     int             `json:"size"`
+	Messages []PulsarMessage `json:"messages"`
+}
+
+// NewPulsarMessages create a PulsarMessages object
+func NewPulsarMessages(initSize int) PulsarMessages {
+	return PulsarMessages{
+		Limit:    initSize,
+		Size:     0,
+		Messages: make([]PulsarMessage, 0),
+	}
+}
+
+// AddPulsarMessage adds a Pulsar Message to the payload, return true if reaches capacity
+func (msgs *PulsarMessages) AddPulsarMessage(msg pulsar.Message) bool {
+	if msgs.Size >= msgs.Limit {
+		return true
+	}
+	msgs.Messages = append(msgs.Messages, PulsarMessage{
+		Payload:     msg.Payload(),
+		Topic:       msg.Topic(),
+		EventTime:   msg.EventTime(),
+		PublishTime: msg.PublishTime(),
+		MessageID:   fmt.Sprintf("%+v", msg.ID()),
+		Key:         msg.Key(),
+	})
+	msgs.Size++
+
+	return msgs.Size >= msgs.Limit
+}
+
+// IsEmpty checks if the message list is empty
+func (msgs *PulsarMessages) IsEmpty() bool {
+	return msgs.Size == 0
+}

src/route/handlers.go

Lines changed: 48 additions & 10 deletions
@@ -59,7 +59,6 @@ func TokenSubjectHandler(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 		w.Write(respJSON)
-		w.WriteHeader(http.StatusOK)
 	}
 	return
 }
@@ -106,15 +105,55 @@ func ReceiveHandler(w http.ResponseWriter, r *http.Request) {
 	return
 }
 
+// recoverHandler a function recovers from panic
+func recoverHandler(r *http.Request) {
+	if r := recover(); r != nil {
+		fmt.Printf("Recovered in http handler crash %v", r)
+	} else {
+		fmt.Printf("exit http handler")
+	}
+}
+
+// PollHandler polls messages from a Pulsar topic.
+func PollHandler(w http.ResponseWriter, r *http.Request) {
+	defer recoverHandler(r)
+
+	u, _ := url.Parse(r.URL.String())
+	params := u.Query()
+	token, topicFN, pulsarURL, subName, _, subType, err := ConsumerConfigFromHTTPParts(util.AllowedPulsarURLs, &r.Header, mux.Vars(r), params)
+	if err != nil {
+		util.ResponseErrorJSON(err, w, http.StatusUnprocessableEntity)
+		return
+	}
+	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
+
+	size := util.QueryParamInt(params, "batch", 10)
+	perMessageTimeoutMs := util.QueryParamInt(params, "perMessageTimeoutMs", 300)
+
+	// subscription initial position is always set to earliest since this is short poll
+	msgs, err := broker.PollBatchMessages(pulsarURL, token, topicFN, subName, subType, size, perMessageTimeoutMs)
+	if err != nil {
+		util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
+		return
+	}
+
+	if msgs.IsEmpty() {
+		w.WriteHeader(http.StatusNoContent)
+		return
+	}
+
+	data, err := json.Marshal(msgs)
+	if err != nil {
+		util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
+		return
+	}
+	w.WriteHeader(http.StatusOK)
+	w.Write(data)
+}
+
 // SSEHandler is the HTTP SSE handler
 func SSEHandler(w http.ResponseWriter, r *http.Request) {
-	defer func() {
-		if r := recover(); r != nil {
-			fmt.Printf("Recovered in SSEHandler %v", r)
-		} else {
-			fmt.Printf("exit SSEHandler()")
-		}
-	}()
+	defer recoverHandler(r)
 
 	u, _ := url.Parse(r.URL.String())
 	params := u.Query()
@@ -189,7 +228,6 @@ func GetTopicHandler(w http.ResponseWriter, r *http.Request) {
 		util.ResponseErrorJSON(err, w, http.StatusInternalServerError)
 	} else {
 		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
-		w.WriteHeader(http.StatusOK)
 		w.Write(resJSON)
 	}
 
@@ -199,6 +237,7 @@ func GetTopicHandler(w http.ResponseWriter, r *http.Request) {
 func UpdateTopicHandler(w http.ResponseWriter, r *http.Request) {
 	decoder := json.NewDecoder(r.Body)
 	defer r.Body.Close()
+	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
 
 	var doc model.TopicConfig
 	err := decoder.Decode(&doc)
@@ -270,7 +309,6 @@ func DeleteTopicHandler(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusInternalServerError)
 	} else {
 		w.Header().Set("Content-Type", "application/json; charset=UTF-8")
-		w.WriteHeader(http.StatusOK)
 		w.Write(resJSON)
 	}
 }
}

src/route/routes.go

Lines changed: 7 additions & 0 deletions
@@ -72,6 +72,13 @@ var ReceiverRoutes = Routes{
 		SSEHandler,
 		middleware.AuthVerifyJWT,
 	},
+	Route{
+		"poll-messages",
+		http.MethodGet,
+		"/v2/poll/{persistent}/{tenant}/{namespace}/{topic}",
+		PollHandler,
+		middleware.AuthVerifyJWT,
+	},
 }
 
 // RestRoutes definition

src/unit-test/model_test.go

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+package tests
+
+import (
+	"testing"
+
+	. "github.com/kafkaesque-io/pulsar-beam/src/model"
+)
+
+func TestPulsarMessages(t *testing.T) {
+
+	messages := NewPulsarMessages(10)
+	equals(t, messages.Limit, 10)
+	equals(t, messages.IsEmpty(), true)
+}
