Components:
- Frontend: Vite + React + TypeScript chat interface
- Backend: FastAPI gateway with LangGraph routing
- Knowledge Service: ChromaDB RAG with Gitea scraper
- LangGraph Service: Multi-agent orchestration
- Airflow: Scheduled Gitea ingestion DAG
- Documentation: Complete plan and implementation guides

Architecture:
- Modular Docker Compose per service
- External ai-mesh network for communication
- Fast rebuilds with /app/packages pattern
- Intelligent agent routing (no hardcoded keywords in the gateway)

Services:
- Frontend (5173): React chat UI
- Chat Gateway (8000): FastAPI entry point
- LangGraph (8090): Agent orchestration
- Knowledge (8080): ChromaDB RAG
- Airflow (8081): Scheduled ingestion
- PostgreSQL (5432): Chat history

Excludes: node_modules, .venv, chroma_db, logs, .env files
Includes: All source code, configs, docs, docker files
# Modular Implementation: Gitea, Airflow, and LangGraph

## Overview

Self-contained modular architecture with separate docker-compose files per service. Each service has its own packages directory for fast rebuilds.

---

## Phase 1: LangGraph Supervisor Service

### Directory Structure

```
/home/sam/development/langgraph_service/
├── docker-compose.yml
├── Dockerfile
├── requirements.txt
├── main.py
└── supervisor_agent.py
```
### File: `/home/sam/development/langgraph_service/requirements.txt`

```
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
```
### File: `/home/sam/development/langgraph_service/Dockerfile`

```dockerfile
FROM python:3.11-slim

# Install build tools needed by some Python wheels
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create directories
RUN mkdir -p /app/packages /app/code

WORKDIR /app

# Install packages into an isolated directory so that code-only
# changes do not invalidate this layer
COPY requirements.txt .
RUN pip install --target=/app/packages -r requirements.txt

# Copy code
COPY . /app/code/

ENV PYTHONPATH=/app/packages
ENV PYTHONUNBUFFERED=1

WORKDIR /app/code
EXPOSE 8090

CMD ["python3", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8090"]
```
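The `pip install --target` plus `PYTHONPATH` pairing in the Dockerfile can be sanity-checked with a small standalone sketch; temporary directories stand in for `/app/packages`, and the hand-written `demo_pkg` module stands in for a pip-installed package:

```python
"""Illustrates the /app/packages pattern: a module placed in a target
directory becomes importable once that directory is on PYTHONPATH."""
import os
import subprocess
import sys
import tempfile

with tempfile.TemporaryDirectory() as pkg_dir:
    # Stand-in for `pip install --target=/app/packages ...`:
    # drop a module into the target directory by hand.
    with open(os.path.join(pkg_dir, "demo_pkg.py"), "w") as f:
        f.write("VERSION = '1.0'\n")

    # A fresh interpreter with PYTHONPATH pointing at the target
    # directory can import the package, just as the container does.
    env = dict(os.environ, PYTHONPATH=pkg_dir)
    result = subprocess.run(
        [sys.executable, "-c", "import demo_pkg; print(demo_pkg.VERSION)"],
        env=env, capture_output=True, text=True,
    )
    print(result.stdout.strip())  # 1.0
```

The payoff is layer caching: `COPY requirements.txt` and the `pip install` layer only rebuild when requirements change, while code edits touch only the cheap `COPY . /app/code/` layer.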
### File: `/home/sam/development/langgraph_service/supervisor_agent.py`

```python
"""
LangGraph Supervisor Agent - Routes queries to specialist agents
"""
from typing import TypedDict, Annotated, Sequence
from langgraph.graph import StateGraph, END
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
import operator
import httpx
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# State definition
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next_agent: str
    context: dict


# Agent routing logic
def supervisor_node(state: AgentState):
    """Supervisor decides which specialist agent to call."""
    last_message = state["messages"][-1].content.lower()

    # Simple routing logic based on keywords
    if any(kw in last_message for kw in ["repo", "code", "git", "github", "gitea", "project", "development"]):
        return {"next_agent": "librarian"}
    elif any(kw in last_message for kw in ["write", "edit", "create", "fix", "bug", "implement", "code change"]):
        return {"next_agent": "opencode"}
    elif any(kw in last_message for kw in ["sam", "hobby", "music", "experience", "skill", "about"]):
        return {"next_agent": "librarian"}
    else:
        return {"next_agent": "brain"}  # Default to general LLM


def librarian_agent(state: AgentState):
    """Librarian agent - queries knowledge base (ChromaDB)."""
    last_message = state["messages"][-1].content

    try:
        # Call knowledge service
        response = httpx.post(
            "http://knowledge-service:8080/query",
            json={"question": last_message},
            timeout=10.0,
        )

        if response.status_code == 200:
            context = response.json().get("context", "")
            return {
                "messages": [AIMessage(content=f"Based on my knowledge base:\n\n{context}")],
                "context": {"source": "librarian", "context": context},
            }
        error = f"knowledge service returned HTTP {response.status_code}"
    except Exception as e:
        logger.error(f"Librarian error: {e}")
        error = str(e)

    return {
        "messages": [AIMessage(content="I couldn't find relevant information in the knowledge base.")],
        "context": {"source": "librarian", "error": error},
    }


def opencode_agent(state: AgentState):
    """Opencode agent - handles coding tasks via MCP."""
    last_message = state["messages"][-1].content

    # Placeholder - would integrate with opencode-brain
    return {
        "messages": [AIMessage(content=f"I'm the coding agent. I would help you with: {last_message}")],
        "context": {"source": "opencode", "action": "coding_task"},
    }


def brain_agent(state: AgentState):
    """Brain agent - general LLM fallback."""
    last_message = state["messages"][-1].content

    try:
        # Call opencode-brain service (synchronous client: this node is a
        # plain function, so AsyncClient would not work here)
        auth = httpx.BasicAuth("opencode", os.getenv("OPENCODE_PASSWORD", "sam4jo"))
        timeout_long = httpx.Timeout(180.0, connect=10.0)

        with httpx.Client(auth=auth, timeout=timeout_long) as client:
            # Create session
            session_res = client.post(
                "http://opencode-brain:5000/session",
                json={"title": "Supervisor Query"},
            )
            session_id = session_res.json()["id"]

            # Send message
            response = client.post(
                f"http://opencode-brain:5000/session/{session_id}/message",
                json={"parts": [{"type": "text", "text": last_message}]},
            )

            data = response.json()
            for part in data.get("parts", []):
                if part.get("type") == "text":
                    return {
                        "messages": [AIMessage(content=part["text"])],
                        "context": {"source": "brain"},
                    }
    except Exception as e:
        logger.error(f"Brain error: {e}")

    return {
        "messages": [AIMessage(content="I'm thinking about this...")],
        "context": {"source": "brain"},
    }


def route_decision(state: AgentState):
    """Routing function based on supervisor decision."""
    return state["next_agent"]


# Build the graph
workflow = StateGraph(AgentState)

# Add nodes
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("librarian", librarian_agent)
workflow.add_node("opencode", opencode_agent)
workflow.add_node("brain", brain_agent)

# Add edges
workflow.set_entry_point("supervisor")

# Conditional routing from supervisor
workflow.add_conditional_edges(
    "supervisor",
    route_decision,
    {
        "librarian": "librarian",
        "opencode": "opencode",
        "brain": "brain",
    },
)

# All specialist agents end
workflow.add_edge("librarian", END)
workflow.add_edge("opencode", END)
workflow.add_edge("brain", END)

# Compile the graph
supervisor_graph = workflow.compile()


# Main entry point for queries
async def process_query(query: str) -> dict:
    """Process a query through the supervisor graph."""
    result = await supervisor_graph.ainvoke({
        "messages": [HumanMessage(content=query)],
        "next_agent": "",
        "context": {},
    })

    return {
        "response": result["messages"][-1].content,
        "context": result.get("context", {}),
    }
```
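Note that the keyword lists are checked in order, so precedence matters: a message like "fix this code" routes to the librarian (it matches "code" in the first list) before the opencode list is ever consulted. A standalone restatement of the same rules, handy for unit-testing routing decisions without the service running:

```python
# Same keyword lists and precedence as supervisor_node, isolated as a pure function.
LIBRARIAN_KW = ["repo", "code", "git", "github", "gitea", "project", "development"]
OPENCODE_KW = ["write", "edit", "create", "fix", "bug", "implement", "code change"]
PROFILE_KW = ["sam", "hobby", "music", "experience", "skill", "about"]

def route(message: str) -> str:
    """Precedence: librarian (code/repo) > opencode > librarian (profile) > brain."""
    msg = message.lower()
    if any(kw in msg for kw in LIBRARIAN_KW):
        return "librarian"
    if any(kw in msg for kw in OPENCODE_KW):
        return "opencode"
    if any(kw in msg for kw in PROFILE_KW):
        return "librarian"
    return "brain"

print(route("What repos has Sam published?"))  # librarian
print(route("Write a fibonacci function"))     # opencode
print(route("fix this code"))                  # librarian, not opencode
print(route("How tall is Mount Everest?"))     # brain
```

Keeping the routing rules in a pure function like this also makes them easy to swap for an LLM-based router later without touching the graph wiring.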
### File: `/home/sam/development/langgraph_service/main.py`

```python
"""
LangGraph Supervisor Service - FastAPI wrapper for agent orchestration
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from supervisor_agent import process_query
import logging
import sys

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

app = FastAPI(title="LangGraph Supervisor Service")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class QueryRequest(BaseModel):
    query: str


class QueryResponse(BaseModel):
    response: str
    agent_used: str
    context: dict


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "langgraph-supervisor"}


@app.post("/query", response_model=QueryResponse)
async def query_supervisor(request: QueryRequest):
    """Main entry point for agent orchestration."""
    logger.info(f"Received query: {request.query}")

    try:
        result = await process_query(request.query)

        return QueryResponse(
            response=result["response"],
            agent_used=result["context"].get("source", "unknown"),
            context=result["context"],
        )
    except Exception as e:
        logger.error(f"Error processing query: {e}")
        return QueryResponse(
            response="Error processing your request",
            agent_used="error",
            context={"error": str(e)},
        )


@app.get("/agents")
async def list_agents():
    """List available specialist agents."""
    return {
        "agents": [
            {
                "name": "librarian",
                "description": "Queries the knowledge base for semantic information",
                "triggers": ["repo", "code", "git", "hobby", "about", "skill"],
            },
            {
                "name": "opencode",
                "description": "Handles coding tasks and file modifications",
                "triggers": ["write", "edit", "create", "fix", "implement"],
            },
            {
                "name": "brain",
                "description": "General LLM for reasoning and generation",
                "triggers": ["default", "general questions"],
            },
        ]
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8090)
```
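One contract worth noticing in `query_supervisor`: the endpoint never lets an exception propagate to the client. Failures come back as a normal 200 payload with `agent_used == "error"`, so callers must branch on that field rather than on HTTP status. The envelope logic in isolation (a sketch, not the FastAPI handler itself; `build_envelope` and `boom` are illustrative names):

```python
def build_envelope(process):
    """Mirrors query_supervisor's try/except: success and failure share one shape."""
    try:
        result = process()
        return {
            "response": result["response"],
            "agent_used": result["context"].get("source", "unknown"),
            "context": result["context"],
        }
    except Exception as e:
        return {
            "response": "Error processing your request",
            "agent_used": "error",
            "context": {"error": str(e)},
        }

def boom():
    raise RuntimeError("upstream down")

ok = build_envelope(lambda: {"response": "hi", "context": {"source": "brain"}})
bad = build_envelope(boom)
print(ok["agent_used"], bad["agent_used"])  # brain error
```

The chat gateway in Phase 2 relies on this: it checks `agent_used` only for logging, because the payload shape is identical either way.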
### File: `/home/sam/development/langgraph_service/docker-compose.yml`

```yaml
version: '3.8'

services:
  langgraph-service:
    build: .
    image: langgraph-service:latest
    container_name: langgraph-service
    ports:
      - "8090:8090"
    volumes:
      # Only mount code files, not packages
      - ./main.py:/app/code/main.py:ro
      - ./supervisor_agent.py:/app/code/supervisor_agent.py:ro
    environment:
      - PYTHONUNBUFFERED=1
      - PYTHONPATH=/app/packages
      - OPENCODE_PASSWORD=${OPENCODE_PASSWORD:-sam4jo}
      - KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
    networks:
      - ai-mesh
    restart: unless-stopped

networks:
  ai-mesh:
    external: true
```
---

## Phase 2: Updated Chat Gateway (Replaces Hardcoded Logic)

### File: `/home/sam/development/aboutme_chat_demo/backend/main.py`

```python
"""
Chat Gateway - Routes all queries through the LangGraph Supervisor.
Removes hardcoded keywords from the gateway; routing happens upstream.
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import httpx
import logging
import sys
import traceback
import os

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class MessageRequest(BaseModel):
    message: str


LANGGRAPH_URL = os.getenv("LANGGRAPH_URL", "http://langgraph-service:8090")


@app.post("/chat")
async def chat(request: MessageRequest):
    """
    Routes all queries through the LangGraph Supervisor, which picks:
    - Librarian: for knowledge base queries (RAG)
    - Opencode: for coding tasks
    - Brain: for general LLM queries
    """
    logger.info(f"Gateway: Routing query to LangGraph: {request.message}")

    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(60.0, connect=10.0)) as client:
            response = await client.post(
                f"{LANGGRAPH_URL}/query",
                json={"query": request.message},
            )

        if response.status_code == 200:
            result = response.json()
            agent_used = result.get("agent_used", "unknown")
            logger.info(f"Gateway: Response from {agent_used} agent")
            return {"response": result["response"]}
        else:
            logger.error(f"Gateway: LangGraph error {response.status_code}")
            return {"response": "Error: Orchestration service unavailable"}

    except Exception:
        logger.error(f"Gateway: Error routing through LangGraph: {traceback.format_exc()}")
        return {"response": "Error: Unable to process your request at this time."}


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "chat-gateway"}


@app.get("/agents")
async def list_agents():
    """List available agents from LangGraph."""
    try:
        async with httpx.AsyncClient(timeout=httpx.Timeout(10.0)) as client:
            response = await client.get(f"{LANGGRAPH_URL}/agents")
            if response.status_code == 200:
                return response.json()
    except Exception as e:
        logger.error(f"Error fetching agents: {e}")

    return {"agents": [], "error": "Could not retrieve agent list"}
```
### File: `/home/sam/development/aboutme_chat_demo/docker-compose.yml`

```yaml
version: '3.8'

services:
  db:
    image: postgres:15-alpine
    environment:
      POSTGRES_USER: sam
      POSTGRES_PASSWORD: sam4jo
      POSTGRES_DB: chat_demo
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - ai-mesh
    restart: unless-stopped

  backend:
    build: ./backend
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://sam:sam4jo@db:5432/chat_demo
      LANGGRAPH_URL: http://langgraph-service:8090
    volumes:
      - ./backend:/app
    depends_on:
      - db
    # langgraph-service lives in its own compose stack and is reached over
    # the external ai-mesh network, so it cannot be listed in depends_on here;
    # start the langgraph stack first.
    networks:
      - ai-mesh
    restart: unless-stopped

  frontend:
    build: ./frontend
    ports:
      - "5173:5173"
    volumes:
      - ./frontend:/app
      - /app/node_modules
    environment:
      - CHOKIDAR_USEPOLLING=true
    depends_on:
      - backend
    networks:
      - ai-mesh

volumes:
  postgres_data:

networks:
  ai-mesh:
    external: true
```
---

## Phase 3: Gitea Scraper Module

### File: `/home/sam/development/knowledge_service/gitea_scraper.py`

```python
"""
Gitea API Scraper - Fetches repos, READMEs, and source code
for ingestion into the knowledge base.
"""
import os
import httpx
import logging
from typing import List, Dict, Optional
from dataclasses import dataclass

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class RepoMetadata:
    name: str
    description: str
    url: str
    default_branch: str
    updated_at: str
    language: Optional[str]


class GiteaScraper:
    def __init__(self, base_url: str, token: str, username: str = "sam"):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.username = username
        self.headers = {"Authorization": f"token {token}"}

    def get_user_repos(self) -> List[RepoMetadata]:
        """Fetch all repositories for the user."""
        repos = []
        page = 1

        while True:
            url = f"{self.base_url}/api/v1/users/{self.username}/repos?page={page}&limit=50"

            try:
                response = httpx.get(url, headers=self.headers, timeout=30.0)
                response.raise_for_status()

                data = response.json()
                if not data:
                    break

                for repo in data:
                    repos.append(RepoMetadata(
                        name=repo["name"],
                        description=repo.get("description") or "",
                        url=repo["html_url"],
                        default_branch=repo["default_branch"],
                        updated_at=repo["updated_at"],
                        language=repo.get("language"),
                    ))

                logger.info(f"Fetched page {page}, got {len(data)} repos")
                page += 1

            except Exception as e:
                logger.error(f"Error fetching repos: {e}")
                break

        return repos

    def get_readme(self, repo_name: str) -> str:
        """Fetch README content for a repository."""
        readme_names = ["README.md", "readme.md", "Readme.md", "README.rst"]

        for readme_name in readme_names:
            url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/raw/{readme_name}"

            try:
                response = httpx.get(url, headers=self.headers, timeout=10.0)
                if response.status_code == 200:
                    return response.text
            except Exception as e:
                logger.warning(f"Failed to fetch {readme_name}: {e}")
                continue

        return ""

    def get_repo_files(self, repo_name: str, path: str = "") -> List[Dict]:
        """List files in a repository directory."""
        url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/contents/{path}"

        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            response.raise_for_status()
            return response.json()
        except Exception as e:
            logger.error(f"Error listing files in {repo_name}/{path}: {e}")
            return []

    def get_file_content(self, repo_name: str, filepath: str) -> str:
        """Fetch content of a specific file."""
        url = f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}/raw/{filepath}"

        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            if response.status_code == 200:
                return response.text
        except Exception as e:
            logger.error(f"Error fetching file {filepath}: {e}")

        return ""


# Test function
if __name__ == "__main__":
    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam"),
    )

    repos = scraper.get_user_repos()
    print(f"Found {len(repos)} repositories")

    for repo in repos[:3]:
        print(f"\nRepo: {repo.name}")
        readme = scraper.get_readme(repo.name)
        if readme:
            print(f"README preview: {readme[:200]}...")
```
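`get_user_repos` pages through the API until an empty page comes back. The loop shape, isolated with a fake page fetcher so it can be tested offline (`fetch_page` here is a stand-in for the Gitea HTTP call, not part of the scraper):

```python
from typing import Callable, Dict, List

def collect_paginated(fetch_page: Callable[[int], List[dict]]) -> List[dict]:
    """Accumulate results page by page until a page comes back empty,
    mirroring the while-loop in GiteaScraper.get_user_repos."""
    items: List[dict] = []
    page = 1
    while True:
        batch = fetch_page(page)
        if not batch:
            break
        items.extend(batch)
        page += 1
    return items

# Fake API: two full pages of 50, a partial page, then nothing.
pages: Dict[int, List[dict]] = {
    1: [{"name": f"repo{i}"} for i in range(50)],
    2: [{"name": f"repo{50 + i}"} for i in range(50)],
    3: [{"name": "repo100"}],
}
repos = collect_paginated(lambda p: pages.get(p, []))
print(len(repos))  # 101
```

Stopping on an empty page (rather than a partial one) is the simplest termination rule and matches what the scraper does; it costs one extra request per full sync.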
---

## Phase 4: Apache Airflow Setup

### Directory Structure

```
/home/sam/development/airflow/
├── docker-compose.yml
├── .env
└── dags/
    └── gitea_ingestion_dag.py
```

### File: `/home/sam/development/airflow/.env`

```bash
# Airflow Configuration
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin

# Gitea Configuration
GITEA_URL=https://gitea.lab.audasmedia.com.au
GITEA_TOKEN=your_token_here
GITEA_USERNAME=sam

# Knowledge Service
KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
```
### File: `/home/sam/development/airflow/dags/gitea_ingestion_dag.py`

```python
"""
Airflow DAG for scheduled Gitea repository ingestion.
Runs daily to fetch new/updated repos and ingest into ChromaDB.
"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import os
import sys

# Add knowledge_service (mounted read-only) to the path for imports
sys.path.insert(0, '/opt/airflow/dags/repo')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def fetch_gitea_repos(**context):
    """Task: Fetch all repositories from Gitea."""
    from gitea_scraper import GiteaScraper

    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam"),
    )

    repos = scraper.get_user_repos()

    # Push to XCom for downstream tasks
    context['ti'].xcom_push(key='repo_count', value=len(repos))
    context['ti'].xcom_push(key='repos', value=[
        {
            'name': r.name,
            'description': r.description,
            'url': r.url,
            'updated_at': r.updated_at,
        }
        for r in repos
    ])

    return f"Fetched {len(repos)} repositories"


def fetch_readmes(**context):
    """Task: Fetch READMEs for all repositories."""
    from gitea_scraper import GiteaScraper

    ti = context['ti']
    repos = ti.xcom_pull(task_ids='fetch_repos', key='repos')

    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam"),
    )

    readme_data = []
    for repo in repos[:10]:  # Limit to 10 repos per run for testing
        readme = scraper.get_readme(repo['name'])
        if readme:
            readme_data.append({
                'repo': repo['name'],
                'content': readme[:5000],  # First 5000 chars
                'url': repo['url'],
            })

    ti.xcom_push(key='readme_data', value=readme_data)

    return f"Fetched {len(readme_data)} READMEs"


def ingest_to_chroma(**context):
    """Task: Ingest fetched data into ChromaDB via knowledge service."""
    import httpx

    ti = context['ti']
    readme_data = ti.xcom_pull(task_ids='fetch_readmes', key='readme_data')

    knowledge_service_url = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")

    documents_ingested = 0
    for item in readme_data:
        try:
            # Call knowledge service ingest endpoint
            response = httpx.post(
                f"{knowledge_service_url}/ingest",
                json={
                    'source': f"gitea:{item['repo']}",
                    'content': item['content'],
                    'metadata': {
                        'repo': item['repo'],
                        'url': item['url'],
                        'type': 'readme',
                    },
                },
                timeout=30.0,
            )

            if response.status_code == 200:
                documents_ingested += 1

        except Exception as e:
            print(f"Error ingesting {item['repo']}: {e}")

    return f"Ingested {documents_ingested} documents into ChromaDB"


# Define the DAG
with DAG(
    'gitea_daily_ingestion',
    default_args=default_args,
    description='Daily ingestion of Gitea repositories into knowledge base',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gitea', 'ingestion', 'knowledge'],
) as dag:

    fetch_repos_task = PythonOperator(
        task_id='fetch_repos',
        python_callable=fetch_gitea_repos,
    )

    fetch_readmes_task = PythonOperator(
        task_id='fetch_readmes',
        python_callable=fetch_readmes,
    )

    ingest_task = PythonOperator(
        task_id='ingest_to_chroma',
        python_callable=ingest_to_chroma,
    )

    fetch_repos_task >> fetch_readmes_task >> ingest_task
```
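The three tasks share no state except XCom: `fetch_repos` pushes a repo list, `fetch_readmes` pulls it and pushes README payloads, and `ingest_to_chroma` pulls those. The handoff can be seen with a minimal in-memory stand-in for the task instance (`FakeTI` is not an Airflow class; real XCom is keyed per task and run, which this stub ignores):

```python
class FakeTI:
    """In-memory stand-in for Airflow's task-instance XCom API."""
    def __init__(self):
        self.store = {}

    def xcom_push(self, key, value):
        self.store[key] = value

    def xcom_pull(self, task_ids, key):
        # Real XCom also filters by task_ids; this stub keys on name only.
        return self.store.get(key)

ti = FakeTI()

# Upstream task pushes the repo list...
ti.xcom_push(key="repos", value=[{"name": "demo", "url": "https://example.invalid/demo"}])

# ...and the downstream task pulls it back by key.
repos = ti.xcom_pull(task_ids="fetch_repos", key="repos")
print(repos[0]["name"])  # demo
```

Because XCom values are serialized into the Airflow metadata DB, the DAG keeps payloads small (10 repos, 5000 chars of README per run); bulk content should flow through the knowledge service, not XCom.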
### File: `/home/sam/development/airflow/docker-compose.yml`

```yaml
version: '3.8'

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.1}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # Pass Gitea/knowledge-service settings from .env through to the DAG tasks
    GITEA_URL: ${GITEA_URL}
    GITEA_TOKEN: ${GITEA_TOKEN}
    GITEA_USERNAME: ${GITEA_USERNAME}
    KNOWLEDGE_SERVICE_URL: ${KNOWLEDGE_SERVICE_URL}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - /home/sam/development/knowledge_service:/opt/airflow/dags/repo:ro
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - ai-mesh

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
    networks:
      - ai-mesh

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8081:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo "WARNING!!!: AIRFLOW_UID not set!"
          echo "Using default UID: 50000"
          export AIRFLOW_UID=50000
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources
    networks:
      - ai-mesh

volumes:
  postgres-db-volume:

networks:
  ai-mesh:
    external: true
```
---

## Phase 5: Terminal Commands

### Setup LangGraph Service

```bash
# Create langgraph_service directory
mkdir -p /home/sam/development/langgraph_service

# Create requirements.txt
cat > /home/sam/development/langgraph_service/requirements.txt << 'EOF'
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
EOF

# Start LangGraph service
cd /home/sam/development/langgraph_service
docker-compose up -d --build

# Verify
sleep 10
curl http://localhost:8090/health
curl http://localhost:8090/agents
```
### Setup Airflow

```bash
# Create directory structure
mkdir -p /home/sam/development/airflow/{dags,logs,config,plugins}

# Create .env file
cat > /home/sam/development/airflow/.env << 'EOF'
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
GITEA_URL=https://gitea.lab.audasmedia.com.au
GITEA_TOKEN=your_token_here
GITEA_USERNAME=sam
KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
EOF

# Copy gitea_scraper.py to dags
cp /home/sam/development/knowledge_service/gitea_scraper.py /home/sam/development/airflow/dags/

# Start Airflow
cd /home/sam/development/airflow
docker-compose up -d

# Wait and verify
sleep 60
curl http://localhost:8081/health
```
### Update Chat Gateway

```bash
# Replace backend main.py with the new version
# (copy the Phase 2 main.py content to: /home/sam/development/aboutme_chat_demo/backend/main.py)

# Rebuild with the updated docker-compose
cd /home/sam/development/aboutme_chat_demo
docker-compose up -d --build

# Verify
curl http://localhost:8000/health
```
### Test Gitea Scraper Locally

```bash
# Set environment variables
export GITEA_URL=https://gitea.lab.audasmedia.com.au
export GITEA_TOKEN=your_token_here
export GITEA_USERNAME=sam

# Run test
cd /home/sam/development/knowledge_service
python gitea_scraper.py
```
### Trigger Airflow DAG Manually

```bash
# Trigger the DAG via the REST API
curl -X POST http://localhost:8081/api/v1/dags/gitea_daily_ingestion/dagRuns \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d '{"conf": {}}'
```
### Complete Stack Startup

```bash
# Ensure network exists
docker network create ai-mesh 2>/dev/null || true

# Start all services in order
cd /home/sam/development/knowledge_service && docker-compose up -d
cd /home/sam/development/langgraph_service && docker-compose up -d
cd /home/sam/development/aboutme_chat_demo && docker-compose up -d
cd /home/sam/development/airflow && docker-compose up -d

# Verify all services
echo "Testing services..."
sleep 30
curl -s http://localhost:8000/health && echo "✓ Chat Gateway"
curl -s http://localhost:8080/health && echo "✓ Knowledge Service"
curl -s http://localhost:8090/health && echo "✓ LangGraph Service"
curl -s http://localhost:8081/health && echo "✓ Airflow"
```
### Test End-to-End

```bash
# Test a knowledge query through LangGraph
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "What are Sam'"'"'s coding projects?"}'

# Test agent routing to the coding agent
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d '{"message": "Write a Python function to calculate fibonacci"}'
```
---

## Architecture Summary

```
                 User Query
                     |
                     v
         ┌───────────────────────┐
         │     Chat Gateway      │  Port 8000
         │       (FastAPI)       │  Routes to LangGraph
         └───────────┬───────────┘
                     |
                     v
         ┌───────────────────────┐
         │ LangGraph Supervisor  │  Port 8090
         │    (Agent Router)     │  - Decides agent
         └───────────┬───────────┘
                     |
     ┌───────────────┼───────────────┐
     |               |               |
     v               v               v
┌─────────┐    ┌──────────┐    ┌──────────┐
│Librarian│    │ Opencode │    │  Brain   │
│  (RAG)  │    │ (Coding) │    │  (LLM)   │
└────┬────┘    └──────────┘    └──────────┘
     |
     v
┌─────────────────┐     ┌─────────────────┐
│    Knowledge    │◄────│ Apache Airflow  │
│     Service     │     │   (Scheduler)   │
│   (ChromaDB)    │     └────────┬────────┘
│    Port 8080    │              |
└─────────────────┘              v
                        ┌──────────────┐
                        │  Gitea API   │
                        │   Scraper    │
                        │ (Daily DAG)  │
                        └──────────────┘
```
---

## Key Improvements

1. **Modular Docker Compose**: Each service has its own compose file
2. **Self-contained packages**: Each service installs dependencies into its own /app/packages directory; nothing is shared between containers
3. **Fast rebuilds**: The /app/packages layer is only invalidated when requirements.txt changes
4. **Routing in one place**: The gateway no longer hardcodes keywords; the LangGraph supervisor decides the agent (keyword-based for now, swappable for an LLM router)
5. **Scalable**: Each service can be updated independently
6. **Scheduled ingestion**: Airflow runs daily Gitea scrapes

All code is modular, self-contained, and ready for copy-paste implementation.