Don't assume an orchestrator port number, but pull it from nomad #1368
base: main
Changes from all commits: f4dc73c, 1634159, 440f39b, eb4b55a, 69d54bb
@@ -0,0 +1,2 @@
+[*.hcl]
+indent_size = 2
@@ -81,26 +81,113 @@ func (o *Orchestrator) listNomadNodes(ctx context.Context) ([]nodemanager.NomadS
 	defer listSpan.End()

 	options := &nomadapi.QueryOptions{
-		// TODO: Use variable for node pool name ("default")
-		Filter: "Status == \"ready\" and NodePool == \"default\"",
+		Filter: `ClientStatus == "running" and JobID contains "orchestrator-"`,
+		Params: map[string]string{"resources": "true"},
 	}
-	nomadNodes, _, err := o.nomadClient.Nodes().List(options.WithContext(ctx))
+	nomadAllocations, _, err := o.nomadClient.Allocations().List(options.WithContext(ctx))
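The core of the change is switching from Nodes().List to Allocations().List: instead of assuming every ready node runs an orchestrator on a fixed port, the query asks Nomad for the orchestrator allocations themselves. A minimal standalone sketch of the same query, not part of the PR, assuming only a Nomad agent reachable through the client's default config:

```go
// Standalone sketch, not part of the PR: run the same allocation query that
// listNomadNodes now issues. The Nomad address comes from nomadapi.DefaultConfig()
// (NOMAD_ADDR, falling back to http://127.0.0.1:4646), an assumption for illustration.
package main

import (
	"context"
	"fmt"
	"log"

	nomadapi "github.com/hashicorp/nomad/api"
)

func main() {
	client, err := nomadapi.NewClient(nomadapi.DefaultConfig())
	if err != nil {
		log.Fatal(err)
	}

	options := &nomadapi.QueryOptions{
		// Same filter as the PR: only running allocations of orchestrator jobs.
		Filter: `ClientStatus == "running" and JobID contains "orchestrator-"`,
		// resources=true asks Nomad to include AllocatedResources in the stubs,
		// which is what the port lookup further down in the diff reads.
		Params: map[string]string{"resources": "true"},
	}

	allocs, _, err := client.Allocations().List(options.WithContext(context.Background()))
	if err != nil {
		log.Fatal(err)
	}

	for _, alloc := range allocs {
		fmt.Println(alloc.ID, alloc.NodeID, alloc.JobID, alloc.ClientStatus)
	}
}
```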
 	if err != nil {
 		return nil, err
 	}

-	result := make([]nodemanager.NomadServiceDiscovery, 0, len(nomadNodes))
-	for _, n := range nomadNodes {
+	result := make([]nodemanager.NomadServiceDiscovery, 0, len(nomadAllocations))
+	for _, alloc := range nomadAllocations {
+		if !isHealthy(alloc) {
+			zap.L().Debug("Skipping unhealthy allocation", zap.String("allocation_id", alloc.ID))
+
+			continue
+		}
Review comment: Bug: Nomad Nodes Include Unhealthy Allocations. (See the test sketch after the isHealthy definition below for how the nil health cases are handled.)
+		ip, port, ok := o.findPortInAllocation(alloc, "grpc")
+		if !ok {
+			zap.L().Warn("Cannot find port in allocation",
+				zap.String("allocation_id", alloc.ID), zap.String("port_name", "grpc"))
+
+			continue
+		}
+
+		zap.L().Debug("Found port in allocation",
+			zap.String("allocation_id", alloc.ID),
+			zap.String("port_name", "grpc"),
+			zap.String("ip", ip),
+			zap.Int("port", port),
+		)
+
 		result = append(result, nodemanager.NomadServiceDiscovery{
-			NomadNodeShortID:    n.ID[:consts.NodeIDLength],
-			OrchestratorAddress: fmt.Sprintf("%s:%s", n.Address, consts.OrchestratorPort),
-			IPAddress:           n.Address,
+			NomadNodeShortID:    alloc.NodeID[:consts.NodeIDLength],
+			OrchestratorAddress: fmt.Sprintf("%s:%d", ip, port),
+			IPAddress:           ip,
 		})
 	}

 	return result, nil
 }

+func isHealthy(alloc *nomadapi.AllocationListStub) bool {
+	if alloc == nil {
+		zap.L().Warn("Allocation is nil")
+
+		return false
+	}
+
+	if alloc.DeploymentStatus == nil {
+		zap.L().Warn("Allocation deployment status is nil", zap.String("allocation_id", alloc.ID))
+
+		return false
+	}
+
+	if alloc.DeploymentStatus.Healthy == nil {
+		zap.L().Warn("Allocation deployment status healthy is nil", zap.String("allocation_id", alloc.ID))
+
+		return false
+	}
+
+	return *alloc.DeploymentStatus.Healthy
+}
+
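Since both DeploymentStatus and its Healthy field are pointers in the Nomad API, isHealthy has three nil cases to reject before it can dereference the health flag. A small table-driven sketch of that behavior, not part of the PR; the package name and test placement are assumptions:

```go
package orchestrator // hypothetical package name; assumed to be the package that defines isHealthy

import (
	"testing"

	nomadapi "github.com/hashicorp/nomad/api"
)

// TestIsHealthy sketches the nil handling in isHealthy: a missing allocation,
// a missing DeploymentStatus, and an unreported Healthy flag are all treated
// as unhealthy; only an explicit Healthy == true passes.
func TestIsHealthy(t *testing.T) {
	healthy := true
	unhealthy := false

	cases := []struct {
		name  string
		alloc *nomadapi.AllocationListStub
		want  bool
	}{
		{"nil allocation", nil, false},
		{"no deployment status", &nomadapi.AllocationListStub{ID: "a1"}, false},
		{"health not yet reported", &nomadapi.AllocationListStub{
			ID:               "a2",
			DeploymentStatus: &nomadapi.AllocDeploymentStatus{},
		}, false},
		{"explicitly unhealthy", &nomadapi.AllocationListStub{
			ID:               "a3",
			DeploymentStatus: &nomadapi.AllocDeploymentStatus{Healthy: &unhealthy},
		}, false},
		{"healthy", &nomadapi.AllocationListStub{
			ID:               "a4",
			DeploymentStatus: &nomadapi.AllocDeploymentStatus{Healthy: &healthy},
		}, true},
	}

	for _, tc := range cases {
		if got := isHealthy(tc.alloc); got != tc.want {
			t.Errorf("%s: isHealthy() = %v, want %v", tc.name, got, tc.want)
		}
	}
}
```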
+func (o *Orchestrator) findPortInAllocation(allocation *nomadapi.AllocationListStub, portLabel string) (string, int, bool) {
+	for _, task := range allocation.AllocatedResources.Tasks {
+		for _, network := range task.Networks {
+			host, port, ok := o.findPortInNetwork(network, portLabel)
+			if ok {
+				return host, port, true
+			}
+		}
+	}
+
+	for _, net := range allocation.AllocatedResources.Shared.Networks {
+		host, port, ok := o.findPortInNetwork(net, portLabel)
+		if ok {
+			return host, port, true
+		}
+	}
+
+	return "", 0, false
+}
+
+func (o *Orchestrator) findPortInNetwork(net *nomadapi.NetworkResource, portLabel string) (string, int, bool) {
+	for _, port := range net.ReservedPorts {
+		if port.Label == portLabel {
+			return net.IP, port.Value, true
+		}
+
+		if port.Value == o.defaultPort {
+			return net.IP, o.defaultPort, true
+		}
+	}
+
+	for _, port := range net.DynamicPorts {
+		if port.Label == portLabel {
+			return net.IP, port.Value, true
+		}
+
+		if port.Value == o.defaultPort {
+			return net.IP, o.defaultPort, true
+		}
+	}
+
+	return "", 0, false
+}
Review comment: Bug: Port Discovery Fails with Default Value.

Reply: This is intentional; if we don't find a port that matches the default one, we assume that we've found something that isn't an orchestrator job and move on.
 func (o *Orchestrator) GetNode(clusterID uuid.UUID, nodeID string) *nodemanager.Node {
 	scopedKey := o.scopedNodeID(clusterID, nodeID)
 	n, _ := o.nodes.Get(scopedKey)
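The reply above spells out the lookup order implemented in findPortInNetwork: a port labeled "grpc" wins, a port whose value matches the deprecated default is accepted as a fallback, and anything else means the allocation is not an orchestrator job and is skipped by the caller. A test-style sketch of that behavior, not part of the PR; the package name and the placeholder value 5008 are assumptions (the real default comes from config.DefaultOrchestratorPort):

```go
package orchestrator // hypothetical package name; assumed to be the package that defines Orchestrator

import (
	"testing"

	nomadapi "github.com/hashicorp/nomad/api"
)

// TestFindPortInNetworkSketch exercises the lookup order: an exact label match
// wins, a port whose value equals the deprecated defaultPort is accepted as a
// fallback, and anything else is rejected so the caller skips the allocation.
func TestFindPortInNetworkSketch(t *testing.T) {
	// 5008 is a placeholder, not the real config.DefaultOrchestratorPort.
	o := &Orchestrator{defaultPort: 5008}

	labeled := &nomadapi.NetworkResource{
		IP:           "10.0.0.5",
		DynamicPorts: []nomadapi.Port{{Label: "grpc", Value: 23456}},
	}
	if _, port, ok := o.findPortInNetwork(labeled, "grpc"); !ok || port != 23456 {
		t.Errorf("labeled port: got %d, %v", port, ok)
	}

	legacy := &nomadapi.NetworkResource{
		IP:            "10.0.0.6",
		ReservedPorts: []nomadapi.Port{{Label: "rpc", Value: 5008}},
	}
	if _, port, ok := o.findPortInNetwork(legacy, "grpc"); !ok || port != 5008 {
		t.Errorf("default-port fallback: got %d, %v", port, ok)
	}

	other := &nomadapi.NetworkResource{
		IP:           "10.0.0.7",
		DynamicPorts: []nomadapi.Port{{Label: "http", Value: 31000}},
	}
	if _, _, ok := o.findPortInNetwork(other, "grpc"); ok {
		t.Error("unrelated port must not be treated as an orchestrator port")
	}
}
```

The fallback branch is what the Deprecated note on defaultPort in the next file refers to: once every orchestrator job exposes a port labeled "grpc", the default-port comparison and the field itself can be removed.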
@@ -35,6 +35,10 @@ const statusLogInterval = time.Second * 20
 var ErrNodeNotFound = errors.New("node not found")

 type Orchestrator struct {
+	// Deprecated: Nomad knows which port the orchestrator is listening on. Keep this
+	// around temporarily until all nomad jobs have a port labeled "grpc", then this can be removed.
+	defaultPort int
+
 	httpClient   *http.Client
 	nomadClient  *nomadapi.Client
 	sandboxStore sandbox.Store

Review comment: We also need to track the proxy port. It will be needed for sandbox traffic routing done by the client proxy. The data is sent via the Redis sandbox catalog, which is handled in the API. We are currently sending just the IP to the orchestrator job.

Reply: Definitely! I was hoping to keep this PR smaller by omitting those changes, but I can extend this to include them as well.

Reply: Feel free to skip it for now. Let's add a dependent task so we will not forget about it.

@@ -124,6 +128,7 @@ func New(
 		sqlcDB:      sqlcDB,
 		tel:         tel,
 		clusters:    clusters,
+		defaultPort: config.DefaultOrchestratorPort,

 		sandboxCounter: sandboxCounter,
 		createdCounter: createdCounter,
@@ -1,12 +1,8 @@
 package consts

-import "os"
-
 const NodeIDLength = 8

 // ClientID Sandbox ID client part used during migration when we are still returning client but its no longer serving its purpose,
 // and we are returning it only for backward compatibility with SDK clients.
 // We don't want to use some obviously dummy value such as empty zeros, because for users it will look like something is wrong with the sandbox id
 const ClientID = "6532622b"
-
-var OrchestratorPort = os.Getenv("ORCHESTRATOR_PORT")