Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type envConfig struct {
Retry int `envconfig:"RETRY" required:"false"`
BackoffPolicy string `envconfig:"BACKOFF_POLICY" required:"false"`
BackoffDelay time.Duration `envconfig:"BACKOFF_DELAY" default:"50ms" required:"false"`
Timeout time.Duration `envconfig:"TIMEOUT" default:"0s" required:"false"`

connection *amqp.Conn
channel wabbit.Channel
Expand Down Expand Up @@ -102,6 +103,7 @@ func main() {
SubscriberURL: env.SubscriberURL,
MaxRetries: env.Retry,
BackoffDelay: backoffDelay,
Timeout: env.Timeout,
BackoffPolicy: backoffPolicy,
WorkerCount: env.Parallelism,
}
Expand Down
10 changes: 8 additions & 2 deletions cmd/webhook/broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (

"k8s.io/apimachinery/pkg/runtime/schema"

"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/signals"
"knative.dev/pkg/webhook"
"knative.dev/pkg/webhook/certificates"
Expand All @@ -40,10 +42,14 @@ var ourTypes = map[schema.GroupVersionKind]resourcesemantics.GenericCRD{
}

func NewValidationAdmissionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
// A function that infuses the context passed to ConvertTo/ConvertFrom/SetDefaults with custom metadata.
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(cmw)

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
return ctx
return featureStore.ToContext(ctx)
}

callbacks := map[schema.GroupVersionKind]validation.Callback{
v1.SchemeGroupVersion.WithKind("Broker"): validation.NewCallback(rabbitv1.ValidateBroker, webhook.Create, webhook.Update),
v1.SchemeGroupVersion.WithKind("Trigger"): validation.NewCallback(rabbitv1.ValidateTrigger(ctx), webhook.Create, webhook.Update),
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ require (
sigs.k8s.io/controller-runtime v0.11.1
)

require k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
require (
github.com/rickb777/date v1.13.0
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9
)

require (
cloud.google.com/go v0.99.0 // indirect
Expand Down Expand Up @@ -96,7 +99,6 @@ require (
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rickb777/date v1.13.0 // indirect
github.com/rickb777/plural v1.2.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/rogpeppe/fastuuid v1.2.0 // indirect
Expand Down
12 changes: 0 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1711,8 +1711,6 @@ k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9 h1:HNSDgDCrr/6Ly3WEGKZftiE7IY19Vz2GdbOCyI4qqhc=
k8s.io/utils v0.0.0-20220210201930-3a6ce19ff2f9/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/eventing v0.30.1-0.20220408112246-5accb7aceaf3 h1:PA76qWjIYEXxdsk54L5H0+rLYu9JqdtfddofSrkohsw=
knative.dev/eventing v0.30.1-0.20220408112246-5accb7aceaf3/go.mod h1:EcpeHeVqNIwFrth5ghfCoNWx1AWGcWELdOGfqZgMwXk=
knative.dev/eventing v0.30.1-0.20220412073008-22223245a4a4 h1:037hkBCKqsGcLA95hUVGnLRX5Xviy0hW9Ws8ZBDd83E=
knative.dev/eventing v0.30.1-0.20220412073008-22223245a4a4/go.mod h1:C1vLgthI00ev8zSqToU/sVgI6LI0ZQgezs0GX9uZ8l8=
knative.dev/eventing v0.30.1-0.20220415141711-ff55a456c3f9 h1:Cf5i43bXtjFTvq12JtItR3dyhlhOgTVXQ31o2lFxViM=
Expand All @@ -1722,23 +1720,13 @@ knative.dev/eventing v0.30.1-0.20220419135228-39eef14419d8/go.mod h1:XgJY27IxyBj
knative.dev/eventing v0.31.0 h1:Bu1cBSwxAT1BsaexQ6PJPWc1mxx2LF/DeszvYEIytJ0=
knative.dev/eventing v0.31.0/go.mod h1:XgJY27IxyBjmu/mz53cVlz+oMPPzzRaVXlPmWKCqEd8=
knative.dev/hack v0.0.0-20220328133751-f06773764ce3/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20220407171644-0e0784b13cef h1:tIBTdo8Ui1oVYyaQV9kup1qA3Rp9YQosS7c5fhjHnMc=
knative.dev/hack v0.0.0-20220407171644-0e0784b13cef/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack v0.0.0-20220411131823-6ffd8417de7c h1:aXsFXeky/GccNQxwf72CS4NR3EoqTqsCVNKQnblfwr0=
knative.dev/hack v0.0.0-20220411131823-6ffd8417de7c/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
knative.dev/hack/schema v0.0.0-20220407171644-0e0784b13cef/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/hack/schema v0.0.0-20220411131823-6ffd8417de7c/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0=
knative.dev/pkg v0.0.0-20220329144915-0a1ec2e0d46c/go.mod h1:0A5D5tOLettuVoi5x+0SLGRfrvVemXXtLH247WupPJk=
knative.dev/pkg v0.0.0-20220407210145-4d62e1dbb943 h1:eE0lLLThHkvWWqycAiJmyIIOMMeBVjZkFQTffoWLEJU=
knative.dev/pkg v0.0.0-20220407210145-4d62e1dbb943/go.mod h1:0A5D5tOLettuVoi5x+0SLGRfrvVemXXtLH247WupPJk=
knative.dev/pkg v0.0.0-20220411131850-75629c8ab60e/go.mod h1:0A5D5tOLettuVoi5x+0SLGRfrvVemXXtLH247WupPJk=
knative.dev/pkg v0.0.0-20220411234407-00c122e376d0/go.mod h1:0A5D5tOLettuVoi5x+0SLGRfrvVemXXtLH247WupPJk=
knative.dev/pkg v0.0.0-20220412134708-e325df66cb51 h1:4AmaxeY7+r/PYYz3HS9pMY21Mw3ykO6STLFEk2FoJ2s=
knative.dev/pkg v0.0.0-20220412134708-e325df66cb51/go.mod h1:j2MeD8s+JoCu1vegX80GbRXV/xd20Jm1NznxBYtVXiM=
knative.dev/reconciler-test v0.0.0-20220407164846-93ef9639ad95/go.mod h1:dNSuxm0NYNN/t0zzLhdBBZsxKNDGmae92Uc53qNIRIM=
knative.dev/reconciler-test v0.0.0-20220408105546-649869029f6b h1:tJ05+OZadjmFf8BlwX4ShwG9Fx9JK+2RTzxD71gy5xc=
knative.dev/reconciler-test v0.0.0-20220408105546-649869029f6b/go.mod h1:lpugBzYOTtwVbTS+UHDG8oCBEb6mRsBnJm7Lp/Yhcq4=
knative.dev/reconciler-test v0.0.0-20220411142808-55eab02440e9 h1:igM2d9OHjEabeX4WP1h5Ov95Kecl1x/7c8e5ZOqZAy0=
knative.dev/reconciler-test v0.0.0-20220411142808-55eab02440e9/go.mod h1:gH/ghOMDp7c2I2EAAhH4kYh0MI2hJ75OtiAMfeSl85o=
knative.dev/reconciler-test v0.0.0-20220412141310-6da8e62f926f/go.mod h1:/25SELzYrVr+e+nARQh88d43BdHSP0JBfwUe3x/88OE=
knative.dev/reconciler-test v0.0.0-20220412165608-994f0c3fab62 h1:NAX8bVXDuTOOGH+XPLHWA/A5ZH4NvDXBgx3GytAt3vk=
Expand Down
12 changes: 10 additions & 2 deletions pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.opencensus.io/trace"
"go.uber.org/zap"
"knative.dev/eventing-rabbitmq/pkg/utils"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/pkg/logging"
Expand All @@ -47,6 +48,7 @@ type Dispatcher struct {
SubscriberURL string
MaxRetries int
BackoffDelay time.Duration
Timeout time.Duration
BackoffPolicy eventingduckv1.BackoffPolicyType
WorkerCount int
}
Expand All @@ -68,10 +70,16 @@ func (d *Dispatcher) ConsumeFromQueue(ctx context.Context, channel wabbit.Channe
return errors.Wrap(err, "create consumer")
}

ceClient, err := client.NewClientHTTP([]cehttp.Option{cehttp.WithIsRetriableFunc(func(statusCode int) bool {
topts := []cehttp.Option{cehttp.WithIsRetriableFunc(func(statusCode int) bool {
retry, _ := kncloudevents.SelectiveRetry(ctx, &http.Response{StatusCode: statusCode}, nil)
return retry
})}, nil)
})}

if d.Timeout != 0 {
topts = append(topts, utils.WithTimeout(d.Timeout))
}

ceClient, err := client.NewClientHTTP(topts, nil)
if err != nil {
return errors.Wrap(err, "create http client")
}
Expand Down
40 changes: 35 additions & 5 deletions pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type fakeHandler struct {
exitAfter int
receiveCount int

// How long to wait before responding.
processingTime []time.Duration

// handlers for various requests
handlers []handlerFunc

Expand Down Expand Up @@ -98,6 +101,10 @@ func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
if len(h.responseEvents) > 0 {
// write the response event out if there are any
if len(h.processingTime) > 0 {
time.Sleep(h.processingTime[h.receiveCount])
}

ev := h.responseEvents[h.receiveCount]
w.Header()["ce-specversion"] = []string{"1.0"}
w.Header()["ce-id"] = []string{ev.ID()}
Expand All @@ -107,7 +114,11 @@ func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header()["content-type"] = []string{"application/json"}
w.Write(ev.Data())
} else {
h.handlers[h.receiveCount](w, r)
if len(h.processingTime) > 0 {
h.handlers[h.receiveCount](w, r, h.processingTime[h.receiveCount])
} else {
h.handlers[h.receiveCount](w, r, 0)
}
}
h.receiveCount++
h.exitAfter--
Expand All @@ -116,13 +127,15 @@ func (h *fakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

type handlerFunc func(http.ResponseWriter, *http.Request)
type handlerFunc func(http.ResponseWriter, *http.Request, time.Duration)

func accepted(writer http.ResponseWriter, req *http.Request) {
func accepted(writer http.ResponseWriter, req *http.Request, delay time.Duration) {
time.Sleep(delay)
writer.WriteHeader(http.StatusOK)
}

func failed(writer http.ResponseWriter, req *http.Request) {
func failed(writer http.ResponseWriter, req *http.Request, delay time.Duration) {
time.Sleep(delay)
writer.WriteHeader(500)
}

Expand Down Expand Up @@ -159,6 +172,11 @@ func TestEndToEnd(t *testing.T) {
// Delivery configuration
maxRetries int
backoffPolicy eventingduckv1.BackoffPolicyType
// timeout for dispatcher
timeout time.Duration

// Processing time for subscribers
processingTime []time.Duration

// Cloud Events to queue to Rabbit
events []ce.Event
Expand Down Expand Up @@ -224,6 +242,16 @@ func TestEndToEnd(t *testing.T) {
expectedBrokerBodies: []string{expectedResponseData},
consumeErr: context.Canceled,
},
"One event, long processing time, failed": {
subscriberReceiveCount: 1,
subscriberHandlers: []handlerFunc{accepted},
timeout: time.Second * 2,
processingTime: []time.Duration{time.Second * 5},
responseEvents: []ce.Event{createEvent(responseData)},
events: []ce.Event{createEvent(eventData)},
expectedSubscriberBodies: []string{expectedData},
consumeErr: context.Canceled,
},
// ** With retries **
"One event, 2 failures, 3rd one succeeds no response, linear retry, 2 retries": {
subscriberReceiveCount: 3,
Expand Down Expand Up @@ -262,6 +290,7 @@ func TestEndToEnd(t *testing.T) {
done: subscriberDone,
exitAfter: tc.subscriberReceiveCount,
responseEvents: tc.responseEvents,
processingTime: tc.processingTime,
}
subscriber := httptest.NewServer(subscriberHandler)
defer subscriber.Close()
Expand Down Expand Up @@ -311,6 +340,7 @@ func TestEndToEnd(t *testing.T) {
BackoffDelay: backoffDelay,
BackoffPolicy: backoffPolicy,
WorkerCount: 1,
Timeout: tc.timeout,
}

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -340,7 +370,7 @@ func TestEndToEnd(t *testing.T) {
case <-brokerDone:
brokerFinished = true
t.Logf("Broker Done")
case <-time.After(5 * time.Second):
case <-time.After(40 * time.Second):
t.Fatalf("Timed out the test. Subscriber or Broker did not get the wanted events: SubscriberFinished: %v BrokerFinished: %v", subscriberFinished, brokerFinished)
}
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/reconciler/trigger/resources/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/rickb777/date/period"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/eventing"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
Expand Down Expand Up @@ -123,6 +124,14 @@ func MakeDispatcherDeployment(args *DispatcherArgs) *appsv1.Deployment {
Value: *args.Delivery.BackoffDelay,
})
}
if args.Delivery.Timeout != nil {
timeout, _ := period.Parse(*args.Delivery.Timeout)
dispatcher.Env = append(dispatcher.Env,
corev1.EnvVar{
Name: "TIMEOUT",
Value: timeout.DurationApprox().String(),
})
}
}
if parallelism, ok := args.Trigger.ObjectMeta.Annotations[ParallelismAnnotation]; ok {
dispatcher.Env = append(dispatcher.Env,
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/trigger/resources/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/pointer"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
Expand Down Expand Up @@ -63,11 +64,13 @@ func TestMakeDispatcherDeployment(t *testing.T) {
Retry: Int32Ptr(10),
BackoffPolicy: &exponentialBackoff,
BackoffDelay: ptr.String("20s"),
Timeout: pointer.StringPtr("PT10S"),
})),
want: deployment(
withEnv(corev1.EnvVar{Name: "RETRY", Value: "10"}),
withEnv(corev1.EnvVar{Name: "BACKOFF_POLICY", Value: "exponential"}),
withEnv(corev1.EnvVar{Name: "BACKOFF_DELAY", Value: "20s"}),
withEnv(corev1.EnvVar{Name: "TIMEOUT", Value: "10s"}),
),
},
{
Expand Down
40 changes: 40 additions & 0 deletions pkg/utils/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
Copyright 2022 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"errors"
"net/http"
"time"

cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
)

func WithTimeout(duration time.Duration) cehttp.Option {
return func(p *cehttp.Protocol) error {
if p == nil {
return errors.New("timeout cannot be set on nil protocol")
}

if p.Client == nil {
p.Client = http.DefaultClient
}

p.Client.Timeout = duration
return nil
}
}
Loading