diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index 2cc849e78de..05b26d0cbe0 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -263,7 +263,9 @@ export class RunEngineTriggerTaskService { if (!queueSizeGuard.ok) { throw new ServiceValidationError( - `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`, + undefined, + "warn" ); } } diff --git a/apps/webapp/app/v3/scheduleEngine.server.ts b/apps/webapp/app/v3/scheduleEngine.server.ts index ef3cabe64df..15b07423fc5 100644 --- a/apps/webapp/app/v3/scheduleEngine.server.ts +++ b/apps/webapp/app/v3/scheduleEngine.server.ts @@ -1,4 +1,5 @@ import { ScheduleEngine } from "@internal/schedule-engine"; +import type { TriggerScheduledTaskErrorType } from "@internal/schedule-engine"; import { stringifyIO } from "@trigger.dev/core/v3"; import { prisma } from "~/db.server"; import { env } from "~/env.server"; @@ -8,6 +9,7 @@ import { singleton } from "~/utils/singleton"; import { TriggerTaskService } from "./services/triggerTask.server"; import { meter, tracer } from "./tracer.server"; import { workerQueue } from "~/services/worker.server"; +import { ServiceValidationError } from "./services/common.server"; export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine); @@ -111,9 +113,20 @@ function createScheduleEngine() { return { success: !!result }; } catch (error) { + const errorMessage = error instanceof Error ? error.message : String(error); + let errorType: TriggerScheduledTaskErrorType = "SYSTEM_ERROR"; + + if ( + error instanceof ServiceValidationError && + errorMessage.includes("queue size limit for this environment has been reached") + ) { + errorType = "QUEUE_LIMIT"; + } + return { success: false, - error: error instanceof Error ? error.message : String(error), + error: errorMessage, + errorType, }; } }, diff --git a/apps/webapp/app/v3/services/batchTriggerV3.server.ts b/apps/webapp/app/v3/services/batchTriggerV3.server.ts index e4bc583b7cc..3674157140a 100644 --- a/apps/webapp/app/v3/services/batchTriggerV3.server.ts +++ b/apps/webapp/app/v3/services/batchTriggerV3.server.ts @@ -249,7 +249,9 @@ export class BatchTriggerV3Service extends BaseService { if (!queueSizeGuard.isWithinLimits) { throw new ServiceValidationError( - `Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + `Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`, + undefined, + "warn" ); } diff --git a/apps/webapp/app/v3/services/common.server.ts b/apps/webapp/app/v3/services/common.server.ts index 31f401e00e5..015e8e23d53 100644 --- a/apps/webapp/app/v3/services/common.server.ts +++ b/apps/webapp/app/v3/services/common.server.ts @@ -1,5 +1,11 @@ +export type ServiceValidationErrorLevel = "error" | "warn" | "info"; + export class ServiceValidationError extends Error { - constructor(message: string, public status?: number) { + constructor( + message: string, + public status?: number, + public logLevel?: ServiceValidationErrorLevel + ) { super(message); this.name = "ServiceValidationError"; } diff --git a/apps/webapp/app/v3/services/triggerTaskV1.server.ts b/apps/webapp/app/v3/services/triggerTaskV1.server.ts index efc6510ef3c..d36130109c9 100644 --- a/apps/webapp/app/v3/services/triggerTaskV1.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV1.server.ts @@ -134,7 +134,9 @@ export class TriggerTaskServiceV1 extends BaseService { if (!queueSizeGuard.isWithinLimits) { throw new ServiceValidationError( - `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}` + `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`, + undefined, + "warn" ); } } diff --git a/internal-packages/run-engine/src/engine/errors.ts b/internal-packages/run-engine/src/engine/errors.ts index 772282debd1..820f0ec4ce6 100644 --- a/internal-packages/run-engine/src/engine/errors.ts +++ b/internal-packages/run-engine/src/engine/errors.ts @@ -69,11 +69,14 @@ export function runStatusFromError( } } +export type ServiceValidationErrorLevel = "error" | "warn" | "info"; + export class ServiceValidationError extends Error { constructor( message: string, public status?: number, - public metadata?: Record + public metadata?: Record, + public logLevel?: ServiceValidationErrorLevel ) { super(message); this.name = "ServiceValidationError"; diff --git a/internal-packages/schedule-engine/src/engine/index.ts b/internal-packages/schedule-engine/src/engine/index.ts index 47dc1e31e19..4eb641176b2 100644 --- a/internal-packages/schedule-engine/src/engine/index.ts +++ b/internal-packages/schedule-engine/src/engine/index.ts @@ -497,17 +497,28 @@ export class ScheduleEngine { span.setAttribute("trigger_success", true); } else { - this.logger.error("Failed to trigger scheduled task", { - instanceId: params.instanceId, - taskIdentifier: instance.taskSchedule.taskIdentifier, - durationMs: triggerDuration, - error: result.error, - }); + const isQueueLimit = result.errorType === "QUEUE_LIMIT"; + + if (isQueueLimit) { + this.logger.warn("Scheduled task trigger skipped due to queue limit", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + durationMs: triggerDuration, + error: result.error, + }); + } else { + this.logger.error("Failed to trigger scheduled task", { + instanceId: params.instanceId, + taskIdentifier: instance.taskSchedule.taskIdentifier, + durationMs: triggerDuration, + error: result.error, + }); + } this.scheduleExecutionFailureCounter.add(1, { environment_type: environmentType, schedule_type: scheduleType, - error_type: "task_failure", + error_type: isQueueLimit ? "queue_limit" : "task_failure", }); span.setAttribute("trigger_success", false); diff --git a/internal-packages/schedule-engine/src/engine/types.ts b/internal-packages/schedule-engine/src/engine/types.ts index 8cad80c3033..f4888c447ae 100644 --- a/internal-packages/schedule-engine/src/engine/types.ts +++ b/internal-packages/schedule-engine/src/engine/types.ts @@ -24,8 +24,14 @@ export type TriggerScheduledTaskParams = { exactScheduleTime?: Date; }; +export type TriggerScheduledTaskErrorType = "QUEUE_LIMIT" | "SYSTEM_ERROR"; + export interface TriggerScheduledTaskCallback { - (params: TriggerScheduledTaskParams): Promise<{ success: boolean; error?: string }>; + (params: TriggerScheduledTaskParams): Promise<{ + success: boolean; + error?: string; + errorType?: TriggerScheduledTaskErrorType; + }>; } export interface ScheduleEngineOptions { diff --git a/internal-packages/schedule-engine/src/index.ts b/internal-packages/schedule-engine/src/index.ts index 22f6c07b30c..6c96f2cd54b 100644 --- a/internal-packages/schedule-engine/src/index.ts +++ b/internal-packages/schedule-engine/src/index.ts @@ -3,4 +3,5 @@ export type { ScheduleEngineOptions, TriggerScheduleParams, TriggerScheduledTaskCallback, + TriggerScheduledTaskErrorType, } from "./engine/types.js"; diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index 203b6a8a785..8ee85dd4ad8 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -895,6 +895,8 @@ class Worker { const errorMessage = error instanceof Error ? error.message : String(error); const shouldLogError = catalogItem.logErrors ?? true; + const errorLogLevel = + error && typeof error === "object" && "logLevel" in error ? error.logLevel : undefined; const logAttributes = { name: this.options.name, @@ -906,10 +908,14 @@ class Worker { errorMessage, }; - if (shouldLogError) { - this.logger.error(`Worker error processing item`, logAttributes); - } else { + if (!shouldLogError) { this.logger.info(`Worker failed to process item`, logAttributes); + } else if (errorLogLevel === "warn") { + this.logger.warn(`Worker error processing item`, logAttributes); + } else if (errorLogLevel === "info") { + this.logger.info(`Worker error processing item`, logAttributes); + } else { + this.logger.error(`Worker error processing item`, logAttributes); } // Attempt requeue logic. @@ -922,13 +928,18 @@ class Worker { const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt); if (!retryDelay) { - if (shouldLogError) { - this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, { + if (!shouldLogError || errorLogLevel === "info") { + this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, { + ...logAttributes, + attempt: newAttempt, + }); + } else if (errorLogLevel === "warn") { + this.logger.warn(`Worker item reached max attempts. Moving to DLQ.`, { ...logAttributes, attempt: newAttempt, }); } else { - this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, { + this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, { ...logAttributes, attempt: newAttempt, });