Queue Systems Architecture
RankDisco uses Cloudflare Queues for reliable, distributed background processing. This document covers all queue consumers, their configurations, message schemas, and operational considerations.
Queue Architecture Overview
Queue Bindings Summary
| Queue Name | Binding | Consumer | Status | DLQ |
|---|---|---|---|---|
| domain-onboard | DOMAIN_ONBOARD_QUEUE | domain-onboard-consumer | ENABLED | Yes |
| domain-classify | DOMAIN_CLASSIFY_QUEUE | domain-classify-consumer | ENABLED | Yes |
| url-classify | URL_CLASSIFY_QUEUE | url-classify-consumer | ENABLED | Yes |
| keyword-classify | KEYWORD_CLASSIFY_QUEUE | keyword-classify-consumer | ENABLED | Yes |
| spam-score | SPAM_SCORE_QUEUE | spam-score-consumer | ENABLED | Yes |
| app-info-fetch | APP_INFO_QUEUE | app-details-consumer | DRAINING | No |
| app-keyword-search | APP_KEYWORD_QUEUE | app-keyword-consumer | DISABLED | No |
| shelf-deep-crawl | SHELF_CRAWL_QUEUE | shelf-crawl-consumer | DRAINING | No |
| social-scrape | SOCIAL_SCRAPE_QUEUE | social-scrape-consumer | DISABLED | No |
| rankfabric-tasks-v2 | QUEUE | app-crawl-consumer | DRAINING | Yes |
| rankfabric-dlq | DLQ | (manual processing) | ENABLED | - |
Consumer Reference
1. Domain Onboard Consumer
File: /packages/api/src/queue/domain-onboard-consumer.js
Purpose: Orchestrates customer domain onboarding by fetching backlinks, keywords, and domain summary from DataForSEO.
Queue Configuration
[[queues.consumers]]
queue = "domain-onboard"
max_batch_size = 1
max_batch_timeout = 60
max_retries = 3
max_concurrency = 2
dead_letter_queue = "rankfabric-dlq"
Message Types
fetch_backlinks
{
type: "fetch_backlinks",
domain_id: number,
domain: string,
limit?: number, // Default: 1000
queue_classification?: boolean // Default: true
}
fetch_keywords
{
type: "fetch_keywords",
domain_id: number,
domain: string,
limit?: number, // Default: 1000
fetch_all?: boolean, // Default: false
location_code?: number, // Default: 2840 (US)
language_code?: string, // Default: "en"
store?: boolean // Default: true
}
fetch_summary
{
type: "fetch_summary",
domain_id: number,
domain: string
}
fetch_properties
{
type: "fetch_properties",
domain_id: number,
domain: string
}
Processing Logic
- Validates DRAIN_MODE flag in KV cache
- Routes to appropriate handler based on type
- Calls DataForSEO APIs (backlinks/live, ranked_keywords, backlinks/summary)
- Stores results in D1 using optimized batch operations
- Queues URLs for classification via BACKLINK_CLASSIFY_QUEUE
- Checks if all onboarding jobs are complete, then runs local business detection
DRAIN_MODE Support
Yes - checks LOOKUP_CACHE.get("DRAIN_MODE") and delays messages by 60 seconds if active.
2. Domain Classify Consumer
File: /packages/api/src/queue/domain-classify-consumer.js
Purpose: Classifies domains through a 7-stage pipeline to determine domain_type, tier1_type, channel_bucket, and quality_tier.
Queue Configuration
[[queues.consumers]]
queue = "domain-classify"
max_batch_size = 20
max_batch_timeout = 30
max_retries = 3
max_concurrency = 20
dead_letter_queue = "rankfabric-dlq"
Message Types
classify_domain
{
type: "classify_domain",
domain: string,
domain_id?: number,
domain_rank?: number,
force?: boolean, // Skip cache check
min_confidence?: number, // Default: 70
source?: string, // e.g., "backlink", "api"
retry_count?: number // Auto-incremented on retry
}
classify_domain_batch
{
type: "classify_domain_batch",
domains: string[],
force?: boolean,
min_confidence?: number,
source?: string
}
Classification Pipeline Stages
- Cache Check - Check if domain is already classified
- Rules Engine - Known domains, TLDs, platform patterns
- Google Ads Categories (stage 1.5) - Use cached category data
- Vectorize - Similarity to known classified domains
- Low-Noise Crawl - HEAD + partial GET for metadata
- Instant Pages - Fetch homepage, detect platform via DataForSEO
- LLM Fallback - Only if still uncertain
Retry Logic
- Exponential backoff: 5s, 10s, 20s (based on retry_count)
- After MAX_QUEUE_RETRIES (3), marks domain as failed in DB
- Failed domains have classification_source = 'failed'
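The 5s/10s/20s schedule is a simple doubling backoff. A minimal sketch of the per-message handling (`retrySeconds` and `handleFailure` are hypothetical names; `message.retry({ delaySeconds })` and `message.ack()` are the standard Queues consumer APIs):

```javascript
const MAX_QUEUE_RETRIES = 3;

// Doubling backoff: retry_count 0 -> 5s, 1 -> 10s, 2 -> 20s.
function retrySeconds(retryCount) {
  return 5 * 2 ** retryCount;
}

// Sketch of failure handling inside the consumer: retry with backoff until
// the cap, then ack so the message doesn't loop forever (the domain is
// marked failed in D1 with classification_source = 'failed').
function handleFailure(message, body) {
  const retryCount = body.retry_count ?? 0;
  if (retryCount >= MAX_QUEUE_RETRIES) {
    message.ack();
    return "failed";
  }
  message.retry({ delaySeconds: retrySeconds(retryCount) });
  return "retried";
}
```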
DRAIN_MODE Support
Yes - delays messages by 60 seconds if active.
Workflow Integration
Integrates with DOMAIN_CLASSIFY_WORKFLOW for DAG visualization and activity tracking. Falls back to direct classification if workflow unavailable.
3. URL Classify Consumer
File: /packages/api/src/queue/url-classify-consumer.js
Purpose: Classifies URLs through a 5-stage pipeline. Handles backlinks, source URLs, and target URLs.
Queue Configuration
[[queues.consumers]]
queue = "url-classify"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 3
max_concurrency = 50
dead_letter_queue = "rankfabric-dlq"
Message Types
classify_url
{
type: "classify_url",
url_id?: number,
url: string,
domain: string,
domain_rank?: number,
target_domain?: string,
min_confidence?: number, // Default: 70
serp_title?: string, // From ranked keywords API
serp_description?: string,
skip_content?: boolean, // Skip page fetch if SERP data available
owner_domain_id?: number // For progress tracking
}
classify_backlink
{
type: "classify_backlink",
backlink_id: number,
source_url_id: number,
source_url: string,
source_domain: string,
target_url_id?: number,
target_url: string,
target_domain: string,
domain_rank?: number,
min_confidence?: number
}
classify_target_url
{
type: "classify_target_url",
url_id: number,
url: string,
domain: string
}
classify_domain_batch
{
type: "classify_domain_batch",
domain: string,
domain_rank?: number,
urls: Array<{
url_id: number,
url: string,
anchor_text?: string,
text_pre?: string,
text_post?: string,
page_title?: string,
dfs_platform_type?: string,
backlink_spam_score?: number
}>
}
classifier_eval_url / classifier_eval_domain
// For test harness
{
type: "classifier_eval_url",
run_id: string,
url: string,
domain: string,
expected_page_type: string,
expected_page_type_category?: string,
skip_llm?: boolean,
skip_vectorize?: boolean
}
Processing Logic
- Parallel Processing - All messages in batch processed concurrently via Promise.allSettled
- Domain Diversity Reordering - Messages reordered to spread load across domains
- KV Cache Check - 24-hour TTL cache prevents duplicate work
- Classification Pipeline - cache -> rules -> vectorize -> content fetch -> LLM
- Learning Loop - High-confidence results fed back to Vectorize (fire-and-forget)
- Batched Progress Updates - Single D1 write per owner domain per batch
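The first two steps can be sketched as follows (`reorderByDomain` and `processBatch` are hypothetical names; the round-robin interleave is one plausible reordering strategy):

```javascript
// Round-robin interleave so consecutive messages hit different domains.
function reorderByDomain(messages) {
  const byDomain = new Map();
  for (const m of messages) {
    const d = m.body.domain;
    if (!byDomain.has(d)) byDomain.set(d, []);
    byDomain.get(d).push(m);
  }
  const groups = [...byDomain.values()];
  const out = [];
  for (let i = 0; out.length < messages.length; i++) {
    for (const g of groups) if (g[i]) out.push(g[i]);
  }
  return out;
}

// Promise.allSettled means one bad URL can't fail the whole batch.
async function processBatch(messages, classify) {
  const results = await Promise.allSettled(
    reorderByDomain(messages).map((m) => classify(m.body))
  );
  return results.filter((r) => r.status === "fulfilled").length;
}
```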
Domain Inheritance Rule
tier1_type and domain_type ALWAYS come from the domain, never URL classification. URLs only classify: page_type, tactic_type, channel_bucket, media_type, ownership_type.
DRAIN_MODE Support
Yes - delays messages by 60 seconds if active.
4. Keyword Classify Consumer
File: /packages/api/src/queue/keyword-classify-consumer.js
Purpose: Classifies keywords through a 4-stage pipeline for journey_moment, intent_type, funnel_stage, persona, and other dimensions.
Queue Configuration
[[queues.consumers]]
queue = "keyword-classify"
max_batch_size = 20
max_batch_timeout = 5
max_retries = 3
max_concurrency = 50
dead_letter_queue = "rankfabric-dlq"
Message Types
classify_keyword
{
type: "classify_keyword",
keyword_id: string,
keyword: string,
keyword_info?: {
search_volume?: number,
cpc?: number,
competition?: number
},
search_intent_info?: {
main_intent?: string
},
keyword_properties?: object,
skip_llm?: boolean,
owner_domain_id?: number // For progress tracking
}
classify_keyword_batch
{
type: "classify_keyword_batch",
keywords: Array<{
keyword_id: string,
keyword: string,
keyword_info?: object,
search_intent_info?: object,
keyword_properties?: object
}>
}
reclassify_unclassified
{
type: "reclassify_unclassified",
limit?: number, // Default: 100
min_volume?: number, // Default: 0
skip_llm?: boolean,
skip_vectorize?: boolean
}
reclassify_journey
{
type: "reclassify_journey",
keyword_id: string,
keyword: string,
search_intent_info?: object
}
Classification Pipeline Stages
- Rules Engine (free) - Pattern matching, DataForSEO signals
- Vectorize (cheap) - Similarity to labeled examples
- Brand Detection (free) - Check against known brands
- LLM (expensive) - Semantic dimension classification
Processing Logic
- Pre-loads brand names - Single DB query at batch start
- Parallel Processing - All messages processed concurrently
- Per-dimension tracking - Confidence tracked per classification dimension
- Progress counters - Updates keywords_classified on owner domain
DRAIN_MODE Support
Yes - delays messages by 60 seconds if active.
5. Spam Score Consumer
File: /packages/api/src/lib/dataforseo/dataforseo-spam-score.js (processSpamScoreBatch function)
Purpose: Fetches spam scores for URLs in bulk from DataForSEO.
Queue Configuration
[[queues.consumers]]
queue = "spam-score"
max_batch_size = 1
max_batch_timeout = 30
max_retries = 3
max_concurrency = 10
dead_letter_queue = "rankfabric-dlq"
Message Schema
{
type: "spam_score_batch",
url_ids: number[] // Up to 500 URL IDs per message
}
Processing Logic
- Fetches URL records from D1 using IDs (chunked to avoid 100-variable limit)
- Calls DataForSEO bulk_spam_score/live endpoint (up to 1000 targets)
- Updates backlinks_spam_score and spam_tier on urls table
- Cost: ~$0.02 per 1000 targets
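The chunked fetch can be sketched like this, assuming the standard D1 `prepare`/`bind`/`batch` API (`fetchUrlRows` is a hypothetical name; the 100-variable ceiling is the limit the consumer works around):

```javascript
// Split an array into fixed-size chunks.
function chunk(arr, size) {
  const out = [];
  for (let i = 0; i < arr.length; i += size) out.push(arr.slice(i, i + size));
  return out;
}

// Fetch url rows for up to 500 IDs without exceeding D1's ~100 bound-variable
// limit per statement, using a single db.batch() round trip.
async function fetchUrlRows(db, urlIds) {
  const stmts = chunk(urlIds, 100).map((ids) =>
    db
      .prepare(`SELECT id, url FROM urls WHERE id IN (${ids.map(() => "?").join(",")})`)
      .bind(...ids)
  );
  const results = await db.batch(stmts);
  return results.flatMap((r) => r.results);
}
```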
6. App Details Consumer
File: /packages/api/src/queue/app-details-consumer.js
Purpose: Fetches and enriches app metadata from iTunes API, HTML scraping, and DataForSEO.
Queue Configuration
# DRAINING - disabled for maintenance
# [[queues.consumers]]
# queue = "app-info-fetch"
# max_batch_size = 1
# max_batch_timeout = 5
# max_retries = 2
# max_concurrency = 3
Message Schema
{
type: "fetch_app_details",
app_id: string,
platform: "apple" | "google" | "google_play",
project_id?: string,
run_id?: string,
crawl_depth?: number, // 0 = don't process similar apps
force?: boolean
}
Apple Enrichment Strategy (Single Pass)
- HTML Scrape (FREE) - Direct fetch to apps.apple.com with desktop Safari UA
- iTunes API - Via ZenRows proxy (Apple blocks CF Workers)
- DataForSEO Fallback - Only if both above fail
Data Sources Priority
- primary_category: HTML scrape (authoritative)
- rating, rating_count, release_date: iTunes API (authoritative)
- similar_apps, more_apps_by_developer: HTML scrape
Processing Logic
- Validates project if project_id provided
- Checks data freshness (7-day threshold)
- Fetches via HTML scrape -> iTunes -> DataForSEO fallback
- Creates brand record via ensureBrand()
- Uploads icons to Cloudflare Images
- Backfills app_category_rankings with enrichment data
- Queues similar apps with crawl_depth=0 (no recursion)
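The fallback chain reduces to first-non-null-wins across increasingly expensive sources. A sketch (`enrichApp` is a hypothetical name; the fetcher functions stand in for the real HTML-scrape, iTunes, and DataForSEO helpers):

```javascript
// Try each source in order: cheapest first, first non-null result wins.
// A thrown error (e.g. Apple blocking the request) falls through to the
// next, more expensive source.
async function enrichApp(appId, sources) {
  for (const fetchSource of sources) {
    try {
      const details = await fetchSource(appId);
      if (details) return details;
    } catch {
      // fall through to the next source
    }
  }
  return null;
}
```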
7. App Keyword Consumer
File: /packages/api/src/queue/app-keyword-consumer.js
Purpose: Processes app store keyword searches for ASO tracking.
Queue Configuration
# TEMPORARILY DISABLED
# [[queues.consumers]]
# queue = "app-keyword-search"
# max_batch_size = 5
# max_batch_timeout = 30
# max_retries = 2
# max_concurrency = 10
Message Schema
{
task_id?: string,
keyword_id: string,
keyword_text: string,
platform: "apple" | "google_play",
source?: "scraper" | "dataforseo",
location_code?: number,
language_code?: string,
depth?: number,
project_id?: string
}
Processing Logic
- Apple: Uses scraper first, DataForSEO fallback
- Google Play: DataForSEO only (scraper not implemented)
- Polls DataForSEO for results (5s intervals, 20 max attempts)
- Batch ensures apps exist before storing rankings
- Queues missing apps for detail fetching
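The polling loop can be sketched as follows (`pollForResult` and `getTaskResult` are hypothetical names standing in for the real task-result call, which should return null while the task is pending):

```javascript
// Poll for a DataForSEO task result: 5s between attempts, 20 attempts max
// (~100s ceiling). `sleep` is injectable so tests don't actually wait.
async function pollForResult(getTaskResult, { intervalMs = 5000, maxAttempts = 20, sleep } = {}) {
  const wait = sleep ?? ((ms) => new Promise((r) => setTimeout(r, ms)));
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const result = await getTaskResult();
    if (result) return result;
    if (attempt < maxAttempts) await wait(intervalMs);
  }
  throw new Error(`No result after ${maxAttempts} attempts`);
}
```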
8. Shelf Crawl Consumer
File: /packages/api/src/queue/shelf-crawl-consumer.js
Purpose: Processes discovered shelf/room/story URLs for complete app extraction.
Queue Configuration
# DRAINING - disabled for maintenance
# [[queues.consumers]]
# queue = "shelf-deep-crawl"
# max_batch_size = 5
# max_batch_timeout = 30
# max_retries = 2
# max_concurrency = 10
Message Schema
{
url: string,
category_id: string,
parent_category: string,
shelf_name: string,
shelf_type: "room" | "story" | "chart" | "hero" | "inline",
shelf_id: string,
shelf_position?: number,
shelf_count?: number,
absolute_rank_start?: number,
absolute_rank_end?: number,
priority?: string,
run_id?: string
}
Processing Logic
- Routes to appropriate parser (story, room, or shelf)
- Saves rankings to app_category_rankings with absolute rank
- Queues each app for detail fetching via APP_INFO_QUEUE
- Updates last_crawl_ts on category
- 3-second jittered delay between shelf crawls for rate limiting
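The jittered delay might look like this (the ±50% jitter window is an assumption; the document only specifies the 3-second base):

```javascript
// Base delay plus random jitter, so bursts from concurrent consumers
// don't align against the app store. With the defaults, the delay lands
// uniformly in [1500, 4500] ms.
function jitteredDelayMs(baseMs = 3000, jitterRatio = 0.5) {
  const jitter = (Math.random() * 2 - 1) * jitterRatio * baseMs;
  return Math.round(baseMs + jitter);
}
```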
9. Social Scrape Consumer
File: /packages/api/src/queue/social-scrape-consumer.js
Purpose: Scrapes social accounts from developer domain homepages.
Queue Configuration
# DISABLED
# [[queues.consumers]]
# queue = "social-scrape"
# max_batch_size = 10
# max_batch_timeout = 60
# max_retries = 1
# max_concurrency = 5
Message Schema
{
domain: string,
domainId: number,
developerId?: string,
platform?: string
}
Error Handling
- Permanent errors (404, 403, 401, 400): Logged and acknowledged
- Transient errors (timeout, 5xx, rate limits): Retried with exponential backoff (10s, 20s, 40s)
- Max 2 retries before giving up
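A sketch of the permanent/transient split (function names are hypothetical; the status sets and the 10s/20s/40s schedule come from the lists above):

```javascript
// Client errors that will never succeed on retry: ack and move on.
const PERMANENT_STATUSES = new Set([400, 401, 403, 404]);

function classifyError(status) {
  if (status != null && PERMANENT_STATUSES.has(status)) return "permanent";
  return "transient"; // timeouts, 5xx, rate limits
}

// Doubling backoff for transient errors: retry_count 0 -> 10s, 1 -> 20s, 2 -> 40s.
function transientRetryDelay(retryCount) {
  return 10 * 2 ** retryCount;
}
```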
10. App Crawl Consumer
File: /packages/api/src/queue/app-crawl-consumer.js
Purpose: Handles category crawls and app detail fetches for the main task queue.
Queue Configuration
# DRAINING - disabled for maintenance
# [[queues.consumers]]
# queue = "rankfabric-tasks-v2"
# max_batch_size = 1
# max_batch_timeout = 30
# max_retries = 3
# max_concurrency = 5
# dead_letter_queue = "rankfabric-dlq"
Message Types
crawl_category
{
type: "crawl_category",
category_id: string,
device?: string,
platform?: string,
skip_app_details?: boolean,
chart_type?: string,
crawl_run_id?: string,
project_id?: string,
related_app_depth?: number,
location_code?: number,
language_code?: string,
skip_category_discovery?: boolean
}
fetch_app / app_rankings
{
type: "fetch_app" | "app_rankings",
app_id: string,
device?: string,
platform?: string
}
Processing Logic
- Google Play: Uses DataForSEO app_listings/search (LIVE, instant)
- Apple groupings: Uses custom scraper (handleAppleGrouping)
- Apple rooms/charts/stories: Routes to appropriate handler
- Uses D1 batch operations for efficiency (chunks of 50)
- Queues app detail fetches for stale apps (>7 days)
11. SERP Queue Consumer
File: /packages/api/src/queue/serp-queue-consumer.js
Purpose: Processes user-initiated SERP tracking checks.
Message Schema
{
type: "serp_check",
project_id: string,
keyword_text: string,
location_code?: number,
language_code?: string,
device?: string,
target_domain?: string
}
12. SERP Consumer (Full)
File: /packages/api/src/queue/serp-consumer.js
Purpose: Full SERP tracking with organic + local pack, storing detailed position data.
Message Schema
{
type: string,
keyword_id: string,
keyword: string,
location_code: number,
language_code?: string,
device?: string,
project_id?: string,
channel?: "organic" | "bing_local_pack",
depth?: number,
asset_id?: string // For multi-location tracking
}
Processing Logic
- Validates project if provided
- Resolves location code from asset if asset_id present
- Calls DataForSEO SERP API (Google organic or Bing local pack)
- Uses batch operations for storing positions (~10 D1 batches total)
- Stores organic positions, local pack positions, related searches, refinement chips
- Tracks position changes from previous day
Dead Letter Queue
Configuration
[[queues.producers]]
binding = "DLQ"
queue = "rankfabric-dlq"
Queues with DLQ
- domain-onboard
- domain-classify
- url-classify
- keyword-classify
- spam-score
- rankfabric-tasks-v2
Processing Failed Messages
Failed messages land in the DLQ after exceeding max_retries. To process them:
- Inspect DLQ contents via Cloudflare Dashboard or API
- Identify failure patterns - check classification_failures table
- Fix root cause - API limits, schema issues, etc.
- Replay messages - Move from DLQ back to source queue
-- Check recent classification failures
SELECT failure_type, error_message, COUNT(*) as count
FROM classification_failures
WHERE created_at > strftime('%s', 'now', '-1 day') * 1000
GROUP BY failure_type, error_message
ORDER BY count DESC
LIMIT 20;
Idempotency
URL Classification
- KV Cache: 24-hour TTL cache keyed by url-class:{url}
- Prevents duplicate classification work across worker instances
- Checks confidence meets threshold before using cached result
Domain Classification
- D1 Check: Looks up existing classification_confidence before processing
- Skips if confidence >= 60% and domain_type exists
- force flag bypasses cache checks
Keyword Classification
- Keyword ID: Uses deterministic hash of keyword text
- ON CONFLICT clauses prevent duplicate inserts
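A deterministic id might be derived like this (the exact hash and truncation used in production are assumptions; `node:crypto` is available on Workers via the nodejs_compat flag):

```javascript
import { createHash } from "node:crypto";

// Deterministic keyword id: the same keyword text always maps to the same
// id, so re-queued keywords dedupe naturally via ON CONFLICT upserts.
// Normalization (trim + lowercase) and the 16-char truncation are assumptions.
function keywordId(keyword) {
  return createHash("sha256")
    .update(keyword.trim().toLowerCase())
    .digest("hex")
    .slice(0, 16);
}
```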
App Details
- Freshness Check: 7-day threshold for re-fetching
- Checks last_checked_ts, description, rating, similar_apps
- force flag bypasses freshness check
Workflow Integration
Supported Workflows
- DomainOnboardWorkflow - Orchestrates domain onboarding
- DomainClassifyWorkflow - DAG visualization for domain classification
- UrlClassifyWorkflow - URL classification pipeline tracking
- KeywordClassifyWorkflow - Keyword classification pipeline tracking
Workflow Signaling
Messages from workflows include a _workflow object:
{
_workflow: {
id: string, // Workflow instance ID
step: string, // Current step name
resultKey: string, // KV key for results
index: number // Position in batch
}
}
Consumers signal results via signalWorkflowResult():
await signalWorkflowResult(env, body, {
type: "url",
url_id: 123,
page_type: "blog_post",
confidence: 85,
cost: 0.001
});
Results stored in WORKFLOW_RESULTS KV with 1-hour TTL.
Monitoring
Pipeline Activity Tracking
Consumers emit events to PIPELINE_ACTIVITY KV and PipelineMonitor Durable Object:
const activity = createActivityTracker(env, `url-classify-${Date.now()}`);
activity.start(DAG_NODE_IDS.CLASSIFY_BACKLINKS, { batch_size: 10 });
// ... processing ...
activity.complete(DAG_NODE_IDS.CLASSIFY_BACKLINKS, { succeeded: 9, failed: 1 });
WebSocket Events
Real-time pipeline updates via createPipelineEmitter():
const emit = createPipelineEmitter(env, "default");
emit.started(URL_CLASSIFY_NODE_IDS.TRIGGER_QUEUE, { batch_size: 10 });
emit.finished(URL_CLASSIFY_NODE_IDS.FINALIZE, { processed: 10 });
Key Metrics to Monitor
| Metric | Source | Alert Threshold |
|---|---|---|
| Messages in DLQ | Cloudflare Dashboard | > 100 |
| Classification failures | classification_failures table | > 50/hour |
| LLM usage rate | Consumer logs | > 20% of classifications |
| Average batch processing time | Consumer logs | > 30s |
| Queue depth | Cloudflare Dashboard | > 10,000 |
Useful Queries
-- Classification progress by domain
SELECT
d.domain,
d.urls_queued_for_classification,
d.urls_classified,
d.keywords_queued_for_classification,
d.keywords_classified,
CASE
WHEN d.urls_queued_for_classification > 0
THEN ROUND(d.urls_classified * 100.0 / d.urls_queued_for_classification, 1)
ELSE 0
END as url_progress_pct
FROM domains d
WHERE d.onboard_status = 'pending'
ORDER BY d.created_at DESC
LIMIT 20;
-- Recent DLQ failures by type
SELECT
failure_type,
SUBSTR(error_message, 1, 100) as error_preview,
COUNT(*) as count,
MAX(created_at) as last_seen
FROM classification_failures
WHERE created_at > strftime('%s', 'now', '-24 hours') * 1000
GROUP BY failure_type, SUBSTR(error_message, 1, 100)
ORDER BY count DESC;
DRAIN_MODE
Purpose
Gracefully pause queue processing during maintenance without losing messages.
Activation
// Enable DRAIN_MODE
await env.LOOKUP_CACHE.put("DRAIN_MODE", "true");
// Disable DRAIN_MODE
await env.LOOKUP_CACHE.delete("DRAIN_MODE");
Behavior
When DRAIN_MODE is active:
- Consumer checks KV at batch start
- All messages are retried with 60-second delay
- Messages remain in queue, not lost
- Processing resumes automatically when DRAIN_MODE disabled
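The per-consumer gate described above can be sketched as follows (`maybeDrain` is a hypothetical name; the KV `get` and `message.retry({ delaySeconds })` calls are standard Workers APIs):

```javascript
// Check DRAIN_MODE once per batch; if active, requeue every message with a
// 60-second delay instead of processing. Nothing is dropped.
async function maybeDrain(env, batch) {
  const draining = (await env.LOOKUP_CACHE.get("DRAIN_MODE")) === "true";
  if (!draining) return false;
  for (const message of batch.messages) {
    message.retry({ delaySeconds: 60 });
  }
  return true;
}
```

A consumer calls this at the top of its `queue()` handler and returns early when it reports true.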
Supported Consumers
- domain-onboard-consumer
- domain-classify-consumer
- url-classify-consumer
- keyword-classify-consumer
Global DRAIN_MODE (index.js)
// In queue() handler
const DRAIN_MODE = false;
if (DRAIN_MODE) {
for (const message of batch.messages) {
message.ack(); // Purge without processing
}
return;
}
Rate Limiting
DataForSEO Limits
- Concurrent requests: 30 max
- Requests per minute: 2000
Per-Consumer Batch Sizes
| Consumer | max_batch_size | max_concurrency | Effective Parallelism |
|---|---|---|---|
| domain-onboard | 1 | 2 | 2 |
| domain-classify | 20 | 20 | 400 |
| url-classify | 10 | 50 | 500 |
| keyword-classify | 20 | 50 | 1000 |
| spam-score | 1 | 10 | 10 |
| app-info-fetch | 1 | 3 | 3 |
Internal Rate Limiting
- App details: Sequential processing due to CF connection limits (~6 concurrent)
- Shelf crawl: 3-second jittered delay between shelves
- Apple scraping: Desktop Safari UA rotation, randomized headers
Cost Tracking
All external API calls are tracked via trackCost():
await trackCost(env, {
service: COST_SERVICES.DATAFORSEO_BACKLINKS,
cost_usd: actualCost,
request_count: 1,
endpoint: "/bulk_spam_score/live",
source: "spam_score_consumer"
});
Cost Services
- DATAFORSEO_BACKLINKS - Backlinks, spam scores, domain ranks
- DATAFORSEO_SERP - SERP tracking
- DATAFORSEO_APP_LIST - App store data
- ZENROWS_BASIC - Basic proxy (~$0.0003/req)
- ZENROWS_PREMIUM - Premium proxy (~$0.003/req)
- WORKERS_AI - LLM inference