Cloudflare Workflows - Technical Deep Dive

This document provides an in-depth technical reference for the Cloudflare Workflows system in RankDisco, covering architecture patterns, execution flows, state management, and cost tracking.

See Also: workflows.md for high-level workflow summaries and diagrams.


Table of Contents

  1. Workflow Architecture
  2. Domain Onboard Workflow
  3. URL Classify Workflow
  4. Fan-out Pattern
  5. State Management
  6. Error Handling
  7. Cost Tracking
  8. Budget Checking

Workflow Architecture

RankDiscoWorkflow Base Class

All RankDisco workflows extend the RankDiscoWorkflow<Params> base class, which provides common utilities for orchestration, retry handling, state tracking, and cost management.

Location: /packages/api/src/workflows/base.ts

```typescript
export abstract class RankDiscoWorkflow<Params = unknown>
  extends WorkflowEntrypoint<Env, Params> {

  // Retry configurations
  protected defaultRetries: RetryConfig;  // 3 retries, exponential backoff
  protected quickRetries: RetryConfig;    // 2 retries, constant 1s delay
  protected patientRetries: RetryConfig;  // 4 retries, exponential from 10s

  // Core methods
  protected async fanOutToQueue<T, R>(...);  // Queue coordination
  protected async checkBudget(...);          // Pre-operation budget check
  protected async trackCost(...);            // Post-operation cost tracking
  protected async trackStart(...);           // D1 workflow start
  protected async trackComplete(...);        // D1 workflow completion
  protected async trackError(...);           // D1 workflow error
}
```

Retry Configuration Presets

| Preset | Limit | Initial Delay | Backoff | Use Case |
|---|---|---|---|---|
| quickRetries | 2 | 1 second | constant | D1 writes, cheap operations |
| defaultRetries | 3 | 5 seconds | exponential | Standard operations |
| patientRetries | 4 | 10 seconds | exponential | External APIs (DataForSEO) |
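As a concrete illustration, the three presets can be written out as plain objects. This is a sketch: the exact `RetryConfig` shape is an assumption, modeled on the `{ limit, delay, backoff }` retry options that `step.do` accepts.

```typescript
// Sketch of the retry presets; the RetryConfig shape is an assumption
// modeled on the { limit, delay, backoff } options of step.do.
type RetryConfig = {
  limit: number;   // Max retry attempts
  delay: string;   // Initial delay, e.g. "1 second"
  backoff: "constant" | "linear" | "exponential";
};

const quickRetries: RetryConfig = {
  limit: 2, delay: "1 second", backoff: "constant",       // D1 writes, cheap ops
};
const defaultRetries: RetryConfig = {
  limit: 3, delay: "5 seconds", backoff: "exponential",   // Standard operations
};
const patientRetries: RetryConfig = {
  limit: 4, delay: "10 seconds", backoff: "exponential",  // External APIs
};
```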

Environment Bindings

The Env interface defines all required bindings:

```typescript
export interface Env {
  // Database
  DB: D1Database;

  // KV Namespaces
  WORKFLOW_RESULTS: KVNamespace;  // Fan-out result aggregation
  DETECT_JS_KV: KVNamespace;      // Fetch strategy caching
  DFS_RUNS: KVNamespace;          // DataForSEO run tracking
  DFS_BUDGETS: KVNamespace;       // Budget tracking
  LOOKUP_CACHE?: KVNamespace;     // API response caching

  // Queues
  QUEUE: Queue;
  BACKLINK_CLASSIFY_QUEUE: Queue;
  KEYWORD_CLASSIFY_QUEUE: Queue;
  DOMAIN_CLASSIFY_QUEUE: Queue;

  // Workflows
  DOMAIN_ONBOARD_WORKFLOW: Workflow;
  URL_CLASSIFY_WORKFLOW: Workflow;

  // AI & Vectorize
  AI: Ai;
  VECTORIZE_DOMAINS: VectorizeIndex;
  VECTORIZE_KEYWORDS: VectorizeIndex;

  // External
  DATAFORSEO_LOGIN: string;
  DATAFORSEO_PASSWORD: string;
}
```

Workflow Lifecycle


Domain Onboard Workflow

The DomainOnboardWorkflow orchestrates complete domain intelligence gathering including backlinks, keywords, referring domains, and brand property discovery.

Location: /packages/api/src/workflows/domain-onboard-parallel.ts

Input Parameters

```typescript
interface DomainOnboardParams {
  domain_id: number;                         // Required: D1 domain ID
  domain: string;                            // Required: Domain name
  backlink_limit?: number;                   // Default: 100
  keyword_limit?: number;                    // Default: 100
  referring_domains_limit?: number;          // Default: 100
  skip_backlinks?: boolean;                  // Skip backlink fetch
  skip_keywords?: boolean;                   // Skip keyword fetch
  skip_referring_domains?: boolean;          // Skip referring domains
  skip_brand_properties?: boolean;           // Skip brand discovery
  skip_ranking_domain_properties?: boolean;  // Skip competitor analysis
}
```

Execution Phases

The workflow executes in three distinct phases to maximize parallelization.

Step-by-Step Breakdown

| Step | Name | Duration | Retries | Description |
|---|---|---|---|---|
| 1 | ensure-domain | ~50ms | 2x | Create domain record if it does not exist |
| 2 | initialize | ~50ms | 2x | Mark domain onboard_status = 'in_progress' |
| 3 | parallel-fetch-all | 2-5s | 4x | Parallel DataForSEO API calls |
| 4 | parallel-store-all | 1-3s | 3x | Batch D1 inserts for all data |
| 5 | track-all-queued | ~50ms | 2x | Update domain queue counters |
| 6 | fetch-ranking-url-spam-scores | 1-2s | 3x | Bulk spam score fetch |
| 7 | discover-ranking-domain-properties | 2-5s | 2x | Brand discovery for competitors |
| 8 | finalize | ~50ms | 3x | Mark domain onboard_status = 'completed' |
| 9 | queue-domain-classification | ~50ms | 2x | Queue domain for classification |

Pipeline Activity Tracking

The workflow emits real-time events for DAG visualization:

```typescript
// WebSocket events for real-time UI
const emit = createPipelineEmitter(this.env, "default");
emit.started("dfs-backlinks", { domain, limit: backlink_limit });
emit.finished("dfs-backlinks", { count: backlinks.length });
emit.fallback("dfs-backlinks", { error: "API timeout" });

// KV-based activity for polling UI
const activity = createActivityTracker(this.env, `domain-onboard-${domain_id}`);
activity.start(DAG_NODE_IDS.DFS_BACKLINKS, { domain });
activity.complete(DAG_NODE_IDS.DFS_BACKLINKS, { count: 100 });
activity.queued(DAG_NODE_IDS.QUEUE_URL_CLASSIFY, 100);
```

Timing Characteristics

| Scenario | Duration | API Calls | D1 Writes |
|---|---|---|---|
| Full onboard (100 each) | 8-15s | 4-5 | ~400 |
| Skip keywords | 5-10s | 3-4 | ~200 |
| Skip all optional | 3-5s | 2 | ~100 |
| Cache hit (referring domains) | 6-12s | 3-4 | ~400 |

URL Classify Workflow

The UrlClassifyWorkflow implements a 4-stage classification pipeline optimized for cost, running free stages first and escalating to expensive stages only when needed.

Location: /packages/api/src/workflows/url-classify.ts

Input Parameters

```typescript
interface UrlClassifyParams {
  url: string;             // Required: URL to classify
  url_id?: number;         // Optional: D1 URL ID
  domain_id?: number;      // Optional: Owner domain ID
  max_rounds?: number;     // Default: 3 (LLM retry limit)
  skip_content?: boolean;  // Skip content parsing stage
  skip_llm?: boolean;      // Skip LLM verification
  force_llm?: boolean;     // Always run LLM regardless of confidence
}
```

Classification Pipeline

Classification Thresholds

| Threshold | Value | Purpose |
|---|---|---|
| PIPELINE_MIN_CONFIDENCE | 70% | Early exit threshold |
| LLM_VERIFICATION_THRESHOLD | 65% | Below this triggers LLM |
| CACHE_MIN_CONFIDENCE | 60% | Minimum for cache hits |
| HIGH_CONFIDENCE | 85% | Triggers vectorize feedback |

Stage Details

Stage 0: Cache Check

  • Queries D1 domains table for existing domain-level classification
  • Returns cached result if confidence >= 60%
  • Cost: FREE

Stage 1: Rules Engine

  • Pattern matching against 200+ URL rules
  • Handles known platforms (YouTube, GitHub, Medium, etc.)
  • Detects page types from URL structure
  • Cost: FREE
```typescript
const rulesResult = rulesClassify({ url, domain });
// Returns: { domain_type, page_type, confidence, needs_vectorize }
```

Stage 2: Vectorize Similarity

  • Generates BGE embeddings via Workers AI
  • Queries VECTORIZE_DOMAINS index for similar URLs
  • Returns top 3 matches with confidence scores
  • Cost: ~$0.0001 per query
```typescript
const vectorResult = await vectorizeClassify(
  { url, domain, partial_classification },
  this.env
);
```

Stage 3: Content Parsing

  • Fetches page HTML with cascade strategy:
    1. Native fetch (FREE)
    2. ZenRows Basic ($0.001)
    3. ZenRows Premium ($0.01)
  • Extracts signals: title, h1, description, word count
  • Detects platform markers (pricing, login, affiliate links)
  • Cost: $0 - $0.01 depending on fetch method

Stage 4: LLM Verification

  • Uses Llama 3.3 70B via Workers AI
  • Structured prompt with taxonomy constraints
  • Retries up to max_rounds times with 30s backoff
  • Cost: ~$0.001 per call
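Putting the thresholds and stages together, the cost-ordered escalation can be sketched as follows. This is an illustration only: `classifyWithEscalation`, `StageResult`, and the stage runners are hypothetical names, while the 70% early-exit and 65% LLM-verification thresholds come from the table above.

```typescript
// Thresholds from the classification thresholds table.
const PIPELINE_MIN_CONFIDENCE = 70;
const LLM_VERIFICATION_THRESHOLD = 65;

interface StageResult {
  confidence: number;  // 0-100
  cost_usd: number;
}

type Stage = { name: string; run: () => StageResult };

// Runs cheap stages in order, exits early on a confident answer,
// and escalates to the LLM only when confidence stays low.
function classifyWithEscalation(
  stages: Stage[],
  llm: Stage,
): { result: StageResult; stages_run: string[]; cost_usd: number } {
  const stagesRun: string[] = [];
  let totalCost = 0;
  let best: StageResult = { confidence: 0, cost_usd: 0 };

  for (const stage of stages) {
    const r = stage.run();
    stagesRun.push(stage.name);
    totalCost += r.cost_usd;
    if (r.confidence > best.confidence) best = r;
    // Early exit: later, pricier stages never run.
    if (best.confidence >= PIPELINE_MIN_CONFIDENCE) {
      return { result: best, stages_run: stagesRun, cost_usd: totalCost };
    }
  }

  // Still uncertain after the cheap stages: verify with the LLM.
  if (best.confidence < LLM_VERIFICATION_THRESHOLD) {
    const r = llm.run();
    stagesRun.push(llm.name);
    totalCost += r.cost_usd;
    if (r.confidence > best.confidence) best = r;
  }
  return { result: best, stages_run: stagesRun, cost_usd: totalCost };
}
```

The key design property is that a 90%-confidence rules hit costs nothing, while a 40%-confidence URL pays for every stage up to and including the LLM.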

Output Structure

```typescript
interface ClassificationResult {
  url: string;
  url_id?: number;
  confidence: number;             // 0-100
  domain_type?: string;           // platform, commerce, etc.
  page_type?: string;             // article, product, etc.
  tactic_type?: string;           // seo, paid, etc.
  channel_bucket?: string;        // search, social, etc.
  media_type?: string;            // video, podcast, etc.
  modifiers: string[];            // Additional tags
  classification_source: string;  // Last stage that ran
  stages_run: string[];           // All stages executed
  round: number;                  // LLM rounds used
  cost_usd: number;               // Total cost
  fetch_method?: string;          // How content was fetched
  content_signals?: ContentSignals;
}
```

Fan-out Pattern

The fan-out pattern enables workflows to distribute work to queue consumers and wait for aggregated results.

Pattern Overview

Implementation

// Workflow side - fan out
protected async fanOutToQueue<T, R>(
step: WorkflowStep,
stepName: string,
queue: Queue,
messages: T[],
options: {
batchSize?: number; // Default: 100
waitTimeout?: WorkflowSleepDuration; // Default: 10 minutes
pollInterval?: WorkflowSleepDuration; // Default: 10 seconds
maxPolls?: number; // Default: 60
} = {},
): Promise<R[]>

Message Structure

Each message sent to the queue includes workflow metadata:

```typescript
{
  ...originalMessage,
  _workflow: {
    id: "wf-1234567890",    // Workflow instance ID
    step: "classify-urls",  // Step name
    resultKey: "workflow:wf-1234567890:classify-urls",
    index: 0                // Position in batch
  }
}
```
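The stamping step can be sketched as a small pure function. The `_workflow` field names follow the structure shown above; the helper name `stampMessages` is hypothetical.

```typescript
// Sketch: stamp a batch of queue messages with fan-out metadata so
// consumers can signal results back to the right KV key and slot.
interface WorkflowMeta {
  id: string;
  step: string;
  resultKey: string;
  index: number;
}

function stampMessages<T extends object>(
  messages: T[],
  workflowId: string,
  stepName: string,
): Array<T & { _workflow: WorkflowMeta }> {
  const resultKey = `workflow:${workflowId}:${stepName}`;
  return messages.map((m, index) => ({
    ...m,
    _workflow: { id: workflowId, step: stepName, resultKey, index },
  }));
}
```

Carrying the `index` in each message lets results land in a stable position regardless of the order consumers finish in.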

Consumer Integration

Queue consumers detect workflow messages and signal results:

```typescript
// In queue consumer
if (RankDiscoWorkflow.isWorkflowMessage(message.body)) {
  const result = await processItem(message.body);
  await RankDiscoWorkflow.signalResult(
    env,
    message.body._workflow,
    result
  );
}
```

Result Aggregation in KV

```
// KV structure
{
  "expected": 100,
  "received": 45,
  "results": [result0, result1, ..., result44, undefined, ...],
  "started_at": 1705123456789,
  "updated_at": 1705123467890
}
```
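The aggregation record and the completeness check that the workflow's poll loop runs against it can be sketched like this. The shape mirrors the KV structure above; `signal` and `isComplete` are hypothetical helper names.

```typescript
// Sketch of the fan-out aggregation state kept in KV, plus the two
// operations against it: recording one consumer's result, and checking
// whether the workflow can stop polling.
interface FanOutState<R> {
  expected: number;
  received: number;
  results: Array<R | undefined>;  // Indexed by the message's _workflow.index
  started_at: number;
  updated_at: number;
}

function signal<R>(state: FanOutState<R>, index: number, result: R): FanOutState<R> {
  const results = state.results.slice();
  const isNew = results[index] === undefined;  // Ignore duplicate deliveries
  results[index] = result;
  return {
    ...state,
    results,
    received: isNew ? state.received + 1 : state.received,
    updated_at: Date.now(),
  };
}

const isComplete = <R>(s: FanOutState<R>): boolean => s.received >= s.expected;
```

Counting only first-time writes keeps the `received` counter correct under Queues' at-least-once delivery.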

State Management

Workflow state is persisted to D1 for observability and recovery.

Workflow Instances Table

```sql
CREATE TABLE workflow_instances (
  id TEXT PRIMARY KEY,          -- Workflow instance ID
  workflow_type TEXT NOT NULL,  -- e.g., "domain-onboard"
  status TEXT NOT NULL,         -- running, complete, errored
  params TEXT,                  -- JSON input parameters
  output TEXT,                  -- JSON result on completion
  error TEXT,                   -- Error message if failed
  started_at TEXT,              -- ISO timestamp
  completed_at TEXT,            -- ISO timestamp
  updated_at TEXT NOT NULL
);
```

State Transitions

Tracking Methods

```typescript
// Called at workflow start
await this.trackStart(event, step, {
  domain,
  domain_id,
  backlink_limit,
  keyword_limit,
});

// Called on successful completion
await this.trackComplete(step, {
  success: true,
  metrics: { backlinks_fetched: 100, ... },
  duration_ms: 8500
});

// Called on error
await this.trackError(step, "DataForSEO API timeout after 3 retries");
```

Domain-Specific State

For DomainOnboardWorkflow, additional state is tracked on the domain record:

```sql
-- Domain onboarding state
UPDATE domains SET
  onboard_status = 'in_progress',
  onboard_started_at = datetime('now'),
  urls_queued_for_classification = 100,
  keywords_queued_for_classification = 100,
  classification_started_at = 1705123456789
WHERE id = ?;

-- On completion
UPDATE domains SET
  onboard_status = 'completed',
  onboard_completed_at = datetime('now'),
  onboard_duration_ms = 8500
WHERE id = ?;
```

Error Handling

Retry Configuration

Each step can specify retry behavior:

```typescript
await step.do(
  "fetch-backlinks",
  {
    retries: {
      limit: 5,               // Max retry attempts
      delay: "15 seconds",    // Initial delay
      backoff: "exponential"  // constant | linear | exponential
    },
    timeout: "5 minutes"      // Max step duration
  },
  async () => {
    return await this.fetchBacklinksFromDataForSEO(domain, limit);
  }
);
```

Backoff Calculation

| Backoff Type | Retry 1 | Retry 2 | Retry 3 | Retry 4 |
|---|---|---|---|---|
| constant | 15s | 15s | 15s | 15s |
| linear | 15s | 30s | 45s | 60s |
| exponential | 15s | 30s | 60s | 120s |

Error Propagation

Graceful Degradation

The Domain Onboard workflow uses Promise.allSettled for parallel operations, allowing partial success:

```typescript
const [backlinksResult, keywordsResult, brandResult] =
  await Promise.allSettled([
    step.do("fetch-backlinks", ...),
    step.do("fetch-keywords", ...),
    step.do("fetch-brand-properties", ...),
  ]);

// Process each result independently
if (backlinksResult.status === "fulfilled") {
  backlinks = backlinksResult.value;
} else {
  errors.push(`Backlink fetch failed: ${backlinksResult.reason}`);
  emit.fallback("dfs-backlinks", { error: backlinksResult.reason });
}
```

Error Tracking

```typescript
const errors: string[] = [];

// Accumulate errors through the workflow
errors.push(`Keyword fetch failed: ${error}`);

// Final status determination
const status = errors.length > 0 ? "completed_with_errors" : "completed";

// Store all errors
await this.trackError(step, errors.join("; "));
```

Cost Tracking

Cost Recording

Every external API call records its cost:

```typescript
await trackCost(this.env, {
  service: COST_SERVICES.DATAFORSEO_BACKLINKS,
  cost_usd: 0.02,
  run_id: this.instanceId,
  success: true,
  items_returned: 100,
  source: "workflow"
});
```

Cost Services

| Service Constant | Typical Cost | Description |
|---|---|---|
| DATAFORSEO_BACKLINKS | ~$0.02 | Backlinks API call |
| DATAFORSEO_KEYWORDS | ~$0.05 | Ranked keywords API |
| DATAFORSEO_REFERRING_DOMAINS | ~$0.02 | Referring domains API |
| DATAFORSEO_SPAM_SCORE | ~$0.02/1000 | Bulk spam scores |
| CF_WORKERS_AI | ~$0.0001 | Embeddings/LLM |
| ZENROWS_BASIC | $0.001 | JS rendering |
| ZENROWS_PREMIUM | $0.01 | Premium proxy |
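These typical costs make a back-of-envelope estimate for a full onboard straightforward. The sketch below uses the table's approximations (they are not exact billing figures), and the helper name is hypothetical:

```typescript
// Typical per-call costs from the table above (approximations).
const typicalCost = {
  backlinks: 0.02,
  keywords: 0.05,
  referring_domains: 0.02,
  spam_scores_per_1000: 0.02,
};

// Rough DataForSEO spend for one full onboard, given how many
// ranking-URL spam scores are fetched in bulk.
function estimateOnboardCost(spamScoreCount: number): number {
  return (
    typicalCost.backlinks +
    typicalCost.keywords +
    typicalCost.referring_domains +
    (spamScoreCount / 1000) * typicalCost.spam_scores_per_1000
  );
}
```

At 1,000 spam scores this works out to roughly $0.11 per domain, which is why the classification pipeline works so hard to keep its own per-URL cost near zero.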

Per-Step Cost Tracking

The URL Classify workflow tracks costs per stage:

```typescript
// After vectorize stage
if (vectorResult.cost > 0) {
  await this.trackApiCost(
    step,
    "vectorize",
    COST_SERVICES.CF_WORKERS_AI,
    vectorResult.cost
  );
}

// After content fetch
await this.trackApiCost(
  step,
  "content",
  contentResult.fetch_method === "zenrows_premium"
    ? COST_SERVICES.ZENROWS_PREMIUM
    : COST_SERVICES.CF_NATIVE_FETCH,
  contentResult.cost
);
```

API Costs Table

```sql
CREATE TABLE api_costs (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  service TEXT NOT NULL,
  cost REAL NOT NULL,
  workflow_id TEXT,
  workflow_type TEXT,
  step_name TEXT,
  success INTEGER,
  metadata TEXT,  -- JSON additional context
  created_at TEXT NOT NULL
);
```

Cost Aggregation Queries

```sql
-- Cost by workflow instance
SELECT workflow_id, SUM(cost) AS total_cost
FROM api_costs
WHERE workflow_id = ?
GROUP BY workflow_id;

-- Cost by service, last 24 hours
SELECT service, SUM(cost) AS total_cost, COUNT(*) AS calls
FROM api_costs
WHERE created_at > datetime('now', '-1 day')
GROUP BY service
ORDER BY total_cost DESC;

-- Cost by workflow type
SELECT workflow_type, SUM(cost) AS total_cost
FROM api_costs
WHERE workflow_type IS NOT NULL
GROUP BY workflow_type;
```

Budget Checking

Pre-Operation Budget Validation

Before expensive operations, workflows can check remaining budget:

```typescript
const budget = await this.checkBudget(
  step,
  "fetch-backlinks",
  "dataforseo",
  1.00  // maxCost: $1 max spend this hour
);

if (!budget.ok) {
  throw new Error(`Budget exceeded: spent $${budget.spent}, max $1.00`);
}

// Proceed with the operation
const backlinks = await this.fetchBacklinksFromDataForSEO(domain, limit);
```

Budget Check Implementation

```typescript
protected async checkBudget(
  step: WorkflowStep,
  stepName: string,
  service: string,
  maxCost: number,
): Promise<{ ok: boolean; remaining: number; spent: number }> {
  return await step.do(
    `${stepName}-budget-check`,
    { retries: this.quickRetries },
    async () => {
      const result = await this.env.DB.prepare(`
        SELECT COALESCE(SUM(cost), 0) AS total_spent
        FROM api_costs
        WHERE workflow_id = ?
          AND created_at > datetime('now', '-1 hour')
      `).bind(this.instanceId).first();

      const spent = Number(result?.total_spent ?? 0);
      const remaining = maxCost - spent;

      return {
        ok: remaining > 0,
        remaining: Math.max(0, remaining),
        spent,
      };
    },
  );
}
```

Budget Enforcement Patterns

KV-Based Budgets

For cross-workflow budget limits, the DFS_BUDGETS KV namespace stores:

```
// Key: budget:dataforseo:daily
// Value:
{
  "limit": 10.00,
  "spent": 4.50,
  "reset_at": "2025-01-21T00:00:00Z"
}
```
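The check-and-spend logic against such a record can be sketched as follows. The record shape follows the example above; the daily-reset handling and the `checkAndSpend` helper are assumptions for illustration.

```typescript
// Sketch of a cross-workflow daily budget record and a check-and-spend
// step against it. Rolls the window at reset_at, refuses spends that
// would exceed the limit, otherwise returns the updated record.
interface KvBudget {
  limit: number;
  spent: number;
  reset_at: string;  // ISO timestamp of the next daily reset
}

function checkAndSpend(
  budget: KvBudget,
  cost: number,
  now: Date = new Date(),
): { ok: boolean; budget: KvBudget } {
  let b = budget;
  // Roll the window over when the reset time has passed.
  if (now >= new Date(budget.reset_at)) {
    const nextMidnight = new Date(now);
    nextMidnight.setUTCHours(24, 0, 0, 0);  // next UTC midnight
    b = { ...budget, spent: 0, reset_at: nextMidnight.toISOString() };
  }
  if (b.spent + cost > b.limit) {
    return { ok: false, budget: b };
  }
  return { ok: true, budget: { ...b, spent: b.spent + cost } };
}
```

In practice the updated record would be written back to DFS_BUDGETS after a successful check; note that KV is eventually consistent, so this is a soft cap rather than a hard guarantee.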

Appendix: Workflow Result Interface

All workflows return a standardized result:

```typescript
interface WorkflowResult {
  success: boolean;
  metrics: Record<string, number>;
  errors: string[];
  error?: string;  // Legacy single error
  duration_ms?: number;
}
```

Domain Onboard Metrics

```typescript
{
  backlinks_fetched: 100,
  backlinks_classified: 100,
  referring_domains_fetched: 100,
  keywords_fetched: 100,
  keywords_classified: 100,
  low_confidence_count: 5,
  llm_verified_count: 5,
  brand_properties_found: 3,
  youtube_channels_resolved: 1,
  social_profiles_resolved: 2,
  ranking_domains_scraped: 15,
  ranking_urls_spam_scores: 80,
  duration_ms: 8500
}
```

URL Classify Metrics

```typescript
{
  confidence: 85,
  stages_run: 3,
  cost_usd: 0.0012,
  duration_ms: 2500
}
```