From 0aaa02f65443dafce0e2bb9adc5cc6b348af640d Mon Sep 17 00:00:00 2001 From: AybH26 <58746253+AybH26@users.noreply.github.com> Date: Tue, 16 Jun 2026 16:53:06 +0200 Subject: [PATCH 01/10] fix(site_worker): remove watchmedo auto-restart from Celery command --- docker-compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.yml b/docker-compose.yml index f5b32b753..5b66662c7 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -200,7 +200,7 @@ services: #---------------------------------------------------------------------------------------------------- site_worker: # This auto-reloads - command: ["watchmedo auto-restart -p '*.py' --recursive -- celery -A celery_config worker -B -Q site-worker -l info -n site-worker@%n --concurrency=2"] + command: ["celery -A celery_config worker -B -Q site-worker -l info -n site-worker@%n --concurrency=2"] working_dir: /app/src container_name: site_worker image: django_site-worker From 531ba157f8f7b6c555ecb30e72e836f8a9b3a82d Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Thu, 18 Jun 2026 11:41:00 +0200 Subject: [PATCH 02/10] auto completion for queue selection --- .../riot/competitions/editor/_participation.tag | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/static/riot/competitions/editor/_participation.tag b/src/static/riot/competitions/editor/_participation.tag index 610ff7716..466bfae3a 100644 --- a/src/static/riot/competitions/editor/_participation.tag +++ b/src/static/riot/competitions/editor/_participation.tag @@ -133,11 +133,17 @@ const initUI = () => { try { $('.ui.checkbox', self.root).checkbox() } catch(e) {} + try { if (self.refs && self.refs.group_queue) { const $q = $(self.refs.group_queue) if (!$q.data('dd-init')) { - $q.dropdown({ clearable: true, placeholder: 'None' }) + $q.dropdown({ + clearable: true, + placeholder: 'None', + search: true, + fullTextSearch: true, + }) $q.data('dd-init', true) } } @@ -145,8 +151,10 @@ try { let $rest = $('.ui.dropdown', self.root) - if (self.refs && self.refs.group_queue) $rest = $rest.not(self.refs.group_queue) - if (self.refs && self.refs.group_user_select) $rest = $rest.not(self.refs.group_user_select) + if (self.refs && self.refs.group_queue) + $rest = $rest.not(self.refs.group_queue) + if (self.refs && self.refs.group_user_select) + $rest = $rest.not(self.refs.group_user_select) $rest.dropdown() } catch(e) {} From ab7ce5f986b371730d9d2dfca2ba2c1b298d6965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Pav=C3=A3o?= Date: Thu, 18 Jun 2026 11:44:40 +0200 Subject: [PATCH 03/10] Update version.json --- version.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/version.json b/version.json index c817aa11d..f85de2f51 100644 --- a/version.json +++ b/version.json @@ -1,5 +1,5 @@ { - "tag_name": "v1.28", - "release_name": "v1.28", - "html_url": "https://github.com/codalab/codabench/releases/tag/v1.28" + "tag_name": "v1.29", + "release_name": "v1.29", + "html_url": "https://github.com/codalab/codabench/releases/tag/v1.29" } From 31f6ea0e0304dc85ad5ee6d16707457b21df99a5 Mon Sep 17 00:00:00 2001 From: jens behley Date: Mon, 18 May 2026 17:09:44 +0200 Subject: [PATCH 04/10] remove also volumes when removing container. --- compute_worker/compute_worker.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/compute_worker/compute_worker.py b/compute_worker/compute_worker.py index c02abcb0f..243dcd16d 100644 --- a/compute_worker/compute_worker.py +++ b/compute_worker/compute_worker.py @@ -968,7 +968,7 @@ async def _run_container_engine_cmd(self, container, kind): await websocket.wait_closed() except Exception as e: logger.error(e) - client.remove_container(container, force=True) + client.remove_container(container, v=True, force=True) logger.debug(f"Container {container.get('Id')} exited with status code : {str(return_Code['StatusCode'])}") @@ -983,7 +983,7 @@ async def _run_container_engine_cmd(self, container, kind): finally: try: # Last chance of removing container - client.remove_container(container.get("Id"), force=True) + client.remove_container(container.get("Id"), v=True, force=True) except Exception: pass @@ -1347,7 +1347,7 @@ def start(self): for container in containers_to_kill: try: - client.remove_container(str(container), force=True) + client.remove_container(str(container), v=True, force=True) except docker.errors.APIError as e: logger.error(e) except Exception as e: @@ -1398,7 +1398,7 @@ def start(self): containers_to_kill = self.scoring_program_container_name try: client.kill(containers_to_kill) - client.remove_container(containers_to_kill, force=True) + client.remove_container(containers_to_kill, v=True, force=True) except docker.errors.APIError as e: logger.error(e) except Exception as e: From c85093d4cf56d008c156938977921e7e477e9efc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adrien=20Pav=C3=A3o?= Date: Thu, 18 Jun 2026 18:33:27 +0200 Subject: [PATCH 05/10] Add average rank computation column (#2351) * Add average rank computation column * Update docs * Fix test * Fix variable --- .../Leaderboard-Functionality.md | 1 + .../Benchmark_Creation/Yaml-Structure.md | 2 +- src/apps/api/serializers/leaderboards.py | 44 +-- src/apps/api/views/competitions.py | 9 +- src/apps/competitions/models.py | 4 +- .../0010_alter_column_computation.py | 18 ++ src/apps/leaderboards/models.py | 2 + src/apps/leaderboards/ranking.py | 108 ++++++++ src/apps/leaderboards/tests/__init__.py | 0 src/apps/leaderboards/tests/test_ranking.py | 259 ++++++++++++++++++ .../riot/competitions/editor/_leaderboard.tag | 1 + 11 files changed, 423 insertions(+), 25 deletions(-) create mode 100644 src/apps/leaderboards/migrations/0010_alter_column_computation.py create mode 100644 src/apps/leaderboards/ranking.py create mode 100644 src/apps/leaderboards/tests/__init__.py create mode 100644 src/apps/leaderboards/tests/test_ranking.py diff --git a/documentation/docs/Organizers/Benchmark_Creation/Leaderboard-Functionality.md b/documentation/docs/Organizers/Benchmark_Creation/Leaderboard-Functionality.md index ded5fffa4..799e971b6 100644 --- a/documentation/docs/Organizers/Benchmark_Creation/Leaderboard-Functionality.md +++ b/documentation/docs/Organizers/Benchmark_Creation/Leaderboard-Functionality.md @@ -49,6 +49,7 @@ Computation options are: - avg - min - max + - avg_rank These are applied across the columns specified as `computation_indexes`. diff --git a/documentation/docs/Organizers/Benchmark_Creation/Yaml-Structure.md b/documentation/docs/Organizers/Benchmark_Creation/Yaml-Structure.md index 16bf7b0e3..12c8ffaa0 100644 --- a/documentation/docs/Organizers/Benchmark_Creation/Yaml-Structure.md +++ b/documentation/docs/Organizers/Benchmark_Creation/Yaml-Structure.md @@ -259,7 +259,7 @@ fact_sheet: { - Ascending: smaller scores are better - Descending: larger scores are better - **computation:** computation to be applied *must be accompanied by computation indexes* - - computation options: sum, avg, min, max + - computation options: sum, avg, min, max, avg_rank - **computation_indexes:** an array of indexes of the columns the computation should be applied to - **precision:** (*integer, default=2*) to round the score to *precision* number of digits - **hidden:** (*boolean, default=False*) to hide/unhide a column on leaderboard diff --git a/src/apps/api/serializers/leaderboards.py b/src/apps/api/serializers/leaderboards.py index 5b5c5e025..b4e244e1f 100644 --- a/src/apps/api/serializers/leaderboards.py +++ b/src/apps/api/serializers/leaderboards.py @@ -99,13 +99,6 @@ class Meta: def get_submissions(self, instance): primary_col = instance.columns.get(index=instance.primary_index) - - ordering = [ - F('primary_col').desc(nulls_last=True) - if primary_col.sorting == 'desc' - else F('primary_col').asc(nulls_last=True) - ] - submissions_qs = ( Submission.objects.filter( leaderboard=instance, @@ -129,25 +122,31 @@ def get_submissions(self, instance): ), ) ) - .annotate(primary_col=Sum( - 'scores__score', - filter=Q(scores__column=primary_col) - )) ) + # AVERAGE_RANK columns have no stored scores; skip DB-level sort and re-sort in the view. + if primary_col.computation == Column.AVERAGE_RANK: + ordering = ['created_when'] + submissions = submissions_qs + else: + ordering = [f'{"-" if primary_col.sorting == "desc" else ""}primary_col'] + submissions = submissions_qs.annotate(primary_col=Sum('scores__score', filter=Q(scores__column=primary_col))) + for column in instance.columns.exclude(id=primary_col.id).order_by('index'): + if column.computation == Column.AVERAGE_RANK: + continue col_name = f'col{column.index}' ordering.append( F(col_name).desc(nulls_last=True) if column.sorting == 'desc' else F(col_name).asc(nulls_last=True) ) - submissions_qs = submissions_qs.annotate(**{ + submissions = submissions.annotate(**{ col_name: Sum('scores__score', filter=Q(scores__column__index=column.index)) }) - submissions_qs = submissions_qs.order_by(*ordering, 'created_when') - return SubmissionLeaderBoardSerializer(submissions_qs, many=True).data + submissions = submissions.order_by(*ordering, 'created_when') + return SubmissionLeaderBoardSerializer(submissions, many=True).data class LeaderboardPhaseSerializer(serializers.ModelSerializer): @@ -183,12 +182,7 @@ def get_submissions(self, instance): # desc == -colname # asc == colname primary_col = instance.leaderboard.columns.get(index=instance.leaderboard.primary_index) - ordering = [ - F('primary_col').desc(nulls_last=True) - if primary_col.sorting == 'desc' - else F('primary_col').asc(nulls_last=True) - ] - submissions = ( + submissions_qs = ( Submission.objects.filter( phase=instance, is_soft_deleted=False, @@ -198,14 +192,22 @@ def get_submissions(self, instance): ) .select_related('owner') .prefetch_related('scores', 'scores__column') - .annotate(primary_col=Sum('scores__score', filter=Q(scores__column=primary_col))) ) + # AVERAGE_RANK columns have no stored scores; skip DB-level sort and re-sort in the view. + if primary_col.computation == Column.AVERAGE_RANK: + ordering = ['created_when'] + submissions = submissions_qs + else: + ordering = [f'{"-" if primary_col.sorting == "desc" else ""}primary_col'] + submissions = submissions_qs.annotate(primary_col=Sum('scores__score', filter=Q(scores__column=primary_col))) for column in ( instance.leaderboard.columns .filter(hidden=False) .exclude(id=primary_col.id) .order_by('index') ): + if column.computation == Column.AVERAGE_RANK: + continue col_name = f'col{column.index}' ordering.append( F(col_name).desc(nulls_last=True) diff --git a/src/apps/api/views/competitions.py b/src/apps/api/views/competitions.py index 6eb994f12..a8e4c1b49 100644 --- a/src/apps/api/views/competitions.py +++ b/src/apps/api/views/competitions.py @@ -31,7 +31,8 @@ from datasets.models import Data from competitions.tasks import batch_send_email, manual_migration, create_competition_dump from competitions.utils import get_popular_competitions, get_recent_competitions -from leaderboards.models import Leaderboard +from leaderboards.models import Leaderboard, Column +from leaderboards.ranking import inject_average_ranks from utils.data import make_url_sassy from api.permissions import IsOrganizerOrCollaborator from django.db import transaction @@ -937,6 +938,12 @@ def _clean_group_label(raw_name, submission_parent_id=None): for k, v in submissions_keys.items(): response['submissions'][v]['detailed_results'] = submission_detailed_results[k] + # Compute average rank for any AVERAGE_RANK columns and inject into response. + col_by_index = {col['index']: col for col in columns} + avg_rank_cols = [col for col in columns if col.get('computation') == Column.AVERAGE_RANK] + if avg_rank_cols: + inject_average_ranks(response['submissions'], avg_rank_cols, col_by_index, response['primary_index']) + # --- pagination addition --- total_count = len(response['submissions']) paginator = DynamicChoicePagination() diff --git a/src/apps/competitions/models.py b/src/apps/competitions/models.py index 87b0b5513..e5facb4fc 100644 --- a/src/apps/competitions/models.py +++ b/src/apps/competitions/models.py @@ -12,7 +12,7 @@ from decimal import Decimal from celery_config import app, app_for_vhost -from leaderboards.models import SubmissionScore +from leaderboards.models import SubmissionScore, Column from profiles.models import CustomGroup, User, Organization from utils.data import PathWrapper from utils.storage import BundleStorage @@ -697,7 +697,7 @@ def check_child_submission_statuses(self): def calculate_scores(self): # leaderboards = self.phase.competition.leaderboards.all() # for leaderboard in leaderboards: - columns = self.phase.leaderboard.columns.exclude(computation__isnull=True) + columns = self.phase.leaderboard.columns.exclude(computation__isnull=True).exclude(computation=Column.AVERAGE_RANK) for column in columns: scores = self.scores.filter(column__index__in=column.computation_indexes.split(',')).values_list('score', flat=True) diff --git a/src/apps/leaderboards/migrations/0010_alter_column_computation.py b/src/apps/leaderboards/migrations/0010_alter_column_computation.py new file mode 100644 index 000000000..11d39e8c3 --- /dev/null +++ b/src/apps/leaderboards/migrations/0010_alter_column_computation.py @@ -0,0 +1,18 @@ +# Generated by Django 5.2.12 on 2026-04-23 09:35 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('leaderboards', '0009_alter_column_id_alter_leaderboard_id_and_more'), + ] + + operations = [ + migrations.AlterField( + model_name='column', + name='computation', + field=models.TextField(blank=True, choices=[('avg', 'Average'), ('sum', 'Sum'), ('min', 'Min'), ('max', 'Max'), ('avg_rank', 'Average Rank')], null=True), + ), + ] diff --git a/src/apps/leaderboards/models.py b/src/apps/leaderboards/models.py index 4c9eda5a2..151d6d12a 100644 --- a/src/apps/leaderboards/models.py +++ b/src/apps/leaderboards/models.py @@ -36,11 +36,13 @@ class Column(models.Model): SUM = 'sum' MIN = 'min' MAX = 'max' + AVERAGE_RANK = 'avg_rank' COMPUTATION_CHOICES = ( (AVERAGE, 'Average'), (SUM, 'Sum'), (MIN, 'Min'), (MAX, 'Max'), + (AVERAGE_RANK, 'Average Rank'), ) SORTING = ( ('desc', 'Descending'), diff --git a/src/apps/leaderboards/ranking.py b/src/apps/leaderboards/ranking.py new file mode 100644 index 000000000..ab0ef8698 --- /dev/null +++ b/src/apps/leaderboards/ranking.py @@ -0,0 +1,108 @@ +from leaderboards.models import Column + + +def fractional_rank(values): + """ + Fractional (average) ranking: tied values receive the mean of the ranks they + would occupy, identical to scipy.stats.rankdata(method='average'). + Rank 1 is assigned to the smallest value. + """ + sorted_vals = sorted(values) + rank_sum = {} + rank_count = {} + for rank, val in enumerate(sorted_vals, start=1): + rank_sum[val] = rank_sum.get(val, 0) + rank + rank_count[val] = rank_count.get(val, 0) + 1 + return [rank_sum[v] / rank_count[v] for v in values] + + +def inject_average_ranks(submissions, avg_rank_cols, col_by_index, primary_index): + """ + For each AVERAGE_RANK column, rank submissions on each referenced sub-column + using fractional (average) ranking, compute the mean rank per submission, and + append it as a synthetic score entry. + If the primary column is AVERAGE_RANK, re-sort the list in-place afterward. + + Fractional ranking: tied submissions share the mean of the ranks they occupy + (e.g. two entries tying for positions 2 and 3 both receive rank 2.5). + Submissions missing a score for a sub-column are placed last (rank = n). + When a submission has multiple scores for the same column (multi-task), they are + summed before ranking, consistent with the ORM annotation in the serializer. + """ + # Pre-aggregate scores per submission per column (sum across tasks). + submission_col_scores = [] + for sub in submissions: + col_scores = {} + for s in sub['scores']: + idx = s['index'] + try: + val = float(s['score']) + except (ValueError, TypeError): + val = None + if idx not in col_scores: + col_scores[idx] = val + elif val is not None: + col_scores[idx] = (col_scores[idx] or 0) + val + submission_col_scores.append(col_scores) + + n = len(submissions) + + for col in avg_rank_cols: + if not col.get('computation_indexes'): + continue + sub_indices = [int(i) for i in col['computation_indexes']] + + per_column_ranks = [] + for sub_idx in sub_indices: + sub_col = col_by_index.get(sub_idx) + if sub_col is None: + continue + + valid_indices = [i for i in range(n) if submission_col_scores[i].get(sub_idx) is not None] + valid_scores = [submission_col_scores[i][sub_idx] for i in valid_indices] + + if not valid_scores: + continue + + # Negate descending columns so rank 1 = highest score. + scores_for_rank = [-s for s in valid_scores] if sub_col['sorting'] == 'desc' else valid_scores + fractions = fractional_rank(scores_for_rank) + + ranks = {i: float(n) for i in range(n)} # default: worst rank for unscored + for pos, sub_i in enumerate(valid_indices): + ranks[sub_i] = fractions[pos] + per_column_ranks.append(ranks) + + if not per_column_ranks: + continue + + is_primary = col['index'] == primary_index + for i, sub in enumerate(submissions): + sub_ranks = [r[i] for r in per_column_ranks] + avg_rank = sum(sub_ranks) / len(sub_ranks) + score_entry = { + 'index': col['index'], + 'column_key': col['key'], + 'score': str(round(avg_rank, col.get('precision', 2))), + 'is_primary': is_primary, + } + # The frontend matches scores by (task_id, column_key). Average rank is + # cross-task, so inject one copy per task that already has scores here. + task_ids = {s['task_id'] for s in sub['scores'] if s.get('task_id') is not None} + for task_id in task_ids: + sub['scores'].append({**score_entry, 'task_id': task_id}) + + primary_col = col_by_index.get(primary_index) + if primary_col and primary_col.get('computation') == Column.AVERAGE_RANK: + reverse = primary_col['sorting'] == 'desc' + + def _sort_key(sub): + for s in sub['scores']: + if s['index'] == primary_index: + try: + return float(s['score']) + except (ValueError, TypeError): + pass + return float('inf') if not reverse else float('-inf') + + submissions.sort(key=_sort_key, reverse=reverse) diff --git a/src/apps/leaderboards/tests/__init__.py b/src/apps/leaderboards/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/apps/leaderboards/tests/test_ranking.py b/src/apps/leaderboards/tests/test_ranking.py new file mode 100644 index 000000000..3056eb0af --- /dev/null +++ b/src/apps/leaderboards/tests/test_ranking.py @@ -0,0 +1,259 @@ +from leaderboards.ranking import fractional_rank, inject_average_ranks + + +# --------------------------------------------------------------------------- +# fractional_rank +# --------------------------------------------------------------------------- + +def test_fractional_rank_no_ties(): + # 3 distinct values → straight 1, 2, 3 + assert fractional_rank([3.0, 1.0, 2.0]) == [3.0, 1.0, 2.0] + + +def test_fractional_rank_two_way_tie(): + # Positions 1 and 2 are tied → both get 1.5 + assert fractional_rank([1.0, 1.0, 2.0]) == [1.5, 1.5, 3.0] + + +def test_fractional_rank_three_way_tie(): + # Positions 1, 2, 3 are all tied → mean(1,2,3) = 2.0 + assert fractional_rank([5.0, 5.0, 5.0]) == [2.0, 2.0, 2.0] + + +def test_fractional_rank_tie_at_end(): + # Two tied at the last positions + assert fractional_rank([1.0, 2.0, 2.0]) == [1.0, 2.5, 2.5] + + +def test_fractional_rank_single_element(): + assert fractional_rank([42.0]) == [1.0] + + +# --------------------------------------------------------------------------- +# Helpers for inject_average_ranks +# --------------------------------------------------------------------------- + +def _make_col(index, key, sorting='desc', computation_indexes=None, precision=2, computation='avg_rank'): + return { + 'id': index, + 'index': index, + 'key': key, + 'title': key, + 'sorting': sorting, + 'computation': computation, + 'computation_indexes': [str(i) for i in (computation_indexes or [])], + 'precision': precision, + 'hidden': False, + } + + +def _make_submission(scores): + """ + scores: list of (column_index, column_key, score_value, task_id) + """ + return { + 'scores': [ + {'index': idx, 'column_key': key, 'score': str(val), 'task_id': tid, 'is_primary': False} + for idx, key, val, tid in scores + ] + } + + +# --------------------------------------------------------------------------- +# inject_average_ranks +# --------------------------------------------------------------------------- + +def test_inject_average_ranks_basic_values(): + # 3 submissions, 1 descending sub-column (higher = better = rank 1). + # Scores 0.9, 0.6, 0.3 → ranks 1, 2, 3 → avg_rank 1.0, 2.0, 3.0 + col0 = _make_col(0, 'col0', sorting='desc') + avg_col = _make_col(2, 'avg_rank', computation_indexes=[0]) + + submissions = [ + _make_submission([(0, 'col0', 0.9, 1)]), + _make_submission([(0, 'col0', 0.6, 1)]), + _make_submission([(0, 'col0', 0.3, 1)]), + ] + + col_by_index = {0: col0, 2: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + avg_scores = [ + next(s for s in sub['scores'] if s['column_key'] == 'avg_rank') + for sub in submissions + ] + assert float(avg_scores[0]['score']) == 1.0 + assert float(avg_scores[1]['score']) == 2.0 + assert float(avg_scores[2]['score']) == 3.0 + + +def test_inject_average_ranks_with_ties(): + # Two submissions tied on the sub-column → both get fractional rank 1.5 + col0 = _make_col(0, 'col0', sorting='desc') + avg_col = _make_col(1, 'avg_rank', computation_indexes=[0]) + + submissions = [ + _make_submission([(0, 'col0', 0.8, 1)]), + _make_submission([(0, 'col0', 0.8, 1)]), + _make_submission([(0, 'col0', 0.5, 1)]), + ] + + col_by_index = {0: col0, 1: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + avg_scores = [ + float(next(s for s in sub['scores'] if s['column_key'] == 'avg_rank')['score']) + for sub in submissions + ] + assert avg_scores[0] == 1.5 + assert avg_scores[1] == 1.5 + assert avg_scores[2] == 3.0 + + +def test_inject_average_ranks_ascending_column(): + # Ascending column: lower score = better = rank 1 + col0 = _make_col(0, 'col0', sorting='asc') + avg_col = _make_col(1, 'avg_rank', computation_indexes=[0]) + + submissions = [ + _make_submission([(0, 'col0', 0.1, 1)]), # lowest → rank 1 + _make_submission([(0, 'col0', 0.5, 1)]), # rank 2 + _make_submission([(0, 'col0', 0.9, 1)]), # highest → rank 3 + ] + + col_by_index = {0: col0, 1: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + avg_scores = [ + float(next(s for s in sub['scores'] if s['column_key'] == 'avg_rank')['score']) + for sub in submissions + ] + assert avg_scores[0] == 1.0 + assert avg_scores[1] == 2.0 + assert avg_scores[2] == 3.0 + + +def test_inject_average_ranks_missing_score_gets_worst_rank(): + # Submission that has scores for other columns but not the avg_rank sub-column + # gets worst rank (= n = 3). It still has a task_id from its other scores so + # the injected entry can be matched by the frontend. + col0 = _make_col(0, 'col0', sorting='desc') + avg_col = _make_col(1, 'avg_rank', computation_indexes=[0]) + + submissions = [ + _make_submission([(0, 'col0', 0.9, 1)]), # rank 1 + _make_submission([(0, 'col0', 0.5, 1)]), # rank 2 + _make_submission([(99, 'other_col', 0.7, 1)]), # no col0 score → rank 3 + ] + + col_by_index = {0: col0, 1: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + avg_scores = [ + float(next(s for s in sub['scores'] if s['column_key'] == 'avg_rank')['score']) + for sub in submissions + ] + assert avg_scores[0] == 1.0 + assert avg_scores[1] == 2.0 + assert avg_scores[2] == 3.0 # worst rank = n = 3 + + +def test_inject_average_ranks_two_subcolumns(): + # Average over two sub-columns + col0 = _make_col(0, 'col0', sorting='desc') + col1 = _make_col(1, 'col1', sorting='desc') + avg_col = _make_col(2, 'avg_rank', computation_indexes=[0, 1]) + + # Sub0: col0=0.9 (rank1), col1=0.3 (rank3) → avg 2.0 + # Sub1: col0=0.6 (rank2), col1=0.6 (rank2) → avg 2.0 + # Sub2: col0=0.3 (rank3), col1=0.9 (rank1) → avg 2.0 + submissions = [ + _make_submission([(0, 'col0', 0.9, 1), (1, 'col1', 0.3, 1)]), + _make_submission([(0, 'col0', 0.6, 1), (1, 'col1', 0.6, 1)]), + _make_submission([(0, 'col0', 0.3, 1), (1, 'col1', 0.9, 1)]), + ] + + col_by_index = {0: col0, 1: col1, 2: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + avg_scores = [ + float(next(s for s in sub['scores'] if s['column_key'] == 'avg_rank')['score']) + for sub in submissions + ] + assert avg_scores[0] == 2.0 + assert avg_scores[1] == 2.0 + assert avg_scores[2] == 2.0 + + +def test_inject_average_ranks_task_id_propagation(): + # The injected score must carry the same task_id as existing scores so the + # frontend can match it via (task_id, column_key). + col0 = _make_col(0, 'col0', sorting='desc') + avg_col = _make_col(1, 'avg_rank', computation_indexes=[0]) + + task_id = 99 + submissions = [ + _make_submission([(0, 'col0', 0.9, task_id)]), + _make_submission([(0, 'col0', 0.5, task_id)]), + ] + + col_by_index = {0: col0, 1: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + for sub in submissions: + injected = [s for s in sub['scores'] if s['column_key'] == 'avg_rank'] + assert len(injected) == 1 + assert injected[0]['task_id'] == task_id + + +def test_inject_average_ranks_multi_task_injects_one_per_task(): + # Multi-task submissions have scores with different task_ids. + # One avg_rank entry must be injected per task_id. + col0 = _make_col(0, 'col0', sorting='desc') + avg_col = _make_col(1, 'avg_rank', computation_indexes=[0]) + + submissions = [ + _make_submission([(0, 'col0', 0.9, 10), (0, 'col0', 0.8, 20)]), + _make_submission([(0, 'col0', 0.5, 10), (0, 'col0', 0.4, 20)]), + ] + + col_by_index = {0: col0, 1: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=0) + + for sub in submissions: + injected = [s for s in sub['scores'] if s['column_key'] == 'avg_rank'] + injected_task_ids = {s['task_id'] for s in injected} + assert injected_task_ids == {10, 20} + + +def test_inject_average_ranks_sorts_by_primary_avg_rank(): + # When the avg_rank column is the primary, submissions are re-sorted + # ascending (rank 1 = best position = first row). + col0 = _make_col(0, 'col0', sorting='desc') + avg_col = _make_col(1, 'avg_rank', sorting='asc', computation_indexes=[0]) + + # Scores: 0.3 → rank3, 0.9 → rank1, 0.6 → rank2 + # After sort ascending by avg_rank: rank1 first, then rank2, then rank3 + submissions = [ + _make_submission([(0, 'col0', 0.3, 1)]), # will become rank 3 + _make_submission([(0, 'col0', 0.9, 1)]), # will become rank 1 + _make_submission([(0, 'col0', 0.6, 1)]), # will become rank 2 + ] + + col_by_index = {0: col0, 1: avg_col} + inject_average_ranks(submissions, [avg_col], col_by_index, primary_index=1) + + avg_scores = [ + float(next(s for s in sub['scores'] if s['column_key'] == 'avg_rank')['score']) + for sub in submissions + ] + assert avg_scores == [1.0, 2.0, 3.0] + + +def test_inject_average_ranks_no_avg_rank_cols_is_noop(): + submissions = [_make_submission([(0, 'col0', 0.9, 1)])] + original_scores = list(submissions[0]['scores']) + + inject_average_ranks(submissions, [], {}, primary_index=0) + + assert submissions[0]['scores'] == original_scores diff --git a/src/static/riot/competitions/editor/_leaderboard.tag b/src/static/riot/competitions/editor/_leaderboard.tag index 274cb4da6..c53882c06 100644 --- a/src/static/riot/competitions/editor/_leaderboard.tag +++ b/src/static/riot/competitions/editor/_leaderboard.tag @@ -139,6 +139,7 @@
Sum
Min
Max
+
Average Rank
From 54b0460923a0ab5c9e92f36d85e0622fca170c6b Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 23 Jun 2026 10:40:08 +0200 Subject: [PATCH 06/10] reset branch --- .../riot/competitions/editor/_participation.tag | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/src/static/riot/competitions/editor/_participation.tag b/src/static/riot/competitions/editor/_participation.tag index 466bfae3a..4408daa26 100644 --- a/src/static/riot/competitions/editor/_participation.tag +++ b/src/static/riot/competitions/editor/_participation.tag @@ -133,17 +133,11 @@ const initUI = () => { try { $('.ui.checkbox', self.root).checkbox() } catch(e) {} - try { if (self.refs && self.refs.group_queue) { const $q = $(self.refs.group_queue) if (!$q.data('dd-init')) { - $q.dropdown({ - clearable: true, - placeholder: 'None', - search: true, - fullTextSearch: true, - }) + $q.dropdown({ clearable: true, placeholder: 'None' }) $q.data('dd-init', true) } } @@ -151,10 +145,8 @@ try { let $rest = $('.ui.dropdown', self.root) - if (self.refs && self.refs.group_queue) - $rest = $rest.not(self.refs.group_queue) - if (self.refs && self.refs.group_user_select) - $rest = $rest.not(self.refs.group_user_select) + if (self.refs && self.refs.group_queue) $rest = $rest.not(self.refs.group_queue) + if (self.refs && self.refs.group_user_select) $rest = $rest.not(self.refs.group_user_select) $rest.dropdown() } catch(e) {} @@ -560,4 +552,4 @@ .ui.dropdown.multiple { width: 100% !important; } .ui.modal { z-index: 10000; } - \ No newline at end of file + From 03cd571e67586863891411c84cac1f1c92a608fd Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 23 Jun 2026 11:10:18 +0200 Subject: [PATCH 07/10] auto completion fixed --- .../competitions/editor/_participation.tag | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/static/riot/competitions/editor/_participation.tag b/src/static/riot/competitions/editor/_participation.tag index 4408daa26..ee799ac44 100644 --- a/src/static/riot/competitions/editor/_participation.tag +++ b/src/static/riot/competitions/editor/_participation.tag @@ -91,10 +91,7 @@
- +
@@ -137,7 +134,24 @@ if (self.refs && self.refs.group_queue) { const $q = $(self.refs.group_queue) if (!$q.data('dd-init')) { - $q.dropdown({ clearable: true, placeholder: 'None' }) + ;(self.available_queues || []).forEach(q => { + $q.append(new Option(q.name, q.id)) + }) + $q.dropdown({ + apiSettings: { + url: `${URLS.API}queues/?search={query}&public=true`, + cache: false, + }, + clearable: true, + minCharacters: 2, + fields: { + remoteValues: 'results', + value: 'id', + name: 'name', + }, + maxResults: 5, + onChange: () => {} + }) $q.data('dd-init', true) } } @@ -145,8 +159,10 @@ try { let $rest = $('.ui.dropdown', self.root) - if (self.refs && self.refs.group_queue) $rest = $rest.not(self.refs.group_queue) - if (self.refs && self.refs.group_user_select) $rest = $rest.not(self.refs.group_user_select) + if (self.refs && self.refs.group_queue) + $rest = $rest.not(self.refs.group_queue) + if (self.refs && self.refs.group_user_select) + $rest = $rest.not(self.refs.group_user_select) $rest.dropdown() } catch(e) {} @@ -299,8 +315,12 @@ const qFound = (self.available_queues || []).find(x => x.name === group.queue) if (qFound) queueId = qFound.id } - if (queueId) $(self.refs.group_queue).dropdown('set selected', String(queueId)) - else $(self.refs.group_queue).dropdown('clear') + if (queueId && group.queue) { + $(self.refs.group_queue).dropdown('set text', String(group.queue)) + $(self.refs.group_queue).dropdown('set value', String(queueId)) + } else { + $(self.refs.group_queue).dropdown('clear') + } } catch(e) {} try { From e829e09648636fd46e1ac7184cbede76031d99ef Mon Sep 17 00:00:00 2001 From: Idir Chikhoune Date: Tue, 23 Jun 2026 14:24:43 +0200 Subject: [PATCH 08/10] Worker status/monitoring for competition organizers (from rabbit) (#2395) * feature ok / needs to be tested * feature deactivated by default + hiden behind a button by default + visibile only by admin or comp organizer or collaborators * worker monitoring button behind user menu * files blacked for fixing the formatting issues * fixing synthax and format * rebase on dev * clean feature * git rebase continue * feature in progress * git rebase continue * git rebase continue * compute worker monitoring on private queues (amazing stuff) * test number 245 * git rebase continue * rebase and fix incoming * rebase and fix incoming * feature clean * conflicts solved * conflicts solved * private CW pb solved * linter fix * remove comment * feature en cours * monitor queues feature for admin ok * UX/UI improved * UI/UX improvment * final push * adding colors (final push) * panel resize * UI cleaning * save panel state * revert details.tag before CW monitoring merge * worker jobs fix first try * adding queue jobs display * queue jobs feaute ux/ui polish * improve ux with animation * worker count feature added * feature polish * bugfix, status on CW instead of queue * bugfix, status on CW instead of queue * bugfix, status on CW instead of queue * bugfix, status on CW instead of queue * bugfix, queue stats + organizer panel ux fix * queues stats dispay at top * Feature group routing for submissions (#2393) * backend & frontend OK / TODO: site worker and leaderboad * site worker sending submissions to group queue OK * leaderboad group feature * logs removed * fix json leaderboard * Clean up leaderboard ordering logic * competition queue on groups with out queue * some bugfix * UI bugfix * UI bugfix * leaderboard groups format parentsubID_groupname * fix conflicts issues * resolve conflict * clean site worker * branch update and linter fix * linter fix * linter fix * linter fix * bugfix group form * adding migrations files * fix logic to fix tests problem * fix logic to fix tests problem * E2E test fixed * E2E test fixed * E2E test fixed * E2E test fixed * Fix queue name in server status * fix queues visibility * fix queue visibility for groups * Flake8 --------- Co-authored-by: didayolo * site worker conflict solved * ux button check --------- Co-authored-by: didayolo --- src/apps/competitions/tasks.py | 634 ++++----- src/settings/base.py | 4 - .../riot/competitions/detail/_header.tag | 2 - .../riot/competitions/detail/detail.tag | 679 +--------- .../detail/worker-monitor-toggle.tag | 1189 ++++++++++++++--- src/templates/pages/monitor_queues.html | 16 + src/utils/consumers.py | 122 +- src/utils/worker_utils.py | 190 ++- 8 files changed, 1589 insertions(+), 1247 deletions(-) diff --git a/src/apps/competitions/tasks.py b/src/apps/competitions/tasks.py index ca5a26987..71acb85c1 100644 --- a/src/apps/competitions/tasks.py +++ b/src/apps/competitions/tasks.py @@ -4,31 +4,36 @@ import traceback import zipfile from datetime import timedelta, datetime - +from django.conf import settings from io import BytesIO -from tempfile import TemporaryDirectory, NamedTemporaryFile +from tempfile import NamedTemporaryFile, TemporaryDirectory import oyaml as yaml import requests from celery._state import app_or_default -from django.conf import settings +from competitions.models import ( + Competition, + CompetitionCreationTaskStatus, + CompetitionDump, + Phase, + Submission, + SubmissionDetails, +) +from competitions.unpackers.utils import CompetitionUnpackingException +from competitions.unpackers.v1 import V15Unpacker +from competitions.unpackers.v2 import V2Unpacker +from datasets.models import Data from django.core.exceptions import ObjectDoesNotExist from django.core.files.base import ContentFile -from django.db.models import Subquery, OuterRef, Count, Case, When, Value, F from django.db import transaction +from django.db.models import Case, Count, F, OuterRef, Subquery, Value, When from django.utils.text import slugify from django.utils.timezone import now +from leaderboards.models import Leaderboard from rest_framework.exceptions import ValidationError +from tasks.models import Task from celery_config import app -from competitions.models import Submission, CompetitionCreationTaskStatus, SubmissionDetails, Competition, \ - CompetitionDump, Phase -from competitions.unpackers.utils import CompetitionUnpackingException -from competitions.unpackers.v1 import V15Unpacker -from competitions.unpackers.v2 import V2Unpacker -from leaderboards.models import Leaderboard -from tasks.models import Task -from datasets.models import Data from utils.data import make_url_sassy from utils.email import codalab_send_markdown_email @@ -53,35 +58,35 @@ "reward", "contact_email", "fact_sheet", - "forum_enabled" + "forum_enabled", ] TASK_FIELDS = [ - 'name', - 'description', - 'key', - 'is_public', + "name", + "description", + "key", + "is_public", ] SOLUTION_FIELDS = [ - 'name', - 'description', - 'tasks', - 'key', + "name", + "description", + "tasks", + "key", ] PHASE_FIELDS = [ - 'index', - 'name', - 'description', - 'start', - 'end', - 'max_submissions_per_day', - 'max_submissions_per_person', - 'execution_time_limit', - 'auto_migrate_to_this_phase', - 'hide_output', - 'hide_prediction_output', - 'hide_score_output', + "index", + "name", + "description", + "start", + "end", + "max_submissions_per_day", + "max_submissions_per_person", + "execution_time_limit", + "auto_migrate_to_this_phase", + "hide_output", + "hide_prediction_output", + "hide_score_output", ] PHASE_FILES = [ "input_data", @@ -91,15 +96,12 @@ "public_data", "starting_kit", ] -PAGE_FIELDS = [ - "title" -] +PAGE_FIELDS = ["title"] LEADERBOARD_FIELDS = [ - 'title', - 'key', - 'hidden', - 'submission_rule', - + "title", + "key", + "hidden", + "submission_rule", # For later # 'force_submission_to_leaderboard', # 'force_best_submission_to_leaderboard', @@ -107,50 +109,18 @@ ] COLUMN_FIELDS = [ - 'title', - 'key', - 'index', - 'sorting', - 'computation', - 'computation_indexes', - 'hidden', - 'precision', + "title", + "key", + "index", + "sorting", + "computation", + "computation_indexes", + "hidden", + "precision", ] -MAX_EXECUTION_TIME_LIMIT = int(os.environ.get('MAX_EXECUTION_TIME_LIMIT', 600)) # time limit of the default queue - - -def _get_user_group_queues(user, competition): - all_user_groups = list( - competition.participant_groups - .filter(user__pk=user.pk) - .select_related('queue') - .distinct() - ) - - if not all_user_groups: - return [] - - groups_with_queue = [g for g in all_user_groups if g.queue_id is not None] - has_groups_without_queue = any(g.queue_id is None for g in all_user_groups) - - if not groups_with_queue: - return [] - - seen_ids = set() - queues = [] - for group in groups_with_queue: - if group.queue_id not in seen_ids: - seen_ids.add(group.queue_id) - queues.append(group.queue) - - if has_groups_without_queue: - competition_queue = competition.queue - if competition_queue is None: - queues.append(None) - elif competition_queue.id not in seen_ids: - queues.append(competition_queue) - - return queues +MAX_EXECUTION_TIME_LIMIT = int( + os.environ.get("MAX_EXECUTION_TIME_LIMIT", 600) +) # time limit of the default queue def _send_to_compute_worker(submission, is_scoring): @@ -159,43 +129,51 @@ def _send_to_compute_worker(submission, is_scoring): "submissions_api_url": settings.SUBMISSIONS_API_URL, "secret": submission.secret, "docker_image": submission.phase.competition.docker_image, - "execution_time_limit": min(MAX_EXECUTION_TIME_LIMIT, submission.phase.execution_time_limit), + "execution_time_limit": min( + MAX_EXECUTION_TIME_LIMIT, submission.phase.execution_time_limit + ), "id": submission.pk, "is_scoring": is_scoring, } - if not submission.detailed_result.name and submission.phase.competition.enable_detailed_results: - submission.detailed_result.save('detailed_results.html', ContentFile(''.encode())) # must encode here for GCS - submission.save(update_fields=['detailed_result']) + if ( + not submission.detailed_result.name + and submission.phase.competition.enable_detailed_results + ): + submission.detailed_result.save( + "detailed_results.html", ContentFile("".encode()) + ) # must encode here for GCS + submission.save(update_fields=["detailed_result"]) if not submission.prediction_result.name: - submission.prediction_result.save('prediction_result.zip', ContentFile(''.encode())) # must encode here for GCS - submission.save(update_fields=['prediction_result']) + submission.prediction_result.save( + "prediction_result.zip", ContentFile("".encode()) + ) # must encode here for GCS + submission.save(update_fields=["prediction_result"]) if not submission.scoring_result.name: - submission.scoring_result.save('scoring_result.zip', ContentFile(''.encode())) # must encode here for GCS - submission.save(update_fields=['scoring_result']) + submission.scoring_result.save( + "scoring_result.zip", ContentFile("".encode()) + ) # must encode here for GCS + submission.save(update_fields=["scoring_result"]) submission = Submission.objects.get(id=submission.id) task = submission.task if not is_scoring: - run_args['prediction_result'] = make_url_sassy( - path=submission.prediction_result.name, - permission='w' + run_args["prediction_result"] = make_url_sassy( + path=submission.prediction_result.name, permission="w" ) else: if submission.phase.competition.enable_detailed_results: - run_args['detailed_results_url'] = make_url_sassy( + run_args["detailed_results_url"] = make_url_sassy( path=submission.detailed_result.name, - permission='w', - content_type='text/html' + permission="w", + content_type="text/html", ) - run_args['prediction_result'] = make_url_sassy( - path=submission.prediction_result.name, - permission='r' + run_args["prediction_result"] = make_url_sassy( + path=submission.prediction_result.name, permission="r" ) - run_args['scoring_result'] = make_url_sassy( - path=submission.scoring_result.name, - permission='w' + run_args["scoring_result"] = make_url_sassy( + path=submission.scoring_result.name, permission="w" ) if task.ingestion_program: @@ -203,12 +181,12 @@ def _send_to_compute_worker(submission, is_scoring): run_args['ingestion_program_data'] = make_url_sassy(task.ingestion_program.data_file.name) if task.input_data and (not is_scoring or task.ingestion_only_during_scoring): - run_args['input_data'] = make_url_sassy(task.input_data.data_file.name) + run_args["input_data"] = make_url_sassy(task.input_data.data_file.name) if is_scoring and task.reference_data: - run_args['reference_data'] = make_url_sassy(task.reference_data.data_file.name) + run_args["reference_data"] = make_url_sassy(task.reference_data.data_file.name) - run_args['ingestion_only_during_scoring'] = task.ingestion_only_during_scoring + run_args["ingestion_only_during_scoring"] = task.ingestion_only_during_scoring if is_scoring: run_args['scoring_program_data'] = make_url_sassy(path=task.scoring_program.data_file.name) @@ -234,37 +212,43 @@ def _send_to_compute_worker(submission, is_scoring): time_padding = 60 * 20 # 20 minutes time_limit = submission.phase.execution_time_limit + time_padding - effective_queue = submission.queue or submission.phase.competition.queue - - if effective_queue: - run_args['execution_time_limit'] = submission.phase.execution_time_limit - if submission.queue != effective_queue: - submission.queue = effective_queue - submission.save(update_fields=["queue"]) - + if ( + submission.phase.competition.queue + ): # if the competition is running on a custom queue, not the default queue + submission.queue = submission.phase.competition.queue + run_args["execution_time_limit"] = ( + submission.phase.execution_time_limit + ) # use the competition time limit + submission.save(update_fields=["queue"]) if submission.status == Submission.SUBMITTING: + # Don't want to mark an already-prepared submission as "submitted" again, so + # only do this if we were previously "SUBMITTING" submission.status = Submission.SUBMITTED submission.save(update_fields=["status"]) def _enqueue_after_commit(): + # priority of scoring tasks is higher, we don't want to wait around for + # many submissions to be scored while we're waiting for results priority = 10 if is_scoring else 0 - if effective_queue: + if submission.phase.competition.queue: celery_app = app_or_default() with celery_app.connection() as new_connection: - new_connection.virtual_host = str(effective_queue.vhost) + new_connection.virtual_host = str( + submission.phase.competition.queue.vhost + ) task = celery_app.send_task( - 'compute_worker_run', + "compute_worker_run", args=(run_args,), - queue='compute-worker', + queue="compute-worker", soft_time_limit=time_limit, connection=new_connection, priority=priority, ) else: task = app.send_task( - 'compute_worker_run', + "compute_worker_run", args=(run_args,), - queue='compute-worker', + queue="compute-worker", soft_time_limit=time_limit, priority=priority, ) @@ -276,8 +260,12 @@ def _enqueue_after_commit(): def create_detailed_output_file(detail_name, submission): # Detail logs like stdout/etc. - new_details = SubmissionDetails.objects.create(submission=submission, name=detail_name) - new_details.data_file.save(f'{detail_name}.txt', ContentFile(''.encode())) # must encode here for GCS + new_details = SubmissionDetails.objects.create( + submission=submission, name=detail_name + ) + new_details.data_file.save( + f"{detail_name}.txt", ContentFile("".encode()) + ) # must encode here for GCS return make_url_sassy(new_details.data_file.name, permission="w") @@ -288,74 +276,70 @@ def run_submission(submission_pk, tasks=None, is_scoring=False): def send_submission_message(submission, data): from channels.layers import get_channel_layer + channel_layer = get_channel_layer() user = submission.owner - asyncio.get_event_loop().run_until_complete(channel_layer.group_send(f"submission_listening_{user.pk}", { - 'type': 'submission.message', - 'text': data, - 'submission_id': submission.pk, - })) + asyncio.get_event_loop().run_until_complete( + channel_layer.group_send( + f"submission_listening_{user.pk}", + { + "type": "submission.message", + "text": data, + "submission_id": submission.pk, + }, + ) + ) def send_parent_status(submission): """Helper function we can mock in tests, instead of having to do async mocks""" - send_submission_message(submission, { - "kind": "status_update", - "status": "Running" - }) + send_submission_message(submission, {"kind": "status_update", "status": "Running"}) def send_child_id(submission, child_id): """Helper function we can mock in tests, instead of having to do async mocks""" - send_submission_message(submission, { - "kind": "child_update", - "child_id": child_id - }) + send_submission_message(submission, {"kind": "child_update", "child_id": child_id}) -@app.task(queue='site-worker', soft_time_limit=60) +@app.task(queue="site-worker", soft_time_limit=60) def _run_submission(submission_pk, task_pks=None, is_scoring=False): - select_models = ('phase', 'phase__competition') + """This function is wrapped so that when we run tests we can run this function not + via celery""" + select_models = ( + "phase", + "phase__competition", + ) prefetch_models = ( - 'details', - 'phase__tasks__input_data', - 'phase__tasks__reference_data', - 'phase__tasks__scoring_program', - 'phase__tasks__ingestion_program', + "details", + "phase__tasks__input_data", + "phase__tasks__reference_data", + "phase__tasks__scoring_program", + "phase__tasks__ingestion_program", + ) + qs = Submission.objects.select_related(*select_models).prefetch_related( + *prefetch_models ) - qs = Submission.objects.select_related(*select_models).prefetch_related(*prefetch_models) submission = qs.get(pk=submission_pk) if submission.is_specific_task_re_run: + # Should only be one task for a specified task submission tasks = Task.objects.filter(pk__in=task_pks) elif task_pks is None: tasks = submission.phase.tasks.all() else: tasks = submission.phase.tasks.filter(pk__in=task_pks) - tasks = list(tasks.order_by('pk')) - - if submission.parent is None and not is_scoring: - group_queues = _get_user_group_queues(submission.owner, submission.phase.competition) - else: - group_queues = [] - is_multi_task = len(tasks) > 1 - is_multi_queue = len(group_queues) > 1 - - if is_multi_task or is_multi_queue: - if is_multi_task and is_multi_queue: - combos = [(task, queue) for task in tasks for queue in group_queues] - elif is_multi_task: - override = group_queues[0] if group_queues else None - combos = [(task, override) for task in tasks] - else: - combos = [(tasks[0], queue) for queue in group_queues] + tasks = tasks.order_by("pk") + if len(tasks) > 1: + # The initial submission object becomes the parent submission and we create children for each task submission.has_children = True submission.save() + send_parent_status(submission) - for task, queue in combos: + for task in tasks: + # TODO: make a duplicate submission method and use it here child_sub = Submission( owner=submission.owner, phase=submission.phase, @@ -364,25 +348,19 @@ def _run_submission(submission_pk, task_pks=None, is_scoring=False): parent=submission, task=task, fact_sheet_answers=submission.fact_sheet_answers, - queue=queue, ) child_sub.save(ignore_submission_limit=True) _send_to_compute_worker(child_sub, is_scoring=False) send_child_id(submission, child_sub.id) - else: + # The initial submission object is the only submission if not submission.task: submission.task = tasks[0] submission.save() - - if group_queues and submission.queue != group_queues[0]: - submission.queue = group_queues[0] - submission.save(update_fields=['queue']) - _send_to_compute_worker(submission, is_scoring) -@app.task(queue='site-worker', soft_time_limit=60 * 60) # 1 hour timeout +@app.task(queue="site-worker", soft_time_limit=60 * 60) # 1 hour timeout def unpack_competition(status_pk): logger.info(f"Starting unpack with status pk = {status_pk}") status = CompetitionCreationTaskStatus.objects.get(pk=status_pk) @@ -403,8 +381,12 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail # Extract bundle try: with NamedTemporaryFile(mode="w+b") as temp_file: - logger.info(f"Download competition bundle: {competition_dataset.data_file.name}") - competition_bundle_url = make_url_sassy(competition_dataset.data_file.name) + logger.info( + f"Download competition bundle: {competition_dataset.data_file.name}" + ) + competition_bundle_url = make_url_sassy( + competition_dataset.data_file.url + ) try: with requests.get(competition_bundle_url, stream=True) as r: r.raise_for_status() @@ -412,12 +394,14 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail temp_file.write(chunk) r.close() except requests.exceptions.RequestException as e: - raise CompetitionUnpackingException(f"Failed to download bundle from storage: {e}") + raise CompetitionUnpackingException( + f"Failed to download bundle from storage: {e}" + ) # seek back to the start of the tempfile after writing to it.. temp_file.seek(0) - with zipfile.ZipFile(temp_file.name, 'r') as zip_pointer: + with zipfile.ZipFile(temp_file.name, "r") as zip_pointer: zip_pointer.extractall(temp_directory) except zipfile.BadZipFile: raise CompetitionUnpackingException("Bad zip file uploaded.") @@ -434,20 +418,24 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail with open(yaml_path) as f: competition_yaml = yaml.safe_load(f.read()) except yaml.YAMLError as e: - raise CompetitionUnpackingException(f"Error parsing competition.yaml: {e}") + raise CompetitionUnpackingException( + f"Error parsing competition.yaml: {e}" + ) except Exception as e: - raise CompetitionUnpackingException(f"Failed to read competition.yaml: {e}") + raise CompetitionUnpackingException( + f"Failed to read competition.yaml: {e}" + ) - yaml_version = str(competition_yaml.get('version', '1')) + yaml_version = str(competition_yaml.get("version", "1")) logger.info(f"The YAML version is: {yaml_version}") - if yaml_version in ['1', '1.5']: + if yaml_version in ["1", "1.5"]: unpacker_class = V15Unpacker - elif yaml_version == '2': + elif yaml_version == "2": unpacker_class = V2Unpacker else: raise CompetitionUnpackingException( - 'A suitable version could not be found for this competition. Make sure one is supplied in the yaml.' + "A suitable version could not be found for this competition. Make sure one is supplied in the yaml." ) unpacker = unpacker_class( @@ -460,6 +448,7 @@ def mark_status_as_failed_and_delete_dataset(competition_creation_status, detail try: competition = unpacker.save() except ValidationError as e: + def _get_error_string(error_dict): """Helps us nicely print out a ValidationError""" for key, errors in error_dict.items(): @@ -501,7 +490,7 @@ def _get_error_string(error_dict): mark_status_as_failed_and_delete_dataset(status, message) -@app.task(queue='site-worker', soft_time_limit=60 * 10) +@app.task(queue="site-worker", soft_time_limit=60 * 10) def create_competition_dump(competition_pk, keys_instead_of_files=False): yaml_data = {"version": "2"} try: @@ -510,7 +499,7 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): logger.info(f"Finding competition {competition_pk}") comp = Competition.objects.get(pk=competition_pk) zip_buffer = BytesIO() - current_date_time = datetime.today().strftime('%Y-%m-%d %H:%M:%S') + current_date_time = datetime.today().strftime("%Y-%m-%d %H:%M:%S") zip_name = f"{comp.title}-{current_date_time}.zip" zip_file = zipfile.ZipFile(zip_buffer, "w") @@ -518,14 +507,14 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): for field in COMPETITION_FIELDS: if hasattr(comp, field): value = getattr(comp, field, "") - if field == 'queue' and value is not None: + if field == "queue" and value is not None: value = str(value.vhost) yaml_data[field] = value if comp.logo: logger.info("Checking logo") try: - yaml_data['image'] = re.sub(r'.*/', '', comp.logo.name) - zip_file.writestr(yaml_data['image'], comp.logo.read()) + yaml_data["image"] = re.sub(r".*/", "", comp.logo.name) + zip_file.writestr(yaml_data["image"], comp.logo.read()) logger.info(f"Logo found for competition {comp.pk}") except OSError: logger.warning( @@ -534,25 +523,25 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): # -------- Competition Terms ------- if comp.terms: - yaml_data['terms'] = 'terms.md' - zip_file.writestr('terms.md', comp.terms) + yaml_data["terms"] = "terms.md" + zip_file.writestr("terms.md", comp.terms) # -------- Competition Pages ------- - yaml_data['pages'] = [] + yaml_data["pages"] = [] for page in comp.pages.all(): temp_page_data = {} for field in PAGE_FIELDS: if hasattr(page, field): temp_page_data[field] = getattr(page, field, "") page_file_name = f"{slugify(page.title)}-{page.pk}.md" - temp_page_data['file'] = page_file_name - yaml_data['pages'].append(temp_page_data) - zip_file.writestr(temp_page_data['file'], page.content) + temp_page_data["file"] = page_file_name + yaml_data["pages"].append(temp_page_data) + zip_file.writestr(temp_page_data["file"], page.content) # -------- Competition Tasks/Solutions ------- - yaml_data['tasks'] = [] - yaml_data['solutions'] = [] + yaml_data["tasks"] = [] + yaml_data["solutions"] = [] task_solution_pairs = {} tasks = [task for phase in comp.phases.all() for task in phase.tasks.all()] @@ -563,23 +552,18 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): for index, task in enumerate(tasks): task_solution_pairs[task.id] = { - 'index': index, - 'solutions': { - 'ids': [], - 'indexes': [] - } + "index": index, + "solutions": {"ids": [], "indexes": []}, } - temp_task_data = { - 'index': index - } + temp_task_data = {"index": index} for field in TASK_FIELDS: data = getattr(task, field, "") # If keys_instead of files is not true and field is key, then skip this filed - if not keys_instead_of_files and field == 'key': + if not keys_instead_of_files and field == "key": continue - if field == 'key': + if field == "key": data = str(data) temp_task_data[field] = data @@ -592,116 +576,136 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): temp_task_data[file_type] = str(temp_dataset.key) else: try: - temp_task_data[file_type] = f"{file_type}-{task.pk}.zip" - zip_file.writestr(temp_task_data[file_type], temp_dataset.data_file.read()) + temp_task_data[file_type] = ( + f"{file_type}-{task.pk}.zip" + ) + zip_file.writestr( + temp_task_data[file_type], + temp_dataset.data_file.read(), + ) except OSError: logger.error( f"The file field is set, but no actual" f" file was found for dataset: {temp_dataset.pk} with name {temp_dataset.name}" ) else: - logger.warning(f"Could not find data file for dataset object: {temp_dataset.pk}") + logger.warning( + f"Could not find data file for dataset object: {temp_dataset.pk}" + ) # Now for all of our solutions for the tasks, write those too for solution in task.solutions.all(): # for index_two, solution in enumerate(task.solutions.all()): # temp_index = index_two # IF OUR SOLUTION WAS ALREADY ADDED - if solution.id in task_solution_pairs[task.id]['solutions']['ids']: - for solution_data in yaml_data['solutions']: - if solution_data['key'] == solution.key: - solution_data['tasks'].append(task.id) + if solution.id in task_solution_pairs[task.id]["solutions"]["ids"]: + for solution_data in yaml_data["solutions"]: + if solution_data["key"] == solution.key: + solution_data["tasks"].append(task.id) break break # Else if our index is already taken - elif index_two in task_solution_pairs[task.id]['solutions']['indexes']: + elif index_two in task_solution_pairs[task.id]["solutions"]["indexes"]: index_two += 1 - task_solution_pairs[task.id]['solutions']['indexes'].append(index_two) - task_solution_pairs[task.id]['solutions']['ids'].append(solution.id) + task_solution_pairs[task.id]["solutions"]["indexes"].append(index_two) + task_solution_pairs[task.id]["solutions"]["ids"].append(solution.id) - temp_solution_data = { - 'index': index_two - } + temp_solution_data = {"index": index_two} for field in SOLUTION_FIELDS: if hasattr(solution, field): data = getattr(solution, field, "") - if field == 'key': + if field == "key": data = str(data) temp_solution_data[field] = data if solution.data: - temp_dataset = getattr(solution, 'data') + temp_dataset = getattr(solution, "data") if temp_dataset: if temp_dataset.data_file: try: - temp_solution_data['path'] = f"solution-{solution.pk}.zip" - zip_file.writestr(temp_solution_data['path'], temp_dataset.data_file.read()) + temp_solution_data["path"] = ( + f"solution-{solution.pk}.zip" + ) + zip_file.writestr( + temp_solution_data["path"], + temp_dataset.data_file.read(), + ) except OSError: logger.error( f"The file field is set, but no actual" f" file was found for dataset: {temp_dataset.pk} with name {temp_dataset.name}" ) else: - logger.warning(f"Could not find data file for dataset object: {temp_dataset.pk}") + logger.warning( + f"Could not find data file for dataset object: {temp_dataset.pk}" + ) # TODO: Make sure logic here is right. Needs to be outputted as a list, but what others can we tie to? - temp_solution_data['tasks'] = [index] - yaml_data['solutions'].append(temp_solution_data) + temp_solution_data["tasks"] = [index] + yaml_data["solutions"].append(temp_solution_data) index_two += 1 # End for loop for solutions; Append tasks data - yaml_data['tasks'].append(temp_task_data) + yaml_data["tasks"].append(temp_task_data) # -------- Competition Phases ------- - yaml_data['phases'] = [] + yaml_data["phases"] = [] for phase in comp.phases.all(): temp_phase_data = {} for field in PHASE_FIELDS: if hasattr(phase, field): - if field == 'start' or field == 'end': + if field == "start" or field == "end": temp_date = getattr(phase, field) if not temp_date: continue temp_date = temp_date.strftime("%Y-%m-%d") temp_phase_data[field] = temp_date - elif field == 'max_submissions_per_person': - temp_phase_data['max_submissions'] = getattr(phase, field) + elif field == "max_submissions_per_person": + temp_phase_data["max_submissions"] = getattr(phase, field) else: temp_phase_data[field] = getattr(phase, field, "") - task_indexes = [task_solution_pairs[task.id]['index'] for task in phase.tasks.all()] - temp_phase_data['tasks'] = task_indexes + task_indexes = [ + task_solution_pairs[task.id]["index"] for task in phase.tasks.all() + ] + temp_phase_data["tasks"] = task_indexes temp_phase_solutions = [] for task in phase.tasks.all(): - temp_phase_solutions += task_solution_pairs[task.id]['solutions']['indexes'] - temp_phase_data['solutions'] = temp_phase_solutions - yaml_data['phases'].append(temp_phase_data) - yaml_data['phases'] = sorted(yaml_data['phases'], key=lambda phase: phase['index']) + temp_phase_solutions += task_solution_pairs[task.id]["solutions"][ + "indexes" + ] + temp_phase_data["solutions"] = temp_phase_solutions + yaml_data["phases"].append(temp_phase_data) + yaml_data["phases"] = sorted( + yaml_data["phases"], key=lambda phase: phase["index"] + ) # -------- Leaderboards ------- - yaml_data['leaderboards'] = [] + yaml_data["leaderboards"] = [] # Have to grab leaderboards from phases - leaderboards = Leaderboard.objects.filter(id__in=comp.phases.all().values_list('leaderboard', flat=True)) + leaderboards = Leaderboard.objects.filter( + id__in=comp.phases.all().values_list("leaderboard", flat=True) + ) for index, leaderboard in enumerate(leaderboards): - ldb_data = { - 'index': index - } + ldb_data = {"index": index} for field in LEADERBOARD_FIELDS: if hasattr(leaderboard, field): ldb_data[field] = getattr(leaderboard, field, "") - ldb_data['columns'] = [] + ldb_data["columns"] = [] for column in leaderboard.columns.all(): col_data = {} for field in COLUMN_FIELDS: if hasattr(column, field): value = getattr(column, field, "") - if field == 'computation_indexes' and value is not None: - value = value.split(',') + if field == "computation_indexes" and value is not None: + value = value.split(",") if value is not None: col_data[field] = value - ldb_data['columns'].append(col_data) - yaml_data['leaderboards'].append(ldb_data) + ldb_data["columns"].append(col_data) + yaml_data["leaderboards"].append(ldb_data) # ------- Finalize -------- logger.info(f"YAML data to be written is: {yaml_data}") - comp_yaml = yaml.safe_dump(yaml_data, default_flow_style=False, allow_unicode=True, encoding="utf-8") + comp_yaml = yaml.safe_dump( + yaml_data, default_flow_style=False, allow_unicode=True, encoding="utf-8" + ) logger.info(f"YAML output: {comp_yaml}") zip_file.writestr("competition.yaml", comp_yaml) zip_file.close() @@ -712,8 +716,8 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): temp_dataset_bundle = Data.objects.create( created_by=comp.created_by, name=f"{comp.title} Dump #{bundle_count} Created {current_date_time}", - type='competition_bundle', - description='Automatically created competition dump', + type="competition_bundle", + description="Automatically created competition dump", # 'data_file'=, ) logger.info("Saving zip to Competition Bundle") @@ -722,61 +726,74 @@ def create_competition_dump(competition_pk, keys_instead_of_files=False): temp_comp_dump = CompetitionDump.objects.create( dataset=temp_dataset_bundle, status="Finished", - details="Competition Bundle {0} for Competition {1}".format(temp_dataset_bundle.pk, comp.pk), - competition=comp + details="Competition Bundle {0} for Competition {1}".format( + temp_dataset_bundle.pk, comp.pk + ), + competition=comp, + ) + logger.info( + f"Finished creating competition dump: {temp_comp_dump.pk} for competition: {comp.pk}" ) - logger.info(f"Finished creating competition dump: {temp_comp_dump.pk} for competition: {comp.pk}") except ObjectDoesNotExist: - logger.error("Could not find competition with pk {} to create a competition dump".format(competition_pk)) + logger.error( + "Could not find competition with pk {} to create a competition dump".format( + competition_pk + ) + ) -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def do_phase_migrations(): # Update phase statuses - previous_subquery = Phase.objects.filter( - competition=OuterRef('competition'), - end__lte=now() - ).order_by('-index').values('index')[:1] + previous_subquery = ( + Phase.objects.filter(competition=OuterRef("competition"), end__lte=now()) + .order_by("-index") + .values("index")[:1] + ) current_subquery = Phase.objects.filter( - competition=OuterRef('competition'), + competition=OuterRef("competition"), start__lte=now(), end__gt=now(), - ).values('index')[:1] + ).values("index")[:1] - next_subquery = Phase.objects.filter( - competition=OuterRef('competition'), - start__gt=now() - ).order_by('index').values('index')[:1] + next_subquery = ( + Phase.objects.filter(competition=OuterRef("competition"), start__gt=now()) + .order_by("index") + .values("index")[:1] + ) Phase.objects.annotate( previous_index=Subquery(previous_subquery), current_index=Subquery(current_subquery), next_index=Subquery(next_subquery), - ).update(status=Case( - When(index=F('previous_index'), then=Value(Phase.PREVIOUS)), - When(index=F('current_index'), then=Value(Phase.CURRENT)), - When(index=F('next_index'), then=Value(Phase.NEXT)), - default=None - )) + ).update( + status=Case( + When(index=F("previous_index"), then=Value(Phase.PREVIOUS)), + When(index=F("current_index"), then=Value(Phase.CURRENT)), + When(index=F("next_index"), then=Value(Phase.NEXT)), + default=None, + ) + ) # Updating Competitions whose phases have finished migrating to `is_migrating=False` - completed_statuses = [Submission.FINISHED, Submission.FAILED, Submission.CANCELLED, Submission.NONE] - - running_subs_query = Submission.objects.filter( - created_by_migration=OuterRef('pk') - ).exclude( - status__in=completed_statuses - ).values_list('pk')[:1] + completed_statuses = [ + Submission.FINISHED, + Submission.FAILED, + Submission.CANCELLED, + Submission.NONE, + ] + + running_subs_query = ( + Submission.objects.filter(created_by_migration=OuterRef("pk")) + .exclude(status__in=completed_statuses) + .values_list("pk")[:1] + ) Competition.objects.filter( - pk__in=Phase.objects.annotate( - running_subs=Count(Subquery(running_subs_query)) - ).filter( - running_subs=0, - competition__is_migrating=True, - status=Phase.PREVIOUS - ).values_list('competition__pk', flat=True) + pk__in=Phase.objects.annotate(running_subs=Count(Subquery(running_subs_query))) + .filter(running_subs=0, competition__is_migrating=True, status=Phase.PREVIOUS) + .values_list("competition__pk", flat=True) ).update(is_migrating=False) # Checking for new phases to start migrating @@ -784,7 +801,7 @@ def do_phase_migrations(): auto_migrate_to_this_phase=True, start__lte=now(), competition__is_migrating=False, - has_been_migrated=False + has_been_migrated=False, ) logger.info(f"Checking {len(new_phases)} phases for phase migrations.") @@ -793,51 +810,70 @@ def do_phase_migrations(): p.check_future_phase_submissions() -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def manual_migration(phase_id): try: source_phase = Phase.objects.get(id=phase_id) except Phase.DoesNotExist: - logger.error(f'Could not manually migrate phase with id: {phase_id}. Phase could not be found.') + logger.error( + f"Could not manually migrate phase with id: {phase_id}. Phase could not be found." + ) return try: - destination_phase = source_phase.competition.phases.get(index=source_phase.index + 1) + destination_phase = source_phase.competition.phases.get( + index=source_phase.index + 1 + ) except Phase.DoesNotExist: - logger.error(f'Could not manually migrate phase with id: {phase_id}. The next phase could not be found.') + logger.error( + f"Could not manually migrate phase with id: {phase_id}. The next phase could not be found." + ) return - destination_phase.competition.apply_phase_migration(source_phase, destination_phase, force_migration=True) + destination_phase.competition.apply_phase_migration( + source_phase, destination_phase, force_migration=True + ) -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def batch_send_email(comp_id, content): try: - competition = Competition.objects.prefetch_related('participants__user').get(id=comp_id) + competition = Competition.objects.prefetch_related("participants__user").get( + id=comp_id + ) except Competition.DoesNotExist: - logger.error(f'Not sending emails because competition with id {comp_id} could not be found') + logger.error( + f"Not sending emails because competition with id {comp_id} could not be found" + ) return codalab_send_markdown_email( - subject=f'A message from the admins of {competition.title}', + subject=f"A message from the admins of {competition.title}", markdown_content=content, - recipient_list=[participant.user.email for participant in competition.participants.all()] + recipient_list=[ + participant.user.email for participant in competition.participants.all() + ], ) -@app.task(queue='site-worker', soft_time_limit=60 * 5) +@app.task(queue="site-worker", soft_time_limit=60 * 5) def update_phase_statuses(): - competitions = Competition.objects.exclude(phases__in=Phase.objects.filter(is_final_phase=True, end__lt=now())) + competitions = Competition.objects.exclude( + phases__in=Phase.objects.filter(is_final_phase=True, end__lt=now()) + ) for comp in competitions: comp.update_phase_statuses() -@app.task(queue='site-worker') +@app.task(queue="site-worker") def submission_status_cleanup(): - submissions = Submission.objects.filter(status=Submission.RUNNING, has_children=False).select_related('phase', 'parent') + submissions = Submission.objects.filter( + status=Submission.RUNNING, has_children=False + ).select_related("phase", "parent") for sub in submissions: - # Check if the submission has been running for 24 hours longer than execution_time_limit - if sub.started_when < now() - timedelta(milliseconds=(3600000 * 24) + sub.phase.execution_time_limit): + if sub.started_when < now() - timedelta( + milliseconds=(3600000 * 24) + sub.phase.execution_time_limit + ): if sub.parent is not None: sub.parent.cancel(status=Submission.FAILED) else: diff --git a/src/settings/base.py b/src/settings/base.py index 47c76ad6b..d2698bb2a 100644 --- a/src/settings/base.py +++ b/src/settings/base.py @@ -276,10 +276,6 @@ 'task': 'profiles.tasks.clean_non_activated_users', 'schedule': timedelta(days=1), # Run every 24 hours }, - "refresh_compute_worker_health": { - "task": "competitions.tasks.refresh_compute_worker_health", - "schedule": 60, - }, } CELERY_TIMEZONE = 'UTC' CELERY_WORKER_PREFETCH_MULTIPLIER = 1 diff --git a/src/static/riot/competitions/detail/_header.tag b/src/static/riot/competitions/detail/_header.tag index f572fa959..b8234bb1d 100644 --- a/src/static/riot/competitions/detail/_header.tag +++ b/src/static/riot/competitions/detail/_header.tag @@ -36,13 +36,11 @@ Migrate -
diff --git a/src/static/riot/competitions/detail/detail.tag b/src/static/riot/competitions/detail/detail.tag index 7bed78b29..989a62285 100644 --- a/src/static/riot/competitions/detail/detail.tag +++ b/src/static/riot/competitions/detail/detail.tag @@ -1,648 +1,65 @@ -
-
- - - -
- - - - -
- + + + -
\ No newline at end of file diff --git a/src/static/riot/competitions/detail/worker-monitor-toggle.tag b/src/static/riot/competitions/detail/worker-monitor-toggle.tag index d31b64e2e..48ea73e50 100644 --- a/src/static/riot/competitions/detail/worker-monitor-toggle.tag +++ b/src/static/riot/competitions/detail/worker-monitor-toggle.tag @@ -1,17 +1,14 @@ -