Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ describe('GET /api/cron/dispatch-pending-code-reviews', () => {
ownersSkippedMissingBotUsers: 1,
coordinatorFailures: 0,
reviewsDispatched: 5,
staleReviewsCancelled: 2,
staleAttemptsCancelled: 1,
hasMoreCandidateOwners: true,
});

Expand All @@ -72,6 +74,8 @@ describe('GET /api/cron/dispatch-pending-code-reviews', () => {
ownersSkippedMissingBotUsers: 1,
coordinatorFailures: 0,
reviewsDispatched: 5,
staleReviewsCancelled: 2,
staleAttemptsCancelled: 1,
hasMoreCandidateOwners: true,
},
timestamp: expect.any(String),
Expand Down
70 changes: 70 additions & 0 deletions apps/web/src/lib/code-reviews/db/code-reviews.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
MAX_CONCURRENT_CODE_REVIEWS_PER_ORG,
staleQueuedCodeReviewCutoffSql,
staleRunningCodeReviewCutoffSql,
cronStaleCodeReviewExpiryCutoffSql,
type PendingCodeReviewCreatedAtWindow,
} from '../dispatch/dispatch-constants';

Expand Down Expand Up @@ -115,6 +116,14 @@ export type CancelledReviewRow = {
platformIntegrationId: string | null;
};

export const DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE =
'Code review expired before dispatch. Please retry manually.';

export type ExpiredCodeReviewCancellationSummary = {
reviewsCancelled: number;
attemptsCancelled: number;
};

const RETRYABLE_PARENT_REVIEW_STATUSES = ['queued', 'running'];

function canCreateInfraRetryAttempt(review: { status: string; terminal_reason: string | null }) {
Expand Down Expand Up @@ -1453,6 +1462,67 @@ export async function cancelSupersededReviewsForPR(
}
}

export async function cancelExpiredPendingAndRunningCodeReviews(): Promise<ExpiredCodeReviewCancellationSummary> {
const expiryCutoff = cronStaleCodeReviewExpiryCutoffSql();

try {
const result = await db.execute<{
reviews_cancelled: number;
attempts_cancelled: number;
}>(sql`
WITH cancelled_reviews AS (
UPDATE ${cloud_agent_code_reviews} AS reviews
SET
status = 'cancelled',
terminal_reason = 'dispatch_expired',
error_message = ${DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE},
completed_at = now(),
updated_at = now()
WHERE reviews.status IN ('pending', 'running')
AND reviews.created_at < ${expiryCutoff}
AND NOT EXISTS (
SELECT 1
FROM ${cloud_agent_code_review_attempts} AS fresh_attempts
WHERE fresh_attempts.code_review_id = reviews.id
AND fresh_attempts.status IN ('pending', 'queued', 'running')
AND COALESCE(
fresh_attempts.started_at,
fresh_attempts.updated_at,
fresh_attempts.created_at
) >= ${expiryCutoff}
)
RETURNING reviews.id
), cancelled_attempts AS (
UPDATE ${cloud_agent_code_review_attempts} AS attempts
SET
status = 'cancelled',
terminal_reason = 'dispatch_expired',
error_message = ${DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE},
completed_at = now(),
updated_at = now()
FROM cancelled_reviews
WHERE attempts.code_review_id = cancelled_reviews.id
AND attempts.status IN ('pending', 'queued', 'running')
RETURNING attempts.id
)
SELECT
(SELECT COUNT(*) FROM cancelled_reviews)::int AS reviews_cancelled,
(SELECT COUNT(*) FROM cancelled_attempts)::int AS attempts_cancelled
`);

const row = result.rows[0];
return {
reviewsCancelled: Number(row?.reviews_cancelled ?? 0),
attemptsCancelled: Number(row?.attempts_cancelled ?? 0),
};
} catch (error) {
captureException(error, {
tags: { operation: 'cancelExpiredPendingAndRunningCodeReviews' },
});
throw error;
}
}

export type ReviewContinuationScope =
| { platform: 'github' }
| { platform: 'gitlab'; integrationId: string; projectId: number };
Expand Down
5 changes: 5 additions & 0 deletions apps/web/src/lib/code-reviews/dispatch/dispatch-constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const STALE_QUEUED_CODE_REVIEW_MINUTES = 5;
export const STALE_RUNNING_CODE_REVIEW_MINUTES = 90;
export const CRON_PENDING_CODE_REVIEW_MIN_AGE_MINUTES = 60;
export const CRON_PENDING_CODE_REVIEW_MAX_AGE_MINUTES = 75;
export const CRON_STALE_CODE_REVIEW_EXPIRY_MINUTES = CRON_PENDING_CODE_REVIEW_MAX_AGE_MINUTES;

export type PendingCodeReviewCreatedAtWindow = {
createdAtAfter: SQL;
Expand All @@ -31,6 +32,10 @@ export function cronPendingCodeReviewCreatedAtWindowSql(): PendingCodeReviewCrea
};
}

export function cronStaleCodeReviewExpiryCutoffSql() {
return sql`now() - interval '${sql.raw(String(CRON_STALE_CODE_REVIEW_EXPIRY_MINUTES))} minutes'`;
}

export function reconsiderableCodeReviewWorkCondition(
staleQueuedCutoff = staleQueuedCodeReviewCutoffSql(),
pendingCreatedAtWindow?: PendingCodeReviewCreatedAtWindow
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ jest.mock('@sentry/nextjs', () => ({
import { db } from '@/lib/drizzle';
import { insertTestUser } from '@/tests/helpers/user.helper';
import {
cloud_agent_code_review_attempts,
cloud_agent_code_reviews,
kilocode_users,
organizations,
type User,
} from '@kilocode/db/schema';
import { eq } from 'drizzle-orm';
import { listDispatchableCodeReviewOwnerCandidates } from '../db/code-reviews';
import {
DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE,
listDispatchableCodeReviewOwnerCandidates,
} from '../db/code-reviews';
import { cronPendingCodeReviewCreatedAtWindowSql } from './dispatch-constants';
import { dispatchPendingCodeReviewOwners } from './dispatch-pending-code-review-owners';

Expand Down Expand Up @@ -105,6 +109,25 @@ describe('dispatch pending code review owners', () => {
};
}

async function getReview(reviewId: string) {
const review = await db.query.cloud_agent_code_reviews.findFirst({
where: eq(cloud_agent_code_reviews.id, reviewId),
});

if (!review) {
throw new Error(`Expected review ${reviewId} to exist`);
}

return review;
}

async function listAttempts(reviewId: string) {
return await db
.select()
.from(cloud_agent_code_review_attempts)
.where(eq(cloud_agent_code_review_attempts.code_review_id, reviewId));
}

it('discovers unique eligible owners oldest-first with truncation and capacity prefiltering', async () => {
const oldestBlockedTimestamp = minutesAgo(40);
const oldestEligibleTimestamp = minutesAgo(30);
Expand Down Expand Up @@ -250,6 +273,118 @@ describe('dispatch pending code review owners', () => {
ownersSkippedMissingBotUsers: 0,
coordinatorFailures: 0,
reviewsDispatched: 1,
staleReviewsCancelled: 1,
staleAttemptsCancelled: 0,
hasMoreCandidateOwners: false,
});
expect(mockTryDispatchPendingReviews).toHaveBeenCalledTimes(1);
expect(mockTryDispatchPendingReviews).toHaveBeenCalledWith(
{
type: 'user',
id: secondUser.id,
userId: secondUser.id,
},
expect.objectContaining({ pendingCreatedAtWindow: expect.anything() })
);
});

it('cancels expired pending and running reviews before draining cron-window owners', async () => {
const expiredTimestamp = minutesAgo(76);
const oldParentTimestamp = minutesAgo(120);
const freshAttemptTimestamp = minutesAgo(10);
const dispatchWindowTimestamp = minutesAgo(65);

const [stalePendingReview, staleRunningReview, retryParentReview, dispatchWindowReview] =
await db
.insert(cloud_agent_code_reviews)
.values([
reviewValues({
owner: { type: 'user', id: firstUser.id },
status: 'pending',
createdAt: expiredTimestamp,
updatedAt: expiredTimestamp,
}),
reviewValues({
owner: { type: 'user', id: firstUser.id },
status: 'running',
createdAt: expiredTimestamp,
updatedAt: expiredTimestamp,
startedAt: expiredTimestamp,
}),
reviewValues({
owner: { type: 'user', id: firstUser.id },
status: 'running',
createdAt: oldParentTimestamp,
updatedAt: oldParentTimestamp,
startedAt: oldParentTimestamp,
}),
reviewValues({
owner: { type: 'user', id: secondUser.id },
status: 'pending',
createdAt: dispatchWindowTimestamp,
updatedAt: dispatchWindowTimestamp,
}),
])
.returning({ id: cloud_agent_code_reviews.id });

if (!stalePendingReview || !staleRunningReview || !retryParentReview || !dispatchWindowReview) {
throw new Error('Expected stale cancellation test reviews to be inserted');
}

await db.insert(cloud_agent_code_review_attempts).values({
code_review_id: staleRunningReview.id,
attempt_number: 1,
status: 'running',
started_at: expiredTimestamp,
created_at: expiredTimestamp,
updated_at: expiredTimestamp,
});

const [failedAttempt] = await db
.insert(cloud_agent_code_review_attempts)
.values({
code_review_id: retryParentReview.id,
attempt_number: 1,
status: 'failed',
terminal_reason: 'sandbox_error',
error_message: 'Sandbox failed before retry',
completed_at: expiredTimestamp,
created_at: expiredTimestamp,
updated_at: expiredTimestamp,
})
.returning({ id: cloud_agent_code_review_attempts.id });

if (!failedAttempt) {
throw new Error('Expected failed retry source attempt to be inserted');
}

await db.insert(cloud_agent_code_review_attempts).values({
code_review_id: retryParentReview.id,
attempt_number: 2,
retry_of_attempt_id: failedAttempt.id,
retry_reason: 'infra_failure',
status: 'pending',
created_at: freshAttemptTimestamp,
updated_at: freshAttemptTimestamp,
});

mockTryDispatchPendingReviews.mockResolvedValue({
dispatched: 1,
notDispatched: 0,
activeCount: 1,
});

const summary = await dispatchPendingCodeReviewOwners();

expect(summary).toEqual({
ownersConsidered: 1,
ownersProcessed: 1,
ownersWithNoNewDispatch: 0,
ownersSkippedMissingBotUsers: 0,
coordinatorFailures: 0,
reviewsDispatched: 1,
staleReviewsCancelled: 2,
staleAttemptsCancelled: 1,
hasMoreCandidateOwners: false,
});
expect(mockTryDispatchPendingReviews).toHaveBeenCalledTimes(1);
Expand All @@ -261,6 +396,47 @@ describe('dispatch pending code review owners', () => {
},
expect.objectContaining({ pendingCreatedAtWindow: expect.anything() })
);

const storedStalePendingReview = await getReview(stalePendingReview.id);
const storedStaleRunningReview = await getReview(staleRunningReview.id);
const storedRetryParentReview = await getReview(retryParentReview.id);
const storedDispatchWindowReview = await getReview(dispatchWindowReview.id);
const staleRunningAttempts = await listAttempts(staleRunningReview.id);
const retryParentAttempts = await listAttempts(retryParentReview.id);

expect(storedStalePendingReview).toEqual(
expect.objectContaining({
status: 'cancelled',
terminal_reason: 'dispatch_expired',
error_message: DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE,
})
);
expect(storedStalePendingReview.completed_at).toEqual(expect.any(String));
expect(storedStaleRunningReview).toEqual(
expect.objectContaining({
status: 'cancelled',
terminal_reason: 'dispatch_expired',
error_message: DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE,
})
);
expect(storedStaleRunningReview.completed_at).toEqual(expect.any(String));
expect(staleRunningAttempts).toEqual([
expect.objectContaining({
status: 'cancelled',
terminal_reason: 'dispatch_expired',
error_message: DISPATCH_EXPIRED_CODE_REVIEW_ERROR_MESSAGE,
completed_at: expect.any(String),
}),
]);
expect(storedRetryParentReview.status).toBe('running');
expect(storedRetryParentReview.terminal_reason).toBeNull();
expect(retryParentAttempts).toEqual(
expect.arrayContaining([
expect.objectContaining({ status: 'failed', terminal_reason: 'sandbox_error' }),
expect.objectContaining({ status: 'pending', retry_reason: 'infra_failure' }),
])
);
expect(storedDispatchWindowReview.status).toBe('pending');
});

it('summarizes dispatch, recovered bot owners, no-op owners, and isolated owner failures', async () => {
Expand Down Expand Up @@ -309,6 +485,8 @@ describe('dispatch pending code review owners', () => {
ownersSkippedMissingBotUsers: 0,
coordinatorFailures: 1,
reviewsDispatched: 2,
staleReviewsCancelled: 0,
staleAttemptsCancelled: 0,
hasMoreCandidateOwners: false,
});
expect(mockTryDispatchPendingReviews).toHaveBeenCalledTimes(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import pLimit from 'p-limit';
import { captureException } from '@sentry/nextjs';
import { ensureBotUserForOrg } from '@/lib/bot-users/bot-user-service';
import {
cancelExpiredPendingAndRunningCodeReviews,
listDispatchableCodeReviewOwnerCandidates,
type DispatchableCodeReviewOwnerCandidate,
} from '../db/code-reviews';
Expand All @@ -23,6 +24,8 @@ export type DispatchPendingCodeReviewOwnersSummary = {
ownersSkippedMissingBotUsers: number;
coordinatorFailures: number;
reviewsDispatched: number;
staleReviewsCancelled: number;
staleAttemptsCancelled: number;
hasMoreCandidateOwners: boolean;
};

Expand Down Expand Up @@ -68,6 +71,7 @@ async function drainOwner(
}

export async function dispatchPendingCodeReviewOwners(): Promise<DispatchPendingCodeReviewOwnersSummary> {
const staleCancellationSummary = await cancelExpiredPendingAndRunningCodeReviews();
const pendingCreatedAtWindow = cronPendingCodeReviewCreatedAtWindowSql();
const candidates = await listDispatchableCodeReviewOwnerCandidates({
limit: OWNER_SCAN_LIMIT,
Expand All @@ -85,6 +89,8 @@ export async function dispatchPendingCodeReviewOwners(): Promise<DispatchPending
ownersSkippedMissingBotUsers: 0,
coordinatorFailures: 0,
reviewsDispatched: 0,
staleReviewsCancelled: staleCancellationSummary.reviewsCancelled,
staleAttemptsCancelled: staleCancellationSummary.attemptsCancelled,
hasMoreCandidateOwners: candidates.hasMore,
};

Expand Down
2 changes: 2 additions & 0 deletions packages/db/src/schema-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,7 @@ export const CODE_REVIEW_TERMINAL_REASONS = [
'selected_model_unavailable',
'user_cancelled',
'superseded',
'dispatch_expired',
'interrupted',
'timeout',
'upstream_error',
Expand Down Expand Up @@ -1442,6 +1443,7 @@ export const CODE_REVIEW_BENIGN_TERMINAL_REASONS = [
'selected_model_unavailable',
'user_cancelled',
'superseded',
'dispatch_expired',
] as const satisfies readonly CodeReviewTerminalReason[];

export type CodeReviewBenignTerminalReason = (typeof CODE_REVIEW_BENIGN_TERMINAL_REASONS)[number];
Expand Down
Loading