Cloudflare Workflows - Technical Deep Dive
This document provides an in-depth technical reference for the Cloudflare Workflows system in RankDisco, covering architecture patterns, execution flows, state management, and cost tracking.
See Also: workflows.md for high-level workflow summaries and diagrams.
Table of Contents
- Workflow Architecture
- Domain Onboard Workflow
- URL Classify Workflow
- Fan-out Pattern
- State Management
- Error Handling
- Cost Tracking
- Budget Checking
Workflow Architecture
RankDiscoWorkflow Base Class
All RankDisco workflows extend the RankDiscoWorkflow<Params> base class, which provides common utilities for orchestration, retry handling, state tracking, and cost management.
Location: /packages/api/src/workflows/base.ts
export abstract class RankDiscoWorkflow<Params = unknown>
extends WorkflowEntrypoint<Env, Params> {
// Retry configurations
protected defaultRetries: RetryConfig; // 3 retries, exponential backoff
protected quickRetries: RetryConfig; // 2 retries, constant 1s delay
protected patientRetries: RetryConfig; // 4 retries, exponential from 10s
// Core methods
protected async fanOutToQueue<T, R>(...); // Queue coordination
protected async checkBudget(...); // Pre-operation budget check
protected async trackCost(...); // Post-operation cost tracking
protected async trackStart(...); // D1 workflow start
protected async trackComplete(...); // D1 workflow completion
protected async trackError(...); // D1 workflow error
}
Retry Configuration Presets
| Preset | Limit | Initial Delay | Backoff | Use Case |
|---|---|---|---|---|
| `quickRetries` | 2 | 1 second | constant | D1 writes, cheap operations |
| `defaultRetries` | 3 | 5 seconds | exponential | Standard operations |
| `patientRetries` | 4 | 10 seconds | exponential | External APIs (DataForSEO) |
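The three presets can be sketched as plain config objects. The field names (`limit`, `delay`, `backoff`) follow the `step.do()` retry options shown later in this document; the actual `RetryConfig` shape in `base.ts` may differ.

```typescript
// Sketch of the retry presets as plain objects; field names follow the
// step.do() retry options, not necessarily the real RetryConfig type.
type Backoff = "constant" | "linear" | "exponential";

interface RetryConfig {
  limit: number;   // max retry attempts
  delay: string;   // initial delay, e.g. "5 seconds"
  backoff: Backoff;
}

const quickRetries: RetryConfig = { limit: 2, delay: "1 second", backoff: "constant" };
const defaultRetries: RetryConfig = { limit: 3, delay: "5 seconds", backoff: "exponential" };
const patientRetries: RetryConfig = { limit: 4, delay: "10 seconds", backoff: "exponential" };
```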
Environment Bindings
The Env interface defines all required bindings:
export interface Env {
// Database
DB: D1Database;
// KV Namespaces
WORKFLOW_RESULTS: KVNamespace; // Fan-out result aggregation
DETECT_JS_KV: KVNamespace; // Fetch strategy caching
DFS_RUNS: KVNamespace; // DataForSEO run tracking
DFS_BUDGETS: KVNamespace; // Budget tracking
LOOKUP_CACHE?: KVNamespace; // API response caching
// Queues
QUEUE: Queue;
BACKLINK_CLASSIFY_QUEUE: Queue;
KEYWORD_CLASSIFY_QUEUE: Queue;
DOMAIN_CLASSIFY_QUEUE: Queue;
// Workflows
DOMAIN_ONBOARD_WORKFLOW: Workflow;
URL_CLASSIFY_WORKFLOW: Workflow;
// AI & Vectorize
AI: Ai;
VECTORIZE_DOMAINS: VectorizeIndex;
VECTORIZE_KEYWORDS: VectorizeIndex;
// External
DATAFORSEO_LOGIN: string;
DATAFORSEO_PASSWORD: string;
}
Workflow Lifecycle
Domain Onboard Workflow
The DomainOnboardWorkflow orchestrates complete domain intelligence gathering including backlinks, keywords, referring domains, and brand property discovery.
Location: /packages/api/src/workflows/domain-onboard-parallel.ts
Input Parameters
interface DomainOnboardParams {
domain_id: number; // Required: D1 domain ID
domain: string; // Required: Domain name
backlink_limit?: number; // Default: 100
keyword_limit?: number; // Default: 100
referring_domains_limit?: number; // Default: 100
skip_backlinks?: boolean; // Skip backlink fetch
skip_keywords?: boolean; // Skip keyword fetch
skip_referring_domains?: boolean; // Skip referring domains
skip_brand_properties?: boolean; // Skip brand discovery
skip_ranking_domain_properties?: boolean; // Skip competitor analysis
}
Execution Phases
The workflow executes in three distinct phases to maximize parallelization.
Step-by-Step Breakdown
| Step | Name | Duration | Retries | Description |
|---|---|---|---|---|
| 1 | ensure-domain | ~50ms | 2x | Create domain record if not exists |
| 2 | initialize | ~50ms | 2x | Mark domain onboard_status = 'in_progress' |
| 3 | parallel-fetch-all | 2-5s | 4x | Parallel DataForSEO API calls |
| 4 | parallel-store-all | 1-3s | 3x | Batch D1 inserts for all data |
| 5 | track-all-queued | ~50ms | 2x | Update domain queue counters |
| 6 | fetch-ranking-url-spam-scores | 1-2s | 3x | Bulk spam score fetch |
| 7 | discover-ranking-domain-properties | 2-5s | 2x | Brand discovery for competitors |
| 8 | finalize | ~50ms | 3x | Mark domain onboard_status = 'completed' |
| 9 | queue-domain-classification | ~50ms | 2x | Queue domain for classification |
Pipeline Activity Tracking
The workflow emits real-time events for DAG visualization:
// WebSocket events for real-time UI
const emit = createPipelineEmitter(this.env, "default");
emit.started("dfs-backlinks", { domain, limit: backlink_limit });
emit.finished("dfs-backlinks", { count: backlinks.length });
emit.fallback("dfs-backlinks", { error: "API timeout" });
// KV-based activity for polling UI
const activity = createActivityTracker(this.env, `domain-onboard-${domain_id}`);
activity.start(DAG_NODE_IDS.DFS_BACKLINKS, { domain });
activity.complete(DAG_NODE_IDS.DFS_BACKLINKS, { count: 100 });
activity.queued(DAG_NODE_IDS.QUEUE_URL_CLASSIFY, 100);
Timing Characteristics
| Scenario | Duration | API Calls | D1 Writes |
|---|---|---|---|
| Full onboard (100 each) | 8-15s | 4-5 | ~400 |
| Skip keywords | 5-10s | 3-4 | ~200 |
| Skip all optional | 3-5s | 2 | ~100 |
| Cache hit (referring domains) | 6-12s | 3-4 | ~400 |
URL Classify Workflow
The UrlClassifyWorkflow implements a staged classification pipeline (a free cache check followed by four escalating stages) optimized for cost: free stages run first, and the workflow escalates to expensive stages only when confidence is too low.
Location: /packages/api/src/workflows/url-classify.ts
Input Parameters
interface UrlClassifyParams {
url: string; // Required: URL to classify
url_id?: number; // Optional: D1 URL ID
domain_id?: number; // Optional: Owner domain ID
max_rounds?: number; // Default: 3 (LLM retry limit)
skip_content?: boolean; // Skip content parsing stage
skip_llm?: boolean; // Skip LLM verification
force_llm?: boolean; // Always run LLM regardless of confidence
}
Classification Pipeline
Classification Thresholds
| Threshold | Value | Purpose |
|---|---|---|
| `PIPELINE_MIN_CONFIDENCE` | 70% | Early exit threshold |
| `LLM_VERIFICATION_THRESHOLD` | 65% | Below this triggers LLM |
| `CACHE_MIN_CONFIDENCE` | 60% | Minimum for cache hits |
| `HIGH_CONFIDENCE` | 85% | Triggers vectorize feedback |
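How the first three thresholds interact can be sketched as a pure decision function. The helper name (`nextStage`) and the exact semantics of the 65-69% band are illustrative assumptions, not taken from the codebase:

```typescript
// Hypothetical helper showing how the pipeline thresholds interact.
const PIPELINE_MIN_CONFIDENCE = 70;
const LLM_VERIFICATION_THRESHOLD = 65;

type PipelineDecision = "exit_early" | "run_llm" | "continue";

function nextStage(
  confidence: number,
  opts: { force_llm?: boolean; skip_llm?: boolean } = {},
): PipelineDecision {
  if (opts.force_llm) return "run_llm";                             // always escalate
  if (confidence >= PIPELINE_MIN_CONFIDENCE) return "exit_early";   // good enough, stop paying
  if (confidence < LLM_VERIFICATION_THRESHOLD && !opts.skip_llm) {
    return "run_llm";                                               // too uncertain, verify
  }
  return "continue"; // 65-69%: keep trying cheaper stages first (assumed behavior)
}
```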
Stage Details
Stage 0: Cache Check
- Queries the D1 `domains` table for an existing domain-level classification
- Returns the cached result if confidence >= 60%
- Cost: FREE
Stage 1: Rules Engine
- Pattern matching against 200+ URL rules
- Handles known platforms (YouTube, GitHub, Medium, etc.)
- Detects page types from URL structure
- Cost: FREE
const rulesResult = rulesClassify({ url, domain });
// Returns: { domain_type, page_type, confidence, needs_vectorize }
Stage 2: Vectorize Similarity
- Generates BGE embeddings via Workers AI
- Queries the `VECTORIZE_DOMAINS` index for similar URLs
- Returns top 3 matches with confidence scores
- Cost: ~$0.0001 per query
const vectorResult = await vectorizeClassify(
{ url, domain, partial_classification },
this.env
);
Stage 3: Content Parsing
- Fetches page HTML with cascade strategy:
- Native fetch (FREE)
- ZenRows Basic ($0.001)
- ZenRows Premium ($0.01)
- Extracts signals: title, h1, description, word count
- Detects platform markers (pricing, login, affiliate links)
- Cost: $0 - $0.01 depending on fetch method
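The cascade can be sketched as trying each fetcher in cost order and keeping the cost of the attempt that succeeded. The fetcher functions here are stand-ins; the real implementation calls native fetch and the ZenRows API:

```typescript
// Sketch of the cost-ordered fetch cascade; fetchers are stand-ins.
interface FetchAttempt {
  method: string;
  cost: number; // USD per call
  fn: (url: string) => Promise<string | null>; // null = failed or blocked
}

async function cascadeFetch(
  url: string,
  attempts: FetchAttempt[],
): Promise<{ html: string; fetch_method: string; cost: number } | null> {
  for (const attempt of attempts) {
    const html = await attempt.fn(url);
    if (html !== null) {
      // Only the successful tier's cost is charged.
      return { html, fetch_method: attempt.method, cost: attempt.cost };
    }
  }
  return null; // every tier failed
}
```

Attempts would be passed in ascending cost order: native ($0), then `zenrows_basic` ($0.001), then `zenrows_premium` ($0.01).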
Stage 4: LLM Verification
- Uses Llama 3.3 70B via Workers AI
- Structured prompt with taxonomy constraints
- Retries up to `max_rounds` times with 30s backoff
- Cost: ~$0.001 per call
Output Structure
interface ClassificationResult {
url: string;
url_id?: number;
confidence: number; // 0-100
domain_type?: string; // platform, commerce, etc.
page_type?: string; // article, product, etc.
tactic_type?: string; // seo, paid, etc.
channel_bucket?: string; // search, social, etc.
media_type?: string; // video, podcast, etc.
modifiers: string[]; // Additional tags
classification_source: string; // Last stage that ran
stages_run: string[]; // All stages executed
round: number; // LLM rounds used
cost_usd: number; // Total cost
fetch_method?: string; // How content was fetched
content_signals?: ContentSignals;
}
Fan-out Pattern
The fan-out pattern enables workflows to distribute work to queue consumers and wait for aggregated results.
Pattern Overview
Implementation
// Workflow side - fan out
protected async fanOutToQueue<T, R>(
step: WorkflowStep,
stepName: string,
queue: Queue,
messages: T[],
options: {
batchSize?: number; // Default: 100
waitTimeout?: WorkflowSleepDuration; // Default: 10 minutes
pollInterval?: WorkflowSleepDuration; // Default: 10 seconds
maxPolls?: number; // Default: 60
} = {},
): Promise<R[]>
Message Structure
Each message sent to the queue includes workflow metadata:
{
...originalMessage,
_workflow: {
id: "wf-1234567890", // Workflow instance ID
step: "classify-urls", // Step name
resultKey: "workflow:wf-1234567890:classify-urls",
index: 0 // Position in batch
}
}
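Stamping each outgoing message with this metadata can be sketched as a small pure helper (the helper name `wrapForWorkflow` is hypothetical; the result-key format follows the example above):

```typescript
interface WorkflowMeta {
  id: string;        // workflow instance ID
  step: string;      // step name
  resultKey: string; // KV key for result aggregation
  index: number;     // position in the batch
}

// Hypothetical helper: stamp each message so queue consumers can
// route their results back to the waiting workflow step.
function wrapForWorkflow<T extends object>(
  messages: T[],
  workflowId: string,
  stepName: string,
): Array<T & { _workflow: WorkflowMeta }> {
  const resultKey = `workflow:${workflowId}:${stepName}`;
  return messages.map((msg, index) => ({
    ...msg,
    _workflow: { id: workflowId, step: stepName, resultKey, index },
  }));
}
```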
Consumer Integration
Queue consumers detect workflow messages and signal results:
// In queue consumer
if (RankDiscoWorkflow.isWorkflowMessage(message.body)) {
const result = await processItem(message.body);
await RankDiscoWorkflow.signalResult(
env,
message.body._workflow,
result
);
}
Result Aggregation in KV
// KV structure
{
"expected": 100,
"received": 45,
"results": [result0, result1, ..., result44, undefined, ...],
"started_at": 1705123456789,
"updated_at": 1705123467890
}
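Updating this record as consumer results arrive can be sketched as a pure reducer. This is a sketch only: the real read-modify-write against KV also has to contend with concurrent writes from parallel consumers, which plain KV does not serialize.

```typescript
interface AggregationState<R> {
  expected: number;
  received: number;
  results: Array<R | undefined>; // slot per batch index
  started_at: number;
  updated_at: number;
}

// Sketch: record one result at its batch index and report whether
// all expected results have arrived. Re-delivered messages for an
// already-filled slot do not inflate the received count.
function recordResult<R>(
  state: AggregationState<R>,
  index: number,
  result: R,
  now: number = Date.now(),
): { state: AggregationState<R>; done: boolean } {
  const results = state.results.slice();
  const isNew = results[index] === undefined;
  results[index] = result;
  const received = state.received + (isNew ? 1 : 0);
  const next = { ...state, results, received, updated_at: now };
  return { state: next, done: received >= state.expected };
}
```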
State Management
Workflow state is persisted to D1 for observability and recovery.
Workflow Instances Table
CREATE TABLE workflow_instances (
id TEXT PRIMARY KEY, -- Workflow instance ID
workflow_type TEXT NOT NULL, -- e.g., "domain-onboard"
status TEXT NOT NULL, -- running, complete, errored
params TEXT, -- JSON input parameters
output TEXT, -- JSON result on completion
error TEXT, -- Error message if failed
started_at TEXT, -- ISO timestamp
completed_at TEXT, -- ISO timestamp
updated_at TEXT NOT NULL
);
State Transitions
Tracking Methods
// Called at workflow start
await this.trackStart(event, step, {
domain,
domain_id,
backlink_limit,
keyword_limit,
});
// Called on successful completion
await this.trackComplete(step, {
success: true,
metrics: { backlinks_fetched: 100, ... },
duration_ms: 8500
});
// Called on error
await this.trackError(step, "DataForSEO API timeout after 3 retries");
Domain-Specific State
For DomainOnboardWorkflow, additional state is tracked on the domain record:
-- Domain onboarding state
UPDATE domains SET
onboard_status = 'in_progress',
onboard_started_at = datetime('now'),
urls_queued_for_classification = 100,
keywords_queued_for_classification = 100,
classification_started_at = 1705123456789
WHERE id = ?;
-- On completion
UPDATE domains SET
onboard_status = 'completed',
onboard_completed_at = datetime('now'),
onboard_duration_ms = 8500
WHERE id = ?;
Error Handling
Retry Configuration
Each step can specify retry behavior:
await step.do(
"fetch-backlinks",
{
retries: {
limit: 5, // Max retry attempts
delay: "15 seconds", // Initial delay
backoff: "exponential" // constant | linear | exponential
},
timeout: "5 minutes" // Max step duration
},
async () => {
return await this.fetchBacklinksFromDataForSEO(domain, limit);
}
);
Backoff Calculation
| Backoff Type | Retry 1 | Retry 2 | Retry 3 | Retry 4 |
|---|---|---|---|---|
| constant | 15s | 15s | 15s | 15s |
| linear | 15s | 30s | 45s | 60s |
| exponential | 15s | 30s | 60s | 120s |
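The table values follow from a simple delay formula per backoff type (a sketch for illustration; the actual retry scheduler is internal to Workflows):

```typescript
type BackoffType = "constant" | "linear" | "exponential";

// Sketch of the delay formula behind the table above.
// Retry numbers are 1-based: retry 1 always waits the initial delay.
function retryDelaySeconds(
  backoff: BackoffType,
  initialSeconds: number,
  retry: number,
): number {
  switch (backoff) {
    case "constant":
      return initialSeconds;                    // 15, 15, 15, 15
    case "linear":
      return initialSeconds * retry;            // 15, 30, 45, 60
    case "exponential":
      return initialSeconds * 2 ** (retry - 1); // 15, 30, 60, 120
  }
}
```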
Error Propagation
Graceful Degradation
The Domain Onboard workflow uses Promise.allSettled for parallel operations, allowing partial success:
const [backlinksResult, keywordsResult, brandResult] =
await Promise.allSettled([
step.do("fetch-backlinks", ...),
step.do("fetch-keywords", ...),
step.do("fetch-brand-properties", ...),
]);
// Process each result independently
if (backlinksResult.status === "fulfilled") {
backlinks = backlinksResult.value;
} else {
errors.push(`Backlink fetch failed: ${backlinksResult.reason}`);
emit.fallback("dfs-backlinks", { error: backlinksResult.reason });
}
Error Tracking
const errors: string[] = [];
// Accumulate errors through workflow
errors.push(`Keyword fetch failed: ${error}`);
// Final status determination
const status = errors.length > 0 ? "completed_with_errors" : "completed";
// Store all errors
await this.trackError(step, errors.join("; "));
Cost Tracking
Cost Recording
Every external API call records its cost:
await trackCost(this.env, {
service: COST_SERVICES.DATAFORSEO_BACKLINKS,
cost_usd: 0.02,
run_id: this.instanceId,
success: true,
items_returned: 100,
source: "workflow"
});
Cost Services
| Service Constant | Typical Cost | Description |
|---|---|---|
| `DATAFORSEO_BACKLINKS` | ~$0.02 | Backlinks API call |
| `DATAFORSEO_KEYWORDS` | ~$0.05 | Ranked keywords API |
| `DATAFORSEO_REFERRING_DOMAINS` | ~$0.02 | Referring domains API |
| `DATAFORSEO_SPAM_SCORE` | ~$0.02/1000 | Bulk spam scores |
| `CF_WORKERS_AI` | ~$0.0001 | Embeddings/LLM |
| `ZENROWS_BASIC` | $0.001 | JS rendering |
| `ZENROWS_PREMIUM` | $0.01 | Premium proxy |
Per-Step Cost Tracking
The URL Classify workflow tracks costs per stage:
// After vectorize stage
if (vectorResult.cost > 0) {
await this.trackApiCost(
step,
"vectorize",
COST_SERVICES.CF_WORKERS_AI,
vectorResult.cost
);
}
// After content fetch
await this.trackApiCost(
step,
"content",
contentResult.fetch_method === "zenrows_premium"
? COST_SERVICES.ZENROWS_PREMIUM
: COST_SERVICES.CF_NATIVE_FETCH,
contentResult.cost
);
API Costs Table
CREATE TABLE api_costs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
service TEXT NOT NULL,
cost REAL NOT NULL,
workflow_id TEXT,
workflow_type TEXT,
step_name TEXT,
success INTEGER,
metadata TEXT, -- JSON additional context
created_at TEXT NOT NULL
);
Cost Aggregation Queries
-- Cost by workflow instance
SELECT workflow_id, SUM(cost) as total_cost
FROM api_costs
WHERE workflow_id = ?
GROUP BY workflow_id;
-- Cost by service last 24 hours
SELECT service, SUM(cost) as total_cost, COUNT(*) as calls
FROM api_costs
WHERE created_at > datetime('now', '-1 day')
GROUP BY service
ORDER BY total_cost DESC;
-- Cost by workflow type
SELECT workflow_type, SUM(cost) as total_cost
FROM api_costs
WHERE workflow_type IS NOT NULL
GROUP BY workflow_type;
Budget Checking
Pre-Operation Budget Validation
Before expensive operations, workflows can check remaining budget:
const budget = await this.checkBudget(
step,
"fetch-backlinks",
"dataforseo",
1.0, // maxCost: $1 max spend this hour
);
if (!budget.ok) {
throw new Error(`Budget exceeded: spent $${budget.spent}, max $1.00`);
}
// Proceed with operation
const backlinks = await this.fetchBacklinksFromDataForSEO(domain, limit);
Budget Check Implementation
protected async checkBudget(
step: WorkflowStep,
stepName: string,
service: string,
maxCost: number,
): Promise<{ ok: boolean; remaining: number; spent: number }> {
return await step.do(
`${stepName}-budget-check`,
{ retries: this.quickRetries },
async () => {
const result = await this.env.DB.prepare(`
SELECT COALESCE(SUM(cost), 0) as total_spent
FROM api_costs
WHERE workflow_id = ?
AND created_at > datetime('now', '-1 hour')
`).bind(this.instanceId).first();
const spent = result?.total_spent || 0;
const remaining = maxCost - spent;
return {
ok: remaining > 0,
remaining: Math.max(0, remaining),
spent,
};
},
);
}
Budget Enforcement Patterns
KV-Based Budgets
For cross-workflow budget limits, the DFS_BUDGETS KV namespace stores:
// Key: budget:dataforseo:daily
// Value:
{
"limit": 10.00,
"spent": 4.50,
"reset_at": "2025-01-21T00:00:00Z"
}
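Enforcing such a budget can be sketched as a pure check-and-update over the stored value. The key layout (`budget:dataforseo:daily`) follows the example above; the reset policy in this helper is an assumption:

```typescript
interface KvBudget {
  limit: number;    // USD cap for the window
  spent: number;    // USD spent so far
  reset_at: string; // ISO timestamp when the window rolls over
}

// Sketch: decide whether a proposed spend fits the budget, resetting
// the window first if reset_at has passed. The caller would read the
// value from DFS_BUDGETS, apply this, and write the result back.
function applySpend(
  budget: KvBudget,
  cost: number,
  now: Date,
  nextResetAt: string, // new reset_at to use if the window rolled over
): { ok: boolean; budget: KvBudget } {
  let b = budget;
  if (now >= new Date(budget.reset_at)) {
    b = { ...budget, spent: 0, reset_at: nextResetAt }; // new window
  }
  if (b.spent + cost > b.limit) return { ok: false, budget: b };
  return { ok: true, budget: { ...b, spent: b.spent + cost } };
}
```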
Appendix: Workflow Result Interface
All workflows return a standardized result:
interface WorkflowResult {
success: boolean;
metrics: Record<string, number>;
errors: string[];
error?: string; // Legacy single error
duration_ms?: number;
}
Domain Onboard Metrics
{
backlinks_fetched: 100,
backlinks_classified: 100,
referring_domains_fetched: 100,
keywords_fetched: 100,
keywords_classified: 100,
low_confidence_count: 5,
llm_verified_count: 5,
brand_properties_found: 3,
youtube_channels_resolved: 1,
social_profiles_resolved: 2,
ranking_domains_scraped: 15,
ranking_urls_spam_scores: 80,
duration_ms: 8500
}
URL Classify Metrics
{
confidence: 85,
stages_run: 3,
cost_usd: 0.0012,
duration_ms: 2500
}