Skip to content

Commit 2e6882e

Browse files
committed
Refactor Lambda to expect event to match Job Lifecycle Status Change event
1 parent 75dc4fa commit 2e6882e

File tree

1 file changed

+61
-55
lines changed

1 file changed

+61
-55
lines changed

cloudformation/auxiliary_templates/custom_balancer/custom_queue_job_balancer.yaml

Lines changed: 61 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -39,67 +39,72 @@ Resources:
3939
def lambda_handler(event, context):
4040
deadline = boto3.client("deadline")
4141
42-
# input event is list of objects with farm and queue ids
43-
# loop through each one and balance the job weights
42+
# Input event matches the structure of a Job Lifecycle Status Change event
43+
# {"detail": {"farmId": "${FarmId}", "queueId": "${QueueId}"}}
4444
response = []
45-
for item in event:
46-
updates = []
47-
farm_id = item["farmId"]
48-
queue_id = item["queueId"]
45+
updates = []
46+
farm_id = event.get("detail", {}).get("farmId", "")
47+
queue_id = event.get("detail", {}).get("queueId", "")
48+
previous_lifecycle_status = event.get("detail", {}).get("previousLifecycleStatus" , None)
49+
if not farm_id and queue_id:
50+
return {"statusCode": 422, "body": "Missing .detail.farmId or .detail.queueId"}
4951
50-
active_fleet_ids = get_all_associated_fleet_ids(deadline, farm_id, queue_id)
51-
total_max_worker_count = sum(
52-
[get_max_worker_count(deadline, farm_id, x) for x in active_fleet_ids]
53-
)
52+
if previous_lifecycle_status is not None:
53+
return {"statusCode": 200, "body": "Balancer skipping lifecycle events that are not job submission"}
54+
55+
active_fleet_ids = get_all_associated_fleet_ids(deadline, farm_id, queue_id)
56+
total_max_worker_count = sum(
57+
[get_max_worker_count(deadline, farm_id, x) for x in active_fleet_ids]
58+
)
5459
55-
active_jobs = get_all_active_jobs(deadline, farm_id, queue_id)
56-
active_job_count = len(active_jobs)
60+
active_jobs = get_all_active_jobs(deadline, farm_id, queue_id)
61+
active_job_count = len(active_jobs)
5762
58-
priority_100_jobs = [job for job in active_jobs if job["priority"] == 100]
59-
priority_100_job_count = len(priority_100_jobs)
63+
priority_100_jobs = [job for job in active_jobs if job["priority"] == 100]
64+
priority_100_job_count = len(priority_100_jobs)
6065
61-
priority_0_jobs = [job for job in active_jobs if job["priority"] == 0]
62-
priority_0_job_count = len(priority_0_jobs)
66+
priority_0_jobs = [job for job in active_jobs if job["priority"] == 0]
67+
priority_0_job_count = len(priority_0_jobs)
6368
64-
if priority_100_job_count > 0:
65-
# Priority 100 jobs get all the capacity if they exist
66-
# so allocate all the weights to them
67-
active_jobs = priority_100_jobs
68-
elif priority_0_job_count < active_job_count:
69-
# Priority 0 jobs get no capacity until all other jobs are done
70-
# so allocate all the weights to higher priority jobs
71-
active_jobs = [job for job in active_jobs if job["priority"] != 0]
69+
if priority_100_job_count > 0:
70+
# Priority 100 jobs get all the capacity if they exist
71+
# so allocate all the weights to them
72+
active_jobs = priority_100_jobs
73+
elif priority_0_job_count < active_job_count:
74+
# Priority 0 jobs get no capacity until all other jobs are done
75+
# so allocate all the weights to higher priority jobs
76+
active_jobs = [job for job in active_jobs if job["priority"] != 0]
7277
73-
for job in active_jobs:
74-
job["weight"] = calculate_job_weight(deadline, farm_id, queue_id, job)
78+
for job in active_jobs:
79+
job["weight"] = calculate_job_weight(deadline, farm_id, queue_id, job)
7580
76-
job_weight_sum = max(1, sum(job["weight"] for job in active_jobs))
77-
for job in active_jobs:
78-
# ceil because we'd rather have too high of a limit rather than idle workers
79-
job_max_worker_count = max(
80-
1, ceil((job["weight"] / job_weight_sum) * total_max_worker_count)
81-
)
82-
update_response = update_job_max_worker_count(
83-
deadline, farm_id, queue_id, job["jobId"], job_max_worker_count
84-
)
85-
updates.append(
86-
{
87-
"name": job["name"],
88-
"jobId": job["jobId"],
89-
"priority": job["priority"],
90-
"weight": job["weight"],
91-
"workerLimit": job_max_worker_count,
92-
"response": update_response,
93-
}
94-
)
95-
response.append({
96-
"totalMaxWorkerCount": total_max_worker_count,
97-
"activeJobCount": active_job_count,
98-
"priority100JobCount": priority_100_job_count,
99-
"priority0JobCount": priority_0_job_count,
100-
"allocatedJobCount": len(active_jobs),
101-
"updates": updates
102-
})
81+
job_weight_sum = max(1, sum(job["weight"] for job in active_jobs))
82+
for job in active_jobs:
83+
# ceil because we'd rather have too high of a limit rather than idle workers
84+
job_max_worker_count = max(
85+
1, ceil((job["weight"] / job_weight_sum) * total_max_worker_count)
86+
)
87+
update_response = update_job_max_worker_count(
88+
deadline, farm_id, queue_id, job["jobId"], job_max_worker_count
89+
)
90+
updates.append(
91+
{
92+
"name": job["name"],
93+
"jobId": job["jobId"],
94+
"priority": job["priority"],
95+
"weight": job["weight"],
96+
"workerLimit": job_max_worker_count,
97+
"response": update_response,
98+
}
99+
)
100+
response.append({
101+
"totalMaxWorkerCount": total_max_worker_count,
102+
"activeJobCount": active_job_count,
103+
"priority100JobCount": priority_100_job_count,
104+
"priority0JobCount": priority_0_job_count,
105+
"allocatedJobCount": len(active_jobs),
106+
"updates": updates
107+
})
103108
print(json.dumps(response))
104109
return {"statusCode": 200, "body": json.dumps(response)}
105110
@@ -199,7 +204,8 @@ Resources:
199204
steps = steps_pages.build_full_result()["steps"]
200205
# get every task across all steps
201206
for step in steps:
202-
tasks_pages = deadline.get_paginator("list_tasks").paginate(farmId=farm_id, queueId=queue_id, jobId=job["jobId"], stepId=step["stepId"])
207+
tasks_pages = deadline.get_paginator("list_tasks").paginate(farmId=farm_id, queueId=queue_id,
208+
jobId=job["jobId"], stepId=step["stepId"])
203209
tasks.extend(tasks_pages.build_full_result()["tasks"])
204210
205211
error_count = sum([task["failureRetryCount"] for task in tasks])
@@ -268,7 +274,7 @@ Resources:
268274
Arn: !GetAtt
269275
- UnprocessedBalancingEventQueue
270276
- Arn
271-
Input: !Sub '[{"farmId": "${FarmId}", "queueId": "${QueueId}"}]'
277+
Input: !Sub '{"detail": {"farmId": "${FarmId}", "queueId": "${QueueId}"}}'
272278
RetryPolicy:
273279
MaximumRetryAttempts: 1
274280
RoleArn: !GetAtt

0 commit comments

Comments
 (0)