Zeros and OnesLLC

title: "Python / FastAPI Quickstart" description: "Add TitaniumVault authentication to your Python FastAPI application in under 10 minutes." order: 3

Python / FastAPI Quickstart

This guide will walk you through adding TitaniumVault authentication to your Python FastAPI application.

Prerequisites

  • Python 3.9+ installed
  • A TitaniumVault account and API configured
  • A FastAPI project

Installation

Install the required packages:

pip install fastapi uvicorn python-jose[cryptography] httpx python-dotenv

Configuration

1. Set up environment variables

Create a .env file:

TV_DOMAIN=your-org.titanium-vault.com
TV_AUDIENCE=https://api.example.com
TV_ALGORITHMS=RS256

2. Create the authentication module

Create auth.py:

import os
from typing import Optional
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
import httpx
from functools import lru_cache
from dotenv import load_dotenv

load_dotenv()

security = HTTPBearer()

TV_DOMAIN = os.getenv("TV_DOMAIN")
TV_AUDIENCE = os.getenv("TV_AUDIENCE")
TV_ALGORITHMS = os.getenv("TV_ALGORITHMS", "RS256").split(",")


class JWKSCache:
    """Cache for JWKS to avoid fetching on every request."""

    def __init__(self):
        self._keys: Optional[dict] = None

    async def get_signing_key(self, kid: str) -> dict:
        if self._keys is None:
            await self._fetch_jwks()

        for key in self._keys.get("keys", []):
            if key.get("kid") == kid:
                return key

        # Key not found, refresh cache and try again
        await self._fetch_jwks()
        for key in self._keys.get("keys", []):
            if key.get("kid") == kid:
                return key

        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unable to find signing key",
        )

    async def _fetch_jwks(self):
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"https://{TV_DOMAIN}/.well-known/jwks.json"
            )
            response.raise_for_status()
            self._keys = response.json()


jwks_cache = JWKSCache()


async def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Verify the JWT token and return the payload."""
    token = credentials.credentials

    try:
        # Decode header to get the key ID
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get("kid")

        if not kid:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token missing key ID",
            )

        # Get the signing key
        signing_key = await jwks_cache.get_signing_key(kid)

        # Verify and decode the token
        payload = jwt.decode(
            token,
            signing_key,
            algorithms=TV_ALGORITHMS,
            audience=TV_AUDIENCE,
            issuer=f"https://{TV_DOMAIN}/",
        )

        return payload

    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid token: {str(e)}",
        )


def require_scopes(required_scopes: list[str]):
    """Dependency to check for required scopes."""

    async def check_scopes(
        payload: dict = Depends(verify_token),
    ) -> dict:
        token_scopes = payload.get("scope", "").split()

        for scope in required_scopes:
            if scope not in token_scopes:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Missing required scope: {scope}",
                )

        return payload

    return check_scopes

Basic FastAPI Setup

Create your FastAPI app with authentication:

from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from auth import verify_token, require_scopes

app = FastAPI(title="My API")

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/api/public")
async def public_endpoint():
    """Public endpoint - no authentication required."""
    return {"message": "This is a public endpoint"}


@app.get("/api/protected")
async def protected_endpoint(payload: dict = Depends(verify_token)):
    """Protected endpoint - requires valid JWT."""
    return {
        "message": "This is a protected endpoint",
        "user_id": payload.get("sub"),
        "email": payload.get("email"),
    }


@app.get("/api/users")
async def get_users(
    payload: dict = Depends(require_scopes(["read:users"]))
):
    """Requires 'read:users' scope."""
    return {
        "users": [
            {"id": 1, "name": "Alice"},
            {"id": 2, "name": "Bob"},
        ]
    }


@app.post("/api/users")
async def create_user(
    payload: dict = Depends(require_scopes(["write:users"]))
):
    """Requires 'write:users' scope."""
    return {"success": True, "message": "User created"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

Using Pydantic Models

Create type-safe user models:

from pydantic import BaseModel
from typing import Optional


class TokenPayload(BaseModel):
    sub: str
    email: Optional[str] = None
    scope: Optional[str] = None
    exp: int
    iat: int
    iss: str
    aud: str


class User(BaseModel):
    id: str
    email: Optional[str]
    scopes: list[str]

    @classmethod
    def from_token(cls, payload: dict) -> "User":
        return cls(
            id=payload.get("sub", ""),
            email=payload.get("email"),
            scopes=payload.get("scope", "").split(),
        )


async def get_current_user(
    payload: dict = Depends(verify_token),
) -> User:
    """Convert token payload to User model."""
    return User.from_token(payload)


# Use in your routes
@app.get("/api/me")
async def get_me(user: User = Depends(get_current_user)):
    return {
        "id": user.id,
        "email": user.email,
        "scopes": user.scopes,
    }

Machine-to-Machine Authentication

Get tokens for server-to-server communication:

import httpx
import os


async def get_m2m_token() -> str:
    """Get an access token using client credentials."""
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"https://{os.getenv('TV_DOMAIN')}/oauth/token",
            json={
                "grant_type": "client_credentials",
                "client_id": os.getenv("TV_CLIENT_ID"),
                "client_secret": os.getenv("TV_CLIENT_SECRET"),
                "audience": os.getenv("TV_AUDIENCE"),
            },
        )
        response.raise_for_status()
        return response.json()["access_token"]


async def call_external_api():
    """Call another API using the M2M token."""
    token = await get_m2m_token()

    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://other-api.example.com/data",
            headers={"Authorization": f"Bearer {token}"},
        )
        return response.json()

Running the Application

# Development
uvicorn main:app --reload --port 8000

# Production
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

Testing with cURL

# Get a token from your frontend or M2M flow
TOKEN="your-access-token"

# Test protected endpoint
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/api/protected

# Test scoped endpoint
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/api/users

Next Steps

Troubleshooting

"Token expired" errors

Ensure your frontend refreshes tokens before expiration. The exp claim in the token indicates when it expires.

JWKS fetch failures

Check network connectivity to your TitaniumVault domain. The JWKS endpoint must be accessible from your server.

Scope validation issues

Ensure the scopes are configured correctly in your TitaniumVault API settings and that tokens are requested with the appropriate scopes.