aboutme_chat/code_1.md
Sam Rolfe 628ba96998 Initial commit: Multi-service AI agent system
- Frontend: Vite + React + TypeScript chat interface
- Backend: FastAPI gateway with LangGraph routing
- Knowledge Service: ChromaDB RAG with Gitea scraper
- LangGraph Service: Multi-agent orchestration
- Airflow: Scheduled Gitea ingestion DAG
- Documentation: Complete plan and implementation guides

Architecture:
- Modular Docker Compose per service
- External ai-mesh network for communication
- Fast rebuilds with /app/packages pattern
- Intelligent agent routing (no hardcoded keywords)

Services:
- Frontend (5173): React chat UI
- Chat Gateway (8000): FastAPI entry point
- LangGraph (8090): Agent orchestration
- Knowledge (8080): ChromaDB RAG
- Airflow (8081): Scheduled ingestion
- PostgreSQL (5432): Chat history

Excludes: node_modules, .venv, chroma_db, logs, .env files
Includes: All source code, configs, docs, docker files
2026-02-27 19:51:06 +11:00

Modular Implementation: Gitea, Airflow, and LangGraph

Overview

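The system is split into self-contained services. Each has its own docker-compose file and joins the external ai-mesh Docker network. The LangGraph service installs its dependencies into a separate /app/packages directory and bind-mounts its source files, so code edits do not require reinstalling packages.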


Phase 1: LangGraph Supervisor Service

Directory Structure

/home/sam/development/langgraph_service/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── main.py
└── supervisor_agent.py

File: /home/sam/development/langgraph_service/requirements.txt

fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic

File: /home/sam/development/langgraph_service/Dockerfile

FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create directories
RUN mkdir -p /app/packages /app/code

WORKDIR /app

# Install packages to isolated directory
COPY requirements.txt .
RUN pip install --target=/app/packages -r requirements.txt

# Copy code
COPY . /app/code/

ENV PYTHONPATH=/app/packages
ENV PYTHONUNBUFFERED=1

WORKDIR /app/code
EXPOSE 8090

CMD ["python3", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8090"]
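
The /app/packages pattern works because Python searches every directory listed in PYTHONPATH when resolving imports, so packages installed with pip --target need no virtual environment. A minimal sketch of that mechanism, assuming a temporary directory in place of /app/packages and a made-up module name (demo_pkg) in place of the real dependencies:

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Stand-in for /app/packages; the directory and module name are hypothetical.
packages_dir = Path(tempfile.mkdtemp())
(packages_dir / "demo_pkg.py").write_text("VERSION = '1.0'\n")

# PYTHONPATH=/app/packages has the same effect as this insert:
# the directory is searched when an import is resolved.
sys.path.insert(0, str(packages_dir))
importlib.invalidate_caches()

demo_pkg = importlib.import_module("demo_pkg")
print(demo_pkg.VERSION)
```

Because the application code lives in /app/code and the dependencies in /app/packages, the two can change independently.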

File: /home/sam/development/langgraph_service/supervisor_agent.py

"""
LangGraph Supervisor Agent - Routes queries to specialist agents
"""
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator
import httpx
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# State definition
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next_agent: str
    context: dict

# Agent routing logic
def supervisor_node(state: AgentState):
    """Supervisor decides which specialist agent to call."""
    last_message = state["messages"][-1].content.lower()
    
    # Simple routing logic based on keywords
    if any(kw in last_message for kw in ["repo", "code", "git", "github", "gitea", "project", "development"]):
        return {"next_agent": "librarian"}
    elif any(kw in last_message for kw in ["write", "edit", "create", "fix", "bug", "implement", "code change"]):
        return {"next_agent": "opencode"}
    elif any(kw in last_message for kw in ["sam", "hobby", "music", "experience", "skill", "about"]):
        return {"next_agent": "librarian"}
    else:
        return {"next_agent": "brain"}  # Default to general LLM

def librarian_agent(state: AgentState):
    """Librarian agent - queries knowledge base (ChromaDB)."""
    last_message = state["messages"][-1].content
    # Use the URL set in docker-compose.yml, falling back to the service default
    knowledge_url = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")
    
    try:
        # Call knowledge service
        response = httpx.post(
            f"{knowledge_url}/query",
            json={"question": last_message},
            timeout=10.0
        )
        
        if response.status_code == 200:
            context = response.json().get("context", "")
            return {
                "messages": [AIMessage(content=f"Based on my knowledge base:\n\n{context}")],
                "context": {"source": "librarian", "context": context}
            }
        error = f"knowledge service returned HTTP {response.status_code}"
    except Exception as e:
        logger.error(f"Librarian error: {e}")
        # `e` is unbound once the except block exits, so keep its text here
        error = str(e)
    
    return {
        "messages": [AIMessage(content="I couldn't find relevant information in the knowledge base.")],
        "context": {"source": "librarian", "error": error}
    }

def opencode_agent(state: AgentState):
    """Opencode agent - handles coding tasks via MCP."""
    last_message = state["messages"][-1].content
    
    # Placeholder - would integrate with opencode-brain
    return {
        "messages": [AIMessage(content=f"I'm the coding agent. I would help you with: {last_message}")],
        "context": {"source": "opencode", "action": "coding_task"}
    }

def brain_agent(state: AgentState):
    """Brain agent - general LLM fallback."""
    last_message = state["messages"][-1].content
    
    try:
        # Call opencode-brain service
        auth = httpx.BasicAuth("opencode", os.getenv("OPENCODE_PASSWORD", "sam4jo"))
        timeout_long = httpx.Timeout(180.0, connect=10.0)
        
        # brain_agent is a sync function, so use the sync client (AsyncClient needs async with/await)
        with httpx.Client(auth=auth, timeout=timeout_long) as client:
            # Create session
            session_res = client.post("http://opencode-brain:5000/session", json={"title": "Supervisor Query"})
            session_id = session_res.json()["id"]
            
            # Send message
            response = client.post(
                f"http://opencode-brain:5000/session/{session_id}/message",
                json={"parts": [{"type": "text", "text": last_message}]}
            )
            
            data = response.json()
            if "parts" in data:
                for part in data["parts"]:
                    if part.get("type") == "text":
                        return {
                            "messages": [AIMessage(content=part["text"])],
                            "context": {"source": "brain"}
                        }
    except Exception as e:
        logger.error(f"Brain error: {e}")
    
    return {
        "messages": [AIMessage(content="I'm thinking about this...")],
        "context": {"source": "brain"}
    }

def route_decision(state: AgentState):
    """Routing function based on supervisor decision."""
    return state["next_agent"]

# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("librarian", librarian_agent)
workflow.add_node("opencode", opencode_agent)
workflow.add_node("brain", brain_agent)

# Add edges
workflow.set_entry_point("supervisor")

# Conditional routing from supervisor
workflow.add_conditional_edges(
    "supervisor",
    route_decision,
    {
        "librarian": "librarian",
        "opencode": "opencode",
        "brain": "brain"
    }
)

# All specialist agents end
workflow.add_edge("librarian", END)
workflow.add_edge("opencode", END)
workflow.add_edge("brain", END)

# Compile the graph
supervisor_graph = workflow.compile()

# Main entry point for queries
async def process_query(query: str) -> dict:
    """Process a query through the supervisor graph."""
    result = await supervisor_graph.ainvoke({
        "messages": [HumanMessage(content=query)],
        "next_agent": "",
        "context": {}
    })
    
    return {
        "response": result["messages"][-1].content,
        "context": result.get("context", {})
    }
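
The keyword rules in supervisor_node are checked in order and use substring matching, which has a few consequences worth seeing directly. The sketch below copies the keyword lists into a standalone function (route and ROUTES are illustrative names, not part of the service) so the behaviour can be tried without LangGraph installed:

```python
from typing import List, Tuple

# Keyword lists copied from supervisor_node, in the order they are checked.
ROUTES: List[Tuple[str, List[str]]] = [
    ("librarian", ["repo", "code", "git", "github", "gitea", "project", "development"]),
    ("opencode", ["write", "edit", "create", "fix", "bug", "implement", "code change"]),
    ("librarian", ["sam", "hobby", "music", "experience", "skill", "about"]),
]

def route(query: str) -> str:
    """Return the agent name the supervisor's keyword rules would pick."""
    text = query.lower()
    for agent, keywords in ROUTES:
        if any(kw in text for kw in keywords):
            return agent
    return "brain"  # default: general LLM

for q in [
    "Write a Python function to calculate fibonacci",
    "Fix the bug in my code",       # "code" matches the first rule
    "Is this the same thing?",      # "sam" is a substring of "same"
    "Hello there",
]:
    print(f"{q!r} -> {route(q)}")
```

The second and third queries show the limits of this approach: the first rule shadows the coding rule whenever "code" appears, and short keywords match inside unrelated words.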

File: /home/sam/development/langgraph_service/main.py

"""
LangGraph Supervisor Service - FastAPI wrapper for agent orchestration
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from supervisor_agent import process_query
import logging
import sys

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

app = FastAPI(title="LangGraph Supervisor Service")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class QueryRequest(BaseModel):
    query: str

class QueryResponse(BaseModel):
    response: str
    agent_used: str
    context: dict

@app.get("/health")
async def health():
    return {"status": "healthy", "service": "langgraph-supervisor"}

@app.post("/query", response_model=QueryResponse)
async def query_supervisor(request: QueryRequest):
    """Main entry point for agent orchestration."""
    logger.info(f"Received query: {request.query}")
    
    try:
        result = await process_query(request.query)
        
        return QueryResponse(
            response=result["response"],
            agent_used=result["context"].get("source", "unknown"),
            context=result["context"]
        )
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        return QueryResponse(
            response="Error processing your request",
            agent_used="error",
            context={"error": str(e)}
        )

@app.get("/agents")
async def list_agents():
    """List available specialist agents."""
    return {
        "agents": [
            {
                "name": "librarian",
                "description": "Queries the knowledge base for semantic information",
                "triggers": ["repo", "code", "git", "hobby", "about", "skill"]
            },
            {
                "name": "opencode",
                "description": "Handles coding tasks and file modifications",
                "triggers": ["write", "edit", "create", "fix", "implement"]
            },
            {
                "name": "brain",
                "description": "General LLM for reasoning and generation",
                "triggers": ["default", "general questions"]
            }
        ]
    }

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8090)

File: /home/sam/development/langgraph_service/docker-compose.yml

version: '3.8'

services:
  langgraph-service:
    build: .
    image: langgraph-service:latest
    container_name: langgraph-service
    ports:
      - "8090:8090"
    volumes:
      # Only mount code files, not packages
      - ./main.py:/app/code/main.py:ro
      - ./supervisor_agent.py:/app/code/supervisor_agent.py:ro
    environment:
      - PYTHONUNBUFFERED=1
      - PYTHONPATH=/app/packages
      - OPENCODE_PASSWORD=${OPENCODE_PASSWORD:-sam4jo}
      - KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
    networks:
      - ai-mesh
    restart: unless-stopped

networks:
  ai-mesh:
    external: true

Phase 2: Updated Chat Gateway (Replaces Hardcoded Logic)

File: /home/sam/development/aboutme_chat_demo/backend/main.py

"""
Chat Gateway - Routes all queries through LangGraph Supervisor
Holds no keyword logic itself; agent selection is delegated to the supervisor
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx
import logging
import sys
import traceback
import os

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])

class MessageRequest(BaseModel):
    message: str

LANGGRAPH_URL = os.getenv("LANGGRAPH_URL", "http://langgraph-service:8090")

@app.post("/chat")
async def chat(request: MessageRequest):
    """
    Routes all queries through LangGraph Supervisor.
    No hardcoded keywords here - the LangGraph supervisor picks one of:
    - Librarian: For knowledge base queries (RAG)
    - Opencode: For coding tasks
    - Brain: For general LLM queries
    """
    logger.info(f"Gateway: Routing query to LangGraph: {request.message}")
    
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
            response = await client.post(
                f"{LANGGRAPH_URL}/query",
                json={"query": request.message}
            )
            
            if response.status_code == 200:
                result = response.json()
                agent_used = result.get("agent_used", "unknown")
                logger.info(f"Gateway: Response from {agent_used} agent")
                return {"response": result["response"]}
            else:
                logger.error(f"Gateway: LangGraph error {response.status_code}")
                return {"response": "Error: Orchestration service unavailable"}
                
    except Exception as e:
        logger.error(f"Gateway: Error routing through LangGraph: {traceback.format_exc()}")
        return {"response": "Error: Unable to process your request at this time."}

@app.get("/health")
async def health():
    return {"status": "healthy", "service": "chat-gateway"}

@app.get("/agents")
async def list_agents():
    """List available agents from LangGraph."""
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
            response = await client.get(f"{LANGGRAPH_URL}/agents")
            if response.status_code == 200:
                return response.json()
    except Exception as e:
        logger.error(f"Error fetching agents: {e}")
    
    return {"agents": [], "error": "Could not retrieve agent list"}
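
One detail that may be worth checking is how the gateway's 60-second timeout relates to the timeouts used inside the agents. The sketch below compares the values that appear in this guide; the function and constant names are illustrative only:

```python
from typing import Dict, List

# Timeouts in seconds, copied from the code in this guide.
GATEWAY_TIMEOUT = 60.0  # chat gateway -> LangGraph /query
AGENT_TIMEOUTS: Dict[str, float] = {
    "librarian": 10.0,  # knowledge-service /query
    "brain": 180.0,     # each opencode-brain call
}

def agents_exceeding(gateway_timeout: float, agent_timeouts: Dict[str, float]) -> List[str]:
    """Names of agents whose own timeout is longer than the gateway's."""
    return sorted(name for name, t in agent_timeouts.items() if t > gateway_timeout)

print(agents_exceeding(GATEWAY_TIMEOUT, AGENT_TIMEOUTS))
```

With these numbers the gateway can give up on a brain request that the brain agent is still waiting on. Whether to raise the gateway timeout or lower the agent timeout depends on the deployment.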

File: /home/sam/development/aboutme_chat_demo/docker-compose.yml

version: '3.8'

services:
  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: sam
      POSTGRES_PASSWORD: sam4jo
      POSTGRES_DB: chat_demo
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - ai-mesh
    restart: unless-stopped

  backend:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://sam:sam4jo@db:5432/chat_demo
      LANGGRAPH_URL: http://langgraph-service:8090
    volumes:
      - ./backend:/app
    depends_on:
      - db
      # langgraph-service runs from its own compose file; Compose rejects depends_on entries for services not defined here
    networks:
      - ai-mesh
    restart: unless-stopped

  frontend:
    build: ./frontend
    ports:
      - "5173:5173"
    volumes:
      - ./frontend:/app
      - /app/node_modules
    environment:
      - CHOKIDAR_USEPOLLING=true
    depends_on:
      - backend
    networks:
      - ai-mesh

volumes:
  postgres_data:

networks:
  ai-mesh:
    external: true

Phase 3: Gitea Scraper Module

File: /home/sam/development/knowledge_service/gitea_scraper.py

"""
Gitea API Scraper - Fetches repos, READMEs, and source code
for ingestion into the knowledge base.
"""
import os
import httpx
import logging
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@dataclass
class RepoMetadata:
    name: str
    description: str
    url: str
    default_branch: str
    updated_at: str
    language: Optional[str]

class GiteaScraper:
    def __init__(self, base_url: str, token: str, username: str = "sam"):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.username = username
        self.headers = {"Authorization": f"token {token}"}
        
    def get_user_repos(self) -> List[RepoMetadata]:
        """Fetch all repositories for the user."""
        repos = []
        page = 1
        
        while True:
            url = f"{self.base_url}/api/v1/users/{self.username}/repos?page={page}&limit=50"
            
            try:
                response = httpx.get(url, headers=self.headers, timeout=30.0)
                response.raise_for_status()
                
                data = response.json()
                if not data:
                    break
                    
                for repo in data:
                    repos.append(RepoMetadata(
                        name=repo["name"],
                        description=repo.get("description", ""),
                        url=repo["html_url"],
                        default_branch=repo["default_branch"],
                        updated_at=repo["updated_at"],
                        language=repo.get("language")
                    ))
                
                logger.info(f"Fetched page {page}, got {len(data)} repos")
                page += 1
                
            except Exception as e:
                logger.error(f"Error fetching repos: {e}")
                break
                
        return repos
    
    def get_readme(self, repo_name: str) -> str:
        """Fetch README content for a repository."""
        readme_names = ["README.md", "readme.md", "Readme.md", "README.rst"]
        
        for readme_name in readme_names:
            url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/raw/{readme_name}"
            
            try:
                response = httpx.get(url, headers=self.headers, timeout=10.0)
                if response.status_code == 200:
                    return response.text
            except Exception as e:
                logger.warning(f"Failed to fetch {readme_name}: {e}")
                continue
                
        return ""
    
    def get_repo_files(self, repo_name: str, path: str = "") -> List[Dict]:
        """List files in a repository directory."""
        url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/contents/{path}"
        
        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error listing files in {repo_name}/{path}: {e}")
            return []
    
    def get_file_content(self, repo_name: str, filepath: str) -> str:
        """Fetch content of a specific file."""
        url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/raw/{filepath}"
        
        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            logger.error(f"Error fetching file {filepath}: {e}")
            
        return ""

# Test function
if __name__ == "__main__":
    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam")
    )
    
    repos = scraper.get_user_repos()
    print(f"Found {len(repos)} repositories")
    
    for repo in repos[:3]:
        print(f"\nRepo: {repo.name}")
        readme = scraper.get_readme(repo.name)
        if readme:
            print(f"README preview: {readme[:200]}...")
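
The pagination loop in get_user_repos keeps requesting pages until the API returns an empty list. A minimal sketch of that pattern in isolation, using a fake page fetcher instead of the Gitea API (fetch_all_pages, fake_fetch and FAKE_REPOS are hypothetical names for illustration):

```python
from typing import Callable, Dict, List

def fetch_all_pages(fetch_page: Callable[[int], List[Dict]]) -> List[Dict]:
    """Collect items page by page until an empty page is returned."""
    items: List[Dict] = []
    page = 1
    while True:
        data = fetch_page(page)
        if not data:
            break
        items.extend(data)
        page += 1
    return items

# Stand-in for the Gitea API: 120 repos served 50 per page.
FAKE_REPOS = [{"name": f"repo-{i}"} for i in range(120)]

def fake_fetch(page: int, limit: int = 50) -> List[Dict]:
    start = (page - 1) * limit
    return FAKE_REPOS[start:start + limit]

repos = fetch_all_pages(fake_fetch)
print(len(repos))
```

Passing the fetcher in as a function keeps the loop testable without network access.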

Phase 4: Apache Airflow Setup

Directory Structure

/home/sam/development/airflow/
├── docker-compose.yml
├── .env
└── dags/
    └── gitea_ingestion_dag.py

File: /home/sam/development/airflow/.env

# Airflow Configuration
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin

# Gitea Configuration
GITEA_URL=https://gitea.lab.audasmedia.com.au
GITEA_TOKEN=your_token_here
GITEA_USERNAME=sam

# Knowledge Service
KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080

File: /home/sam/development/airflow/dags/gitea_ingestion_dag.py

"""
Airflow DAG for scheduled Gitea repository ingestion.
Runs daily to fetch new/updated repos and ingest into ChromaDB.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import sys

# Add knowledge_service to path for imports
sys.path.insert(0, '/opt/airflow/dags/repo')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def fetch_gitea_repos(**context):
    """Task: Fetch all repositories from Gitea."""
    from gitea_scraper import GiteaScraper
    
    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam")
    )
    
    repos = scraper.get_user_repos()
    
    # Push to XCom for downstream tasks
    context['ti'].xcom_push(key='repo_count', value=len(repos))
    context['ti'].xcom_push(key='repos', value=[
        {
            'name': r.name,
            'description': r.description,
            'url': r.url,
            'updated_at': r.updated_at
        }
        for r in repos
    ])
    
    return f"Fetched {len(repos)} repositories"

def fetch_readmes(**context):
    """Task: Fetch READMEs for all repositories."""
    from gitea_scraper import GiteaScraper
    
    ti = context['ti']
    repos = ti.xcom_pull(task_ids='fetch_repos', key='repos') or []  # xcom_pull returns None if nothing was pushed
    
    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam")
    )
    
    readme_data = []
    for repo in repos[:10]:  # Limit to 10 repos per run for testing
        readme = scraper.get_readme(repo['name'])
        if readme:
            readme_data.append({
                'repo': repo['name'],
                'content': readme[:5000],  # First 5000 chars
                'url': repo['url']
            })
    
    ti.xcom_push(key='readme_data', value=readme_data)
    
    return f"Fetched {len(readme_data)} READMEs"

def ingest_to_chroma(**context):
    """Task: Ingest fetched data into ChromaDB via knowledge service."""
    import httpx
    
    ti = context['ti']
    readme_data = ti.xcom_pull(task_ids='fetch_readmes', key='readme_data') or []  # xcom_pull returns None if nothing was pushed
    
    knowledge_service_url = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")
    
    documents_ingested = 0
    for item in readme_data:
        try:
            # Call knowledge service ingest endpoint
            response = httpx.post(
                f"{knowledge_service_url}/ingest",
                json={
                    'source': f"gitea:{item['repo']}",
                    'content': item['content'],
                    'metadata': {
                        'repo': item['repo'],
                        'url': item['url'],
                        'type': 'readme'
                    }
                },
                timeout=30.0
            )
            
            if response.status_code == 200:
                documents_ingested += 1
                
        except Exception as e:
            print(f"Error ingesting {item['repo']}: {e}")
    
    return f"Ingested {documents_ingested} documents into ChromaDB"

# Define the DAG
with DAG(
    'gitea_daily_ingestion',
    default_args=default_args,
    description='Daily ingestion of Gitea repositories into knowledge base',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gitea', 'ingestion', 'knowledge'],
) as dag:
    
    fetch_repos_task = PythonOperator(
        task_id='fetch_repos',
        python_callable=fetch_gitea_repos,
    )
    
    fetch_readmes_task = PythonOperator(
        task_id='fetch_readmes',
        python_callable=fetch_readmes,
    )
    
    ingest_task = PythonOperator(
        task_id='ingest_to_chroma',
        python_callable=ingest_to_chroma,
    )
    
    fetch_repos_task >> fetch_readmes_task >> ingest_task
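
The data passed between the three tasks can be followed without Airflow. The sketch below reproduces the shaping done in fetch_readmes and ingest_to_chroma (the 10-repo limit, the 5000-character truncation and the /ingest payload layout) on made-up input; build_readme_payloads and the example URLs are hypothetical:

```python
from typing import Dict, List

MAX_REPOS = 10    # mirrors repos[:10] in fetch_readmes
MAX_CHARS = 5000  # mirrors readme[:5000]

def build_readme_payloads(repos: List[Dict], readmes: Dict[str, str]) -> List[Dict]:
    """Build the JSON bodies that ingest_to_chroma would post to /ingest."""
    payloads = []
    for repo in repos[:MAX_REPOS]:
        readme = readmes.get(repo["name"], "")
        if not readme:
            continue  # repos without a README are skipped
        payloads.append({
            "source": f"gitea:{repo['name']}",
            "content": readme[:MAX_CHARS],
            "metadata": {"repo": repo["name"], "url": repo["url"], "type": "readme"},
        })
    return payloads

# Made-up input: 12 repos, every third one has no README.
repos = [{"name": f"r{i}", "url": f"https://example.invalid/r{i}"} for i in range(12)]
readmes = {r["name"]: "x" * 6000 for i, r in enumerate(repos) if i % 3 != 0}

payloads = build_readme_payloads(repos, readmes)
print(len(payloads), len(payloads[0]["content"]))
```

Note that everything in readme_data travels through XCom, so the truncation also keeps the XCom payload small.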

File: /home/sam/development/airflow/docker-compose.yml

version: '3.8'

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.1}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
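    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # Pass the .env values into the containers; .env alone only feeds Compose variable substitution
    GITEA_URL: ${GITEA_URL:-https://gitea.lab.audasmedia.com.au}
    GITEA_TOKEN: ${GITEA_TOKEN:-}
    GITEA_USERNAME: ${GITEA_USERNAME:-sam}
    KNOWLEDGE_SERVICE_URL: ${KNOWLEDGE_SERVICE_URL:-http://knowledge-service:8080}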
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /home/sam/development/knowledge_service:/opt/airflow/dags/repo:ro
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - ai-mesh

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
    networks:
      - ai-mesh

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8081:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo "WARNING!!!: AIRFLOW_UID not set!"
          echo "Using default UID: 50000"
          export AIRFLOW_UID=50000
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources
    networks:
      - ai-mesh

volumes:
  postgres-db-volume:

networks:
  ai-mesh:
    external: true

Phase 5: Terminal Commands

Setup LangGraph Service

# Create langgraph_service directory
mkdir -p /home/sam/development/langgraph_service

# Create requirements.txt
cat > /home/sam/development/langgraph_service/requirements.txt << 'EOF'
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
EOF

# Start LangGraph service
cd /home/sam/development/langgraph_service
docker-compose up -d --build

# Verify
sleep 10
curl http://localhost:8090/health
curl http://localhost:8090/agents

Setup Airflow

# Create directory structure
mkdir -p /home/sam/development/airflow/{dags,logs,config,plugins}

# Create .env file
cat > /home/sam/development/airflow/.env << 'EOF'
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
GITEA_URL=https://gitea.lab.audasmedia.com.au
GITEA_TOKEN=your_token_here
GITEA_USERNAME=sam
KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
EOF

# Copy gitea_scraper.py to dags
cp /home/sam/development/knowledge_service/gitea_scraper.py /home/sam/development/airflow/dags/

# Start Airflow
cd /home/sam/development/airflow
docker-compose up -d

# Wait and verify
sleep 60
curl http://localhost:8081/health

Update Chat Gateway

# Replace backend main.py with new version
# (Copy the main.py content from this file to: /home/sam/development/aboutme_chat_demo/backend/main.py)

# Rebuild and restart the gateway (langgraph-service is reached over ai-mesh from its own compose project)
cd /home/sam/development/aboutme_chat_demo
docker-compose up -d --build

# Verify
curl http://localhost:8000/health

Test Gitea Scraper Locally

# Set environment variables
export GITEA_URL=https://gitea.lab.audasmedia.com.au
export GITEA_TOKEN=your_token_here
export GITEA_USERNAME=sam

# Run test
cd /home/sam/development/knowledge_service
python gitea_scraper.py

Trigger Airflow DAG Manually

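# Trigger the DAG (unpause it in the UI first; new DAGs start paused because
# AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION is 'true', so the run would stay queued)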
curl -X POST http://localhost:8081/api/v1/dags/gitea_daily_ingestion/dagRuns \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d '{"conf": {}}'

Complete Stack Startup

# Ensure network exists
docker network create ai-mesh 2>/dev/null || true

# Start all services in order
cd /home/sam/development/knowledge_service && docker-compose up -d
cd /home/sam/development/langgraph_service && docker-compose up -d
cd /home/sam/development/aboutme_chat_demo && docker-compose up -d
cd /home/sam/development/airflow && docker-compose up -d

# Verify all services
echo "Testing services..."
sleep 30
# -f makes curl exit non-zero on HTTP errors, so the tick only prints for a healthy service
curl -sf http://localhost:8000/health && echo "✓ Chat Gateway"
curl -sf http://localhost:8080/health && echo "✓ Knowledge Service"
curl -sf http://localhost:8090/health && echo "✓ LangGraph Service"
curl -sf http://localhost:8081/health && echo "✓ Airflow"
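
The same health checks can be scripted in Python when a summary per service is more useful than raw curl output. This is a sketch only: the ports come from the compose files above, while SERVICES, health_url, check_all and http_probe are illustrative names. The probe is passed in as a function so the logic can be tried without a running stack:

```python
import urllib.error
import urllib.request
from typing import Callable, Dict

# Host ports from the compose files in this guide.
SERVICES: Dict[str, int] = {
    "Chat Gateway": 8000,
    "Knowledge Service": 8080,
    "LangGraph Service": 8090,
    "Airflow": 8081,
}

def health_url(port: int, host: str = "localhost") -> str:
    return f"http://{host}:{port}/health"

def check_all(probe: Callable[[str], bool]) -> Dict[str, bool]:
    """Run the probe against every service's /health URL."""
    return {name: probe(health_url(port)) for name, port in SERVICES.items()}

def http_probe(url: str) -> bool:
    """Real probe: True only when the endpoint answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False

# Dry run with a fake probe that pretends only LangGraph is up.
print(check_all(lambda url: ":8090/" in url))
```

Against a running stack, check_all(http_probe) performs the real requests.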

Test End-to-End

# Test query through LangGraph
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "What are Sam'"'"'s coding projects?"}'

# Test agent routing
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Write a Python function to calculate fibonacci"}'
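
The apostrophe in the first query needs awkward shell quoting. Building the request in Python avoids that. A minimal standard-library sketch, assuming the gateway is reachable on localhost:8000 as above (build_chat_request is a hypothetical helper name):

```python
import json
import urllib.request

def build_chat_request(message: str, base_url: str = "http://localhost:8000") -> urllib.request.Request:
    """Build the POST /chat request the gateway expects, without sending it."""
    body = json.dumps({"message": message}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/chat",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

req = build_chat_request("What are Sam's coding projects?")
print(req.get_method(), req.full_url)
print(req.data.decode("utf-8"))

# To send it against a running stack:
# with urllib.request.urlopen(req, timeout=60) as resp:
#     print(json.loads(resp.read())["response"])
```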

Architecture Summary

                    User Query
                        |
                        v
            ┌───────────────────────┐
            │   Chat Gateway        │  Port 8000
            │   (FastAPI)           │  Routes to LangGraph
            └───────────┬───────────┘
                        |
                        v
            ┌───────────────────────┐
            │  LangGraph Supervisor │  Port 8090
            │  (Agent Router)       │  - Decides agent
            └───────────┬───────────┘
                        |
        ┌───────────────┼───────────────┐
        |               |               |
        v               v               v
   ┌─────────┐   ┌──────────┐   ┌──────────┐
   │Librarian│   │ Opencode │   │  Brain   │
   │  (RAG)  │   │ (Coding) │   │  (LLM)   │
   └────┬────┘   └──────────┘   └──────────┘
        |
        v
┌─────────────────┐     ┌─────────────────┐
│  Knowledge      │◄────│  Apache Airflow │
│  Service        │     │  (Scheduler)    │
│  (ChromaDB)     │     └────────┬────────┘
│  Port 8080      │              |
└─────────────────┘              v
                          ┌──────────────┐
                          │ Gitea API    │
                          │ Scraper      │
                          │ (Daily DAG)  │
                          └──────────────┘

Key Improvements

  1. Modular Docker Compose: Each service has its own file
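  2. Self-contained packages: LangGraph installs its dependencies into its own /app/packages directory (pip --target, not a venv), with no sharing between services
  3. Fast rebuilds: the /app/packages pattern keeps dependencies separate from bind-mounted code
  4. No hardcoded keywords in the gateway: routing is delegated to the LangGraph supervisor, which in this version still selects agents by keyword matching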
  5. Scalable: Each service can be updated independently
  6. Scheduled ingestion: Airflow runs daily Gitea scrapes

All code is modular, self-contained, and ready for copy-paste implementation.