From 00aec7716cd7de5861b75b2e8e109a125e2f24c5 Mon Sep 17 00:00:00 2001 From: Shreyas Pimpalgaonkar Date: Fri, 1 Nov 2024 17:52:25 -0700 Subject: [PATCH] Enrichment caching (#1550) * up * up * checkin * checkin * up * up * up * up --- py/cli/commands/ingestion.py | 7 +- py/core/base/abstractions/__init__.py | 1 + py/core/base/providers/database.py | 18 +- py/core/examples/data/aristotle_v3.txt | 29 ++ .../main/orchestration/hatchet/kg_workflow.py | 50 ++- .../main/orchestration/simple/kg_workflow.py | 6 +- py/core/providers/database/document.py | 23 +- py/core/providers/database/kg.py | 352 ++++++++++++++---- py/shared/abstractions/document.py | 2 + py/shared/abstractions/graph.py | 28 ++ py/tests/core/providers/kg/test_kg_logic.py | 2 +- 11 files changed, 418 insertions(+), 100 deletions(-) create mode 100644 py/core/examples/data/aristotle_v3.txt diff --git a/py/cli/commands/ingestion.py b/py/cli/commands/ingestion.py index bd28823f4..0d4d3a091 100644 --- a/py/cli/commands/ingestion.py +++ b/py/cli/commands/ingestion.py @@ -138,10 +138,13 @@ async def update_files( @click.option( "--v2", is_flag=True, help="use aristotle_v2.txt (a smaller file)" ) +@click.option( + "--v3", is_flag=True, help="use aristotle_v3.txt (a larger file)" +) @pass_context -async def ingest_sample_file(ctx, v2=False): +async def ingest_sample_file(ctx, v2=False, v3=False): """Ingest the first sample file into R2R.""" - sample_file_url = f"https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/aristotle{'_v2' if v2 else ''}.txt" + sample_file_url = f"https://raw.githubusercontent.com/SciPhi-AI/R2R/main/py/core/examples/data/aristotle{'_v2' if v2 else ''}{'_v3' if v3 else ''}.txt" client = ctx.obj with timer(): diff --git a/py/core/base/abstractions/__init__.py b/py/core/base/abstractions/__init__.py index ba748d443..1fa824a80 100644 --- a/py/core/base/abstractions/__init__.py +++ b/py/core/base/abstractions/__init__.py @@ -27,6 +27,7 @@ from shared.abstractions.graph import ( KGExtraction, RelationshipType, Triple, + CommunityInfo, ) from shared.abstractions.ingestion import ( ChunkEnrichmentSettings, diff --git a/py/core/base/providers/database.py b/py/core/base/providers/database.py index 0442aa06f..e6c18fbd1 100644 --- a/py/core/base/providers/database.py +++ b/py/core/base/providers/database.py @@ -680,7 +680,7 @@ class KGHandler(Handler): # Community management @abstractmethod - async def add_communities(self, communities: List[Any]) -> None: + async def add_community_info(self, communities: List[Any]) -> None: """Add communities to storage.""" pass @@ -862,7 +862,9 @@ class KGHandler(Handler): raise NotImplementedError @abstractmethod - async def get_all_triples(self, collection_id: UUID) -> List[Triple]: + async def get_all_triples( + self, collection_id: UUID, document_ids: Optional[list[UUID]] = None + ) -> List[Triple]: raise NotImplementedError @abstractmethod @@ -1566,9 +1568,9 @@ class DatabaseProvider(Provider): return await self.kg_handler.upsert_embeddings(data, table_name) # Community methods - async def add_communities(self, communities: List[Any]) -> None: + async def add_community_info(self, communities: List[Any]) -> None: """Forward to KG handler add_communities method.""" - return await self.kg_handler.add_communities(communities) + return await self.kg_handler.add_community_info(communities) async def get_communities( self, @@ -1728,8 +1730,12 @@ class DatabaseProvider(Provider): collection_id, kg_deduplication_settings ) - async def get_all_triples(self, collection_id: UUID) -> List[Triple]: - return await self.kg_handler.get_all_triples(collection_id) + async def get_all_triples( + self, collection_id: UUID, document_ids: Optional[list[UUID]] = None + ) -> List[Triple]: + return await self.kg_handler.get_all_triples( + collection_id, document_ids + ) async def update_entity_descriptions(self, entities: list[Entity]): return await self.kg_handler.update_entity_descriptions(entities) diff --git a/py/core/examples/data/aristotle_v3.txt b/py/core/examples/data/aristotle_v3.txt new file mode 100644 index 000000000..9b03b2d16 --- /dev/null +++ b/py/core/examples/data/aristotle_v3.txt @@ -0,0 +1,29 @@ + +Aristotle proposed a three-part structure for souls of plants, animals, and humans, making humans unique in having all three types of soul. +Aristotle's psychology, given in his treatise On the Soul (peri psychēs), posits three kinds of soul ("psyches"): the vegetative soul, the sensitive soul, and the rational soul. Humans have all three. The vegetative soul is concerned with growth and nourishment. The sensitive soul experiences sensations and movement. The unique part of the human, rational soul is its ability to receive forms of other things and to compare them using the nous (intellect) and logos (reason).[93] + +For Aristotle, the soul is the form of a living being. Because all beings are composites of form and matter, the form of living beings is that which endows them with what is specific to living beings, e.g. the ability to initiate movement (or in the case of plants, growth and transformations, which Aristotle considers types of movement).[11] In contrast to earlier philosophers, but in accordance with the Egyptians, he placed the rational soul in the heart, rather than the brain.[94] Notable is Aristotle's division of sensation and thought, which generally differed from the concepts of previous philosophers, with the exception of Alcmaeon.[95] + +In On the Soul, Aristotle famously criticizes Plato's theory of the soul and develops his own in response. The first criticism is against Plato's view of the soul in the Timaeus that the soul takes up space and is able to come into physical contact with bodies.[96] 20th-century scholarship overwhelmingly opposed Aristotle's interpretation of Plato and maintained that he had misunderstood him.[97] Today's scholars have tended to re-assess Aristotle's interpretation and been more positive about it.[98] Aristotle's other criticism is that Plato's view of reincarnation entails that it is possible for a soul and its body to be mis-matched; in principle, Aristotle alleges, any soul can go with any body, according to Plato's theory.[99] Aristotle's claim that the soul is the form of a living being eliminates that possibility and thus rules out reincarnation.[100] + +Memory +According to Aristotle in On the Soul, memory is the ability to hold a perceived experience in the mind and to distinguish between the internal "appearance" and an occurrence in the past.[101] In other words, a memory is a mental picture (phantasm) that can be recovered. Aristotle believed an impression is left on a semi-fluid bodily organ that undergoes several changes in order to make a memory. A memory occurs when stimuli such as sights or sounds are so complex that the nervous system cannot receive all the impressions at once. These changes are the same as those involved in the operations of sensation, Aristotelian 'common sense', and thinking.[102][103] + +Aristotle uses the term 'memory' for the actual retaining of an experience in the impression that can develop from sensation, and for the intellectual anxiety that comes with the impression because it is formed at a particular time and processing specific contents. Memory is of the past, prediction is of the future, and sensation is of the present. Retrieval of impressions cannot be performed suddenly. A transitional channel is needed and located in past experiences, both for previous experience and present experience.[104] + +Because Aristotle believes people receive all kinds of sense perceptions and perceive them as impressions, people are continually weaving together new impressions of experiences. To search for these impressions, people search the memory itself.[105] Within the memory, if one experience is offered instead of a specific memory, that person will reject this experience until they find what they are looking for. Recollection occurs when one retrieved experience naturally follows another. If the chain of "images" is needed, one memory will stimulate the next. When people recall experiences, they stimulate certain previous experiences until they reach the one that is needed.[106] Recollection is thus the self-directed activity of retrieving the information stored in a memory impression.[107] Only humans can remember impressions of intellectual activity, such as numbers and words. Animals that have perception of time can retrieve memories of their past observations. Remembering involves only perception of the things remembered and of the time passed.[108] + + +Senses, perception, memory, dreams, action in Aristotle's psychology. Impressions are stored in the sensorium (the heart), linked by his laws of association (similarity, contrast, and contiguity). +Aristotle believed the chain of thought, which ends in recollection of certain impressions, was connected systematically in relationships such as similarity, contrast, and contiguity, described in his laws of association. Aristotle believed that past experiences are hidden within the mind. A force operates to awaken the hidden material to bring up the actual experience. According to Aristotle, association is the power innate in a mental state, which operates upon the unexpressed remains of former experiences, allowing them to rise and be recalled.[109][110] + +Dreams +Further information: Dream § Other +Aristotle describes sleep in On Sleep and Wakefulness.[111] Sleep takes place as a result of overuse of the senses[112] or of digestion,[113] so it is vital to the body.[112] While a person is asleep, the critical activities, which include thinking, sensing, recalling and remembering, do not function as they do during wakefulness. Since a person cannot sense during sleep, they cannot have desire, which is the result of sensation. However, the senses are able to work during sleep,[114] albeit differently,[111] unless they are weary.[112] + +Dreams do not involve actually sensing a stimulus. In dreams, sensation is still involved, but in an altered manner.[112] Aristotle explains that when a person stares at a moving stimulus such as the waves in a body of water, and then looks away, the next thing they look at appears to have a wavelike motion. When a person perceives a stimulus and the stimulus is no longer the focus of their attention, it leaves an impression.[111] When the body is awake and the senses are functioning properly, a person constantly encounters new stimuli to sense and so the impressions of previously perceived stimuli are ignored.[112] However, during sleep the impressions made throughout the day are noticed as there are no new distracting sensory experiences.[111] So, dreams result from these lasting impressions. Since impressions are all that are left and not the exact stimuli, dreams do not resemble the actual waking experience.[115] During sleep, a person is in an altered state of mind. Aristotle compares a sleeping person to a person who is overtaken by strong feelings toward a stimulus. For example, a person who has a strong infatuation with someone may begin to think they see that person everywhere because they are so overtaken by their feelings. Since a person sleeping is in a suggestible state and unable to make judgements, they become easily deceived by what appears in their dreams, like the infatuated person.[111] This leads the person to believe the dream is real, even when the dreams are absurd in nature.[111] In De Anima iii 3, Aristotle ascribes the ability to create, to store, and to recall images in the absence of perception to the faculty of imagination, phantasia.[11] + +One component of Aristotle's theory of dreams disagrees with previously held beliefs. He claimed that dreams are not foretelling and not sent by a divine being. Aristotle reasoned naturalistically that instances in which dreams do resemble future events are simply coincidences.[116] Aristotle claimed that a dream is first established by the fact that the person is asleep when they experience it. If a person had an image appear for a moment after waking up or if they see something in the dark it is not considered a dream because they were awake when it occurred. Secondly, any sensory experience that is perceived while a person is asleep does not qualify as part of a dream. For example, if, while a person is sleeping, a door shuts and in their dream they hear a door is shut, this sensory experience is not part of the dream. Lastly, the images of dreams must be a result of lasting impressions of waking sensory experiences.[115] + +Practical philosophy +Aristotle's practical philosophy covers areas such as ethics, politics, economics, and rhetoric.[40] diff --git a/py/core/main/orchestration/hatchet/kg_workflow.py b/py/core/main/orchestration/hatchet/kg_workflow.py index 1e31dc6fa..11d9c05a3 100644 --- a/py/core/main/orchestration/hatchet/kg_workflow.py +++ b/py/core/main/orchestration/hatchet/kg_workflow.py @@ -151,7 +151,7 @@ def hatchet_kg_factory( f"Failed to update document status for {document_id}: {e}" ) - @orchestration_provider.workflow(name="create-graph", timeout="360m") + @orchestration_provider.workflow(name="create-graph", timeout="600m") class CreateGraphWorkflow: def __init__(self, kg_service: KgService): self.kg_service = kg_service @@ -201,7 +201,7 @@ def hatchet_kg_factory( ) results.append( ( - context.aio.spawn_workflow( + await context.aio.spawn_workflow( "kg-extract", { "request": { @@ -218,7 +218,7 @@ def hatchet_kg_factory( }, key=f"kg-extract-{cnt}/{len(document_ids)}", ) - ) + ).result() ) if not document_ids: @@ -233,6 +233,37 @@ def hatchet_kg_factory( "result": f"successfully ran graph creation workflows for {len(results)} documents" } + @orchestration_provider.step( + retries=1, parents=["kg_extraction_ingress"] + ) + async def update_enrichment_status(self, context: Context) -> dict: + + enrichment_status = ( + await self.kg_service.providers.database.get_workflow_status( + id=uuid.UUID( + context.workflow_input()["request"]["collection_id"] + ), + status_type="kg_enrichment_status", + ) + ) + + if enrichment_status == KGEnrichmentStatus.SUCCESS: + await self.kg_service.providers.database.set_workflow_status( + id=uuid.UUID( + context.workflow_input()["request"]["collection_id"] + ), + status_type="kg_enrichment_status", + status=KGEnrichmentStatus.OUTDATED, + ) + + logger.info( + f"Updated enrichment status for collection {context.workflow_input()['request']['collection_id']} to OUTDATED because an older enrichment was already successful" + ) + + return { + "result": f"updated enrichment status for collection {context.workflow_input()['request']['collection_id']} to OUTDATED because an older enrichment was already successful" + } + @orchestration_provider.workflow( name="entity-deduplication", timeout="360m" ) @@ -412,6 +443,19 @@ def hatchet_kg_factory( logger.info(f"Ran {len(results)} workflows for community summary") # set status to success + # for all documents in the collection, set kg_creation_status to ENRICHED + document_ids = await self.kg_service.providers.database.get_document_ids_by_status( + status_type="kg_extraction_status", + status=KGExtractionStatus.SUCCESS, + collection_id=collection_id, + ) + + await self.kg_service.providers.database.set_workflow_status( + id=document_ids, + status_type="kg_extraction_status", + status=KGExtractionStatus.ENRICHED, + ) + await self.kg_service.providers.database.set_workflow_status( id=collection_id, status_type="kg_enrichment_status", diff --git a/py/core/main/orchestration/simple/kg_workflow.py b/py/core/main/orchestration/simple/kg_workflow.py index ff111b448..cca48cc7d 100644 --- a/py/core/main/orchestration/simple/kg_workflow.py +++ b/py/core/main/orchestration/simple/kg_workflow.py @@ -76,6 +76,10 @@ def simple_kg_factory(service: KgService): num_communities = num_communities[0]["num_communities"] # TODO - Do not hardcode the number of parallel communities, # make it a configurable parameter at runtime & add server-side defaults + + if num_communities == 0: + raise R2RException("No communities found", 400) + parallel_communities = min(100, num_communities) total_workflows = math.ceil(num_communities / parallel_communities) @@ -111,7 +115,7 @@ def simple_kg_factory(service: KgService): status=KGEnrichmentStatus.FAILED, ) - raise R2RException(f"Error in enriching graph: {e}") + raise e async def kg_community_summary(input_data): diff --git a/py/core/providers/database/document.py b/py/core/providers/database/document.py index db08aaad8..c077a3cb3 100644 --- a/py/core/providers/database/document.py +++ b/py/core/providers/database/document.py @@ -202,7 +202,10 @@ class PostgresDocumentHandler(DocumentHandler): SELECT {status_type} FROM {self._get_table_name(table_name)} WHERE {column_name} = ANY($1) """ - return await self.connection_manager.fetch_query(query, [ids]) + return [ + row[status_type] + for row in await self.connection_manager.fetch_query(query, [ids]) + ] async def _get_ids_from_table( self, @@ -288,19 +291,17 @@ class PostgresDocumentHandler(DocumentHandler): Returns: The workflow status for the given document or list of documents. """ + ids = [id] if isinstance(id, UUID) else id out_model = self._get_status_model(status_type) - result = list( - map( - await self._get_status_from_table( - ids, - out_model.table_name(), - status_type, - out_model.id_column(), - ), - out_model, - ) + result = await self._get_status_from_table( + ids, + out_model.table_name(), + status_type, + out_model.id_column(), ) + + result = [out_model[status.upper()] for status in result] return result[0] if isinstance(id, UUID) else result async def set_workflow_status( diff --git a/py/core/providers/database/kg.py b/py/core/providers/database/kg.py index e79a42b27..dc025266a 100644 --- a/py/core/providers/database/kg.py +++ b/py/core/providers/database/kg.py @@ -18,6 +18,8 @@ from core.base import ( ) from core.base.abstractions import ( EntityLevel, + CommunityInfo, + KGEnrichmentStatus, KGCreationSettings, KGEnrichmentSettings, KGEntityDeduplicationSettings, @@ -500,31 +502,52 @@ class PostgresKGHandler(KGHandler): for property_name in property_names } - async def get_all_triples(self, collection_id: UUID) -> list[Triple]: + async def get_all_triples( + self, collection_id: UUID, document_ids: Optional[list[UUID]] = None + ) -> list[Triple]: # getting all documents for a collection - QUERY = f""" - select distinct document_id from {self._get_table_name("document_info")} where $1 = ANY(collection_ids) - """ - document_ids = await self.connection_manager.fetch_query( - QUERY, [collection_id] - ) - document_ids = [doc_id["document_id"] for doc_id in document_ids] + if document_ids is None: + QUERY = f""" + select distinct document_id from {self._get_table_name("document_info")} where $1 = ANY(collection_ids) + """ + document_ids_list = await self.connection_manager.fetch_query( + QUERY, [collection_id] + ) + document_ids = [ + doc_id["document_id"] for doc_id in document_ids_list + ] QUERY = f""" - SELECT id, subject, predicate, weight, object FROM {self._get_table_name("chunk_triple")} WHERE document_id = ANY($1) + SELECT id, subject, predicate, weight, object, document_id FROM {self._get_table_name("chunk_triple")} WHERE document_id = ANY($1) """ triples = await self.connection_manager.fetch_query( QUERY, [document_ids] ) return [Triple(**triple) for triple in triples] - async def add_communities(self, communities: list[Any]) -> None: + async def add_community_info( + self, communities: list[CommunityInfo] + ) -> None: QUERY = f""" INSERT INTO {self._get_table_name("community_info")} (node, cluster, parent_cluster, level, is_final_cluster, triple_ids, collection_id) VALUES ($1, $2, $3, $4, $5, $6, $7) """ - await self.connection_manager.execute_many(QUERY, communities) + communities_tuples_list = [ + ( + community.node, + community.cluster, + community.parent_cluster, + community.level, + community.is_final_cluster, + community.triple_ids, + community.collection_id, + ) + for community in communities + ] + await self.connection_manager.execute_many( + QUERY, communities_tuples_list + ) async def get_communities( self, @@ -608,6 +631,239 @@ class PostgresKGHandler(KGHandler): QUERY, [tuple(non_null_attrs.values())] ) + async def _create_graph_and_cluster( + self, triples: list[Triple], leiden_params: dict[str, Any] + ) -> Any: + + G = self.nx.Graph() + for triple in triples: + G.add_edge( + triple.subject, + triple.object, + weight=triple.weight, + id=triple.id, + ) + + hierarchical_communities = await self._compute_leiden_communities( + G, leiden_params + ) + + return hierarchical_communities + + async def _cluster_and_add_community_info( + self, + triples: list[Triple], + triple_ids_cache: dict[str, list[int]], + leiden_params: dict[str, Any], + collection_id: UUID, + ) -> int: + + # clear if there is any old information + QUERY = f""" + DELETE FROM {self._get_table_name("community_info")} WHERE collection_id = $1 + """ + await self.connection_manager.execute_query(QUERY, [collection_id]) + + QUERY = f""" + DELETE FROM {self._get_table_name("community_report")} WHERE collection_id = $1 + """ + await self.connection_manager.execute_query(QUERY, [collection_id]) + + start_time = time.time() + + hierarchical_communities = await self._create_graph_and_cluster( + triples, leiden_params + ) + + logger.info( + f"Computing Leiden communities completed, time {time.time() - start_time:.2f} seconds." + ) + + def triple_ids(node: str) -> list[int]: + return triple_ids_cache.get(node, []) + + logger.info( + f"Cached {len(triple_ids_cache)} triple ids, time {time.time() - start_time:.2f} seconds." + ) + + # upsert the communities into the database. + inputs = [ + CommunityInfo( + node=str(item.node), + cluster=item.cluster, + parent_cluster=item.parent_cluster, + level=item.level, + is_final_cluster=item.is_final_cluster, + triple_ids=triple_ids(item.node), + collection_id=collection_id, + ) + for item in hierarchical_communities + ] + + await self.add_community_info(inputs) + + num_communities = ( + max([item.cluster for item in hierarchical_communities]) + 1 + ) + + logger.info( + f"Generated {num_communities} communities, time {time.time() - start_time:.2f} seconds." + ) + + return num_communities + + async def _use_community_cache( + self, collection_id: UUID, triple_ids_cache: dict[str, list[int]] + ) -> bool: + + # check if status is enriched or stale + QUERY = f""" + SELECT kg_enrichment_status FROM {self._get_table_name("collections")} WHERE collection_id = $1 + """ + status = ( + await self.connection_manager.fetchrow_query( + QUERY, [collection_id] + ) + )["kg_enrichment_status"] + if status == KGEnrichmentStatus.PENDING: + return False + + # check the number of entities in the cache. + QUERY = f""" + SELECT COUNT(distinct node) FROM {self._get_table_name("community_info")} WHERE collection_id = $1 + """ + num_entities = ( + await self.connection_manager.fetchrow_query( + QUERY, [collection_id] + ) + )["count"] + + # a hard threshold of 80% of the entities in the cache. + if num_entities > 0.8 * len(triple_ids_cache): + return True + else: + return False + + async def _get_triple_ids_cache( + self, triples: list[Triple] + ) -> dict[str, list[int]]: + + # caching the triple ids + triple_ids_cache = dict[str, list[int]]() + for triple in triples: + if ( + triple.subject not in triple_ids_cache + and triple.subject is not None + ): + triple_ids_cache[triple.subject] = [] + if ( + triple.object not in triple_ids_cache + and triple.object is not None + ): + triple_ids_cache[triple.object] = [] + if triple.subject is not None and triple.id is not None: + triple_ids_cache[triple.subject].append(triple.id) + if triple.object is not None and triple.id is not None: + triple_ids_cache[triple.object].append(triple.id) + + return triple_ids_cache + + async def _incremental_clustering( + self, + triple_ids_cache: dict[str, list[int]], + leiden_params: dict[str, Any], + collection_id: UUID, + ) -> int: + """ + Performs incremental clustering on new triples by: + 1. Getting all triples and new triples + 2. Getting community mapping for all existing triples + 3. For each new triple: + - Check if subject/object exists in community mapping + - If exists, add its cluster to updated communities set + - If not, append triple to new_triple_ids list for clustering + 4. Run hierarchical clustering on new_triple_ids list + 5. Update community info table with new clusters, offsetting IDs by max_cluster_id + """ + + QUERY = f""" + SELECT node, cluster, is_final_cluster FROM {self._get_table_name("community_info")} WHERE collection_id = $1 + """ + + communities = await self.connection_manager.fetch_query( + QUERY, [collection_id] + ) + max_cluster_id = max( + [community["cluster"] for community in communities] + ) + + # TODO: modify above query to get a dict grouped by node (without aggregation) + communities_dict = {} # type: ignore + for community in communities: + if community["node"] not in communities_dict: + communities_dict[community["node"]] = [] + communities_dict[community["node"]].append(community) + + QUERY = f""" + SELECT document_id FROM {self._get_table_name("document_info")} WHERE $1 = ANY(collection_ids) and kg_extraction_status = $2 + """ + + new_document_ids = await self.connection_manager.fetch_query( + QUERY, [collection_id, KGExtractionStatus.SUCCESS] + ) + + new_triple_ids = await self.get_all_triples( + collection_id, new_document_ids + ) + + # community mapping for new triples + updated_communities = set() + new_triples = [] + for triple in new_triple_ids: + # bias towards subject + if triple.subject in communities_dict: + for community in communities_dict[triple.subject]: + updated_communities.add(community["cluster"]) + elif triple.object in communities_dict: + for community in communities_dict[triple.object]: + updated_communities.add(community["cluster"]) + else: + new_triples.append(triple) + + # delete the communities information for the updated communities + QUERY = f""" + DELETE FROM {self._get_table_name("community_report")} WHERE collection_id = $1 AND community_number = ANY($2) + """ + await self.connection_manager.execute_query( + QUERY, [collection_id, updated_communities] + ) + + hierarchical_communities_output = await self._create_graph_and_cluster( + new_triples, leiden_params + ) + + community_info = [] + for community in hierarchical_communities_output: + community_info.append( + CommunityInfo( + node=community.node, + cluster=community.cluster + max_cluster_id, + parent_cluster=( + community.parent_cluster + max_cluster_id + if community.parent_cluster is not None + else None + ), + level=community.level, + triple_ids=[], # FIXME: need to get the triple ids for the community + is_final_cluster=community.is_final_cluster, + collection_id=collection_id, + ) + ) + + await self.add_community_info(community_info) + num_communities = max([item.cluster for item in community_info]) + 1 + return num_communities + async def perform_graph_clustering( self, collection_id: UUID, @@ -631,77 +887,21 @@ class PostgresKGHandler(KGHandler): """ start_time = time.time() + triples = await self.get_all_triples(collection_id) logger.info(f"Clustering with settings: {leiden_params}") - G = self.nx.Graph() - for triple in triples: - G.add_edge( - triple.subject, - triple.object, - weight=triple.weight, - id=triple.id, + triple_ids_cache = await self._get_triple_ids_cache(triples) + + if await self._use_community_cache(collection_id, triple_ids_cache): + num_communities = await self._incremental_clustering( + triple_ids_cache, leiden_params, collection_id ) - - logger.info("Computing Leiden communities started.") - - hierarchical_communities = await self._compute_leiden_communities( - G, leiden_params - ) - - logger.info( - f"Computing Leiden communities completed, time {time.time() - start_time:.2f} seconds." - ) - - # caching the triple ids - triple_ids_cache = dict[str, list[int]]() - for triple in triples: - if ( - triple.subject not in triple_ids_cache - and triple.subject is not None - ): - triple_ids_cache[triple.subject] = [] - if ( - triple.object not in triple_ids_cache - and triple.object is not None - ): - triple_ids_cache[triple.object] = [] - if triple.subject is not None and triple.id is not None: - triple_ids_cache[triple.subject].append(triple.id) - if triple.object is not None and triple.id is not None: - triple_ids_cache[triple.object].append(triple.id) - - def triple_ids(node: str) -> list[int]: - return triple_ids_cache.get(node, []) - - logger.info( - f"Cached {len(triple_ids_cache)} triple ids, time {time.time() - start_time:.2f} seconds." - ) - - # upsert the communities into the database. - inputs = [ - ( - str(item.node), - item.cluster, - item.parent_cluster, - item.level, - item.is_final_cluster, - triple_ids(item.node), - collection_id, + else: + num_communities = await self._cluster_and_add_community_info( + triples, triple_ids_cache, leiden_params, collection_id ) - for item in hierarchical_communities - ] - - await self.add_communities(inputs) - - num_communities = len( - {item.cluster for item in hierarchical_communities} - ) - - logger.info( - f"Generated {num_communities} communities, time {time.time() - start_time:.2f} seconds." - ) return num_communities diff --git a/py/shared/abstractions/document.py b/py/shared/abstractions/document.py index 66064d337..624fca0ee 100644 --- a/py/shared/abstractions/document.py +++ b/py/shared/abstractions/document.py @@ -141,6 +141,7 @@ class KGExtractionStatus(str, Enum): PENDING = "pending" PROCESSING = "processing" SUCCESS = "success" + ENRICHED = "enriched" FAILED = "failed" def __str__(self): @@ -160,6 +161,7 @@ class KGEnrichmentStatus(str, Enum): PENDING = "pending" PROCESSING = "processing" + OUTDATED = "outdated" SUCCESS = "success" FAILED = "failed" diff --git a/py/shared/abstractions/graph.py b/py/shared/abstractions/graph.py index 049679cdc..8ec6f7b63 100644 --- a/py/shared/abstractions/graph.py +++ b/py/shared/abstractions/graph.py @@ -227,6 +227,34 @@ class Community(BaseModel): ) +@dataclass +class CommunityInfo(BaseModel): + """A protocol for a community in the system.""" + + node: str + cluster: int + parent_cluster: int | None + level: int + is_final_cluster: bool + collection_id: uuid.UUID + triple_ids: Optional[list[int]] = None + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @classmethod + def from_dict(cls, d: dict[str, Any]) -> "CommunityInfo": + return CommunityInfo( + node=d["node"], + cluster=d["cluster"], + parent_cluster=d["parent_cluster"], + level=d["level"], + is_final_cluster=d["is_final_cluster"], + triple_ids=d["triple_ids"], + collection_id=d["collection_id"], + ) + + @dataclass class CommunityReport(BaseModel): """Defines an LLM-generated summary report of a community.""" diff --git a/py/tests/core/providers/kg/test_kg_logic.py b/py/tests/core/providers/kg/test_kg_logic.py index 9b018fec1..41378b438 100644 --- a/py/tests/core/providers/kg/test_kg_logic.py +++ b/py/tests/core/providers/kg/test_kg_logic.py @@ -393,7 +393,7 @@ async def test_get_community_details( await postgres_db_provider.add_triples( triples_raw_list, table_name="chunk_triple" ) - await postgres_db_provider.add_communities(community_table_info) + await postgres_db_provider.add_community_info(community_table_info) await postgres_db_provider.add_community_report(community_report_list[0]) community_level, entities, triples = (