Skip to content

Assessment: Implement L1 Filtering and Post-Processing#895

Open
vprashrex wants to merge 18 commits into
mainfrom
feat/assessment-pipeline-l1
Open

Assessment: Implement L1 Filtering and Post-Processing#895
vprashrex wants to merge 18 commits into
mainfrom
feat/assessment-pipeline-l1

Conversation

@vprashrex
Copy link
Copy Markdown
Collaborator

@vprashrex vprashrex commented May 27, 2026

Target issue is #904

Summary

What changed

  • Added L1 pipeline orchestrator to run topic relevance and duplicate detection filters in series.
  • Introduced duplicate detection logic to filter out vague submissions and check for duplicates against a corpus.
  • Integrated L1 results handling in the assessment service, allowing for conditional processing based on L1 outcomes.
  • Added export post-processing (computed columns, filter, sort), stored in the run input and editable via PATCH /runs/{id}/post-processing.
  • Updated export functionality to include L1 results in the output and post-processing, enhancing the assessment reporting capabilities.
  • Added Celery task for executing the L1 pipeline and managing assessment run statuses.

Checklist

Before submitting a pull request, please ensure that you mark these task.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and test.
  • If you've fixed a bug or added code that is tested and has test cases.

…icate detection

- Added L1 pipeline orchestrator to run topic relevance and duplicate detection filters in series.
- Introduced duplicate detection logic to filter out vague submissions and check for duplicates against a corpus.
- Created topic relevance filter to assess the relevance of submissions based on user-defined criteria.
- Integrated L1 results handling in the assessment service, allowing for conditional processing based on L1 outcomes.
- Updated export functionality to include L1 results in the output, enhancing the assessment reporting capabilities.
- Added Celery task for executing the L1 pipeline and managing assessment run statuses.
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 27, 2026

Review Change Stack

Warning

Review limit reached

@vprashrex, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 28 minutes and 41 seconds. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 01d57812-46aa-41d5-8d42-3caed81c13c0

📥 Commits

Reviewing files that changed from the base of the PR and between bb30f88 and 26c4230.

📒 Files selected for processing (14)
  • backend/app/crud/assessment/cron.py
  • backend/app/crud/assessment/processing.py
  • backend/app/models/assessment.py
  • backend/app/services/assessment/prefilter/constants.py
  • backend/app/services/assessment/tasks.py
  • backend/app/services/assessment/utils/attachments.py
  • backend/app/services/assessment/utils/export.py
  • backend/app/tests/assessment/test_batch.py
  • backend/app/tests/assessment/test_cron.py
  • backend/app/tests/assessment/test_duplicate_detection.py
  • backend/app/tests/assessment/test_export.py
  • backend/app/tests/assessment/test_pipeline.py
  • backend/app/tests/assessment/test_prefilter_batching.py
  • backend/app/tests/assessment/test_topic_relevance.py
📝 Walkthrough

Walkthrough

This PR implements a multi-stage assessment pipeline infrastructure with prefilter gates (topic relevance and duplicate detection), export-time post-processing (computed columns, filtering, sorting), async task orchestration, run resumption, and comprehensive attachment-type routing. Assessment execution moves from inline batch submission to stage-based Celery dispatch, with polling-based completion detection and stage-wise result accumulation.

Changes

L1 Assessment Pipeline & Post-Processing

Layer / File(s) Summary
Run schema and persisted state
backend/app/models/assessment.py, backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py
AssessmentRun gains stage/stage_status/pipeline/stage_batches for execution tracking; prefilter_total_* and prefilter_object_store_url for gate outputs. AssessmentRunStat and AssessmentRunPublic expose these fields. AssessmentAttachment.type allows "mixed" with per-row type mapping. AssessmentCreate accepts optional prefilter_config and post_processing_config. Migration adds all tracking columns.
Attachment routing and batch payload construction
backend/app/services/assessment/utils/attachments.py, backend/app/crud/assessment/batch.py, backend/app/tests/assessment/test_batch.py
Attachment utilities add resolve_item_type and attachment_type_for_row for per-row "image"/"pdf" resolution from mixed columns, and build_gemini_attachment_parts for Gemini payload generation. build_openai_jsonl and build_google_jsonl accept optional row_indices for remapped identifiers and per-row type_override-based attachment routing. submit_assessment_batch accepts optional preloaded_rows and row_indices. Tests cover type resolution and overrides.
Prefilter request builders and stage registry
backend/app/services/assessment/prefilter/constants.py, backend/app/services/assessment/prefilter/pipeline.py, backend/app/services/assessment/prefilter/topic_relevance.py, backend/app/services/assessment/prefilter/duplicate_detection.py, backend/app/services/assessment/prefilter/request_builder.py, backend/app/services/assessment/stages.py, backend/app/tests/assessment/test_pipeline.py, backend/app/tests/assessment/test_topic_relevance.py, backend/app/tests/assessment/test_duplicate_detection.py
Prefilter constants define provider/model/store identifiers. resolve_prefilter_settings flattens config; build_topic_relevance_requests/parse_topic_relevance_results handle topic gating; build_duplicate_detection_requests/parse_duplicate_detection_results handle duplicate gating. build_request_line routes to provider-specific payloads. stages.py defines stage registry, builds/orders/progresses pipeline, submits prefilter batches, and loads batch results with object-store fallback. Tests verify pipeline construction, request building, and result parsing.
Pipeline task orchestration and run resume
backend/app/celery/tasks/job_execution.py, backend/app/services/assessment/tasks.py, backend/app/services/assessment/service.py, backend/app/api/routes/assessment/runs.py, backend/app/tests/assessment/test_service.py, backend/app/tests/assessment/test_tasks_failure_guard.py, backend/app/tests/assessment/test_prefilter_batching.py
run_assessment_pipeline Celery task dispatches pipeline execution with trace ID propagation. execute_assessment_pipeline guards against timeout/exception and marks failed runs. _orchestrate initializes run pipeline and stage state, short-circuiting completed/failed runs. _submit_stage computes accepted indices, submits prefilter/L2 batches, and persists stage state. start_assessment creates runs per config and dispatches tasks instead of inline submission. resume_assessment_run resets failed-stage runs and re-dispatches. API resume endpoint and post-processing update endpoint wire orchestration. Tests cover Celery dispatch, orchestration state transitions, failure guards, and stage submission.
Stage polling, run progression, and stats
backend/app/crud/assessment/core.py, backend/app/crud/assessment/cron.py, backend/app/crud/assessment/processing.py, backend/app/tests/assessment/test_crud.py, backend/app/tests/assessment/test_cron.py, backend/app/tests/assessment/test_processing.py
CRUD adds update_run_post_processing_config and update_assessment_run_prefilter_stats to persist run input/metrics. compute_run_counts uses internal status groupings (active/completed/failed). Cron switches to stage-based process_run_batches with transient-error tolerance. process_run_batches polls stage batches, persists gate stats/accepted indices, marks stages failed/completed, and dispatches next stage. Tests cover CRUD persistence, transient polling, gate stat accumulation, and stage advancement.
Export post-processing and run-level configuration
backend/app/services/assessment/utils/post_processing.py, backend/app/services/assessment/utils/export.py, backend/app/api/routes/assessment/runs.py, backend/app/api/docs/assessment/update_post_processing.md, backend/app/api/docs/assessment/resume_run.md, backend/app/tests/assessment/test_post_processing.py, backend/app/tests/assessment/test_export.py
Post-processing engine provides evaluate_formula (safe arithmetic with @Column references), apply_computed_columns, apply_filter (AND-combined rule sets), apply_sort (multi-rule priority), and pipeline apply_post_processing. Export utilities expand prefilter JSON fields into top-level columns, rewrite load_export_rows_for_run to emit all dataset rows with merged prefilter/L2 annotations, and route post-processing through JSON and CSV/XLSX serialization. API export endpoint applies post-processing for JSON; patch endpoint persists post-processing config. Tests cover formula evaluation, filter/sort semantics, export field expansion, and prefilter/L2 row merging. Documentation describes resume and post-processing endpoints.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

Possibly related PRs

  • ProjectTech4DevAI/kaapi-backend#820: Modifies build_google_jsonl request key emission; this PR adjusts the same function for row_indices and attachment routing, requiring careful merge review.

Suggested reviewers

  • kartpop
  • AkhileshNegi

Poem

🐰 A pipeline blooms in stages grand,
With prefilters acting as the first command,
Then post-processing polish, sort, and sing,
While async tasks make results spring!
hops gleefully

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/assessment-pipeline-l1

@vprashrex vprashrex self-assigned this Jun 2, 2026
@vprashrex vprashrex added the enhancement New feature or request label Jun 2, 2026
@vprashrex vprashrex linked an issue Jun 2, 2026 that may be closed by this pull request
@vprashrex vprashrex changed the title feat(assessment): Implement L1 and L2 Pipeline Assessment: Implement L1 Filtering and Post-Processing Jun 2, 2026
@vprashrex vprashrex requested review from AkhileshNegi, Prajna1999 and kartpop and removed request for AkhileshNegi June 2, 2026 06:30
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 6

🧹 Nitpick comments (7)
backend/app/crud/assessment/batch.py (1)

306-316: 💤 Low value

Docstring is missing documentation for new parameters.

The preloaded_rows and row_indices parameters are not documented in the docstring.

📝 Suggested docstring update
     Args:
         session: Database session
         run: The AssessmentRun to process
         dataset: The dataset to read rows from
         config_blob: Resolved configuration blob
         assessment_input: Assessment input config (prompt_template, text_columns, etc.)
         organization_id: Organization ID
         project_id: Project ID
+        preloaded_rows: Pre-filtered rows to use instead of loading from dataset (e.g., post-L1)
+        row_indices: Original row indices for custom_id/key mapping when using filtered rows
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/assessment/batch.py` around lines 306 - 316, The docstring
for the function in backend/app/crud/assessment/batch.py is missing entries for
the new parameters preloaded_rows and row_indices; update the Args section to
document both: describe preloaded_rows as an optional sequence/list of
pre-fetched dataset rows provided to avoid re-reading from the dataset (include
expected element type, e.g., dict/Row), and describe row_indices as an optional
sequence/list of integer indices specifying which dataset rows to process from
the dataset/preloaded_rows; keep wording consistent with the existing entries
(session, run, dataset, config_blob, assessment_input, organization_id,
project_id) and ensure the new parameter descriptions include expected types and
behavior when omitted.
backend/app/services/assessment/utils/attachments.py (1)

100-114: ⚖️ Poor tradeoff

Consider adding HEIC/HEIF magic byte detection.

The _IMAGE_MIME_BY_EXT dictionary includes .heic and .heif extensions, but _image_mime_from_magic doesn't detect these formats via magic bytes. HEIC/HEIF files start with a ftyp box containing brand identifiers like heic, heix, mif1.

This means HEIC URLs without extensions will fail magic-byte detection and fall back to Content-Type probing, which may not always be reliable.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/utils/attachments.py` around lines 100 - 114,
The _image_mime_from_magic function is missing HEIC/HEIF detection; extend it to
detect the ISO BMFF 'ftyp' box by checking blob length (>=12), that blob[4:8] ==
b'ftyp', and that the brand field (e.g., blob[8:12] and/or subsequent bytes)
matches known HEIC/HEIF brands like b'heic', b'heix', b'hevc', b'hevx', b'mif1'
(or similar), and return the appropriate HEIF mime (e.g., "image/heif") when
matched; this aligns magic detection with the _IMAGE_MIME_BY_EXT entries
.heic/.heif so files without extensions are recognized.
backend/app/core/config.py (1)

177-179: ⚡ Quick win

Tenant-specific store id as a global default is risky.

fileSearchStores/inquilabcorpus-782mxjcwisaz looks like a corpus belonging to a specific tenant. Shipping it as the built-in default means any deployment that forgets to set this env var silently runs duplicate detection against that corpus. Consider leaving the default empty and failing fast (or gating L1) when it's unset.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/core/config.py` around lines 177 - 179, The constant
ASSESSMENT_L1_DUPLICATE_STORE_NAME currently hardcodes a tenant-specific store
id; change its default to an empty string (or None) and add a startup/config
validation that fails fast or disables/gates L1 duplicate detection if
ASSESSMENT_L1_DUPLICATE_STORE_NAME is unset. Update the definition of
ASSESSMENT_L1_DUPLICATE_STORE_NAME in config.py to be empty and add a clear
validation check in the app initialization path (e.g., config validation or
startup/init_assessment logic) that raises an error or disables L1 when the
value is empty.
backend/app/services/assessment/tasks.py (1)

13-13: 💤 Low value

Importing private function _load_dataset_rows from another module.

The function _load_dataset_rows is prefixed with underscore, indicating it's intended as a module-private implementation detail. Importing it across module boundaries couples this code to an internal API that could change without notice.

Consider either removing the underscore prefix in the batch module to make it public, or adding a public wrapper function for cross-module use.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/tasks.py` at line 13, The tasks.py module is
importing the private function _load_dataset_rows from
app.crud.assessment.batch; update usage to rely on a public API instead: add a
public wrapper (e.g., load_dataset_rows) or rename the function in
app.crud.assessment.batch to remove the leading underscore, then change the
import in backend/app/services/assessment/tasks.py to import load_dataset_rows
(and keep submit_assessment_batch as-is), ensuring callers reference the new
public symbol rather than the module-private _load_dataset_rows.
backend/app/services/assessment/utils/export.py (1)

674-696: 💤 Low value

Post-processing config not passed in multi-run zip export.

When exporting multiple runs as a zip file, serialize_export_rows is called without post_processing_config (line 684), meaning computed columns, filtering, and sorting won't be applied. This may be intentional since each run could have different configs, but if users expect post-processing to apply consistently, this could be surprising.

Consider whether post-processing should be applied per-run in the zip export path, or document this limitation.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/utils/export.py` around lines 674 - 696, The
zip export omits post-processing because serialize_export_rows is called without
post_processing_config for each run; update the multi-run export loop (the code
iterating over runs_with_rows) to pass the appropriate post_processing_config
into serialize_export_rows so computed columns/filtering/sorting are applied
per-run (e.g., serialize_export_rows(rows, export_format,
post_processing_config=post_processing_config)), or if per-run configs are
needed, derive a per-run post_processing_config and pass that instead; ensure
the variable name you pass matches the surrounding export function's parameter
or local (post_processing_config) and keep export_format and file naming logic
unchanged.
backend/app/services/assessment/utils/post_processing.py (1)

144-158: ⚡ Quick win

Loop variable desc not bound in closure.

The static analysis warning is valid: desc is captured by reference from the enclosing loop. While this works correctly here because sorted() evaluates keys immediately before the next iteration, it's fragile and can break if the code is refactored (e.g., if sort_key were stored for later use).

Bind desc as a default argument like _col already is:

♻️ Proposed fix
-        def sort_key(row: dict[str, Any], _col: str = col) -> tuple:
+        def sort_key(row: dict[str, Any], _col: str = col, _desc: bool = desc) -> tuple:
             val = row.get(_col)
             if val is None:
                 return (1, 0, "")
             try:
-                return (0, -float(val) if desc else float(val), "")
+                return (0, -float(val) if _desc else float(val), "")
             except (TypeError, ValueError):
                 s = str(val).lower()
                 return (
                     (0, 0, s)
-                    if not desc
-                    else (0, 0, "".join(chr(0x10FFFF - ord(c)) for c in s))
+                    if not _desc
+                    else (0, 0, "".join(chr(0x10FFFF - ord(c)) for c in s))
                 )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/utils/post_processing.py` around lines 144 -
158, The closure sort_key captures the loop variable desc by reference which is
fragile; update sort_key signature to bind desc as a default parameter (similar
to _col) so it captures the current boolean value at definition time (e.g., def
sort_key(row: dict[str, Any], _col: str = col, _desc: bool = desc): ...) and
then use _desc inside the function when negating or inverting string order; keep
the sorted(result, key=sort_key) call unchanged.
backend/app/services/assessment/l1/pipeline.py (1)

121-135: Consider Gemini rate limits with ASSESSMENT_L1_CONCURRENT_WORKERS.

Both filters fan out one Gemini request per row at workers concurrency with no client-side throttling or retry/backoff. On large datasets this can trip provider rate limits, surfacing as per-row errors (ACCEPT/ERROR defaults) that silently degrade L1 quality. Worth bounding concurrency relative to the model's quota or adding backoff on 429s.

Also applies to: 166-179

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/l1/pipeline.py` around lines 121 - 135, The
concurrent ThreadPoolExecutor fan-out calling run_topic_relevance (and the
similar block that submits run_answer_reasoning) can exceed Gemini rate limits;
change the concurrency model to use a bounded semaphore or token bucket so
actual in-flight Gemini requests are limited (e.g., cap at a safe value derived
from ASSESSMENT_L1_CONCURRENT_WORKERS and model quota) and instrument
request-level retries with exponential backoff and jitter for 429/5xx responses
inside the Gemini call site used by run_topic_relevance (and the other submitted
function), or wrap the client calls in a retry helper/ decorator that catches
rate-limit responses and retries with backoff and a max attempts counter; update
both executor submission sites to acquire the semaphore/token before submitting
or perform semaphore acquisition inside the worker functions to ensure global
throttling.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 235-257: The task run_assessment_run is missing the gevent timeout
decorator used elsewhere (e.g., run_tts_result_processing); update
run_assessment_run to include
`@gevent_timeout`(settings.CELERY_TASK_SOFT_TIME_LIMIT, ...) above the task
definition (or provide a code comment justification in the function explaining
why gevent-based timeout/diagnostics are intentionally omitted) so timeout
behavior and logging are consistent with other tasks using gevent_timeout.

In `@backend/app/crud/assessment/batch.py`:
- Around line 174-178: Validate that row_indices, when provided, has the same
length as rows before the loop that iterates rows (the block initializing
type_cache and the for i, row in enumerate(rows) loop), and raise a clear
ValueError (or similar) if lengths differ; add the same check to the
build_google_jsonl function where row_indices is also accepted to prevent
IndexError or unused indices. Specifically, check if row_indices is not None and
len(row_indices) != len(rows) and fail early with a descriptive message
referencing rows and row_indices so callers must supply matching-length lists.

In `@backend/app/services/assessment/l1/duplicate_detection.py`:
- Around line 96-108: The generate_content call in duplicate_detection.py
(gemini_client.models.generate_content) lacks an http timeout and can hang when
file-search is slow; update this call to pass
http_options=types.HttpOptions(timeout=...) (same timeout value used in the
other fixes) so that _call_file_search and _check_vague have bounded HTTP
timeouts when dispatching concurrent file-search requests.

In `@backend/app/services/assessment/l1/topic_relevance.py`:
- Line 90: The response_schema currently uses lowercase OpenAPI type strings
(see output_schema) which the google-genai SDK expects in its internal
enum/casing (e.g., "OBJECT", "BOOLEAN"); update the construction of
output_schema used for response_schema so type values use the SDK's Type
constants (e.g., genai.types.Type.OBJECT/BOOLEAN) or the exact uppercase strings
the SDK requires, ensuring nested properties and arrays are converted likewise;
locate where output_schema is defined and replace lowercase
"object"/"boolean"/"array"/"string"/"integer" with the corresponding genai Type
constants or uppercase enum names so google-genai validation/structured output
will apply correctly.
- Around line 84-93: Add a per-request Gemini timeout and guard ThreadPool
futures: introduce ASSESSMENT_L1_REQUEST_TIMEOUT_MS in core/config.py (default
value in milliseconds), then pass an HttpOptions(timeout=...) to
gemini_client.models.generate_content calls in topic_relevance.py and both call
sites in duplicate_detection.py (convert ms -> seconds if HttpOptions expects
seconds), and update pipeline.py to call fut.result(timeout=...) using the same
timeout (converted to seconds) so ThreadPoolExecutor futures won't hang
indefinitely; reference the gemini_client.models.generate_content call sites,
types.HttpOptions (or HttpOptions), the ASSESSMENT_L1_REQUEST_TIMEOUT_MS config
symbol, and the fut.result(timeout=...) change in pipeline.py.

In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 180-209: The _probe_url_type function currently calls requests.get
on user-provided URLs without SSRF protections; before making the request (and
before using _drive_file_id result) validate the URL: enforce allowed schemes
(e.g., require https), resolve the hostname to IP and block
localhost/private/reserved ranges (IPv4/IPv6), and check against an allowlist of
approved hosts or domains; if validation fails log and return None. Also disable
or tightly limit redirects (set allow_redirects=False or a small max), and
ensure the validation is re-run on redirected locations if redirects are
allowed.

---

Nitpick comments:
In `@backend/app/core/config.py`:
- Around line 177-179: The constant ASSESSMENT_L1_DUPLICATE_STORE_NAME currently
hardcodes a tenant-specific store id; change its default to an empty string (or
None) and add a startup/config validation that fails fast or disables/gates L1
duplicate detection if ASSESSMENT_L1_DUPLICATE_STORE_NAME is unset. Update the
definition of ASSESSMENT_L1_DUPLICATE_STORE_NAME in config.py to be empty and
add a clear validation check in the app initialization path (e.g., config
validation or startup/init_assessment logic) that raises an error or disables L1
when the value is empty.

In `@backend/app/crud/assessment/batch.py`:
- Around line 306-316: The docstring for the function in
backend/app/crud/assessment/batch.py is missing entries for the new parameters
preloaded_rows and row_indices; update the Args section to document both:
describe preloaded_rows as an optional sequence/list of pre-fetched dataset rows
provided to avoid re-reading from the dataset (include expected element type,
e.g., dict/Row), and describe row_indices as an optional sequence/list of
integer indices specifying which dataset rows to process from the
dataset/preloaded_rows; keep wording consistent with the existing entries
(session, run, dataset, config_blob, assessment_input, organization_id,
project_id) and ensure the new parameter descriptions include expected types and
behavior when omitted.

In `@backend/app/services/assessment/l1/pipeline.py`:
- Around line 121-135: The concurrent ThreadPoolExecutor fan-out calling
run_topic_relevance (and the similar block that submits run_answer_reasoning)
can exceed Gemini rate limits; change the concurrency model to use a bounded
semaphore or token bucket so actual in-flight Gemini requests are limited (e.g.,
cap at a safe value derived from ASSESSMENT_L1_CONCURRENT_WORKERS and model
quota) and instrument request-level retries with exponential backoff and jitter
for 429/5xx responses inside the Gemini call site used by run_topic_relevance
(and the other submitted function), or wrap the client calls in a retry helper/
decorator that catches rate-limit responses and retries with backoff and a max
attempts counter; update both executor submission sites to acquire the
semaphore/token before submitting or perform semaphore acquisition inside the
worker functions to ensure global throttling.

In `@backend/app/services/assessment/tasks.py`:
- Line 13: The tasks.py module is importing the private function
_load_dataset_rows from app.crud.assessment.batch; update usage to rely on a
public API instead: add a public wrapper (e.g., load_dataset_rows) or rename the
function in app.crud.assessment.batch to remove the leading underscore, then
change the import in backend/app/services/assessment/tasks.py to import
load_dataset_rows (and keep submit_assessment_batch as-is), ensuring callers
reference the new public symbol rather than the module-private
_load_dataset_rows.

In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 100-114: The _image_mime_from_magic function is missing HEIC/HEIF
detection; extend it to detect the ISO BMFF 'ftyp' box by checking blob length
(>=12), that blob[4:8] == b'ftyp', and that the brand field (e.g., blob[8:12]
and/or subsequent bytes) matches known HEIC/HEIF brands like b'heic', b'heix',
b'hevc', b'hevx', b'mif1' (or similar), and return the appropriate HEIF mime
(e.g., "image/heif") when matched; this aligns magic detection with the
_IMAGE_MIME_BY_EXT entries .heic/.heif so files without extensions are
recognized.

In `@backend/app/services/assessment/utils/export.py`:
- Around line 674-696: The zip export omits post-processing because
serialize_export_rows is called without post_processing_config for each run;
update the multi-run export loop (the code iterating over runs_with_rows) to
pass the appropriate post_processing_config into serialize_export_rows so
computed columns/filtering/sorting are applied per-run (e.g.,
serialize_export_rows(rows, export_format,
post_processing_config=post_processing_config)), or if per-run configs are
needed, derive a per-run post_processing_config and pass that instead; ensure
the variable name you pass matches the surrounding export function's parameter
or local (post_processing_config) and keep export_format and file naming logic
unchanged.

In `@backend/app/services/assessment/utils/post_processing.py`:
- Around line 144-158: The closure sort_key captures the loop variable desc by
reference which is fragile; update sort_key signature to bind desc as a default
parameter (similar to _col) so it captures the current boolean value at
definition time (e.g., def sort_key(row: dict[str, Any], _col: str = col, _desc:
bool = desc): ...) and then use _desc inside the function when negating or
inverting string order; keep the sorted(result, key=sort_key) call unchanged.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 133027e3-00b7-48eb-985b-ce88b25541f8

📥 Commits

Reviewing files that changed from the base of the PR and between c8b57d0 and 0addb71.

📒 Files selected for processing (25)
  • backend/app/alembic/versions/064_add_l1_columns_to_assessment_run.py
  • backend/app/api/docs/assessment/update_post_processing.md
  • backend/app/api/routes/assessment/runs.py
  • backend/app/celery/tasks/job_execution.py
  • backend/app/core/config.py
  • backend/app/crud/assessment/__init__.py
  • backend/app/crud/assessment/batch.py
  • backend/app/crud/assessment/core.py
  • backend/app/crud/assessment/cron.py
  • backend/app/models/assessment.py
  • backend/app/services/assessment/l1/__init__.py
  • backend/app/services/assessment/l1/duplicate_detection.py
  • backend/app/services/assessment/l1/pipeline.py
  • backend/app/services/assessment/l1/topic_relevance.py
  • backend/app/services/assessment/service.py
  • backend/app/services/assessment/tasks.py
  • backend/app/services/assessment/utils/attachments.py
  • backend/app/services/assessment/utils/export.py
  • backend/app/services/assessment/utils/post_processing.py
  • backend/app/tests/assessment/test_batch.py
  • backend/app/tests/assessment/test_cron.py
  • backend/app/tests/assessment/test_crud.py
  • backend/app/tests/assessment/test_export.py
  • backend/app/tests/assessment/test_service.py
  • backend/app/tests/assessment/test_topic_relevance.py

Comment thread backend/app/celery/tasks/job_execution.py Outdated
Comment thread backend/app/crud/assessment/batch.py Outdated
Comment thread backend/app/services/assessment/prefilter/duplicate_detection.py Outdated
Comment thread backend/app/services/assessment/prefilter/topic_relevance.py Outdated
Comment thread backend/app/services/assessment/prefilter/topic_relevance.py Outdated
Comment thread backend/app/services/assessment/utils/attachments.py Outdated
Comment thread backend/app/services/assessment/l1/pipeline.py Outdated
Comment thread backend/app/core/config.py Outdated
Comment thread backend/app/services/assessment/prefilter/pipeline.py Outdated
… detection

- Added a new prefilter pipeline orchestrator that runs topic relevance and duplicate detection filters in series.
- Created `run_topic_relevance` and `run_duplicate_detection` functions to handle respective filtering logic.
- Updated assessment service to utilize prefilter configuration instead of L1 configuration.
- Modified assessment tasks to reflect the new prefilter processing status and error handling.
- Adjusted utility functions and export logic to accommodate prefilter results.
- Enhanced tests to cover the new prefilter functionality and ensure proper integration.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/app/services/assessment/tasks.py (1)

45-52: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run left in dangling state when parent assessment is missing.

When the parent Assessment is not found, the run remains in its current status (likely pending) without being marked as failed. This leaves an orphaned run that won't be picked up by cron polling or cleaned up.

Proposed fix
         assessment = session.get(Assessment, run.assessment_id)
         if assessment is None:
             logger.error(
                 "[execute_assessment_run] parent assessment %s not found for run %s",
                 run.assessment_id,
                 run_id,
             )
+            update_assessment_run_status(
+                session=session,
+                run=run,
+                status="failed",
+                error_message="Parent assessment not found.",
+            )
             return
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/tasks.py` around lines 45 - 52, When the
parent Assessment is missing in execute_assessment_run, the run (variable run)
must be transitioned out of the dangling state: set run.status to a failure
state (e.g., RunStatus.FAILED or equivalent), populate run.error or
run.finished_at with a brief message/timestamp indicating "parent assessment not
found", persist the change via session.add()/session.commit() (or
session.flush()/session.commit() as per existing patterns), and then log the
error; update the block that currently returns after logger.error to perform
these updates on run and commit before returning.
🧹 Nitpick comments (6)
backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py (1)

25-25: 💤 Low value

Minor typo in column comment.

The comment says "prefilter filter results" which has a redundant "filter".

Suggested fix
-            comment="S3 URL of stored prefilter filter results JSON",
+            comment="S3 URL of stored prefilter results JSON",
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py`
at line 25, Fix the typo in the column comment inside the migration file
064_add_prefilter_columns_to_assessment_run.py by changing the comment string
from "S3 URL of stored prefilter filter results JSON" to "S3 URL of stored
prefilter results JSON" where the column is defined (the place that currently
has comment="S3 URL of stored prefilter filter results JSON"); ensure you update
the same comment text in both the upgrade and any corresponding downgrade/column
definition blocks if duplicated.
backend/app/tests/assessment/test_batch.py (1)

476-481: ⚡ Quick win

Preserve the validator’s URL-return contract in these probe tests.

These patches replace validate_callback_url with a bare mock, so detect_item_type can pass a mock object into requests.get and the assertions still succeed. The redirect case also only checks call counts, so it would miss a regression where the revalidated URL is ignored. Please make the validator mock return the input URL (for example via side_effect=lambda url: url) and assert the validated URL(s) passed to requests.get.

Also applies to: 495-500, 507-512, 529-545, 559-564

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_batch.py` around lines 476 - 481, The tests
patching app.services.assessment.utils.attachments.validate_callback_url must
preserve its URL-return contract: change those bare mocks to return the input
URL (e.g., use side_effect=lambda url: url) so detect_item_type receives a real
URL string rather than a Mock; then update assertions to verify the validated
URL(s) were passed into requests.get (inspect mock_get.call_args /
call_args_list) instead of only checking call counts. Apply this change to the
mocks around validate_callback_url and requests.get in the probe tests (the
blocks referencing validate_callback_url and mock_get at the shown locations) so
redirects and revalidation actually use the validated URL.
backend/app/tests/assessment/test_crud.py (2)

321-329: ⚡ Quick win

Verify flag_modified in the input=None path.

This branch is the one most likely to miss JSON dirty tracking, but the test patches flag_modified without asserting it was called. Checking flag_modified(run, "input") here would lock down the persistence contract for the newly initialized blob.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_crud.py` around lines 321 - 329, The test
test_none_input_handled currently patches flag_modified but doesn't assert it
was invoked; update the test so after calling
update_run_post_processing_config(session=session, run=run, config=None) you
assert flag_modified was called with the run object and the "input" attribute
(e.g., assert flag_modified.called and/or assert flag_modified.call_args showing
(run, "input")), keeping the existing patch of flag_modified and verifying the
persistence contract for update_run_post_processing_config.

227-246: ⚡ Quick win

Assert the new prefilter counters in build_run_stats.

This fixture only proves the extra fields do not crash when they are None. It will still pass if build_run_stats silently drops prefilter_total_rows, prefilter_total_passed, or prefilter_total_rejected. Please add a case with concrete values and assert they are exposed on stats[0].

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_crud.py` around lines 227 - 246, The test
currently only covers None prefilter counters; add a second run in
test_build_run_stats with concrete integer values for prefilter_total_rows,
prefilter_total_passed, and prefilter_total_rejected so build_run_stats must
preserve them; then assert on stats entry (e.g., stats[1] or the matching run
object) that .prefilter_total_rows, .prefilter_total_passed, and
.prefilter_total_rejected equal the expected integers to ensure build_run_stats
exposes these fields unchanged. Ensure you reference the same SimpleNamespace
fixture and the build_run_stats function when adding the new case.
backend/app/tests/assessment/test_pipeline.py (2)

98-123: ⚡ Quick win

Cover original row-index preservation in the duplicate-detection path.

This case only exercises a surviving row at index 0, so it cannot catch a regression where the pipeline renumbers passed rows before calling run_duplicate_detection. Since the downstream batch/export flow depends on original row indices, please add a case where an earlier row is filtered out and assert the duplicate-detection call still receives the original index.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_pipeline.py` around lines 98 - 123, Add a
second scenario in test_duplicate_detection_runs_on_passed_rows where an earlier
row is filtered out so the surviving row is not at index 0 (e.g., three rows
where the first is irrelevant and the second survives); call
run_prefilter_pipeline the same way and assert dup_mock (from _patches) was
called with the original row identifier (e.g., "row_1" or matching the original
index) and that results preserves the original index in duplicate_detection,
ensuring run_duplicate_detection receives original row indices.

25-59: ⚡ Quick win

Add type hints to _patches.

tr_side, dup_return, and the return value are currently untyped in changed Python code.

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_pipeline.py` around lines 25 - 59, Add
explicit type hints to the _patches function: annotate parameters tr_side and
dup_return as Optional[Any] (or a more specific Union if you know the exact
types used as side_effect/return), and annotate the return type as
Tuple[MagicMock, MagicMock]; update the function signature (def _patches(stack:
ExitStack, *, tr_side: Optional[Any] = None, dup_return: Optional[Any] = None)
-> Tuple[MagicMock, MagicMock]:) and add the necessary imports (from typing
import Optional, Any, Tuple and ensure MagicMock is imported from unittest.mock)
so the function and its return are fully typed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@backend/app/services/assessment/tasks.py`:
- Around line 45-52: When the parent Assessment is missing in
execute_assessment_run, the run (variable run) must be transitioned out of the
dangling state: set run.status to a failure state (e.g., RunStatus.FAILED or
equivalent), populate run.error or run.finished_at with a brief
message/timestamp indicating "parent assessment not found", persist the change
via session.add()/session.commit() (or session.flush()/session.commit() as per
existing patterns), and then log the error; update the block that currently
returns after logger.error to perform these updates on run and commit before
returning.

---

Nitpick comments:
In `@backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py`:
- Line 25: Fix the typo in the column comment inside the migration file
064_add_prefilter_columns_to_assessment_run.py by changing the comment string
from "S3 URL of stored prefilter filter results JSON" to "S3 URL of stored
prefilter results JSON" where the column is defined (the place that currently
has comment="S3 URL of stored prefilter filter results JSON"); ensure you update
the same comment text in both the upgrade and any corresponding downgrade/column
definition blocks if duplicated.

In `@backend/app/tests/assessment/test_batch.py`:
- Around line 476-481: The tests patching
app.services.assessment.utils.attachments.validate_callback_url must preserve
its URL-return contract: change those bare mocks to return the input URL (e.g.,
use side_effect=lambda url: url) so detect_item_type receives a real URL string
rather than a Mock; then update assertions to verify the validated URL(s) were
passed into requests.get (inspect mock_get.call_args / call_args_list) instead
of only checking call counts. Apply this change to the mocks around
validate_callback_url and requests.get in the probe tests (the blocks
referencing validate_callback_url and mock_get at the shown locations) so
redirects and revalidation actually use the validated URL.

In `@backend/app/tests/assessment/test_crud.py`:
- Around line 321-329: The test test_none_input_handled currently patches
flag_modified but doesn't assert it was invoked; update the test so after
calling update_run_post_processing_config(session=session, run=run, config=None)
you assert flag_modified was called with the run object and the "input"
attribute (e.g., assert flag_modified.called and/or assert
flag_modified.call_args showing (run, "input")), keeping the existing patch of
flag_modified and verifying the persistence contract for
update_run_post_processing_config.
- Around line 227-246: The test currently only covers None prefilter counters;
add a second run in test_build_run_stats with concrete integer values for
prefilter_total_rows, prefilter_total_passed, and prefilter_total_rejected so
build_run_stats must preserve them; then assert on stats entry (e.g., stats[1]
or the matching run object) that .prefilter_total_rows, .prefilter_total_passed,
and .prefilter_total_rejected equal the expected integers to ensure
build_run_stats exposes these fields unchanged. Ensure you reference the same
SimpleNamespace fixture and the build_run_stats function when adding the new
case.

In `@backend/app/tests/assessment/test_pipeline.py`:
- Around line 98-123: Add a second scenario in
test_duplicate_detection_runs_on_passed_rows where an earlier row is filtered
out so the surviving row is not at index 0 (e.g., three rows where the first is
irrelevant and the second survives); call run_prefilter_pipeline the same way
and assert dup_mock (from _patches) was called with the original row identifier
(e.g., "row_1" or matching the original index) and that results preserves the
original index in duplicate_detection, ensuring run_duplicate_detection receives
original row indices.
- Around line 25-59: Add explicit type hints to the _patches function: annotate
parameters tr_side and dup_return as Optional[Any] (or a more specific Union if
you know the exact types used as side_effect/return), and annotate the return
type as Tuple[MagicMock, MagicMock]; update the function signature (def
_patches(stack: ExitStack, *, tr_side: Optional[Any] = None, dup_return:
Optional[Any] = None) -> Tuple[MagicMock, MagicMock]:) and add the necessary
imports (from typing import Optional, Any, Tuple and ensure MagicMock is
imported from unittest.mock) so the function and its return are fully typed.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 9fd400ef-931b-4af1-b578-6e595abd868b

📥 Commits

Reviewing files that changed from the base of the PR and between 0addb71 and e020717.

📒 Files selected for processing (22)
  • backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py
  • backend/app/api/routes/assessment/runs.py
  • backend/app/celery/tasks/job_execution.py
  • backend/app/core/config.py
  • backend/app/crud/assessment/__init__.py
  • backend/app/crud/assessment/batch.py
  • backend/app/crud/assessment/core.py
  • backend/app/models/assessment.py
  • backend/app/services/assessment/prefilter/__init__.py
  • backend/app/services/assessment/prefilter/duplicate_detection.py
  • backend/app/services/assessment/prefilter/pipeline.py
  • backend/app/services/assessment/prefilter/topic_relevance.py
  • backend/app/services/assessment/service.py
  • backend/app/services/assessment/tasks.py
  • backend/app/services/assessment/utils/attachments.py
  • backend/app/services/assessment/utils/export.py
  • backend/app/tests/assessment/test_batch.py
  • backend/app/tests/assessment/test_crud.py
  • backend/app/tests/assessment/test_duplicate_detection.py
  • backend/app/tests/assessment/test_pipeline.py
  • backend/app/tests/assessment/test_post_processing.py
  • backend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
  • backend/app/services/assessment/prefilter/init.py
🚧 Files skipped from review as they are similar to previous changes (6)
  • backend/app/celery/tasks/job_execution.py
  • backend/app/api/routes/assessment/runs.py
  • backend/app/tests/assessment/test_topic_relevance.py
  • backend/app/services/assessment/service.py
  • backend/app/crud/assessment/batch.py
  • backend/app/services/assessment/utils/attachments.py

Comment thread backend/app/celery/tasks/job_execution.py Outdated
Comment thread backend/app/core/config.py Outdated
Comment thread backend/app/core/config.py Outdated
Comment thread backend/app/crud/assessment/batch.py Outdated
Comment thread backend/app/crud/assessment/core.py Outdated
Comment thread backend/app/services/assessment/prefilter/topic_relevance.py Outdated
Comment thread backend/app/services/assessment/utils/attachments.py Outdated
Comment thread backend/app/services/assessment/utils/attachments.py Outdated
Comment thread backend/app/services/assessment/utils/post_processing.py
Comment thread backend/app/services/assessment/tasks.py Outdated
- Consolidated and refactored tests for the prefilter pipeline and related services.
- Introduced new tests for orchestrating assessment runs, ensuring proper handling of pipeline stages and statuses.
- Added tests for the topic relevance request builder and result parser, improving coverage for attachment handling.
- Implemented failure guards in task execution to prevent dangling runs and ensure proper error handling.
- Enhanced the resume functionality for failed assessment runs, allowing for retries from the last failed stage.
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Jun 4, 2026

OpenAPI changes   🟢 31 non-breaking changes

Tip

Safe to merge from an API-contract perspective.

Full changelog  ·  31
Method Path Change
🟢 GET /api/v1/assessment/assessments added the optional property data/anyOf[subschema #1]/items/run_stats/items/prefilter_total_passed to the response with the 200 status
🟢 GET /api/v1/assessment/assessments added the optional property data/anyOf[subschema #1]/items/run_stats/items/prefilter_total_rejected to the response with the 200 status
🟢 GET /api/v1/assessment/assessments added the optional property data/anyOf[subschema #1]/items/run_stats/items/prefilter_total_rows to the response with the 200 status
🟢 GET /api/v1/assessment/assessments added the optional property data/anyOf[subschema #1]/items/run_stats/items/stage to the response with the 200 status
🟢 GET /api/v1/assessment/assessments added the optional property data/anyOf[subschema #1]/items/run_stats/items/stage_status to the response with the 200 status
🟢 GET /api/v1/assessment/assessments/{assessment_id} added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/prefilter_total_passed to the response with the 200 status
🟢 GET /api/v1/assessment/assessments/{assessment_id} added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/prefilter_total_rejected to the response with the 200 status
🟢 GET /api/v1/assessment/assessments/{assessment_id} added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/prefilter_total_rows to the response with the 200 status
🟢 GET /api/v1/assessment/assessments/{assessment_id} added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/stage to the response with the 200 status
🟢 GET /api/v1/assessment/assessments/{assessment_id} added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/stage_status to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/pipeline to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/post_processing_config to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/prefilter_total_passed to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/prefilter_total_rejected to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/prefilter_total_rows to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/stage to the response with the 200 status
🟢 GET /api/v1/assessment/runs added the optional property data/anyOf[subschema #1]/items/stage_status to the response with the 200 status
🟢 POST /api/v1/assessment/runs added the new optional request property attachments/items/type_column
🟢 POST /api/v1/assessment/runs added the new optional request property attachments/items/type_value_map
🟢 POST /api/v1/assessment/runs added the new optional request property post_processing_config
🟢 POST /api/v1/assessment/runs added the new optional request property prefilter_config
🟢 POST /api/v1/assessment/runs added the new mixed enum value to the request property attachments/items/type
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/pipeline to the response with the 200 status
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/post_processing_config to the response with the 200 status
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/prefilter_total_passed to the response with the 200 status
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/prefilter_total_rejected to the response with the 200 status
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/prefilter_total_rows to the response with the 200 status
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/stage to the response with the 200 status
🟢 GET /api/v1/assessment/runs/{run_id} added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/stage_status to the response with the 200 status
🟢 PATCH /api/v1/assessment/runs/{run_id}/post-processing endpoint added
🟢 POST /api/v1/assessment/runs/{run_id}/resume endpoint added

main11b5d0f0 · generated by oasdiff

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 9

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/services/assessment/utils/export.py (1)

333-338: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep the base export columns in XLSX output.

fieldnames already contains the stable metadata columns, but the XLSX branch rebuilds excel_fields from only input/prefilter/output/computed columns. That drops row_id, result_status, error, token counts, and run/config metadata from spreadsheets while CSV/JSON still include them.

Proposed fix
-    # Explicit ordering: inputs → prefilter → L2 → computed columns
-    excel_fields = input_col_names + prefilter_keys + output_keys + computed_names
-    if not excel_fields:
-        excel_fields = output_keys or ["output"]
+    # Preserve the standard export columns, then append any computed columns.
+    excel_fields = fieldnames + [
+        name for name in computed_names if name not in fieldnames
+    ]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/utils/export.py` around lines 333 - 338, The
XLSX branch currently rebuilds excel_fields from only input_col_names +
prefilter_keys + output_keys + computed_names which omits the stable base
metadata in fieldnames (row_id, result_status, error, token counts, run/config
metadata); change the construction of excel_fields to start from the existing
fieldnames and then append the other groups while preserving order and
deduplicating (e.g., excel_fields = fieldnames + input_col_names +
prefilter_keys + output_keys + computed_names with a stable dedupe), then keep
the existing fallback and call to _drop_empty_columns(expanded, excel_fields);
update references to excel_fields, input_col_names, prefilter_keys, output_keys,
computed_names, fieldnames, and _drop_empty_columns accordingly.
backend/app/services/assessment/service.py (1)

197-202: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Broker publish failures can leave runs stuck with no worker owning them.

Both call sites persist state before run_assessment_pipeline.delay(...) is known to have succeeded. If the broker publish fails, start_assessment can return after partially creating/dispatched runs, and resume_assessment_run leaves the run committed as processing/PENDING even though no task was enqueued. The worker-side failure guard cannot recover this path because it only runs after task startup.

Also applies to: 325-343

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/service.py` around lines 197 - 202, The code
persists run state before calling run_assessment_pipeline.delay(...) in
start_assessment and resume_assessment_run, which can leave runs marked
processing/PENDING if the broker publish fails; change the flow so the broker
publish is confirmed before committing the run state (either publish inside the
DB transaction using an outbox pattern or delay committing status changes until
after run_assessment_pipeline.delay(...) returns successfully), or catch publish
errors and roll back or update the run record (e.g., set run.status back to
CREATED/FAILED) if delay() raises; specifically update the logic around
run_assessment_pipeline.delay, start_assessment, and resume_assessment_run to
ensure atomicity between DB state and task enqueueing.
🧹 Nitpick comments (4)
backend/app/tests/assessment/test_topic_relevance.py (1)

14-15: ⚡ Quick win

Add a return type to _gemini.

This helper is missing a return annotation, which violates the repository's Python typing rule. A loose annotation is fine here if you want to avoid importing the private _patch type.

Suggested change
+from typing import Any
 from unittest.mock import patch
@@
-def _gemini():
+def _gemini() -> Any:
     return patch.object(settings, "ASSESSMENT_PREFILTER_PROVIDER", "google")

As per coding guidelines, "**/*.py: Always add type hints to all function parameters and return values in Python code".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_topic_relevance.py` around lines 14 - 15,
The helper function _gemini lacks a return type annotation; add a return
annotation (e.g., -> Any) to its signature and import Any from typing to satisfy
the repo typing rule. Update the function def _gemini() to def _gemini() -> Any
and ensure the existing return patch.object(settings,
"ASSESSMENT_PREFILTER_PROVIDER", "google") remains unchanged so callers still
get the patch object.
backend/app/tests/assessment/test_export.py (1)

533-547: ⚡ Quick win

Add type hints to _patches.

This new helper is missing parameter and return annotations, so it falls short of the repo's Python typing rule and makes the shared patch setup harder to read and check.

Proposed fix
+from typing import Any
+
-    def _patches(self, *, l2, prefilter=None, dataset_rows=None):
+    def _patches(
+        self,
+        *,
+        l2: dict[str, dict[str, Any]],
+        prefilter: dict[str, dict[str, Any]] | None = None,
+        dataset_rows: list[dict[str, str]] | None = None,
+    ) -> list[Any]:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_export.py` around lines 533 - 547, The
helper function _patches is missing type annotations; update its signature to
include parameter and return types (e.g. def _patches(*, l2: Dict[str, Any],
prefilter: Optional[Dict[str, Any]] = None, dataset_rows: Optional[List[Any]] =
None) -> List[Any]:) and add the necessary typing imports (from typing import
Any, Dict, List, Optional) at the top of the file so callers and linters
understand the expected shapes and the function returns a list of patch objects.
backend/app/tests/assessment/test_processing.py (1)

256-267: ⚡ Quick win

Add return annotations to the local test builders.

_parent, _run, and _job are new helper factories in this file, but they still omit return annotations, and _job also leaves kw untyped. That makes this changed test module inconsistent with the repo’s required Python typing baseline.

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code" and "Use Python 3.11+ with type hints throughout the codebase."

Also applies to: 373-376

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_processing.py` around lines 256 - 267, The
helper factory functions _parent, _run, and _job lack return type annotations
(and _job’s kw parameter is untyped); update each definition to include proper
type hints — e.g., annotate _parent() and _run() with their return types
(SimpleNamespace or a more specific TypedDict/NamedTuple if available) and give
_job(...) a typed signature for kw (Mapping[str, Any] or **kwargs: Any depending
on usage) so the test module conforms to the repository’s Python 3.11 typing
baseline; update both occurrences (lines around _parent/_run and the other
instance at 373–376) to use these annotations.
backend/app/crud/assessment/processing.py (1)

250-250: ⚡ Quick win

Annotate batch_job in _poll_stage_outcome.

Line 250 introduces an untyped parameter on the batch-polling helper, which breaks the repository’s Python typing rule on one of the core orchestration paths. BatchJob looks like the intended type here.

As per coding guidelines, "Always add type hints to all function parameters and return values in Python code" and "Use Python 3.11+ with type hints throughout the codebase."

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/assessment/processing.py` at line 250, The function
_poll_stage_outcome has an untyped parameter batch_job; annotate it as BatchJob
(i.e., def _poll_stage_outcome(session: Session, provider: BatchProvider,
batch_job: BatchJob) -> str), add the necessary import for BatchJob at the top
of the file (from the module that defines BatchJob), and run type checks to
ensure call sites accept the new annotation; keep the existing return type str
unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 237-244: The function run_assessment_pipeline currently accepts
but silently drops **kwargs and lacks type annotations; update its signature to
include proper type hints (e.g., run_id: int, organization_id: int, project_id:
int, trace_id: str, **kwargs: Any) and add a return type (likely -> None). Then
either forward **kwargs into the downstream pipeline invocation inside
run_assessment_pipeline (so extra task args are preserved) or explicitly
validate/raise (TypeError) for unexpected keys if they must not be accepted;
ensure you import Any from typing and apply the change where
run_assessment_pipeline is defined.

In `@backend/app/crud/assessment/cron.py`:
- Around line 118-127: The current catch-all in
poll_all_pending_assessment_evaluations treats every exception from
process_run_batches() as transient and retries; instead, detect and propagate
deterministic failures coming from process_run_batches (e.g., ValueError for
missing-parent) rather than retrying forever: update the except block to inspect
the exception type or sentinel from process_run_batches, handle known transient
exceptions by rolling back and incrementing still_processing, but for
deterministic errors (e.g., ValueError) mark the run as failed or re-raise so
the failure state is recorded (use the existing failure-recording helper used
elsewhere in processing.py) and do not leave runs stuck in PROCESSING; refer to
process_run_batches, poll_all_pending_assessment_evaluations, run.id and
run.assessment_id to locate and implement the conditional handling.

In `@backend/app/crud/assessment/processing.py`:
- Around line 358-369: The current flow calls advance_or_finalize(run) and
commits the updated run before invoking run_assessment_pipeline.delay, which can
leave the run stuck in stage=<next>, stage_status=PENDING if the broker call
fails; change the flow to dispatch the task first and only persist the
stage/state transition after a successful enqueue: call
run_assessment_pipeline.delay(...) (catching and handling exceptions), and on
success call advance_or_finalize(run), session.add(run), session.commit(), then
recompute_assessment_status(session=session, assessment_id=run.assessment_id);
ensure you reference the run object’s stage and stage_status fields and keep
run_assessment_pipeline.delay wrapped so failures prevent committing the new
stage.

In `@backend/app/models/assessment.py`:
- Around line 338-358: Add a Pydantic validation that rejects incomplete or
invalid "mixed" attachment configs: change type_value_map's annotation to
dict[str, Literal["image","pdf"]] | None and add a validator or root_validator
(e.g., validate_mixed_config) that, when self.type == "mixed", requires
type_column is not None and type_value_map is not None and that every value in
type_value_map is either "image" or "pdf"; raise a ValidationError with a clear
message if any check fails so malformed mixed configs are rejected at schema
boundary.

In `@backend/app/services/assessment/prefilter/topic_relevance.py`:
- Around line 100-106: The current parsing builds column_relevance with bool(v),
which converts non-empty strings like "false" to True; update the construction
of column_relevance (the dict built from data in the same block that reads
out.get("output") and sets decision) to accept only real booleans or explicit
"true"/"false" strings: iterate data.items() (excluding "decision" and
"reasoning") and set each value to True only when isinstance(v, bool) and v is
True, or when isinstance(v, str) and v.strip().lower() == "true" (treat "false"
as False); leave other types as False (or omit) so inverted/string values do not
become True unintentionally.

In `@backend/app/services/assessment/stages.py`:
- Around line 87-104: The config branch only handles base == "openai" but must
also handle the OpenAI-native provider; update the conditional in the stage
builder so that when settings.ASSESSMENT_PREFILTER_PROVIDER equals
LLMProvider.OPENAI_NATIVE (or the string "openai-native") it builds the
OpenAI-style config (endpoint, completion_window, description) instead of the
Gemini-style display_name/model; modify the conditional that creates config
(around the call to _get_batch_provider and before start_batch_job) to include
LLMProvider.OPENAI_NATIVE (or "openai-native") alongside "openai" so
OpenAIBatchProvider receives the correct payload for start_batch_job.

In `@backend/app/services/assessment/tasks.py`:
- Around line 95-101: The run is being advanced before the follow-up Celery task
is guaranteed to be queued, so if run_assessment_pipeline.delay(...) fails the
run stays advanced with no worker to process it; fix by ensuring dispatch
succeeds before committing the new stage: either move the call to _dispatch so
it runs (and returns successfully) before _persist_advance commits, or wrap
dispatch in transaction.on_commit (or catch exceptions from
run_assessment_pipeline.delay/apply_async and abort the commit) so that
_persist_advance only runs after the task has been successfully queued; apply
the same change to the other occurrence around run_assessment_pipeline.delay at
lines 266-274 (referencing _dispatch, _persist_advance, and
run_assessment_pipeline.delay/apply_async).

In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 137-145: resolve_item_type currently coerces anything not
recognized to "image", causing unknown/ambiguous 'mixed' rows (when
attachment_type_for_row returns None) to be treated as images; change
resolve_item_type to return None for unresolved types instead of "image" by
updating its signature to return Optional[str] (str | None) and return None when
item_type not in ("image","pdf"); also update callers (e.g.,
attachment_type_for_row consumers) to handle a None result appropriately rather
than assuming "image".

In `@backend/app/services/assessment/utils/export.py`:
- Around line 619-632: The fallback export only iterates l2_by_row_id so rows
that were rejected at prefilter time (present only in prefilter_by_row_id) are
omitted; change the comprehension to iterate over the union of keys from
l2_by_row_id and prefilter_by_row_id (convert keys to strings consistently),
fetch l2_item as l2_by_row_id.get(row_id) and prefilter_item as
prefilter_by_row_id.get(row_id), and call _build_export_row with those values
(keeping the same arguments: run, assessment, dataset_name, row_id=str(row_id),
input_data=None, prefilter_item=..., l2_item=..., has_prefilter=has_prefilter)
so prefilter-only rows are included when the dataset cannot be reloaded.

---

Outside diff comments:
In `@backend/app/services/assessment/service.py`:
- Around line 197-202: The code persists run state before calling
run_assessment_pipeline.delay(...) in start_assessment and
resume_assessment_run, which can leave runs marked processing/PENDING if the
broker publish fails; change the flow so the broker publish is confirmed before
committing the run state (either publish inside the DB transaction using an
outbox pattern or delay committing status changes until after
run_assessment_pipeline.delay(...) returns successfully), or catch publish
errors and roll back or update the run record (e.g., set run.status back to
CREATED/FAILED) if delay() raises; specifically update the logic around
run_assessment_pipeline.delay, start_assessment, and resume_assessment_run to
ensure atomicity between DB state and task enqueueing.

In `@backend/app/services/assessment/utils/export.py`:
- Around line 333-338: The XLSX branch currently rebuilds excel_fields from only
input_col_names + prefilter_keys + output_keys + computed_names which omits the
stable base metadata in fieldnames (row_id, result_status, error, token counts,
run/config metadata); change the construction of excel_fields to start from the
existing fieldnames and then append the other groups while preserving order and
deduplicating (e.g., excel_fields = fieldnames + input_col_names +
prefilter_keys + output_keys + computed_names with a stable dedupe), then keep
the existing fallback and call to _drop_empty_columns(expanded, excel_fields);
update references to excel_fields, input_col_names, prefilter_keys, output_keys,
computed_names, fieldnames, and _drop_empty_columns accordingly.

---

Nitpick comments:
In `@backend/app/crud/assessment/processing.py`:
- Line 250: The function _poll_stage_outcome has an untyped parameter batch_job;
annotate it as BatchJob (i.e., def _poll_stage_outcome(session: Session,
provider: BatchProvider, batch_job: BatchJob) -> str), add the necessary import
for BatchJob at the top of the file (from the module that defines BatchJob), and
run type checks to ensure call sites accept the new annotation; keep the
existing return type str unchanged.

In `@backend/app/tests/assessment/test_export.py`:
- Around line 533-547: The helper function _patches is missing type annotations;
update its signature to include parameter and return types (e.g. def _patches(*,
l2: Dict[str, Any], prefilter: Optional[Dict[str, Any]] = None, dataset_rows:
Optional[List[Any]] = None) -> List[Any]:) and add the necessary typing imports
(from typing import Any, Dict, List, Optional) at the top of the file so callers
and linters understand the expected shapes and the function returns a list of
patch objects.

In `@backend/app/tests/assessment/test_processing.py`:
- Around line 256-267: The helper factory functions _parent, _run, and _job lack
return type annotations (and _job’s kw parameter is untyped); update each
definition to include proper type hints — e.g., annotate _parent() and _run()
with their return types (SimpleNamespace or a more specific TypedDict/NamedTuple
if available) and give _job(...) a typed signature for kw (Mapping[str, Any] or
**kwargs: Any depending on usage) so the test module conforms to the
repository’s Python 3.11 typing baseline; update both occurrences (lines around
_parent/_run and the other instance at 373–376) to use these annotations.

In `@backend/app/tests/assessment/test_topic_relevance.py`:
- Around line 14-15: The helper function _gemini lacks a return type annotation;
add a return annotation (e.g., -> Any) to its signature and import Any from
typing to satisfy the repo typing rule. Update the function def _gemini() to def
_gemini() -> Any and ensure the existing return patch.object(settings,
"ASSESSMENT_PREFILTER_PROVIDER", "google") remains unchanged so callers still
get the patch object.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: c1c9e50c-3b5d-4d7b-9b68-7ddcf07265d4

📥 Commits

Reviewing files that changed from the base of the PR and between e020717 and 4a4e4f8.

📒 Files selected for processing (31)
  • backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py
  • backend/app/api/docs/assessment/resume_run.md
  • backend/app/api/routes/assessment/runs.py
  • backend/app/celery/tasks/job_execution.py
  • backend/app/core/config.py
  • backend/app/crud/assessment/batch.py
  • backend/app/crud/assessment/core.py
  • backend/app/crud/assessment/cron.py
  • backend/app/crud/assessment/processing.py
  • backend/app/models/assessment.py
  • backend/app/services/assessment/prefilter/__init__.py
  • backend/app/services/assessment/prefilter/duplicate_detection.py
  • backend/app/services/assessment/prefilter/pipeline.py
  • backend/app/services/assessment/prefilter/request_builder.py
  • backend/app/services/assessment/prefilter/topic_relevance.py
  • backend/app/services/assessment/service.py
  • backend/app/services/assessment/stages.py
  • backend/app/services/assessment/tasks.py
  • backend/app/services/assessment/utils/attachments.py
  • backend/app/services/assessment/utils/export.py
  • backend/app/tests/assessment/test_batch.py
  • backend/app/tests/assessment/test_cron.py
  • backend/app/tests/assessment/test_crud.py
  • backend/app/tests/assessment/test_duplicate_detection.py
  • backend/app/tests/assessment/test_export.py
  • backend/app/tests/assessment/test_pipeline.py
  • backend/app/tests/assessment/test_prefilter_batching.py
  • backend/app/tests/assessment/test_processing.py
  • backend/app/tests/assessment/test_service.py
  • backend/app/tests/assessment/test_tasks_failure_guard.py
  • backend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
  • backend/app/api/docs/assessment/resume_run.md
🚧 Files skipped from review as they are similar to previous changes (2)
  • backend/app/tests/assessment/test_crud.py
  • backend/app/crud/assessment/core.py

Comment on lines +237 to +244
def run_assessment_pipeline(
self,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
**kwargs,
):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don't silently swallow extra task kwargs here.

run_assessment_pipeline never forwards **kwargs, so unexpected task arguments are accepted and dropped instead of failing fast. This segment also misses the required type annotations on the new function signature.

Suggested fix
 def run_assessment_pipeline(
-    self,
+    self: celery.Task,
     run_id: int,
     organization_id: int,
     project_id: int,
     trace_id: str,
-    **kwargs,
-):
+) -> None:

As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def run_assessment_pipeline(
self,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
**kwargs,
):
def run_assessment_pipeline(
self: celery.Task,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
) -> None:
🧰 Tools
🪛 Ruff (0.15.15)

[warning] 243-243: Unused function argument: kwargs

(ARG001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/celery/tasks/job_execution.py` around lines 237 - 244, The
function run_assessment_pipeline currently accepts but silently drops **kwargs
and lacks type annotations; update its signature to include proper type hints
(e.g., run_id: int, organization_id: int, project_id: int, trace_id: str,
**kwargs: Any) and add a return type (likely -> None). Then either forward
**kwargs into the downstream pipeline invocation inside run_assessment_pipeline
(so extra task args are preserved) or explicitly validate/raise (TypeError) for
unexpected keys if they must not be accepted; ensure you import Any from typing
and apply the change where run_assessment_pipeline is defined.

Comment thread backend/app/crud/assessment/cron.py
Comment thread backend/app/crud/assessment/processing.py Outdated
Comment thread backend/app/models/assessment.py
Comment on lines +100 to +106
data = json.loads(out.get("output") or "")
decision = str(data.get("decision", "ACCEPT")).upper()
column_relevance = {
k: bool(v)
for k, v in data.items()
if k not in ("decision", "reasoning")
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Parse only real booleans from model output.

bool(v) turns "false", "0", and any other non-empty string into True. If the provider returns a schema-shaped but loosely typed payload, column_relevance will be inverted in the exported topic-relevance data. Accept only bool values here, or explicitly parse "true" / "false" strings.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/prefilter/topic_relevance.py` around lines
100 - 106, The current parsing builds column_relevance with bool(v), which
converts non-empty strings like "false" to True; update the construction of
column_relevance (the dict built from data in the same block that reads
out.get("output") and sets decision) to accept only real booleans or explicit
"true"/"false" strings: iterate data.items() (excluding "decision" and
"reasoning") and set each value to True only when isinstance(v, bool) and v is
True, or when isinstance(v, str) and v.strip().lower() == "true" (treat "false"
as False); leave other types as False (or omit) so inverted/string values do not
become True unintentionally.

Comment thread backend/app/services/assessment/stages.py Outdated
Comment on lines +95 to +101
def _dispatch(run_id: int, organization_id: int, project_id: int) -> None:
run_assessment_pipeline.delay(
run_id=run_id,
organization_id=organization_id,
project_id=project_id,
trace_id=correlation_id.get() or "",
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Advance the stage only after the follow-up task is successfully queued.

_persist_advance() commits the new stage and then calls _dispatch(). If run_assessment_pipeline.delay(...) fails, the run is left advanced to the next stage with no task scheduled to process it, and the new failure guard never runs because the worker was never started.

Also applies to: 266-274

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/tasks.py` around lines 95 - 101, The run is
being advanced before the follow-up Celery task is guaranteed to be queued, so
if run_assessment_pipeline.delay(...) fails the run stays advanced with no
worker to process it; fix by ensuring dispatch succeeds before committing the
new stage: either move the call to _dispatch so it runs (and returns
successfully) before _persist_advance commits, or wrap dispatch in
transaction.on_commit (or catch exceptions from
run_assessment_pipeline.delay/apply_async and abort the commit) so that
_persist_advance only runs after the task has been successfully queued; apply
the same change to the other occurrence around run_assessment_pipeline.delay at
lines 266-274 (referencing _dispatch, _persist_advance, and
run_assessment_pipeline.delay/apply_async).

Comment thread backend/app/services/assessment/utils/attachments.py Outdated
Comment thread backend/app/services/assessment/utils/export.py
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
backend/app/services/assessment/prefilter/topic_relevance.py (1)

64-67: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

cell.strip() can raise on None attachment cells.

row.get(att.column, "") yields None when the key exists with a None value (blank Excel cell), so cell.strip() raises AttributeError. _record_text already guards with or ''; mirror that here.

🛡️ Proposed fix
-            cell = row.get(att.column, "")
+            cell = row.get(att.column) or ""
             if not cell.strip():
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/prefilter/topic_relevance.py` around lines 64
- 67, The loop over attachments uses cell = row.get(att.column, "") and then
calls cell.strip(), which can raise AttributeError if the dict key exists with
value None; change the guard to coerce None to empty string before stripping
(e.g., use row.get(att.column) or '' or otherwise set cell =
(row.get(att.column) or '') ) so cell.strip() is always safe; update the loop in
topic_relevance.py where attachments, att.column and cell.strip() are used to
mirror the _record_text guard.
backend/app/services/assessment/prefilter/duplicate_detection.py (1)

54-58: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Guard against None cell values.

row.get(col, "") returns None when the column key exists with a None value (common for blank Excel cells), so .strip() raises AttributeError. Use (row.get(col) or "") to stay consistent with the defensive pattern in topic_relevance._record_text.

🛡️ Proposed fix
-    parts = [
-        f"{col}:\n{row.get(col, '')}" for col in columns if row.get(col, "").strip()
-    ]
+    parts = [
+        f"{col}:\n{row.get(col) or ''}"
+        for col in columns
+        if (row.get(col) or "").strip()
+    ]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/services/assessment/prefilter/duplicate_detection.py` around
lines 54 - 58, The _combined_text function can raise AttributeError because
row.get(col, "") may return None for existing keys with None values; update the
comprehension to use (row.get(col) or "") everywhere (e.g.,
f"{col}:\n{row.get(col) or ''}" and if (row.get(col) or "").strip()) so .strip()
never runs on None—match the defensive pattern used in
topic_relevance._record_text.
🧹 Nitpick comments (1)
backend/app/tests/assessment/test_topic_relevance.py (1)

14-15: 💤 Low value

Add a return type hint to _gemini().

As per coding guidelines ("Always add type hints to all function parameters and return values in Python code"), annotate the helper's return type.

✏️ Proposed fix
-def _gemini():
+def _gemini() -> Any:
     return patch.object(constants, "ASSESSMENT_PREFILTER_PROVIDER", "google")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/tests/assessment/test_topic_relevance.py` around lines 14 - 15,
Add a return type hint to the helper `_gemini()` by annotating its signature to
return the patch object type from unittest.mock (e.g., change `def _gemini():`
to `def _gemini() -> unittest.mock._patch:`) and ensure `unittest.mock` is
imported in the test module so the annotation resolves; keep the function body
(`return patch.object(constants, "ASSESSMENT_PREFILTER_PROVIDER", "google")`)
unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In `@backend/app/services/assessment/prefilter/duplicate_detection.py`:
- Around line 54-58: The _combined_text function can raise AttributeError
because row.get(col, "") may return None for existing keys with None values;
update the comprehension to use (row.get(col) or "") everywhere (e.g.,
f"{col}:\n{row.get(col) or ''}" and if (row.get(col) or "").strip()) so .strip()
never runs on None—match the defensive pattern used in
topic_relevance._record_text.

In `@backend/app/services/assessment/prefilter/topic_relevance.py`:
- Around line 64-67: The loop over attachments uses cell = row.get(att.column,
"") and then calls cell.strip(), which can raise AttributeError if the dict key
exists with value None; change the guard to coerce None to empty string before
stripping (e.g., use row.get(att.column) or '' or otherwise set cell =
(row.get(att.column) or '') ) so cell.strip() is always safe; update the loop in
topic_relevance.py where attachments, att.column and cell.strip() are used to
mirror the _record_text guard.

---

Nitpick comments:
In `@backend/app/tests/assessment/test_topic_relevance.py`:
- Around line 14-15: Add a return type hint to the helper `_gemini()` by
annotating its signature to return the patch object type from unittest.mock
(e.g., change `def _gemini():` to `def _gemini() -> unittest.mock._patch:`) and
ensure `unittest.mock` is imported in the test module so the annotation
resolves; keep the function body (`return patch.object(constants,
"ASSESSMENT_PREFILTER_PROVIDER", "google")`) unchanged.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: 418ba255-ee89-44b8-8fa5-96885a78d03e

📥 Commits

Reviewing files that changed from the base of the PR and between 4a4e4f8 and bb30f88.

📒 Files selected for processing (12)
  • backend/app/crud/assessment/processing.py
  • backend/app/services/assessment/prefilter/constants.py
  • backend/app/services/assessment/prefilter/duplicate_detection.py
  • backend/app/services/assessment/prefilter/request_builder.py
  • backend/app/services/assessment/prefilter/topic_relevance.py
  • backend/app/services/assessment/stages.py
  • backend/app/services/assessment/tasks.py
  • backend/app/services/assessment/utils/attachments.py
  • backend/app/tests/assessment/test_batch.py
  • backend/app/tests/assessment/test_prefilter_batching.py
  • backend/app/tests/assessment/test_processing.py
  • backend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
  • backend/app/services/assessment/prefilter/constants.py
🚧 Files skipped from review as they are similar to previous changes (5)
  • backend/app/tests/assessment/test_processing.py
  • backend/app/services/assessment/prefilter/request_builder.py
  • backend/app/crud/assessment/processing.py
  • backend/app/services/assessment/tasks.py
  • backend/app/services/assessment/stages.py

ASSESSMENT_PREFILTER_MODEL: str = "gpt-5-mini"

# File-search/vector store holding the corpus for duplicate detection.
ASSESSMENT_PREFILTER_DUPLICATE_STORE: str = "vs_6a20339fbc148191867fd06d29133278"
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kartpop This provider will be used for the pre-filter (L1) stage. I've also added the OpenAI vector store knowledge base that will be used for duplicate detection.


Handles MIME type detection, base64 decoding, Google Drive URL normalization,
data-URL parsing, and conversion of dataset cell values into provider input objects.
URL-only: dataset cells hold attachment URLs. Handles Google Drive URL
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kartpop using base64 inline data becoming expensive in celery task while building jsonl file which leads to sigkill or timeout issues in celery or sometime OOM issues as well. need to think about some better approach for handling base64 or else completely ditch this base64 approach and keep url only

@@ -60,18 +63,6 @@ def to_direct_attachment_url(url: str, attachment_type: str) -> str:
return f"https://drive.google.com/uc?export=download&id={file_id}"
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Google Drive is inquilab specific , in future we don't need to handle Google Drive specific case for attachements


Returns 'image'/'pdf', or None to let normal detection (extension/declared) decide.
"""
type_column = getattr(att, "type_column", None)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kartpop This function decides whether a row's file is an image or a PDF for "Mixed" attachment columns. On the UI, the user picks a "Type column" (e.g. "DOC type") and lists which values mean Image vs PDF. For each row, the function reads that column's value and matches it to image or PDF. If the value is empty, unknown, or maps to both, it returns nothing and lets normal file-extension detection decide.

Image Image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request ready-for-review

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Assessment: L1 pipeline for L2 and Post-Processing

3 participants