Dramatiq
Package: z4j-dramatiq - Dramatiq 1.15+.
Install
Section titled “Install”pip install z4j-dramatiqWhat it captures
Section titled “What it captures”z4j registers a Middleware on the broker:
from dramatiq import get_brokerfrom z4j_dramatiq import Z4JMiddleware
broker = get_broker()broker.add_middleware(Z4JMiddleware())The framework adapters do this automatically. For bare usage, add it yourself.
| Middleware hook | z4j event |
|---|---|
after_enqueue | task_sent |
before_process_message | task_started |
after_process_message (no exception) | task_succeeded |
after_process_message (exception) | task_failed |
retry (via Retries middleware) | task_retry |
Actions
Section titled “Actions”| Verb | How |
|---|---|
retry | actor.send(*args, **kwargs) with original payload |
cancel | Not supported natively. z4j marks the task cancelled and the agent tries best-effort abort via Results middleware cancel flag |
purge_queue | broker.flush(queue_name) |
Dramatiq’s design (reliability first) deliberately does not expose an ad-hoc cancel API. If you need cancel, consider a different engine or build an application-level cancel token.
Middleware ordering
Section titled “Middleware ordering”Insert Z4JMiddleware after Retries and before Results:
broker.add_middleware(Retries(max_retries=3))broker.add_middleware(Z4JMiddleware())broker.add_middleware(Results(backend=redis_backend))This ensures retry events are captured after the retry decision is made.
Caveats
Section titled “Caveats”- No chord/group primitive in Dramatiq → no chord-aware UI.
- Delayed messages (
send_with_options(delay=...)) show astask_sentwith anetafield. - Message priorities (if using RabbitMQ) are captured.
Config
Section titled “Config”# Django settingsZ4J = { ..., "dramatiq": {"broker": "myapp.broker.redis_broker"},}See scheduler: APScheduler - the typical scheduler for Dramatiq apps.