bigRAG
SDKs

Python SDK

Async Python client for the bigRAG API with full type hints and zero config.

Installation

pip install bigrag==2026.5.23

Requires Python 3.11+. The only runtime dependency is httpx. Published SDK releases use CalVer (YYYY.M.D).

Quick Start

import asyncio
from bigrag import BigRAG

async def main():
    async with BigRAG(api_key="bigrag_sk_…") as client:  # minted at /v1/admin/api-keys
        # Create a collection
        collection = await client.collections.create({
            "name": "knowledge_base",
            "description": "Company docs",
        })

        # Upload a document
        doc = await client.documents.upload(
            "knowledge_base",
            "/path/to/report.pdf",
        )

        # Query the collection
        results = await client.queries.query("knowledge_base", {
            "query": "What is our revenue?",
            "top_k": 5,
        })

        for r in results["results"]:
            print(f"[{r['score']:.3f}] {r['text'][:100]}")

asyncio.run(main())

Client Options

client = BigRAG(
    api_key="...",                  # or set BIGRAG_API_KEY env var
    base_url="http://localhost:4000",  # API server URL
    timeout=120.0,                  # request timeout in seconds
    max_retries=2,                  # retries on infrastructure 429/5xx/connection errors
    auto_idempotency_key=True,      # auto-add an Idempotency-Key to mutating requests
)

The client reads BIGRAG_API_KEY from the environment if api_key is not passed.

Resource Namespaces

NamespaceDescription
client.collectionsCollection CRUD, stats, and analytics
client.documentsDocument upload, list, delete, batch ops
client.chatGenerated answers, question suggestions, and streaming
client.queriesSingle, multi-collection, and batch queries
client.vectorsRaw vector upsert and delete
client.webhooksWebhook management
client.authSetup, login, logout, identity, password, and preferences
client.adminUsers, API keys, access logs, audit logs, runtime settings, vector storage overview, embedding presets, and MCP server keys
client.connectors.s3S3-compatible bucket prefixes, sources, and sync jobs
client.evaluationsGolden-set retrieval evaluation runs

S3 sync jobs support collection, source_id, and limit filters. Running jobs include details["progress"] with sync phase, percent, current item, and created/updated/skipped/deleted/failed counts.

For Cloudflare R2, set endpoint_url to the R2 S3 API endpoint and omit region or use auto.

Collections

# List collections
result = await client.collections.list(name="prefix", limit=10)

# Auto-paginate through every collection
async for collection in client.collections.list_all(name="prefix"):
    print(collection["name"])

# Create
collection = await client.collections.create({
    "name": "docs",
    "embedding_provider": "openai",
    "embedding_model": "text-embedding-3-small",
})

# Get
collection = await client.collections.get("docs")

# Update
await client.collections.update("docs", {
    "description": "Updated description",
    "embedding_api_key": None,
    "multimodal_enabled": True,
    "default_search_mode": "hybrid",
})

# Delete
await client.collections.delete("docs")

# Stats
stats = await client.collections.stats("docs")

# Truncate (delete all documents, keep the collection)
await client.collections.truncate("docs")

Set embedding_api_key or reranking_api_key to None in update calls to clear the stored key.

Documents

# Upload a file (accepts str, Path, bytes, BinaryIO, or (name, data) tuple)
doc = await client.documents.upload("docs", "/path/to/file.pdf")
doc = await client.documents.upload("docs", b"raw content")
doc = await client.documents.upload("docs", ("custom.txt", b"hello"))

# Upload with metadata
doc = await client.documents.upload(
    "docs", "/path/to/file.pdf",
    metadata={"department": "engineering"},
)

Chat

# Non-streaming generated answer with citations
answer = await client.chat.create({
    "collection": "docs",
    "message": "What is the PTO policy?",
    "top_k": 8,
    "search_mode": "hybrid",
})
print(answer["assistant_message"]["content"])

# Question suggestions
suggestions = await client.chat.get_question_suggestions("docs")
fresh = await client.chat.generate_question_suggestions({"collection": "docs"})

# Stream chat SSE events
async for event in client.chat.stream({
    "collection": "docs",
    "message": "Answer with citations",
}):
    if event["event"] == "delta":
        print(event["data"]["delta"], end="")

Queries

# Single collection query
result = await client.queries.query("docs", {
    "query": "search terms",
    "top_k": 10,
    "search_mode": "hybrid",  # "semantic", "keyword", or "hybrid"
    "min_score": 0.5,
    "rerank": True,
    "skip_cache": True,
    "multimodal": True,
})

# Multi-collection query
result = await client.queries.multi_query({
    "query": "search terms",
    "collections": ["docs", "articles"],
    "top_k": 10,
    "skip_cache": True,
})

# Batch query (up to 20)
result = await client.queries.batch_query({
    "queries": [
        {"collection": "docs", "query": "first query", "skip_cache": True},
        {"collection": "articles", "query": "second query"},
    ]
})

Vectors

# Upsert custom embeddings
result = await client.vectors.upsert("docs", [
    {"id": "v1", "embedding": [0.1, 0.2, ...], "text": "chunk text"},
    {"id": "v2", "embedding": [0.3, 0.4, ...], "metadata": {"source": "api"}},
])

# Delete vectors
result = await client.vectors.delete("docs", ["v1", "v2"])

Webhooks

# Create
webhook = await client.webhooks.create({
    "url": "https://example.com/webhook",
    "events": ["collection.truncated", "connector.sync.failed"],
})
print(webhook["secret"])

# List, get, update, delete
webhooks = await client.webhooks.list(limit=50, offset=0)
wh = await client.webhooks.get("webhook-id")
await client.webhooks.update("webhook-id", {"active": False})
await client.webhooks.delete("webhook-id")

# Test
result = await client.webhooks.test("webhook-id")

# Delivery history
deliveries = await client.webhooks.list_deliveries("webhook-id", limit=10)

Webhook events cover collection and connector sync data-operation changes. Examples: collection.truncated and connector.sync.failed.

Webhook management calls /v1/admin/webhooks and requires session-cookie admin auth. API-key clients receive 403.

Status Polling

Track document processing with REST polling:

# Single document status
doc = await client.documents.get("docs", "doc-id")
while doc["status"] in ("pending", "processing"):
    await asyncio.sleep(2)
    doc = await client.documents.get("docs", doc["id"])
    print(doc["progress"]["message"] if doc.get("progress") else doc["status"])

# Batch status (e.g., after batch upload)
doc_ids = [d["id"] for d in result["documents"]]
statuses = await client.documents.batch_get_status("docs", doc_ids)

Collection-Scoped Client

For repeated operations on the same collection:

col = client.collection("docs")

doc = await col.upload("/path/to/file.pdf")
results = await col.query({"query": "search terms"})
stats = await col.stats()
analytics = await col.analytics()

Admin

settings = await client.admin.settings.list()
await client.admin.settings.test({"values": {"turbopuffer_region": "aws-us-west-2"}})
await client.admin.settings.update({"values": {"trusted_proxies": ["10.0.0.0/8"]}})
await client.admin.settings.reset({"keys": ["trusted_proxies"]})
await client.admin.settings.purge_embedding_cache()

overview = await client.admin.vector_storage.overview()

Platform Endpoints

# Health check
health = await client.health()

# Readiness (checks Postgres, Redis, Turbopuffer, embedding provider)
ready = await client.readiness()

# Platform stats
stats = await client.get_stats()

# Usage rollup
usage = await client.get_usage(window_days=30)

# Pollable admin UI status helpers
overview_status = await client.get_overview_status()
collections_status = await client.get_collections_status()
usage_status = await client.get_usage_status(window_days=30)
access_status = await client.get_access_status(window_days=7)

# Available embedding models
models = await client.list_embedding_models()

# Collection analytics
analytics = await client.collections.analytics("docs")

Error Handling

from bigrag import (
    BigRAGError,        # base for all errors
    APIError,           # any HTTP error
    BadRequestError,    # 400
    AuthenticationError, # 401
    NotFoundError,      # 404
    RateLimitError,     # 429
    InternalServerError, # 500
    APIConnectionError, # network errors
    APITimeoutError,    # timeout
)

try:
    await client.collections.get("missing")
except NotFoundError as e:
    print(f"Not found: {e} (status={e.status})")
except APIConnectionError:
    print("Cannot reach the API server")
except APITimeoutError:
    print("Request timed out")

Retry Behavior

The SDK automatically retries on:

  • HTTP 429 from proxy or infrastructure layers
  • HTTP 5xx (server errors)
  • Connection errors and timeouts

Retries use exponential backoff: min(0.5 * 2^attempt, 4) seconds. Configure with max_retries (default: 2).

The Python SDK is fully typed with py.typed support. Use it with mypy or pyright for full type checking.

On this page