Skip to content

Bare Python

Requires Python 3.11+. See the compatibility matrix for engine and scheduler pin strings.

  • Standalone workers with no web framework (cron scripts, pure RQ workers, Dramatiq workers).
  • Custom frameworks z4j does not provide an adapter for.
  • Library or SDK code that wants to ship its own agent.
from z4j_bare import (
install_agent, # function: install + start the runtime
AgentRuntime, # class: returned by install_agent
RuntimeState, # enum
BareFrameworkAdapter, # framework adapter for no-framework apps
BufferStore, # local SQLite buffer (advanced inspection)
safe_boundary, safe_call, # safety helpers for custom adapters
)

install_agent(...) returns a started AgentRuntime driving the WebSocket on a background thread. Hold the reference and call runtime.stop() on shutdown.

from z4j_bare import install_agent
from z4j_celery import CeleryEngineAdapter
runtime = install_agent(
engines=[CeleryEngineAdapter(celery_app=celery_app)],
# brain_url / token / hmac_secret / project_id default to the
# matching Z4J_* env vars; pass them explicitly only if you do
# not want the env-var lookup.
)
try:
your_worker_loop() # sync or async; the runtime is independent
finally:
runtime.stop()

What install_agent does:

  1. Opens the WebSocket, sends hello with engine and scheduler capabilities, waits for hello_ack.
  2. Installs patches on each supplied engine adapter to capture lifecycle events.
  3. Starts the heartbeat loop.
  4. On runtime.stop(), drains in-flight events, sends close, and joins the background thread.

install_agent(engines=[...]) accepts any number of adapters. The runtime fans events out for each:

from z4j_celery import CeleryEngineAdapter
from z4j_rq import RqEngineAdapter
runtime = install_agent(
engines=[
CeleryEngineAdapter(celery_app=primary_celery_app),
RqEngineAdapter(redis_url="redis://localhost:6379/0"),
],
)

Each adapter appears separately in the dashboard with its own queue namespace.

Every keyword argument has a matching Z4J_* env var; explicit kwargs win, then env vars, then defaults. Required (one of the two sources): brain_url, token, hmac_secret, project_id, plus at least one entry in engines. Common optional kwargs: agent_name, agent_id, schedulers, environment, tags, transport, dev_mode, strict_mode, autostart, buffer_path, max_payload_bytes, redaction_extra_key_patterns, redaction_extra_value_patterns. See help(install_agent) for the full list.

python -m z4j_bare doctor runs the same probes the agent runtime does (buffer dir writable, brain DNS / TCP / TLS, WebSocket upgrade) using Z4J_* env vars or CLI flags. This is the canonical doctor implementation; the framework adapters (django / flask / fastapi) wrap the same logic.

Terminal window
# Use env vars (typical):
Z4J_BRAIN_URL=https://tasks.example.com \
Z4J_TOKEN=... \
Z4J_HMAC_SECRET=... \
Z4J_PROJECT_ID=ml-pipeline \
python -m z4j_bare doctor
# Or flags (useful for ad-hoc checks):
python -m z4j_bare doctor \
--brain-url https://tasks.example.com \
--token ... \
--hmac-secret ... \
--project-id ml-pipeline
# Skip the WS upgrade probe:
python -m z4j_bare doctor --no-websocket
# JSON for scripting:
python -m z4j_bare doctor --json

Exit 0 on all-green, 1 on any failure. The same probes are exposed programmatically via z4j_bare.diagnostics so custom adapters can build their own doctor commands.

See service-user deployments for the buffer-path failure mode the doctor catches.

3.11+. The runtime uses match statements, StrEnum, and current asyncio idioms.

See websocket protocol for the actual frame schema. The runtime is the canonical reference implementation of the protocol.