Assessment: Implement L1 Filtering and Post-Processing#895
Conversation
…icate detection - Added L1 pipeline orchestrator to run topic relevance and duplicate detection filters in series. - Introduced duplicate detection logic to filter out vague submissions and check for duplicates against a corpus. - Created topic relevance filter to assess the relevance of submissions based on user-defined criteria. - Integrated L1 results handling in the assessment service, allowing for conditional processing based on L1 outcomes. - Updated export functionality to include L1 results in the output, enhancing the assessment reporting capabilities. - Added Celery task for executing the L1 pipeline and managing assessment run statuses.
|
Warning Review limit reached
More reviews will be available in 28 minutes and 41 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (14)
📝 WalkthroughWalkthroughThis PR implements a multi-stage assessment pipeline infrastructure with prefilter gates (topic relevance and duplicate detection), export-time post-processing (computed columns, filtering, sorting), async task orchestration, run resumption, and comprehensive attachment-type routing. Assessment execution moves from inline batch submission to stage-based Celery dispatch, with polling-based completion detection and stage-wise result accumulation. ChangesL1 Assessment Pipeline & Post-Processing
Estimated code review effort🎯 4 (Complex) | ⏱️ ~75 minutes Possibly related PRs
Suggested reviewers
Poem
✨ Finishing Touches🧪 Generate unit tests (beta)
|
…plicate detection
…ed type detection and improved utility functions
…ctor batch submission to Celery task
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Actionable comments posted: 6
🧹 Nitpick comments (7)
backend/app/crud/assessment/batch.py (1)
306-316: 💤 Low valueDocstring is missing documentation for new parameters.
The
preloaded_rowsandrow_indicesparameters are not documented in the docstring.📝 Suggested docstring update
Args: session: Database session run: The AssessmentRun to process dataset: The dataset to read rows from config_blob: Resolved configuration blob assessment_input: Assessment input config (prompt_template, text_columns, etc.) organization_id: Organization ID project_id: Project ID + preloaded_rows: Pre-filtered rows to use instead of loading from dataset (e.g., post-L1) + row_indices: Original row indices for custom_id/key mapping when using filtered rows🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/crud/assessment/batch.py` around lines 306 - 316, The docstring for the function in backend/app/crud/assessment/batch.py is missing entries for the new parameters preloaded_rows and row_indices; update the Args section to document both: describe preloaded_rows as an optional sequence/list of pre-fetched dataset rows provided to avoid re-reading from the dataset (include expected element type, e.g., dict/Row), and describe row_indices as an optional sequence/list of integer indices specifying which dataset rows to process from the dataset/preloaded_rows; keep wording consistent with the existing entries (session, run, dataset, config_blob, assessment_input, organization_id, project_id) and ensure the new parameter descriptions include expected types and behavior when omitted.backend/app/services/assessment/utils/attachments.py (1)
100-114: ⚖️ Poor tradeoffConsider adding HEIC/HEIF magic byte detection.
The
_IMAGE_MIME_BY_EXTdictionary includes.heicand.heifextensions, but_image_mime_from_magicdoesn't detect these formats via magic bytes. HEIC/HEIF files start with aftypbox containing brand identifiers likeheic,heix,mif1.This means HEIC URLs without extensions will fail magic-byte detection and fall back to Content-Type probing, which may not always be reliable.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/attachments.py` around lines 100 - 114, The _image_mime_from_magic function is missing HEIC/HEIF detection; extend it to detect the ISO BMFF 'ftyp' box by checking blob length (>=12), that blob[4:8] == b'ftyp', and that the brand field (e.g., blob[8:12] and/or subsequent bytes) matches known HEIC/HEIF brands like b'heic', b'heix', b'hevc', b'hevx', b'mif1' (or similar), and return the appropriate HEIF mime (e.g., "image/heif") when matched; this aligns magic detection with the _IMAGE_MIME_BY_EXT entries .heic/.heif so files without extensions are recognized.backend/app/core/config.py (1)
177-179: ⚡ Quick winTenant-specific store id as a global default is risky.
fileSearchStores/inquilabcorpus-782mxjcwisazlooks like a corpus belonging to a specific tenant. Shipping it as the built-in default means any deployment that forgets to set this env var silently runs duplicate detection against that corpus. Consider leaving the default empty and failing fast (or gating L1) when it's unset.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/core/config.py` around lines 177 - 179, The constant ASSESSMENT_L1_DUPLICATE_STORE_NAME currently hardcodes a tenant-specific store id; change its default to an empty string (or None) and add a startup/config validation that fails fast or disables/gates L1 duplicate detection if ASSESSMENT_L1_DUPLICATE_STORE_NAME is unset. Update the definition of ASSESSMENT_L1_DUPLICATE_STORE_NAME in config.py to be empty and add a clear validation check in the app initialization path (e.g., config validation or startup/init_assessment logic) that raises an error or disables L1 when the value is empty.backend/app/services/assessment/tasks.py (1)
13-13: 💤 Low valueImporting private function
_load_dataset_rowsfrom another module.The function
_load_dataset_rowsis prefixed with underscore, indicating it's intended as a module-private implementation detail. Importing it across module boundaries couples this code to an internal API that could change without notice.Consider either removing the underscore prefix in the batch module to make it public, or adding a public wrapper function for cross-module use.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/tasks.py` at line 13, The tasks.py module is importing the private function _load_dataset_rows from app.crud.assessment.batch; update usage to rely on a public API instead: add a public wrapper (e.g., load_dataset_rows) or rename the function in app.crud.assessment.batch to remove the leading underscore, then change the import in backend/app/services/assessment/tasks.py to import load_dataset_rows (and keep submit_assessment_batch as-is), ensuring callers reference the new public symbol rather than the module-private _load_dataset_rows.backend/app/services/assessment/utils/export.py (1)
674-696: 💤 Low valuePost-processing config not passed in multi-run zip export.
When exporting multiple runs as a zip file,
serialize_export_rowsis called withoutpost_processing_config(line 684), meaning computed columns, filtering, and sorting won't be applied. This may be intentional since each run could have different configs, but if users expect post-processing to apply consistently, this could be surprising.Consider whether post-processing should be applied per-run in the zip export path, or document this limitation.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/export.py` around lines 674 - 696, The zip export omits post-processing because serialize_export_rows is called without post_processing_config for each run; update the multi-run export loop (the code iterating over runs_with_rows) to pass the appropriate post_processing_config into serialize_export_rows so computed columns/filtering/sorting are applied per-run (e.g., serialize_export_rows(rows, export_format, post_processing_config=post_processing_config)), or if per-run configs are needed, derive a per-run post_processing_config and pass that instead; ensure the variable name you pass matches the surrounding export function's parameter or local (post_processing_config) and keep export_format and file naming logic unchanged.backend/app/services/assessment/utils/post_processing.py (1)
144-158: ⚡ Quick winLoop variable
descnot bound in closure.The static analysis warning is valid:
descis captured by reference from the enclosing loop. While this works correctly here becausesorted()evaluates keys immediately before the next iteration, it's fragile and can break if the code is refactored (e.g., ifsort_keywere stored for later use).Bind
descas a default argument like_colalready is:♻️ Proposed fix
- def sort_key(row: dict[str, Any], _col: str = col) -> tuple: + def sort_key(row: dict[str, Any], _col: str = col, _desc: bool = desc) -> tuple: val = row.get(_col) if val is None: return (1, 0, "") try: - return (0, -float(val) if desc else float(val), "") + return (0, -float(val) if _desc else float(val), "") except (TypeError, ValueError): s = str(val).lower() return ( (0, 0, s) - if not desc - else (0, 0, "".join(chr(0x10FFFF - ord(c)) for c in s)) + if not _desc + else (0, 0, "".join(chr(0x10FFFF - ord(c)) for c in s)) )🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/post_processing.py` around lines 144 - 158, The closure sort_key captures the loop variable desc by reference which is fragile; update sort_key signature to bind desc as a default parameter (similar to _col) so it captures the current boolean value at definition time (e.g., def sort_key(row: dict[str, Any], _col: str = col, _desc: bool = desc): ...) and then use _desc inside the function when negating or inverting string order; keep the sorted(result, key=sort_key) call unchanged.backend/app/services/assessment/l1/pipeline.py (1)
121-135: Consider Gemini rate limits withASSESSMENT_L1_CONCURRENT_WORKERS.Both filters fan out one Gemini request per row at
workersconcurrency with no client-side throttling or retry/backoff. On large datasets this can trip provider rate limits, surfacing as per-row errors (ACCEPT/ERROR defaults) that silently degrade L1 quality. Worth bounding concurrency relative to the model's quota or adding backoff on 429s.Also applies to: 166-179
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/l1/pipeline.py` around lines 121 - 135, The concurrent ThreadPoolExecutor fan-out calling run_topic_relevance (and the similar block that submits run_answer_reasoning) can exceed Gemini rate limits; change the concurrency model to use a bounded semaphore or token bucket so actual in-flight Gemini requests are limited (e.g., cap at a safe value derived from ASSESSMENT_L1_CONCURRENT_WORKERS and model quota) and instrument request-level retries with exponential backoff and jitter for 429/5xx responses inside the Gemini call site used by run_topic_relevance (and the other submitted function), or wrap the client calls in a retry helper/ decorator that catches rate-limit responses and retries with backoff and a max attempts counter; update both executor submission sites to acquire the semaphore/token before submitting or perform semaphore acquisition inside the worker functions to ensure global throttling.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 235-257: The task run_assessment_run is missing the gevent timeout
decorator used elsewhere (e.g., run_tts_result_processing); update
run_assessment_run to include
`@gevent_timeout`(settings.CELERY_TASK_SOFT_TIME_LIMIT, ...) above the task
definition (or provide a code comment justification in the function explaining
why gevent-based timeout/diagnostics are intentionally omitted) so timeout
behavior and logging are consistent with other tasks using gevent_timeout.
In `@backend/app/crud/assessment/batch.py`:
- Around line 174-178: Validate that row_indices, when provided, has the same
length as rows before the loop that iterates rows (the block initializing
type_cache and the for i, row in enumerate(rows) loop), and raise a clear
ValueError (or similar) if lengths differ; add the same check to the
build_google_jsonl function where row_indices is also accepted to prevent
IndexError or unused indices. Specifically, check if row_indices is not None and
len(row_indices) != len(rows) and fail early with a descriptive message
referencing rows and row_indices so callers must supply matching-length lists.
In `@backend/app/services/assessment/l1/duplicate_detection.py`:
- Around line 96-108: The generate_content call in duplicate_detection.py
(gemini_client.models.generate_content) lacks an http timeout and can hang when
file-search is slow; update this call to pass
http_options=types.HttpOptions(timeout=...) (same timeout value used in the
other fixes) so that _call_file_search and _check_vague have bounded HTTP
timeouts when dispatching concurrent file-search requests.
In `@backend/app/services/assessment/l1/topic_relevance.py`:
- Line 90: The response_schema currently uses lowercase OpenAPI type strings
(see output_schema) which the google-genai SDK expects in its internal
enum/casing (e.g., "OBJECT", "BOOLEAN"); update the construction of
output_schema used for response_schema so type values use the SDK's Type
constants (e.g., genai.types.Type.OBJECT/BOOLEAN) or the exact uppercase strings
the SDK requires, ensuring nested properties and arrays are converted likewise;
locate where output_schema is defined and replace lowercase
"object"/"boolean"/"array"/"string"/"integer" with the corresponding genai Type
constants or uppercase enum names so google-genai validation/structured output
will apply correctly.
- Around line 84-93: Add a per-request Gemini timeout and guard ThreadPool
futures: introduce ASSESSMENT_L1_REQUEST_TIMEOUT_MS in core/config.py (default
value in milliseconds), then pass an HttpOptions(timeout=...) to
gemini_client.models.generate_content calls in topic_relevance.py and both call
sites in duplicate_detection.py (convert ms -> seconds if HttpOptions expects
seconds), and update pipeline.py to call fut.result(timeout=...) using the same
timeout (converted to seconds) so ThreadPoolExecutor futures won't hang
indefinitely; reference the gemini_client.models.generate_content call sites,
types.HttpOptions (or HttpOptions), the ASSESSMENT_L1_REQUEST_TIMEOUT_MS config
symbol, and the fut.result(timeout=...) change in pipeline.py.
In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 180-209: The _probe_url_type function currently calls requests.get
on user-provided URLs without SSRF protections; before making the request (and
before using _drive_file_id result) validate the URL: enforce allowed schemes
(e.g., require https), resolve the hostname to IP and block
localhost/private/reserved ranges (IPv4/IPv6), and check against an allowlist of
approved hosts or domains; if validation fails log and return None. Also disable
or tightly limit redirects (set allow_redirects=False or a small max), and
ensure the validation is re-run on redirected locations if redirects are
allowed.
---
Nitpick comments:
In `@backend/app/core/config.py`:
- Around line 177-179: The constant ASSESSMENT_L1_DUPLICATE_STORE_NAME currently
hardcodes a tenant-specific store id; change its default to an empty string (or
None) and add a startup/config validation that fails fast or disables/gates L1
duplicate detection if ASSESSMENT_L1_DUPLICATE_STORE_NAME is unset. Update the
definition of ASSESSMENT_L1_DUPLICATE_STORE_NAME in config.py to be empty and
add a clear validation check in the app initialization path (e.g., config
validation or startup/init_assessment logic) that raises an error or disables L1
when the value is empty.
In `@backend/app/crud/assessment/batch.py`:
- Around line 306-316: The docstring for the function in
backend/app/crud/assessment/batch.py is missing entries for the new parameters
preloaded_rows and row_indices; update the Args section to document both:
describe preloaded_rows as an optional sequence/list of pre-fetched dataset rows
provided to avoid re-reading from the dataset (include expected element type,
e.g., dict/Row), and describe row_indices as an optional sequence/list of
integer indices specifying which dataset rows to process from the
dataset/preloaded_rows; keep wording consistent with the existing entries
(session, run, dataset, config_blob, assessment_input, organization_id,
project_id) and ensure the new parameter descriptions include expected types and
behavior when omitted.
In `@backend/app/services/assessment/l1/pipeline.py`:
- Around line 121-135: The concurrent ThreadPoolExecutor fan-out calling
run_topic_relevance (and the similar block that submits run_answer_reasoning)
can exceed Gemini rate limits; change the concurrency model to use a bounded
semaphore or token bucket so actual in-flight Gemini requests are limited (e.g.,
cap at a safe value derived from ASSESSMENT_L1_CONCURRENT_WORKERS and model
quota) and instrument request-level retries with exponential backoff and jitter
for 429/5xx responses inside the Gemini call site used by run_topic_relevance
(and the other submitted function), or wrap the client calls in a retry helper/
decorator that catches rate-limit responses and retries with backoff and a max
attempts counter; update both executor submission sites to acquire the
semaphore/token before submitting or perform semaphore acquisition inside the
worker functions to ensure global throttling.
In `@backend/app/services/assessment/tasks.py`:
- Line 13: The tasks.py module is importing the private function
_load_dataset_rows from app.crud.assessment.batch; update usage to rely on a
public API instead: add a public wrapper (e.g., load_dataset_rows) or rename the
function in app.crud.assessment.batch to remove the leading underscore, then
change the import in backend/app/services/assessment/tasks.py to import
load_dataset_rows (and keep submit_assessment_batch as-is), ensuring callers
reference the new public symbol rather than the module-private
_load_dataset_rows.
In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 100-114: The _image_mime_from_magic function is missing HEIC/HEIF
detection; extend it to detect the ISO BMFF 'ftyp' box by checking blob length
(>=12), that blob[4:8] == b'ftyp', and that the brand field (e.g., blob[8:12]
and/or subsequent bytes) matches known HEIC/HEIF brands like b'heic', b'heix',
b'hevc', b'hevx', b'mif1' (or similar), and return the appropriate HEIF mime
(e.g., "image/heif") when matched; this aligns magic detection with the
_IMAGE_MIME_BY_EXT entries .heic/.heif so files without extensions are
recognized.
In `@backend/app/services/assessment/utils/export.py`:
- Around line 674-696: The zip export omits post-processing because
serialize_export_rows is called without post_processing_config for each run;
update the multi-run export loop (the code iterating over runs_with_rows) to
pass the appropriate post_processing_config into serialize_export_rows so
computed columns/filtering/sorting are applied per-run (e.g.,
serialize_export_rows(rows, export_format,
post_processing_config=post_processing_config)), or if per-run configs are
needed, derive a per-run post_processing_config and pass that instead; ensure
the variable name you pass matches the surrounding export function's parameter
or local (post_processing_config) and keep export_format and file naming logic
unchanged.
In `@backend/app/services/assessment/utils/post_processing.py`:
- Around line 144-158: The closure sort_key captures the loop variable desc by
reference which is fragile; update sort_key signature to bind desc as a default
parameter (similar to _col) so it captures the current boolean value at
definition time (e.g., def sort_key(row: dict[str, Any], _col: str = col, _desc:
bool = desc): ...) and then use _desc inside the function when negating or
inverting string order; keep the sorted(result, key=sort_key) call unchanged.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 133027e3-00b7-48eb-985b-ce88b25541f8
📒 Files selected for processing (25)
backend/app/alembic/versions/064_add_l1_columns_to_assessment_run.pybackend/app/api/docs/assessment/update_post_processing.mdbackend/app/api/routes/assessment/runs.pybackend/app/celery/tasks/job_execution.pybackend/app/core/config.pybackend/app/crud/assessment/__init__.pybackend/app/crud/assessment/batch.pybackend/app/crud/assessment/core.pybackend/app/crud/assessment/cron.pybackend/app/models/assessment.pybackend/app/services/assessment/l1/__init__.pybackend/app/services/assessment/l1/duplicate_detection.pybackend/app/services/assessment/l1/pipeline.pybackend/app/services/assessment/l1/topic_relevance.pybackend/app/services/assessment/service.pybackend/app/services/assessment/tasks.pybackend/app/services/assessment/utils/attachments.pybackend/app/services/assessment/utils/export.pybackend/app/services/assessment/utils/post_processing.pybackend/app/tests/assessment/test_batch.pybackend/app/tests/assessment/test_cron.pybackend/app/tests/assessment/test_crud.pybackend/app/tests/assessment/test_export.pybackend/app/tests/assessment/test_service.pybackend/app/tests/assessment/test_topic_relevance.py
…ipeline orchestrator
… detection - Added a new prefilter pipeline orchestrator that runs topic relevance and duplicate detection filters in series. - Created `run_topic_relevance` and `run_duplicate_detection` functions to handle respective filtering logic. - Updated assessment service to utilize prefilter configuration instead of L1 configuration. - Modified assessment tasks to reflect the new prefilter processing status and error handling. - Adjusted utility functions and export logic to accommodate prefilter results. - Enhanced tests to cover the new prefilter functionality and ensure proper integration.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/services/assessment/tasks.py (1)
45-52:⚠️ Potential issue | 🟠 Major | ⚡ Quick winRun left in dangling state when parent assessment is missing.
When the parent
Assessmentis not found, the run remains in its current status (likelypending) without being marked as failed. This leaves an orphaned run that won't be picked up by cron polling or cleaned up.Proposed fix
assessment = session.get(Assessment, run.assessment_id) if assessment is None: logger.error( "[execute_assessment_run] parent assessment %s not found for run %s", run.assessment_id, run_id, ) + update_assessment_run_status( + session=session, + run=run, + status="failed", + error_message="Parent assessment not found.", + ) return🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/tasks.py` around lines 45 - 52, When the parent Assessment is missing in execute_assessment_run, the run (variable run) must be transitioned out of the dangling state: set run.status to a failure state (e.g., RunStatus.FAILED or equivalent), populate run.error or run.finished_at with a brief message/timestamp indicating "parent assessment not found", persist the change via session.add()/session.commit() (or session.flush()/session.commit() as per existing patterns), and then log the error; update the block that currently returns after logger.error to perform these updates on run and commit before returning.
🧹 Nitpick comments (6)
backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py (1)
25-25: 💤 Low valueMinor typo in column comment.
The comment says "prefilter filter results" which has a redundant "filter".
Suggested fix
- comment="S3 URL of stored prefilter filter results JSON", + comment="S3 URL of stored prefilter results JSON",🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py` at line 25, Fix the typo in the column comment inside the migration file 064_add_prefilter_columns_to_assessment_run.py by changing the comment string from "S3 URL of stored prefilter filter results JSON" to "S3 URL of stored prefilter results JSON" where the column is defined (the place that currently has comment="S3 URL of stored prefilter filter results JSON"); ensure you update the same comment text in both the upgrade and any corresponding downgrade/column definition blocks if duplicated.backend/app/tests/assessment/test_batch.py (1)
476-481: ⚡ Quick winPreserve the validator’s URL-return contract in these probe tests.
These patches replace
validate_callback_urlwith a bare mock, sodetect_item_typecan pass a mock object intorequests.getand the assertions still succeed. The redirect case also only checks call counts, so it would miss a regression where the revalidated URL is ignored. Please make the validator mock return the input URL (for example viaside_effect=lambda url: url) and assert the validated URL(s) passed torequests.get.Also applies to: 495-500, 507-512, 529-545, 559-564
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_batch.py` around lines 476 - 481, The tests patching app.services.assessment.utils.attachments.validate_callback_url must preserve its URL-return contract: change those bare mocks to return the input URL (e.g., use side_effect=lambda url: url) so detect_item_type receives a real URL string rather than a Mock; then update assertions to verify the validated URL(s) were passed into requests.get (inspect mock_get.call_args / call_args_list) instead of only checking call counts. Apply this change to the mocks around validate_callback_url and requests.get in the probe tests (the blocks referencing validate_callback_url and mock_get at the shown locations) so redirects and revalidation actually use the validated URL.backend/app/tests/assessment/test_crud.py (2)
321-329: ⚡ Quick winVerify
flag_modifiedin theinput=Nonepath.This branch is the one most likely to miss JSON dirty tracking, but the test patches
flag_modifiedwithout asserting it was called. Checkingflag_modified(run, "input")here would lock down the persistence contract for the newly initialized blob.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_crud.py` around lines 321 - 329, The test test_none_input_handled currently patches flag_modified but doesn't assert it was invoked; update the test so after calling update_run_post_processing_config(session=session, run=run, config=None) you assert flag_modified was called with the run object and the "input" attribute (e.g., assert flag_modified.called and/or assert flag_modified.call_args showing (run, "input")), keeping the existing patch of flag_modified and verifying the persistence contract for update_run_post_processing_config.
227-246: ⚡ Quick winAssert the new prefilter counters in
build_run_stats.This fixture only proves the extra fields do not crash when they are
None. It will still pass ifbuild_run_statssilently dropsprefilter_total_rows,prefilter_total_passed, orprefilter_total_rejected. Please add a case with concrete values and assert they are exposed onstats[0].🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_crud.py` around lines 227 - 246, The test currently only covers None prefilter counters; add a second run in test_build_run_stats with concrete integer values for prefilter_total_rows, prefilter_total_passed, and prefilter_total_rejected so build_run_stats must preserve them; then assert on stats entry (e.g., stats[1] or the matching run object) that .prefilter_total_rows, .prefilter_total_passed, and .prefilter_total_rejected equal the expected integers to ensure build_run_stats exposes these fields unchanged. Ensure you reference the same SimpleNamespace fixture and the build_run_stats function when adding the new case.backend/app/tests/assessment/test_pipeline.py (2)
98-123: ⚡ Quick winCover original row-index preservation in the duplicate-detection path.
This case only exercises a surviving row at index
0, so it cannot catch a regression where the pipeline renumbers passed rows before callingrun_duplicate_detection. Since the downstream batch/export flow depends on original row indices, please add a case where an earlier row is filtered out and assert the duplicate-detection call still receives the original index.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_pipeline.py` around lines 98 - 123, Add a second scenario in test_duplicate_detection_runs_on_passed_rows where an earlier row is filtered out so the surviving row is not at index 0 (e.g., three rows where the first is irrelevant and the second survives); call run_prefilter_pipeline the same way and assert dup_mock (from _patches) was called with the original row identifier (e.g., "row_1" or matching the original index) and that results preserves the original index in duplicate_detection, ensuring run_duplicate_detection receives original row indices.
25-59: ⚡ Quick winAdd type hints to
_patches.
tr_side,dup_return, and the return value are currently untyped in changed Python code.As per coding guidelines,
**/*.py: Always add type hints to all function parameters and return values in Python code.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_pipeline.py` around lines 25 - 59, Add explicit type hints to the _patches function: annotate parameters tr_side and dup_return as Optional[Any] (or a more specific Union if you know the exact types used as side_effect/return), and annotate the return type as Tuple[MagicMock, MagicMock]; update the function signature (def _patches(stack: ExitStack, *, tr_side: Optional[Any] = None, dup_return: Optional[Any] = None) -> Tuple[MagicMock, MagicMock]:) and add the necessary imports (from typing import Optional, Any, Tuple and ensure MagicMock is imported from unittest.mock) so the function and its return are fully typed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@backend/app/services/assessment/tasks.py`:
- Around line 45-52: When the parent Assessment is missing in
execute_assessment_run, the run (variable run) must be transitioned out of the
dangling state: set run.status to a failure state (e.g., RunStatus.FAILED or
equivalent), populate run.error or run.finished_at with a brief
message/timestamp indicating "parent assessment not found", persist the change
via session.add()/session.commit() (or session.flush()/session.commit() as per
existing patterns), and then log the error; update the block that currently
returns after logger.error to perform these updates on run and commit before
returning.
---
Nitpick comments:
In `@backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.py`:
- Line 25: Fix the typo in the column comment inside the migration file
064_add_prefilter_columns_to_assessment_run.py by changing the comment string
from "S3 URL of stored prefilter filter results JSON" to "S3 URL of stored
prefilter results JSON" where the column is defined (the place that currently
has comment="S3 URL of stored prefilter filter results JSON"); ensure you update
the same comment text in both the upgrade and any corresponding downgrade/column
definition blocks if duplicated.
In `@backend/app/tests/assessment/test_batch.py`:
- Around line 476-481: The tests patching
app.services.assessment.utils.attachments.validate_callback_url must preserve
its URL-return contract: change those bare mocks to return the input URL (e.g.,
use side_effect=lambda url: url) so detect_item_type receives a real URL string
rather than a Mock; then update assertions to verify the validated URL(s) were
passed into requests.get (inspect mock_get.call_args / call_args_list) instead
of only checking call counts. Apply this change to the mocks around
validate_callback_url and requests.get in the probe tests (the blocks
referencing validate_callback_url and mock_get at the shown locations) so
redirects and revalidation actually use the validated URL.
In `@backend/app/tests/assessment/test_crud.py`:
- Around line 321-329: The test test_none_input_handled currently patches
flag_modified but doesn't assert it was invoked; update the test so after
calling update_run_post_processing_config(session=session, run=run, config=None)
you assert flag_modified was called with the run object and the "input"
attribute (e.g., assert flag_modified.called and/or assert
flag_modified.call_args showing (run, "input")), keeping the existing patch of
flag_modified and verifying the persistence contract for
update_run_post_processing_config.
- Around line 227-246: The test currently only covers None prefilter counters;
add a second run in test_build_run_stats with concrete integer values for
prefilter_total_rows, prefilter_total_passed, and prefilter_total_rejected so
build_run_stats must preserve them; then assert on stats entry (e.g., stats[1]
or the matching run object) that .prefilter_total_rows, .prefilter_total_passed,
and .prefilter_total_rejected equal the expected integers to ensure
build_run_stats exposes these fields unchanged. Ensure you reference the same
SimpleNamespace fixture and the build_run_stats function when adding the new
case.
In `@backend/app/tests/assessment/test_pipeline.py`:
- Around line 98-123: Add a second scenario in
test_duplicate_detection_runs_on_passed_rows where an earlier row is filtered
out so the surviving row is not at index 0 (e.g., three rows where the first is
irrelevant and the second survives); call run_prefilter_pipeline the same way
and assert dup_mock (from _patches) was called with the original row identifier
(e.g., "row_1" or matching the original index) and that results preserves the
original index in duplicate_detection, ensuring run_duplicate_detection receives
original row indices.
- Around line 25-59: Add explicit type hints to the _patches function: annotate
parameters tr_side and dup_return as Optional[Any] (or a more specific Union if
you know the exact types used as side_effect/return), and annotate the return
type as Tuple[MagicMock, MagicMock]; update the function signature (def
_patches(stack: ExitStack, *, tr_side: Optional[Any] = None, dup_return:
Optional[Any] = None) -> Tuple[MagicMock, MagicMock]:) and add the necessary
imports (from typing import Optional, Any, Tuple and ensure MagicMock is
imported from unittest.mock) so the function and its return are fully typed.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 9fd400ef-931b-4af1-b578-6e595abd868b
📒 Files selected for processing (22)
backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.pybackend/app/api/routes/assessment/runs.pybackend/app/celery/tasks/job_execution.pybackend/app/core/config.pybackend/app/crud/assessment/__init__.pybackend/app/crud/assessment/batch.pybackend/app/crud/assessment/core.pybackend/app/models/assessment.pybackend/app/services/assessment/prefilter/__init__.pybackend/app/services/assessment/prefilter/duplicate_detection.pybackend/app/services/assessment/prefilter/pipeline.pybackend/app/services/assessment/prefilter/topic_relevance.pybackend/app/services/assessment/service.pybackend/app/services/assessment/tasks.pybackend/app/services/assessment/utils/attachments.pybackend/app/services/assessment/utils/export.pybackend/app/tests/assessment/test_batch.pybackend/app/tests/assessment/test_crud.pybackend/app/tests/assessment/test_duplicate_detection.pybackend/app/tests/assessment/test_pipeline.pybackend/app/tests/assessment/test_post_processing.pybackend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/services/assessment/prefilter/init.py
🚧 Files skipped from review as they are similar to previous changes (6)
- backend/app/celery/tasks/job_execution.py
- backend/app/api/routes/assessment/runs.py
- backend/app/tests/assessment/test_topic_relevance.py
- backend/app/services/assessment/service.py
- backend/app/crud/assessment/batch.py
- backend/app/services/assessment/utils/attachments.py
- Consolidated and refactored tests for the prefilter pipeline and related services. - Introduced new tests for orchestrating assessment runs, ensuring proper handling of pipeline stages and statuses. - Added tests for the topic relevance request builder and result parser, improving coverage for attachment handling. - Implemented failure guards in task execution to prevent dangling runs and ensure proper error handling. - Enhanced the resume functionality for failed assessment runs, allowing for retries from the last failed stage.
OpenAPI changes 🟢 31 non-breaking changesTip Safe to merge from an API-contract perspective. Full changelog ·
|
| Method | Path | Change | |
|---|---|---|---|
| 🟢 | GET |
/api/v1/assessment/assessments |
added the optional property data/anyOf[subschema #1]/items/run_stats/items/prefilter_total_passed to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments |
added the optional property data/anyOf[subschema #1]/items/run_stats/items/prefilter_total_rejected to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments |
added the optional property data/anyOf[subschema #1]/items/run_stats/items/prefilter_total_rows to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments |
added the optional property data/anyOf[subschema #1]/items/run_stats/items/stage to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments |
added the optional property data/anyOf[subschema #1]/items/run_stats/items/stage_status to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments/{assessment_id} |
added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/prefilter_total_passed to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments/{assessment_id} |
added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/prefilter_total_rejected to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments/{assessment_id} |
added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/prefilter_total_rows to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments/{assessment_id} |
added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/stage to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/assessments/{assessment_id} |
added the optional property data/anyOf[subschema #1: AssessmentPublic]/run_stats/items/stage_status to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/pipeline to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/post_processing_config to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/prefilter_total_passed to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/prefilter_total_rejected to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/prefilter_total_rows to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/stage to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs |
added the optional property data/anyOf[subschema #1]/items/stage_status to the response with the 200 status |
| 🟢 | POST |
/api/v1/assessment/runs |
added the new optional request property attachments/items/type_column |
| 🟢 | POST |
/api/v1/assessment/runs |
added the new optional request property attachments/items/type_value_map |
| 🟢 | POST |
/api/v1/assessment/runs |
added the new optional request property post_processing_config |
| 🟢 | POST |
/api/v1/assessment/runs |
added the new optional request property prefilter_config |
| 🟢 | POST |
/api/v1/assessment/runs |
added the new mixed enum value to the request property attachments/items/type |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/pipeline to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/post_processing_config to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/prefilter_total_passed to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/prefilter_total_rejected to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/prefilter_total_rows to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/stage to the response with the 200 status |
| 🟢 | GET |
/api/v1/assessment/runs/{run_id} |
added the optional property data/anyOf[subschema #1: AssessmentRunPublic]/stage_status to the response with the 200 status |
| 🟢 | PATCH |
/api/v1/assessment/runs/{run_id}/post-processing |
endpoint added |
| 🟢 | POST |
/api/v1/assessment/runs/{run_id}/resume |
endpoint added |
main ↔ 11b5d0f0 · generated by oasdiff
There was a problem hiding this comment.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/services/assessment/utils/export.py (1)
333-338:⚠️ Potential issue | 🟠 Major | ⚡ Quick winKeep the base export columns in XLSX output.
fieldnamesalready contains the stable metadata columns, but the XLSX branch rebuildsexcel_fieldsfrom only input/prefilter/output/computed columns. That dropsrow_id,result_status,error, token counts, and run/config metadata from spreadsheets while CSV/JSON still include them.Proposed fix
- # Explicit ordering: inputs → prefilter → L2 → computed columns - excel_fields = input_col_names + prefilter_keys + output_keys + computed_names - if not excel_fields: - excel_fields = output_keys or ["output"] + # Preserve the standard export columns, then append any computed columns. + excel_fields = fieldnames + [ + name for name in computed_names if name not in fieldnames + ]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/utils/export.py` around lines 333 - 338, The XLSX branch currently rebuilds excel_fields from only input_col_names + prefilter_keys + output_keys + computed_names which omits the stable base metadata in fieldnames (row_id, result_status, error, token counts, run/config metadata); change the construction of excel_fields to start from the existing fieldnames and then append the other groups while preserving order and deduplicating (e.g., excel_fields = fieldnames + input_col_names + prefilter_keys + output_keys + computed_names with a stable dedupe), then keep the existing fallback and call to _drop_empty_columns(expanded, excel_fields); update references to excel_fields, input_col_names, prefilter_keys, output_keys, computed_names, fieldnames, and _drop_empty_columns accordingly.backend/app/services/assessment/service.py (1)
197-202:⚠️ Potential issue | 🟠 Major | 🏗️ Heavy liftBroker publish failures can leave runs stuck with no worker owning them.
Both call sites persist state before
run_assessment_pipeline.delay(...)is known to have succeeded. If the broker publish fails,start_assessmentcan return after partially creating/dispatched runs, andresume_assessment_runleaves the run committed asprocessing/PENDINGeven though no task was enqueued. The worker-side failure guard cannot recover this path because it only runs after task startup.Also applies to: 325-343
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/service.py` around lines 197 - 202, The code persists run state before calling run_assessment_pipeline.delay(...) in start_assessment and resume_assessment_run, which can leave runs marked processing/PENDING if the broker publish fails; change the flow so the broker publish is confirmed before committing the run state (either publish inside the DB transaction using an outbox pattern or delay committing status changes until after run_assessment_pipeline.delay(...) returns successfully), or catch publish errors and roll back or update the run record (e.g., set run.status back to CREATED/FAILED) if delay() raises; specifically update the logic around run_assessment_pipeline.delay, start_assessment, and resume_assessment_run to ensure atomicity between DB state and task enqueueing.
🧹 Nitpick comments (4)
backend/app/tests/assessment/test_topic_relevance.py (1)
14-15: ⚡ Quick winAdd a return type to
_gemini.This helper is missing a return annotation, which violates the repository's Python typing rule. A loose annotation is fine here if you want to avoid importing the private
_patchtype.Suggested change
+from typing import Any from unittest.mock import patch @@ -def _gemini(): +def _gemini() -> Any: return patch.object(settings, "ASSESSMENT_PREFILTER_PROVIDER", "google")As per coding guidelines, "
**/*.py: Always add type hints to all function parameters and return values in Python code".🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_topic_relevance.py` around lines 14 - 15, The helper function _gemini lacks a return type annotation; add a return annotation (e.g., -> Any) to its signature and import Any from typing to satisfy the repo typing rule. Update the function def _gemini() to def _gemini() -> Any and ensure the existing return patch.object(settings, "ASSESSMENT_PREFILTER_PROVIDER", "google") remains unchanged so callers still get the patch object.backend/app/tests/assessment/test_export.py (1)
533-547: ⚡ Quick winAdd type hints to
_patches.This new helper is missing parameter and return annotations, so it falls short of the repo's Python typing rule and makes the shared patch setup harder to read and check.
Proposed fix
+from typing import Any + - def _patches(self, *, l2, prefilter=None, dataset_rows=None): + def _patches( + self, + *, + l2: dict[str, dict[str, Any]], + prefilter: dict[str, dict[str, Any]] | None = None, + dataset_rows: list[dict[str, str]] | None = None, + ) -> list[Any]:As per coding guidelines,
**/*.py: Always add type hints to all function parameters and return values in Python code.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_export.py` around lines 533 - 547, The helper function _patches is missing type annotations; update its signature to include parameter and return types (e.g. def _patches(*, l2: Dict[str, Any], prefilter: Optional[Dict[str, Any]] = None, dataset_rows: Optional[List[Any]] = None) -> List[Any]:) and add the necessary typing imports (from typing import Any, Dict, List, Optional) at the top of the file so callers and linters understand the expected shapes and the function returns a list of patch objects.backend/app/tests/assessment/test_processing.py (1)
256-267: ⚡ Quick winAdd return annotations to the local test builders.
_parent,_run, and_jobare new helper factories in this file, but they still omit return annotations, and_jobalso leaveskwuntyped. That makes this changed test module inconsistent with the repo’s required Python typing baseline.As per coding guidelines, "Always add type hints to all function parameters and return values in Python code" and "Use Python 3.11+ with type hints throughout the codebase."
Also applies to: 373-376
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_processing.py` around lines 256 - 267, The helper factory functions _parent, _run, and _job lack return type annotations (and _job’s kw parameter is untyped); update each definition to include proper type hints — e.g., annotate _parent() and _run() with their return types (SimpleNamespace or a more specific TypedDict/NamedTuple if available) and give _job(...) a typed signature for kw (Mapping[str, Any] or **kwargs: Any depending on usage) so the test module conforms to the repository’s Python 3.11 typing baseline; update both occurrences (lines around _parent/_run and the other instance at 373–376) to use these annotations.backend/app/crud/assessment/processing.py (1)
250-250: ⚡ Quick winAnnotate
batch_jobin_poll_stage_outcome.Line 250 introduces an untyped parameter on the batch-polling helper, which breaks the repository’s Python typing rule on one of the core orchestration paths.
BatchJoblooks like the intended type here.As per coding guidelines, "Always add type hints to all function parameters and return values in Python code" and "Use Python 3.11+ with type hints throughout the codebase."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/crud/assessment/processing.py` at line 250, The function _poll_stage_outcome has an untyped parameter batch_job; annotate it as BatchJob (i.e., def _poll_stage_outcome(session: Session, provider: BatchProvider, batch_job: BatchJob) -> str), add the necessary import for BatchJob at the top of the file (from the module that defines BatchJob), and run type checks to ensure call sites accept the new annotation; keep the existing return type str unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@backend/app/celery/tasks/job_execution.py`:
- Around line 237-244: The function run_assessment_pipeline currently accepts
but silently drops **kwargs and lacks type annotations; update its signature to
include proper type hints (e.g., run_id: int, organization_id: int, project_id:
int, trace_id: str, **kwargs: Any) and add a return type (likely -> None). Then
either forward **kwargs into the downstream pipeline invocation inside
run_assessment_pipeline (so extra task args are preserved) or explicitly
validate/raise (TypeError) for unexpected keys if they must not be accepted;
ensure you import Any from typing and apply the change where
run_assessment_pipeline is defined.
In `@backend/app/crud/assessment/cron.py`:
- Around line 118-127: The current catch-all in
poll_all_pending_assessment_evaluations treats every exception from
process_run_batches() as transient and retries; instead, detect and propagate
deterministic failures coming from process_run_batches (e.g., ValueError for
missing-parent) rather than retrying forever: update the except block to inspect
the exception type or sentinel from process_run_batches, handle known transient
exceptions by rolling back and incrementing still_processing, but for
deterministic errors (e.g., ValueError) mark the run as failed or re-raise so
the failure state is recorded (use the existing failure-recording helper used
elsewhere in processing.py) and do not leave runs stuck in PROCESSING; refer to
process_run_batches, poll_all_pending_assessment_evaluations, run.id and
run.assessment_id to locate and implement the conditional handling.
In `@backend/app/crud/assessment/processing.py`:
- Around line 358-369: The current flow calls advance_or_finalize(run) and
commits the updated run before invoking run_assessment_pipeline.delay, which can
leave the run stuck in stage=<next>, stage_status=PENDING if the broker call
fails; change the flow to dispatch the task first and only persist the
stage/state transition after a successful enqueue: call
run_assessment_pipeline.delay(...) (catching and handling exceptions), and on
success call advance_or_finalize(run), session.add(run), session.commit(), then
recompute_assessment_status(session=session, assessment_id=run.assessment_id);
ensure you reference the run object’s stage and stage_status fields and keep
run_assessment_pipeline.delay wrapped so failures prevent committing the new
stage.
In `@backend/app/models/assessment.py`:
- Around line 338-358: Add a Pydantic validation that rejects incomplete or
invalid "mixed" attachment configs: change type_value_map's annotation to
dict[str, Literal["image","pdf"]] | None and add a validator or root_validator
(e.g., validate_mixed_config) that, when self.type == "mixed", requires
type_column is not None and type_value_map is not None and that every value in
type_value_map is either "image" or "pdf"; raise a ValidationError with a clear
message if any check fails so malformed mixed configs are rejected at schema
boundary.
In `@backend/app/services/assessment/prefilter/topic_relevance.py`:
- Around line 100-106: The current parsing builds column_relevance with bool(v),
which converts non-empty strings like "false" to True; update the construction
of column_relevance (the dict built from data in the same block that reads
out.get("output") and sets decision) to accept only real booleans or explicit
"true"/"false" strings: iterate data.items() (excluding "decision" and
"reasoning") and set each value to True only when isinstance(v, bool) and v is
True, or when isinstance(v, str) and v.strip().lower() == "true" (treat "false"
as False); leave other types as False (or omit) so inverted/string values do not
become True unintentionally.
In `@backend/app/services/assessment/stages.py`:
- Around line 87-104: The config branch only handles base == "openai" but must
also handle the OpenAI-native provider; update the conditional in the stage
builder so that when settings.ASSESSMENT_PREFILTER_PROVIDER equals
LLMProvider.OPENAI_NATIVE (or the string "openai-native") it builds the
OpenAI-style config (endpoint, completion_window, description) instead of the
Gemini-style display_name/model; modify the conditional that creates config
(around the call to _get_batch_provider and before start_batch_job) to include
LLMProvider.OPENAI_NATIVE (or "openai-native") alongside "openai" so
OpenAIBatchProvider receives the correct payload for start_batch_job.
In `@backend/app/services/assessment/tasks.py`:
- Around line 95-101: The run is being advanced before the follow-up Celery task
is guaranteed to be queued, so if run_assessment_pipeline.delay(...) fails the
run stays advanced with no worker to process it; fix by ensuring dispatch
succeeds before committing the new stage: either move the call to _dispatch so
it runs (and returns successfully) before _persist_advance commits, or wrap
dispatch in transaction.on_commit (or catch exceptions from
run_assessment_pipeline.delay/apply_async and abort the commit) so that
_persist_advance only runs after the task has been successfully queued; apply
the same change to the other occurrence around run_assessment_pipeline.delay at
lines 266-274 (referencing _dispatch, _persist_advance, and
run_assessment_pipeline.delay/apply_async).
In `@backend/app/services/assessment/utils/attachments.py`:
- Around line 137-145: resolve_item_type currently coerces anything not
recognized to "image", causing unknown/ambiguous 'mixed' rows (when
attachment_type_for_row returns None) to be treated as images; change
resolve_item_type to return None for unresolved types instead of "image" by
updating its signature to return Optional[str] (str | None) and return None when
item_type not in ("image","pdf"); also update callers (e.g.,
attachment_type_for_row consumers) to handle a None result appropriately rather
than assuming "image".
In `@backend/app/services/assessment/utils/export.py`:
- Around line 619-632: The fallback export only iterates l2_by_row_id so rows
that were rejected at prefilter time (present only in prefilter_by_row_id) are
omitted; change the comprehension to iterate over the union of keys from
l2_by_row_id and prefilter_by_row_id (convert keys to strings consistently),
fetch l2_item as l2_by_row_id.get(row_id) and prefilter_item as
prefilter_by_row_id.get(row_id), and call _build_export_row with those values
(keeping the same arguments: run, assessment, dataset_name, row_id=str(row_id),
input_data=None, prefilter_item=..., l2_item=..., has_prefilter=has_prefilter)
so prefilter-only rows are included when the dataset cannot be reloaded.
---
Outside diff comments:
In `@backend/app/services/assessment/service.py`:
- Around line 197-202: The code persists run state before calling
run_assessment_pipeline.delay(...) in start_assessment and
resume_assessment_run, which can leave runs marked processing/PENDING if the
broker publish fails; change the flow so the broker publish is confirmed before
committing the run state (either publish inside the DB transaction using an
outbox pattern or delay committing status changes until after
run_assessment_pipeline.delay(...) returns successfully), or catch publish
errors and roll back or update the run record (e.g., set run.status back to
CREATED/FAILED) if delay() raises; specifically update the logic around
run_assessment_pipeline.delay, start_assessment, and resume_assessment_run to
ensure atomicity between DB state and task enqueueing.
In `@backend/app/services/assessment/utils/export.py`:
- Around line 333-338: The XLSX branch currently rebuilds excel_fields from only
input_col_names + prefilter_keys + output_keys + computed_names which omits the
stable base metadata in fieldnames (row_id, result_status, error, token counts,
run/config metadata); change the construction of excel_fields to start from the
existing fieldnames and then append the other groups while preserving order and
deduplicating (e.g., excel_fields = fieldnames + input_col_names +
prefilter_keys + output_keys + computed_names with a stable dedupe), then keep
the existing fallback and call to _drop_empty_columns(expanded, excel_fields);
update references to excel_fields, input_col_names, prefilter_keys, output_keys,
computed_names, fieldnames, and _drop_empty_columns accordingly.
---
Nitpick comments:
In `@backend/app/crud/assessment/processing.py`:
- Line 250: The function _poll_stage_outcome has an untyped parameter batch_job;
annotate it as BatchJob (i.e., def _poll_stage_outcome(session: Session,
provider: BatchProvider, batch_job: BatchJob) -> str), add the necessary import
for BatchJob at the top of the file (from the module that defines BatchJob), and
run type checks to ensure call sites accept the new annotation; keep the
existing return type str unchanged.
In `@backend/app/tests/assessment/test_export.py`:
- Around line 533-547: The helper function _patches is missing type annotations;
update its signature to include parameter and return types (e.g. def _patches(*,
l2: Dict[str, Any], prefilter: Optional[Dict[str, Any]] = None, dataset_rows:
Optional[List[Any]] = None) -> List[Any]:) and add the necessary typing imports
(from typing import Any, Dict, List, Optional) at the top of the file so callers
and linters understand the expected shapes and the function returns a list of
patch objects.
In `@backend/app/tests/assessment/test_processing.py`:
- Around line 256-267: The helper factory functions _parent, _run, and _job lack
return type annotations (and _job’s kw parameter is untyped); update each
definition to include proper type hints — e.g., annotate _parent() and _run()
with their return types (SimpleNamespace or a more specific TypedDict/NamedTuple
if available) and give _job(...) a typed signature for kw (Mapping[str, Any] or
**kwargs: Any depending on usage) so the test module conforms to the
repository’s Python 3.11 typing baseline; update both occurrences (lines around
_parent/_run and the other instance at 373–376) to use these annotations.
In `@backend/app/tests/assessment/test_topic_relevance.py`:
- Around line 14-15: The helper function _gemini lacks a return type annotation;
add a return annotation (e.g., -> Any) to its signature and import Any from
typing to satisfy the repo typing rule. Update the function def _gemini() to def
_gemini() -> Any and ensure the existing return patch.object(settings,
"ASSESSMENT_PREFILTER_PROVIDER", "google") remains unchanged so callers still
get the patch object.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: c1c9e50c-3b5d-4d7b-9b68-7ddcf07265d4
📒 Files selected for processing (31)
backend/app/alembic/versions/064_add_prefilter_columns_to_assessment_run.pybackend/app/api/docs/assessment/resume_run.mdbackend/app/api/routes/assessment/runs.pybackend/app/celery/tasks/job_execution.pybackend/app/core/config.pybackend/app/crud/assessment/batch.pybackend/app/crud/assessment/core.pybackend/app/crud/assessment/cron.pybackend/app/crud/assessment/processing.pybackend/app/models/assessment.pybackend/app/services/assessment/prefilter/__init__.pybackend/app/services/assessment/prefilter/duplicate_detection.pybackend/app/services/assessment/prefilter/pipeline.pybackend/app/services/assessment/prefilter/request_builder.pybackend/app/services/assessment/prefilter/topic_relevance.pybackend/app/services/assessment/service.pybackend/app/services/assessment/stages.pybackend/app/services/assessment/tasks.pybackend/app/services/assessment/utils/attachments.pybackend/app/services/assessment/utils/export.pybackend/app/tests/assessment/test_batch.pybackend/app/tests/assessment/test_cron.pybackend/app/tests/assessment/test_crud.pybackend/app/tests/assessment/test_duplicate_detection.pybackend/app/tests/assessment/test_export.pybackend/app/tests/assessment/test_pipeline.pybackend/app/tests/assessment/test_prefilter_batching.pybackend/app/tests/assessment/test_processing.pybackend/app/tests/assessment/test_service.pybackend/app/tests/assessment/test_tasks_failure_guard.pybackend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/api/docs/assessment/resume_run.md
🚧 Files skipped from review as they are similar to previous changes (2)
- backend/app/tests/assessment/test_crud.py
- backend/app/crud/assessment/core.py
| def run_assessment_pipeline( | ||
| self, | ||
| run_id: int, | ||
| organization_id: int, | ||
| project_id: int, | ||
| trace_id: str, | ||
| **kwargs, | ||
| ): |
There was a problem hiding this comment.
Don't silently swallow extra task kwargs here.
run_assessment_pipeline never forwards **kwargs, so unexpected task arguments are accepted and dropped instead of failing fast. This segment also misses the required type annotations on the new function signature.
Suggested fix
def run_assessment_pipeline(
- self,
+ self: celery.Task,
run_id: int,
organization_id: int,
project_id: int,
trace_id: str,
- **kwargs,
-):
+) -> None:As per coding guidelines, **/*.py: Always add type hints to all function parameters and return values in Python code.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| def run_assessment_pipeline( | |
| self, | |
| run_id: int, | |
| organization_id: int, | |
| project_id: int, | |
| trace_id: str, | |
| **kwargs, | |
| ): | |
| def run_assessment_pipeline( | |
| self: celery.Task, | |
| run_id: int, | |
| organization_id: int, | |
| project_id: int, | |
| trace_id: str, | |
| ) -> None: |
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 243-243: Unused function argument: kwargs
(ARG001)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/celery/tasks/job_execution.py` around lines 237 - 244, The
function run_assessment_pipeline currently accepts but silently drops **kwargs
and lacks type annotations; update its signature to include proper type hints
(e.g., run_id: int, organization_id: int, project_id: int, trace_id: str,
**kwargs: Any) and add a return type (likely -> None). Then either forward
**kwargs into the downstream pipeline invocation inside run_assessment_pipeline
(so extra task args are preserved) or explicitly validate/raise (TypeError) for
unexpected keys if they must not be accepted; ensure you import Any from typing
and apply the change where run_assessment_pipeline is defined.
| data = json.loads(out.get("output") or "") | ||
| decision = str(data.get("decision", "ACCEPT")).upper() | ||
| column_relevance = { | ||
| k: bool(v) | ||
| for k, v in data.items() | ||
| if k not in ("decision", "reasoning") | ||
| } |
There was a problem hiding this comment.
Parse only real booleans from model output.
bool(v) turns "false", "0", and any other non-empty string into True. If the provider returns a schema-shaped but loosely typed payload, column_relevance will be inverted in the exported topic-relevance data. Accept only bool values here, or explicitly parse "true" / "false" strings.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/assessment/prefilter/topic_relevance.py` around lines
100 - 106, The current parsing builds column_relevance with bool(v), which
converts non-empty strings like "false" to True; update the construction of
column_relevance (the dict built from data in the same block that reads
out.get("output") and sets decision) to accept only real booleans or explicit
"true"/"false" strings: iterate data.items() (excluding "decision" and
"reasoning") and set each value to True only when isinstance(v, bool) and v is
True, or when isinstance(v, str) and v.strip().lower() == "true" (treat "false"
as False); leave other types as False (or omit) so inverted/string values do not
become True unintentionally.
| def _dispatch(run_id: int, organization_id: int, project_id: int) -> None: | ||
| run_assessment_pipeline.delay( | ||
| run_id=run_id, | ||
| organization_id=organization_id, | ||
| project_id=project_id, | ||
| trace_id=correlation_id.get() or "", | ||
| ) |
There was a problem hiding this comment.
Advance the stage only after the follow-up task is successfully queued.
_persist_advance() commits the new stage and then calls _dispatch(). If run_assessment_pipeline.delay(...) fails, the run is left advanced to the next stage with no task scheduled to process it, and the new failure guard never runs because the worker was never started.
Also applies to: 266-274
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@backend/app/services/assessment/tasks.py` around lines 95 - 101, The run is
being advanced before the follow-up Celery task is guaranteed to be queued, so
if run_assessment_pipeline.delay(...) fails the run stays advanced with no
worker to process it; fix by ensuring dispatch succeeds before committing the
new stage: either move the call to _dispatch so it runs (and returns
successfully) before _persist_advance commits, or wrap dispatch in
transaction.on_commit (or catch exceptions from
run_assessment_pipeline.delay/apply_async and abort the commit) so that
_persist_advance only runs after the task has been successfully queued; apply
the same change to the other occurrence around run_assessment_pipeline.delay at
lines 266-274 (referencing _dispatch, _persist_advance, and
run_assessment_pipeline.delay/apply_async).
…e acceptance logic
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
backend/app/services/assessment/prefilter/topic_relevance.py (1)
64-67:⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
cell.strip()can raise onNoneattachment cells.
row.get(att.column, "")yieldsNonewhen the key exists with aNonevalue (blank Excel cell), socell.strip()raisesAttributeError._record_textalready guards withor ''; mirror that here.🛡️ Proposed fix
- cell = row.get(att.column, "") + cell = row.get(att.column) or "" if not cell.strip():🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/prefilter/topic_relevance.py` around lines 64 - 67, The loop over attachments uses cell = row.get(att.column, "") and then calls cell.strip(), which can raise AttributeError if the dict key exists with value None; change the guard to coerce None to empty string before stripping (e.g., use row.get(att.column) or '' or otherwise set cell = (row.get(att.column) or '') ) so cell.strip() is always safe; update the loop in topic_relevance.py where attachments, att.column and cell.strip() are used to mirror the _record_text guard.backend/app/services/assessment/prefilter/duplicate_detection.py (1)
54-58:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winGuard against
Nonecell values.
row.get(col, "")returnsNonewhen the column key exists with aNonevalue (common for blank Excel cells), so.strip()raisesAttributeError. Use(row.get(col) or "")to stay consistent with the defensive pattern intopic_relevance._record_text.🛡️ Proposed fix
- parts = [ - f"{col}:\n{row.get(col, '')}" for col in columns if row.get(col, "").strip() - ] + parts = [ + f"{col}:\n{row.get(col) or ''}" + for col in columns + if (row.get(col) or "").strip() + ]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/services/assessment/prefilter/duplicate_detection.py` around lines 54 - 58, The _combined_text function can raise AttributeError because row.get(col, "") may return None for existing keys with None values; update the comprehension to use (row.get(col) or "") everywhere (e.g., f"{col}:\n{row.get(col) or ''}" and if (row.get(col) or "").strip()) so .strip() never runs on None—match the defensive pattern used in topic_relevance._record_text.
🧹 Nitpick comments (1)
backend/app/tests/assessment/test_topic_relevance.py (1)
14-15: 💤 Low valueAdd a return type hint to
_gemini().As per coding guidelines ("Always add type hints to all function parameters and return values in Python code"), annotate the helper's return type.
✏️ Proposed fix
-def _gemini(): +def _gemini() -> Any: return patch.object(constants, "ASSESSMENT_PREFILTER_PROVIDER", "google")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@backend/app/tests/assessment/test_topic_relevance.py` around lines 14 - 15, Add a return type hint to the helper `_gemini()` by annotating its signature to return the patch object type from unittest.mock (e.g., change `def _gemini():` to `def _gemini() -> unittest.mock._patch:`) and ensure `unittest.mock` is imported in the test module so the annotation resolves; keep the function body (`return patch.object(constants, "ASSESSMENT_PREFILTER_PROVIDER", "google")`) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@backend/app/services/assessment/prefilter/duplicate_detection.py`:
- Around line 54-58: The _combined_text function can raise AttributeError
because row.get(col, "") may return None for existing keys with None values;
update the comprehension to use (row.get(col) or "") everywhere (e.g.,
f"{col}:\n{row.get(col) or ''}" and if (row.get(col) or "").strip()) so .strip()
never runs on None—match the defensive pattern used in
topic_relevance._record_text.
In `@backend/app/services/assessment/prefilter/topic_relevance.py`:
- Around line 64-67: The loop over attachments uses cell = row.get(att.column,
"") and then calls cell.strip(), which can raise AttributeError if the dict key
exists with value None; change the guard to coerce None to empty string before
stripping (e.g., use row.get(att.column) or '' or otherwise set cell =
(row.get(att.column) or '') ) so cell.strip() is always safe; update the loop in
topic_relevance.py where attachments, att.column and cell.strip() are used to
mirror the _record_text guard.
---
Nitpick comments:
In `@backend/app/tests/assessment/test_topic_relevance.py`:
- Around line 14-15: Add a return type hint to the helper `_gemini()` by
annotating its signature to return the patch object type from unittest.mock
(e.g., change `def _gemini():` to `def _gemini() -> unittest.mock._patch:`) and
ensure `unittest.mock` is imported in the test module so the annotation
resolves; keep the function body (`return patch.object(constants,
"ASSESSMENT_PREFILTER_PROVIDER", "google")`) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 418ba255-ee89-44b8-8fa5-96885a78d03e
📒 Files selected for processing (12)
backend/app/crud/assessment/processing.pybackend/app/services/assessment/prefilter/constants.pybackend/app/services/assessment/prefilter/duplicate_detection.pybackend/app/services/assessment/prefilter/request_builder.pybackend/app/services/assessment/prefilter/topic_relevance.pybackend/app/services/assessment/stages.pybackend/app/services/assessment/tasks.pybackend/app/services/assessment/utils/attachments.pybackend/app/tests/assessment/test_batch.pybackend/app/tests/assessment/test_prefilter_batching.pybackend/app/tests/assessment/test_processing.pybackend/app/tests/assessment/test_topic_relevance.py
✅ Files skipped from review due to trivial changes (1)
- backend/app/services/assessment/prefilter/constants.py
🚧 Files skipped from review as they are similar to previous changes (5)
- backend/app/tests/assessment/test_processing.py
- backend/app/services/assessment/prefilter/request_builder.py
- backend/app/crud/assessment/processing.py
- backend/app/services/assessment/tasks.py
- backend/app/services/assessment/stages.py
…hment type validation
…ons in assessment prefilter constants
| ASSESSMENT_PREFILTER_MODEL: str = "gpt-5-mini" | ||
|
|
||
| # File-search/vector store holding the corpus for duplicate detection. | ||
| ASSESSMENT_PREFILTER_DUPLICATE_STORE: str = "vs_6a20339fbc148191867fd06d29133278" |
There was a problem hiding this comment.
@kartpop This provider will be used for the pre-filter (L1) stage. I've also added the OpenAI vector store knowledge base that will be used for duplicate detection.
|
|
||
| Handles MIME type detection, base64 decoding, Google Drive URL normalization, | ||
| data-URL parsing, and conversion of dataset cell values into provider input objects. | ||
| URL-only: dataset cells hold attachment URLs. Handles Google Drive URL |
There was a problem hiding this comment.
@kartpop using base64 inline data becoming expensive in celery task while building jsonl file which leads to sigkill or timeout issues in celery or sometime OOM issues as well. need to think about some better approach for handling base64 or else completely ditch this base64 approach and keep url only
| @@ -60,18 +63,6 @@ def to_direct_attachment_url(url: str, attachment_type: str) -> str: | |||
| return f"https://drive.google.com/uc?export=download&id={file_id}" | |||
There was a problem hiding this comment.
Google Drive is inquilab specific , in future we don't need to handle Google Drive specific case for attachements
|
|
||
| Returns 'image'/'pdf', or None to let normal detection (extension/declared) decide. | ||
| """ | ||
| type_column = getattr(att, "type_column", None) |
There was a problem hiding this comment.
@kartpop This function decides whether a row's file is an image or a PDF for "Mixed" attachment columns. On the UI, the user picks a "Type column" (e.g. "DOC type") and lists which values mean Image vs PDF. For each row, the function reads that column's value and matches it to image or PDF. If the value is empty, unknown, or maps to both, it returns nothing and lets normal file-extension detection decide.
Target issue is #904
Summary
What changed
PATCH /runs/{id}/post-processing.Checklist
Before submitting a pull request, please ensure that you mark these task.
fastapi run --reload app/main.pyordocker compose upin the repository root and test.