- Frontend: Vite + React + TypeScript chat interface
- Backend: FastAPI gateway with LangGraph routing
- Knowledge Service: ChromaDB RAG with Gitea scraper
- LangGraph Service: Multi-agent orchestration
- Airflow: Scheduled Gitea ingestion DAG
- Documentation: Complete plan and implementation guides

Architecture:
- Modular Docker Compose per service
- External ai-mesh network for communication
- Fast rebuilds with /app/packages pattern
- Intelligent agent routing (no hardcoded keywords)

Services:
- Frontend (5173): React chat UI
- Chat Gateway (8000): FastAPI entry point
- LangGraph (8090): Agent orchestration
- Knowledge (8080): ChromaDB RAG
- Airflow (8081): Scheduled ingestion
- PostgreSQL (5432): Chat history

Excludes: node_modules, .venv, chroma_db, logs, .env files
Includes: All source code, configs, docs, docker files
Implementation Plan: Gitea Ingestion, Airflow Scheduling, and LangGraph Orchestration
Overview
Building a complete AI agent pipeline with:
- Gitea API Scraper - Custom module to fetch repos, READMEs, and code
- Apache Airflow - Multi-service Docker setup for scheduled ingestion
- LangGraph Supervisor - Agent orchestration service for multi-agent routing
Phase 1: Gitea API Scraper Module
File: /home/sam/development/knowledge_service/gitea_scraper.py
"""
Gitea API Scraper - Fetches repos, READMEs, and source code
for ingestion into the knowledge base.
"""
import os
import httpx
import logging
from typing import List, Dict, Optional
from dataclasses import dataclass
from datetime import datetime

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RepoMetadata:
    name: str
    description: str
    url: str
    default_branch: str
    updated_at: str
    language: Optional[str]


class GiteaScraper:
    def __init__(self, base_url: str, token: str, username: str = "sam"):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.username = username
        self.headers = {"Authorization": f"token {token}"}

    def get_user_repos(self) -> List[RepoMetadata]:
        """Fetch all repositories for the user."""
        repos = []
        page = 1
        while True:
            url = f"{self.base_url}/api/v1/users/{self.username}/repos?page={page}&limit=50"
            try:
                response = httpx.get(url, headers=self.headers, timeout=30.0)
                response.raise_for_status()
                data = response.json()
                if not data:
                    break
                for repo in data:
                    repos.append(RepoMetadata(
                        name=repo["name"],
                        # Guard against a missing or null description
                        description=repo.get("description") or "",
                        url=repo["html_url"],
                        default_branch=repo["default_branch"],
                        updated_at=repo["updated_at"],
                        language=repo.get("language")
                    ))
                logger.info(f"Fetched page {page}, got {len(data)} repos")
                page += 1
            except Exception as e:
                logger.error(f"Error fetching repos: {e}")
                break
        return repos

    def get_readme(self, repo_name: str) -> str:
        """Fetch README content for a repository."""
        # Try common README filenames
        readme_names = ["README.md", "readme.md", "Readme.md", "README.rst"]
        for readme_name in readme_names:
            url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/raw/{readme_name}"
            try:
                response = httpx.get(url, headers=self.headers, timeout=10.0)
                if response.status_code == 200:
                    return response.text
            except Exception as e:
                logger.warning(f"Failed to fetch {readme_name}: {e}")
                continue
        return ""

    def get_repo_files(self, repo_name: str, path: str = "") -> List[Dict]:
        """List files in a repository directory."""
        url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/contents/{path}"
        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error listing files in {repo_name}/{path}: {e}")
            return []

    def get_file_content(self, repo_name: str, filepath: str) -> str:
        """Fetch content of a specific file."""
        url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/raw/{filepath}"
        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            logger.error(f"Error fetching file {filepath}: {e}")
        return ""


# Test function
if __name__ == "__main__":
    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam")
    )
    repos = scraper.get_user_repos()
    print(f"Found {len(repos)} repositories")
    for repo in repos[:3]:  # Test with first 3
        print(f"\nRepo: {repo.name}")
        readme = scraper.get_readme(repo.name)
        if readme:
            print(f"README preview: {readme[:200]}...")
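The scraper lists one directory at a time through get_repo_files, so collecting source files for ingestion needs a recursive walk. The sketch below is one possible way to do that, not part of the plan itself. It assumes each listing entry carries "type" ("file" or "dir") and "path" keys, which is how the Gitea contents endpoint is expected to respond; walk_repo_files, FAKE_TREE, the extension filter, and max_depth are hypothetical names and defaults introduced only for illustration.

```python
from typing import Callable, Dict, Iterator, List, Tuple


def walk_repo_files(
    list_dir: Callable[[str], List[Dict]],
    path: str = "",
    extensions: Tuple[str, ...] = (".py", ".md"),
    max_depth: int = 5,
) -> Iterator[str]:
    """Yield paths of files under `path` whose names end with one of `extensions`."""
    if max_depth < 0:
        return  # Stop descending once the depth budget is used up
    for entry in list_dir(path):
        if entry.get("type") == "dir":
            yield from walk_repo_files(list_dir, entry["path"], extensions, max_depth - 1)
        elif entry.get("type") == "file" and entry["path"].endswith(extensions):
            yield entry["path"]


# Stand-in for GiteaScraper.get_repo_files so the sketch runs offline.
FAKE_TREE = {
    "": [{"type": "file", "path": "README.md"}, {"type": "dir", "path": "src"}],
    "src": [{"type": "file", "path": "src/app.py"}, {"type": "file", "path": "src/logo.png"}],
}

found = list(walk_repo_files(lambda p: FAKE_TREE.get(p, [])))
print(found)
```

Against a live server the callable would be something like lambda p: scraper.get_repo_files("my-repo", p), with each yielded path then passed to get_file_content.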
Phase 2: Apache Airflow Multi-Service Setup
File: /home/sam/development/airflow/docker-compose.yml
version: '3.8'

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.1}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # Passed through from .env so the DAG tasks can read them with os.getenv()
    GITEA_URL: ${GITEA_URL:-https://gitea.lab.audasmedia.com.au}
    GITEA_TOKEN: ${GITEA_TOKEN:-}
    GITEA_USERNAME: ${GITEA_USERNAME:-sam}
    KNOWLEDGE_SERVICE_URL: ${KNOWLEDGE_SERVICE_URL:-http://knowledge-service:8080}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - ai-mesh

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
    networks:
      - ai-mesh

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8081:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo "WARNING!!!: AIRFLOW_UID not set!"
          echo "Using default UID: 50000"
          export AIRFLOW_UID=50000
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources
    networks:
      - ai-mesh

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    command:
      - bash
      - -c
      - airflow
    networks:
      - ai-mesh

volumes:
  postgres-db-volume:

networks:
  ai-mesh:
    external: true
File: /home/sam/development/airflow/dags/gitea_ingestion_dag.py
"""
Airflow DAG for scheduled Gitea repository ingestion.
Runs daily to fetch new/updated repos and ingest into ChromaDB.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import sys

# gitea_scraper.py is copied into the dags folder (see Terminal Commands),
# so make sure that folder is importable
sys.path.insert(0, '/opt/airflow/dags')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def fetch_gitea_repos(**context):
    """Task: Fetch all repositories from Gitea."""
    from gitea_scraper import GiteaScraper

    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam")
    )
    repos = scraper.get_user_repos()

    # Push to XCom for downstream tasks
    context['ti'].xcom_push(key='repo_count', value=len(repos))
    context['ti'].xcom_push(key='repos', value=[
        {
            'name': r.name,
            'description': r.description,
            'url': r.url,
            'updated_at': r.updated_at
        }
        for r in repos
    ])
    return f"Fetched {len(repos)} repositories"


def fetch_readmes(**context):
    """Task: Fetch READMEs for all repositories."""
    from gitea_scraper import GiteaScraper

    ti = context['ti']
    # xcom_pull returns None if the upstream task pushed nothing
    repos = ti.xcom_pull(task_ids='fetch_repos', key='repos') or []

    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam")
    )

    readme_data = []
    for repo in repos[:10]:  # Limit to 10 repos per run for testing
        readme = scraper.get_readme(repo['name'])
        if readme:
            readme_data.append({
                'repo': repo['name'],
                'content': readme[:5000],  # First 5000 chars
                'url': repo['url']
            })

    ti.xcom_push(key='readme_data', value=readme_data)
    return f"Fetched {len(readme_data)} READMEs"


def ingest_to_chroma(**context):
    """Task: Ingest fetched data into ChromaDB via knowledge service."""
    import httpx

    ti = context['ti']
    readme_data = ti.xcom_pull(task_ids='fetch_readmes', key='readme_data') or []
    knowledge_service_url = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")

    documents_ingested = 0
    for item in readme_data:
        try:
            # Call knowledge service ingest endpoint
            response = httpx.post(
                f"{knowledge_service_url}/ingest",
                json={
                    'source': f"gitea:{item['repo']}",
                    'content': item['content'],
                    'metadata': {
                        'repo': item['repo'],
                        'url': item['url'],
                        'type': 'readme'
                    }
                },
                timeout=30.0
            )
            if response.status_code == 200:
                documents_ingested += 1
        except Exception as e:
            print(f"Error ingesting {item['repo']}: {e}")

    return f"Ingested {documents_ingested} documents into ChromaDB"


# Define the DAG
with DAG(
    'gitea_daily_ingestion',
    default_args=default_args,
    description='Daily ingestion of Gitea repositories into knowledge base',
    schedule_interval=timedelta(days=1),  # Run daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gitea', 'ingestion', 'knowledge'],
) as dag:

    # Task 1: Fetch repository list
    fetch_repos_task = PythonOperator(
        task_id='fetch_repos',
        python_callable=fetch_gitea_repos,
    )

    # Task 2: Fetch README content
    fetch_readmes_task = PythonOperator(
        task_id='fetch_readmes',
        python_callable=fetch_readmes,
    )

    # Task 3: Ingest into ChromaDB
    ingest_task = PythonOperator(
        task_id='ingest_to_chroma',
        python_callable=ingest_to_chroma,
    )

    # Define task dependencies
    fetch_repos_task >> fetch_readmes_task >> ingest_task
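The DAG docstring mentions fetching new/updated repos, while the tasks as written process the first ten repos on every run. A minimal sketch of the missing filter, assuming updated_at arrives as an ISO 8601 string (with either a "Z" suffix or a numeric offset), could look like the following. The names parse_timestamp and repos_updated_since, and the sample data, are hypothetical.

```python
from datetime import datetime, timezone
from typing import Dict, List


def parse_timestamp(value: str) -> datetime:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime."""
    # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def repos_updated_since(repos: List[Dict], since: datetime) -> List[Dict]:
    """Keep only repos whose updated_at is strictly later than `since`."""
    return [r for r in repos if parse_timestamp(r["updated_at"]) > since]


# Sample data shaped like the dicts pushed to XCom by fetch_gitea_repos
repos = [
    {"name": "old-project", "updated_at": "2023-12-30T08:00:00Z"},
    {"name": "new-project", "updated_at": "2024-01-02T09:30:00+11:00"},
]
last_run = datetime(2024, 1, 1, tzinfo=timezone.utc)
changed = repos_updated_since(repos, last_run)
print([r["name"] for r in changed])
```

Inside fetch_readmes, the cutoff could plausibly come from the task context (for example the data_interval_start entry that Airflow 2.2+ provides) instead of a hard-coded date; that wiring is left out here.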
File: /home/sam/development/airflow/.env
# Airflow Configuration
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
# Gitea Configuration
GITEA_URL=https://gitea.lab.audasmedia.com.au
GITEA_TOKEN=your_token_here
GITEA_USERNAME=sam
# Knowledge Service
KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
Phase 3: LangGraph Supervisor Service
File: /home/sam/development/langgraph_service/requirements.txt
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
File: /home/sam/development/langgraph_service/supervisor_agent.py
"""
LangGraph Supervisor Agent - Routes queries to specialist agents
"""
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator
import httpx
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Same variable the integrated docker-compose file sets for this service
KNOWLEDGE_SERVICE_URL = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")


# State definition
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next_agent: str
    context: dict


# Agent routing logic
def supervisor_node(state: AgentState):
    """Supervisor decides which specialist agent to call."""
    last_message = state["messages"][-1].content.lower()

    # Simple routing logic based on keywords
    if any(kw in last_message for kw in ["repo", "code", "git", "github", "gitea", "project", "development"]):
        return {"next_agent": "librarian"}
    elif any(kw in last_message for kw in ["write", "edit", "create", "fix", "bug", "implement", "code change"]):
        return {"next_agent": "opencode"}
    elif any(kw in last_message for kw in ["sam", "hobby", "music", "experience", "skill", "about"]):
        return {"next_agent": "librarian"}
    else:
        return {"next_agent": "brain"}  # Default to general LLM


def librarian_agent(state: AgentState):
    """Librarian agent - queries knowledge base (ChromaDB)."""
    last_message = state["messages"][-1].content
    error = "unknown error"
    try:
        # Call knowledge service
        response = httpx.post(
            f"{KNOWLEDGE_SERVICE_URL}/query",
            json={"question": last_message},
            timeout=10.0
        )
        if response.status_code == 200:
            context = response.json().get("context", "")
            return {
                "messages": [AIMessage(content=f"Based on my knowledge base:\n\n{context}")],
                "context": {"source": "librarian", "context": context}
            }
        error = f"knowledge service returned HTTP {response.status_code}"
    except Exception as e:
        logger.error(f"Librarian error: {e}")
        # Keep the message: `e` is not available after the except block ends
        error = str(e)

    return {
        "messages": [AIMessage(content="I couldn't find relevant information in the knowledge base.")],
        "context": {"source": "librarian", "error": error}
    }


def opencode_agent(state: AgentState):
    """Opencode agent - handles coding tasks via MCP."""
    last_message = state["messages"][-1].content
    # Placeholder - would integrate with opencode-brain
    return {
        "messages": [AIMessage(content=f"I'm the coding agent. I would help you with: {last_message}")],
        "context": {"source": "opencode", "action": "coding_task"}
    }


def brain_agent(state: AgentState):
    """Brain agent - general LLM fallback."""
    last_message = state["messages"][-1].content
    try:
        # Call opencode-brain service
        auth = httpx.BasicAuth("opencode", os.getenv("OPENCODE_PASSWORD", "sam4jo"))
        timeout_long = httpx.Timeout(180.0, connect=10.0)

        # This node is synchronous, so use the blocking client rather than AsyncClient
        with httpx.Client(auth=auth, timeout=timeout_long) as client:
            # Create session
            session_res = client.post("http://opencode-brain:5000/session", json={"title": "Supervisor Query"})
            session_id = session_res.json()["id"]

            # Send message
            response = client.post(
                f"http://opencode-brain:5000/session/{session_id}/message",
                json={"parts": [{"type": "text", "text": last_message}]}
            )
            data = response.json()
            if "parts" in data:
                for part in data["parts"]:
                    if part.get("type") == "text":
                        return {
                            "messages": [AIMessage(content=part["text"])],
                            "context": {"source": "brain"}
                        }
    except Exception as e:
        logger.error(f"Brain error: {e}")

    return {
        "messages": [AIMessage(content="I'm thinking about this...")],
        "context": {"source": "brain"}
    }


def route_decision(state: AgentState):
    """Routing function based on supervisor decision."""
    return state["next_agent"]


# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("librarian", librarian_agent)
workflow.add_node("opencode", opencode_agent)
workflow.add_node("brain", brain_agent)

# Add edges
workflow.set_entry_point("supervisor")

# Conditional routing from supervisor
workflow.add_conditional_edges(
    "supervisor",
    route_decision,
    {
        "librarian": "librarian",
        "opencode": "opencode",
        "brain": "brain"
    }
)

# All specialist agents end
workflow.add_edge("librarian", END)
workflow.add_edge("opencode", END)
workflow.add_edge("brain", END)

# Compile the graph
supervisor_graph = workflow.compile()


# Main entry point for queries
async def process_query(query: str) -> dict:
    """Process a query through the supervisor graph."""
    result = await supervisor_graph.ainvoke({
        "messages": [HumanMessage(content=query)],
        "next_agent": "",
        "context": {}
    })
    return {
        "response": result["messages"][-1].content,
        "context": result.get("context", {})
    }
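Because the supervisor matches keywords as substrings and checks the librarian list first, some queries land on an agent the wording would not suggest. The sketch below copies the keyword lists into a standalone function so the routing can be tried without LangGraph, and compares it with a hypothetical whole-word variant. route_by_substring, route_by_words, and the sample messages are illustrative names only, and the variant is one possible alternative rather than the plan's design.

```python
import re

LIBRARIAN_WORDS = {"repo", "code", "git", "github", "gitea", "project", "development",
                   "sam", "hobby", "music", "experience", "skill", "about"}
OPENCODE_WORDS = {"write", "edit", "create", "fix", "bug", "implement"}


def route_by_substring(message: str) -> str:
    """Mirror of supervisor_node: substring checks, librarian keywords first."""
    text = message.lower()
    if any(kw in text for kw in ["repo", "code", "git", "github", "gitea", "project", "development"]):
        return "librarian"
    if any(kw in text for kw in ["write", "edit", "create", "fix", "bug", "implement", "code change"]):
        return "opencode"
    if any(kw in text for kw in ["sam", "hobby", "music", "experience", "skill", "about"]):
        return "librarian"
    return "brain"


def route_by_words(message: str) -> str:
    """Hypothetical variant: whole-word matching, action verbs checked first."""
    words = set(re.findall(r"[a-z]+", message.lower()))
    if words & OPENCODE_WORDS:
        return "opencode"
    if words & LIBRARIAN_WORDS:
        return "librarian"
    return "brain"


for msg in ["Fix the bug in my code", "How many digits are in 100?", "What is Sam's hobby?"]:
    print(f"{msg!r}: {route_by_substring(msg)} -> {route_by_words(msg)}")
```

With substring matching, "Fix the bug in my code" goes to the librarian because "code" matches before "fix" is considered, and "digits" matches "git". The whole-word variant avoids both, at the cost of missing plurals such as "repos", so neither approach is complete on its own.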
File: /home/sam/development/langgraph_service/main.py
"""
LangGraph Supervisor Service - FastAPI wrapper for agent orchestration
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from supervisor_agent import process_query
import logging
import sys

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

app = FastAPI(title="LangGraph Supervisor Service")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class QueryRequest(BaseModel):
    query: str


class QueryResponse(BaseModel):
    response: str
    agent_used: str
    context: dict


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "langgraph-supervisor"}


@app.post("/query", response_model=QueryResponse)
async def query_supervisor(request: QueryRequest):
    """Main entry point for agent orchestration."""
    logger.info(f"Received query: {request.query}")
    try:
        result = await process_query(request.query)
        return QueryResponse(
            response=result["response"],
            agent_used=result["context"].get("source", "unknown"),
            context=result["context"]
        )
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        return QueryResponse(
            response="Error processing your request",
            agent_used="error",
            context={"error": str(e)}
        )


@app.get("/agents")
async def list_agents():
    """List available specialist agents."""
    return {
        "agents": [
            {
                "name": "librarian",
                "description": "Queries the knowledge base for semantic information",
                "triggers": ["repo", "code", "git", "hobby", "about", "skill"]
            },
            {
                "name": "opencode",
                "description": "Handles coding tasks and file modifications",
                "triggers": ["write", "edit", "create", "fix", "implement"]
            },
            {
                "name": "brain",
                "description": "General LLM for reasoning and generation",
                "triggers": ["default", "general questions"]
            }
        ]
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8090)
File: /home/sam/development/langgraph_service/Dockerfile
FROM python:3.11-slim
# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*
# Create app directory
WORKDIR /app
# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy code
COPY . .
EXPOSE 8090
CMD ["python3", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8090"]
Phase 4: Integration - Updated Docker Compose
File: /home/sam/development/docker-compose.integrated.yml
version: '3.8'

services:
  # Existing Knowledge Service
  knowledge-service:
    build: ./knowledge_service
    container_name: knowledge-service
    ports:
      - "8080:8080"
    volumes:
      - ./knowledge_service/data:/app/code/data
      - ./knowledge_service/chroma_db:/app/code/chroma_db
      - ./knowledge_service/main.py:/app/code/main.py:ro
      - ./knowledge_service/gitea_scraper.py:/app/code/gitea_scraper.py:ro
    environment:
      - PYTHONUNBUFFERED=1
      - OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
      - PYTHONPATH=/app/packages
      - GITEA_URL=${GITEA_URL}
      - GITEA_TOKEN=${GITEA_TOKEN}
      - GITEA_USERNAME=${GITEA_USERNAME:-sam}
    networks:
      - ai-mesh
    restart: unless-stopped

  # LangGraph Supervisor Service
  langgraph-service:
    build: ./langgraph_service
    container_name: langgraph-service
    ports:
      - "8090:8090"
    environment:
      - OPENCODE_PASSWORD=${OPENCODE_PASSWORD:-sam4jo}
      - KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
    depends_on:
      - knowledge-service
    networks:
      - ai-mesh
    restart: unless-stopped

  # Chat Gateway (Updated to use LangGraph)
  chat-gateway:
    build: ./aboutme_chat_demo/backend
    container_name: chat-gateway
    ports:
      - "8000:8000"
    volumes:
      - ./aboutme_chat_demo/backend:/app
    environment:
      - DATABASE_URL=postgresql://sam:sam4jo@db:5432/chat_demo
      - LANGGRAPH_URL=http://langgraph-service:8090
    depends_on:
      - langgraph-service
      - db
    networks:
      - ai-mesh
    restart: unless-stopped

  # Frontend
  frontend:
    build: ./aboutme_chat_demo/frontend
    container_name: chat-frontend
    ports:
      - "5173:5173"
    volumes:
      - ./aboutme_chat_demo/frontend:/app
      - /app/node_modules
    environment:
      - CHOKIDAR_USEPOLLING=true
    networks:
      - ai-mesh

  # PostgreSQL for chat history
  db:
    image: postgres:15-alpine
    container_name: chat-db
    environment:
      POSTGRES_USER: sam
      POSTGRES_PASSWORD: sam4jo
      POSTGRES_DB: chat_demo
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - ai-mesh
    restart: unless-stopped

volumes:
  postgres_data:

networks:
  ai-mesh:
    external: true
Phase 5: Updated Chat Gateway
File: /home/sam/development/aboutme_chat_demo/backend/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx
import logging
import sys
import traceback
import os

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])


class MessageRequest(BaseModel):
    message: str


LANGGRAPH_URL = os.getenv("LANGGRAPH_URL", "http://langgraph-service:8090")


@app.post("/chat")
async def chat(request: MessageRequest):
    """Updated chat endpoint that routes through LangGraph Supervisor."""
    logger.info(f"Gateway: Received message: {request.message}")
    try:
        # Call LangGraph Supervisor instead of direct brain
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
            response = await client.post(
                f"{LANGGRAPH_URL}/query",
                json={"query": request.message}
            )
            if response.status_code == 200:
                result = response.json()
                logger.info(f"Gateway: Response from {result.get('agent_used', 'unknown')} agent")
                return {"response": result["response"]}
            else:
                logger.error(f"Gateway: LangGraph error {response.status_code}")
                return {"response": "Error: Orchestration service unavailable"}
    except Exception as e:
        logger.error(f"Gateway: Error routing through LangGraph: {traceback.format_exc()}")
        return {"response": "Error: Unable to process your request at this time."}


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "chat-gateway"}
Terminal Commands
Setup Airflow Environment
# Create airflow directory structure
mkdir -p /home/sam/development/airflow/{dags,logs,config,plugins}
# Copy gitea_scraper.py to airflow dags folder
cp /home/sam/development/knowledge_service/gitea_scraper.py /home/sam/development/airflow/dags/
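# Set AIRFLOW_UID to the host user's UID so files under dags/, logs/ and plugins/ stay writable
# (the image falls back to UID 50000 when it is unset). This overwrites any existing .env,
# so re-add the Gitea and login settings from the .env file above afterwards.
echo -e "AIRFLOW_UID=1000\nAIRFLOW_GID=0" > /home/sam/development/airflow/.env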
# Start Airflow services
cd /home/sam/development/airflow
docker-compose up -d
# Check Airflow webserver (wait 30 seconds for init)
sleep 30
curl http://localhost:8081/health
# Access Airflow UI
# http://localhost:8081 (login: admin/admin)
Setup LangGraph Service
# Create langgraph_service directory
mkdir -p /home/sam/development/langgraph_service
# Write requirements.txt
cat > /home/sam/development/langgraph_service/requirements.txt << 'EOF'
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
EOF
# Build and start LangGraph service
cd /home/sam/development/langgraph_service
docker build -t langgraph-service:latest .
docker run -d \
--name langgraph-service \
-p 8090:8090 \
--network ai-mesh \
-e OPENCODE_PASSWORD=sam4jo \
langgraph-service:latest
# Test LangGraph service
curl http://localhost:8090/health
curl http://localhost:8090/agents
Test Gitea Scraper Locally
# Set environment variables
export GITEA_URL=https://gitea.lab.audasmedia.com.au
export GITEA_TOKEN=your_token_here
export GITEA_USERNAME=sam
# Run scraper test
cd /home/sam/development/knowledge_service
python gitea_scraper.py
Start Complete Integrated Stack
# Ensure ai-mesh network exists
docker network create ai-mesh 2>/dev/null || true
# Start all services
cd /home/sam/development
docker-compose -f docker-compose.integrated.yml up -d
# Verify all services
curl http://localhost:8000/health # Chat Gateway
curl http://localhost:8080/health # Knowledge Service
curl http://localhost:8090/health # LangGraph Service
curl http://localhost:8081/health # Airflow
# Test end-to-end
curl -X POST http://localhost:8000/chat \
-H "Content-Type: application/json" \
-d '{"message": "What are Sam'\''s hobbies?"}'
Manual Trigger Airflow DAG
# Trigger the Gitea ingestion DAG manually
curl -X POST http://localhost:8081/api/v1/dags/gitea_daily_ingestion/dagRuns \
-H "Content-Type: application/json" \
-u admin:admin \
-d '{"conf": {}}'
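The same trigger can be issued from Python, which is handy when the call needs to live in a script or test. The sketch below only builds the request with the standard library and leaves sending it commented out; build_trigger_request is a hypothetical helper, and the URL, DAG id, and admin/admin credentials are taken from the curl command above.

```python
import base64
import json
import urllib.request
from typing import Optional


def build_trigger_request(base_url: str, dag_id: str, user: str, password: str,
                          conf: Optional[dict] = None) -> urllib.request.Request:
    """Build a POST request that asks the Airflow REST API to start a DAG run."""
    # HTTP Basic auth: base64 of "user:password"
    credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
    body = json.dumps({"conf": conf or {}}).encode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {credentials}",
        },
    )


request = build_trigger_request("http://localhost:8081", "gitea_daily_ingestion", "admin", "admin")
print(request.get_method(), request.full_url)

# Uncomment to send it once Airflow is running:
# with urllib.request.urlopen(request, timeout=30) as response:
#     print(response.status, response.read().decode())
```

Since the compose file sets AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION to 'true', the DAG likely needs to be unpaused in the UI before a triggered run starts executing.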
Architecture Summary
       User Query
            |
            v
   ┌─────────────────┐
   │  Chat Gateway   │  (Port 8000)
   │    (FastAPI)    │
   └────────┬────────┘
            |
            v
   ┌─────────────────┐
   │    LangGraph    │  (Port 8090)
   │   Supervisor    │  - Routes to specialist agents
   │  (StateGraph)   │
   └────────┬────────┘
            |
     ┌──────┴────┬───────────┐
     ▼           ▼           ▼
┌─────────┐ ┌──────────┐ ┌────────┐
│Librarian│ │ Opencode │ │ Brain  │
│  (RAG)  │ │ (Coding) │ │ (LLM)  │
└────┬────┘ └──────────┘ └────────┘
     |
     v
┌─────────────────┐     ┌─────────────────┐
│    Knowledge    │◄────│ Apache Airflow  │
│     Service     │     │   (Port 8081)   │
│   (ChromaDB)    │     │   - Scheduled   │
│   (Port 8080)   │     │     ingestion   │
└─────────────────┘     └────────┬────────┘
                                 |
                                 v
                          ┌──────────────┐
                          │  Gitea API   │
                          │   Scraper    │
                          └──────────────┘
Next Steps
- Add Gitea token to .env file
- Build and test Gitea scraper locally
- Deploy Airflow with docker-compose up -d
- Build LangGraph service and test routing
- Update Chat Gateway to use LangGraph
- Test end-to-end flow with a query like "What coding projects does Sam have?"
All code is ready for copy-paste implementation.