From 3cd3503f7caa6d6a12929dff14617e433cda1218 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 31 Mar 2026 14:08:50 +0200 Subject: [PATCH 01/46] feature ok / needs to be tested --- src/apps/competitions/tasks.py | 1 - src/celery_config.py | 9 + .../riot/competitions/detail/detail.tag | 835 ++++++++++++------ src/utils/worker_utils.py | 109 +++ 4 files changed, 703 insertions(+), 251 deletions(-) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index ca5a26987..9668a9f45 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -836,7 +836,6 @@ def submission_status_cleanup(): submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent') for sub in submissions: - # Check if the submission has been running for 24 hours longer than execution_time_limit if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit): if sub.parent is not None: sub.parent.cancel(status=Submission.FAILED) diff --git a/src/celery_config.py b/src/celery_config.py index e8ba5961e..614052be7 100644 --- a/src/celery_config.py +++ b/src/celery_config.py @@ -42,3 +42,12 @@ def app_for_vhost(vhost): vhost_app.conf.task_queues = app.conf.task_queues _vhost_apps[vhost] = vhost_app return _vhost_apps[vhost] + + + +app.conf.beat_schedule = { + "refresh-compute-worker-health": { + "task": "chemin.vers.refresh_compute_worker_health", + "schedule": 5.0, + }, +} \ No newline at end of file diff --git a/src/static/riot/competitions/detail/detail.tag b/src/static/riot/competitions/detail/detail.tag index 7bed78b29..f1dce44fa 100644 --- a/src/static/riot/competitions/detail/detail.tag +++ b/src/static/riot/competitions/detail/detail.tag @@ -6,25 +6,13 @@ - - - diff --git a/src/utils/worker_utils.py b/src/utils/worker_utils.py index 90067e2c2..343f1be82 100644 --- a/src/utils/worker_utils.py +++ b/src/utils/worker_utils.py @@ -124,14 +124,16 @@ def fetch_compute_workers(): continue cw_queue = next((q for q in queues if q["name"] == "compute-worker"), None) + messages_ready = cw_queue.get("messages_ready", 0) if cw_queue else 0 messages_unacked = cw_queue.get("messages_unacknowledged", 0) if cw_queue else 0 + messages_total = cw_queue.get("messages", messages_ready + messages_unacked) if cw_queue else 0 cw_consumers = cw_queue.get("consumers", 0) if cw_queue else 0 queue_stats.append( { "source_name": source_name, - "jobs_pending": messages_ready, + "jobs_pending": messages_total, "jobs_running": messages_unacked, "workers_count": cw_consumers, } From 737017bd031f4005b5ba1084b4b394d905c78895 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 16 Jun 2026 11:28:39 +0200 Subject: [PATCH 44/46] queues stats dispay at top --- .../detail/worker-monitor-toggle.tag | 174 +++++++++--------- 1 file changed, 88 insertions(+), 86 deletions(-) diff --git a/src/static/riot/competitions/detail/worker-monitor-toggle.tag b/src/static/riot/competitions/detail/worker-monitor-toggle.tag index ce015d560..c6217d472 100644 --- a/src/static/riot/competitions/detail/worker-monitor-toggle.tag +++ b/src/static/riot/competitions/detail/worker-monitor-toggle.tag @@ -41,6 +41,50 @@ +
+
+
+ Queues stats +
+
+
+ + + + + + + + + + + + + + + + + + + + +
QueuesPendingRunning
+ { qs.source_name } + + { qs.workers_count || 0 } + + + + { qs.jobs_pending } + + + + { qs.jobs_running } + +
+
+
+
@@ -151,49 +195,6 @@
-
-
-
- Queues stats -
-
-
- - - - - - - - - - - - - - - - - - - - -
QueuesPendingRunning
- { qs.source_name } - - { qs.workers_count || 0 } - - - - { qs.jobs_pending } - - - - { qs.jobs_running } - -
-
-
@@ -240,6 +241,50 @@ +
+
+
+ Queues stats +
+
+
+ + + + + + + + + + + + + + + + + + + + +
QueuesPendingRunning
+ { qs.source_name } + + { qs.workers_count || 0 } + + + + { qs.jobs_pending } + + + + { qs.jobs_running } + +
+
+
+
@@ -351,49 +396,6 @@
-
-
-
- Queues stats -
-
-
- - - - - - - - - - - - - - - - - - - - -
QueuesPendingRunning
- { qs.source_name } - - { qs.workers_count || 0 } - - - - { qs.jobs_pending } - - - - { qs.jobs_running } - -
-
-
From 8e0c144acd8fa698c43f256d754c4faca01e397c Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 16 Jun 2026 14:16:14 +0200 Subject: [PATCH 45/46] Feature group routing for submissions (#2393) * backend & frontend OK / TODO: site worker and leaderboad * site worker sending submissions to group queue OK * leaderboad group feature * logs removed * fix json leaderboard * Clean up leaderboard ordering logic * competition queue on groups with out queue * some bugfix * UI bugfix * UI bugfix * leaderboard groups format parentsubID_groupname * fix conflicts issues * resolve conflict * clean site worker * branch update and linter fix * linter fix * linter fix * linter fix * bugfix group form * adding migrations files * fix logic to fix tests problem * fix logic to fix tests problem * E2E test fixed * E2E test fixed * E2E test fixed * E2E test fixed * Fix queue name in server status * fix queues visibility * fix queue visibility for groups * Flake8 --------- Co-authored-by: didayolo --- src/apps/competitions/tasks.py | 81 ++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 7e24d2a49..182b275c4 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -6,11 +6,16 @@ from datetime import timedelta, datetime from django.conf import settings from io import BytesIO +<<<<<<< HEAD from tempfile import NamedTemporaryFile, TemporaryDirectory +======= +from tempfile import TemporaryDirectory, NamedTemporaryFile +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) import oyaml as yaml import requests from celery._state import app_or_default +<<<<<<< HEAD from competitions.models import ( Competition, CompetitionCreationTaskStatus, @@ -19,6 +24,20 @@ Submission, SubmissionDetails, ) +======= +from django.conf import settings +from django.core.exceptions import ObjectDoesNotExist +from django.core.files.base import ContentFile +from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F +from django.db import transaction +from django.utils.text import slugify +from django.utils.timezone import now +from rest_framework.exceptions import ValidationError + +from celery_config import app +from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \ + CompetitionDump, Phase +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) from competitions.unpackers.utils import CompetitionUnpackingException from competitions.unpackers.v1 import V15Unpacker from competitions.unpackers.v2 import V2Unpacker @@ -246,6 +265,7 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding +<<<<<<< HEAD <<<<<<< HEAD effective_queue = submission.queue or submission.phase.competition.queue @@ -256,6 +276,8 @@ def _send_to_compute_worker(submission, is_scoring): submission.save(update_fields=["queue"]) ======= +======= +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) if ( submission.phase.competition.queue ): # if the competition is running on a custom queue, not the default queue @@ -264,7 +286,20 @@ def _send_to_compute_worker(submission, is_scoring): submission.phase.execution_time_limit ) # use the competition time limit submission.save(update_fields=["queue"]) +<<<<<<< HEAD >>>>>>> 4958f28d (files blacked for fixing the formatting issues) +======= +======= + effective_queue = submission.queue or submission.phase.competition.queue + + if effective_queue: + run_args['execution_time_limit'] = submission.phase.execution_time_limit + if submission.queue != effective_queue: + submission.queue = effective_queue + submission.save(update_fields=["queue"]) + +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) if submission.status == Submission.SUBMITTING: submission.status = Submission.SUBMITTED submission.save(update_fields=["status"]) @@ -274,6 +309,7 @@ def _enqueue_after_commit(): if effective_queue: celery_app = app_or_default() with celery_app.connection() as new_connection: +<<<<<<< HEAD <<<<<<< HEAD new_connection.virtual_host = str(effective_queue.vhost) ======= @@ -281,6 +317,14 @@ def _enqueue_after_commit(): submission.phase.competition.queue.vhost ) >>>>>>> 4958f28d (files blacked for fixing the formatting issues) +======= + new_connection.virtual_host = str( + submission.phase.competition.queue.vhost + ) +======= + new_connection.virtual_host = str(effective_queue.vhost) +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) task = celery_app.send_task( "compute_worker_run", args=(run_args,), @@ -348,16 +392,25 @@ def send_child_id(submission, child_id): @app.task(queue="site-worker", soft_time_limit=60) def _run_submission(submission_pk, task_pks=None, is_scoring=False): +<<<<<<< HEAD <<<<<<< HEAD select_models = ('phase', 'phase__competition') ======= +======= +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) """This function is wrapped so that when we run tests we can run this function not via celery""" select_models = ( "phase", "phase__competition", ) +<<<<<<< HEAD >>>>>>> 4958f28d (files blacked for fixing the formatting issues) +======= +======= + select_models = ('phase', 'phase__competition') +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) prefetch_models = ( "details", "phase__tasks__input_data", @@ -378,6 +431,7 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): tasks = submission.phase.tasks.filter(pk__in=task_pks) tasks = list(tasks.order_by('pk')) +<<<<<<< HEAD <<<<<<< HEAD if submission.parent is None and not is_scoring: group_queues = _get_user_group_queues(submission.owner, submission.phase.competition) @@ -398,6 +452,27 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): ======= tasks = tasks.order_by("pk") >>>>>>> 4958f28d (files blacked for fixing the formatting issues) +======= + tasks = tasks.order_by("pk") +======= + if submission.parent is None and not is_scoring: + group_queues = _get_user_group_queues(submission.owner, submission.phase.competition) + else: + group_queues = [] + + is_multi_task = len(tasks) > 1 + is_multi_queue = len(group_queues) > 1 + + if is_multi_task or is_multi_queue: + if is_multi_task and is_multi_queue: + combos = [(task, queue) for task in tasks for queue in group_queues] + elif is_multi_task: + override = group_queues[0] if group_queues else None + combos = [(task, override) for task in tasks] + else: + combos = [(tasks[0], queue) for queue in group_queues] +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) submission.has_children = True submission.save() @@ -412,10 +487,16 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): parent=submission, task=task, fact_sheet_answers=submission.fact_sheet_answers, +<<<<<<< HEAD <<<<<<< HEAD queue=queue, ======= >>>>>>> 4958f28d (files blacked for fixing the formatting issues) +======= +======= + queue=queue, +>>>>>>> ff231817 (Feature group routing for submissions (#2393)) +>>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) ) child_sub.save(ignore_submission_limit=True) _send_to_compute_worker(child_sub, is_scoring=False) From d9414f14ad31888da75823e4c546c491c8c226d5 Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 16 Jun 2026 15:07:56 +0200 Subject: [PATCH 46/46] site worker conflict solved --- src/apps/competitions/tasks.py | 178 +++------------------------------ 1 file changed, 12 insertions(+), 166 deletions(-) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index 182b275c4..71acb85c1 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -6,16 +6,11 @@ from datetime import timedelta, datetime from django.conf import settings from io import BytesIO -<<<<<<< HEAD from tempfile import NamedTemporaryFile, TemporaryDirectory -======= -from tempfile import TemporaryDirectory, NamedTemporaryFile ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) import oyaml as yaml import requests from celery._state import app_or_default -<<<<<<< HEAD from competitions.models import ( Competition, CompetitionCreationTaskStatus, @@ -24,20 +19,6 @@ Submission, SubmissionDetails, ) -======= -from django.conf import settings -from django.core.exceptions import ObjectDoesNotExist -from django.core.files.base import ContentFile -from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F -from django.db import transaction -from django.utils.text import slugify -from django.utils.timezone import now -from rest_framework.exceptions import ValidationError - -from celery_config import app -from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \ - CompetitionDump, Phase ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) from competitions.unpackers.utils import CompetitionUnpackingException from competitions.unpackers.v1 import V15Unpacker from competitions.unpackers.v2 import V2Unpacker @@ -142,40 +123,6 @@ ) # time limit of the default queue -def _get_user_group_queues(user, competition): - all_user_groups = list( - competition.participant_groups - .filter(user__pk=user.pk) - .select_related('queue') - .distinct() - ) - - if not all_user_groups: - return [] - - groups_with_queue = [g for g in all_user_groups if g.queue_id is not None] - has_groups_without_queue = any(g.queue_id is None for g in all_user_groups) - - if not groups_with_queue: - return [] - - seen_ids = set() - queues = [] - for group in groups_with_queue: - if group.queue_id not in seen_ids: - seen_ids.add(group.queue_id) - queues.append(group.queue) - - if has_groups_without_queue: - competition_queue = competition.queue - if competition_queue is None: - queues.append(None) - elif competition_queue.id not in seen_ids: - queues.append(competition_queue) - - return queues - - def _send_to_compute_worker(submission, is_scoring): run_args = { "user_pk": submission.owner.pk, @@ -265,19 +212,6 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding -<<<<<<< HEAD -<<<<<<< HEAD - effective_queue = submission.queue or submission.phase.competition.queue - - if effective_queue: - run_args['execution_time_limit'] = submission.phase.execution_time_limit - if submission.queue != effective_queue: - submission.queue = effective_queue - submission.save(update_fields=["queue"]) - -======= -======= ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) if ( submission.phase.competition.queue ): # if the competition is running on a custom queue, not the default queue @@ -286,45 +220,22 @@ def _send_to_compute_worker(submission, is_scoring): submission.phase.execution_time_limit ) # use the competition time limit submission.save(update_fields=["queue"]) -<<<<<<< HEAD ->>>>>>> 4958f28d (files blacked for fixing the formatting issues) -======= -======= - effective_queue = submission.queue or submission.phase.competition.queue - - if effective_queue: - run_args['execution_time_limit'] = submission.phase.execution_time_limit - if submission.queue != effective_queue: - submission.queue = effective_queue - submission.save(update_fields=["queue"]) - ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) if submission.status == Submission.SUBMITTING: + # Don't want to mark an already-prepared submission as "submitted" again, so + # only do this if we were previously "SUBMITTING" submission.status = Submission.SUBMITTED submission.save(update_fields=["status"]) def _enqueue_after_commit(): + # priority of scoring tasks is higher, we don't want to wait around for + # many submissions to be scored while we're waiting for results priority = 10 if is_scoring else 0 - if effective_queue: + if submission.phase.competition.queue: celery_app = app_or_default() with celery_app.connection() as new_connection: -<<<<<<< HEAD -<<<<<<< HEAD - new_connection.virtual_host = str(effective_queue.vhost) -======= - new_connection.virtual_host = str( - submission.phase.competition.queue.vhost - ) ->>>>>>> 4958f28d (files blacked for fixing the formatting issues) -======= new_connection.virtual_host = str( submission.phase.competition.queue.vhost ) -======= - new_connection.virtual_host = str(effective_queue.vhost) ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) task = celery_app.send_task( "compute_worker_run", args=(run_args,), @@ -392,25 +303,12 @@ def send_child_id(submission, child_id): @app.task(queue="site-worker", soft_time_limit=60) def _run_submission(submission_pk, task_pks=None, is_scoring=False): -<<<<<<< HEAD -<<<<<<< HEAD - select_models = ('phase', 'phase__competition') -======= -======= ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) """This function is wrapped so that when we run tests we can run this function not via celery""" select_models = ( "phase", "phase__competition", ) -<<<<<<< HEAD ->>>>>>> 4958f28d (files blacked for fixing the formatting issues) -======= -======= - select_models = ('phase', 'phase__competition') ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) prefetch_models = ( "details", "phase__tasks__input_data", @@ -424,61 +322,24 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): submission = qs.get(pk=submission_pk) if submission.is_specific_task_re_run: + # Should only be one task for a specified task submission tasks = Task.objects.filter(pk__in=task_pks) elif task_pks is None: tasks = submission.phase.tasks.all() else: tasks = submission.phase.tasks.filter(pk__in=task_pks) - tasks = list(tasks.order_by('pk')) -<<<<<<< HEAD -<<<<<<< HEAD - if submission.parent is None and not is_scoring: - group_queues = _get_user_group_queues(submission.owner, submission.phase.competition) - else: - group_queues = [] - - is_multi_task = len(tasks) > 1 - is_multi_queue = len(group_queues) > 1 - - if is_multi_task or is_multi_queue: - if is_multi_task and is_multi_queue: - combos = [(task, queue) for task in tasks for queue in group_queues] - elif is_multi_task: - override = group_queues[0] if group_queues else None - combos = [(task, override) for task in tasks] - else: - combos = [(tasks[0], queue) for queue in group_queues] -======= tasks = tasks.order_by("pk") ->>>>>>> 4958f28d (files blacked for fixing the formatting issues) -======= - tasks = tasks.order_by("pk") -======= - if submission.parent is None and not is_scoring: - group_queues = _get_user_group_queues(submission.owner, submission.phase.competition) - else: - group_queues = [] - - is_multi_task = len(tasks) > 1 - is_multi_queue = len(group_queues) > 1 - - if is_multi_task or is_multi_queue: - if is_multi_task and is_multi_queue: - combos = [(task, queue) for task in tasks for queue in group_queues] - elif is_multi_task: - override = group_queues[0] if group_queues else None - combos = [(task, override) for task in tasks] - else: - combos = [(tasks[0], queue) for queue in group_queues] ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) + if len(tasks) > 1: + # The initial submission object becomes the parent submission and we create children for each task submission.has_children = True submission.save() + send_parent_status(submission) - for task, queue in combos: + for task in tasks: + # TODO: make a duplicate submission method and use it here child_sub = Submission( owner=submission.owner, phase=submission.phase, @@ -487,30 +348,15 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): parent=submission, task=task, fact_sheet_answers=submission.fact_sheet_answers, -<<<<<<< HEAD -<<<<<<< HEAD - queue=queue, -======= ->>>>>>> 4958f28d (files blacked for fixing the formatting issues) -======= -======= - queue=queue, ->>>>>>> ff231817 (Feature group routing for submissions (#2393)) ->>>>>>> ccef98f3 (Feature group routing for submissions (#2393)) ) child_sub.save(ignore_submission_limit=True) _send_to_compute_worker(child_sub, is_scoring=False) send_child_id(submission, child_sub.id) - else: + # The initial submission object is the only submission if not submission.task: submission.task = tasks[0] submission.save() - - if group_queues and submission.queue != group_queues[0]: - submission.queue = group_queues[0] - submission.save(update_fields=['queue']) - _send_to_compute_worker(submission, is_scoring)