Python SDK
Async Python client for the bigRAG API with full type hints and zero config.
Installation
pip install bigrag==2026.5.23
Requires Python 3.11+. The only runtime dependency is httpx. Published SDK releases use CalVer (YYYY.M.D).
Quick Start
import asyncio
from bigrag import BigRAG
async def main():
async with BigRAG(api_key="bigrag_sk_…") as client: # minted at /v1/admin/api-keys
# Create a collection
collection = await client.collections.create({
"name": "knowledge_base",
"description": "Company docs",
})
# Upload a document
doc = await client.documents.upload(
"knowledge_base",
"/path/to/report.pdf",
)
# Query the collection
results = await client.queries.query("knowledge_base", {
"query": "What is our revenue?",
"top_k": 5,
})
for r in results["results"]:
print(f"[{r['score']:.3f}] {r['text'][:100]}")
asyncio.run(main())
Client Options
client = BigRAG(
api_key="...", # or set BIGRAG_API_KEY env var
base_url="http://localhost:4000", # API server URL
timeout=120.0, # request timeout in seconds
max_retries=2, # retries on infrastructure 429/5xx/connection errors
auto_idempotency_key=True, # auto-add an Idempotency-Key to mutating requests
)
The client reads BIGRAG_API_KEY from the environment if api_key is not passed.
Resource Namespaces
| Namespace | Description |
|---|---|
| client.collections | Collection CRUD, stats, and analytics |
| client.documents | Document upload, list, delete, batch ops |
| client.chat | Generated answers, question suggestions, and streaming |
| client.queries | Single, multi-collection, and batch queries |
| client.vectors | Raw vector upsert and delete |
| client.webhooks | Webhook management |
| client.auth | Setup, login, logout, identity, password, and preferences |
| client.admin | Users, API keys, access logs, audit logs, runtime settings, vector storage overview, embedding presets, and MCP server keys |
| client.connectors.s3 | S3-compatible bucket prefixes, sources, and sync jobs |
| client.evaluations | Golden-set retrieval evaluation runs |
S3 sync jobs support collection, source_id, and limit filters. Running jobs include details["progress"] with sync phase, percent, current item, and created/updated/skipped/deleted/failed counts.
For Cloudflare R2, set endpoint_url to the R2 S3 API endpoint and omit region or use auto.
Collections
# List collections
result = await client.collections.list(name="prefix", limit=10)
# Auto-paginate through every collection
async for collection in client.collections.list_all(name="prefix"):
print(collection["name"])
# Create
collection = await client.collections.create({
"name": "docs",
"embedding_provider": "openai",
"embedding_model": "text-embedding-3-small",
})
# Get
collection = await client.collections.get("docs")
# Update
await client.collections.update("docs", {
"description": "Updated description",
"embedding_api_key": None,
"multimodal_enabled": True,
"default_search_mode": "hybrid",
})
# Delete
await client.collections.delete("docs")
# Stats
stats = await client.collections.stats("docs")
# Truncate (delete all documents, keep the collection)
await client.collections.truncate("docs")
Set embedding_api_key or reranking_api_key to None in update calls to clear the stored key.
Documents
# Upload a file (accepts str, Path, bytes, BinaryIO, or (name, data) tuple)
doc = await client.documents.upload("docs", "/path/to/file.pdf")
doc = await client.documents.upload("docs", b"raw content")
doc = await client.documents.upload("docs", ("custom.txt", b"hello"))
# Upload with metadata
doc = await client.documents.upload(
"docs", "/path/to/file.pdf",
metadata={"department": "engineering"},
)
Chat
# Non-streaming generated answer with citations
answer = await client.chat.create({
"collection": "docs",
"message": "What is the PTO policy?",
"top_k": 8,
"search_mode": "hybrid",
})
print(answer["assistant_message"]["content"])
# Question suggestions
suggestions = await client.chat.get_question_suggestions("docs")
fresh = await client.chat.generate_question_suggestions({"collection": "docs"})
# Stream chat SSE events
async for event in client.chat.stream({
"collection": "docs",
"message": "Answer with citations",
}):
if event["event"] == "delta":
print(event["data"]["delta"], end="")
Queries
# Single collection query
result = await client.queries.query("docs", {
"query": "search terms",
"top_k": 10,
"search_mode": "hybrid", # "semantic", "keyword", or "hybrid"
"min_score": 0.5,
"rerank": True,
"skip_cache": True,
"multimodal": True,
})
# Multi-collection query
result = await client.queries.multi_query({
"query": "search terms",
"collections": ["docs", "articles"],
"top_k": 10,
"skip_cache": True,
})
# Batch query (up to 20)
result = await client.queries.batch_query({
"queries": [
{"collection": "docs", "query": "first query", "skip_cache": True},
{"collection": "articles", "query": "second query"},
]
})
Vectors
# Upsert custom embeddings
result = await client.vectors.upsert("docs", [
{"id": "v1", "embedding": [0.1, 0.2, ...], "text": "chunk text"},
{"id": "v2", "embedding": [0.3, 0.4, ...], "metadata": {"source": "api"}},
])
# Delete vectors
result = await client.vectors.delete("docs", ["v1", "v2"])
Webhooks
# Create
webhook = await client.webhooks.create({
"url": "https://example.com/webhook",
"events": ["collection.truncated", "connector.sync.failed"],
})
print(webhook["secret"])
# List, get, update, delete
webhooks = await client.webhooks.list(limit=50, offset=0)
wh = await client.webhooks.get("webhook-id")
await client.webhooks.update("webhook-id", {"active": False})
await client.webhooks.delete("webhook-id")
# Test
result = await client.webhooks.test("webhook-id")
# Delivery history
deliveries = await client.webhooks.list_deliveries("webhook-id", limit=10)
Webhook events cover collection and connector sync data-operation changes. Examples: collection.truncated and connector.sync.failed.
Webhook management calls /v1/admin/webhooks and requires session-cookie admin auth. API-key clients receive 403.
Status Polling
Track document processing with REST polling:
# Single document status
doc = await client.documents.get("docs", "doc-id")
while doc["status"] in ("pending", "processing"):
await asyncio.sleep(2)
doc = await client.documents.get("docs", doc["id"])
print(doc["progress"]["message"] if doc.get("progress") else doc["status"])
# Batch status (e.g., after batch upload)
doc_ids = [d["id"] for d in result["documents"]]
statuses = await client.documents.batch_get_status("docs", doc_ids)
Collection-Scoped Client
For repeated operations on the same collection:
col = client.collection("docs")
doc = await col.upload("/path/to/file.pdf")
results = await col.query({"query": "search terms"})
stats = await col.stats()
analytics = await col.analytics()
Admin
settings = await client.admin.settings.list()
await client.admin.settings.test({"values": {"turbopuffer_region": "aws-us-west-2"}})
await client.admin.settings.update({"values": {"trusted_proxies": ["10.0.0.0/8"]}})
await client.admin.settings.reset({"keys": ["trusted_proxies"]})
await client.admin.settings.purge_embedding_cache()
overview = await client.admin.vector_storage.overview()
Platform Endpoints
# Health check
health = await client.health()
# Readiness (checks Postgres, Redis, Turbopuffer, embedding provider)
ready = await client.readiness()
# Platform stats
stats = await client.get_stats()
# Usage rollup
usage = await client.get_usage(window_days=30)
# Pollable admin UI status helpers
overview_status = await client.get_overview_status()
collections_status = await client.get_collections_status()
usage_status = await client.get_usage_status(window_days=30)
access_status = await client.get_access_status(window_days=7)
# Available embedding models
models = await client.list_embedding_models()
# Collection analytics
analytics = await client.collections.analytics("docs")
Error Handling
from bigrag import (
BigRAGError, # base for all errors
APIError, # any HTTP error
BadRequestError, # 400
AuthenticationError, # 401
NotFoundError, # 404
RateLimitError, # 429
InternalServerError, # 500
APIConnectionError, # network errors
APITimeoutError, # timeout
)
try:
await client.collections.get("missing")
except NotFoundError as e:
print(f"Not found: {e} (status={e.status})")
except APIConnectionError:
print("Cannot reach the API server")
except APITimeoutError:
print("Request timed out")
Retry Behavior
The SDK automatically retries on:
- HTTP 429 from proxy or infrastructure layers
- HTTP 5xx (server errors)
- Connection errors and timeouts
Retries use exponential backoff: min(0.5 * 2^attempt, 4) seconds. Configure with max_retries (default: 2).
The Python SDK is fully typed with py.typed support. Use it with mypy or pyright for full type checking.