Building a Scalable PDF AI Analysis Pipeline with Python Microservices, Docker, Groq, and RabbitMQ

For developers and engineering teams, PDF documents represent a massive repository of critical information — technical specifications, research papers, financial reports, legal contracts, and customer submissions. However, PDFs are essentially locked boxes of data. Unlike structured databases or searchable codebases, you cannot query, aggregate, or analyze hundreds of PDFs simultaneously without manual effort.

We face a fundamental bottleneck: document quantity versus extraction capacity. As organizations accumulate thousands of PDFs, the gap between having information and actually using it keeps widening.

The Problem: The Document Processing Bottleneck

Traditional approaches to PDF processing create immediate friction. The challenges are architectural:

  • Synchronous Blocking — Users upload a document and wait while a single-threaded process extracts text, calls an AI API, and returns results. One slow PDF blocks everything behind it.
  • Resource Mismatch — Text extraction is CPU-intensive. AI inference is network-bound. Storage operations are I/O-heavy. Running these on a single server wastes resources during each stage.
  • Poor User Experience — Without async processing, users stare at loading spinners for minutes, unsure if the system crashed or is still working.

The Solution: An Event-Driven Microservices Pipeline

We are going to build a production-grade pipeline that decouples each processing stage into independent, scalable services. By leveraging Docker, RabbitMQ, Groq's inference API, and Streamlit, we will create a system that handles concurrent PDF uploads, processes them asynchronously, and delivers results through a polished web interface.

The core innovation here is RabbitMQ message queuing. Rather than chaining services together synchronously, each service publishes events that downstream services consume. This pattern enables horizontal scaling, fault tolerance, and independent deployment cycles.
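
The decoupling idea can be illustrated without a broker. What follows is a minimal in-process sketch, assuming Python's standard queue.Queue stands in for a RabbitMQ queue. The run_stage and run_pipeline helpers and the fake "text" and "summary" handlers are hypothetical and not part of the pipeline's code.

```python
import queue
import threading

STOP = object()  # sentinel that tells a worker to shut down


def run_stage(inbox, outbox, handler):
    """Consume messages from inbox, apply handler, publish the result to outbox."""
    while True:
        message = inbox.get()
        if message is STOP:
            outbox.put(STOP)  # propagate shutdown to the next stage
            break
        outbox.put(handler(message))


def fake_extract(message):
    # Stand-in for the text extraction stage
    return {**message, "text": f"text of {message['filename']}"}


def fake_analyze(message):
    # Stand-in for the AI analysis stage
    return {**message, "summary": message["text"].upper()}


def run_pipeline(filenames):
    uploaded, text_ready, done = queue.Queue(), queue.Queue(), queue.Queue()
    stages = [
        threading.Thread(target=run_stage, args=(uploaded, text_ready, fake_extract)),
        threading.Thread(target=run_stage, args=(text_ready, done, fake_analyze)),
    ]
    for stage in stages:
        stage.start()

    # The producer returns as soon as the messages are queued
    for name in filenames:
        uploaded.put({"filename": name})
    uploaded.put(STOP)

    results = []
    while True:
        item = done.get()
        if item is STOP:
            break
        results.append(item)
    for stage in stages:
        stage.join()
    return results
```

Neither stage calls the other directly: each one only knows its inbox and outbox, which is the property that lets the real services scale and deploy independently.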

Core Architecture

The pipeline orchestrates six specialized microservices through an event-driven workflow:

  • Streamlit UI — Users upload PDFs, select analysis types, and view real-time progress without page refreshes.
  • API Gateway (FastAPI) — Accepts HTTP uploads, generates job IDs, and returns immediately while processing happens asynchronously.
  • PDF Ingestion — Validates files, extracts metadata, stores PDFs in MinIO object storage, and publishes to the pdf.uploaded queue.
  • Text Extractor — Consumes upload events, extracts text with PyPDF2/pdfplumber, handles OCR fallbacks, and publishes to the text.ready queue.
  • AI Analyzer (Groq) — Consumes text events, sends content to Groq's Llama 3.1 or Mixtral models for summarization/classification/Q&A generation, and publishes to the analysis.done queue.
  • Results Handler — Consumes analysis events, persists results to PostgreSQL, caches in Redis, and triggers webhooks for external integrations.

Here is the complete architecture:

┌─────────────────────────────────────────────────────────────────────────────┐
│                    PDF AI ANALYSIS PIPELINE ARCHITECTURE                    │
│              (Python Microservices + Docker + RabbitMQ + Groq)              │
│                         WITH STREAMLIT FRONTEND                             │
└─────────────────────────────────────────────────────────────────────────────┘

┌──────────────────┐
│  STREAMLIT UI    │  (Web Frontend)
│  [Docker:8501]   │  - File upload interface
└────────┬─────────┘  - Real-time status dashboard
         │            - Results visualization
         │ HTTP POST/GET
         │
         ▼
┌─────────────────┐
│   API Gateway   │  (FastAPI)
│  [Docker:8000]  │  - REST endpoints
└────────┬────────┘  - Job management
         │            - SSE for real-time updates
         │
         │ HTTP POST /analyze
         │
         ▼
┌────────────────────┐
│  PDF Ingestion     │  (Python Service)
│   Microservice     │  - Validates PDF
│  [Docker:8001]     │  - Extracts metadata
└─────────┬──────────┘  - Stores in MinIO
          │
          │ Publish: pdf.uploaded
          │
          ▼
┌─────────────────────────────────────────────────────────────────────────┐
│                          RABBITMQ MESSAGE BROKER                        │
│                             [Docker:5672]                               │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐                   │
│  │ pdf.uploaded │  │ text.ready   │  │analysis.done │                   │
│  │    Queue     │  │    Queue     │  │    Queue     │                   │
│  └──────────────┘  └──────────────┘  └──────────────┘                   │
└─────────────────────────────────────────────────────────────────────────┘
          │                   │                   │
          │                   │                   │
   ┌──────▼──────┐    ┌───────▼────────┐   ┌─────▼───────┐
   │   PDF Text  │    │   AI Analysis  │   │  Results    │
   │  Extractor  │    │  Microservice  │   │  Handler    │
   │  Service    │    │    (Groq)      │   │  Service    │
   │[Docker:8002]│    │  [Docker:8003] │   │[Docker:8004]│
   └──────┬──────┘    └────────┬───────┘   └─────┬───────┘
          │                    │                  │
          │ - PyPDF2           │ - Groq API       │ - Store results
          │ - pdfplumber       │ - Llama 3        │ - PostgreSQL
          │ - OCR              │ - Summarization  │ - Redis cache
          │                    │ - Classification │ - Webhooks
          │                    │                  │
          │ Publish:           │ Publish:         │
          │ text.ready         │ analysis.done    │
          │                    │                  │
          └────────────────────┴──────────────────┘
                              │
                              ▼
                    ┌──────────────────┐
                    │   Data Storage   │
                    │                  │
                    │ - PostgreSQL     │  [Docker:5432]
                    │ - MinIO/S3       │  [Docker:9000]
                    │ - Redis Cache    │  [Docker:6379]
                    └──────────────────┘

Message Flow

  1. User uploads PDF via Streamlit UI → API Gateway
  2. Ingestion service validates → publishes to pdf.uploaded queue
  3. Text Extractor consumes → extracts text → publishes to text.ready queue
  4. AI Analyzer consumes → calls Groq API → publishes to analysis.done queue
  5. Results Handler consumes → stores results → notifies user
  6. Streamlit polls API Gateway → displays real-time progress → shows results
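
Because each stage trusts the message it receives, it can help to check a payload before processing it. The following is a small sketch under stated assumptions: the field names follow the payloads the services in this post publish, while the REQUIRED_FIELDS table and the missing_fields helper are illustrative additions.

```python
import json

# Fields each queue's messages are expected to carry.
# The check itself is an illustrative helper, not part of the services.
REQUIRED_FIELDS = {
    "pdf.uploaded": {"job_id", "file_path", "filename", "analysis_type", "model"},
    "text.ready": {"job_id", "filename", "analysis_type", "model", "extracted_text"},
}


def missing_fields(queue_name, body):
    """Return the sorted list of required fields absent from a JSON message body."""
    message = json.loads(body)
    return sorted(REQUIRED_FIELDS[queue_name] - set(message))
```

A consumer could call missing_fields at the top of its callback and reject the message when the list is not empty, instead of failing later with a KeyError.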

Part 1: The Infrastructure

We will start with the foundational layer: orchestrating services with Docker Compose and designing a database schema that supports job tracking, result storage, and caching.

Folder Structure

Treat this as a monorepo. Create the following directory tree:

mkdir pdf-ai-pipeline
cd pdf-ai-pipeline
mkdir services database
mkdir services/streamlit-ui services/api-gateway services/pdf-ingestion
mkdir services/text-extractor services/ai-analyzer services/results-handler
touch docker-compose.yml .env database/init.sql

The Docker Compose File

We need to orchestrate ten services, six application services and four infrastructure services:

  • Streamlit UI — the frontend users interact with.
  • API Gateway (FastAPI) — the HTTP entry point for uploads and queries.
  • PDF Ingestion, Text Extractor, AI Analyzer, Results Handler — the processing pipeline.
  • RabbitMQ — message broker for event-driven communication.
  • PostgreSQL — persistent storage for jobs and results.
  • Redis — fast caching layer for frequently accessed results.
  • MinIO — S3-compatible object storage for raw PDFs.

Open docker-compose.yml and add this configuration:

version: '3.8'

services:
  # Frontend
  streamlit-ui:
    build: ./services/streamlit-ui
    container_name: pdf_ui
    ports:
      - "8501:8501"
    environment:
      - API_URL=http://api-gateway:8000
    networks:
      - pdf-net
    depends_on:
      - api-gateway

  # API Gateway
  api-gateway:
    build: ./services/api-gateway
    container_name: pdf_api
    ports:
      - "8000:8000"
    environment:
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
      - REDIS_URL=redis://redis:6379
      - POSTGRES_URL=postgresql://admin:secretpassword@postgres:5432/pdf_analysis
      - MINIO_URL=minio:9000
    volumes:
      # Shared with the workers, which read uploads from /tmp/<job_id>.pdf
      - ./shared-tmp:/tmp
    networks:
      - pdf-net
    depends_on:
      rabbitmq:
        condition: service_healthy
      postgres:
        condition: service_healthy
      redis:
        condition: service_started

  # PDF Ingestion (2 replicas for load balancing)
  pdf-ingestion:
    build: ./services/pdf-ingestion
    environment:
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
      - MINIO_URL=minio:9000
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    volumes:
      # Same directory the API Gateway writes uploads to
      - ./shared-tmp:/tmp
    networks:
      - pdf-net
    depends_on:
      rabbitmq:
        condition: service_healthy
      minio:
        condition: service_started
    deploy:
      replicas: 2

  # Text Extractor (3 replicas - CPU intensive)
  text-extractor:
    build: ./services/text-extractor
    environment:
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
    volumes:
      # Same directory the API Gateway writes uploads to
      - ./shared-tmp:/tmp
    networks:
      - pdf-net
    depends_on:
      rabbitmq:
        condition: service_healthy
    deploy:
      replicas: 3

  # AI Analyzer (2 replicas)
  ai-analyzer:
    build: ./services/ai-analyzer
    environment:
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
      - GROQ_API_KEY=${GROQ_API_KEY}
    networks:
      - pdf-net
    depends_on:
      rabbitmq:
        condition: service_healthy
    deploy:
      replicas: 2

  # Results Handler
  results-handler:
    build: ./services/results-handler
    environment:
      - RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672
      - POSTGRES_URL=postgresql://admin:secretpassword@postgres:5432/pdf_analysis
      - REDIS_URL=redis://redis:6379
    networks:
      - pdf-net
    depends_on:
      rabbitmq:
        condition: service_healthy
      postgres:
        condition: service_healthy
      redis:
        condition: service_started

  # RabbitMQ with Management UI
  rabbitmq:
    image: rabbitmq:3.12-management
    container_name: pdf_queue
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=guest
      - RABBITMQ_DEFAULT_PASS=guest
    networks:
      - pdf-net
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5

  # PostgreSQL for results storage
  postgres:
    image: postgres:15
    container_name: pdf_db
    environment:
      - POSTGRES_DB=pdf_analysis
      - POSTGRES_USER=admin
      - POSTGRES_PASSWORD=secretpassword
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./database/init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - pdf-net
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U admin -d pdf_analysis"]
      interval: 10s
      timeout: 5s
      retries: 5

  # Redis for caching
  redis:
    image: redis:7-alpine
    container_name: pdf_cache
    ports:
      - "6379:6379"
    networks:
      - pdf-net

  # MinIO (S3-compatible storage)
  minio:
    image: minio/minio
    container_name: pdf_storage
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    command: server /data --console-address ":9001"
    volumes:
      - minio_data:/data
    networks:
      - pdf-net

volumes:
  postgres_data:
  minio_data:

networks:
  pdf-net:
    driver: bridge
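
One caveat with the depends_on conditions: service_started only waits for the container to start, not for the service inside it to accept connections. A worker can therefore still hit a connection error on boot. The following is a minimal retry sketch; connect_with_retry and its parameters are hypothetical names, not part of the services in this post.

```python
import time


def connect_with_retry(connect, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call connect() until it succeeds, doubling the wait after each failure."""
    for attempt in range(attempts):
        try:
            return connect()
        except Exception as exc:  # narrow this to the client's connection error in real code
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            delay = base_delay * (2 ** attempt)
            print(f"Connection failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)
```

In a worker this could wrap the existing connection call, for example connect_with_retry(lambda: pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))).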

Environment Variables

Create a .env file to manage secrets:

# Groq API Key (get one free at console.groq.com)
GROQ_API_KEY=your_groq_api_key_here
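
Since the services read their configuration with os.environ.get, a missing variable silently becomes None and only fails later. A small fail-fast check can make that obvious at startup. This is a sketch, and require_env is a hypothetical helper rather than something the services define.

```python
import os

PLACEHOLDER = "your_groq_api_key_here"  # the sample value from the .env file


def require_env(name, environ=os.environ):
    """Return the value of an environment variable or raise a clear error."""
    value = environ.get(name, "").strip()
    if not value or value == PLACEHOLDER:
        raise RuntimeError(f"Environment variable {name} is not set")
    return value
```

A worker could call require_env("GROQ_API_KEY") once at import time so that a misconfigured container exits immediately with a readable message.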

Designing the Schema

We need to track the job lifecycle and store analysis results; caching is handled by Redis rather than by the schema. Open database/init.sql:

-- Jobs Table: Track processing lifecycle
CREATE TABLE IF NOT EXISTS jobs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    filename TEXT NOT NULL,
    status TEXT DEFAULT 'pending',  -- pending, extracting, analyzing, completed, failed
    analysis_type TEXT,              -- summary, classification, qa_generation, full
    model TEXT,                      -- llama-3.1-70b, mixtral-8x7b
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Results Table: Store analysis output
CREATE TABLE IF NOT EXISTS results (
    id SERIAL PRIMARY KEY,
    job_id UUID REFERENCES jobs(id) ON DELETE CASCADE,
    result_data JSONB NOT NULL,      -- Flexible storage for any AI output
    confidence_score FLOAT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create indexes for fast queries
CREATE INDEX idx_jobs_status ON jobs(status);
CREATE INDEX idx_jobs_created_at ON jobs(created_at DESC);
CREATE INDEX idx_results_job_id ON results(job_id);

-- GIN index to speed up JSONB containment queries (@>) on results
CREATE INDEX idx_results_data_gin ON results USING gin(result_data jsonb_path_ops);
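
The status column is free text, so nothing in the schema stops a job from jumping from completed back to pending. One way to guard against that in application code is a small transition table. The sketch below assumes the statuses progress in the order listed in the jobs table comment; the TRANSITIONS table, can_transition, and the UPDATE statement are illustrative and not taken from the services.

```python
# Allowed status transitions, assuming the order given in the jobs table comment
TRANSITIONS = {
    "pending": {"extracting", "failed"},
    "extracting": {"analyzing", "failed"},
    "analyzing": {"completed", "failed"},
    "completed": set(),  # terminal
    "failed": set(),     # terminal
}

# Conditional update: only succeeds if the row is still in the expected status.
# It also refreshes updated_at, which the schema does not do automatically.
UPDATE_STATUS_SQL = (
    "UPDATE jobs SET status = %s, updated_at = CURRENT_TIMESTAMP "
    "WHERE id = %s AND status = %s"
)


def can_transition(current, new):
    """Return True if a job may move from status current to status new."""
    return new in TRANSITIONS.get(current, set())
```

A worker would check can_transition first and then run UPDATE_STATUS_SQL with the parameters (new_status, job_id, current_status).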

Booting Up

Launch the infrastructure:

# Start the infrastructure services (the application images are built in Part 2)
docker-compose up -d rabbitmq postgres redis minio

# Verify services are healthy
docker-compose ps

# View RabbitMQ Management UI at http://localhost:15672 (guest/guest)
# View MinIO Console at http://localhost:9001 (minioadmin/minioadmin)

Part 2: The Backend Services

With infrastructure running, we will now build the four processing microservices that power the pipeline.

Service 1: API Gateway (FastAPI)

This service accepts HTTP uploads and immediately returns a job ID, enabling asynchronous processing.

Create services/api-gateway/requirements.txt:

fastapi
uvicorn
pika
psycopg2-binary
redis
python-multipart
python-dotenv

The code (services/api-gateway/main.py):

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import StreamingResponse
import pika
import psycopg2
import redis
import json
import os
from uuid import uuid4

app = FastAPI(title="PDF Analysis API")

# Connect to Infrastructure
RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
POSTGRES_URL = os.environ.get("POSTGRES_URL")
REDIS_URL = os.environ.get("REDIS_URL")

redis_client = redis.from_url(REDIS_URL)

def get_db():
    return psycopg2.connect(POSTGRES_URL)

def publish_event(queue_name, message):
    # Open a short-lived connection per publish and always close it, even on error
    connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name, durable=True)
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # persistent message
        )
    finally:
        connection.close()

@app.post("/analyze")
async def analyze_pdf(
    file: UploadFile = File(...),
    analysis_type: str = "summary",
    model: str = "llama-3.1-70b"
):
    job_id = str(uuid4())

    # Store job in database
    conn = get_db()
    cur = conn.cursor()
    cur.execute(
        "INSERT INTO jobs (id, filename, status, analysis_type, model) VALUES (%s, %s, 'pending', %s, %s)",
        (job_id, file.filename, analysis_type, model)
    )
    conn.commit()
    conn.close()

    # Save the file and publish to the queue.
    # Assumes /tmp is a volume shared with the worker containers, which read this path.
    file_path = f"/tmp/{job_id}.pdf"
    with open(file_path, "wb") as f:
        f.write(await file.read())

    publish_event("pdf.uploaded", {
        "job_id": job_id,
        "file_path": file_path,
        "filename": file.filename,
        "analysis_type": analysis_type,
        "model": model
    })

    return {"job_id": job_id, "status": "processing"}

@app.get("/status/{job_id}")
def get_status(job_id: str):
    # Check cache first
    cached = redis_client.get(f"job:{job_id}")
    if cached:
        return json.loads(cached)

    # Query database
    conn = get_db()
    cur = conn.cursor()
    cur.execute("SELECT status, filename FROM jobs WHERE id = %s", (job_id,))
    result = cur.fetchone()
    conn.close()

    if not result:
        raise HTTPException(status_code=404, detail="Job not found")

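    status_data = {"job_id": job_id, "status": result[0], "filename": result[1]}
    # Cache only terminal states so polling clients never see a stale in-progress status
    if result[0] in ("completed", "failed"):
        redis_client.setex(f"job:{job_id}", 60, json.dumps(status_data))
    return status_data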

@app.get("/results/{job_id}")
def get_results(job_id: str):
    conn = get_db()
    cur = conn.cursor()
    cur.execute(
        "SELECT result_data FROM results WHERE job_id = %s",
        (job_id,)
    )
    result = cur.fetchone()
    conn.close()

    if not result:
        raise HTTPException(status_code=404, detail="Results not found")

    return result[0]

@app.get("/metrics")
def get_metrics():
    conn = get_db()
    cur = conn.cursor()
    cur.execute("SELECT status, COUNT(*) FROM jobs GROUP BY status")
    metrics = dict(cur.fetchall())
    conn.close()
    return metrics

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
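
One edge case in the endpoints above: jobs.id is a UUID column, so a request such as /status/not-a-uuid makes PostgreSQL reject the value and the endpoint responds with a 500 instead of a 404. A minimal guard could look like the following sketch; parse_job_id is a hypothetical helper name.

```python
from uuid import UUID


def parse_job_id(raw):
    """Return the canonical string form of a UUID, or None if raw is not a valid UUID."""
    try:
        return str(UUID(raw))
    except (ValueError, AttributeError, TypeError):
        return None
```

The handlers could call parse_job_id first and raise HTTPException(status_code=404) when it returns None. Alternatively, annotating the path parameter as uuid.UUID lets FastAPI reject malformed IDs with a 422 before the handler runs.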

The Dockerfile (services/api-gateway/Dockerfile):

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]

Service 2: PDF Ingestion

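This service consumes pdf.uploaded events published by the API Gateway, validates each PDF, reads its page count, stores the file in MinIO, and publishes to a text.extraction queue for the Text Extractor. Note that this differs from the architecture diagram, where the Ingestion service publishes pdf.uploaded and no text.extraction queue appears.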

Create services/pdf-ingestion/requirements.txt:

pika
PyPDF2
minio
python-dotenv

The code (services/pdf-ingestion/worker.py):

import pika
import json
import os
from minio import Minio
from PyPDF2 import PdfReader

RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
MINIO_URL = os.environ.get("MINIO_URL")
MINIO_ACCESS = os.environ.get("MINIO_ACCESS_KEY")
MINIO_SECRET = os.environ.get("MINIO_SECRET_KEY")

minio_client = Minio(MINIO_URL, access_key=MINIO_ACCESS, secret_key=MINIO_SECRET, secure=False)

# Ensure bucket exists
if not minio_client.bucket_exists("pdfs"):
    minio_client.make_bucket("pdfs")

def process_upload(ch, method, properties, body):
    data = json.loads(body)
    job_id = data['job_id']
    file_path = data['file_path']

    try:
        # Validate PDF
        reader = PdfReader(file_path)
        page_count = len(reader.pages)

        # Store in MinIO
        minio_client.fput_object("pdfs", f"{job_id}.pdf", file_path)

        # Publish to the next queue on the channel that delivered this message
        ch.queue_declare(queue="text.extraction", durable=True)
        ch.basic_publish(
            exchange='',
            routing_key="text.extraction",
            body=json.dumps({
                **data,
                "page_count": page_count,
                "minio_path": f"pdfs/{job_id}.pdf"
            }),
            properties=pika.BasicProperties(delivery_mode=2)
        )

        # Keep the local copy: the text extractor reads the same /tmp path
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(f"Processed: {data['filename']}")

    except Exception as e:
        print(f"Error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# Start Consumer
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue="pdf.uploaded", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="pdf.uploaded", on_message_callback=process_upload)

print("PDF Ingestion Service Started...")
channel.start_consuming()
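
Parsing with PdfReader is a thorough validation, but it does real work even for files that are obviously not PDFs. A cheap pre-check on the file header can reject those early. This is a sketch; looks_like_pdf and is_pdf_file are hypothetical helpers, and the check is stricter than some readers, which tolerate a few bytes before the marker.

```python
PDF_MAGIC = b"%PDF-"  # marker expected at the start of a PDF file


def looks_like_pdf(header: bytes) -> bool:
    """Return True if the given leading bytes start with the PDF marker."""
    return header.startswith(PDF_MAGIC)


def is_pdf_file(path) -> bool:
    """Read only the first few bytes of a file and check them for the PDF marker."""
    with open(path, "rb") as f:
        return looks_like_pdf(f.read(len(PDF_MAGIC)))
```

In process_upload, calling is_pdf_file(file_path) before PdfReader(file_path) would let the worker reject a renamed image or text file without parsing it.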

The Dockerfile (services/pdf-ingestion/Dockerfile):

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker.py"]

Service 3: Text Extractor

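This service extracts text from PDFs with PyPDF2 and falls back to pdfplumber when PyPDF2 returns little or no text. The requirements also list pytesseract and pdf2image for an OCR fallback, but the worker shown here does not call them.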

Create services/text-extractor/requirements.txt:

pika
PyPDF2
pdfplumber
pytesseract
pdf2image
python-dotenv

The code (services/text-extractor/worker.py):

import pika
import json
import os
from PyPDF2 import PdfReader
import pdfplumber

RABBITMQ_URL = os.environ.get("RABBITMQ_URL")

def extract_text(file_path):
    text = ""

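    # Try PyPDF2 first
    try:
        reader = PdfReader(file_path)
        for page in reader.pages:
            text += page.extract_text() or ""
    except Exception as e:
        print(f"PyPDF2 extraction failed: {e}")

    # Fall back to pdfplumber if PyPDF2 returned little or no text
    if len(text.strip()) < 100:
        text = ""  # discard the partial result so pages are not duplicated
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text += page.extract_text() or ""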

    return text.strip()

def process_extraction(ch, method, properties, body):
    data = json.loads(body)
    job_id = data['job_id']

    try:
        # Download from MinIO (simplified - assume local for demo)
        file_path = f"/tmp/{job_id}.pdf"

        text = extract_text(file_path)

        # Publish to analysis queue
        connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
        channel = connection.channel()
        channel.queue_declare(queue="text.ready", durable=True)
        channel.basic_publish(
            exchange='',
            routing_key="text.ready",
            body=json.dumps({
                **data,
                "extracted_text": text
            }),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        connection.close()

        ch.basic_ack(delivery_tag=method.delivery_tag)
        print(f"Extracted text from: {data['filename']}")

    except Exception as e:
        print(f"Error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# Start Consumer
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue="text.extraction", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="text.extraction", on_message_callback=process_extraction)

print("Text Extractor Service Started...")
channel.start_consuming()
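
For scanned PDFs, neither PyPDF2 nor pdfplumber will find a text layer. An OCR fallback with the pdf2image and pytesseract packages from requirements.txt could be sketched as follows. The ocr_pdf function, its injectable render and recognize parameters, and the dpi default are assumptions made for illustration and testing; pdf2image also needs the poppler-utils system package in the image.

```python
def ocr_pdf(file_path, render=None, recognize=None, dpi=200):
    """Rasterize each page and run OCR on it, returning the combined text.

    render and recognize can be injected for testing. By default they are
    pdf2image.convert_from_path and pytesseract.image_to_string.
    """
    if render is None:
        from pdf2image import convert_from_path  # requires poppler-utils

        def render(path):
            return convert_from_path(path, dpi=dpi)

    if recognize is None:
        import pytesseract  # requires the tesseract-ocr system package

        recognize = pytesseract.image_to_string

    pages = render(file_path)
    # One OCR pass per page image, joined with newlines
    return "\n".join(recognize(page).strip() for page in pages).strip()
```

extract_text could call ocr_pdf(file_path) as a last step when both extractors return fewer than 100 characters. OCR is considerably slower than text-layer extraction, so it is worth keeping it as the final fallback.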

The Dockerfile (services/text-extractor/Dockerfile):

FROM python:3.9-slim
RUN apt-get update && apt-get install -y tesseract-ocr && rm -rf /var/lib/apt/lists/*
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker.py"]
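
The worker's fallback rule (try a second extractor when the first yields fewer than 100 characters) can be pulled out into a small function that is testable without real PDFs. This is a minimal sketch, not the worker's exact behavior: `extract_with_fallback` and its parameters are hypothetical names, and unlike the worker, which appends the fallback text to whatever the first extractor produced, it returns the single best result.

```python
from typing import Callable, Iterable

MIN_CHARS = 100  # same threshold the worker uses before trying the fallback


def extract_with_fallback(
    file_path: str,
    extractors: Iterable[Callable[[str], str]],
    min_chars: int = MIN_CHARS,
) -> str:
    """Try each extractor in order and return the first result that is long enough.

    If no extractor reaches min_chars, return the longest text seen.
    """
    best = ""
    for extract in extractors:
        try:
            text = (extract(file_path) or "").strip()
        except Exception as exc:
            # A PDF that breaks one library should still reach the next one
            name = getattr(extract, "__name__", "extractor")
            print(f"{name} failed: {exc}")
            continue
        if len(text) >= min_chars:
            return text
        if len(text) > len(best):
            best = text
    return best
```

In the worker, the two extractors would be thin wrappers around the PyPDF2 and pdfplumber loops shown above, passed in that order.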

Service 4: AI Analyzer (Groq)

This is where Groq delivers high-speed inference for document analysis.

Create services/ai-analyzer/requirements.txt:

pika
groq
python-dotenv

The code (services/ai-analyzer/worker.py):

import pika
import json
import os
from groq import Groq

RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
GROQ_API_KEY = os.environ.get("GROQ_API_KEY")

client = Groq(api_key=GROQ_API_KEY)

def analyze_text(text, analysis_type, model):
    prompts = {
        "summary": "Provide a concise summary of this document in 3-5 bullet points.",
        "classification": "Classify this document by type and main topics.",
        "qa_generation": "Generate 5 question-answer pairs from this document.",
        "full": "Provide a comprehensive analysis including summary, key entities, and main themes."
    }

    prompt = prompts.get(analysis_type, prompts["summary"])

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": "You are a helpful document analysis assistant."},
            {"role": "user", "content": f"{prompt}\n\nDocument:\n{text[:8000]}"}
        ],
        temperature=0.3
    )

    return response.choices[0].message.content

def process_analysis(ch, method, properties, body):
    try:
        data = json.loads(body)
        job_id = data['job_id']

        result = analyze_text(
            data['extracted_text'],
            data['analysis_type'],
            data['model']
        )

        # Publish results on the consumer's own channel rather than
        # opening a new connection for every message
        ch.queue_declare(queue="analysis.done", durable=True)
        ch.basic_publish(
            exchange='',
            routing_key="analysis.done",
            body=json.dumps({
                "job_id": job_id,
                "result": result
            }),
            properties=pika.BasicProperties(delivery_mode=2)  # persistent message
        )

    except Exception as e:
        # Malformed messages and Groq API errors both end up here
        print(f"Error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    # Acknowledge only after the result has been published
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"Analyzed: {data.get('filename', job_id)}")

# Start Consumer
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue="text.ready", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="text.ready", on_message_callback=process_analysis)

print("AI Analyzer Service Started...")
channel.start_consuming()

The Dockerfile (services/ai-analyzer/Dockerfile):

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker.py"]
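
The analyzer above sends only the first 8,000 characters of each document (`text[:8000]`), so anything beyond that point never reaches the model. One possible way around this, sketched here under assumptions, is to split the text into overlapping chunks and analyze each one. `chunk_text` and the 200-character overlap are illustrative choices, not part of the original service.

```python
def chunk_text(text: str, max_chars: int = 8000, overlap: int = 200) -> list[str]:
    """Split text into chunks of at most max_chars, each sharing `overlap`
    characters with the previous chunk so sentences cut at a boundary
    still appear whole in one of the two chunks."""
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if not 0 <= overlap < max_chars:
        raise ValueError("overlap must be in [0, max_chars)")

    chunks = []
    start = 0
    step = max_chars - overlap
    while start < len(text):
        chunks.append(text[start:start + max_chars])
        if start + max_chars >= len(text):
            break  # this chunk already reached the end of the text
        start += step
    return chunks
```

Each chunk could then be passed to `analyze_text`, with the partial results combined in a final call. That costs one API request per chunk, so it is worth measuring against your rate limits before enabling it for large documents.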

Service 5: Results Handler

The final service persists results to PostgreSQL and invalidates the job's cached entry in Redis.

Create services/results-handler/requirements.txt:

pika
psycopg2-binary
redis
python-dotenv

The code (services/results-handler/worker.py):

import pika
import json
import os
import psycopg2
import redis

RABBITMQ_URL = os.environ.get("RABBITMQ_URL")
POSTGRES_URL = os.environ.get("POSTGRES_URL")
REDIS_URL = os.environ.get("REDIS_URL")

redis_client = redis.from_url(REDIS_URL)

def get_db():
    return psycopg2.connect(POSTGRES_URL)

def store_results(ch, method, properties, body):
    conn = None
    try:
        data = json.loads(body)
        job_id = data['job_id']

        conn = get_db()
        cur = conn.cursor()

        # Store result
        cur.execute(
            "INSERT INTO results (job_id, result_data) VALUES (%s, %s)",
            (job_id, json.dumps({"analysis": data['result']}))
        )

        # Update job status
        cur.execute(
            "UPDATE jobs SET status = 'completed', updated_at = CURRENT_TIMESTAMP WHERE id = %s",
            (job_id,)
        )

        # Both statements are committed together, or not at all
        conn.commit()

        # Invalidate cache
        redis_client.delete(f"job:{job_id}")

    except Exception as e:
        print(f"Error: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    finally:
        # Close the connection on both the success and the error path
        if conn is not None:
            conn.close()

    ch.basic_ack(delivery_tag=method.delivery_tag)
    print(f"Stored results for: {job_id}")

# Start Consumer
connection = pika.BlockingConnection(pika.URLParameters(RABBITMQ_URL))
channel = connection.channel()
channel.queue_declare(queue="analysis.done", durable=True)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue="analysis.done", on_message_callback=store_results)

print("Results Handler Service Started...")
channel.start_consuming()

The Dockerfile (services/results-handler/Dockerfile):

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "worker.py"]
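
RabbitMQ can deliver the same message more than once, for example when a worker dies after committing but before acknowledging. With a plain `INSERT`, a redelivery would either add a second result row or fail, depending on the table's constraints. The sketch below shows an idempotent write inside a single transaction. It uses the standard library's `sqlite3` as a stand-in for PostgreSQL so that it runs anywhere; the table layout is inferred from the SQL in the worker and is an assumption, as are the function names.

```python
import json
import sqlite3


def init_schema(conn: sqlite3.Connection) -> None:
    # Assumed layout: only the columns the worker's SQL refers to
    conn.executescript("""
        CREATE TABLE IF NOT EXISTS jobs (
            id TEXT PRIMARY KEY,
            status TEXT NOT NULL,
            updated_at TEXT
        );
        CREATE TABLE IF NOT EXISTS results (
            job_id TEXT PRIMARY KEY,
            result_data TEXT NOT NULL
        );
    """)


def store_result(conn: sqlite3.Connection, job_id: str, result: str) -> None:
    payload = json.dumps({"analysis": result})
    with conn:  # commits on success, rolls back if an exception escapes
        # Replacing on conflict makes a redelivered message harmless
        conn.execute(
            "INSERT OR REPLACE INTO results (job_id, result_data) VALUES (?, ?)",
            (job_id, payload),
        )
        cur = conn.execute(
            "UPDATE jobs SET status = 'completed', updated_at = CURRENT_TIMESTAMP "
            "WHERE id = ?",
            (job_id,),
        )
        if cur.rowcount == 0:
            # No such job: undo the insert instead of leaving an orphaned result
            raise LookupError(f"unknown job: {job_id}")
```

In PostgreSQL the equivalent statement is `INSERT ... ON CONFLICT (job_id) DO UPDATE SET result_data = EXCLUDED.result_data`, which requires a unique constraint on `results.job_id`.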

Part 3: The Streamlit Frontend

The UI provides an intuitive interface for uploads, real-time monitoring, and result visualization.

Create services/streamlit-ui/requirements.txt:

streamlit
requests
python-dotenv

The code (services/streamlit-ui/app.py):

import streamlit as st
import requests
import time
import json

API_URL = "http://api-gateway:8000"

st.set_page_config(page_title="PDF AI Analysis", layout="wide")
st.title("PDF AI Analysis Pipeline")

tab1, tab2, tab3 = st.tabs(["Upload & Analyze", "Dashboard", "History"])

# TAB 1: Upload
with tab1:
    uploaded_file = st.file_uploader("Upload PDF", type=["pdf"])

    col1, col2 = st.columns(2)
    with col1:
        analysis_type = st.selectbox(
            "Analysis Type",
            ["summary", "classification", "qa_generation", "full"]
        )
    with col2:
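        # The analyzer passes this value straight to Groq, so unless the gateway
        # maps it, each entry must be a model ID that the Groq API accepts
        model = st.selectbox("Model", ["llama-3.1-70b", "mixtral-8x7b"])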

    if st.button("Start Analysis") and uploaded_file:
        with st.spinner("Uploading..."):
            # Send the filename and content type explicitly with the file bytes
            files = {"file": (uploaded_file.name, uploaded_file.getvalue(), "application/pdf")}
            data = {"analysis_type": analysis_type, "model": model}
            response = requests.post(f"{API_URL}/analyze", files=files, data=data, timeout=30)
            response.raise_for_status()
            job_id = response.json()["job_id"]

        st.success(f"Job started: {job_id}")

        # Poll for status, giving up after about five minutes (150 x 2 s)
        progress_bar = st.progress(0)
        status_text = st.empty()

        for _ in range(150):
            status_response = requests.get(f"{API_URL}/status/{job_id}", timeout=10)
            status = status_response.json()["status"]
            status_text.text(f"Status: {status}")

            if status == "completed":
                progress_bar.progress(100)
                results = requests.get(f"{API_URL}/results/{job_id}", timeout=10).json()
                st.json(results)
                break
            elif status == "failed":
                st.error("Analysis failed")
                break

            progress_bar.progress(50)
            time.sleep(2)
        else:
            # The loop ended without reaching a terminal status
            st.warning("The job is still processing. Check back later.")

# TAB 2: Dashboard
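with tab2:
    try:
        metrics = requests.get(f"{API_URL}/metrics", timeout=10).json()
    except (requests.RequestException, ValueError) as e:
        # Keep the rest of the UI usable when the gateway is unreachable
        st.error(f"Could not load metrics: {e}")
        metrics = {}

    col1, col2, col3 = st.columns(3)
    col1.metric("Total Jobs", sum(metrics.values()))
    col2.metric("Completed", metrics.get("completed", 0))
    col3.metric("Failed", metrics.get("failed", 0))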

# TAB 3: History
with tab3:
    st.write("Coming soon: Job history and search")

The Dockerfile (services/streamlit-ui/Dockerfile):

FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8501
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]
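
The UI's polling logic is easier to test when it is separated from Streamlit and from the HTTP call. The following is a minimal sketch of a bounded polling helper. `wait_for_job` is a hypothetical name, and the terminal states are taken from the two statuses the UI checks for (`completed` and `failed`).

```python
import time
from typing import Callable

TERMINAL_STATES = {"completed", "failed"}


def wait_for_job(
    fetch_status: Callable[[], str],
    timeout_s: float = 300.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Call fetch_status until it returns a terminal state or the timeout passes.

    Returns the terminal status, or "timeout" if the deadline is reached first.
    """
    deadline = clock() + timeout_s
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        if clock() >= deadline:
            return "timeout"
        sleep(interval_s)
```

In the app, `fetch_status` would be a small function that requests `/status/{job_id}` from the gateway and returns the `status` field. The `sleep` and `clock` parameters exist only so tests can run without waiting.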

Final Integration: Launch Day

Running the Stack

Build and run:

docker-compose up -d --build
  • Access the UI: open http://localhost:8501
  • Upload a PDF: select analysis type and model, then watch real-time processing
  • Monitor queues: visit RabbitMQ Management at http://localhost:15672
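
Queue depth can also be read programmatically from the RabbitMQ management plugin's HTTP API, which is useful for alerts or autoscaling decisions. The sketch below uses only the standard library. The `guest`/`guest` credentials and the default `/` virtual host are assumptions; use whatever your compose file configures.

```python
import base64
import json
import urllib.request
from urllib.parse import quote


def queue_api_url(base_url: str, queue: str, vhost: str = "/") -> str:
    # The vhost is a path segment, so "/" must be percent-encoded as %2F
    return (
        f"{base_url.rstrip('/')}/api/queues/"
        f"{quote(vhost, safe='')}/{quote(queue, safe='')}"
    )


def queue_depth(
    base_url: str,
    queue: str,
    user: str = "guest",      # assumed credentials
    password: str = "guest",  # assumed credentials
    vhost: str = "/",
) -> dict:
    """Return the ready and unacknowledged message counts for one queue."""
    request = urllib.request.Request(queue_api_url(base_url, queue, vhost))
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(request, timeout=5) as response:
        info = json.load(response)
    return {
        "ready": info.get("messages_ready", 0),
        "unacked": info.get("messages_unacknowledged", 0),
    }
```

With the stack running, `queue_depth("http://localhost:15672", "text.extraction")` reports how many PDFs are waiting for the extractor. A steadily growing `ready` count on one queue suggests that the consuming service needs more replicas.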

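You have just built a PDF analysis pipeline with the core building blocks of a production system. The services scale horizontally, RabbitMQ acknowledgments ensure a message leaves its queue only after a worker has finished with it, and Groq's inference speed keeps per-document analysis fast. Keep in mind that the workers reject failed messages with requeue=False, so those messages are dropped unless a dead-letter exchange is configured.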
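
A dead-letter queue can be attached when a work queue is declared, so that messages a worker rejects are kept for inspection instead of being discarded. The sketch below assumes a pika channel (or any object with a compatible `queue_declare` method). The helper name and the `.dead` suffix are hypothetical.

```python
def declare_with_dead_letter(channel, queue: str, dead_suffix: str = ".dead") -> str:
    """Declare `queue` so that rejected messages are routed to `queue + dead_suffix`.

    Returns the name of the dead-letter queue.
    """
    dead_queue = f"{queue}{dead_suffix}"

    # The dead-letter queue is an ordinary durable queue
    channel.queue_declare(queue=dead_queue, durable=True)

    # An empty exchange name selects the default exchange, which routes by
    # queue name, so the routing key is simply the dead-letter queue's name
    channel.queue_declare(
        queue=queue,
        durable=True,
        arguments={
            "x-dead-letter-exchange": "",
            "x-dead-letter-routing-key": dead_queue,
        },
    )
    return dead_queue
```

Two caveats apply. RabbitMQ refuses to redeclare an existing queue with different arguments, so queues created by the earlier code must be deleted first. Every service that declares the same queue, publisher and consumer alike, must also pass identical arguments.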

  • Scalability: add replicas to any service independently, based on where the bottleneck is.
  • Cost efficiency: Groq's low-latency inference shortens the time each document spends in the analysis stage. Measure throughput and cost against your own workload rather than relying on a headline speedup.
  • User experience: Streamlit provides immediate feedback while processing happens asynchronously in the background.

This architecture demonstrates the power of event-driven microservices combined with a hosted inference API. Each service owns a single responsibility, communicates through well-defined message contracts, and can be developed and deployed independently by different teams.
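
A message contract can be made explicit in code so that a malformed message fails at the queue boundary with a clear error. The following is a minimal sketch using a dataclass. The field names are inferred from the keys the workers read (`job_id`, `filename`, `analysis_type`, `model`); the class itself is hypothetical and not part of the services above.

```python
import json
from dataclasses import asdict, dataclass, fields

ANALYSIS_TYPES = {"summary", "classification", "qa_generation", "full"}


@dataclass(frozen=True)
class JobMessage:
    job_id: str
    filename: str
    analysis_type: str
    model: str

    @classmethod
    def from_body(cls, body: bytes) -> "JobMessage":
        """Parse and validate a queue message; raises ValueError if it is invalid."""
        data = json.loads(body)  # malformed JSON raises a ValueError subclass
        if not isinstance(data, dict):
            raise ValueError("message body must be a JSON object")
        names = [f.name for f in fields(cls)]
        missing = [name for name in names if name not in data]
        if missing:
            raise ValueError(f"missing fields: {', '.join(missing)}")
        if data["analysis_type"] not in ANALYSIS_TYPES:
            raise ValueError(f"unknown analysis_type: {data['analysis_type']}")
        return cls(**{name: str(data[name]) for name in names})

    def to_body(self) -> bytes:
        return json.dumps(asdict(self)).encode()
```

A worker would call `JobMessage.from_body(body)` at the top of its callback and reject the message on `ValueError`, which keeps validation in one place instead of scattering `data[...]` lookups through the handler.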

Happy coding!