Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 28 additions & 24 deletions apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,28 +121,14 @@ export class PauseResumeManager {

const now = new Date()

await db
.insert(pausedExecutions)
.values({
id: randomUUID(),
workflowId,
executionId,
executionSnapshot: snapshotSeed,
pausePoints: pausePointsRecord,
totalPauseCount: pausePoints.length,
resumedCount: 0,
status: 'paused',
metadata: {
pauseScope: 'execution',
triggerIds: snapshotSeed.triggerIds,
executorUserId: executorUserId ?? null,
},
pausedAt: now,
updatedAt: now,
})
.onConflictDoUpdate({
target: pausedExecutions.executionId,
set: {
// Wrap persistence in a transaction to prevent race conditions with concurrent resume requests
await db.transaction(async (tx) => {
await tx
.insert(pausedExecutions)
.values({
id: randomUUID(),
workflowId,
executionId,
executionSnapshot: snapshotSeed,
pausePoints: pausePointsRecord,
totalPauseCount: pausePoints.length,
Expand All @@ -153,10 +139,28 @@ export class PauseResumeManager {
triggerIds: snapshotSeed.triggerIds,
executorUserId: executorUserId ?? null,
},
pausedAt: now,
updatedAt: now,
},
})
})
.onConflictDoUpdate({
target: pausedExecutions.executionId,
set: {
executionSnapshot: snapshotSeed,
pausePoints: pausePointsRecord,
totalPauseCount: pausePoints.length,
resumedCount: 0,
status: 'paused',
metadata: {
pauseScope: 'execution',
triggerIds: snapshotSeed.triggerIds,
executorUserId: executorUserId ?? null,
},
updatedAt: now,
},
})
})

// Process queued resumes after transaction commits to ensure visibility
await PauseResumeManager.processQueuedResumes(executionId)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* @vitest-environment node
*
* Tests for Issue #3081: Race Condition between pause persistence and resume requests
*/
import { databaseMock, loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

vi.mock('@sim/db', () => databaseMock)
vi.mock('@sim/logger', () => loggerMock)

import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
import type { PausePoint, SerializedSnapshot } from '@/executor/types'

describe('PauseResumeManager - Race Condition Fix (#3081)', () => {
beforeEach(() => {
vi.clearAllMocks()
})

const createTestSnapshot = (): SerializedSnapshot => ({
snapshot: JSON.stringify({
workflow: { blocks: [], connections: [] },
state: { blockStates: {}, executedBlocks: [] },
}),
triggerIds: [],
})

const createTestPausePoints = (): PausePoint[] => [
{
contextId: 'test-context',
blockId: 'pause-block-1',
response: {},
resumeStatus: 'paused',
snapshotReady: true,
registeredAt: new Date().toISOString(),
},
]

describe('persistPauseResult', () => {
it.concurrent('should use database transaction for atomic persistence', async () => {
const mockInsert = vi.fn().mockReturnValue({
values: vi.fn().mockReturnValue({
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
}),
})

const mockTransaction = vi.fn().mockImplementation(async (callback) => {
const mockTx = { insert: mockInsert }
return await callback(mockTx as any)
})

vi.mocked(databaseMock.db.transaction).mockImplementation(mockTransaction)
vi.spyOn(PauseResumeManager, 'processQueuedResumes').mockResolvedValue(undefined)

await PauseResumeManager.persistPauseResult({
workflowId: 'test-workflow',
executionId: 'test-execution',
pausePoints: createTestPausePoints(),
snapshotSeed: createTestSnapshot(),
executorUserId: 'test-user',
})

expect(mockTransaction).toHaveBeenCalledTimes(1)
expect(mockInsert).toHaveBeenCalled()
})

it.concurrent('should call processQueuedResumes after transaction', async () => {
const mockInsert = vi.fn().mockReturnValue({
values: vi.fn().mockReturnValue({
onConflictDoUpdate: vi.fn().mockResolvedValue(undefined),
}),
})

vi.mocked(databaseMock.db.transaction).mockImplementation(async (callback) => {
const mockTx = { insert: mockInsert }
return await callback(mockTx as any)
})

const processQueuedResumesSpy = vi
.spyOn(PauseResumeManager, 'processQueuedResumes')
.mockResolvedValue(undefined)

await PauseResumeManager.persistPauseResult({
workflowId: 'test-workflow',
executionId: 'test-execution',
pausePoints: createTestPausePoints(),
snapshotSeed: createTestSnapshot(),
executorUserId: 'test-user',
})

expect(processQueuedResumesSpy).toHaveBeenCalledWith('test-execution')
})
})
})