Add Scheduler and Postgres Vacuum (#2089)

* Add scheduler and vacuum

* Lint

* Refactor test workflow, add mock scheduler to tests

* 0 3 * * *

* Missing quote in toml

* Add maintenance service mock
This commit is contained in:
Nolan Tremelling
2025-03-24 16:06:18 -07:00
committed by GitHub
parent 648beb228f
commit e7db62e6bd
19 changed files with 481 additions and 31 deletions
@@ -47,7 +47,7 @@ jobs:
echo "::error::Package installation or import test failed."
exit 1
integration-test:
integration-test-azure-openai:
needs: package-install-test
runs-on: ubuntu-latest
timeout-minutes: 20
@@ -121,14 +121,6 @@ jobs:
--log-cli-level=INFO \
--junit-xml=test-results/junit.xml \
--html=test-results/report.html
- name: Run R2R Full Python Integration Test
run: |
cd py && uv run pytest tests/integration \
--verbose \
--capture=no \
--log-cli-level=INFO \
--junit-xml=test-results/junit.xml \
--html=test-results/report.html
- name: Upload test results
if: always()
@@ -238,3 +230,93 @@ jobs:
run: |
echo "::error::Gemini integration tests failed. Check the test results artifact for details."
exit 1
integration-test-azure-openai-full:
needs: integration-test-azure-openai
runs-on: ubuntu-latest
timeout-minutes: 20
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
GEMINI_API_KEY: ${{ secrets.GEMINI_API_KEY }}
AZURE_API_KEY: ${{ secrets.AZURE_API_KEY }}
AZURE_API_BASE: ${{ secrets.AZURE_API_BASE }}
AZURE_API_VERSION: ${{ secrets.AZURE_API_VERSION }}
TELEMETRY_ENABLED: 'false'
R2R_POSTGRES_HOST: localhost
R2R_POSTGRES_DBNAME: postgres
R2R_POSTGRES_PORT: '5432'
R2R_POSTGRES_PASSWORD: postgres
R2R_POSTGRES_USER: postgres
R2R_PROJECT_NAME: r2r_default
PYTHONUNBUFFERED: '1'
PYTEST_ADDOPTS: '--color=yes'
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Install Poppler
run: |
sudo apt-get update
sudo apt-get install -y poppler-utils
- name: Set up Python and install dependencies
uses: ./.github/actions/setup-python-light
with:
os: ubuntu-latest
python-version: '3.12'
r2r-version: 'latest'
- name: Setup and start PostgreSQL
uses: ./.github/actions/setup-postgres-ext
with:
os: ubuntu-latest
- name: Verify PostgreSQL and Vector Extension
run: |
pg_isready -h localhost -p 5432
sudo -u postgres psql -c "\dx vector;"
- name: Start R2R Light server
uses: ./.github/actions/start-r2r-light
id: start-server
- name: Wait for server to be ready
run: |
timeout=300 # 5 minutes timeout
while ! curl -s http://localhost:7272/health > /dev/null; do
if [ $timeout -le 0 ]; then
echo "Server failed to start within timeout"
exit 1
fi
echo "Waiting for server to be ready..."
sleep 5
timeout=$((timeout - 5))
done
- name: Run R2R Full Python Integration Test
run: |
cd py && uv run pytest tests/integration \
--verbose \
--capture=no \
--log-cli-level=INFO \
--junit-xml=test-results/junit.xml \
--html=test-results/report.html
- name: Upload test results
if: always()
uses: actions/upload-artifact@v4
with:
name: test-results
path: |
test-results/
pytest-logs/
- name: Check for test failures
if: failure()
run: |
echo "::error::Integration tests failed. Check the test results artifact for details."
exit 1
+1
View File
@@ -121,6 +121,7 @@ __all__ = [
"R2RProviderFactory",
"AuthService",
"IngestionService",
"MaintenanceService",
"ManagementService",
"RetrievalService",
"GraphService",
+4
View File
@@ -19,6 +19,7 @@ from .ingestion import (
)
from .llm import CompletionConfig, CompletionProvider
from .orchestration import OrchestrationConfig, OrchestrationProvider, Workflow
from .scheduler import SchedulerConfig, SchedulerProvider
__all__ = [
# Auth provider
@@ -56,4 +57,7 @@ __all__ = [
"OrchestrationConfig",
"OrchestrationProvider",
"Workflow",
# Scheduler provider
"SchedulerConfig",
"SchedulerProvider",
]
+9
View File
@@ -114,6 +114,12 @@ class LimitSettings(BaseModel):
)
class MaintenanceSettings(BaseModel):
vacuum_schedule: str = "0 3 * * *" # Run at 3 AM every day by default
vacuum_analyze: bool = True
vacuum_full: bool = False
class DatabaseConfig(ProviderConfig):
"""A base database configuration class."""
@@ -146,6 +152,9 @@ class DatabaseConfig(ProviderConfig):
limits: LimitSettings = LimitSettings(
global_per_min=60, route_per_min=20, monthly_limit=10000
)
# Maintenance settings
maintenance: MaintenanceSettings = MaintenanceSettings()
route_limits: dict[str, LimitSettings] = {}
user_limits: dict[UUID, LimitSettings] = {}
+39
View File
@@ -0,0 +1,39 @@
from abc import abstractmethod
from .base import Provider, ProviderConfig
class SchedulerConfig(ProviderConfig):
"""Configuration for scheduler provider"""
provider: str = "apscheduler"
def validate_config(self):
if self.provider not in self.supported_providers:
raise ValueError(
f"Scheduler provider {self.provider} is not supported."
)
@property
def supported_providers(self) -> list[str]:
return ["apscheduler"]
class SchedulerProvider(Provider):
"""Base class for scheduler providers"""
def __init__(self, config: SchedulerConfig):
super().__init__(config)
self.config = config
@abstractmethod
async def add_job(self, func, trigger, **kwargs):
pass
@abstractmethod
async def start(self):
pass
@abstractmethod
async def shutdown(self):
pass
+1
View File
@@ -18,6 +18,7 @@ __all__ = [
## R2R SERVICES
"AuthService",
"IngestionService",
"MaintenanceService",
"ManagementService",
"RetrievalService",
"GraphService",
+10 -6
View File
@@ -5,6 +5,7 @@ from pydantic import BaseModel
from core.providers import (
AnthropicCompletionProvider,
APSchedulerProvider,
AsyncSMTPEmailProvider,
ClerkAuthProvider,
ConsoleMockEmailProvider,
@@ -30,6 +31,7 @@ if TYPE_CHECKING:
from core.main.services.auth_service import AuthService
from core.main.services.graph_service import GraphService
from core.main.services.ingestion_service import IngestionService
from core.main.services.maintenance_service import MaintenanceService
from core.main.services.management_service import ManagementService
from core.main.services.retrieval_service import ( # type: ignore
RetrievalService, # type: ignore
@@ -45,6 +47,12 @@ class R2RProviders(BaseModel):
)
database: PostgresDatabaseProvider
ingestion: R2RIngestionProvider | UnstructuredIngestionProvider
email: (
AsyncSMTPEmailProvider
| ConsoleMockEmailProvider
| SendGridEmailProvider
| MailerSendEmailProvider
)
embedding: (
LiteLLMEmbeddingProvider
| OpenAIEmbeddingProvider
@@ -62,12 +70,7 @@ class R2RProviders(BaseModel):
| R2RCompletionProvider
)
orchestration: HatchetOrchestrationProvider | SimpleOrchestrationProvider
email: (
AsyncSMTPEmailProvider
| ConsoleMockEmailProvider
| SendGridEmailProvider
| MailerSendEmailProvider
)
scheduler: APSchedulerProvider
class Config:
arbitrary_types_allowed = True
@@ -77,6 +80,7 @@ class R2RProviders(BaseModel):
class R2RServices:
auth: "AuthService"
ingestion: "IngestionService"
maintenance: "MaintenanceService"
management: "ManagementService"
retrieval: "RetrievalService"
graph: "GraphService"
+11 -1
View File
@@ -17,6 +17,7 @@ from ..config import R2RConfig
from ..services.auth_service import AuthService # noqa: F401
from ..services.graph_service import GraphService # noqa: F401
from ..services.ingestion_service import IngestionService # noqa: F401
from ..services.maintenance_service import MaintenanceService # noqa: F401
from ..services.management_service import ManagementService # noqa: F401
from ..services.retrieval_service import ( # type: ignore
RetrievalService, # noqa: F401 # type: ignore
@@ -27,7 +28,14 @@ logger = logging.getLogger()
class R2RBuilder:
_SERVICES = ["auth", "ingestion", "management", "retrieval", "graph"]
_SERVICES = [
"auth",
"ingestion",
"maintenance",
"management",
"retrieval",
"graph",
]
def __init__(self, config: R2RConfig):
self.config = config
@@ -50,6 +58,8 @@ class R2RBuilder:
services = self._create_services(service_params)
await services.maintenance.initialize()
routers = {
"chunks_router": ChunksRouter(
providers=providers,
+21
View File
@@ -14,9 +14,11 @@ from core.base import (
EmbeddingProvider,
IngestionConfig,
OrchestrationConfig,
SchedulerConfig,
)
from core.providers import (
AnthropicCompletionProvider,
APSchedulerProvider,
AsyncSMTPEmailProvider,
BcryptCryptoConfig,
BCryptCryptoProvider,
@@ -294,6 +296,18 @@ class R2RProviderFactory:
f"Email provider {email_config.provider} not supported."
)
@staticmethod
async def create_scheduler_provider(
scheduler_config: SchedulerConfig, *args, **kwargs
) -> APSchedulerProvider:
"""Creates a scheduler provider based on configuration."""
if scheduler_config.provider == "apscheduler":
return APSchedulerProvider(scheduler_config)
else:
raise ValueError(
f"Scheduler provider {scheduler_config.provider} not supported."
)
async def create_providers(
self,
auth_provider_override: Optional[
@@ -324,6 +338,7 @@ class R2RProviderFactory:
| R2RCompletionProvider
] = None,
orchestration_provider_override: Optional[Any] = None,
scheduler_provider_override: Optional[APSchedulerProvider] = None,
*args,
**kwargs,
) -> R2RProviders:
@@ -405,6 +420,11 @@ class R2RProviderFactory:
or self.create_orchestration_provider(self.config.orchestration)
)
scheduler_provider = (
scheduler_provider_override
or await self.create_scheduler_provider(self.config.scheduler)
)
return R2RProviders(
auth=auth_provider,
database=database_provider,
@@ -414,4 +434,5 @@ class R2RProviderFactory:
llm=llm_provider,
email=email_provider,
orchestration=orchestration_provider,
scheduler=scheduler_provider,
)
+6 -3
View File
@@ -18,6 +18,7 @@ from ..base.providers.embedding import EmbeddingConfig
from ..base.providers.ingestion import IngestionConfig
from ..base.providers.llm import CompletionConfig
from ..base.providers.orchestration import OrchestrationConfig
from ..base.providers.scheduler import SchedulerConfig
from ..base.utils import deep_update
logger = logging.getLogger()
@@ -64,6 +65,7 @@ class R2RConfig:
"database": ["provider"],
"agent": ["generation_config"],
"orchestration": ["provider"],
"scheduler": ["provider"],
}
app: AppConfig
@@ -77,11 +79,11 @@ class R2RConfig:
ingestion: IngestionConfig
agent: RAGAgentConfig
orchestration: OrchestrationConfig
scheduler: SchedulerConfig
def __init__(self, config_data: dict[str, Any]):
"""
:param config_data: dictionary of configuration parameters
:param base_path: base path when a relative path is specified for the prompts directory
"""
# Load the default configuration
default_config = self.load_default_config()
@@ -120,13 +122,14 @@ class R2RConfig:
self.orchestration = OrchestrationConfig.create(
**self.orchestration, app=self.app
) # type: ignore
self.scheduler = SchedulerConfig.create(**self.scheduler, app=self.app) # type: ignore
IngestionConfig.set_default(**self.ingestion.dict())
IngestionConfig.set_default(**self.ingestion.model_dump())
# override GenerationConfig defaults
if self.completion.generation_config:
GenerationConfig.set_default(
**self.completion.generation_config.dict()
**self.completion.generation_config.model_dump()
)
def _validate_config_section(
+2
View File
@@ -1,6 +1,7 @@
from .auth_service import AuthService
from .graph_service import GraphService
from .ingestion_service import IngestionService, IngestionServiceAdapter
from .maintenance_service import MaintenanceService
from .management_service import ManagementService
from .retrieval_service import RetrievalService # type: ignore
@@ -8,6 +9,7 @@ __all__ = [
"AuthService",
"IngestionService",
"IngestionServiceAdapter",
"MaintenanceService",
"ManagementService",
"GraphService",
"RetrievalService",
@@ -0,0 +1,118 @@
import logging
from datetime import datetime
from typing import Any
from ..abstractions import R2RProviders
from ..config import R2RConfig
from .base import Service
logger = logging.getLogger(__name__)
class MaintenanceService(Service):
def __init__(
self,
config: R2RConfig,
providers: R2RProviders,
):
super().__init__(
config,
providers,
)
self.scheduled_jobs: list[Any] = []
async def initialize(self):
"""Initialize and schedule maintenance tasks from configuration"""
logger.info("Initializing database maintenance service")
await self.providers.scheduler.start()
maintenance_config = self.config.database.maintenance
# Parse the cron schedule
schedule_parts = self._parse_cron_schedule(
maintenance_config.vacuum_schedule
)
# Schedule the vacuum job
job = await self.providers.scheduler.add_job(
self.vacuum_database,
trigger="cron",
**schedule_parts,
kwargs={
"full": maintenance_config.vacuum_full,
"analyze": maintenance_config.vacuum_analyze,
},
)
self.scheduled_jobs.append(job)
def _parse_cron_schedule(self, cron_schedule: str) -> dict:
"""Parse a cron schedule string into kwargs for APScheduler"""
parts = cron_schedule.split()
# Handle both 5-part and 6-part cron expressions
if len(parts) == 6:
# With seconds field
second, minute, hour, day, month, day_of_week = parts
return {
"second": second,
"minute": minute,
"hour": hour,
"day": day,
"month": month,
"day_of_week": day_of_week,
}
elif len(parts) == 5:
# Standard cron (no seconds)
minute, hour, day, month, day_of_week = parts
return {
"minute": minute,
"hour": hour,
"day": day,
"month": month,
"day_of_week": day_of_week,
}
else:
logger.warning(
f"Invalid cron format: {cron_schedule}. Using defaults."
)
return {"hour": 3, "minute": 0}
async def vacuum_database(self, full: bool = False, analyze: bool = True):
"""Run vacuum on the entire database"""
start_time = datetime.now()
try:
await (
self.providers.database.maintenance_handler.vacuum_all_tables(
analyze=analyze, full=full
)
)
duration = datetime.now() - start_time
logger.info(
f"Database vacuum completed successfully in {duration.total_seconds():.2f} seconds"
)
except Exception as e:
logger.error(f"Database vacuum failed: {str(e)}")
async def vacuum_table(
self, table_name: str, full: bool = False, analyze: bool = True
):
"""Run vacuum on a specific table"""
start_time = datetime.now()
logger.info(
f"Running vacuum on table {table_name} (full={full}, analyze={analyze})"
)
try:
await self.providers.database.maintenance_handler.vacuum_table(
table_name=table_name, analyze=analyze, full=full
)
duration = datetime.now() - start_time
logger.info(
f"Table vacuum completed successfully in {duration.total_seconds():.2f} seconds"
)
except Exception as e:
logger.error(f"Table vacuum failed for {table_name}: {str(e)}")
+8 -3
View File
@@ -38,6 +38,9 @@ from .orchestration import (
HatchetOrchestrationProvider,
SimpleOrchestrationProvider,
)
from .scheduler import (
APSchedulerProvider,
)
__all__ = [
# Auth
@@ -66,12 +69,14 @@ __all__ = [
"ConsoleMockEmailProvider",
"SendGridEmailProvider",
"MailerSendEmailProvider",
# Orchestration
"HatchetOrchestrationProvider",
"SimpleOrchestrationProvider",
# LLM
"AnthropicCompletionProvider",
"OpenAICompletionProvider",
"R2RCompletionProvider",
"LiteLLMCompletionProvider",
# Orchestration
"HatchetOrchestrationProvider",
"SimpleOrchestrationProvider",
# Scheduler
"APSchedulerProvider",
]
+95
View File
@@ -0,0 +1,95 @@
import logging
from core.base import Handler
from .base import PostgresConnectionManager
logger = logging.getLogger(__name__)
class PostgresMaintenanceHandler(Handler):
def __init__(
self,
project_name: str,
connection_manager: PostgresConnectionManager,
):
"""
Initialize the PostgresMaintenanceHandler with the given project name and connection manager.
Args:
project_name (str): The name of the project.
connection_manager (PostgresConnectionManager): The connection manager to use.
"""
super().__init__(project_name, connection_manager)
logger.debug(
f"Initialized PostgresMaintenanceHandler for project: {project_name}"
)
async def create_tables(self):
pass
async def vacuum_table(
self,
table_name: str,
analyze: bool = False,
full: bool = False,
):
"""
VACUUM reclaims storage occupied by dead tuples. In normal PostgreSQL operation,
tuples that are deleted or obsoleted by an update are not physically removed from
their table; they remain present until a VACUUM is done.
Therefore it's necessary to do VACUUM periodically, especially on frequently-updated
tables.
VACUUM ANALYZE performs a VACUUM and then an ANALYZE for each selected table.
Plain VACUUM (without FULL) simply reclaims space and makes it available for re-use.
This form of the command can operate in parallel with normal reading and writing of the
table, as an exclusive lock is not obtained. However, extra space is not returned to
the operating system (in most cases); it's just kept available for re-use within the same
table.
VACUUM FULL rewrites the entire contents of the table into a new disk file with no extra
space, allowing unused space to be returned to the operating system. This form is much
slower and requires an ACCESS EXCLUSIVE lock on each table while it is being processed.
TODO: Implement VACUUM FULL
"""
vacuum_query = "VACUUM"
if analyze:
vacuum_query += " ANALYZE"
if full:
logger.warning(
"VACUUM FULL not implemented yet. Running plain VACUUM instead."
)
try:
await self.connection_manager.execute_query(
f"{vacuum_query} {table_name}"
)
except Exception as e:
logger.error(f"Error vacuuming table {table_name}: {str(e)}")
raise e
async def vacuum_all_tables(
self,
analyze: bool = False,
full: bool = False,
):
"""Vacuum all tables in the database"""
vacuum_query = "VACUUM"
if analyze:
vacuum_query += " ANALYZE"
if full:
logger.warning(
"VACUUM FULL not implemented yet. Running plain VACUUM instead."
)
try:
await self.connection_manager.execute_query(vacuum_query)
except Exception as e:
logger.error(f"Error vacuuming all tables: {str(e)}")
raise e
+7 -1
View File
@@ -22,6 +22,7 @@ from .graphs import (
PostgresRelationshipsHandler,
)
from .limits import PostgresLimitsHandler
from .maintenance import PostgresMaintenanceHandler
from .prompts_handler import PostgresPromptsHandler
from .tokens import PostgresTokensHandler
from .users import PostgresUserHandler
@@ -68,6 +69,7 @@ class PostgresDatabaseProvider(DatabaseProvider):
files_handler: PostgresFilesHandler
conversations_handler: PostgresConversationsHandler
limits_handler: PostgresLimitsHandler
maintenance_handler: PostgresMaintenanceHandler
def __init__(
self,
@@ -184,13 +186,16 @@ class PostgresDatabaseProvider(DatabaseProvider):
dimension=self.dimension,
quantization_type=self.quantization_type,
)
self.maintenance_handler = PostgresMaintenanceHandler(
project_name=self.project_name,
connection_manager=self.connection_manager,
)
self.prompts_handler = PostgresPromptsHandler(
self.project_name, self.connection_manager
)
self.files_handler = PostgresFilesHandler(
self.project_name, self.connection_manager
)
self.limits_handler = PostgresLimitsHandler(
project_name=self.project_name,
connection_manager=self.connection_manager,
@@ -229,6 +234,7 @@ class PostgresDatabaseProvider(DatabaseProvider):
await self.relationships_handler.create_tables()
await self.conversations_handler.create_tables()
await self.limits_handler.create_tables()
await self.maintenance_handler.create_tables()
def _get_postgres_configuration_settings(
self, config: DatabaseConfig
+3
View File
@@ -0,0 +1,3 @@
from .apscheduler import APSchedulerProvider
__all__ = ["APSchedulerProvider"]
@@ -0,0 +1,37 @@
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from core.base import SchedulerConfig, SchedulerProvider
logger = logging.getLogger(__name__)
class APSchedulerProvider(SchedulerProvider):
"""Implementation using APScheduler"""
def __init__(self, config: SchedulerConfig):
super().__init__(config)
self.scheduler = AsyncIOScheduler()
async def add_job(self, func, trigger, **kwargs):
logger.info(
f"Adding job {func.__name__} with trigger {trigger} and kwargs {kwargs}"
)
self.scheduler.add_job(func, trigger, **kwargs)
async def start(self):
self.scheduler.start()
logger.info("Scheduler started")
async def shutdown(self):
if self.scheduler.running:
self.scheduler.shutdown()
logger.info("Scheduler shutdown")
async def __aenter__(self):
await self.start()
return self
async def __aexit__(self, exc_type, exc, tb):
await self.shutdown()
+6
View File
@@ -72,6 +72,9 @@ collection_summary_prompt = "collection_summary"
[database.graph_enrichment_settings]
graph_communities_prompt = "graph_communities"
[database.maintenance]
vacuum_schedule = "0 3 * * *" # Run at 3:00 AM daily
[embedding]
provider = "litellm"
# For basic applications, use `openai/text-embedding-3-small` with `base_dimension = 512`
@@ -120,3 +123,6 @@ provider = "simple"
[email]
provider = "console_mock" # `smtp` | `sendgrid` supported
[scheduler]
provider = "apscheduler"
+12 -8
View File
@@ -25,6 +25,7 @@ from core.providers.embeddings import OpenAIEmbeddingProvider
from core.providers.ingestion import R2RIngestionProvider
from core.providers.llm import OpenAICompletionProvider
from core.providers.orchestration import SimpleOrchestrationProvider
from core.providers.scheduler import APSchedulerProvider
ROUTERS = [
UsersRouter,
@@ -60,31 +61,34 @@ def mock_providers():
mock_orchestration.config = Mock()
mock_email = create_autospec(ConsoleMockEmailProvider)
mock_email.config = Mock()
mock_scheduler = create_autospec(APSchedulerProvider)
mock_scheduler.config = Mock()
# Set up any needed methods
mock_auth.auth_wrapper = Mock(return_value=lambda: None)
providers = R2RProviders(
return R2RProviders(
auth=mock_auth,
database=mock_db,
ingestion=mock_ingestion,
embedding=mock_embedding,
completion_embedding=mock_completion_embedding,
database=mock_db,
email=mock_email,
embedding=mock_embedding,
ingestion=mock_ingestion,
llm=mock_llm,
orchestration=mock_orchestration,
email=mock_email,
scheduler=mock_scheduler,
)
return providers
@pytest.fixture
def mock_services():
return R2RServices(
management=Mock(),
auth=Mock(),
ingestion=Mock(),
retrieval=Mock(),
graph=Mock(),
maintenance=Mock(),
management=Mock(),
retrieval=Mock(),
)