# Implementation Plan: Gitea Ingestion, Airflow Scheduling, and LangGraph Orchestration

## Overview

This plan builds a complete AI agent pipeline from three components:

1. **Gitea API Scraper** - Custom module to fetch repos, READMEs, and code
2. **Apache Airflow** - Multi-service Docker setup for scheduled ingestion
3. **LangGraph Supervisor** - Agent orchestration service for multi-agent routing

---

## Phase 1: Gitea API Scraper Module

### File: `/home/sam/development/knowledge_service/gitea_scraper.py`

```python
"""
Gitea API Scraper - Fetches repos, READMEs, and source code
for ingestion into the knowledge base.
"""
import logging
import os
from dataclasses import dataclass
from typing import Dict, List, Optional

import httpx

logger = logging.getLogger(__name__)


@dataclass
class RepoMetadata:
    name: str
    description: str
    url: str
    default_branch: str
    updated_at: str
    language: Optional[str]


class GiteaScraper:
    def __init__(self, base_url: str, token: str, username: str = "sam"):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.username = username
        # Only send the header when a token is set; an empty "token " value
        # may be rejected instead of being treated as anonymous access
        self.headers = {"Authorization": f"token {token}"} if token else {}

    def _repo_api(self, repo_name: str) -> str:
        """Base API URL for one of the user's repositories."""
        return f"{self.base_url}/api/v1/repos/{self.username}/{repo_name}"

    def get_user_repos(self) -> List[RepoMetadata]:
        """Fetch all repositories for the user, 50 per page."""
        repos = []
        page = 1

        while True:
            url = f"{self.base_url}/api/v1/users/{self.username}/repos"
            try:
                response = httpx.get(
                    url,
                    params={"page": page, "limit": 50},
                    headers=self.headers,
                    timeout=30.0,
                )
                response.raise_for_status()
                data = response.json()
            except (httpx.HTTPError, ValueError) as e:
                logger.error(f"Error fetching repos (page {page}): {e}")
                break

            if not data:
                break  # An empty page means every repo has been seen

            for repo in data:
                repos.append(RepoMetadata(
                    name=repo["name"],
                    # Gitea can return null for an empty description
                    description=repo.get("description") or "",
                    url=repo["html_url"],
                    default_branch=repo["default_branch"],
                    updated_at=repo["updated_at"],
                    language=repo.get("language"),
                ))

            logger.info(f"Fetched page {page}, got {len(data)} repos")
            page += 1

        return repos

    def get_readme(self, repo_name: str) -> str:
        """Fetch README content, trying common README filenames in order."""
        for readme_name in ["README.md", "readme.md", "Readme.md", "README.rst"]:
            content = self.get_file_content(repo_name, readme_name)
            if content:
                return content
        return ""

    def get_repo_files(self, repo_name: str, path: str = "") -> List[Dict]:
        """List files in a repository directory (the root when path is empty)."""
        url = f"{self._repo_api(repo_name)}/contents"
        if path:
            url = f"{url}/{path.strip('/')}"

        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
            response.raise_for_status()
            return response.json()
        except (httpx.HTTPError, ValueError) as e:
            logger.error(f"Error listing files in {repo_name}/{path}: {e}")
            return []

    def get_file_content(self, repo_name: str, filepath: str) -> str:
        """Fetch the raw content of a file on the default branch ("" if missing)."""
        url = f"{self._repo_api(repo_name)}/raw/{filepath}"

        try:
            response = httpx.get(url, headers=self.headers, timeout=10.0)
        except httpx.HTTPError as e:
            logger.error(f"Error fetching file {filepath}: {e}")
            return ""

        return response.text if response.status_code == 200 else ""


# Test function
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    scraper = GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam"),
    )

    repos = scraper.get_user_repos()
    print(f"Found {len(repos)} repositories")

    for repo in repos[:3]:  # Test with the first 3 repos only
        print(f"\nRepo: {repo.name}")
        readme = scraper.get_readme(repo.name)
        if readme:
            print(f"README preview: {readme[:200]}...")
```
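The scraper exposes `get_repo_files` and `get_file_content`, but nothing in the plan walks a whole repository tree to collect source code. Below is a minimal breadth-first sketch of that walk, assuming each entry in Gitea's contents listing carries a `type` field (`"file"` or `"dir"`) and a `path` field. `collect_source_files`, `RepoReader`, the extension filter, and the `max_files` cap are hypothetical additions; because the function only calls the two scraper methods, any object with those methods (including a test stub) can stand in for `GiteaScraper`, and the cap keeps the result small enough to pass between Airflow tasks.

```python
from typing import Dict, List, Protocol


class RepoReader(Protocol):
    """The two GiteaScraper methods the walk relies on."""

    def get_repo_files(self, repo_name: str, path: str = "") -> List[Dict]: ...

    def get_file_content(self, repo_name: str, filepath: str) -> str: ...


def collect_source_files(
    scraper: RepoReader,
    repo_name: str,
    extensions: tuple = (".py", ".ts", ".md"),
    max_files: int = 50,
) -> Dict[str, str]:
    """Breadth-first walk of a repo; returns {path: content} for matching files."""
    found: Dict[str, str] = {}
    pending = [""]  # Directory paths still to list; "" is the repo root
    while pending and len(found) < max_files:
        directory = pending.pop(0)
        for entry in scraper.get_repo_files(repo_name, directory):
            if entry.get("type") == "dir":
                pending.append(entry["path"])
            elif entry.get("type") == "file" and entry["path"].endswith(extensions):
                content = scraper.get_file_content(repo_name, entry["path"])
                if content:
                    found[entry["path"]] = content
                if len(found) >= max_files:
                    break  # Stop early once the cap is reached
    return found
```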

---

## Phase 2: Apache Airflow Multi-Service Setup

### File: `/home/sam/development/airflow/docker-compose.yml`

```yaml
version: '3.8'

x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.1}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    # Values in .env are only used for ${...} substitution in this file; the
    # DAG's os.getenv() calls see them only if they are passed through here
    GITEA_URL: ${GITEA_URL:-https://gitea.lab.audasmedia.com.au}
    GITEA_TOKEN: ${GITEA_TOKEN:-}
    GITEA_USERNAME: ${GITEA_USERNAME:-sam}
    KNOWLEDGE_SERVICE_URL: ${KNOWLEDGE_SERVICE_URL:-http://knowledge-service:8080}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always
    networks:
      - ai-mesh

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always
    networks:
      - ai-mesh

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8081:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    networks:
      - ai-mesh

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo "WARNING!!!: AIRFLOW_UID not set!"
          echo "Using default UID: 50000"
          export AIRFLOW_UID=50000
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources
    networks:
      - ai-mesh

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    command:
      - bash
      - -c
      - airflow
    networks:
      - ai-mesh

volumes:
  postgres-db-volume:

networks:
  ai-mesh:
    external: true
```
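`AIRFLOW__CORE__FERNET_KEY` is left empty above, as in Airflow's quick-start compose file, which means Airflow stores connection credentials in its metadata database without encryption. A Fernet key is 32 random bytes encoded as URL-safe base64, so a stdlib-only sketch can generate one in the same format that `cryptography.fernet.Fernet.generate_key()` produces. Storing the result in `.env` under a hypothetical `FERNET_KEY` variable and referencing it as `AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}` is an assumption, not something the plan already does; keep the key stable once set, since values encrypted with it cannot be read with a different key.

```python
import base64
import os


def generate_fernet_key() -> str:
    """Return a key in Fernet's format: 32 random bytes, URL-safe base64-encoded."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


# Print one key to paste into .env, e.g. FERNET_KEY=<printed value>
print(generate_fernet_key())
```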

### File: `/home/sam/development/airflow/dags/gitea_ingestion_dag.py`

```python
"""
Airflow DAG for scheduled Gitea repository ingestion.
Runs daily to fetch new/updated repos and ingest them into ChromaDB.
"""
import logging
import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# gitea_scraper.py is copied next to this file in dags/ (see Terminal Commands).
# Airflow normally puts the dags folder on sys.path; this makes it explicit.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

logger = logging.getLogger(__name__)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def _make_scraper():
    """Build a scraper from the GITEA_* variables passed in by docker-compose.yml."""
    from gitea_scraper import GiteaScraper

    return GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam"),
    )


def fetch_gitea_repos(**context):
    """Task: Fetch all repositories from Gitea."""
    repos = _make_scraper().get_user_repos()

    # Push to XCom for downstream tasks
    ti = context['ti']
    ti.xcom_push(key='repo_count', value=len(repos))
    ti.xcom_push(key='repos', value=[
        {
            'name': r.name,
            'description': r.description,
            'url': r.url,
            'updated_at': r.updated_at,
        }
        for r in repos
    ])

    return f"Fetched {len(repos)} repositories"


def fetch_readmes(**context):
    """Task: Fetch READMEs for the repositories found upstream."""
    ti = context['ti']
    repos = ti.xcom_pull(task_ids='fetch_repos', key='repos') or []
    scraper = _make_scraper()

    readme_data = []
    for repo in repos[:10]:  # Limit to 10 repos per run for testing
        readme = scraper.get_readme(repo['name'])
        if readme:
            readme_data.append({
                'repo': repo['name'],
                'content': readme[:5000],  # First 5000 chars
                'url': repo['url'],
            })

    ti.xcom_push(key='readme_data', value=readme_data)
    return f"Fetched {len(readme_data)} READMEs"


def ingest_to_chroma(**context):
    """Task: Ingest fetched data into ChromaDB via the knowledge service."""
    import httpx

    ti = context['ti']
    readme_data = ti.xcom_pull(task_ids='fetch_readmes', key='readme_data') or []
    knowledge_service_url = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")

    documents_ingested = 0
    for item in readme_data:
        try:
            # Call the knowledge service ingest endpoint
            response = httpx.post(
                f"{knowledge_service_url}/ingest",
                json={
                    'source': f"gitea:{item['repo']}",
                    'content': item['content'],
                    'metadata': {
                        'repo': item['repo'],
                        'url': item['url'],
                        'type': 'readme',
                    },
                },
                timeout=30.0,
            )
            if response.status_code == 200:
                documents_ingested += 1
            else:
                logger.warning(f"Ingest of {item['repo']} returned HTTP {response.status_code}")
        except httpx.HTTPError as e:
            logger.error(f"Error ingesting {item['repo']}: {e}")

    return f"Ingested {documents_ingested} documents into ChromaDB"


# Define the DAG
with DAG(
    'gitea_daily_ingestion',
    default_args=default_args,
    description='Daily ingestion of Gitea repositories into knowledge base',
    schedule=timedelta(days=1),  # Run daily (`schedule` supersedes `schedule_interval` since Airflow 2.4)
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gitea', 'ingestion', 'knowledge'],
) as dag:

    # Task 1: Fetch repository list
    fetch_repos_task = PythonOperator(
        task_id='fetch_repos',
        python_callable=fetch_gitea_repos,
    )

    # Task 2: Fetch README content
    fetch_readmes_task = PythonOperator(
        task_id='fetch_readmes',
        python_callable=fetch_readmes,
    )

    # Task 3: Ingest into ChromaDB
    ingest_task = PythonOperator(
        task_id='ingest_to_chroma',
        python_callable=ingest_to_chroma,
    )

    # Define task dependencies
    fetch_repos_task >> fetch_readmes_task >> ingest_task
```
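The DAG keeps only the first 5000 characters of each README and posts it as a single document. If longer READMEs matter, a common alternative for RAG ingestion is to split the text into overlapping fixed-size chunks and post one document per chunk. The sketch below assumes that approach; `chunk_text`, `build_ingest_payloads`, the 1000/200 character sizes, the `#chunkN` source suffix, and the `chunk` metadata field are all hypothetical, and whether the knowledge service's `/ingest` endpoint accepts the extra metadata field has not been checked. The overlap guarantees that any passage of at most `overlap` characters appears whole in at least one chunk.

```python
from typing import List


def chunk_text(text: str, size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into windows of `size` characters, each starting `size - overlap`
    characters after the previous one, so consecutive chunks share `overlap` chars."""
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError("need size > 0 and 0 <= overlap < size")
    step = size - overlap
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + size])
        if start + size >= len(text):
            break  # This window already reaches the end of the text
    return chunks


def build_ingest_payloads(repo: str, url: str, readme: str) -> List[dict]:
    """One /ingest payload per chunk, shaped like the DAG's single-README payload.

    The '#chunkN' source suffix and the 'chunk' metadata field are hypothetical.
    """
    return [
        {
            "source": f"gitea:{repo}#chunk{i}",
            "content": chunk,
            "metadata": {"repo": repo, "url": url, "type": "readme", "chunk": i},
        }
        for i, chunk in enumerate(chunk_text(readme))
    ]
```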

### File: `/home/sam/development/airflow/.env`

```bash
# Airflow Configuration
# AIRFLOW_UID should be your host user's UID (check with `id -u`) so files
# Airflow writes to dags/, logs/ and plugins/ stay editable from the host
AIRFLOW_UID=1000
AIRFLOW_GID=0
AIRFLOW_PROJ_DIR=.
_AIRFLOW_WWW_USER_USERNAME=admin
_AIRFLOW_WWW_USER_PASSWORD=admin
# Packages pip-installed when each Airflow container starts; gitea_scraper.py imports httpx
_PIP_ADDITIONAL_REQUIREMENTS=httpx

# Gitea Configuration
GITEA_URL=https://gitea.lab.audasmedia.com.au
GITEA_TOKEN=your_token_here
GITEA_USERNAME=sam

# Knowledge Service
KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
```

---

## Phase 3: LangGraph Supervisor Service

### File: `/home/sam/development/langgraph_service/requirements.txt`

```
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
```

### File: `/home/sam/development/langgraph_service/supervisor_agent.py`

```python
"""
LangGraph Supervisor Agent - Routes queries to specialist agents
"""
import logging
import operator
import os
from typing import Annotated, Sequence, TypedDict

import httpx
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage
from langgraph.graph import END, StateGraph

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

KNOWLEDGE_SERVICE_URL = os.getenv("KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080")
OPENCODE_BRAIN_URL = "http://opencode-brain:5000"


# State definition
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    next_agent: str
    context: dict


# Agent routing logic
def supervisor_node(state: AgentState):
    """Supervisor decides which specialist agent to call."""
    last_message = state["messages"][-1].content.lower()

    # Simple keyword routing: plain substring matches, first match wins.
    # Coding verbs are checked first; otherwise "code" in the librarian list
    # would capture requests such as "fix the bug in my code".
    if any(kw in last_message for kw in ["write", "edit", "create", "fix", "bug", "implement", "code change"]):
        return {"next_agent": "opencode"}
    elif any(kw in last_message for kw in ["repo", "code", "git", "github", "gitea", "project", "development"]):
        return {"next_agent": "librarian"}
    elif any(kw in last_message for kw in ["sam", "hobby", "music", "experience", "skill", "about"]):
        return {"next_agent": "librarian"}
    else:
        return {"next_agent": "brain"}  # Default to general LLM


async def librarian_agent(state: AgentState):
    """Librarian agent - queries the knowledge base (ChromaDB)."""
    last_message = state["messages"][-1].content

    try:
        # Call the knowledge service
        async with httpx.AsyncClient(timeout=10.0) as client:
            response = await client.post(
                f"{KNOWLEDGE_SERVICE_URL}/query",
                json={"question": last_message},
            )
        response.raise_for_status()
        context = response.json().get("context", "")
        return {
            "messages": [AIMessage(content=f"Based on my knowledge base:\n\n{context}")],
            "context": {"source": "librarian", "context": context},
        }
    except (httpx.HTTPError, ValueError) as e:
        # Fallback when the knowledge service is unreachable or returns an error
        logger.error(f"Librarian error: {e}")
        return {
            "messages": [AIMessage(content="I couldn't find relevant information in the knowledge base.")],
            "context": {"source": "librarian", "error": str(e)},
        }


def opencode_agent(state: AgentState):
    """Opencode agent - handles coding tasks via MCP."""
    last_message = state["messages"][-1].content

    # Placeholder - would integrate with opencode-brain
    return {
        "messages": [AIMessage(content=f"I'm the coding agent. I would help you with: {last_message}")],
        "context": {"source": "opencode", "action": "coding_task"},
    }


async def brain_agent(state: AgentState):
    """Brain agent - general LLM fallback via the opencode-brain service."""
    last_message = state["messages"][-1].content
    # The password comes from the environment (set in docker-compose / docker run)
    auth = httpx.BasicAuth("opencode", os.getenv("OPENCODE_PASSWORD", ""))
    timeout_long = httpx.Timeout(180.0, connect=10.0)

    try:
        # AsyncClient must be used with `async with` and awaited calls
        async with httpx.AsyncClient(auth=auth, timeout=timeout_long) as client:
            # Create session
            session_res = await client.post(f"{OPENCODE_BRAIN_URL}/session", json={"title": "Supervisor Query"})
            session_res.raise_for_status()
            session_id = session_res.json()["id"]

            # Send message
            response = await client.post(
                f"{OPENCODE_BRAIN_URL}/session/{session_id}/message",
                json={"parts": [{"type": "text", "text": last_message}]},
            )
            response.raise_for_status()
            data = response.json()

        for part in data.get("parts", []):
            if part.get("type") == "text":
                return {
                    "messages": [AIMessage(content=part["text"])],
                    "context": {"source": "brain"},
                }
    except (httpx.HTTPError, ValueError, KeyError) as e:
        logger.error(f"Brain error: {e}")

    return {
        "messages": [AIMessage(content="I'm thinking about this...")],
        "context": {"source": "brain"},
    }


def route_decision(state: AgentState):
    """Routing function based on supervisor decision."""
    return state["next_agent"]


# Build the graph
workflow = StateGraph(AgentState)

# Add nodes (sync and async nodes can be mixed when the graph runs via ainvoke)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("librarian", librarian_agent)
workflow.add_node("opencode", opencode_agent)
workflow.add_node("brain", brain_agent)

# Add edges
workflow.set_entry_point("supervisor")

# Conditional routing from supervisor
workflow.add_conditional_edges(
    "supervisor",
    route_decision,
    {
        "librarian": "librarian",
        "opencode": "opencode",
        "brain": "brain",
    },
)

# All specialist agents end
workflow.add_edge("librarian", END)
workflow.add_edge("opencode", END)
workflow.add_edge("brain", END)

# Compile the graph
supervisor_graph = workflow.compile()


# Main entry point for queries
async def process_query(query: str) -> dict:
    """Process a query through the supervisor graph."""
    result = await supervisor_graph.ainvoke({
        "messages": [HumanMessage(content=query)],
        "next_agent": "",
        "context": {},
    })

    return {
        "response": result["messages"][-1].content,
        "context": result.get("context", {}),
    }
```
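`supervisor_node` tests its keywords as plain substrings, so "digital" counts as a hit for "git" and "credit" for "edit". Pulling the routing into a pure function also makes it unit-testable without LangGraph or any running service. The sketch below, `route_query`, is a hypothetical stdlib-only helper rather than part of the plan: it keeps the same keyword lists, checks the coding verbs first, and only matches a keyword at the start of a word. If adopted, `supervisor_node` could reduce to `return {"next_agent": route_query(last_message)}`.

```python
import re
from typing import List, Tuple

# Same keyword lists as supervisor_node, checked in this order (first match wins).
# Coding verbs come first so "fix the bug in my code" is not captured by "code".
ROUTES: List[Tuple[str, List[str]]] = [
    ("opencode", ["write", "edit", "create", "fix", "bug", "implement", "code change"]),
    ("librarian", ["repo", "code", "git", "github", "gitea", "project", "development"]),
    ("librarian", ["sam", "hobby", "music", "experience", "skill", "about"]),
]


def _compile(keywords: List[str]) -> re.Pattern:
    # \b anchors each keyword to the start of a word but not the end, so "git"
    # still matches "gitea" while "digital" is no longer a hit.
    alternatives = "|".join(re.escape(kw) for kw in keywords)
    return re.compile(rf"\b(?:{alternatives})", re.IGNORECASE)


_PATTERNS = [(agent, _compile(keywords)) for agent, keywords in ROUTES]


def route_query(message: str, default: str = "brain") -> str:
    """Return the agent name for `message`, or `default` when nothing matches."""
    for agent, pattern in _PATTERNS:
        if pattern.search(message):
            return agent
    return default
```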

### File: `/home/sam/development/langgraph_service/main.py`

```python
"""
LangGraph Supervisor Service - FastAPI wrapper for agent orchestration
"""
import logging
import sys

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from supervisor_agent import process_query

logging.basicConfig(level=logging.INFO, stream=sys.stdout)
logger = logging.getLogger(__name__)

app = FastAPI(title="LangGraph Supervisor Service")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class QueryRequest(BaseModel):
    query: str


class QueryResponse(BaseModel):
    response: str
    agent_used: str
    context: dict


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "langgraph-supervisor"}


@app.post("/query", response_model=QueryResponse)
async def query_supervisor(request: QueryRequest):
    """Main entry point for agent orchestration."""
    logger.info(f"Received query: {request.query}")

    try:
        result = await process_query(request.query)

        return QueryResponse(
            response=result["response"],
            agent_used=result["context"].get("source", "unknown"),
            context=result["context"],
        )
    except Exception as e:
        # Log the full traceback, but still answer with a well-formed response
        logger.exception("Error processing query")
        return QueryResponse(
            response="Error processing your request",
            agent_used="error",
            context={"error": str(e)},
        )


@app.get("/agents")
async def list_agents():
    """List available specialist agents."""
    return {
        "agents": [
            {
                "name": "librarian",
                "description": "Queries the knowledge base for semantic information",
                "triggers": ["repo", "code", "git", "hobby", "about", "skill"],
            },
            {
                "name": "opencode",
                "description": "Handles coding tasks and file modifications",
                "triggers": ["write", "edit", "create", "fix", "implement"],
            },
            {
                "name": "brain",
                "description": "General LLM for reasoning and generation",
                "triggers": ["default", "general questions"],
            },
        ]
    }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8090)
```
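For callers other than the chat gateway, the `/query` contract is a JSON body `{"query": ...}` answered with the `QueryResponse` fields `response`, `agent_used`, and `context`. Below is a stdlib-only client sketch; `build_query_request`, `parse_query_response`, and `ask` are illustrative names, and the default base URL assumes the service is published on `localhost:8090` as in the compose file. Splitting request construction and response parsing from the network call keeps both testable offline, and `ask` uses a generous timeout because the brain agent may wait up to 180 s on opencode-brain.

```python
import json
import urllib.request
from typing import Tuple


def build_query_request(query: str, base_url: str = "http://localhost:8090") -> urllib.request.Request:
    """Build (but do not send) a POST /query request for the supervisor service."""
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/query",
        data=json.dumps({"query": query}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_query_response(body: bytes) -> Tuple[str, str]:
    """Return (answer, agent_used) from a QueryResponse JSON body."""
    payload = json.loads(body)
    return payload["response"], payload.get("agent_used", "unknown")


def ask(query: str, base_url: str = "http://localhost:8090", timeout: float = 200.0) -> Tuple[str, str]:
    """Send a query and return (answer, agent_used); raises on network or HTTP errors."""
    with urllib.request.urlopen(build_query_request(query, base_url), timeout=timeout) as resp:
        return parse_query_response(resp.read())
```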

### File: `/home/sam/development/langgraph_service/Dockerfile`

```dockerfile
FROM python:3.11-slim

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    && rm -rf /var/lib/apt/lists/*

# Create app directory
WORKDIR /app

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy code
COPY . .

EXPOSE 8090

CMD ["python3", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8090"]
```

---

## Phase 4: Integration - Updated Docker Compose

### File: `/home/sam/development/docker-compose.integrated.yml`

```yaml
version: '3.8'

services:
  # Existing Knowledge Service
  knowledge-service:
    build: ./knowledge_service
    container_name: knowledge-service
    ports:
      - "8080:8080"
    volumes:
      - ./knowledge_service/data:/app/code/data
      - ./knowledge_service/chroma_db:/app/code/chroma_db
      - ./knowledge_service/main.py:/app/code/main.py:ro
      - ./knowledge_service/gitea_scraper.py:/app/code/gitea_scraper.py:ro
    environment:
      - PYTHONUNBUFFERED=1
      - OPENROUTER_API_KEY=${OPENROUTER_API_KEY}
      - PYTHONPATH=/app/packages
      - GITEA_URL=${GITEA_URL}
      - GITEA_TOKEN=${GITEA_TOKEN}
      - GITEA_USERNAME=${GITEA_USERNAME:-sam}
    networks:
      - ai-mesh
    restart: unless-stopped

  # LangGraph Supervisor Service
  langgraph-service:
    build: ./langgraph_service
    container_name: langgraph-service
    ports:
      - "8090:8090"
    environment:
      - OPENCODE_PASSWORD=${OPENCODE_PASSWORD:-sam4jo}
      - KNOWLEDGE_SERVICE_URL=http://knowledge-service:8080
    depends_on:
      - knowledge-service
    networks:
      - ai-mesh
    restart: unless-stopped

  # Chat Gateway (Updated to use LangGraph)
  chat-gateway:
    build: ./aboutme_chat_demo/backend
    container_name: chat-gateway
    ports:
      - "8000:8000"
    volumes:
      - ./aboutme_chat_demo/backend:/app
    environment:
      - DATABASE_URL=postgresql://sam:sam4jo@db:5432/chat_demo
      - LANGGRAPH_URL=http://langgraph-service:8090
    depends_on:
      - langgraph-service
      - db
    networks:
      - ai-mesh
    restart: unless-stopped

  # Frontend
  frontend:
    build: ./aboutme_chat_demo/frontend
    container_name: chat-frontend
    ports:
      - "5173:5173"
    volumes:
      - ./aboutme_chat_demo/frontend:/app
      - /app/node_modules
    environment:
      - CHOKIDAR_USEPOLLING=true
    networks:
      - ai-mesh

  # PostgreSQL for chat history
  db:
    image: postgres:15-alpine
    container_name: chat-db
    environment:
      POSTGRES_USER: sam
      POSTGRES_PASSWORD: sam4jo
      POSTGRES_DB: chat_demo
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    networks:
      - ai-mesh
    restart: unless-stopped

volumes:
  postgres_data:

networks:
  ai-mesh:
    external: true
```

---

## Phase 5: Updated Chat Gateway

### File: `/home/sam/development/aboutme_chat_demo/backend/main.py`

```python
import logging
import os
import sys

import httpx
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[logging.StreamHandler(sys.stdout)])
logger = logging.getLogger(__name__)

app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])


class MessageRequest(BaseModel):
    message: str


LANGGRAPH_URL = os.getenv("LANGGRAPH_URL", "http://langgraph-service:8090")

# Keep this above the brain agent's 180 s read timeout in supervisor_agent.py;
# otherwise the gateway can give up while the supervisor is still waiting on the brain
LANGGRAPH_TIMEOUT = httpx.Timeout(200.0, connect=10.0)


@app.post("/chat")
async def chat(request: MessageRequest):
    """Chat endpoint that routes every message through the LangGraph Supervisor."""
    logger.info(f"Gateway: Received message: {request.message}")

    try:
        # Call the LangGraph Supervisor instead of the brain directly
        async with httpx.AsyncClient(timeout=LANGGRAPH_TIMEOUT) as client:
            response = await client.post(
                f"{LANGGRAPH_URL}/query",
                json={"query": request.message},
            )

        if response.status_code == 200:
            result = response.json()
            logger.info(f"Gateway: Response from {result.get('agent_used', 'unknown')} agent")
            return {"response": result["response"]}
        else:
            logger.error(f"Gateway: LangGraph error {response.status_code}")
            return {"response": "Error: Orchestration service unavailable"}

    except Exception:
        # logger.exception records the full traceback
        logger.exception("Gateway: Error routing through LangGraph")
        return {"response": "Error: Unable to process your request at this time."}


@app.get("/health")
async def health():
    return {"status": "healthy", "service": "chat-gateway"}
```

---

## Terminal Commands

### Setup Airflow Environment

```bash
# Create airflow directory structure
mkdir -p /home/sam/development/airflow/{dags,logs,config,plugins}

# Copy gitea_scraper.py into the dags folder so the DAG can import it
cp /home/sam/development/knowledge_service/gitea_scraper.py /home/sam/development/airflow/dags/

# Create the .env file shown above, then make sure AIRFLOW_UID matches your host
# UID (the image's default user is UID 50000). Append rather than overwrite,
# so the Gitea settings already in .env are kept.
cd /home/sam/development/airflow
grep -q '^AIRFLOW_UID=' .env 2>/dev/null || echo "AIRFLOW_UID=$(id -u)" >> .env

# The compose file joins the external ai-mesh network, which must already exist
docker network create ai-mesh 2>/dev/null || true

# Start Airflow services
docker-compose up -d

# Check Airflow webserver (wait 30 seconds for airflow-init to finish)
sleep 30
curl http://localhost:8081/health

# Access Airflow UI
# http://localhost:8081 (login: admin/admin)
```

### Setup LangGraph Service

```bash
# Create langgraph_service directory
mkdir -p /home/sam/development/langgraph_service

# Write requirements.txt
cat > /home/sam/development/langgraph_service/requirements.txt << 'EOF'
fastapi
uvicorn
langgraph
langchain
langchain-community
langchain-openai
httpx
pydantic
EOF

# Build and start LangGraph service
cd /home/sam/development/langgraph_service
docker build -t langgraph-service:latest .
docker run -d \
  --name langgraph-service \
  -p 8090:8090 \
  --network ai-mesh \
  -e OPENCODE_PASSWORD=sam4jo \
  langgraph-service:latest

# Test LangGraph service
curl http://localhost:8090/health
curl http://localhost:8090/agents
```

### Test Gitea Scraper Locally

```bash
# Set environment variables
export GITEA_URL=https://gitea.lab.audasmedia.com.au
export GITEA_TOKEN=your_token_here
export GITEA_USERNAME=sam

# Run scraper test (httpx is its only third-party dependency)
cd /home/sam/development/knowledge_service
pip install httpx
python gitea_scraper.py
```

### Start Complete Integrated Stack

```bash
# Ensure ai-mesh network exists
docker network create ai-mesh 2>/dev/null || true

# Remove the standalone LangGraph container started earlier; the integrated
# stack uses the same container_name and would otherwise fail to start it
docker rm -f langgraph-service 2>/dev/null || true

# Start all services
cd /home/sam/development
docker-compose -f docker-compose.integrated.yml up -d

# Verify all services
curl http://localhost:8000/health  # Chat Gateway
curl http://localhost:8080/health  # Knowledge Service
curl http://localhost:8090/health  # LangGraph Service
curl http://localhost:8081/health  # Airflow (separate stack in airflow/)

# Test end-to-end (double-quoted body, so the apostrophe in "Sam's" needs no escaping)
curl -X POST http://localhost:8000/chat \
  -H "Content-Type: application/json" \
  -d "{\"message\": \"What are Sam's hobbies?\"}"
```
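The `curl` checks above hit each endpoint once, but right after `up -d` some containers are still starting (Airflow's webserver, for example, waits for `airflow-init` to finish). Below is a stdlib-only sketch that retries every `/health` URL until it returns 200 or a deadline passes. `wait_for_services`, `http_status`, `SERVICES`, the 120 s deadline, and the 5 s interval are assumptions; the HTTP call is injected as `fetch_status` so the retry logic can be checked without live services. Calling `wait_for_services(SERVICES)` returns the names of services still unhealthy at the deadline, or an empty list when everything is up.

```python
import time
import urllib.error
import urllib.request
from typing import Callable, Dict, List

# Host ports as published by the compose files in this plan
SERVICES: Dict[str, str] = {
    "chat-gateway": "http://localhost:8000/health",
    "knowledge-service": "http://localhost:8080/health",
    "langgraph-service": "http://localhost:8090/health",
    "airflow": "http://localhost:8081/health",
}


def http_status(url: str) -> int:
    """Return the HTTP status code for a GET of `url`, or 0 if nothing answered."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status
    except urllib.error.HTTPError as e:  # The server answered with 4xx/5xx
        return e.code
    except OSError:  # Refused, DNS failure, or timeout (URLError is an OSError)
        return 0


def wait_for_services(
    services: Dict[str, str],
    fetch_status: Callable[[str], int] = http_status,
    deadline_s: float = 120.0,
    interval_s: float = 5.0,
) -> List[str]:
    """Poll each URL until it returns 200; return names still unhealthy at the deadline."""
    pending = dict(services)
    end = time.monotonic() + deadline_s
    while pending:
        for name, url in list(pending.items()):
            if fetch_status(url) == 200:
                del pending[name]
        if not pending or time.monotonic() >= end:
            break
        time.sleep(interval_s)
    return sorted(pending)
```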
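
### Manual Trigger Airflow DAG

```bash
# New DAGs start paused (DAGS_ARE_PAUSED_AT_CREATION is 'true'), and a run
# triggered on a paused DAG stays queued, so unpause it first
curl -X PATCH http://localhost:8081/api/v1/dags/gitea_daily_ingestion \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d '{"is_paused": false}'

# Trigger the Gitea ingestion DAG manually
curl -X POST http://localhost:8081/api/v1/dags/gitea_daily_ingestion/dagRuns \
  -H "Content-Type: application/json" \
  -u admin:admin \
  -d '{"conf": {}}'
```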
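The same REST calls can be scripted from Python. Because new DAGs start paused in this setup (`AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'`), the sketch below first unpauses the DAG with `PATCH /api/v1/dags/{dag_id}` and a `{"is_paused": false}` body, then creates a run with `POST /api/v1/dags/{dag_id}/dagRuns`, using HTTP basic auth (enabled by `AIRFLOW__API__AUTH_BACKENDS`). It is a stdlib-only sketch: `build_trigger_requests` and `trigger_dag` are hypothetical helper names, and the `admin`/`admin` defaults are taken from the `.env` above.

```python
import base64
import json
import urllib.request
from typing import List, Optional


def _airflow_request(method: str, url: str, body: dict, user: str, password: str) -> urllib.request.Request:
    """JSON request with an HTTP basic-auth header."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode("ascii")
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json", "Authorization": f"Basic {token}"},
        method=method,
    )


def build_trigger_requests(
    dag_id: str,
    base_url: str = "http://localhost:8081",
    user: str = "admin",
    password: str = "admin",
    conf: Optional[dict] = None,
) -> List[urllib.request.Request]:
    """Requests to unpause the DAG, then create a DAG run (Airflow 2 stable REST API)."""
    api = f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}"
    return [
        _airflow_request("PATCH", api, {"is_paused": False}, user, password),
        _airflow_request("POST", f"{api}/dagRuns", {"conf": conf or {}}, user, password),
    ]


def trigger_dag(dag_id: str, **kwargs) -> dict:
    """Send both requests in order; returns the JSON body of the last one (the new run)."""
    result: dict = {}
    for req in build_trigger_requests(dag_id, **kwargs):
        with urllib.request.urlopen(req, timeout=30) as resp:
            result = json.loads(resp.read())
    return result
```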

---

## Architecture Summary

```
    User Query
         |
         v
┌─────────────────┐
│  Chat Gateway   │  (Port 8000)
│  (FastAPI)      │
└────────┬────────┘
         |
         v
┌─────────────────┐
│   LangGraph     │  (Port 8090)
│   Supervisor    │  - Routes to specialist agents
│  (StateGraph)   │
└────────┬────────┘
         |
     ┌───┴───────┬───────────┐
     ▼           ▼           ▼
┌─────────┐ ┌─────────┐ ┌─────────┐
│Librarian│ │Opencode │ │  Brain  │
│  (RAG)  │ │(Coding) │ │  (LLM)  │
└────┬────┘ └─────────┘ └─────────┘
     |
     v
┌─────────────────┐     ┌─────────────────┐
│   Knowledge     │◄────│  Apache Airflow │
│   Service       │     │  (Port 8081)    │
│   (ChromaDB)    │     │  - Scheduled    │
│   (Port 8080)   │     │    ingestion    │
└─────────────────┘     └────────┬────────┘
                                 |
                                 v
                          ┌──────────────┐
                          │  Gitea API   │
                          │   Scraper    │
                          └──────────────┘
```

---

## Next Steps

1. **Add the Gitea token** to `airflow/.env` and to `/home/sam/development/.env`, which `docker-compose.integrated.yml` reads for the knowledge service
2. **Run and test** the Gitea scraper locally
3. **Deploy Airflow** with `docker-compose up -d`
4. **Build the LangGraph service** and test routing
5. **Update the Chat Gateway** to use LangGraph
6. **Test end-to-end** flow with a query like "What coding projects does Sam have?"

**All code is ready for copy-paste implementation.**