Dramatiq

Package: z4j-dramatiq. Supports Dramatiq 1.15+.

pip install z4j-dramatiq

z4j registers a Middleware on the broker:

from dramatiq import get_broker
from z4j_dramatiq import Z4JMiddleware
broker = get_broker()
broker.add_middleware(Z4JMiddleware())

The framework adapters do this automatically. For bare usage, add it yourself.

| Middleware hook | z4j event |
| --- | --- |
| after_enqueue | task_sent |
| before_process_message | task_started |
| after_process_message (no exception) | task_succeeded |
| after_process_message (exception) | task_failed |
| retry (via Retries middleware) | task_retry |
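
The hook-to-event mapping can be illustrated with a small sketch. This is not the actual Z4JMiddleware implementation; `EventRecordingMiddleware` and its `events` list are hypothetical names, and the class is duck-typed rather than subclassing `dramatiq.Middleware` so that it runs without Dramatiq installed. The hook signatures mirror those Dramatiq passes to middleware. The `task_retry` row is left out because the source does not describe how retries are detected.

```python
from typing import Any, List, Optional, Tuple


class EventRecordingMiddleware:
    """Hypothetical stand-in showing which hook yields which z4j event name."""

    def __init__(self) -> None:
        # Each entry is (event_name, message_id).
        self.events: List[Tuple[str, Any]] = []

    def _emit(self, event: str, message: Any) -> None:
        # A real adapter would forward the event to z4j; here we only record it.
        self.events.append((event, getattr(message, "message_id", None)))

    def after_enqueue(self, broker: Any, message: Any, delay: Optional[int]) -> None:
        self._emit("task_sent", message)

    def before_process_message(self, broker: Any, message: Any) -> None:
        self._emit("task_started", message)

    def after_process_message(
        self, broker: Any, message: Any, *, result: Any = None, exception: Any = None
    ) -> None:
        # The same hook covers success and failure; the exception argument decides.
        event = "task_failed" if exception is not None else "task_succeeded"
        self._emit(event, message)
```

In a real Dramatiq application the class would subclass `dramatiq.Middleware` and be registered with `broker.add_middleware(...)`.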

| Verb | How |
| --- | --- |
| retry | actor.send(*args, **kwargs) with the original payload |
| cancel | Not supported natively. z4j marks the task cancelled, and the agent attempts a best-effort abort via the Results middleware cancel flag |
| purge_queue | broker.flush(queue_name) |
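
As a rough sketch of the retry and purge_queue verbs, the functions below re-send a stored payload and flush a queue. The `record` dictionary layout (`actor_name`, `args`, `kwargs`) and both function names are assumptions made for illustration; how z4j actually stores the original payload is not described here. The broker and actor are passed in as plain objects so the sketch runs without Dramatiq.

```python
from typing import Any, Callable, Dict


def retry_task(get_actor: Callable[[str], Any], record: Dict[str, Any]) -> Any:
    """Re-send a task using the payload captured when it was first enqueued."""
    actor = get_actor(record["actor_name"])
    # Equivalent to actor.send(*args, **kwargs) with the original payload.
    return actor.send(*record.get("args", ()), **record.get("kwargs", {}))


def purge_queue(broker: Any, queue_name: str) -> None:
    """Drop all pending messages on one queue."""
    broker.flush(queue_name)
```

With a real Dramatiq broker, `broker.get_actor` can be passed as `get_actor`, for example `retry_task(broker.get_actor, record)`.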

Dramatiq’s design (reliability first) deliberately does not expose an ad-hoc cancel API. If you need cancellation, consider a different engine or build an application-level cancel token.
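
One possible shape for an application-level cancel token is sketched below. All names (`CancelRegistry`, `TaskCancelled`, `process_items`) are hypothetical, and the registry is in-memory for brevity; across several worker processes the flag would need to live in a shared store such as Redis or a database. The task checks the token between units of work, so cancellation is cooperative rather than pre-emptive.

```python
import threading
from typing import Any, Callable, Iterable, List, Set


class TaskCancelled(Exception):
    """Raised inside a task when its cancel token has been set."""


class CancelRegistry:
    """In-memory set of cancelled task ids (illustrative only)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._cancelled: Set[str] = set()

    def cancel(self, task_id: str) -> None:
        with self._lock:
            self._cancelled.add(task_id)

    def is_cancelled(self, task_id: str) -> bool:
        with self._lock:
            return task_id in self._cancelled


def process_items(
    task_id: str,
    items: Iterable[Any],
    handle: Callable[[Any], Any],
    registry: CancelRegistry,
) -> List[Any]:
    """Process items one by one, checking the cancel token before each item."""
    results = []
    for item in items:
        if registry.is_cancelled(task_id):
            raise TaskCancelled(task_id)
        results.append(handle(item))
    return results
```

Work already finished before the flag is set is not undone; the task only stops at the next check.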

Insert Z4JMiddleware after Retries and before Results:

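from dramatiq.middleware import Retries
from dramatiq.results import Results
from z4j_dramatiq import Z4JMiddleware
broker.add_middleware(Retries(max_retries=3))
broker.add_middleware(Z4JMiddleware())
broker.add_middleware(Results(backend=redis_backend))  # redis_backend: your configured results backend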

This ensures retry events are captured after the retry decision is made.

  • No chord/group primitive in Dramatiq → no chord-aware UI.
  • Delayed messages (send_with_options(delay=...)) show as task_sent with an eta field.
  • Message priorities (if using RabbitMQ) are captured.
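
To make the delayed-message note concrete, here is a minimal sketch of deriving an ETA from a delay. Dramatiq expresses `delay` in milliseconds; the function name `eta_from_delay` and the choice of a timezone-aware `datetime` for the result are assumptions, since the exact format of z4j's eta field is not specified here.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def eta_from_delay(sent_at: datetime, delay_ms: Optional[int]) -> Optional[datetime]:
    """Return the earliest time the message may run, or None if it is not delayed."""
    if not delay_ms:
        return None
    return sent_at + timedelta(milliseconds=delay_ms)


# Example: a message sent at noon UTC with a 90-second delay.
sent = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
eta = eta_from_delay(sent, 90_000)
```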

# Django settings
Z4J = {
    ...,
    "dramatiq": {"broker": "myapp.broker.redis_broker"},
}
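
The "broker" value above looks like a dotted import path. Assuming that is how it is interpreted (the source does not say so explicitly), resolving such a path could look roughly like the sketch below. `resolve_dotted_path` is a hypothetical helper; Django ships a similar utility, `django.utils.module_loading.import_string`.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Import 'package.module.attr' and return the attribute it names."""
    module_path, _, attr = path.rpartition(".")
    if not module_path:
        raise ValueError(f"{path!r} is not a dotted path")
    module = importlib.import_module(module_path)
    return getattr(module, attr)
```

Under this assumption, `resolve_dotted_path("myapp.broker.redis_broker")` would return the `redis_broker` object defined in `myapp/broker.py`.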

See scheduler: APScheduler - the typical scheduler for Dramatiq apps.