"""
Airflow DAG for scheduled Gitea repository ingestion.

Runs daily: lists the configured user's Gitea repositories, fetches README
content for a capped number of them, and posts each README to the knowledge
service's ``/ingest`` endpoint for storage in ChromaDB.
"""
import json
import logging
import os
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

# Add knowledge_service to path for imports
sys.path.insert(0, '/opt/airflow/dags/repo')

logger = logging.getLogger(__name__)

# Limit to 10 repos per run for testing
MAX_REPOS_PER_RUN = 10
# Only the first 5000 characters of each README are passed downstream
README_MAX_CHARS = 5000
# Per-request timeout (seconds) for calls to the knowledge service
INGEST_TIMEOUT_SECONDS = 30.0

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def _build_scraper():
    """Create a GiteaScraper configured from GITEA_* environment variables."""
    # Imported inside the function, so the module is only needed when a task
    # actually runs rather than whenever this DAG file is parsed.
    from gitea_scraper import GiteaScraper

    return GiteaScraper(
        base_url=os.getenv("GITEA_URL", "https://gitea.lab.audasmedia.com.au"),
        token=os.getenv("GITEA_TOKEN", ""),
        username=os.getenv("GITEA_USERNAME", "sam"),
    )


def fetch_gitea_repos(**context):
    """Task: Fetch all repositories from Gitea.

    Pushes two XComs for downstream tasks:
      * ``repo_count`` - number of repositories returned by the scraper
      * ``repos`` - list of dicts with ``name``, ``description``, ``url``
        and ``updated_at`` for each repository

    Args:
        **context: Airflow task context; ``context['ti']`` is used for XCom.

    Returns:
        A human-readable summary string.
    """
    repos = _build_scraper().get_user_repos()

    ti = context['ti']
    # Push to XCom for downstream tasks
    ti.xcom_push(key='repo_count', value=len(repos))
    # NOTE(review): assumes every field (updated_at in particular) is
    # serializable by the configured XCom backend - confirm.
    ti.xcom_push(key='repos', value=[
        {
            'name': r.name,
            'description': r.description,
            'url': r.url,
            'updated_at': r.updated_at,
        }
        for r in repos
    ])

    return f"Fetched {len(repos)} repositories"


def fetch_readmes(**context):
    """Task: Fetch READMEs for all repositories.

    Reads the ``repos`` XCom pushed by the ``fetch_repos`` task, fetches the
    README for at most ``MAX_REPOS_PER_RUN`` of them, and pushes the result
    as the ``readme_data`` XCom (list of dicts with ``repo``, ``content``
    and ``url``). Repositories for which the scraper returns a falsy README
    are skipped.

    Args:
        **context: Airflow task context; ``context['ti']`` is used for XCom.

    Returns:
        A human-readable summary string.
    """
    ti = context['ti']
    repos = ti.xcom_pull(task_ids='fetch_repos', key='repos')
    if repos is None:
        # xcom_pull yields None when no matching XCom exists; treat that as
        # an empty repository list instead of failing on the slice below.
        logger.warning("No 'repos' XCom found from fetch_repos; nothing to fetch")
        repos = []

    scraper = _build_scraper()

    readme_data = []
    for repo in repos[:MAX_REPOS_PER_RUN]:
        readme = scraper.get_readme(repo['name'])
        if not readme:
            continue
        readme_data.append({
            'repo': repo['name'],
            'content': readme[:README_MAX_CHARS],
            'url': repo['url'],
        })

    ti.xcom_push(key='readme_data', value=readme_data)

    return f"Fetched {len(readme_data)} READMEs"


def ingest_to_chroma(**context):
    """Task: Ingest fetched data into ChromaDB via knowledge service.

    Reads the ``readme_data`` XCom pushed by the ``fetch_readmes`` task and
    POSTs each entry to ``{KNOWLEDGE_SERVICE_URL}/ingest``. Ingestion is
    best-effort: a failed request or a non-200 response is logged and the
    remaining documents are still attempted. Only HTTP 200 responses are
    counted as ingested.

    Args:
        **context: Airflow task context; ``context['ti']`` is used for XCom.

    Returns:
        A human-readable summary string.
    """
    import httpx

    ti = context['ti']
    readme_data = ti.xcom_pull(task_ids='fetch_readmes', key='readme_data')
    if readme_data is None:
        # xcom_pull yields None when no matching XCom exists; iterating None
        # would raise TypeError.
        logger.warning(
            "No 'readme_data' XCom found from fetch_readmes; nothing to ingest"
        )
        readme_data = []

    knowledge_service_url = os.getenv(
        "KNOWLEDGE_SERVICE_URL", "http://knowledge-service:8080"
    )
    ingest_url = f"{knowledge_service_url}/ingest"

    documents_ingested = 0
    # One client for the whole batch so connections are reused and closed.
    with httpx.Client(timeout=INGEST_TIMEOUT_SECONDS) as client:
        for item in readme_data:
            try:
                # Call knowledge service ingest endpoint
                response = client.post(
                    ingest_url,
                    json={
                        'source': f"gitea:{item['repo']}",
                        'content': item['content'],
                        'metadata': {
                            'repo': item['repo'],
                            'url': item['url'],
                            'type': 'readme',
                        },
                    },
                )
            except Exception as exc:
                # Deliberately broad: one bad document must not abort the
                # rest of the batch.
                logger.error("Error ingesting %s: %s", item['repo'], exc)
                continue

            if response.status_code == 200:
                documents_ingested += 1
            else:
                # Previously dropped silently; surface it in the task log.
                logger.warning(
                    "Knowledge service returned HTTP %s for %s",
                    response.status_code,
                    item['repo'],
                )

    return f"Ingested {documents_ingested} documents into ChromaDB"


# Define the DAG
with DAG(
    'gitea_daily_ingestion',
    default_args=default_args,
    description='Daily ingestion of Gitea repositories into knowledge base',
    schedule_interval=timedelta(days=1),  # Run daily
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['gitea', 'ingestion', 'knowledge'],
) as dag:

    # Task 1: Fetch repository list
    fetch_repos_task = PythonOperator(
        task_id='fetch_repos',
        python_callable=fetch_gitea_repos,
    )

    # Task 2: Fetch README content
    fetch_readmes_task = PythonOperator(
        task_id='fetch_readmes',
        python_callable=fetch_readmes,
    )

    # Task 3: Ingest into ChromaDB
    ingest_task = PythonOperator(
        task_id='ingest_to_chroma',
        python_callable=ingest_to_chroma,
    )

    # Define task dependencies
    fetch_repos_task >> fetch_readmes_task >> ingest_task