5 changes: 4 additions & 1 deletion Dockerfile
@@ -27,9 +27,12 @@ RUN pip install -U pip
COPY requirements.txt /worker/requirements.txt
RUN pip install -r requirements.txt

# Add tools to count GPUs (pciutils provides lspci)
RUN apt-get install pciutils -y

# Copy our actual code
COPY *.py /worker/
COPY detailed_result_put.sh /worker/

# Run it
CMD celery -A worker worker -l info -Q compute-worker -n compute-worker%h -Ofast -Ofair --concurrency=1
CMD celery -A worker worker -l info -B -Q compute-worker -n compute-worker%h -Ofast -Ofair --concurrency=1
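Both Dockerfile additions pair with changes in worker.py: pciutils supplies the lspci binary that `configure_workers` uses to count GPUs, and the new `-B` flag embeds a Celery beat scheduler in the worker process so the keepalive periodic task it registers actually fires. A minimal sketch of the embedded-beat pattern, using an illustrative task name and placeholder broker URL rather than the PR's own:

```python
from celery import Celery

# Placeholder broker URL; the real worker reads BROKER_URL from the environment.
app = Celery('sketch', broker='pyamqp://guest:guest@localhost:5672//')

@app.task
def keep_alive():
    # Illustrative task body; the PR sends 'apps.web.tasks.worker_keep_alive' by signature instead.
    return 'alive'

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # The schedule only runs when a beat scheduler exists; `celery worker -B` embeds one.
    sender.add_periodic_task(60.0, keep_alive.s(), name='keepalive every 60 seconds')
```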
2 changes: 2 additions & 0 deletions README.md
@@ -10,6 +10,7 @@ mkdir -p /tmp/codalab && nvidia-docker run \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /var/lib/nvidia-docker/nvidia-docker.sock:/var/lib/nvidia-docker/nvidia-docker.sock \
-v /tmp/codalab:/tmp/codalab \
-v worker_registration:/worker_registration \
-d \
--name compute_worker \
--env BROKER_URL=<queue broker url> \
@@ -31,6 +32,7 @@ nvidia-docker run \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /var/lib/nvidia-docker/nvidia-docker.sock:/var/lib/nvidia-docker/nvidia-docker.sock \
-v /tmp/codalab:/tmp/codalab \
-v worker_registration:/worker_registration \
-d \
--name compute_worker \
--env-file .env \
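The new `worker_registration` named volume backs the worker-ID file that worker.py now writes, so the ID survives container restarts and re-registration stays idempotent. A small sketch of the persistence this mount enables (mirroring `_get_worker_id` in worker.py below; the path assumes the volume is mounted as in the commands above):

```python
import os
import uuid

# Only persistent across restarts because /worker_registration is a named docker volume.
REGISTRATION_PATH = '/worker_registration/.worker_registration'

def get_or_create_worker_id():
    if os.path.exists(REGISTRATION_PATH):
        with open(REGISTRATION_PATH) as worker_info:
            return worker_info.read()
    worker_id = str(uuid.uuid4())
    with open(REGISTRATION_PATH, 'w') as worker_info:
        worker_info.write(worker_id)
    return worker_id
```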
16 changes: 7 additions & 9 deletions celeryconfig.py
@@ -1,12 +1,10 @@
import os


BROKER_URL = os.environ.get('BROKER_URL')
BROKER_USE_SSL = os.environ.get('BROKER_USE_SSL', False)
CELERY_IMPORTS = ('worker',)
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ('json',)
CELERYD_FORCE_EXECV = True
CELERYD_MAX_TASKS_PER_CHILD = 1
CELERYD_PREFETCH_MULTIPLIER = 1
CELERYD_CONCURRENCY = 1
broker_url = os.environ.get('BROKER_URL')
broker_use_ssl = os.environ.get('BROKER_USE_SSL', False)
imports = ('worker',)
task_serializer = 'json'
accept_content = ('json',)
worker_max_tasks_per_child = 1
worker_prefetch_multiplier = 1
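These are the Celery 4 lowercase equivalents of the old uppercase settings; `CELERYD_FORCE_EXECV` and `CELERYD_CONCURRENCY` are dropped (concurrency is already passed on the command line as `--concurrency=1`). A quick sketch, assuming `celeryconfig.py` is importable and `BROKER_URL` is exported, of checking that the renamed settings are picked up:

```python
# Verify that the lowercase Celery 4 settings above are what the app actually loads.
from celery import Celery

app = Celery('worker')
app.config_from_object('celeryconfig')

assert app.conf.task_serializer == 'json'
assert app.conf.worker_max_tasks_per_child == 1
assert app.conf.worker_prefetch_multiplier == 1
print(app.conf.broker_url)  # mirrors the BROKER_URL environment variable
```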
8 changes: 7 additions & 1 deletion requirements.txt
@@ -1,4 +1,10 @@
celery==3.1.26.post2
celery==4.3.0
requests==1.2.3
PyYAML==3.10
psutil==2.1.1

# Working off amqp master for now; needs to be pinned to an explicit version after the next release.
# The "drain events" problem is resolved there.
#
# Last checked August 12, 2019
https://github.com/celery/py-amqp/archive/master.zip
132 changes: 121 additions & 11 deletions worker.py
@@ -1,4 +1,6 @@
#!/usr/bin/env python
import multiprocessing
import subprocess
import sys
import hashlib
import urllib
@@ -27,9 +29,10 @@
from zipfile import ZipFile, BadZipfile

from billiard import SoftTimeLimitExceeded
from celery import Celery, task
from celery import Celery, task, signature

# from celery.app import app_or_default
from celery.signals import celeryd_after_setup

app = Celery('worker')
app.config_from_object('celeryconfig')
@@ -42,6 +45,117 @@
logger.propagate = False


def _delay_submission_updates_task(task_name, args, virtual_host):
with app.connection() as new_connection:
# We need to send on the main virtual host, not whatever host we're currently
# connected to.
new_connection.virtual_host = virtual_host
app.send_task(
task_name,
args=args,
connection=new_connection,
queue="submission-updates",
)


def register_worker(worker_id, ip, cpu_count, mem_mb, harddrive_gb, gpus, queue_vhost, virtual_host='/'):
logger.info("Registering worker id = {}, ip = {}, cpu_count = {}, mem_mb = {}, harddrive_gb = {}, gpus = {}, queue_vhost = {}".format(
worker_id,
ip,
cpu_count,
mem_mb,
harddrive_gb,
gpus,
queue_vhost,
))
_delay_submission_updates_task(
'apps.web.tasks.register_worker',
(worker_id, ip, cpu_count, mem_mb, harddrive_gb, gpus, queue_vhost),
virtual_host
)


def worker_job_started(worker_id, submission_secret, is_scoring, virtual_host='/'):
logger.info("Starting job worker id = {}, submission_secret = {}, is_scoring = {}".format(
worker_id,
submission_secret,
is_scoring,
))
_delay_submission_updates_task(
'apps.web.tasks.worker_job_started',
(worker_id, submission_secret, is_scoring),
virtual_host
)


def worker_job_ended(worker_id, submission_secret, is_scoring, virtual_host='/'):
logger.info("Ending job worker id = {}, submission_secret = {}, is_scoring = {}".format(
worker_id,
submission_secret,
is_scoring,
))
_delay_submission_updates_task(
'apps.web.tasks.worker_job_ended',
(worker_id, submission_secret, is_scoring),
virtual_host
)


def _get_worker_id():
# Save worker ID or get existing
registration_path = '/worker_registration/.worker_registration'
if not os.path.exists(registration_path):
logger.info("Doing first time worker configuration")
worker_id = str(uuid.uuid4())
with open(registration_path, 'w') as worker_info:
worker_info.write(worker_id)
logger.info("Wrote id = '{}' to '{}'".format(worker_id, registration_path))
else:
with open(registration_path, 'r') as worker_info:
worker_id = worker_info.read()
logger.info("Found existing worker id: {}".format(worker_id))
return worker_id


# Loaded (or created) at import time; used when configuring workers after celeryd setup
WORKER_ID = _get_worker_id()


@celeryd_after_setup.connect()
def configure_workers(sender, conf=None, **kwargs):
# print("INIT configure workers")
try:
gpus = str(subprocess.check_output(["lspci"])).count('3D controller:')
except FileNotFoundError:
gpus = 0

# Get vhost from broker_url
broker_url = str(app.conf.BROKER_URL)
if broker_url.endswith('//'):
queue_vhost = '/'
else:
queue_vhost = broker_url.split('/')[-1]

    # This can run repeatedly; as long as we use the same WORKER_ID it shouldn't create a duplicate registration
register_worker(
WORKER_ID,
requests.get("https://api.ipify.org").text,
multiprocessing.cpu_count(),
get_available_memory(),
psutil.disk_usage('/').total / (1024.0 ** 3), # in GB
gpus,
queue_vhost
)

    # Set up our keepalive so we can gather statistics on worker utilization (up or down? in use?)
app.add_periodic_task(
60, # seconds
signature('apps.web.tasks.worker_keep_alive', args=(WORKER_ID,)),
name='keepalive every 60 seconds',
queue="submission-updates",
)


def _find_only_folder_with_metadata(path):
"""Looks through a bundle for a single folder that contains a metadata file and
returns that folder's name if found"""
@@ -203,16 +317,7 @@ def _send_update(task_id, status, secret, virtual_host='/', extra=None):
if extra:
task_args['extra'] = extra
logger.info("Updating task=%s status to %s", task_id, status)
with app.connection() as new_connection:
# We need to send on the main virtual host, not whatever host we're currently
# connected to.
new_connection.virtual_host = virtual_host
app.send_task(
'apps.web.tasks.update_submission',
args=(task_id, task_args, secret),
connection=new_connection,
queue="submission-updates",
)
_delay_submission_updates_task('apps.web.tasks.update_submission', (task_id, task_args, secret, WORKER_ID), virtual_host)


def put_blob(url, file_path):
@@ -275,6 +380,8 @@ def run(task_id, task_args):
cache_dir = os.environ.get('SUBMISSION_CACHE_DIR', '/tmp/cache')
root_dir = None

worker_job_started(WORKER_ID, secret, is_scoring_step)

do_docker_pull(docker_image, task_id, secret)

if not docker_image == ingestion_program_docker_image:
@@ -748,6 +855,8 @@ def run(task_id, task_args):
put_blob(stdout_url, stdout_file)
put_blob(stderr_url, stderr_file)

worker_job_ended(WORKER_ID, secret, is_scoring_step)

if run_ingestion_program:
ingestion_stdout.close()
ingestion_stderr.close()
@@ -827,6 +936,7 @@ def run(task_id, task_args):
'traceback': traceback.format_exc(),
'metadata': debug_metadata
})
worker_job_ended(WORKER_ID, secret, is_scoring_step)

# comment out for dev and viewing of raw folder outputs.
if root_dir is not None and not os.environ.get("DONT_FINALIZE_SUBMISSION"):
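For reference, the broker-vhost handling added in `configure_workers` reduces to the small function below; `configure_workers` reports this value as `queue_vhost` when registering the worker. A standalone sketch (not part of the PR) showing its behaviour on typical broker URLs:

```python
# The vhost extraction used by configure_workers, pulled out so its behaviour is easy to see.
def vhost_from_broker_url(broker_url):
    broker_url = str(broker_url)
    if broker_url.endswith('//'):
        return '/'  # e.g. 'pyamqp://user:pass@rabbit:5672//' -> default vhost
    return broker_url.split('/')[-1]  # '...:5672/some-vhost' -> 'some-vhost'

assert vhost_from_broker_url('pyamqp://guest:guest@rabbit:5672//') == '/'
assert vhost_from_broker_url('pyamqp://guest:guest@rabbit:5672/some-vhost') == 'some-vhost'
```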