Celery
Requires Celery 5.2+, Python 3.10+. See the compatibility matrix for the full pin string.
z4j is a Celery dashboard, monitoring, and control plane. It captures task lifecycle events, persists them in Postgres with a tamper-evident audit chain, and lets operators retry, cancel, or schedule tasks from a unified UI. See the marketing landing page for a Celery feature overview, and the Flower alternative page if you're migrating from Flower.
Install
```shell
pip install z4j-celery
```

pip transitively pulls in z4j-core and z4j-bare. The adapter is auto-discovered, so no explicit registration is needed if you use z4j-django, z4j-flask, or z4j-fastapi. The `worker_init` signal handler is wired:

- by `z4j-django`, which eagerly imports `z4j_celery` at module-load time when it is in `INSTALLED_APPS`; or
- by a direct `import z4j_celery` in your `celery.py` (Flask/FastAPI/bare layouts); or
- automatically by Celery itself via the entry point.
Worker bootstrap
When a Celery worker process starts, the `worker_init` signal fires and `z4j_celery.worker_bootstrap._on_worker_init()`:

- Confirms the process is `celery worker` (not `celery inspect`/`control`/`purge`/etc.; those don't warrant an agent slot).
- Resolves the Celery app from `sender.app` or `current_app`.
- Calls `install_agent(engines=[CeleryEngineAdapter(celery_app=...)])`, which reads `Z4J_BRAIN_URL`, `Z4J_TOKEN`, `Z4J_HMAC_SECRET`, `Z4J_PROJECT_ID`, and `Z4J_AGENT_NAME` from the environment.
- Logs the following line on success: `INFO:z4j.celery.worker_bootstrap:z4j worker bootstrap: agent runtime started (celery_app=..., framework=...)`
If you don't see that log line when the worker boots, the agent is not running and tasks will not reach z4j. Check the `Z4J_*` env vars, and confirm that the worker is actually invoked as `celery worker` (the bootstrap signal only fires under that command).
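To make that checklist concrete, here is a small pre-flight sketch you could run in the worker's environment. It is not z4j's code: `celery_subcommand()` and `preflight()` are hypothetical helpers, the subcommand detection is a simple argv heuristic (this page does not say how z4j itself detects `celery worker`, and an app module literally named `worker` would confuse it), and the sketch treats all five `Z4J_*` variables as required, which may be stricter than the real bootstrap.

```python
from typing import Mapping, Optional, Sequence

# Environment variables the bootstrap reads, as listed on this page.
Z4J_VARS = (
    "Z4J_BRAIN_URL",
    "Z4J_TOKEN",
    "Z4J_HMAC_SECRET",
    "Z4J_PROJECT_ID",
    "Z4J_AGENT_NAME",
)

# Celery CLI subcommands; of these, only `worker` should get an agent.
CELERY_SUBCOMMANDS = {
    "worker", "beat", "events", "inspect", "control", "purge", "status",
    "result", "call", "shell", "multi", "migrate", "graph", "list",
    "upgrade", "report", "amqp", "logtool",
}


def celery_subcommand(argv: Sequence[str]) -> Optional[str]:
    """Return the first token in argv that names a Celery subcommand (heuristic)."""
    for token in argv[1:]:
        if token in CELERY_SUBCOMMANDS:
            return token
    return None


def preflight(argv: Sequence[str], environ: Mapping[str, str]) -> list[str]:
    """Return reasons the agent would not start; an empty list means all checks pass."""
    problems = []
    if celery_subcommand(argv) != "worker":
        problems.append("not invoked as `celery worker`")
    for name in Z4J_VARS:
        if not environ.get(name):  # unset or empty
            problems.append(f"{name} is not set")
    return problems
```

For example, calling `preflight(sys.argv, os.environ)` near the top of your `celery.py` and logging the result would list anything likely to keep the agent from starting.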
What it captures
| Signal | z4j event |
|---|---|
before_task_publish | task_sent |
task_prerun | task_started |
task_postrun (state=SUCCESS) | task_succeeded |
task_postrun (state=FAILURE) | task_failed |
task_retry | task_retry |
task_revoked | task_revoked |
Each event also carries the payload: args, kwargs (redacted), queue, routing key, exchange, retry count, ETA, expires, and the parent task (for chord / group support).
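The mapping in the table can be expressed as a small lookup. This is a hypothetical sketch, not the z4j handler: `event_type()` and `redact_kwargs()` are illustrative names, and the name-based redaction rule (masking keys that contain `password`, `secret`, `token`, or `api_key`) is an assumption, since this page does not say how kwargs are redacted.

```python
from typing import Any, Mapping, Optional

# One-to-one mappings from the table above.
_SIGNAL_TO_EVENT = {
    "before_task_publish": "task_sent",
    "task_prerun": "task_started",
    "task_retry": "task_retry",
    "task_revoked": "task_revoked",
}
# task_postrun is split by the final task state.
_POSTRUN_STATE_TO_EVENT = {"SUCCESS": "task_succeeded", "FAILURE": "task_failed"}

# Assumed redaction rule: mask kwargs whose names look like credentials.
SENSITIVE_MARKERS = ("password", "secret", "token", "api_key")
REDACTED = "***"


def event_type(signal_name: str, state: Optional[str] = None) -> Optional[str]:
    """Map a Celery signal (plus task state for task_postrun) to a z4j event name."""
    if signal_name == "task_postrun":
        return _POSTRUN_STATE_TO_EVENT.get(state or "")
    return _SIGNAL_TO_EVENT.get(signal_name)


def redact_kwargs(kwargs: Mapping[str, Any]) -> dict[str, Any]:
    """Return a copy of kwargs with credential-like values masked."""
    return {
        key: REDACTED if any(m in key.lower() for m in SENSITIVE_MARKERS) else value
        for key, value in kwargs.items()
    }
```

In this sketch, `task_postrun` states other than SUCCESS and FAILURE map to nothing; retries are already reported through the separate `task_retry` signal.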
Actions
| Verb | How z4j performs it |
|---|---|
| retry | `app.send_task(name, args, kwargs, ...)` with the original payload; marks the old task `retried_as=<new_id>` |
| cancel | `app.control.revoke(task_id, terminate=False)`: worker-level cancellation. Workers discard the task when it comes up for execution, but a task that is already running is not interrupted |
| cancel + terminate | Requires the operator+ role; sends SIGTERM to the worker process executing the task |
| purge_queue | `app.control.purge()` on the queue |
| bulk_retry | Loops the native retry, capped at 10,000 tasks |
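The retry and bulk_retry rows can be sketched as below. Everything here is hypothetical: `CapturedTask`, `retry()`, and `bulk_retry()` are illustrative names, and `send` stands in for a call such as `app.send_task(name, args=args, kwargs=kwargs).id` so the logic runs without a broker. The sketch simply stops once the cap is reached; whether z4j truncates or rejects a larger selection is not stated on this page.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Optional

BULK_RETRY_CAP = 10_000  # cap stated in the table above

# Stand-in for app.send_task: takes (name, args, kwargs), returns the new task id.
SendFn = Callable[[str, list, dict], str]


@dataclass
class CapturedTask:
    """Hypothetical record of a task as captured by the agent."""
    task_id: str
    name: str
    args: list
    kwargs: dict
    retried_as: Optional[str] = None


def retry(task: CapturedTask, send: SendFn) -> str:
    """Re-enqueue with the original payload and link the old task to the new id."""
    new_id = send(task.name, task.args, task.kwargs)
    task.retried_as = new_id
    return new_id


def bulk_retry(tasks: Iterable[CapturedTask], send: SendFn, cap: int = BULK_RETRY_CAP) -> list[str]:
    """Apply retry() to each task in order, stopping once `cap` tasks have been sent."""
    new_ids: list[str] = []
    for task in tasks:
        if len(new_ids) >= cap:
            break
        new_ids.append(retry(task, send))
    return new_ids
```

Keeping bulk retry as a plain loop over the single-task path means both verbs share the same `retried_as` bookkeeping.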
Chord / group support
Celery's chord and group primitives carry a `group_id`. z4j preserves it, letting the dashboard:
- Show sub-tasks grouped under the parent.
- Offer “retry whole chord” as a single button.
- Attribute failures to the root group in alerts.
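A sketch of the grouping and alert attribution, assuming a simplified event record: `TaskEvent`, `group_tasks()`, and `failed_groups()` are hypothetical, not z4j APIs. It attributes each failure to the task's own `group_id`; tracing nested canvases up to the root group would also require following parent links, which this sketch omits.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TaskEvent:
    """Hypothetical, trimmed-down task record."""
    task_id: str
    group_id: Optional[str]  # None for tasks that are not part of a group/chord
    state: str


def group_tasks(events: list[TaskEvent]) -> dict[Optional[str], list[str]]:
    """Bucket task ids under their group_id so sub-tasks render under one parent."""
    groups: dict[Optional[str], list[str]] = defaultdict(list)
    for ev in events:
        groups[ev.group_id].append(ev.task_id)
    return dict(groups)


def failed_groups(events: list[TaskEvent]) -> dict[str, int]:
    """Count failures per group so one alert can name the group, not each sub-task."""
    counts: dict[str, int] = {}
    for ev in events:
        if ev.state == "FAILURE" and ev.group_id is not None:
            counts[ev.group_id] = counts.get(ev.group_id, 0) + 1
    return counts
```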
Caveats
- Canvas signatures: `.si()` / `.s()` signatures are not re-serialized exactly; the agent captures the resolved payload from `before_task_publish`. Re-enqueueing re-runs the task with the same captured args, not the same signature object. In practice this rarely matters.
- eta/countdown on retry: z4j's retry does not replay the original ETA. Retries run immediately.
- Broker state visibility: queue-length reporting works with the Redis broker. On RabbitMQ it also works, but depends on the `rabbitmq_management` plugin.
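The first two caveats follow from rebuilding a retry from the captured payload rather than from the original signature. A minimal sketch, assuming the payload is stored as a flat dict holding the fields listed under "What it captures" (the key names and `retry_call()` are hypothetical): it reuses the captured args and routing fields and deliberately drops `eta` and `countdown`. How z4j treats `expires` is not documented here, so the sketch leaves it out.

```python
from typing import Any, Mapping

# Routing fields carried over verbatim (an assumption about z4j's behaviour).
ROUTING_FIELDS = ("queue", "routing_key", "exchange")


def retry_call(captured: Mapping[str, Any]) -> tuple[str, tuple, dict, dict]:
    """Build (name, args, kwargs, options) for app.send_task from a captured payload.

    Timing fields (eta, countdown) are intentionally not copied, so the retry
    runs immediately; the captured args are reused, not the original signature.
    """
    options = {
        field: captured[field]
        for field in ROUTING_FIELDS
        if captured.get(field) is not None
    }
    return (
        captured["name"],
        tuple(captured.get("args", ())),
        dict(captured.get("kwargs", {})),
        options,
    )
```

The result would then be passed on as `app.send_task(name, args=args, kwargs=kwargs, **options)`; `queue`, `routing_key`, and `exchange` are standard Celery routing options.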
Config
The adapter takes no explicit config; it reads the Celery app's config via the framework adapter.
For Django, the app is auto-detected by trying five candidates (see quickstart §Auto-detect). If your layout is unusual, set it explicitly:
```python
# settings.py: a top-level Django setting, NOT inside the Z4J dict
CELERY_APP = "myproject.celery:app"
```

For Flask / FastAPI / bare-Python layouts, pass the app directly to the engine:

```python
from z4j_bare.install import install_agent
from z4j_celery.engine import CeleryEngineAdapter

# Your Celery() instance; adjust the import path to your project.
from myproject.celery import app as my_celery_app

install_agent(
    engines=[CeleryEngineAdapter(celery_app=my_celery_app)],
    # brain_url / token / hmac_secret / project_id / agent_name
    # are read from Z4J_* env vars if not passed explicitly
)
```

Performance notes
- Event capture adds ~50µs per task (measured on Apple M3), which is negligible for most workloads.
- The `task_postrun` signal is synchronous in Celery; the z4j handler does its work on a worker thread to avoid blocking.
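The thread hand-off described above is a standard producer/consumer pattern. A minimal sketch using only the standard library, assuming events are plain Python objects and `sink` is whatever ships them to the brain (`BackgroundEmitter` and `sink` are hypothetical; this is not z4j's implementation): the signal handler only enqueues, and a daemon thread does the slow work.

```python
import logging
import queue
import threading
from typing import Any, Callable

log = logging.getLogger(__name__)

# Sentinel object that tells the background thread to exit.
_STOP = object()


class BackgroundEmitter:
    """Hand events to a daemon thread so a synchronous signal handler returns quickly."""

    def __init__(self, sink: Callable[[Any], None]) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._sink = sink
        self._thread = threading.Thread(target=self._run, name="event-emitter", daemon=True)
        self._thread.start()

    def emit(self, event: Any) -> None:
        # Called from the signal handler: a put on an unbounded queue never blocks.
        self._queue.put_nowait(event)

    def _run(self) -> None:
        while True:
            event = self._queue.get()
            if event is _STOP:
                return
            try:
                self._sink(event)  # slow work (serialization, network I/O) happens here
            except Exception:
                # Keep the thread alive; one bad event must not stop later ones.
                log.exception("failed to deliver event %r", event)

    def close(self, timeout: float = 5.0) -> None:
        """Flush queued events, then stop the thread (waits up to `timeout` seconds)."""
        self._queue.put(_STOP)
        self._thread.join(timeout)
```

A `task_postrun.connect` handler would then call only `emitter.emit(...)`, so Celery's synchronous signal dispatch pays for a queue put rather than a network round trip. The unbounded queue keeps the sketch simple; a bounded queue with a drop policy would cap memory if the sink falls behind.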