Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 0 additions & 97 deletions cluster/aws/cfn.go

This file was deleted.

36 changes: 18 additions & 18 deletions cluster/aws/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ nohup ./nodeagent \
&>/var/log/nodeagent &
`

// Cluster is a cluster of EC2 nodes.
type Cluster struct {
Nodes []*Node

Expand Down Expand Up @@ -65,17 +66,18 @@ func collectPagesWithContext[IN any, OUT any](ctx context.Context, input IN, fn

type Option func(c *Cluster)

// WithRunInstancesInput registers a callback for customizing RunInstances calls when new nodes are created.
// WithRunInstancesInput registers a callback for customizing EC2 RunInstances calls when new nodes are created.
func (c *Cluster) WithRunInstancesInput(f func(input *ec2.RunInstancesInput) error) *Cluster {
c.RunInstancesConfig = f
return c
}

func (c *Cluster) WithLogger(l *zap.SugaredLogger) *Cluster {
c.config.log = l.Named("ec2_cluster")
c.config.withLogger(l.Named("ec2_cluster"))
return c
}

// WithInstanceType specifies the EC2 instance type to use when launching nodes.
func (c *Cluster) WithInstanceType(s string) *Cluster {
c.InstanceType = s
return c
Expand All @@ -86,19 +88,14 @@ func (c *Cluster) WithSession(sess *session.Session) *Cluster {
return c
}

func (c *Cluster) WithNodeAgentBin(binPath string) *Cluster {
c.config.nodeAgentBin = binPath
return c
}

// WithCleanupWait causes the Cleanup methods to wait for instance termination to succeed before returning.
func (c *Cluster) WithCleanupWait() *Cluster {
c.CleanupWait = true
return c
}

func (c *Cluster) WithAMIID(amiID string) *Cluster {
c.config.amiID = amiID
func (c *Cluster) WithResourceProvider(provider ResourcesProvider) *Cluster {
c.config.resourcesProvider = provider
return c
}

Expand All @@ -109,8 +106,7 @@ func (c *Cluster) Context(ctx context.Context) *Cluster {
}

// NewCluster creates a new AWS cluster.
// This uses standard AWS profile env vars.
// With no configuration, this uses the default profile.
// By default, this uses standard AWS profile env vars. With no configuration, this uses the default profile.
// The user/role used must have the appropriate permissions for the test runner,
// in order to find the resources in the account and launch/destroy EC2 instances.
//
Expand All @@ -123,6 +119,10 @@ func NewCluster() *Cluster {
}
}

func (c *Cluster) ensureLoaded() error {
return c.config.ensureLoaded()
}

func (c *Cluster) waitForInstances(ctx context.Context, instances []*ec2.Instance) ([]*ec2.Instance, error) {
var instanceIDs []*string
for _, inst := range instances {
Expand Down Expand Up @@ -171,8 +171,8 @@ func (c *Cluster) NewNodes(ctx context.Context, n int) (clusteriface.Nodes, erro
return nil, err
}
req, _ := c.config.s3Client.GetObjectRequest(&s3.GetObjectInput{
Bucket: &c.config.nodeAgentS3Bucket,
Key: &c.config.nodeAgentS3Key,
Bucket: &c.config.resources.NodeAgentS3Bucket,
Key: &c.config.resources.NodeAgentS3Key,
})
nodeagentURL, err := req.Presign(5 * time.Minute)
if err != nil {
Expand Down Expand Up @@ -203,8 +203,8 @@ func (c *Cluster) NewNodes(ctx context.Context, n int) (clusteriface.Nodes, erro

n64 := int64(n)
input := &ec2.RunInstancesInput{
ImageId: &c.config.amiID,
IamInstanceProfile: &ec2.IamInstanceProfileSpecification{Arn: &c.config.instanceProfileARN},
ImageId: &c.config.resources.AMIID,
IamInstanceProfile: &ec2.IamInstanceProfileSpecification{Arn: &c.config.resources.InstanceProfileARN},
InstanceType: &c.InstanceType,
MaxCount: &n64,
MinCount: &n64,
Expand All @@ -213,8 +213,8 @@ func (c *Cluster) NewNodes(ctx context.Context, n int) (clusteriface.Nodes, erro
NetworkInterfaces: []*ec2.InstanceNetworkInterfaceSpecification{{
AssociatePublicIpAddress: aws.Bool(true),
DeleteOnTermination: aws.Bool(true),
Groups: []*string{&c.config.instanceSecurityGroupID},
SubnetId: &c.config.subnetID,
Groups: []*string{&c.config.resources.InstanceSecurityGroupID},
SubnetId: &c.config.resources.SubnetID,
DeviceIndex: aws.Int64(0),
}},
}
Expand Down Expand Up @@ -248,7 +248,7 @@ func (c *Cluster) NewNodes(ctx context.Context, n int) (clusteriface.Nodes, erro
sess: c.config.session,
ec2Client: c.config.ec2Client,
instanceID: *inst.InstanceId,
accountID: c.config.accountID,
accountID: c.config.resources.AccountID,
cleanupWait: c.CleanupWait,
}
nodes = append(nodes, node)
Expand Down
Loading