Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ const {
}))

vi.mock('@/lib/auth/hybrid', () => ({
AuthType: {
SESSION: 'session',
API_KEY: 'api_key',
INTERNAL_JWT: 'internal_jwt',
},
checkHybridAuth: mockCheckHybridAuth,
}))

Expand Down
12 changes: 12 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import {
} from '@/lib/execution/call-chain'
import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/event-buffer'
import { processInputFileFields } from '@/lib/execution/files'
import {
registerManualExecutionAborter,
unregisterManualExecutionAborter,
} from '@/lib/execution/manual-cancellation'
import { preprocessExecution } from '@/lib/execution/preprocessing'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import {
Expand Down Expand Up @@ -775,6 +779,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
const encoder = new TextEncoder()
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
let isStreamClosed = false
let isManualAbortRegistered = false

const eventWriter = createExecutionEventWriter(executionId)
setExecutionMeta(executionId, {
Expand All @@ -787,6 +792,9 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
async start(controller) {
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null

registerManualExecutionAborter(executionId, timeoutController.abort)
isManualAbortRegistered = true

const sendEvent = (event: ExecutionEvent) => {
if (!isStreamClosed) {
try {
Expand Down Expand Up @@ -1154,6 +1162,10 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
})
finalMetaStatus = 'error'
} finally {
if (isManualAbortRegistered) {
unregisterManualExecutionAborter(executionId)
isManualAbortRegistered = false
}
try {
await eventWriter.close()
} catch (closeError) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
/**
* @vitest-environment node
*/

import { NextRequest } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const mockCheckHybridAuth = vi.fn()
const mockAuthorizeWorkflowByWorkspacePermission = vi.fn()
const mockMarkExecutionCancelled = vi.fn()
const mockAbortManualExecution = vi.fn()

vi.mock('@sim/logger', () => ({
createLogger: () => ({ info: vi.fn(), warn: vi.fn(), error: vi.fn() }),
}))

vi.mock('@/lib/auth/hybrid', () => ({
checkHybridAuth: (...args: unknown[]) => mockCheckHybridAuth(...args),
}))

vi.mock('@/lib/execution/cancellation', () => ({
markExecutionCancelled: (...args: unknown[]) => mockMarkExecutionCancelled(...args),
}))

vi.mock('@/lib/execution/manual-cancellation', () => ({
abortManualExecution: (...args: unknown[]) => mockAbortManualExecution(...args),
}))

vi.mock('@/lib/workflows/utils', () => ({
authorizeWorkflowByWorkspacePermission: (params: unknown) =>
mockAuthorizeWorkflowByWorkspacePermission(params),
}))

import { POST } from './route'

describe('POST /api/workflows/[id]/executions/[executionId]/cancel', () => {
beforeEach(() => {
vi.clearAllMocks()
mockCheckHybridAuth.mockResolvedValue({ success: true, userId: 'user-1' })
mockAuthorizeWorkflowByWorkspacePermission.mockResolvedValue({ allowed: true })
mockAbortManualExecution.mockReturnValue(false)
})

it('returns success when cancellation was durably recorded', async () => {
mockMarkExecutionCancelled.mockResolvedValue({
durablyRecorded: true,
reason: 'recorded',
})

const response = await POST(
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
method: 'POST',
}),
{
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
}
)

expect(response.status).toBe(200)
await expect(response.json()).resolves.toEqual({
success: true,
executionId: 'ex-1',
redisAvailable: true,
durablyRecorded: true,
locallyAborted: false,
reason: 'recorded',
})
})

it('returns unsuccessful response when Redis is unavailable', async () => {
mockMarkExecutionCancelled.mockResolvedValue({
durablyRecorded: false,
reason: 'redis_unavailable',
})

const response = await POST(
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
method: 'POST',
}),
{
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
}
)

expect(response.status).toBe(200)
await expect(response.json()).resolves.toEqual({
success: false,
executionId: 'ex-1',
redisAvailable: false,
durablyRecorded: false,
locallyAborted: false,
reason: 'redis_unavailable',
})
})

it('returns unsuccessful response when Redis persistence fails', async () => {
mockMarkExecutionCancelled.mockResolvedValue({
durablyRecorded: false,
reason: 'redis_write_failed',
})

const response = await POST(
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
method: 'POST',
}),
{
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
}
)

expect(response.status).toBe(200)
await expect(response.json()).resolves.toEqual({
success: false,
executionId: 'ex-1',
redisAvailable: true,
durablyRecorded: false,
locallyAborted: false,
reason: 'redis_write_failed',
})
})

it('returns success when local fallback aborts execution without Redis durability', async () => {
mockMarkExecutionCancelled.mockResolvedValue({
durablyRecorded: false,
reason: 'redis_unavailable',
})
mockAbortManualExecution.mockReturnValue(true)

const response = await POST(
new NextRequest('http://localhost/api/workflows/wf-1/executions/ex-1/cancel', {
method: 'POST',
}),
{
params: Promise.resolve({ id: 'wf-1', executionId: 'ex-1' }),
}
)

expect(response.status).toBe(200)
await expect(response.json()).resolves.toEqual({
success: true,
executionId: 'ex-1',
redisAvailable: false,
durablyRecorded: false,
locallyAborted: true,
reason: 'redis_unavailable',
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { abortManualExecution } from '@/lib/execution/manual-cancellation'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'

const logger = createLogger('CancelExecutionAPI')
Expand Down Expand Up @@ -35,20 +36,27 @@ export async function POST(

logger.info('Cancel execution requested', { workflowId, executionId, userId: auth.userId })

const marked = await markExecutionCancelled(executionId)
const cancellation = await markExecutionCancelled(executionId)
const locallyAborted = abortManualExecution(executionId)

if (marked) {
if (cancellation.durablyRecorded) {
logger.info('Execution marked as cancelled in Redis', { executionId })
} else if (locallyAborted) {
logger.info('Execution cancelled via local in-process fallback', { executionId })
} else {
logger.info('Redis not available, cancellation will rely on connection close', {
logger.warn('Execution cancellation was not durably recorded', {
executionId,
reason: cancellation.reason,
})
}

return NextResponse.json({
success: true,
success: cancellation.durablyRecorded || locallyAborted,
executionId,
redisAvailable: marked,
redisAvailable: cancellation.reason !== 'redis_unavailable',
durablyRecorded: cancellation.durablyRecorded,
locallyAborted,
reason: cancellation.reason,
})
} catch (error: any) {
logger.error('Failed to cancel execution', { workflowId, executionId, error: error.message })
Expand Down
84 changes: 84 additions & 0 deletions apps/sim/lib/execution/cancellation.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

const { mockGetRedisClient, mockRedisSet } = vi.hoisted(() => ({
mockGetRedisClient: vi.fn(),
mockRedisSet: vi.fn(),
}))

vi.mock('@sim/logger', () => loggerMock)

vi.mock('@/lib/core/config/redis', () => ({
getRedisClient: mockGetRedisClient,
}))

import { markExecutionCancelled } from './cancellation'
import {
abortManualExecution,
registerManualExecutionAborter,
unregisterManualExecutionAborter,
} from './manual-cancellation'

describe('markExecutionCancelled', () => {
beforeEach(() => {
vi.clearAllMocks()
})

it('returns redis_unavailable when no Redis client exists', async () => {
mockGetRedisClient.mockReturnValue(null)

await expect(markExecutionCancelled('execution-1')).resolves.toEqual({
durablyRecorded: false,
reason: 'redis_unavailable',
})
})

it('returns recorded when Redis write succeeds', async () => {
mockRedisSet.mockResolvedValue('OK')
mockGetRedisClient.mockReturnValue({ set: mockRedisSet })

await expect(markExecutionCancelled('execution-1')).resolves.toEqual({
durablyRecorded: true,
reason: 'recorded',
})
})

it('returns redis_write_failed when Redis write throws', async () => {
mockRedisSet.mockRejectedValue(new Error('set failed'))
mockGetRedisClient.mockReturnValue({ set: mockRedisSet })

await expect(markExecutionCancelled('execution-1')).resolves.toEqual({
durablyRecorded: false,
reason: 'redis_write_failed',
})
})
})

describe('manual execution cancellation registry', () => {
beforeEach(() => {
unregisterManualExecutionAborter('execution-1')
})

it('aborts registered executions', () => {
const abort = vi.fn()

registerManualExecutionAborter('execution-1', abort)

expect(abortManualExecution('execution-1')).toBe(true)
expect(abort).toHaveBeenCalledTimes(1)
})

it('returns false when no execution is registered', () => {
expect(abortManualExecution('execution-missing')).toBe(false)
})

it('unregisters executions', () => {
const abort = vi.fn()

registerManualExecutionAborter('execution-1', abort)
unregisterManualExecutionAborter('execution-1')

expect(abortManualExecution('execution-1')).toBe(false)
expect(abort).not.toHaveBeenCalled()
})
})
19 changes: 14 additions & 5 deletions apps/sim/lib/execution/cancellation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,36 @@ const logger = createLogger('ExecutionCancellation')
const EXECUTION_CANCEL_PREFIX = 'execution:cancel:'
const EXECUTION_CANCEL_EXPIRY = 60 * 60

export type ExecutionCancellationRecordResult =
| { durablyRecorded: true; reason: 'recorded' }
| {
durablyRecorded: false
reason: 'redis_unavailable' | 'redis_write_failed'
}

export function isRedisCancellationEnabled(): boolean {
return getRedisClient() !== null
}

/**
* Mark an execution as cancelled in Redis.
* Returns true if Redis is available and the flag was set, false otherwise.
* Returns whether the cancellation was durably recorded.
*/
export async function markExecutionCancelled(executionId: string): Promise<boolean> {
export async function markExecutionCancelled(
executionId: string
): Promise<ExecutionCancellationRecordResult> {
const redis = getRedisClient()
if (!redis) {
return false
return { durablyRecorded: false, reason: 'redis_unavailable' }
}

try {
await redis.set(`${EXECUTION_CANCEL_PREFIX}${executionId}`, '1', 'EX', EXECUTION_CANCEL_EXPIRY)
logger.info('Marked execution as cancelled', { executionId })
return true
return { durablyRecorded: true, reason: 'recorded' }
} catch (error) {
logger.error('Failed to mark execution as cancelled', { executionId, error })
return false
return { durablyRecorded: false, reason: 'redis_write_failed' }
}
}

Expand Down
19 changes: 19 additions & 0 deletions apps/sim/lib/execution/manual-cancellation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const activeExecutionAborters = new Map<string, () => void>()

export function registerManualExecutionAborter(executionId: string, abort: () => void): void {
activeExecutionAborters.set(executionId, abort)
}

export function unregisterManualExecutionAborter(executionId: string): void {
activeExecutionAborters.delete(executionId)
}

export function abortManualExecution(executionId: string): boolean {
const abort = activeExecutionAborters.get(executionId)
if (!abort) {
return false
}

abort()
return true
}
Loading