From e7db62e6bde1f23cc12dc6f6e6fafc60d5d66ad1 Mon Sep 17 00:00:00 2001 From: Nolan Tremelling <34580718+NolanTrem@users.noreply.github.com> Date: Mon, 24 Mar 2025 16:06:18 -0700 Subject: [PATCH] Add Scheduler and Postgres Vacuum (#2089) * Add scheduler and vacuum * Lint * Refactor test workflow, add mock scheduler to tests * 0 3 * * * * Missing quote in toml * Add maintenance service mock --- .../r2r-light-py-integration-tests.yml | 100 +++++++++++++-- py/core/__init__.py | 1 + py/core/base/providers/__init__.py | 4 + py/core/base/providers/database.py | 9 ++ py/core/base/providers/scheduler.py | 39 ++++++ py/core/main/__init__.py | 1 + py/core/main/abstractions.py | 16 ++- py/core/main/assembly/builder.py | 12 +- py/core/main/assembly/factory.py | 21 ++++ py/core/main/config.py | 9 +- py/core/main/services/__init__.py | 2 + py/core/main/services/maintenance_service.py | 118 ++++++++++++++++++ py/core/providers/__init__.py | 11 +- py/core/providers/database/maintenance.py | 95 ++++++++++++++ py/core/providers/database/postgres.py | 8 +- py/core/providers/scheduler/__init__.py | 3 + py/core/providers/scheduler/apscheduler.py | 37 ++++++ py/r2r/r2r.toml | 6 + py/tests/unit/test_routes.py | 20 +-- 19 files changed, 481 insertions(+), 31 deletions(-) create mode 100644 py/core/base/providers/scheduler.py create mode 100644 py/core/main/services/maintenance_service.py create mode 100644 py/core/providers/database/maintenance.py create mode 100644 py/core/providers/scheduler/__init__.py create mode 100644 py/core/providers/scheduler/apscheduler.py diff --git a/.github/workflows/r2r-light-py-integration-tests.yml b/.github/workflows/r2r-light-py-integration-tests.yml index be873d036..c8ae5362b 100644 --- a/.github/workflows/r2r-light-py-integration-tests.yml +++ b/.github/workflows/r2r-light-py-integration-tests.yml @@ -47,7 +47,7 @@ jobs: echo "::error::Package installation or import test failed." exit 1 - integration-test: + integration-test-azure-openai: needs: package-install-test runs-on: ubuntu-latest timeout-minutes: 20 @@ -121,14 +121,6 @@ jobs: --log-cli-level=INFO \ --junit-xml=test-results/junit.xml \ --html=test-results/report.html - - name: Run R2R Full Python Integration Test - run: | - cd py && uv run pytest tests/integration \ - --verbose \ - --capture=no \ - --log-cli-level=INFO \ - --junit-xml=test-results/junit.xml \ - --html=test-results/report.html - name: Upload test results if: always() @@ -238,3 +230,93 @@ jobs: run: | echo "::error::Gemini integration tests failed. Check the test results artifact for details." exit 1 + + integration-test-azure-openai-full: + needs: integration-test-azure-openai + runs-on: ubuntu-latest + timeout-minutes: 20 + + env: + OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }} + GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }} + AZURE_API_KEY: ${{ secrets.AZURE_API_KEY }} + AZURE_API_BASE: ${{ secrets.AZURE_API_BASE }} + AZURE_API_VERSION: ${{ secrets.AZURE_API_VERSION }} + TELEMETRY_ENABLED: 'false' + R2R_POSTGRES_HOST: localhost + R2R_POSTGRES_DBNAME: postgres + R2R_POSTGRES_PORT: '5432' + R2R_POSTGRES_PASSWORD: postgres + R2R_POSTGRES_USER: postgres + R2R_PROJECT_NAME: r2r_default + PYTHONUNBUFFERED: '1' + PYTEST_ADDOPTS: '--color=yes' + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Install Poppler + run: | + sudo apt-get update + sudo apt-get install -y poppler-utils + + - name: Set up Python and install dependencies + uses: ./.github/actions/setup-python-light + with: + os: ubuntu-latest + python-version: '3.12' + r2r-version: 'latest' + + - name: Setup and start PostgreSQL + uses: ./.github/actions/setup-postgres-ext + with: + os: ubuntu-latest + + - name: Verify PostgreSQL and Vector Extension + run: | + pg_isready -h localhost -p 5432 + sudo -u postgres psql -c "\dx vector;" + + - name: Start R2R Light server + uses: ./.github/actions/start-r2r-light + id: start-server + + - name: Wait for server to be ready + run: | + timeout=300 # 5 minutes timeout + while ! curl -s http://localhost:7272/health > /dev/null; do + if [ $timeout -le 0 ]; then + echo "Server failed to start within timeout" + exit 1 + fi + echo "Waiting for server to be ready..." + sleep 5 + timeout=$((timeout - 5)) + done + + - name: Run R2R Full Python Integration Test + run: | + cd py && uv run pytest tests/integration \ + --verbose \ + --capture=no \ + --log-cli-level=INFO \ + --junit-xml=test-results/junit.xml \ + --html=test-results/report.html + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-results + path: | + test-results/ + pytest-logs/ + + - name: Check for test failures + if: failure() + run: | + echo "::error::Integration tests failed. Check the test results artifact for details." + exit 1 diff --git a/py/core/__init__.py b/py/core/__init__.py index 5d7377b92..cbe29abbc 100644 --- a/py/core/__init__.py +++ b/py/core/__init__.py @@ -121,6 +121,7 @@ __all__ = [ "R2RProviderFactory", "AuthService", "IngestionService", + "MaintenanceService", "ManagementService", "RetrievalService", "GraphService", diff --git a/py/core/base/providers/__init__.py b/py/core/base/providers/__init__.py index b902944d9..bcc380252 100644 --- a/py/core/base/providers/__init__.py +++ b/py/core/base/providers/__init__.py @@ -19,6 +19,7 @@ from .ingestion import ( ) from .llm import CompletionConfig, CompletionProvider from .orchestration import OrchestrationConfig, OrchestrationProvider, Workflow +from .scheduler import SchedulerConfig, SchedulerProvider __all__ = [ # Auth provider @@ -56,4 +57,7 @@ __all__ = [ "OrchestrationConfig", "OrchestrationProvider", "Workflow", + # Scheduler provider + "SchedulerConfig", + "SchedulerProvider", ] diff --git a/py/core/base/providers/database.py b/py/core/base/providers/database.py index 845a81093..2e0f80d1b 100644 --- a/py/core/base/providers/database.py +++ b/py/core/base/providers/database.py @@ -114,6 +114,12 @@ class LimitSettings(BaseModel): ) +class MaintenanceSettings(BaseModel): + vacuum_schedule: str = "0 3 * * *" # Run at 3 AM every day by default + vacuum_analyze: bool = True + vacuum_full: bool = False + + class DatabaseConfig(ProviderConfig): """A base database configuration class.""" @@ -146,6 +152,9 @@ class DatabaseConfig(ProviderConfig): limits: LimitSettings = LimitSettings( global_per_min=60, route_per_min=20, monthly_limit=10000 ) + + # Maintenance settings + maintenance: MaintenanceSettings = MaintenanceSettings() route_limits: dict[str, LimitSettings] = {} user_limits: dict[UUID, LimitSettings] = {} diff --git a/py/core/base/providers/scheduler.py b/py/core/base/providers/scheduler.py new file mode 100644 index 000000000..49d43c714 --- /dev/null +++ b/py/core/base/providers/scheduler.py @@ -0,0 +1,39 @@ +from abc import abstractmethod + +from .base import Provider, ProviderConfig + + +class SchedulerConfig(ProviderConfig): + """Configuration for scheduler provider""" + + provider: str = "apscheduler" + + def validate_config(self): + if self.provider not in self.supported_providers: + raise ValueError( + f"Scheduler provider {self.provider} is not supported." + ) + + @property + def supported_providers(self) -> list[str]: + return ["apscheduler"] + + +class SchedulerProvider(Provider): + """Base class for scheduler providers""" + + def __init__(self, config: SchedulerConfig): + super().__init__(config) + self.config = config + + @abstractmethod + async def add_job(self, func, trigger, **kwargs): + pass + + @abstractmethod + async def start(self): + pass + + @abstractmethod + async def shutdown(self): + pass diff --git a/py/core/main/__init__.py b/py/core/main/__init__.py index 7043d0290..d1e21b49e 100644 --- a/py/core/main/__init__.py +++ b/py/core/main/__init__.py @@ -18,6 +18,7 @@ __all__ = [ ## R2R SERVICES "AuthService", "IngestionService", + "MaintenanceService", "ManagementService", "RetrievalService", "GraphService", diff --git a/py/core/main/abstractions.py b/py/core/main/abstractions.py index 3aaf2dbfc..02af1d6d3 100644 --- a/py/core/main/abstractions.py +++ b/py/core/main/abstractions.py @@ -5,6 +5,7 @@ from pydantic import BaseModel from core.providers import ( AnthropicCompletionProvider, + APSchedulerProvider, AsyncSMTPEmailProvider, ClerkAuthProvider, ConsoleMockEmailProvider, @@ -30,6 +31,7 @@ if TYPE_CHECKING: from core.main.services.auth_service import AuthService from core.main.services.graph_service import GraphService from core.main.services.ingestion_service import IngestionService + from core.main.services.maintenance_service import MaintenanceService from core.main.services.management_service import ManagementService from core.main.services.retrieval_service import ( # type: ignore RetrievalService, # type: ignore @@ -45,6 +47,12 @@ class R2RProviders(BaseModel): ) database: PostgresDatabaseProvider ingestion: R2RIngestionProvider | UnstructuredIngestionProvider + email: ( + AsyncSMTPEmailProvider + | ConsoleMockEmailProvider + | SendGridEmailProvider + | MailerSendEmailProvider + ) embedding: ( LiteLLMEmbeddingProvider | OpenAIEmbeddingProvider @@ -62,12 +70,7 @@ class R2RProviders(BaseModel): | R2RCompletionProvider ) orchestration: HatchetOrchestrationProvider | SimpleOrchestrationProvider - email: ( - AsyncSMTPEmailProvider - | ConsoleMockEmailProvider - | SendGridEmailProvider - | MailerSendEmailProvider - ) + scheduler: APSchedulerProvider class Config: arbitrary_types_allowed = True @@ -77,6 +80,7 @@ class R2RProviders(BaseModel): class R2RServices: auth: "AuthService" ingestion: "IngestionService" + maintenance: "MaintenanceService" management: "ManagementService" retrieval: "RetrievalService" graph: "GraphService" diff --git a/py/core/main/assembly/builder.py b/py/core/main/assembly/builder.py index f72a15c94..8af514fc5 100644 --- a/py/core/main/assembly/builder.py +++ b/py/core/main/assembly/builder.py @@ -17,6 +17,7 @@ from ..config import R2RConfig from ..services.auth_service import AuthService # noqa: F401 from ..services.graph_service import GraphService # noqa: F401 from ..services.ingestion_service import IngestionService # noqa: F401 +from ..services.maintenance_service import MaintenanceService # noqa: F401 from ..services.management_service import ManagementService # noqa: F401 from ..services.retrieval_service import ( # type: ignore RetrievalService, # noqa: F401 # type: ignore @@ -27,7 +28,14 @@ logger = logging.getLogger() class R2RBuilder: - _SERVICES = ["auth", "ingestion", "management", "retrieval", "graph"] + _SERVICES = [ + "auth", + "ingestion", + "maintenance", + "management", + "retrieval", + "graph", + ] def __init__(self, config: R2RConfig): self.config = config @@ -50,6 +58,8 @@ class R2RBuilder: services = self._create_services(service_params) + await services.maintenance.initialize() + routers = { "chunks_router": ChunksRouter( providers=providers, diff --git a/py/core/main/assembly/factory.py b/py/core/main/assembly/factory.py index b982aa182..2fb138da7 100644 --- a/py/core/main/assembly/factory.py +++ b/py/core/main/assembly/factory.py @@ -14,9 +14,11 @@ from core.base import ( EmbeddingProvider, IngestionConfig, OrchestrationConfig, + SchedulerConfig, ) from core.providers import ( AnthropicCompletionProvider, + APSchedulerProvider, AsyncSMTPEmailProvider, BcryptCryptoConfig, BCryptCryptoProvider, @@ -294,6 +296,18 @@ class R2RProviderFactory: f"Email provider {email_config.provider} not supported." ) + @staticmethod + async def create_scheduler_provider( + scheduler_config: SchedulerConfig, *args, **kwargs + ) -> APSchedulerProvider: + """Creates a scheduler provider based on configuration.""" + if scheduler_config.provider == "apscheduler": + return APSchedulerProvider(scheduler_config) + else: + raise ValueError( + f"Scheduler provider {scheduler_config.provider} not supported." + ) + async def create_providers( self, auth_provider_override: Optional[ @@ -324,6 +338,7 @@ class R2RProviderFactory: | R2RCompletionProvider ] = None, orchestration_provider_override: Optional[Any] = None, + scheduler_provider_override: Optional[APSchedulerProvider] = None, *args, **kwargs, ) -> R2RProviders: @@ -405,6 +420,11 @@ class R2RProviderFactory: or self.create_orchestration_provider(self.config.orchestration) ) + scheduler_provider = ( + scheduler_provider_override + or await self.create_scheduler_provider(self.config.scheduler) + ) + return R2RProviders( auth=auth_provider, database=database_provider, @@ -414,4 +434,5 @@ class R2RProviderFactory: llm=llm_provider, email=email_provider, orchestration=orchestration_provider, + scheduler=scheduler_provider, ) diff --git a/py/core/main/config.py b/py/core/main/config.py index f49b4041c..ed043dd3c 100644 --- a/py/core/main/config.py +++ b/py/core/main/config.py @@ -18,6 +18,7 @@ from ..base.providers.embedding import EmbeddingConfig from ..base.providers.ingestion import IngestionConfig from ..base.providers.llm import CompletionConfig from ..base.providers.orchestration import OrchestrationConfig +from ..base.providers.scheduler import SchedulerConfig from ..base.utils import deep_update logger = logging.getLogger() @@ -64,6 +65,7 @@ class R2RConfig: "database": ["provider"], "agent": ["generation_config"], "orchestration": ["provider"], + "scheduler": ["provider"], } app: AppConfig @@ -77,11 +79,11 @@ class R2RConfig: ingestion: IngestionConfig agent: RAGAgentConfig orchestration: OrchestrationConfig + scheduler: SchedulerConfig def __init__(self, config_data: dict[str, Any]): """ :param config_data: dictionary of configuration parameters - :param base_path: base path when a relative path is specified for the prompts directory """ # Load the default configuration default_config = self.load_default_config() @@ -120,13 +122,14 @@ class R2RConfig: self.orchestration = OrchestrationConfig.create( **self.orchestration, app=self.app ) # type: ignore + self.scheduler = SchedulerConfig.create(**self.scheduler, app=self.app) # type: ignore - IngestionConfig.set_default(**self.ingestion.dict()) + IngestionConfig.set_default(**self.ingestion.model_dump()) # override GenerationConfig defaults if self.completion.generation_config: GenerationConfig.set_default( - **self.completion.generation_config.dict() + **self.completion.generation_config.model_dump() ) def _validate_config_section( diff --git a/py/core/main/services/__init__.py b/py/core/main/services/__init__.py index e6a6dec0e..b2637f857 100644 --- a/py/core/main/services/__init__.py +++ b/py/core/main/services/__init__.py @@ -1,6 +1,7 @@ from .auth_service import AuthService from .graph_service import GraphService from .ingestion_service import IngestionService, IngestionServiceAdapter +from .maintenance_service import MaintenanceService from .management_service import ManagementService from .retrieval_service import RetrievalService # type: ignore @@ -8,6 +9,7 @@ __all__ = [ "AuthService", "IngestionService", "IngestionServiceAdapter", + "MaintenanceService", "ManagementService", "GraphService", "RetrievalService", diff --git a/py/core/main/services/maintenance_service.py b/py/core/main/services/maintenance_service.py new file mode 100644 index 000000000..fce8bb0c3 --- /dev/null +++ b/py/core/main/services/maintenance_service.py @@ -0,0 +1,118 @@ +import logging +from datetime import datetime +from typing import Any + +from ..abstractions import R2RProviders +from ..config import R2RConfig +from .base import Service + +logger = logging.getLogger(__name__) + + +class MaintenanceService(Service): + def __init__( + self, + config: R2RConfig, + providers: R2RProviders, + ): + super().__init__( + config, + providers, + ) + self.scheduled_jobs: list[Any] = [] + + async def initialize(self): + """Initialize and schedule maintenance tasks from configuration""" + logger.info("Initializing database maintenance service") + await self.providers.scheduler.start() + + maintenance_config = self.config.database.maintenance + + # Parse the cron schedule + schedule_parts = self._parse_cron_schedule( + maintenance_config.vacuum_schedule + ) + + # Schedule the vacuum job + job = await self.providers.scheduler.add_job( + self.vacuum_database, + trigger="cron", + **schedule_parts, + kwargs={ + "full": maintenance_config.vacuum_full, + "analyze": maintenance_config.vacuum_analyze, + }, + ) + + self.scheduled_jobs.append(job) + + def _parse_cron_schedule(self, cron_schedule: str) -> dict: + """Parse a cron schedule string into kwargs for APScheduler""" + parts = cron_schedule.split() + + # Handle both 5-part and 6-part cron expressions + if len(parts) == 6: + # With seconds field + second, minute, hour, day, month, day_of_week = parts + return { + "second": second, + "minute": minute, + "hour": hour, + "day": day, + "month": month, + "day_of_week": day_of_week, + } + elif len(parts) == 5: + # Standard cron (no seconds) + minute, hour, day, month, day_of_week = parts + return { + "minute": minute, + "hour": hour, + "day": day, + "month": month, + "day_of_week": day_of_week, + } + else: + logger.warning( + f"Invalid cron format: {cron_schedule}. Using defaults." + ) + return {"hour": 3, "minute": 0} + + async def vacuum_database(self, full: bool = False, analyze: bool = True): + """Run vacuum on the entire database""" + start_time = datetime.now() + + try: + await ( + self.providers.database.maintenance_handler.vacuum_all_tables( + analyze=analyze, full=full + ) + ) + + duration = datetime.now() - start_time + logger.info( + f"Database vacuum completed successfully in {duration.total_seconds():.2f} seconds" + ) + except Exception as e: + logger.error(f"Database vacuum failed: {str(e)}") + + async def vacuum_table( + self, table_name: str, full: bool = False, analyze: bool = True + ): + """Run vacuum on a specific table""" + start_time = datetime.now() + logger.info( + f"Running vacuum on table {table_name} (full={full}, analyze={analyze})" + ) + + try: + await self.providers.database.maintenance_handler.vacuum_table( + table_name=table_name, analyze=analyze, full=full + ) + + duration = datetime.now() - start_time + logger.info( + f"Table vacuum completed successfully in {duration.total_seconds():.2f} seconds" + ) + except Exception as e: + logger.error(f"Table vacuum failed for {table_name}: {str(e)}") diff --git a/py/core/providers/__init__.py b/py/core/providers/__init__.py index 7cfa82ebc..7fae81217 100644 --- a/py/core/providers/__init__.py +++ b/py/core/providers/__init__.py @@ -38,6 +38,9 @@ from .orchestration import ( HatchetOrchestrationProvider, SimpleOrchestrationProvider, ) +from .scheduler import ( + APSchedulerProvider, +) __all__ = [ # Auth @@ -66,12 +69,14 @@ __all__ = [ "ConsoleMockEmailProvider", "SendGridEmailProvider", "MailerSendEmailProvider", - # Orchestration - "HatchetOrchestrationProvider", - "SimpleOrchestrationProvider", # LLM "AnthropicCompletionProvider", "OpenAICompletionProvider", "R2RCompletionProvider", "LiteLLMCompletionProvider", + # Orchestration + "HatchetOrchestrationProvider", + "SimpleOrchestrationProvider", + # Scheduler + "APSchedulerProvider", ] diff --git a/py/core/providers/database/maintenance.py b/py/core/providers/database/maintenance.py new file mode 100644 index 000000000..378d4a527 --- /dev/null +++ b/py/core/providers/database/maintenance.py @@ -0,0 +1,95 @@ +import logging + +from core.base import Handler + +from .base import PostgresConnectionManager + +logger = logging.getLogger(__name__) + + +class PostgresMaintenanceHandler(Handler): + def __init__( + self, + project_name: str, + connection_manager: PostgresConnectionManager, + ): + """ + Initialize the PostgresMaintenanceHandler with the given project name and connection manager. + + Args: + project_name (str): The name of the project. + connection_manager (PostgresConnectionManager): The connection manager to use. + """ + super().__init__(project_name, connection_manager) + + logger.debug( + f"Initialized PostgresMaintenanceHandler for project: {project_name}" + ) + + async def create_tables(self): + pass + + async def vacuum_table( + self, + table_name: str, + analyze: bool = False, + full: bool = False, + ): + """ + VACUUM reclaims storage occupied by dead tuples. In normal PostgreSQL operation, + tuples that are deleted or obsoleted by an update are not physically removed from + their table; they remain present until a VACUUM is done. + + Therefore it's necessary to do VACUUM periodically, especially on frequently-updated + tables. + + VACUUM ANALYZE performs a VACUUM and then an ANALYZE for each selected table. + + Plain VACUUM (without FULL) simply reclaims space and makes it available for re-use. + This form of the command can operate in parallel with normal reading and writing of the + table, as an exclusive lock is not obtained. However, extra space is not returned to + the operating system (in most cases); it's just kept available for re-use within the same + table. + + VACUUM FULL rewrites the entire contents of the table into a new disk file with no extra + space, allowing unused space to be returned to the operating system. This form is much + slower and requires an ACCESS EXCLUSIVE lock on each table while it is being processed. + + TODO: Implement VACUUM FULL + """ + + vacuum_query = "VACUUM" + if analyze: + vacuum_query += " ANALYZE" + if full: + logger.warning( + "VACUUM FULL not implemented yet. Running plain VACUUM instead." + ) + + try: + await self.connection_manager.execute_query( + f"{vacuum_query} {table_name}" + ) + except Exception as e: + logger.error(f"Error vacuuming table {table_name}: {str(e)}") + raise e + + async def vacuum_all_tables( + self, + analyze: bool = False, + full: bool = False, + ): + """Vacuum all tables in the database""" + + vacuum_query = "VACUUM" + if analyze: + vacuum_query += " ANALYZE" + if full: + logger.warning( + "VACUUM FULL not implemented yet. Running plain VACUUM instead." + ) + try: + await self.connection_manager.execute_query(vacuum_query) + except Exception as e: + logger.error(f"Error vacuuming all tables: {str(e)}") + raise e diff --git a/py/core/providers/database/postgres.py b/py/core/providers/database/postgres.py index acccc9c0b..ec58d1094 100644 --- a/py/core/providers/database/postgres.py +++ b/py/core/providers/database/postgres.py @@ -22,6 +22,7 @@ from .graphs import ( PostgresRelationshipsHandler, ) from .limits import PostgresLimitsHandler +from .maintenance import PostgresMaintenanceHandler from .prompts_handler import PostgresPromptsHandler from .tokens import PostgresTokensHandler from .users import PostgresUserHandler @@ -68,6 +69,7 @@ class PostgresDatabaseProvider(DatabaseProvider): files_handler: PostgresFilesHandler conversations_handler: PostgresConversationsHandler limits_handler: PostgresLimitsHandler + maintenance_handler: PostgresMaintenanceHandler def __init__( self, @@ -184,13 +186,16 @@ class PostgresDatabaseProvider(DatabaseProvider): dimension=self.dimension, quantization_type=self.quantization_type, ) + self.maintenance_handler = PostgresMaintenanceHandler( + project_name=self.project_name, + connection_manager=self.connection_manager, + ) self.prompts_handler = PostgresPromptsHandler( self.project_name, self.connection_manager ) self.files_handler = PostgresFilesHandler( self.project_name, self.connection_manager ) - self.limits_handler = PostgresLimitsHandler( project_name=self.project_name, connection_manager=self.connection_manager, @@ -229,6 +234,7 @@ class PostgresDatabaseProvider(DatabaseProvider): await self.relationships_handler.create_tables() await self.conversations_handler.create_tables() await self.limits_handler.create_tables() + await self.maintenance_handler.create_tables() def _get_postgres_configuration_settings( self, config: DatabaseConfig diff --git a/py/core/providers/scheduler/__init__.py b/py/core/providers/scheduler/__init__.py new file mode 100644 index 000000000..9039a70a1 --- /dev/null +++ b/py/core/providers/scheduler/__init__.py @@ -0,0 +1,3 @@ +from .apscheduler import APSchedulerProvider + +__all__ = ["APSchedulerProvider"] diff --git a/py/core/providers/scheduler/apscheduler.py b/py/core/providers/scheduler/apscheduler.py new file mode 100644 index 000000000..9b52185cf --- /dev/null +++ b/py/core/providers/scheduler/apscheduler.py @@ -0,0 +1,37 @@ +import logging + +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from core.base import SchedulerConfig, SchedulerProvider + +logger = logging.getLogger(__name__) + + +class APSchedulerProvider(SchedulerProvider): + """Implementation using APScheduler""" + + def __init__(self, config: SchedulerConfig): + super().__init__(config) + self.scheduler = AsyncIOScheduler() + + async def add_job(self, func, trigger, **kwargs): + logger.info( + f"Adding job {func.__name__} with trigger {trigger} and kwargs {kwargs}" + ) + self.scheduler.add_job(func, trigger, **kwargs) + + async def start(self): + self.scheduler.start() + logger.info("Scheduler started") + + async def shutdown(self): + if self.scheduler.running: + self.scheduler.shutdown() + logger.info("Scheduler shutdown") + + async def __aenter__(self): + await self.start() + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.shutdown() diff --git a/py/r2r/r2r.toml b/py/r2r/r2r.toml index a07893ab8..53f63bbbd 100644 --- a/py/r2r/r2r.toml +++ b/py/r2r/r2r.toml @@ -72,6 +72,9 @@ collection_summary_prompt = "collection_summary" [database.graph_enrichment_settings] graph_communities_prompt = "graph_communities" + [database.maintenance] + vacuum_schedule = "0 3 * * *" # Run at 3:00 AM daily + [embedding] provider = "litellm" # For basic applications, use `openai/text-embedding-3-small` with `base_dimension = 512` @@ -120,3 +123,6 @@ provider = "simple" [email] provider = "console_mock" # `smtp` | `sendgrid` supported + +[scheduler] +provider = "apscheduler" diff --git a/py/tests/unit/test_routes.py b/py/tests/unit/test_routes.py index 5c3dd092a..b262807d8 100644 --- a/py/tests/unit/test_routes.py +++ b/py/tests/unit/test_routes.py @@ -25,6 +25,7 @@ from core.providers.embeddings import OpenAIEmbeddingProvider from core.providers.ingestion import R2RIngestionProvider from core.providers.llm import OpenAICompletionProvider from core.providers.orchestration import SimpleOrchestrationProvider +from core.providers.scheduler import APSchedulerProvider ROUTERS = [ UsersRouter, @@ -60,31 +61,34 @@ def mock_providers(): mock_orchestration.config = Mock() mock_email = create_autospec(ConsoleMockEmailProvider) mock_email.config = Mock() + mock_scheduler = create_autospec(APSchedulerProvider) + mock_scheduler.config = Mock() # Set up any needed methods mock_auth.auth_wrapper = Mock(return_value=lambda: None) - providers = R2RProviders( + return R2RProviders( auth=mock_auth, - database=mock_db, - ingestion=mock_ingestion, - embedding=mock_embedding, completion_embedding=mock_completion_embedding, + database=mock_db, + email=mock_email, + embedding=mock_embedding, + ingestion=mock_ingestion, llm=mock_llm, orchestration=mock_orchestration, - email=mock_email, + scheduler=mock_scheduler, ) - return providers @pytest.fixture def mock_services(): return R2RServices( - management=Mock(), auth=Mock(), ingestion=Mock(), - retrieval=Mock(), graph=Mock(), + maintenance=Mock(), + management=Mock(), + retrieval=Mock(), )