Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/internal/evaluator/caching.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func (module *CachingEvaluator) getConsumerStatus(request *protocol.EvaluatorReq

func (module *CachingEvaluator) evaluateConsumerStatus(clusterAndConsumer string) (interface{}, error) {
// First off, we need to separate the cluster and consumer values from the string provided
parts := strings.Split(clusterAndConsumer, " ")
// Allow space in consumer name
parts := strings.SplitN(clusterAndConsumer, " ", 2)
if len(parts) != 2 {
module.Log.Error("query with bad clusterAndConsumer", zap.String("arg", clusterAndConsumer))
return nil, &cacheError{StatusCode: 500, Reason: "bad request"}
Expand Down
2 changes: 1 addition & 1 deletion core/internal/helpers/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func ValidateURL(rawURL string) bool {
// ValidateHostList returns true if the provided slice of strings can all be parsed by ValidateHostPort
func ValidateHostList(hosts []string) bool {
for _, host := range hosts {
if !ValidateHostPort(host, false) {
if !ValidateHostPort(host, true) {
return false
}
}
Expand Down
10 changes: 10 additions & 0 deletions core/internal/helpers/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ func TestValidateTopic(t *testing.T) {
}
}

func TestValidateFilename(t *testing.T) {
for i, testSet := range testTopics {
result := ValidateFilename(testSet.TestValue)
assert.Equalf(t, testSet.Result, result, "Test %v - Expected '%v' to return %v, not %v", i, testSet.TestValue, testSet.Result, result)
}
}

var testEmails = []TestSet{
{"[email protected]", true},
{"need@domain", false},
Expand Down Expand Up @@ -173,10 +180,13 @@ var testHostPorts = []TestSet{
{"host.example.com:23", true},
{"thissegmentiswaytoolongbecauseitshouldnotbemorethansixtythreecharacters.foo.com:36334", false},
{"underscores_are.not.valid.com:3453", false},
{":2453", true},
{"hostname:stringsNotValid", false},
}

func TestValidateHostList(t *testing.T) {
for i, testSet := range testHostPorts {
// Test allow blank hostname
result := ValidateHostList([]string{testSet.TestValue})
assert.Equalf(t, testSet.Result, result, "Test %v - Expected '%v' to return %v, not %v", i, testSet.TestValue, testSet.Result, result)
}
Expand Down
11 changes: 11 additions & 0 deletions core/internal/helpers/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ func (z *BurrowZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.E
return z.client.GetW(path)
}

// Exists returns a boolean stating whether or not the specified path exists.
func (z *BurrowZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
return z.client.Exists(path)
}

// ExistsW returns a boolean stating whether or not the specified path exists. This method also sets a watch on the node
// (exists if it does not currently exist, or a data watch otherwise), providing an event channel that will receive a
// message when the watch fires
Expand Down Expand Up @@ -115,6 +120,12 @@ func (m *MockZookeeperClient) GetW(path string) ([]byte, *zk.Stat, <-chan zk.Eve
return args.Get(0).([]byte), args.Get(1).(*zk.Stat), args.Get(2).(<-chan zk.Event), args.Error(3)
}

// Exists mocks protocol.ZookeeperClient.Exists
func (m *MockZookeeperClient) Exists(path string) (bool, *zk.Stat, error) {
args := m.Called(path)
return args.Bool(0), args.Get(1).(*zk.Stat), args.Error(2)
}

// ExistsW mocks protocol.ZookeeperClient.ExistsW
func (m *MockZookeeperClient) ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error) {
args := m.Called(path)
Expand Down
14 changes: 10 additions & 4 deletions core/internal/zookeeper/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,16 @@ func (zc *Coordinator) createRecursive(path string) error {

parts := strings.Split(path, "/")
for i := 2; i <= len(parts); i++ {
_, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll))
// Ignore when the node exists already
if (err != nil) && (err != zk.ErrNodeExists) {
return err
// If the rootpath exists, skip the Create process to avoid "zk: not authenticated" error
exist, _, errExists := zc.App.Zookeeper.Exists(strings.Join(parts[:i], "/"))
if !exist {
_, err := zc.App.Zookeeper.Create(strings.Join(parts[:i], "/"), []byte{}, 0, zk.WorldACL(zk.PermAll))
// Ignore when the node exists already
if (err != nil) && (err != zk.ErrNodeExists) {
return err
}
} else {
return errExists
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions core/internal/zookeeper/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,12 @@ func TestCoordinator_StartStop(t *testing.T) {
return &mockClient, eventChan, nil
}

offsetStat := &zk.Stat{}
mockClient.On("Exists", "/test").Return(true, offsetStat, nil)
mockClient.On("Create", "/test", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists)
mockClient.On("Exists", "/test/path").Return(true, offsetStat, nil)
mockClient.On("Create", "/test/path", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", zk.ErrNodeExists)
mockClient.On("Exists", "/test/path/burrow").Return(false, offsetStat, nil)
mockClient.On("Create", "/test/path/burrow", []byte{}, int32(0), zk.WorldACL(zk.PermAll)).Return("", nil)
mockClient.On("Close").Run(func(args mock.Arguments) { close(eventChan) }).Return()

Expand Down
6 changes: 5 additions & 1 deletion core/protocol/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,17 @@ type ZookeeperClient interface {
// the children of the specified path, providing an event channel that will receive a message when the watch fires
GetW(path string) ([]byte, *zk.Stat, <-chan zk.Event, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists.
// The method does not set watch on the node, but verifies existence of a node to avoid authentication error.
Exists(path string) (bool, *zk.Stat, error)

// For the given path in Zookeeper, return a boolean stating whether or not the node exists. This method also sets
// a watch on the node (exists if it does not currently exist, or a data watch otherwise), providing an event
// channel that will receive a message when the watch fires
ExistsW(path string) (bool, *zk.Stat, <-chan zk.Event, error)

// Create makes a new ZNode at the specified path with the contents set to the data byte-slice. Flags can be
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is\
// provided to specify that this is an ephemeral or sequence node, and an ACL must be provided. If no ACL is
// desired, specify
// zk.WorldACL(zk.PermAll)
Create(string, []byte, int32, []zk.ACL) (string, error)
Expand Down