2 changes: 2 additions & 0 deletions .editorconfig
@@ -0,0 +1,2 @@
+[*.hcl]
+indent_size = 2
26 changes: 21 additions & 5 deletions iac/provider-gcp/nomad/jobs/orchestrator.hcl
@@ -5,23 +5,39 @@ job "orchestrator-${latest_orchestrator_job_id}" {
  priority = 90

  group "client-orchestrator" {
+   network {
+     mode = "host"
+
+     port "grpc" {
+       // todo: remove this once all API and client-proxy jobs
+       // can pull the port number from nomad.
+       static = "${port}"
+     }
+
+     port "proxy" {
+       // todo: remove this once all API and client-proxy jobs
+       // can pull the port number from nomad.
+       static = "${proxy_port}"
+     }
+   }
+
    service {
      name = "orchestrator"
-     port = "${port}"
+     port = "grpc"

      check {
        type = "grpc"
        name = "health"
        interval = "20s"
        timeout = "5s"
        grpc_use_tls = false
-       port = "${port}"
+       port = "grpc"
      }
    }

    service {
      name = "orchestrator-proxy"
-     port = "${proxy_port}"
+     port = "proxy"
    }

    task "check-placement" {
@@ -74,8 +90,8 @@ EOT
      CLICKHOUSE_CONNECTION_STRING = "${clickhouse_connection_string}"
      REDIS_URL = "${redis_url}"
      REDIS_CLUSTER_URL = "${redis_cluster_url}"
-     GRPC_PORT = "${port}"
-     PROXY_PORT = "${proxy_port}"
+     GRPC_PORT = "$${NOMAD_PORT_grpc}"
+     PROXY_PORT = "$${NOMAD_PORT_proxy}"
      GIN_MODE = "release"

%{ if launch_darkly_api_key != "" }
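Note on the port plumbing above: with the "grpc" and "proxy" labels declared in the network block, Nomad exposes the assigned ports to the task as NOMAD_PORT_grpc and NOMAD_PORT_proxy, and the template (via the escaped $${...}) renders them into GRPC_PORT and PROXY_PORT. A minimal sketch of how the orchestrator binary could consume them, assuming it reads these variables at startup (names and fallback are illustrative, not taken from this PR):

package main

import (
	"fmt"
	"net"
	"os"
)

func main() {
	// GRPC_PORT is rendered from NOMAD_PORT_grpc in the job template,
	// so the binary no longer needs a hardcoded static port.
	grpcPort := os.Getenv("GRPC_PORT")
	if grpcPort == "" {
		grpcPort = "5008" // deprecated default, kept during the migration
	}

	lis, err := net.Listen("tcp", ":"+grpcPort)
	if err != nil {
		panic(err)
	}
	defer lis.Close()
	fmt.Println("orchestrator gRPC listening on", lis.Addr())
}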
5 changes: 5 additions & 0 deletions packages/api/internal/cfg/model.go
@@ -18,6 +18,11 @@ type Config struct {
	NomadAddress string `env:"NOMAD_ADDRESS" envDefault:"http://localhost:4646"`
	NomadToken string `env:"NOMAD_TOKEN"`

+	// DefaultOrchestratorPort is the port that the Orchestrator listens on.
+	// Deprecated: Nomad knows which port the orchestrator is listening on. Keep this
+	// around temporarily until all Nomad jobs have a port labeled "grpc"; then this can be removed.
+	DefaultOrchestratorPort int `env:"ORCHESTRATOR_PORT" envDefault:"5008"`
+
	PostgresConnectionString string `env:"POSTGRES_CONNECTION_STRING,required,notEmpty"`

	PosthogAPIKey string `env:"POSTHOG_API_KEY"`
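These struct tags follow the github.com/caarlos0/env convention; assuming that library is the parser behind them (the parser itself is not part of this diff), the default resolves like this:

package main

import (
	"fmt"

	"github.com/caarlos0/env/v11"
)

type Config struct {
	// Deprecated default; ORCHESTRATOR_PORT overrides it when set.
	DefaultOrchestratorPort int `env:"ORCHESTRATOR_PORT" envDefault:"5008"`
}

func main() {
	var cfg Config
	if err := env.Parse(&cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.DefaultOrchestratorPort) // 5008 unless ORCHESTRATOR_PORT is set
}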
103 changes: 95 additions & 8 deletions packages/api/internal/orchestrator/client.go
@@ -81,26 +81,113 @@ func (o *Orchestrator) listNomadNodes(ctx context.Context) ([]nodemanager.NomadServiceDiscovery, error) {
	defer listSpan.End()

	options := &nomadapi.QueryOptions{
-		// TODO: Use variable for node pool name ("default")
-		Filter: "Status == \"ready\" and NodePool == \"default\"",
+		Filter: `ClientStatus == "running" and JobID contains "orchestrator-"`,
+		Params: map[string]string{"resources": "true"},
	}
-	nomadNodes, _, err := o.nomadClient.Nodes().List(options.WithContext(ctx))
+	nomadAllocations, _, err := o.nomadClient.Allocations().List(options.WithContext(ctx))
	if err != nil {
		return nil, err
	}

-	result := make([]nodemanager.NomadServiceDiscovery, 0, len(nomadNodes))
-	for _, n := range nomadNodes {
+	result := make([]nodemanager.NomadServiceDiscovery, 0, len(nomadAllocations))
+	for _, alloc := range nomadAllocations {
+		if !isHealthy(alloc) {
+			zap.L().Debug("Skipping unhealthy allocation", zap.String("allocation_id", alloc.ID))
+
+			continue
+		}
Review comment:
Bug: Nomad Nodes Include Unhealthy Allocations
The listNomadNodes function includes Nomad allocations with an unknown health status (DeploymentStatus.Healthy is nil). This allows connecting to orchestrators that aren't explicitly healthy or ready, contrary to the goal of only adding healthy orchestrators.
+		ip, port, ok := o.findPortInAllocation(alloc, "grpc")
+		if !ok {
+			zap.L().Warn("Cannot find port in allocation",
+				zap.String("allocation_id", alloc.ID), zap.String("port_name", "grpc"))
+
+			continue
+		}
+
+		zap.L().Debug("Found port in allocation",
+			zap.String("allocation_id", alloc.ID),
+			zap.String("port_name", "grpc"),
+			zap.String("ip", ip),
+			zap.Int("port", port),
+		)
+
		result = append(result, nodemanager.NomadServiceDiscovery{
-			NomadNodeShortID: n.ID[:consts.NodeIDLength],
-			OrchestratorAddress: fmt.Sprintf("%s:%s", n.Address, consts.OrchestratorPort),
-			IPAddress: n.Address,
+			NomadNodeShortID: alloc.NodeID[:consts.NodeIDLength],
+			OrchestratorAddress: fmt.Sprintf("%s:%d", ip, port),
+			IPAddress: ip,
		})
	}

	return result, nil
}

+func isHealthy(alloc *nomadapi.AllocationListStub) bool {
+	if alloc == nil {
+		zap.L().Warn("Allocation is nil")
+
+		return false
+	}
+
+	if alloc.DeploymentStatus == nil {
+		zap.L().Warn("Allocation deployment status is nil", zap.String("allocation_id", alloc.ID))
+
+		return false
+	}
+
+	if alloc.DeploymentStatus.Healthy == nil {
+		zap.L().Warn("Allocation deployment status healthy is nil", zap.String("allocation_id", alloc.ID))
+
+		return false
+	}
+
+	return *alloc.DeploymentStatus.Healthy
+}

+func (o *Orchestrator) findPortInAllocation(allocation *nomadapi.AllocationListStub, portLabel string) (string, int, bool) {
+	for _, task := range allocation.AllocatedResources.Tasks {
+		for _, network := range task.Networks {
+			host, port, ok := o.findPortInNetwork(network, portLabel)
+			if ok {
+				return host, port, true
+			}
+		}
+	}
+
+	for _, net := range allocation.AllocatedResources.Shared.Networks {
+		host, port, ok := o.findPortInNetwork(net, portLabel)
+		if ok {
+			return host, port, true
+		}
+	}
+
+	return "", 0, false
+}
+
+func (o *Orchestrator) findPortInNetwork(net *nomadapi.NetworkResource, portLabel string) (string, int, bool) {
+	for _, port := range net.ReservedPorts {
+		if port.Label == portLabel {
+			return net.IP, port.Value, true
+		}
+
+		if port.Value == o.defaultPort {
+			return net.IP, o.defaultPort, true
+		}
+	}
+
+	for _, port := range net.DynamicPorts {
+		if port.Label == portLabel {
+			return net.IP, port.Value, true
+		}
+
+		if port.Value == o.defaultPort {
+			return net.IP, o.defaultPort, true
+		}
+	}
+
+	return "", 0, false
+}
Review comment:
Bug: Port Discovery Fails with Default Value
The findPortInNetwork function's port discovery has a flawed fallback. It checks for the default port value immediately after failing a label match for each port. This can return a port with the wrong label if its value matches the default and it appears before the correctly labeled port, potentially connecting to the wrong service.

Reply (Contributor, Author):
This is intentional; if we don't find a port that matches the default one, we assume that we've found something that isn't an orchestrator job and move on.

func (o *Orchestrator) GetNode(clusterID uuid.UUID, nodeID string) *nodemanager.Node {
	scopedKey := o.scopedNodeID(clusterID, nodeID)
	n, _ := o.nodes.Get(scopedKey)
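To make the fallback discussed above concrete, here is a hypothetical test sketch (not part of the PR) against a simplified copy of the lookup, using types from github.com/hashicorp/nomad/api:

package orchestrator_test

import (
	"testing"

	nomadapi "github.com/hashicorp/nomad/api"
)

// findPortInNetwork mirrors the PR's lookup, with defaultPort passed explicitly
// instead of read from the Orchestrator struct.
func findPortInNetwork(network *nomadapi.NetworkResource, portLabel string, defaultPort int) (string, int, bool) {
	for _, port := range append(network.ReservedPorts, network.DynamicPorts...) {
		if port.Label == portLabel || port.Value == defaultPort {
			return network.IP, port.Value, true
		}
	}

	return "", 0, false
}

func TestDefaultPortFallback(t *testing.T) {
	network := &nomadapi.NetworkResource{
		IP: "10.0.0.7",
		// Labeled "rpc" rather than "grpc", but its value matches the
		// default port, so the fallback still accepts it.
		ReservedPorts: []nomadapi.Port{{Label: "rpc", Value: 5008}},
	}

	ip, port, ok := findPortInNetwork(network, "grpc", 5008)
	if !ok || ip != "10.0.0.7" || port != 5008 {
		t.Fatalf("got %s:%d ok=%v", ip, port, ok)
	}
}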
5 changes: 5 additions & 0 deletions packages/api/internal/orchestrator/orchestrator.go
@@ -35,6 +35,10 @@ const statusLogInterval = time.Second * 20
var ErrNodeNotFound = errors.New("node not found")

type Orchestrator struct {
+	// Deprecated: Nomad knows which port the orchestrator is listening on. Keep this
+	// around temporarily until all Nomad jobs have a port labeled "grpc"; then this can be removed.
+	defaultPort int
+
	httpClient *http.Client
	nomadClient *nomadapi.Client
	sandboxStore sandbox.Store

Review comment (Member):
We also need to track the proxy port. It will be needed for sandbox traffic routing done by the client proxy.
The data is sent via the Redis sandbox catalog in the API; right now we are sending just the IP to the orchestrator job.

Reply (Contributor, Author):
Definitely! I was hoping to keep this PR smaller by omitting those changes, but I can extend this to include them as well.

Reply (Member):
Feel free to skip it for now. Let's add a dependent task so we won't forget about it.
It will also need some testing in edge service discovery (which does orchestrator discovery in BYOC) to add it there as well.
@@ -124,6 +128,7 @@ func New(
		sqlcDB: sqlcDB,
		tel: tel,
		clusters: clusters,
+		defaultPort: config.DefaultOrchestratorPort,

		sandboxCounter: sandboxCounter,
		createdCounter: createdCounter,
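A hypothetical sketch of the follow-up agreed in the thread above, purely illustrative and not part of this PR: service discovery would resolve the "proxy" label alongside "grpc" and carry the proxy address through to the Redis sandbox catalog.

// Assumed extension of nodemanager.NomadServiceDiscovery (field name invented here).
type NomadServiceDiscovery struct {
	NomadNodeShortID    string
	OrchestratorAddress string
	ProxyAddress        string // for sandbox traffic routing by the client proxy
	IPAddress           string
}

// Inside listNomadNodes, after the "grpc" lookup succeeds:
//	_, proxyPort, ok := o.findPortInAllocation(alloc, "proxy")
//	if ok {
//		discovery.ProxyAddress = fmt.Sprintf("%s:%d", ip, proxyPort)
//	}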
4 changes: 0 additions & 4 deletions packages/shared/pkg/consts/sandboxes.go
@@ -1,12 +1,8 @@
package consts

-import "os"
-
const NodeIDLength = 8

// ClientID is the client part of the sandbox ID, used during migration: we still return a client even though it's no longer serving its purpose,
// and we return it only for backward compatibility with SDK clients.
// We don't want to use an obviously dummy value such as all zeros, because to users it would look like something is wrong with the sandbox ID.
const ClientID = "6532622b"
-
-var OrchestratorPort = os.Getenv("ORCHESTRATOR_PORT")