This guide takes you from pip install to a job reaching completed.
Before the code, here is the operational model Awa is built around:
- inserting a job writes durable job state to Postgres, so enqueuing can live inside the same transaction as your application write
- workers claim runnable jobs and heartbeat while they execute; if a worker dies, its jobs are rescued
- retries, callback waits, and progress checkpoints are persisted in Postgres and exposed as one hydrated job snapshot instead of being held only in memory
- when you debug or operate the system, inspect the job first; the CLI and UI are designed around that read-only inspection path
That means “what happened?” is usually a database inspection question, not a worker-log archaeology exercise.
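The claim, heartbeat, and rescue cycle described above can be sketched as a toy in-memory model. This is only an illustration of the idea, not Awa's implementation: the `JobRow` fields, the `available` and `running` state names, and the `stale_after` threshold are all assumptions made for the example.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobRow:
    """Toy stand-in for a durable job row; not Awa's real schema."""
    id: int
    state: str = "available"          # assumed state name
    attempt: int = 0
    worker: Optional[str] = None
    heartbeat_at: Optional[float] = None


def claim(job: JobRow, worker: str, now: float) -> bool:
    """A worker claims a runnable job and records its first heartbeat."""
    if job.state != "available":
        return False
    job.state = "running"             # assumed state name
    job.attempt += 1
    job.worker = worker
    job.heartbeat_at = now
    return True


def heartbeat(job: JobRow, worker: str, now: float) -> None:
    """The owning worker refreshes the heartbeat while it executes."""
    if job.state == "running" and job.worker == worker:
        job.heartbeat_at = now


def rescue_stale(job: JobRow, now: float, stale_after: float) -> bool:
    """Make a running job runnable again if its heartbeat is too old."""
    if job.state == "running" and now - job.heartbeat_at > stale_after:
        job.state = "available"
        job.worker = None
        return True
    return False


job = JobRow(id=1)
claim(job, "worker-a", now=0.0)
heartbeat(job, "worker-a", now=10.0)
# worker-a dies here: no more heartbeats arrive.
rescued = rescue_stale(job, now=100.0, stale_after=30.0)
print(job.state, job.attempt, rescued)  # prints: available 1 True
```

Because every transition is a change to the row rather than to worker memory, the row alone is enough to see that attempt 1 was abandoned and the job is runnable again.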
Prerequisites:
- PostgreSQL running locally or remotely
- Python 3.10+
- A database URL exported as DATABASE_URL
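A malformed URL is an easy mistake to make at this step, so a small sanity check can help. The sketch below uses only the standard library; `check_database_url` is a hypothetical helper, and accepting both the `postgres` and `postgresql` schemes is an assumption based on what libpq accepts, not something confirmed for Awa.

```python
from urllib.parse import urlsplit


def check_database_url(url: str) -> dict:
    """Split a Postgres URL into parts, raising ValueError if it looks wrong."""
    parts = urlsplit(url)
    if parts.scheme not in ("postgres", "postgresql"):
        raise ValueError(f"unexpected scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("missing host")
    database = parts.path.lstrip("/")
    if not database:
        raise ValueError("missing database name")
    return {
        "host": parts.hostname,
        "port": parts.port or 5432,   # 5432 is the PostgreSQL default port
        "database": database,
        "user": parts.username,
    }


info = check_database_url("postgres://postgres:test@localhost:15432/awa_test")
print(info["host"], info["port"], info["database"])  # prints: localhost 15432 awa_test
```

In a real script you would pass `os.environ["DATABASE_URL"]` instead of a literal.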
Example local URL:
    export DATABASE_URL=postgres://postgres:test@localhost:15432/awa_test

    python -m venv .venv
    source .venv/bin/activate
    pip install awa-pg

    python -m awa --database-url "$DATABASE_URL" migrate

Create quickstart.py:
    import asyncio
    import os
    from dataclasses import dataclass

    import awa

    DATABASE_URL = os.environ["DATABASE_URL"]


    @dataclass
    class SendEmail:
        to: str
        subject: str


    async def main() -> None:
        client = awa.AsyncClient(DATABASE_URL)

        @client.task(SendEmail, queue="email")
        async def handle_email(job):
            print(f"sending email to {job.args.to}: {job.args.subject}")

        await client.start([("email", 2)])

        job = await client.insert(
            SendEmail(to="alice@example.com", subject="Welcome"),
            queue="email",
        )

        await asyncio.sleep(1)
        result = await client.get_job(job.id)
        print(f"job {result.id} state = {result.state}")

        await client.shutdown()


    asyncio.run(main())

Run it:

    python quickstart.py

Expected output is similar to:

    sending email to alice@example.com: Welcome
    job 1 state = completed
    python -m awa --database-url "$DATABASE_URL" job list --queue email
    python -m awa --database-url "$DATABASE_URL" job dump 1
    python -m awa --database-url "$DATABASE_URL" job dump-run 1
    python -m awa --database-url "$DATABASE_URL" queue stats

job dump gives you the whole job snapshot as JSON. job dump-run focuses on one attempt: the current attempt uses live row data, while historical attempts are reconstructed from the stored errors[] history.
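Since job dump emits JSON, it can also be consumed from a script. The following is a minimal sketch under stated assumptions: `dump_command` and `dump_job` are hypothetical helpers, the snapshot is assumed to be a single JSON object written to stdout with nothing else around it, and no particular field names are assumed.

```python
import json
import subprocess
import sys


def dump_command(database_url: str, job_id: int, run: bool = False) -> list[str]:
    """Build the argv for `job dump` (or `job dump-run` when run=True)."""
    subcommand = "dump-run" if run else "dump"
    return [
        sys.executable, "-m", "awa",
        "--database-url", database_url,
        "job", subcommand, str(job_id),
    ]


def dump_job(database_url: str, job_id: int) -> dict:
    """Run `job dump` and parse its output (assumed: one JSON object on stdout)."""
    completed = subprocess.run(
        dump_command(database_url, job_id),
        check=True,            # raise CalledProcessError on a non-zero exit
        capture_output=True,
        text=True,
    )
    return json.loads(completed.stdout)


# Show the command that would be run, without touching a database.
print(" ".join(dump_command("postgres://localhost/awa_test", 1)[1:]))
# prints: -m awa --database-url postgres://localhost/awa_test job dump 1
```

With a migrated database and at least one job, `dump_job(os.environ["DATABASE_URL"], 1)` would return the snapshot as a dictionary you can inspect.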
The dashboard ships in a separate wheel so the default awa-pg install stays small for workers and producers. Install the [ui] extra to bring in the awa-cli binary that hosts it:
    pip install 'awa-pg[ui]'
    python -m awa --database-url "$DATABASE_URL" serve
    # → http://127.0.0.1:3000

python -m awa serve delegates to the awa serve binary (you can also call awa serve directly once the extra is installed). The UI is read-only when the database reports transaction_read_only = on (e.g. on a replica) or when --read-only is passed.
- await client.migrate() runs migrations from Python instead of the CLI.
- awa.Client provides a synchronous API for worker/admin/direct-producer code: all methods are plain, non-awaited calls (e.g., client.insert(...), client.migrate()).
- client.start() accepts tuple queue configs for hard-reserved mode and dict configs for weighted mode. See Configuration reference.
- awa.QueueFanout helps one hot logical queue use several physical queues while keeping routing deterministic. See Logical queue fanout.
Most applications should keep using their normal database stack for business tables. Use AsyncClient/Client for workers, admin calls, migrations, and queue-only producers; when a web request already has a transaction, enqueue through awa.bridge on that same connection/session.
Install the app database libraries you already use, for example:
    pip install 'sqlalchemy[asyncio]' asyncpg

Then enqueue in the same SQLAlchemy transaction as your application write:
    from dataclasses import dataclass

    from awa.bridge import insert_job
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession


    @dataclass
    class SendEmail:
        to: str
        subject: str


    async def create_order(session: AsyncSession, order_id: str, email: str) -> int:
        async with session.begin():
            await session.execute(
                text("INSERT INTO orders (id, email) VALUES (:id, :email)"),
                {"id": order_id, "email": email},
            )
            job = await insert_job(
                session,
                SendEmail(to=email, subject="Order confirmed"),
                queue="email",
            )
        return job["id"]

The same bridge supports asyncpg, psycopg3, SQLAlchemy, and Django; see Bridge Adapters for driver-specific examples.
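The reason to enqueue inside the application transaction is atomicity: either both the business row and the job row commit, or neither does. The sketch below demonstrates that property with the standard library's sqlite3 module instead of Postgres, so it runs anywhere. The `orders` and `jobs` tables here are hypothetical stand-ins and do not reflect Awa's schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id TEXT PRIMARY KEY, email TEXT)")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, queue TEXT, args TEXT)")
conn.commit()


def create_order(order_id: str, email: str, fail: bool = False) -> None:
    """Write the order and its job in one transaction; roll back both on error."""
    with conn:  # commits on success, rolls back if the body raises
        conn.execute("INSERT INTO orders (id, email) VALUES (?, ?)", (order_id, email))
        conn.execute("INSERT INTO jobs (queue, args) VALUES (?, ?)", ("email", email))
        if fail:
            raise RuntimeError("simulated failure after enqueue")


create_order("o-1", "alice@example.com")
try:
    create_order("o-2", "bob@example.com", fail=True)
except RuntimeError:
    pass  # the failed transaction left no order and no job behind

orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
jobs = conn.execute("SELECT COUNT(*) FROM jobs").fetchone()[0]
print(orders, jobs)  # prints: 1 1
```

The failed call never leaves an orphaned job for a nonexistent order, which is the failure mode a separate enqueue step after commit (or before it) cannot rule out.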
By default, a queue is strict FIFO per (queue, priority). Operators can opt a contended queue into partitioned FIFO by raising awa.queue_meta.enqueue_shards — order is then preserved within each shard, but not across shards. If your producer enqueues jobs that must be processed in order (per-customer events, sequential workflow steps), pass ordering_key so they all land on one shard:
    await client.insert(
        UpdateCustomer(customer_id=42, payload=...),
        queue="customer-updates",
        ordering_key=b"customer-42",  # str also accepted; UTF-8 encoded
    )

At the default enqueue_shards = 1 the key is ignored (everything is on shard 0 anyway). See ADR-025 for the partitioned-FIFO contract and docs/upgrade-0.5-to-0.6.md for the operator-side knob.
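To see why a shared ordering key keeps related jobs in order, it helps to picture a deterministic key-to-shard mapping. The sketch below is an illustration only: `shard_for` is a hypothetical function, and the choice of SHA-256 modulo the shard count is an assumption for the example, not Awa's actual routing (ADR-025 is the authority on that).

```python
import hashlib


def shard_for(ordering_key, enqueue_shards: int) -> int:
    """Map an ordering key to a shard index in [0, enqueue_shards)."""
    if enqueue_shards < 1:
        raise ValueError("enqueue_shards must be at least 1")
    if isinstance(ordering_key, str):
        ordering_key = ordering_key.encode("utf-8")  # str keys are UTF-8 encoded
    digest = hashlib.sha256(ordering_key).digest()
    return int.from_bytes(digest[:8], "big") % enqueue_shards


# The same key always lands on the same shard, whether passed as bytes or str.
print(shard_for(b"customer-42", 4) == shard_for("customer-42", 4))  # prints: True

# With a single shard, every key maps to shard 0, so the key has no effect.
print(shard_for(b"customer-42", 1), shard_for(b"customer-7", 1))  # prints: 0 0
```

Any mapping with this shape gives per-key ordering for free: jobs that share a key share a shard, and each shard is FIFO on its own.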
Awa records 20+ metrics (throughput, pickup latency, in-flight jobs, rescues, …) on the Rust side. Python workers enable OTLP export by calling awa.init_telemetry(...) once before the worker starts:
    import os
    import awa

    awa.init_telemetry(
        os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"],  # e.g. http://localhost:4317
        os.environ.get("OTEL_SERVICE_NAME", "my-service"),
    )
    # ... then build the client and start workers as normal.

init_telemetry is idempotent; only the first call installs a provider. Call awa.shutdown_telemetry() at the end of short-lived scripts to flush pending metrics. See awa-python/examples/telemetry.py for a runnable example.
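The snippet above raises KeyError when OTEL_EXPORTER_OTLP_ENDPOINT is unset, which may be inconvenient in local development. One option is to resolve the settings first and skip telemetry when no endpoint is configured. `telemetry_settings` below is a hypothetical helper, not part of Awa; it only reads the two environment variables used above.

```python
import os
from typing import Mapping, Optional, Tuple


def telemetry_settings(
    environ: Mapping[str, str] = os.environ,
) -> Optional[Tuple[str, str]]:
    """Return (endpoint, service_name), or None when no endpoint is configured."""
    endpoint = environ.get("OTEL_EXPORTER_OTLP_ENDPOINT", "").strip()
    if not endpoint:
        return None
    service_name = environ.get("OTEL_SERVICE_NAME", "my-service")
    return endpoint, service_name


print(telemetry_settings({}))
# prints: None
print(telemetry_settings({"OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4317"}))
# prints: ('http://localhost:4317', 'my-service')

# In a worker, the result would gate the call shown above:
#     settings = telemetry_settings()
#     if settings is not None:
#         awa.init_telemetry(*settings)
```

Passing the environment as a parameter keeps the helper easy to test without mutating `os.environ`.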