# Celery and Worker Runtime
IntegraMon runs both classic Celery workers and a dedicated controller process in `backend/src/cpi/worker.py`.
## Process model
Supervisor starts these runtime groups after migrations are done:
- Gunicorn web workers
- multiple Celery workers grouped by queue type
- one Python controller process called `worker`
The controller loop is important because it:
- dispatches tenant polling jobs
- reconciles orphaned worker runs after restarts
- watches thread health
- pushes queue-size and scheduling work into Celery
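As a mental model, the loop in `cpi/worker.py` behaves roughly like the sketch below; every helper name and the poll interval are illustrative assumptions, not the real code:

```python
import time


def reconcile_orphaned_runs() -> None:
    """Startup pass: repair queued/running rows left behind by restarts."""


def dispatch_due_tenant_polls() -> None:
    """Enqueue tenant polling jobs (read_message_logs and friends) that are due."""


def check_thread_health() -> None:
    """Flag or restart internal threads that have stopped making progress."""


def push_scheduling_work() -> None:
    """Sample queue sizes and push periodic/scheduled work into Celery."""


def controller_loop() -> None:
    reconcile_orphaned_runs()          # run once at startup
    while True:
        dispatch_due_tenant_polls()
        check_thread_health()
        push_scheduling_work()
        time.sleep(5)                  # poll interval is an assumption
```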
## Worker groups and default concurrency
| Group | Default concurrency | Queues |
|---|---|---|
| celery-light | 2 | messagelog_read, payload_read, package_read |
| celery-light-cold | 2 | messagelog_read_cold |
| celery-api-details | 3 | messagelog_get_details_hot, payload_get_details |
| celery-api-details-cold | 2 | messagelog_get_details_cold |
| celery-alert-details | 1 | messagelog_get_details_alert |
| celery-alerts | 1 | trigger_alerts |
| celery-periodic | 1 | periodic_run, stats_cache |
| celery-ai | 1 | ai |
| celery-messagelog-process-batch | 2 | messagelog_process_batch |
| celery-messagelog-process-batch-cold | 2 | messagelog_process_batch_cold |
| celery-medium | 2 | iflow_download, archive_data, messagelog_correlations, messagelog_customheaders |
All groups run with `--prefetch-multiplier=1`, a deliberate fairness choice for mixed-duration tasks: each worker process reserves only as many unacknowledged tasks as it has concurrency slots, so a long-running task cannot hoard a backlog that would starve short ones.
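The same setting can be expressed in Celery app configuration; this snippet is generic Celery, not IntegraMon's actual app module, and the broker URL is a placeholder:

```python
from celery import Celery

app = Celery("cpi", broker="redis://localhost:6379/0")  # placeholder broker URL

# Equivalent to passing --prefetch-multiplier=1 on the worker command line:
# each worker prefetches at most (concurrency x 1) unacknowledged tasks,
# so long-running tasks cannot reserve a backlog of short ones.
app.conf.worker_prefetch_multiplier = 1
```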
## Important task families
### Tenant polling and CPI ingestion
`read_message_logs`, `read_message_logs_cold`, `read_payloads`, `read_packages`, `deep_sync_iflow_artifacts`
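To illustrate how these tasks map onto the queue table above, a dispatch could look like the following; the import path and task arguments are assumptions:

```python
from cpi.tasks import read_message_logs  # assumed import path

# Route a hot message-log read for one tenant onto its dedicated queue
# (queue name taken from the worker-group table; kwargs are illustrative).
read_message_logs.apply_async(kwargs={"tenant_id": 42}, queue="messagelog_read")
```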
### Alerting and archive
`check_and_create_alerts`, `archive_cpi_day`, `stats_update_t`, `periodic_jobs`
### Platform operations
`dispatch_due_agent_reports_task`, `run_agent_report_task`, `cleanup_config_task`, `cleanup_pending_configs_task`, `storage_snapshot_quick_task`, `storage_snapshot_deep_task`, `storage_dispatch_task`, `host_metrics_collect_task`, `host_metrics_dispatch_task`, `django_metrics_collect_task`, `django_metrics_dispatch_task`
### AI
`process_chat_task` on queue `ai`
## Periodic scheduling model
There is no single Celery Beat file in the current codebase. Scheduling is split across:
- the controller loop in `cpi/worker.py`
- periodic config stored in `cConfigExt`
- platform metric dispatch tasks
- report scheduling through `cJobConfig.next_run_at`
Operational consequences:
- queue delay and worker health matter more than cron syntax alone
- runtime state lives partly in the database, not only in process memory (see the sketch below)
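A minimal sketch of the database-driven part, assuming `cJobConfig.next_run_at` holds the next due timestamp; the import paths, the `interval` field, and the `compute_next_run` helper are assumptions:

```python
from django.utils import timezone

from cpi.models import cJobConfig            # assumed import path
from cpi.tasks import run_agent_report_task  # assumed import path


def compute_next_run(cfg):
    """Hypothetical helper: derive the next due time from the config's schedule."""
    return timezone.now() + cfg.interval     # `interval` field is an assumption


def dispatch_due_agent_reports():
    """Enqueue each report whose next_run_at has passed, then reschedule it."""
    now = timezone.now()
    for cfg in cJobConfig.objects.filter(next_run_at__lte=now):
        run_agent_report_task.delay(cfg.pk)
        cfg.next_run_at = compute_next_run(cfg)
        cfg.save(update_fields=["next_run_at"])
```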
## Retry behavior and failure handling
Current behavior is mixed by task family:
- `archive_cpi_day` explicitly runs with `max_retries=0`
- some tasks reschedule themselves through the controller or periodic logic
- several tasks record failure state into `cWorkerJobRun`
- task results are stored in the Redis result backend when configured
There is no generic dead-letter queue layer in the current Supervisor configuration.
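In Celery terms, the no-retry behavior of `archive_cpi_day` looks roughly like this; the decorator options are standard Celery API, while the task body and signature are illustrative:

```python
from celery import shared_task


@shared_task(bind=True, max_retries=0)
def archive_cpi_day(self, tenant_id, day):
    # Runs exactly once per dispatch: on failure Celery does not retry,
    # and the failure state is recorded elsewhere (e.g. cWorkerJobRun)
    # instead of being re-attempted automatically.
    ...
```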
## Orphan and stale job recovery
`cpi/worker.py` includes startup reconciliation for:
- queued runs that stayed queued too long without starting
- running rows left behind by crashed or restarted processes
The current stale-queue timeout is 45 minutes for queued rows. Recovered rows are marked `skipped` with reconciliation metadata.
This is a useful self-healing feature, especially after container restarts or broker disruption.
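A condensed sketch of that reconciliation, assuming `cWorkerJobRun` rows carry a status and a created timestamp; the field names, note text, and import path are assumptions, while the 45-minute value matches the documented timeout:

```python
from datetime import timedelta

from django.utils import timezone

from cpi.models import cWorkerJobRun  # assumed import path

STALE_QUEUED_AFTER = timedelta(minutes=45)  # documented stale-queue timeout


def reconcile_stale_queued_runs():
    """Mark rows that stayed queued too long as skipped, with a reason."""
    cutoff = timezone.now() - STALE_QUEUED_AFTER
    (cWorkerJobRun.objects
        .filter(status="queued", created_at__lt=cutoff)   # field names assumed
        .update(status="skipped", note="reconciled: stale queued row"))
```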
## Monitoring surfaces
Useful runtime views:
- `/superadmin/jobs`
- `/superadmin/metrics`
- `/superadmin/redis-metrics`
- `/superadmin/worker-tuning`
Relevant persistence models include `cWorkerJobRun`, `cJobRun`, `cWorkerTuningSettings`, and `cMetricSettings`.
## Worker tuning profiles
The built-in profile presets are:
### Safe
- light queues reduced
- cold queues reduced
- periodic and alert queues stay at `1`
- good for small environments or constrained CPUs
### Balanced
- the current default profile
- moderate parallelism across hot and cold paths
- good first production baseline
### Fast
- more hot readers and batch processors
- useful only when Redis, DB, and CPU have enough headroom
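One way to picture the presets is as per-group concurrency overrides on top of the defaults table; the shape below is illustrative only, and the `safe` numbers are invented placeholders rather than the real preset values:

```python
# Illustrative only: profile name -> worker-group concurrency overrides.
PROFILES = {
    "safe": {
        "celery-light": 1,       # light queues reduced (placeholder value)
        "celery-light-cold": 1,  # cold queues reduced (placeholder value)
        "celery-periodic": 1,    # stays at 1
        "celery-alerts": 1,      # stays at 1
    },
    "balanced": {},              # current defaults from the table above
}
```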
## Sizing guidance by installation size
### Small
- keep `balanced` or `safe`
- avoid aggressive cold backfill
- SQLite can still work if overall write pressure is low
### Medium
- PostgreSQL recommended
- external Redis recommended
- `balanced` is usually the best starting point
### Large
- external PostgreSQL and Redis expected
- move to `fast` or a custom profile only after reading Redis, host, and Django metrics together
- watch queue depth, DB write pressure, and archive runtime carefully
## Operational anti-patterns
Avoid these shortcuts:
- raising worker concurrency before validating Redis and DB headroom
- keeping the internal Redis for important production queues without accepting that queued work can be lost on restart
- assuming queue growth is only a Celery problem when the DB or CPI APIs are actually the bottleneck