Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ export class RunEngineTriggerTaskService {

if (!queueSizeGuard.ok) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}
}
Expand Down
15 changes: 14 additions & 1 deletion apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ScheduleEngine } from "@internal/schedule-engine";
import type { TriggerScheduledTaskErrorType } from "@internal/schedule-engine";
import { stringifyIO } from "@trigger.dev/core/v3";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
Expand All @@ -8,6 +9,7 @@ import { singleton } from "~/utils/singleton";
import { TriggerTaskService } from "./services/triggerTask.server";
import { meter, tracer } from "./tracer.server";
import { workerQueue } from "~/services/worker.server";
import { ServiceValidationError } from "./services/common.server";

export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);

Expand Down Expand Up @@ -111,9 +113,20 @@ function createScheduleEngine() {

return { success: !!result };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
let errorType: TriggerScheduledTaskErrorType = "SYSTEM_ERROR";

if (
error instanceof ServiceValidationError &&
errorMessage.includes("queue size limit for this environment has been reached")
) {
errorType = "QUEUE_LIMIT";
}

return {
success: false,
error: error instanceof Error ? error.message : String(error),
error: errorMessage,
errorType,
};
}
},
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ export class BatchTriggerV3Service extends BaseService {

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}

Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/v3/services/common.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
export type ServiceValidationErrorLevel = "error" | "warn" | "info";

export class ServiceValidationError extends Error {
constructor(message: string, public status?: number) {
constructor(
message: string,
public status?: number,
public logLevel?: ServiceValidationErrorLevel
) {
super(message);
this.name = "ServiceValidationError";
}
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/triggerTaskV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ export class TriggerTaskServiceV1 extends BaseService {

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ export function runStatusFromError(
}
}

export type ServiceValidationErrorLevel = "error" | "warn" | "info";

export class ServiceValidationError extends Error {
constructor(
message: string,
public status?: number,
public metadata?: Record<string, unknown>
public metadata?: Record<string, unknown>,
public logLevel?: ServiceValidationErrorLevel
) {
super(message);
this.name = "ServiceValidationError";
Comment on lines 74 to 82
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🚩 Divergent ServiceValidationError constructor signatures between common.server.ts and errors.ts

The PR adds logLevel as the 3rd parameter in apps/webapp/app/v3/services/common.server.ts (message, status?, logLevel?) but as the 4th parameter in internal-packages/run-engine/src/engine/errors.ts (message, status?, metadata?, logLevel?). All callers modified in this PR use the common.server.ts version, so the current code is correct. However, this signature divergence is a maintenance risk — a developer creating a ServiceValidationError from the run-engine package with new ServiceValidationError(msg, undefined, "warn") would accidentally assign "warn" to metadata instead of logLevel. Consider aligning the signatures or renaming one class to prevent confusion.

(Refers to lines 74-84)

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Expand Down
25 changes: 18 additions & 7 deletions internal-packages/schedule-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,28 @@ export class ScheduleEngine {

span.setAttribute("trigger_success", true);
} else {
this.logger.error("Failed to trigger scheduled task", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
const isQueueLimit = result.errorType === "QUEUE_LIMIT";

if (isQueueLimit) {
this.logger.warn("Scheduled task trigger skipped due to queue limit", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
} else {
this.logger.error("Failed to trigger scheduled task", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
}

this.scheduleExecutionFailureCounter.add(1, {
environment_type: environmentType,
schedule_type: scheduleType,
error_type: "task_failure",
error_type: isQueueLimit ? "queue_limit" : "task_failure",
});

span.setAttribute("trigger_success", false);
Expand Down
8 changes: 7 additions & 1 deletion internal-packages/schedule-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ export type TriggerScheduledTaskParams = {
exactScheduleTime?: Date;
};

export type TriggerScheduledTaskErrorType = "QUEUE_LIMIT" | "SYSTEM_ERROR";

export interface TriggerScheduledTaskCallback {
(params: TriggerScheduledTaskParams): Promise<{ success: boolean; error?: string }>;
(params: TriggerScheduledTaskParams): Promise<{
success: boolean;
error?: string;
errorType?: TriggerScheduledTaskErrorType;
}>;
}

export interface ScheduleEngineOptions {
Expand Down
1 change: 1 addition & 0 deletions internal-packages/schedule-engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export type {
ScheduleEngineOptions,
TriggerScheduleParams,
TriggerScheduledTaskCallback,
TriggerScheduledTaskErrorType,
} from "./engine/types.js";
23 changes: 17 additions & 6 deletions packages/redis-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,8 @@ class Worker<TCatalog extends WorkerCatalog> {
const errorMessage = error instanceof Error ? error.message : String(error);

const shouldLogError = catalogItem.logErrors ?? true;
const errorLogLevel =
error && typeof error === "object" && "logLevel" in error ? error.logLevel : undefined;

const logAttributes = {
name: this.options.name,
Expand All @@ -906,10 +908,14 @@ class Worker<TCatalog extends WorkerCatalog> {
errorMessage,
};

if (shouldLogError) {
this.logger.error(`Worker error processing item`, logAttributes);
} else {
if (!shouldLogError) {
this.logger.info(`Worker failed to process item`, logAttributes);
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker error processing item`, logAttributes);
} else if (errorLogLevel === "info") {
this.logger.info(`Worker error processing item`, logAttributes);
} else {
this.logger.error(`Worker error processing item`, logAttributes);
}

// Attempt requeue logic.
Expand All @@ -922,13 +928,18 @@ class Worker<TCatalog extends WorkerCatalog> {
const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt);

if (!retryDelay) {
if (shouldLogError) {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
if (!shouldLogError || errorLogLevel === "info") {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
Expand Down
Loading