PGQueuer is a minimalist, high-performance job queue library for Python, leveraging the robustness of PostgreSQL. Designed for simplicity and efficiency, PGQueuer uses PostgreSQL's LISTEN/NOTIFY to manage job queues effortlessly.
- Simple Integration: Easy to integrate with existing Python applications using PostgreSQL.
- Efficient Concurrency Handling: Utilizes PostgreSQL's FOR UPDATE SKIP LOCKED for reliable and concurrent job processing.
- Real-time Notifications: Leverages LISTEN and NOTIFY for real-time updates on job status changes.
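To illustrate the concurrency point above, here is a minimal sketch of how a worker might claim jobs with FOR UPDATE SKIP LOCKED. The `jobs` table, its `status`, `priority`, and `payload` columns, and the `claim_jobs` helper are assumptions made for this example; they are not PGQueuer's actual schema or API. The connection is assumed to expose an asyncpg-style `fetch(query, *args)` coroutine.

```python
from __future__ import annotations

from typing import Any, Protocol

# Hypothetical schema: a `jobs` table with id, status, priority, payload.
# Rows already locked by another worker are skipped instead of waited on,
# so concurrent workers never claim the same job.
CLAIM_SQL = """
WITH next_jobs AS (
    SELECT id
    FROM jobs
    WHERE status = 'queued'
    ORDER BY priority DESC, id
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
UPDATE jobs
SET status = 'picked'
FROM next_jobs
WHERE jobs.id = next_jobs.id
RETURNING jobs.id, jobs.payload
"""


class Connection(Protocol):
    """Anything with an asyncpg-style `fetch` coroutine."""

    async def fetch(self, query: str, *args: Any) -> list[Any]: ...


async def claim_jobs(conn: Connection, batch_size: int) -> list[Any]:
    """Atomically mark up to `batch_size` queued jobs as picked and return them."""
    return await conn.fetch(CLAIM_SQL, batch_size)
```

Because the select and the update happen in one statement, two workers running `claim_jobs` at the same time should each receive a disjoint set of rows.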
To install PGQueuer, run the following pip command:
pip install pgqueuer
Here's how you can use PGQueuer in a typical scenario processing incoming data messages:
Start a long-lived consumer that will begin processing jobs as soon as they are enqueued by another process. Because a long-lived consumer should shut down gracefully, start it with pgqueuer run, which sets up signal handlers to ensure this.
from __future__ import annotations

import asyncpg

from pgqueuer.db import AsyncpgDriver, dsn
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


async def main() -> QueueManager:
    connection = await asyncpg.connect(dsn())
    driver = AsyncpgDriver(connection)
    qm = QueueManager(driver)

    # Setup the 'fetch' entrypoint
    @qm.entrypoint("fetch")
    async def process_message(job: Job) -> None:
        print(f"Processed message: {job}")

    return qm
python3 -m pgqueuer run tools.consumer.main
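The graceful shutdown described above follows a common asyncio pattern: a signal handler sets a flag, and the worker loop finishes its current item before exiting. The sketch below shows that general pattern only; it is not PGQueuer's implementation, and `install_shutdown_handlers`, `consume`, and `demo` are hypothetical names. The demo simulates the signal with a timer so that it runs anywhere.

```python
from __future__ import annotations

import asyncio
import signal


def install_shutdown_handlers(shutdown: asyncio.Event) -> bool:
    """Set `shutdown` on SIGINT/SIGTERM. Returns False where unsupported."""
    loop = asyncio.get_running_loop()
    try:
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, shutdown.set)
    except (NotImplementedError, RuntimeError, ValueError):
        # Not available on Windows or outside the main thread.
        return False
    return True


async def consume(queue: asyncio.Queue[str], shutdown: asyncio.Event) -> list[str]:
    """Handle items until shutdown is requested, finishing the current one."""
    done: list[str] = []
    while not shutdown.is_set():
        try:
            item = await asyncio.wait_for(queue.get(), timeout=0.05)
        except asyncio.TimeoutError:
            continue  # nothing queued; re-check the shutdown flag
        await asyncio.sleep(0)  # stand-in for real work on the item
        done.append(item)
    return done


async def demo() -> list[str]:
    shutdown = asyncio.Event()
    install_shutdown_handlers(shutdown)
    queue: asyncio.Queue[str] = asyncio.Queue()
    for i in range(3):
        queue.put_nowait(f"job-{i}")
    # Simulate a signal arriving after 0.2 seconds.
    asyncio.get_running_loop().call_later(0.2, shutdown.set)
    return await consume(queue, shutdown)
```

Checking the flag only between items means an in-flight job is never abandoned halfway, at the cost of shutdown being delayed by at most one job plus the poll timeout.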
Start a short-lived producer that will enqueue 10,000 jobs.
from __future__ import annotations

import asyncio
import sys

import asyncpg

from pgqueuer.db import AsyncpgDriver
from pgqueuer.queries import Queries


async def main(N: int) -> None:
    connection = await asyncpg.connect()
    driver = AsyncpgDriver(connection)
    queries = Queries(driver)
    await queries.enqueue(
        ["fetch"] * N,
        [f"this is from me: {n}".encode() for n in range(1, N+1)],
        [0] * N,
    )


if __name__ == "__main__":
    print(sys.argv)
    N = 1_000 if len(sys.argv) == 1 else int(sys.argv[1])
    asyncio.run(main(N))
python3 tools/producer.py 10000
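The producer above passes three parallel lists to enqueue: entrypoint names, byte payloads, and priorities, one element per job. A small helper can keep those lists the same length. `build_batch` is a hypothetical function written for this example and is not part of PGQueuer.

```python
from __future__ import annotations


def build_batch(
    entrypoint: str,
    messages: list[str],
    priority: int = 0,
) -> tuple[list[str], list[bytes], list[int]]:
    """Return (entrypoints, payloads, priorities) with one entry per message."""
    payloads = [message.encode() for message in messages]
    count = len(payloads)
    return [entrypoint] * count, payloads, [priority] * count


entrypoints, payloads, priorities = build_batch(
    "fetch",
    [f"this is from me: {n}" for n in range(1, 4)],
)
```

Assuming the positional call shown in the producer, the result could be passed along as `await queries.enqueue(*build_batch("fetch", messages))`.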