Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/sim/app/api/workflows/[id]/execute/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
input: callbackData.input,
error: callbackData.output.error,
durationMs: callbackData.executionTime || 0,
startedAt: callbackData.startedAt,
endedAt: callbackData.endedAt,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
Expand All @@ -641,6 +643,8 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
startedAt: callbackData.startedAt,
endedAt: callbackData.endedAt,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ export function getBlockColor(blockType: string): string {
*/
export function formatDuration(ms?: number): string {
if (ms === undefined || ms === null) return '-'
if (ms < 1000) return `${ms}ms`
if (ms < 1000) {
return `${Math.round(ms)}ms`
}
return `${(ms / 1000).toFixed(2)}s`
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -964,8 +964,8 @@ export function useWorkflowExecution() {
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return

const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
const startedAt = data.startedAt
const endedAt = data.endedAt

accumulatedBlockLogs.push({
blockId: data.blockId,
Expand Down Expand Up @@ -1013,8 +1013,8 @@ export function useWorkflowExecution() {
// Track failed block execution in run path
setBlockRunStatus(data.blockId, 'error')

const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
const startedAt = data.startedAt
const endedAt = data.endedAt

// Accumulate block error log for the execution result
accumulatedBlockLogs.push({
Expand Down Expand Up @@ -1603,8 +1603,8 @@ export function useWorkflowExecution() {
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return

const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
const startedAt = data.startedAt
const endedAt = data.endedAt

accumulatedBlockLogs.push({
blockId: data.blockId,
Expand Down Expand Up @@ -1642,8 +1642,8 @@ export function useWorkflowExecution() {

setBlockRunStatus(data.blockId, 'error')

const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
const startedAt = data.startedAt
const endedAt = data.endedAt

accumulatedBlockLogs.push({
blockId: data.blockId,
Expand Down
20 changes: 14 additions & 6 deletions apps/sim/executor/execution/block-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class BlockExecutor {
this.callOnBlockStart(ctx, node, block)
}

const startTime = Date.now()
const startTime = performance.now()
let resolvedInputs: Record<string, any> = {}

const nodeMetadata = this.buildNodeMetadata(node)
Expand Down Expand Up @@ -145,7 +145,7 @@ export class BlockExecutor {
})) as NormalizedBlockOutput
}

const duration = Date.now() - startTime
const duration = performance.now() - startTime

if (blockLog) {
blockLog.endedAt = new Date().toISOString()
Expand All @@ -169,7 +169,9 @@ export class BlockExecutor {
block,
this.sanitizeInputsForLog(resolvedInputs),
displayOutput,
duration
duration,
blockLog!.startedAt,
blockLog!.endedAt
)
}

Expand Down Expand Up @@ -221,7 +223,7 @@ export class BlockExecutor {
isSentinel: boolean,
phase: 'input_resolution' | 'execution'
): NormalizedBlockOutput {
const duration = Date.now() - startTime
const duration = performance.now() - startTime
const errorMessage = normalizeError(error)
const hasResolvedInputs =
resolvedInputs && typeof resolvedInputs === 'object' && Object.keys(resolvedInputs).length > 0
Expand Down Expand Up @@ -274,7 +276,9 @@ export class BlockExecutor {
block,
this.sanitizeInputsForLog(input),
displayOutput,
duration
duration,
blockLog!.startedAt,
blockLog!.endedAt
)
}

Expand Down Expand Up @@ -423,7 +427,9 @@ export class BlockExecutor {
block: SerializedBlock,
input: Record<string, any>,
output: NormalizedBlockOutput,
duration: number
duration: number,
startedAt: string,
endedAt: string
): void {
const blockId = node.id
const blockName = block.metadata?.name ?? blockId
Expand All @@ -440,6 +446,8 @@ export class BlockExecutor {
input,
output,
executionTime: duration,
startedAt,
endedAt,
},
iterationContext
)
Expand Down
14 changes: 7 additions & 7 deletions apps/sim/executor/execution/engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ export class ExecutionEngine {
}

async run(triggerBlockId?: string): Promise<ExecutionResult> {
const startTime = Date.now()
const startTime = performance.now()
try {
this.initializeQueue(triggerBlockId)

Expand All @@ -125,8 +125,8 @@ export class ExecutionEngine {
return this.buildPausedResult(startTime)
}

const endTime = Date.now()
this.context.metadata.endTime = new Date(endTime).toISOString()
const endTime = performance.now()
this.context.metadata.endTime = new Date().toISOString()
this.context.metadata.duration = endTime - startTime

if (this.cancelledFlag) {
Expand All @@ -146,8 +146,8 @@ export class ExecutionEngine {
metadata: this.context.metadata,
}
} catch (error) {
const endTime = Date.now()
this.context.metadata.endTime = new Date(endTime).toISOString()
const endTime = performance.now()
this.context.metadata.endTime = new Date().toISOString()
this.context.metadata.duration = endTime - startTime

if (this.cancelledFlag) {
Expand Down Expand Up @@ -433,8 +433,8 @@ export class ExecutionEngine {
}

private buildPausedResult(startTime: number): ExecutionResult {
const endTime = Date.now()
this.context.metadata.endTime = new Date(endTime).toISOString()
const endTime = performance.now()
this.context.metadata.endTime = new Date().toISOString()
this.context.metadata.duration = endTime - startTime
this.context.metadata.status = 'paused'

Expand Down
8 changes: 7 additions & 1 deletion apps/sim/executor/execution/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,13 @@ export interface ContextExtensions {
blockId: string,
blockName: string,
blockType: string,
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
output: {
input?: any
output: NormalizedBlockOutput
executionTime: number
startedAt: string
endedAt: string
},
iterationContext?: IterationContext
) => Promise<void>

Expand Down
3 changes: 3 additions & 0 deletions apps/sim/executor/orchestrators/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,12 @@ export class LoopOrchestrator {

// Emit onBlockComplete for the loop container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
const now = new Date().toISOString()
this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
output,
executionTime: DEFAULTS.EXECUTION_TIME,
startedAt: now,
endedAt: now,
})
}

Expand Down
3 changes: 3 additions & 0 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,12 @@ export class ParallelOrchestrator {

// Emit onBlockComplete for the parallel container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
const now = new Date().toISOString()
this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
output,
executionTime: 0,
startedAt: now,
endedAt: now,
})
}

Expand Down
2 changes: 2 additions & 0 deletions apps/sim/executor/utils/subflow-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ export function addSubflowErrorLog(
input: inputData,
output: { error: errorMessage },
executionTime: 0,
startedAt: now,
endedAt: now,
})
}
}
65 changes: 21 additions & 44 deletions apps/sim/hooks/use-execution-stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,18 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type {
BlockCompletedData,
BlockErrorData,
BlockStartedData,
ExecutionCancelledData,
ExecutionCompletedData,
ExecutionErrorData,
ExecutionEvent,
ExecutionStartedData,
StreamChunkData,
StreamDoneData,
} from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'

const logger = createLogger('useExecutionStream')

Expand Down Expand Up @@ -81,48 +91,15 @@ async function processSSEStream(
}

export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
success: boolean
output: any
duration: number
startTime: string
endTime: string
}) => void
onExecutionError?: (data: { error: string; duration: number }) => void
onExecutionCancelled?: (data: { duration: number }) => void
onBlockStarted?: (data: {
blockId: string
blockName: string
blockType: string
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
}) => void
onBlockCompleted?: (data: {
blockId: string
blockName: string
blockType: string
input?: any
output: any
durationMs: number
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
}) => void
onBlockError?: (data: {
blockId: string
blockName: string
blockType: string
input?: any
error: string
durationMs: number
iterationCurrent?: number
iterationTotal?: number
iterationType?: SubflowType
}) => void
onStreamChunk?: (data: { blockId: string; chunk: string }) => void
onStreamDone?: (data: { blockId: string }) => void
onExecutionStarted?: (data: ExecutionStartedData) => void
onExecutionCompleted?: (data: ExecutionCompletedData) => void
onExecutionError?: (data: ExecutionErrorData) => void
onExecutionCancelled?: (data: ExecutionCancelledData) => void
onBlockStarted?: (data: BlockStartedData) => void
onBlockCompleted?: (data: BlockCompletedData) => void
onBlockError?: (data: BlockErrorData) => void
onStreamChunk?: (data: StreamChunkData) => void
onStreamDone?: (data: StreamDoneData) => void
}

export interface ExecuteStreamOptions {
Expand Down
8 changes: 7 additions & 1 deletion apps/sim/lib/workflows/executor/execution-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,13 @@ export async function executeWorkflowCore(
blockId: string,
blockName: string,
blockType: string,
output: { input?: unknown; output: NormalizedBlockOutput; executionTime: number },
output: {
input?: unknown
output: NormalizedBlockOutput
executionTime: number
startedAt: string
endedAt: string
},
iterationContext?: IterationContext
) => {
await loggingSession.onBlockComplete(blockId, blockName, blockType, output)
Expand Down
29 changes: 28 additions & 1 deletion apps/sim/lib/workflows/executor/execution-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ export interface BlockCompletedEvent extends BaseExecutionEvent {
input?: any
output: any
durationMs: number
startedAt: string
endedAt: string
// Iteration context for loops and parallels
iterationCurrent?: number
iterationTotal?: number
Expand All @@ -123,6 +125,8 @@ export interface BlockErrorEvent extends BaseExecutionEvent {
input?: any
error: string
durationMs: number
startedAt: string
endedAt: string
// Iteration context for loops and parallels
iterationCurrent?: number
iterationTotal?: number
Expand Down Expand Up @@ -167,6 +171,19 @@ export type ExecutionEvent =
| StreamChunkEvent
| StreamDoneEvent

/**
* Extracted data types for use in callbacks
*/
export type ExecutionStartedData = ExecutionStartedEvent['data']
export type ExecutionCompletedData = ExecutionCompletedEvent['data']
export type ExecutionErrorData = ExecutionErrorEvent['data']
export type ExecutionCancelledData = ExecutionCancelledEvent['data']
export type BlockStartedData = BlockStartedEvent['data']
export type BlockCompletedData = BlockCompletedEvent['data']
export type BlockErrorData = BlockErrorEvent['data']
export type StreamChunkData = StreamChunkEvent['data']
export type StreamDoneData = StreamDoneEvent['data']

/**
* Helper to create SSE formatted message
*/
Expand Down Expand Up @@ -235,7 +252,13 @@ export function createSSECallbacks(options: SSECallbackOptions) {
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: any; executionTime: number },
callbackData: {
input?: unknown
output: any
executionTime: number
startedAt: string
endedAt: string
},
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
const hasError = callbackData.output?.error
Expand All @@ -260,6 +283,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
input: callbackData.input,
error: callbackData.output.error,
durationMs: callbackData.executionTime || 0,
startedAt: callbackData.startedAt,
endedAt: callbackData.endedAt,
...iterationData,
},
})
Expand All @@ -276,6 +301,8 @@ export function createSSECallbacks(options: SSECallbackOptions) {
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
startedAt: callbackData.startedAt,
endedAt: callbackData.endedAt,
...iterationData,
},
})
Expand Down