Celery and Worker Runtime

IntegraMon runs both classic Celery workers and a dedicated controller process in backend/src/cpi/worker.py.

Process model

Supervisor starts these runtime groups once database migrations have completed:

  • Gunicorn web workers
  • multiple Celery workers grouped by queue type
  • one Python controller process called worker
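
As a rough sketch of that layout, the Supervisor config might contain entries like the following. Program names, module paths, and command flags here are assumptions for illustration, not copied from the shipped configuration:

```ini
; Illustrative only: program names and commands are assumed, not the real config.
[program:web]
command=gunicorn config.wsgi:application --workers 4

[program:celery-light]
command=celery -A config worker -Q messagelog_read,payload_read,package_read --concurrency=2 --prefetch-multiplier=1

[program:worker]
command=python backend/src/cpi/worker.py
```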

The controller loop is important because it:

  • dispatches tenant polling jobs
  • reconciles orphaned worker runs after restarts
  • watches thread health
  • pushes queue-size and scheduling work into Celery
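
As a rough model of those responsibilities (not the actual cpi/worker.py code; the `state` shape, helper names, and action tuples are invented for this sketch), one controller iteration can be pictured as:

```python
# Hypothetical model of one controller loop iteration; the real cpi/worker.py
# implementation differs. The `state` shape is invented for this sketch.

def controller_tick(state):
    """Collect the actions one loop iteration would take."""
    actions = []
    # Dispatch tenant polling jobs whose next poll time has passed.
    for tenant, next_poll_at in state["tenants"].items():
        if next_poll_at <= state["now"]:
            actions.append(("dispatch", tenant))
    # Reconcile worker runs orphaned by a crash or restart.
    for run_id in state["orphaned_runs"]:
        actions.append(("reconcile", run_id))
    # Watch thread health and flag dead helper threads.
    for name, alive in state["threads"].items():
        if not alive:
            actions.append(("restart_thread", name))
    return actions
```

The real loop additionally pushes queue-size and scheduling work into Celery rather than returning an action list.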

Worker groups and default concurrency

Each group, its default concurrency, and the queues it serves:

  • celery-light (concurrency 2): messagelog_read, payload_read, package_read
  • celery-light-cold (concurrency 2): messagelog_read_cold
  • celery-api-details (concurrency 3): messagelog_get_details_hot, payload_get_details
  • celery-api-details-cold (concurrency 2): messagelog_get_details_cold
  • celery-alert-details (concurrency 1): messagelog_get_details_alert
  • celery-alerts (concurrency 1): trigger_alerts
  • celery-periodic (concurrency 1): periodic_run, stats_cache
  • celery-ai (concurrency 1): ai
  • celery-messagelog-process-batch (concurrency 2): messagelog_process_batch
  • celery-messagelog-process-batch-cold (concurrency 2): messagelog_process_batch_cold
  • celery-medium (concurrency 2): iflow_download, archive_data, messagelog_correlations, messagelog_customheaders

All groups run with --prefetch-multiplier=1, a deliberate fairness choice for mixed-duration workloads: each worker process reserves only one task at a time, so a long-running task cannot trap a backlog of short tasks in its local queue.
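
A toy simulation (not project code) makes the fairness effect concrete: a worker that reserves several tasks at once can trap short tasks behind a long one, even while a sibling worker sits idle.

```python
import heapq

# Toy model of prefetch behaviour with mixed-duration tasks.

def completion_times(durations, workers, prefetch):
    """Earliest-free worker reserves up to `prefetch` tasks from the head of
    the queue and runs them in order; reserved tasks cannot be stolen."""
    free = [0.0] * workers            # min-heap of worker free times
    heapq.heapify(free)
    finishes = []
    i = 0
    while i < len(durations):
        t = heapq.heappop(free)       # earliest-free worker pulls next
        for d in durations[i:i + prefetch]:
            t += d
            finishes.append(t)
        i += prefetch
        heapq.heappush(free, t)
    return sorted(finishes)

# One 60s task ahead of three 1s tasks, two workers:
print(completion_times([60, 1, 1, 1], workers=2, prefetch=2))  # [1, 2, 60, 61]
print(completion_times([60, 1, 1, 1], workers=2, prefetch=1))  # [1, 2, 3, 60]
```

With prefetch=2 the last short task waits 61 seconds; with prefetch=1 it finishes in 3.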

Important task families

Tenant polling and CPI ingestion

  • read_message_logs
  • read_message_logs_cold
  • read_payloads
  • read_packages
  • deep_sync_iflow_artifacts

Alerting and archive

  • check_and_create_alerts
  • archive_cpi_day
  • stats_update_t
  • periodic_jobs

Platform operations

  • dispatch_due_agent_reports_task
  • run_agent_report_task
  • cleanup_config_task
  • cleanup_pending_configs_task
  • storage_snapshot_quick_task
  • storage_snapshot_deep_task
  • storage_dispatch_task
  • host_metrics_collect_task
  • host_metrics_dispatch_task
  • django_metrics_collect_task
  • django_metrics_dispatch_task

AI

  • process_chat_task on queue ai
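
The families above are routed to the queues listed in the worker groups. As a hedged sketch, the route table below is inferred from the queue names in this document; only process_chat_task on queue ai is stated explicitly, the rest are assumptions:

```python
# Assumed task-to-queue routing, inferred from the queue lists above;
# this is not read from the project's actual Celery routing config.
TASK_ROUTES = {
    "read_message_logs": "messagelog_read",
    "read_message_logs_cold": "messagelog_read_cold",
    "read_payloads": "payload_read",
    "read_packages": "package_read",
    "check_and_create_alerts": "trigger_alerts",
    "process_chat_task": "ai",
}

def queue_for(task_name, default="celery"):
    """Resolve the queue a task is routed to, falling back to a default."""
    return TASK_ROUTES.get(task_name, default)
```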

Periodic scheduling model

There is no single Celery Beat file in the current codebase. Scheduling is split across:

  • the controller loop in cpi/worker.py
  • periodic config stored in cConfigExt
  • platform metric dispatch tasks
  • report scheduling through cJobConfig.next_run_at

Operational consequence:

  • queue delay and worker health matter more than cron syntax alone
  • runtime state lives partly in the database, not only in process memory
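
For the database-backed part, a minimal sketch of next_run_at scheduling follows; the class and field names are simplified stand-ins for cJobConfig, not its real shape. Because the next fire time lives in a persisted row, a restarted controller resumes without any in-memory beat schedule:

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Simplified stand-in for cJobConfig; real model fields may differ.
@dataclass
class JobConfig:
    name: str
    interval: timedelta
    next_run_at: datetime

def due_jobs(configs, now):
    """Return jobs whose next_run_at has passed and advance their schedule."""
    due = []
    for cfg in configs:
        if cfg.next_run_at <= now:
            due.append(cfg.name)
            # Advance past `now` so a delayed tick does not double-fire.
            while cfg.next_run_at <= now:
                cfg.next_run_at += cfg.interval
    return due
```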

Retry behavior and failure handling

Current behavior is mixed by task family:

  • archive_cpi_day explicitly runs with max_retries=0
  • some tasks reschedule themselves through the controller or periodic logic
  • several tasks record failure state into cWorkerJobRun
  • task results are stored in the Redis result backend when one is configured

There is no generic dead-letter queue layer in the current Supervisor configuration.
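
A minimal sketch of the no-retry-plus-record pattern, assuming a dict stand-in for cWorkerJobRun (run_once and JOB_RUNS are invented names, not project code):

```python
# Illustrative only: JOB_RUNS stands in for cWorkerJobRun rows.
JOB_RUNS = {}

def run_once(job_id, fn, *args):
    """Run a task with no retries; on failure persist the error state and
    leave recovery to the controller or periodic logic."""
    try:
        result = fn(*args)
        JOB_RUNS[job_id] = {"status": "success"}
        return result
    except Exception as exc:
        JOB_RUNS[job_id] = {"status": "failed", "error": str(exc)}
        return None
```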

Orphan and stale job recovery

cpi/worker.py includes startup reconciliation for:

  • queued runs that stayed queued too long without starting
  • running rows left behind by crashed or restarted processes

Current stale queue timeout:

  • 45 minutes for queued rows

Recovered rows are marked skipped with reconciliation metadata.

This is a useful self-healing feature, especially after container restarts or broker disruption.
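
A simplified sketch of that reconciliation pass, using dict rows as a stand-in for cWorkerJobRun (field names are illustrative):

```python
from datetime import datetime, timedelta

# Current stale-queue timeout from the section above.
STALE_QUEUED = timedelta(minutes=45)

def reconcile(runs, now, restarted=True):
    """Mark stale queued rows and orphaned running rows as skipped,
    attaching reconciliation metadata instead of deleting history."""
    recovered = []
    for run in runs:
        stale_queued = (run["status"] == "queued"
                        and now - run["queued_at"] > STALE_QUEUED)
        orphaned_running = run["status"] == "running" and restarted
        if stale_queued or orphaned_running:
            run["status"] = "skipped"
            run["meta"] = {
                "reconciled_at": now,
                "reason": "stale_queued" if stale_queued else "orphaned",
            }
            recovered.append(run["id"])
    return recovered
```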

Monitoring surfaces

Useful runtime views are:

  • /superadmin/jobs
  • /superadmin/metrics
  • /superadmin/redis-metrics
  • /superadmin/worker-tuning

Relevant persistence models include:

  • cWorkerJobRun
  • cJobRun
  • cWorkerTuningSettings
  • cMetricSettings

Worker tuning profiles

The built-in profile presets are:

Safe

  • light queues reduced
  • cold queues reduced
  • periodic and alert queues stay at 1
  • good for small environments or constrained CPUs

Balanced

  • the current default profile
  • moderate parallelism across hot and cold paths
  • good first production baseline

Fast

  • more hot readers and batch processors
  • useful only when Redis, DB, and CPU have enough headroom
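
One plausible way to represent these presets is as per-group concurrency overrides on top of the shipped defaults. The numbers below are illustrative guesses consistent with the profile descriptions, not the actual stored values:

```python
# Illustrative profile overrides; real cWorkerTuningSettings values differ.
PROFILES = {
    "safe":     {"celery-light": 1, "celery-light-cold": 1},
    "balanced": {},  # balanced keeps the shipped defaults
    "fast":     {"celery-light": 4, "celery-messagelog-process-batch": 4},
}

def concurrency_for(profile, group, defaults):
    """Resolve a group's concurrency: profile override, else shipped default."""
    return PROFILES.get(profile, {}).get(group, defaults[group])
```

Note that periodic and alert queues stay at 1 in every preset, since they carry no override in any profile.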

Sizing guidance by installation size

Small

  • keep balanced or safe
  • avoid aggressive cold backfill
  • SQLite can still work if overall write pressure is low

Medium

  • PostgreSQL recommended
  • external Redis recommended
  • balanced is usually the best starting point

Large

  • external PostgreSQL and Redis expected
  • move to fast or a custom profile only after reading Redis, host, and Django metrics together
  • watch queue depth, DB write pressure, and archive runtime carefully

Operational anti-patterns

Avoid these shortcuts:

  • raising worker concurrency before validating Redis and DB headroom
  • keeping internal Redis for important production queues without accepting restart loss
  • assuming queue growth is only a Celery problem when DB or CPI APIs are actually the bottleneck