From e4881f63560aa452ac0f549fcb0a5031fc15561f Mon Sep 17 00:00:00 2001 From: Yusuf Mirza Altay Date: Thu, 12 Mar 2026 22:59:57 +0300 Subject: [PATCH 1/5] fix(cancel): report cancellation durability truthfully Return explicit durability results for execution cancellation so success only reflects persisted cancellation state instead of best-effort Redis availability. --- .../[executionId]/cancel/route.test.ts | 112 ++++++++++++++++++ .../executions/[executionId]/cancel/route.ts | 13 +- apps/sim/lib/execution/cancellation.test.ts | 48 ++++++++ apps/sim/lib/execution/cancellation.ts | 19 ++- 4 files changed, 182 insertions(+), 10 deletions(-) create mode 100644 apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts create mode 100644 apps/sim/lib/execution/cancellation.test.ts diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts new file mode 100644 index 00000000000..5470a275390 --- /dev/null +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts @@ -0,0 +1,112 @@ +/** + * @vitest-environment node + */ + +import { NextRequest } from 'next/server' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const mockCheckHybridAuth = vi.fn() +const mockAuthorizeWorkflowByWorkspacePermission = vi.fn() +const mockMarkExecutionCancelled = vi.fn() + +vi.mock('@sim/logger', () => ({ + createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }), +})) + +vi.mock('@/lib/auth/hybrid', () => ({ + checkHybridAuth: (...args: unknown[]) => mockCheckHybridAuth(...args), +})) + +vi.mock('@/lib/execution/cancellation', () => ({ + markExecutionCancelled: (...args: unknown[]) => mockMarkExecutionCancelled(...args), +})) + +vi.mock('@/lib/workflows/utils', () => ({ + authorizeWorkflowByWorkspacePermission: (params: unknown) => + mockAuthorizeWorkflowByWorkspacePermission(params), +})) + +import { POST } from './route' + +describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { + beforeEach(() => { + vi.clearAllMocks() + mockCheckHybridAuth.mockResolvedValue({ success: true, userId: 'user-1' }) + mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true }) + }) + + it('returns success when cancellation was durably recorded', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: true, + reason: 'recorded', + }) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: true, + executionId: 'ex-1', + redisAvailable: true, + durablyRecorded: true, + reason: 'recorded', + }) + }) + + it('returns unsuccessful response when Redis is unavailable', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: false, + reason: 'redis_unavailable', + }) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: false, + executionId: 'ex-1', + redisAvailable: false, + durablyRecorded: false, + reason: 'redis_unavailable', + }) + }) + + it('returns unsuccessful response when Redis persistence fails', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: false, + reason: 'redis_write_failed', + }) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: false, + executionId: 'ex-1', + redisAvailable: false, + durablyRecorded: false, + reason: 'redis_write_failed', + }) + }) +}) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts index 49c99e1ede6..abdacdcaa68 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts @@ -35,20 +35,23 @@ export async function POST( logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId }) - const marked = await markExecutionCancelled(executionId) + const cancellation = await markExecutionCancelled(executionId) - if (marked) { + if (cancellation.durablyRecorded) { logger.info('Execution marked as cancelled in Redis', { executionId }) } else { - logger.info('Redis not available, cancellation will rely on connection close', { + logger.warn('Execution cancellation was not durably recorded', { executionId, + reason: cancellation.reason, }) } return NextResponse.json({ - success: true, + success: cancellation.durablyRecorded, executionId, - redisAvailable: marked, + redisAvailable: cancellation.durablyRecorded, + durablyRecorded: cancellation.durablyRecorded, + reason: cancellation.reason, }) } catch (error: any) { logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message }) diff --git a/apps/sim/lib/execution/cancellation.test.ts b/apps/sim/lib/execution/cancellation.test.ts new file mode 100644 index 00000000000..f85e9991e67 --- /dev/null +++ b/apps/sim/lib/execution/cancellation.test.ts @@ -0,0 +1,48 @@ +import { loggerMock } from '@sim/testing' +import { beforeEach, describe, expect, it, vi } from 'vitest' + +const mockGetRedisClient = vi.fn() +const mockRedisSet = vi.fn() + +vi.mock('@sim/logger', () => loggerMock) + +vi.mock('@/lib/core/config/redis', () => ({ + getRedisClient: mockGetRedisClient, +})) + +import { markExecutionCancelled } from './cancellation' + +describe('markExecutionCancelled', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('returns redis_unavailable when no Redis client exists', async () => { + mockGetRedisClient.mockReturnValue(null) + + await expect(markExecutionCancelled('execution-1')).resolves.toEqual({ + durablyRecorded: false, + reason: 'redis_unavailable', + }) + }) + + it('returns recorded when Redis write succeeds', async () => { + mockRedisSet.mockResolvedValue('OK') + mockGetRedisClient.mockReturnValue({ set: mockRedisSet }) + + await expect(markExecutionCancelled('execution-1')).resolves.toEqual({ + durablyRecorded: true, + reason: 'recorded', + }) + }) + + it('returns redis_write_failed when Redis write throws', async () => { + mockRedisSet.mockRejectedValue(new Error('set failed')) + mockGetRedisClient.mockReturnValue({ set: mockRedisSet }) + + await expect(markExecutionCancelled('execution-1')).resolves.toEqual({ + durablyRecorded: false, + reason: 'redis_write_failed', + }) + }) +}) diff --git a/apps/sim/lib/execution/cancellation.ts b/apps/sim/lib/execution/cancellation.ts index 671209e9c66..26273f8521b 100644 --- a/apps/sim/lib/execution/cancellation.ts +++ b/apps/sim/lib/execution/cancellation.ts @@ -6,27 +6,36 @@ const logger = createLogger('ExecutionCancellation') const EXECUTION_CANCEL_PREFIX = 'execution:cancel:' const EXECUTION_CANCEL_EXPIRY = 60 * 60 +export type ExecutionCancellationRecordResult = + | { durablyRecorded: true; reason: 'recorded' } + | { + durablyRecorded: false + reason: 'redis_unavailable' | 'redis_write_failed' + } + export function isRedisCancellationEnabled(): boolean { return getRedisClient() !== null } /** * Mark an execution as cancelled in Redis. - * Returns true if Redis is available and the flag was set, false otherwise. + * Returns whether the cancellation was durably recorded. */ -export async function markExecutionCancelled(executionId: string): Promise { +export async function markExecutionCancelled( + executionId: string +): Promise { const redis = getRedisClient() if (!redis) { - return false + return { durablyRecorded: false, reason: 'redis_unavailable' } } try { await redis.set(`${EXECUTION_CANCEL_PREFIX}${executionId}`, '1', 'EX', EXECUTION_CANCEL_EXPIRY) logger.info('Marked execution as cancelled', { executionId }) - return true + return { durablyRecorded: true, reason: 'recorded' } } catch (error) { logger.error('Failed to mark execution as cancelled', { executionId, error }) - return false + return { durablyRecorded: false, reason: 'redis_write_failed' } } } From 2865ab3fb0f0cb8a0aa1e80f4e47de98c83164ae Mon Sep 17 00:00:00 2001 From: Yusuf Mirza Altay Date: Fri, 13 Mar 2026 02:00:41 +0300 Subject: [PATCH 2/5] fix: hoist cancellation test mocks Co-authored-by: Claude Opus 4.6 --- apps/sim/lib/execution/cancellation.test.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/sim/lib/execution/cancellation.test.ts b/apps/sim/lib/execution/cancellation.test.ts index f85e9991e67..e44704267bf 100644 --- a/apps/sim/lib/execution/cancellation.test.ts +++ b/apps/sim/lib/execution/cancellation.test.ts @@ -1,8 +1,10 @@ import { loggerMock } from '@sim/testing' import { beforeEach, describe, expect, it, vi } from 'vitest' -const mockGetRedisClient = vi.fn() -const mockRedisSet = vi.fn() +const { mockGetRedisClient, mockRedisSet } = vi.hoisted(() => ({ + mockGetRedisClient: vi.fn(), + mockRedisSet: vi.fn(), +})) vi.mock('@sim/logger', () => loggerMock) From a99aba21a9a6f475d9c52baf2962c5fa72bdbd5a Mon Sep 17 00:00:00 2001 From: Yusuf Mirza Altay Date: Fri, 13 Mar 2026 02:21:30 +0300 Subject: [PATCH 3/5] fix(sim): harden execution cancel durability Co-authored-by: Claude Opus 4.6 --- .../[id]/executions/[executionId]/cancel/route.test.ts | 2 +- .../api/workflows/[id]/executions/[executionId]/cancel/route.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts index 5470a275390..43fbd7ae2d0 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts @@ -104,7 +104,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { await expect(response.json()).resolves.toEqual({ success: false, executionId: 'ex-1', - redisAvailable: false, + redisAvailable: true, durablyRecorded: false, reason: 'redis_write_failed', }) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts index abdacdcaa68..18e6d986b18 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts @@ -49,7 +49,7 @@ export async function POST( return NextResponse.json({ success: cancellation.durablyRecorded, executionId, - redisAvailable: cancellation.durablyRecorded, + redisAvailable: cancellation.reason !== 'redis_unavailable', durablyRecorded: cancellation.durablyRecorded, reason: cancellation.reason, }) From d6222b3890be9ba5ef0db7e5e31c462cb71141b7 Mon Sep 17 00:00:00 2001 From: test Date: Fri, 13 Mar 2026 13:34:30 +0300 Subject: [PATCH 4/5] fix(sim): fallback manual cancel without redis Abort active manual SSE executions locally when Redis cannot durably record the cancellation marker so the run still finalizes as cancelled instead of completing normally. --- .../app/api/workflows/[id]/execute/route.ts | 12 +++++++ .../[executionId]/cancel/route.test.ts | 36 +++++++++++++++++++ .../executions/[executionId]/cancel/route.ts | 7 +++- apps/sim/lib/execution/cancellation.test.ts | 34 ++++++++++++++++++ apps/sim/lib/execution/manual-cancellation.ts | 19 ++++++++++ 5 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 apps/sim/lib/execution/manual-cancellation.ts diff --git a/apps/sim/app/api/workflows/[id]/execute/route.ts b/apps/sim/app/api/workflows/[id]/execute/route.ts index 1200debd413..db7c8d203ee 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.ts @@ -20,6 +20,10 @@ import { } from '@/lib/execution/call-chain' import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer' import { processInputFileFields } from '@/lib/execution/files' +import { + registerManualExecutionAborter, + unregisterManualExecutionAborter, +} from '@/lib/execution/manual-cancellation' import { preprocessExecution } from '@/lib/execution/preprocessing' import { LoggingSession } from '@/lib/logs/execution/logging-session' import { @@ -775,6 +779,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: const encoder = new TextEncoder() const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync) let isStreamClosed = false + let isManualAbortRegistered = false const eventWriter = createExecutionEventWriter(executionId) setExecutionMeta(executionId, { @@ -787,6 +792,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: async start(controller) { let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null + registerManualExecutionAborter(executionId, timeoutController.abort) + isManualAbortRegistered = true + const sendEvent = (event: ExecutionEvent) => { if (!isStreamClosed) { try { @@ -1154,6 +1162,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id: }) finalMetaStatus = 'error' } finally { + if (isManualAbortRegistered) { + unregisterManualExecutionAborter(executionId) + isManualAbortRegistered = false + } try { await eventWriter.close() } catch (closeError) { diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts index 43fbd7ae2d0..e3f373675a0 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.test.ts @@ -8,6 +8,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' const mockCheckHybridAuth = vi.fn() const mockAuthorizeWorkflowByWorkspacePermission = vi.fn() const mockMarkExecutionCancelled = vi.fn() +const mockAbortManualExecution = vi.fn() vi.mock('@sim/logger', () => ({ createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }), @@ -21,6 +22,10 @@ vi.mock('@/lib/execution/cancellation', () => ({ markExecutionCancelled: (...args: unknown[]) => mockMarkExecutionCancelled(...args), })) +vi.mock('@/lib/execution/manual-cancellation', () => ({ + abortManualExecution: (...args: unknown[]) => mockAbortManualExecution(...args), +})) + vi.mock('@/lib/workflows/utils', () => ({ authorizeWorkflowByWorkspacePermission: (params: unknown) => mockAuthorizeWorkflowByWorkspacePermission(params), @@ -33,6 +38,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { vi.clearAllMocks() mockCheckHybridAuth.mockResolvedValue({ success: true, userId: 'user-1' }) mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true }) + mockAbortManualExecution.mockReturnValue(false) }) it('returns success when cancellation was durably recorded', async () => { @@ -56,6 +62,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { executionId: 'ex-1', redisAvailable: true, durablyRecorded: true, + locallyAborted: false, reason: 'recorded', }) }) @@ -81,6 +88,7 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { executionId: 'ex-1', redisAvailable: false, durablyRecorded: false, + locallyAborted: false, reason: 'redis_unavailable', }) }) @@ -106,7 +114,35 @@ describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => { executionId: 'ex-1', redisAvailable: true, durablyRecorded: false, + locallyAborted: false, reason: 'redis_write_failed', }) }) + + it('returns success when local fallback aborts execution without Redis durability', async () => { + mockMarkExecutionCancelled.mockResolvedValue({ + durablyRecorded: false, + reason: 'redis_unavailable', + }) + mockAbortManualExecution.mockReturnValue(true) + + const response = await POST( + new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', { + method: 'POST', + }), + { + params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }), + } + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toEqual({ + success: true, + executionId: 'ex-1', + redisAvailable: false, + durablyRecorded: false, + locallyAborted: true, + reason: 'redis_unavailable', + }) + }) }) diff --git a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts index 18e6d986b18..04c24abbb28 100644 --- a/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts +++ b/apps/sim/app/api/workflows/[id]/executions/[executionId]/cancel/route.ts @@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger' import { type NextRequest, NextResponse } from 'next/server' import { checkHybridAuth } from '@/lib/auth/hybrid' import { markExecutionCancelled } from '@/lib/execution/cancellation' +import { abortManualExecution } from '@/lib/execution/manual-cancellation' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' const logger = createLogger('CancelExecutionAPI') @@ -36,9 +37,12 @@ export async function POST( logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId }) const cancellation = await markExecutionCancelled(executionId) + const locallyAborted = abortManualExecution(executionId) if (cancellation.durablyRecorded) { logger.info('Execution marked as cancelled in Redis', { executionId }) + } else if (locallyAborted) { + logger.info('Execution cancelled via local in-process fallback', { executionId }) } else { logger.warn('Execution cancellation was not durably recorded', { executionId, @@ -47,10 +51,11 @@ export async function POST( } return NextResponse.json({ - success: cancellation.durablyRecorded, + success: cancellation.durablyRecorded || locallyAborted, executionId, redisAvailable: cancellation.reason !== 'redis_unavailable', durablyRecorded: cancellation.durablyRecorded, + locallyAborted, reason: cancellation.reason, }) } catch (error: any) { diff --git a/apps/sim/lib/execution/cancellation.test.ts b/apps/sim/lib/execution/cancellation.test.ts index e44704267bf..d68fe46a2bf 100644 --- a/apps/sim/lib/execution/cancellation.test.ts +++ b/apps/sim/lib/execution/cancellation.test.ts @@ -13,6 +13,11 @@ vi.mock('@/lib/core/config/redis', () => ({ })) import { markExecutionCancelled } from './cancellation' +import { + abortManualExecution, + registerManualExecutionAborter, + unregisterManualExecutionAborter, +} from './manual-cancellation' describe('markExecutionCancelled', () => { beforeEach(() => { @@ -48,3 +53,32 @@ describe('markExecutionCancelled', () => { }) }) }) + +describe('manual execution cancellation registry', () => { + beforeEach(() => { + unregisterManualExecutionAborter('execution-1') + }) + + it('aborts registered executions', () => { + const abort = vi.fn() + + registerManualExecutionAborter('execution-1', abort) + + expect(abortManualExecution('execution-1')).toBe(true) + expect(abort).toHaveBeenCalledTimes(1) + }) + + it('returns false when no execution is registered', () => { + expect(abortManualExecution('execution-missing')).toBe(false) + }) + + it('unregisters executions', () => { + const abort = vi.fn() + + registerManualExecutionAborter('execution-1', abort) + unregisterManualExecutionAborter('execution-1') + + expect(abortManualExecution('execution-1')).toBe(false) + expect(abort).not.toHaveBeenCalled() + }) +}) diff --git a/apps/sim/lib/execution/manual-cancellation.ts b/apps/sim/lib/execution/manual-cancellation.ts new file mode 100644 index 00000000000..5e5da4b5187 --- /dev/null +++ b/apps/sim/lib/execution/manual-cancellation.ts @@ -0,0 +1,19 @@ +const activeExecutionAborters = new Map void>() + +export function registerManualExecutionAborter(executionId: string, abort: () => void): void { + activeExecutionAborters.set(executionId, abort) +} + +export function unregisterManualExecutionAborter(executionId: string): void { + activeExecutionAborters.delete(executionId) +} + +export function abortManualExecution(executionId: string): boolean { + const abort = activeExecutionAborters.get(executionId) + if (!abort) { + return false + } + + abort() + return true +} From 7aed473e167d2eb3952a9a22265c215d35238db1 Mon Sep 17 00:00:00 2001 From: test Date: Fri, 13 Mar 2026 14:00:00 +0300 Subject: [PATCH 5/5] test: mock AuthType in async execute route Keep the rebased async execute route test aligned with the current hybrid auth module exports so it exercises the queueing path instead of failing at import time. --- apps/sim/app/api/workflows/[id]/execute/route.async.test.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts index 0a9aa008ba9..1afe4f8b1df 100644 --- a/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts +++ b/apps/sim/app/api/workflows/[id]/execute/route.async.test.ts @@ -18,6 +18,11 @@ const { })) vi.mock('@/lib/auth/hybrid', () => ({ + AuthType: { + SESSION: 'session', + API_KEY: 'api_key', + INTERNAL_JWT: 'internal_jwt', + }, checkHybridAuth: mockCheckHybridAuth, }))