Source code for langchain_community.graph_vectorstores.cassandra
"""Apache Cassandra DB graph vector store integration."""from__future__importannotationsimportasyncioimportjsonimportloggingimportsecretsfromdataclassesimportasdict,is_dataclassfromtypingimport(TYPE_CHECKING,Any,AsyncIterable,Iterable,List,Optional,Sequence,Tuple,Type,TypeVar,cast,)fromlangchain_core._apiimportbetafromlangchain_core.documentsimportDocumentfromtyping_extensionsimportoverridefromlangchain_community.graph_vectorstores.baseimportGraphVectorStore,Nodefromlangchain_community.graph_vectorstores.linksimportMETADATA_LINKS_KEY,Linkfromlangchain_community.graph_vectorstores.mmr_helperimportMmrHelperfromlangchain_community.utilities.cassandraimportSetupModefromlangchain_community.vectorstores.cassandraimportCassandraasCassandraVectorStoreCGVST=TypeVar("CGVST",bound="CassandraGraphVectorStore")ifTYPE_CHECKING:fromcassandra.clusterimportSessionfromlangchain_core.embeddingsimportEmbeddingslogger=logging.getLogger(__name__)
def __init__(self, node: Node, embedding: list[float]) -> None:
    """Create an Adjacent Node: a graph node paired with its embedding.

    Args:
        node: The underlying graph node.
        embedding: The embedding vector associated with the node.
    """
    # Normalize a missing node ID to the empty string.
    self.id = node.id or ""
    self.links = node.links
    self.embedding = embedding
def _serialize_links(links: list[Link]) -> str:
    """Serialize a list of links to a JSON string."""

    class SetAndLinkEncoder(json.JSONEncoder):
        def default(self, obj: Any) -> Any:  # noqa: ANN401
            if not isinstance(obj, type) and is_dataclass(obj):
                return asdict(obj)
            if isinstance(obj, Iterable):
                return list(obj)
            # Let the base class default method raise the TypeError
            return super().default(obj)

    return json.dumps(links, cls=SetAndLinkEncoder)


def _deserialize_links(json_blob: str | None) -> set[Link]:
    """Rebuild the set of links from a JSON blob written by `_serialize_links`."""
    raw_links = cast(list[dict[str, Any]], json.loads(json_blob or "[]"))
    return {
        Link(kind=item["kind"], direction=item["direction"], tag=item["tag"])
        for item in raw_links
    }


def _metadata_link_key(link: Link) -> str:
    """Return the metadata key under which an incoming link is indexed."""
    return f"link:{link.kind}:{link.tag}"


def _metadata_link_value() -> str:
    """Return the sentinel metadata value marking the presence of a link."""
    return "link"


def _doc_to_node(doc: Document) -> Node:
    """Convert a stored Document back into a graph Node with restored links."""
    metadata = doc.metadata.copy()
    links = _deserialize_links(metadata.get(METADATA_LINKS_KEY))
    metadata[METADATA_LINKS_KEY] = links
    return Node(
        id=doc.id,
        text=doc.page_content,
        metadata=metadata,
        links=list(links),
    )


def _incoming_links(node: Node | AdjacentNode) -> set[Link]:
    """Return the links of `node` that point inward ("in" or "bidir")."""
    return {link for link in node.links if link.direction in ["in", "bidir"]}


def _outgoing_links(node: Node | AdjacentNode) -> set[Link]:
    """Return the links of `node` that point outward ("out" or "bidir")."""
    return {link for link in node.links if link.direction in ["out", "bidir"]}
def __init__(
    self,
    embedding: Embeddings,
    session: Session | None = None,
    keyspace: str | None = None,
    table_name: str = "",
    ttl_seconds: int | None = None,
    *,
    body_index_options: list[tuple[str, Any]] | None = None,
    setup_mode: SetupMode = SetupMode.SYNC,
    metadata_deny_list: Optional[list[str]] = None,
) -> None:
    """Apache Cassandra(R) for graph-vector-store workloads.

    To use it, you need a recent installation of the `cassio` library
    and a Cassandra cluster / Astra DB instance supporting vector capabilities.

    Example:
        .. code-block:: python

            from langchain_community.graph_vectorstores import (
                CassandraGraphVectorStore,
            )
            from langchain_openai import OpenAIEmbeddings

            embeddings = OpenAIEmbeddings()
            session = ...  # create your Cassandra session object
            keyspace = "my_keyspace"  # the keyspace should exist already
            table_name = "my_graph_vector_store"
            vectorstore = CassandraGraphVectorStore(
                embeddings,
                session,
                keyspace,
                table_name,
            )

    Args:
        embedding: Embedding function to use.
        session: Cassandra driver session.
            If not provided, it is resolved from cassio.
        keyspace: Cassandra keyspace.
            If not provided, it is resolved from cassio.
        table_name: Cassandra table (required).
        ttl_seconds: Optional time-to-live for the added texts.
        body_index_options: Optional options used to create the body index.
            Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
        setup_mode: mode used to create the Cassandra table
            (SYNC, ASYNC or OFF).
        metadata_deny_list: Optional list of metadata keys to not index.
            i.e. to fine-tune which of the metadata fields are indexed.
            Note: if you plan to have massive unique text metadata entries,
            consider not indexing them for performance
            (and to overcome max-length limitations).
            Note: the `metadata_indexing` parameter from
            langchain_community.utilities.cassandra.Cassandra is not
            exposed since CassandraGraphVectorStore only supports the
            deny_list option.
    """
    self.embedding = embedding

    # Copy the deny list before extending it so the caller's list is not
    # mutated as a side effect of constructing the store.  The links
    # metadata column is always excluded from indexing because it holds a
    # serialized JSON blob, not a filterable value.
    if metadata_deny_list is None:
        metadata_deny_list = []
    else:
        metadata_deny_list = list(metadata_deny_list)
    metadata_deny_list.append(METADATA_LINKS_KEY)

    self.vector_store = CassandraVectorStore(
        embedding=embedding,
        session=session,
        keyspace=keyspace,
        table_name=table_name,
        ttl_seconds=ttl_seconds,
        body_index_options=body_index_options,
        setup_mode=setup_mode,
        metadata_indexing=("deny_list", metadata_deny_list),
    )

    store_session: Session = self.vector_store.session

    # Pre-compile the insert statement once; reused for every node insert.
    self._insert_node = store_session.prepare(
        f"""
        INSERT INTO {keyspace}.{table_name} (
            row_id, body_blob, vector, attributes_blob, metadata_s
        ) VALUES (?, ?, ?, ?, ?)
        """  # noqa: S608
    )
@property
@override
def embeddings(self) -> Embeddings | None:
    """The embedding model used by this store."""
    return self.embedding


def _get_metadata_filter(
    self,
    metadata: dict[str, Any] | None = None,
    outgoing_link: Link | None = None,
) -> dict[str, Any]:
    """Build a metadata filter, optionally constrained to one outgoing link.

    Args:
        metadata: Base metadata filter, if any.
        outgoing_link: When given, the filter additionally requires the
            target to carry the marker key for this link.

    Returns:
        The combined metadata filter dict.
    """
    if outgoing_link is None:
        return metadata or {}

    metadata_filter = {} if metadata is None else metadata.copy()
    metadata_filter[_metadata_link_key(link=outgoing_link)] = _metadata_link_value()
    return metadata_filter


def _restore_links(self, doc: Document) -> Document:
    """Restores the links in the document by deserializing them from metadata.

    Also removes the per-link marker keys that were only added to make
    incoming links searchable.

    Args:
        doc: A single Document

    Returns:
        The same Document with restored links.
    """
    links = _deserialize_links(doc.metadata.get(METADATA_LINKS_KEY))
    doc.metadata[METADATA_LINKS_KEY] = links
    # TODO: Could this be skipped if we put these metadata entries
    # only in the searchable `metadata_s` column?
    incoming_keys = [
        _metadata_link_key(link=link)
        for link in links
        if link.direction in ["in", "bidir"]
    ]
    for incoming_link_key in incoming_keys:
        if incoming_link_key in doc.metadata:
            del doc.metadata[incoming_link_key]
    return doc


def _get_node_metadata_for_insertion(self, node: Node) -> dict[str, Any]:
    """Build the metadata dict stored for `node`, with links serialized.

    Incoming links are additionally written as marker keys so they can be
    found by metadata search.
    """
    metadata = node.metadata.copy()
    metadata[METADATA_LINKS_KEY] = _serialize_links(node.links)
    # TODO: Could we could put these metadata entries
    # only in the searchable `metadata_s` column?
    for incoming_link in _incoming_links(node=node):
        metadata[_metadata_link_key(link=incoming_link)] = _metadata_link_value()
    return metadata


def _get_docs_for_insertion(
    self, nodes: Iterable[Node]
) -> tuple[list[Document], list[str]]:
    """Convert nodes into (documents, ids) ready for the vector store.

    Nodes without an ID are assigned a random hex token.
    """
    docs = []
    ids = []
    for node in nodes:
        node_id = secrets.token_hex(8) if not node.id else node.id
        doc = Document(
            page_content=node.text,
            metadata=self._get_node_metadata_for_insertion(node=node),
            id=node_id,
        )
        docs.append(doc)
        ids.append(node_id)
    return (docs, ids)
@override
def add_nodes(
    self,
    nodes: Iterable[Node],
    **kwargs: Any,
) -> Iterable[str]:
    """Add nodes to the graph store.

    Args:
        nodes: the nodes to add.
        **kwargs: Additional keyword arguments.
    """
    docs, ids = self._get_docs_for_insertion(nodes=nodes)
    return self.vector_store.add_documents(docs, ids=ids)
@override
async def aadd_nodes(
    self,
    nodes: Iterable[Node],
    **kwargs: Any,
) -> AsyncIterable[str]:
    """Add nodes to the graph store asynchronously.

    Args:
        nodes: the nodes to add.
        **kwargs: Additional keyword arguments.
    """
    docs, ids = self._get_docs_for_insertion(nodes=nodes)
    inserted_ids = await self.vector_store.aadd_documents(docs, ids=ids)
    for inserted_id in inserted_ids:
        yield inserted_id
@override
def similarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Retrieve documents from this graph store.

    Args:
        query: The query string.
        k: The number of Documents to return. Defaults to 4.
        filter: Optional metadata to filter the results.
        **kwargs: Additional keyword arguments.

    Returns:
        Collection of retrieved documents, with links restored.
    """
    hits = self.vector_store.similarity_search(
        query=query,
        k=k,
        filter=filter,
        **kwargs,
    )
    return [self._restore_links(doc) for doc in hits]
@override
async def asimilarity_search(
    self,
    query: str,
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Retrieve documents from this graph store asynchronously.

    Args:
        query: The query string.
        k: The number of Documents to return. Defaults to 4.
        filter: Optional metadata to filter the results.
        **kwargs: Additional keyword arguments.

    Returns:
        Collection of retrieved documents, with links restored.
    """
    hits = await self.vector_store.asimilarity_search(
        query=query,
        k=k,
        filter=filter,
        **kwargs,
    )
    return [self._restore_links(doc) for doc in hits]
@override
def similarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return docs most similar to embedding vector.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector,
        with links restored.
    """
    hits = self.vector_store.similarity_search_by_vector(
        embedding,
        k=k,
        filter=filter,
        **kwargs,
    )
    return [self._restore_links(doc) for doc in hits]
@override
async def asimilarity_search_by_vector(
    self,
    embedding: list[float],
    k: int = 4,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> list[Document]:
    """Return docs most similar to embedding vector, asynchronously.

    Args:
        embedding: Embedding to look up documents similar to.
        k: Number of Documents to return. Defaults to 4.
        filter: Filter on the metadata to apply.
        **kwargs: Additional arguments are ignored.

    Returns:
        The list of Documents most similar to the query vector,
        with links restored.
    """
    hits = await self.vector_store.asimilarity_search_by_vector(
        embedding,
        k=k,
        filter=filter,
        **kwargs,
    )
    return [self._restore_links(doc) for doc in hits]
def metadata_search(
    self,
    filter: dict[str, Any] | None = None,  # noqa: A002
    n: int = 5,
) -> Iterable[Document]:
    """Get documents via a metadata search.

    Args:
        filter: the metadata to query for.
        n: the maximum number of documents to return.
    """
    hits = self.vector_store.metadata_search(
        filter=filter or {},
        n=n,
    )
    return [self._restore_links(doc) for doc in hits]
async def ametadata_search(
    self,
    filter: dict[str, Any] | None = None,  # noqa: A002
    n: int = 5,
) -> Iterable[Document]:
    """Get documents via a metadata search, asynchronously.

    Args:
        filter: the metadata to query for.
        n: the maximum number of documents to return.
    """
    hits = await self.vector_store.ametadata_search(
        filter=filter or {},
        n=n,
    )
    return [self._restore_links(doc) for doc in hits]
def get_by_document_id(self, document_id: str) -> Document | None:
    """Retrieve a single document from the store, given its document ID.

    Args:
        document_id: The document ID

    Returns:
        The document if it exists, with links restored. Otherwise None.
    """
    doc = self.vector_store.get_by_document_id(document_id=document_id)
    if doc is None:
        return None
    return self._restore_links(doc)
async def aget_by_document_id(self, document_id: str) -> Document | None:
    """Retrieve a single document from the store, given its document ID.

    Args:
        document_id: The document ID

    Returns:
        The document if it exists, with links restored. Otherwise None.
    """
    doc = await self.vector_store.aget_by_document_id(document_id=document_id)
    if doc is None:
        return None
    return self._restore_links(doc)
def get_node(self, node_id: str) -> Node | None:
    """Retrieve a single node from the store, given its ID.

    Args:
        node_id: The node ID

    Returns:
        The node if it exists. Otherwise None.
    """
    doc = self.vector_store.get_by_document_id(document_id=node_id)
    if doc is None:
        return None
    return _doc_to_node(doc=doc)
@override
async def ammr_traversal_search(  # noqa: C901
    self,
    query: str,
    *,
    initial_roots: Sequence[str] = (),
    k: int = 4,
    depth: int = 2,
    fetch_k: int = 100,
    adjacent_k: int = 10,
    lambda_mult: float = 0.5,
    score_threshold: float = float("-inf"),
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AsyncIterable[Document]:
    """Retrieve documents from this graph store using MMR-traversal.

    This strategy first retrieves the top `fetch_k` results by similarity to
    the question. It then selects the top `k` results based on
    maximum-marginal relevance using the given `lambda_mult`.

    At each step, it considers the (remaining) documents from `fetch_k` as
    well as any documents connected by edges to a selected document
    retrieved based on similarity (a "root").

    Args:
        query: The query string to search for.
        initial_roots: Optional list of document IDs to use for initializing
            search. The top `adjacent_k` nodes adjacent to each initial root
            will be included in the set of initial candidates. To fetch only
            in the neighborhood of these nodes, set `fetch_k = 0`.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of initial Documents to fetch via similarity.
            Will be added to the nodes adjacent to `initial_roots`.
            Defaults to 100.
        adjacent_k: Number of adjacent Documents to fetch.
            Defaults to 10.
        depth: Maximum depth of a node (number of edges) from a node
            retrieved via similarity. Defaults to 2.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        score_threshold: Only documents with a score greater than or equal
            this threshold will be chosen. Defaults to -infinity.
        filter: Optional metadata to filter the results.
        **kwargs: Additional keyword arguments.
    """
    query_embedding = self.embedding.embed_query(query)
    helper = MmrHelper(
        k=k,
        query_embedding=query_embedding,
        lambda_mult=lambda_mult,
        score_threshold=score_threshold,
    )

    # For each unselected node, stores the outgoing links.
    outgoing_links_map: dict[str, set[Link]] = {}
    visited_links: set[Link] = set()
    # Map from id to Document
    retrieved_docs: dict[str, Document] = {}

    async def fetch_neighborhood(neighborhood: Sequence[str]) -> None:
        nonlocal outgoing_links_map, visited_links, retrieved_docs

        # Put the neighborhood into the outgoing links, to avoid adding it
        # to the candidate set in the future.
        outgoing_links_map.update(
            {content_id: set() for content_id in neighborhood}
        )

        # Initialize the visited_links with the set of outgoing links from the
        # neighborhood. This prevents re-visiting them.
        visited_links = await self._get_outgoing_links(neighborhood)

        # Call `self._get_adjacent` to fetch the candidates.
        adjacent_nodes = await self._get_adjacent(
            links=visited_links,
            query_embedding=query_embedding,
            k_per_link=adjacent_k,
            filter=filter,
            retrieved_docs=retrieved_docs,
        )

        new_candidates: dict[str, list[float]] = {}
        for adjacent_node in adjacent_nodes:
            if adjacent_node.id not in outgoing_links_map:
                outgoing_links_map[adjacent_node.id] = _outgoing_links(
                    node=adjacent_node
                )
                new_candidates[adjacent_node.id] = adjacent_node.embedding
        helper.add_candidates(new_candidates)

    async def fetch_initial_candidates() -> None:
        nonlocal outgoing_links_map, visited_links, retrieved_docs

        results = (
            await self.vector_store.asimilarity_search_with_embedding_id_by_vector(
                embedding=query_embedding,
                k=fetch_k,
                filter=filter,
            )
        )

        candidates: dict[str, list[float]] = {}
        for doc, embedding, doc_id in results:
            if doc_id not in retrieved_docs:
                retrieved_docs[doc_id] = doc
            if doc_id not in outgoing_links_map:
                node = _doc_to_node(doc)
                outgoing_links_map[doc_id] = _outgoing_links(node=node)
                candidates[doc_id] = embedding
        helper.add_candidates(candidates)

    if initial_roots:
        await fetch_neighborhood(initial_roots)
    if fetch_k > 0:
        await fetch_initial_candidates()

    # Tracks the depth of each candidate.
    depths = {candidate_id: 0 for candidate_id in helper.candidate_ids()}

    # Select the best item, K times.
    for _ in range(k):
        selected_id = helper.pop_best()

        if selected_id is None:
            break

        next_depth = depths[selected_id] + 1
        if next_depth < depth:
            # If the next nodes would not exceed the depth limit, find the
            # adjacent nodes.

            # Find the links linked to from the selected ID.
            selected_outgoing_links = outgoing_links_map.pop(selected_id)

            # Don't re-visit already visited links.
            selected_outgoing_links.difference_update(visited_links)

            # Find the nodes with incoming links from those links.
            adjacent_nodes = await self._get_adjacent(
                links=selected_outgoing_links,
                query_embedding=query_embedding,
                k_per_link=adjacent_k,
                filter=filter,
                retrieved_docs=retrieved_docs,
            )

            # Record the selected_outgoing_links as visited.
            visited_links.update(selected_outgoing_links)

            new_candidates = {}
            for adjacent_node in adjacent_nodes:
                if adjacent_node.id not in outgoing_links_map:
                    outgoing_links_map[adjacent_node.id] = _outgoing_links(
                        node=adjacent_node
                    )
                    new_candidates[adjacent_node.id] = adjacent_node.embedding
                    if next_depth < depths.get(adjacent_node.id, depth + 1):
                        # If this is a new shortest depth, or there was no
                        # previous depth, update the depths. This ensures that
                        # when we discover a node we will have the shortest
                        # depth available.
                        #
                        # NOTE: No effort is made to traverse from nodes that
                        # were previously selected if they become reachable via
                        # a shorter path via nodes selected later. This is
                        # currently "intended", but may be worth experimenting
                        # with.
                        depths[adjacent_node.id] = next_depth
            helper.add_candidates(new_candidates)

    for doc_id, similarity_score, mmr_score in zip(
        helper.selected_ids,
        helper.selected_similarity_scores,
        helper.selected_mmr_scores,
    ):
        if doc_id in retrieved_docs:
            doc = self._restore_links(retrieved_docs[doc_id])
            doc.metadata["similarity_score"] = similarity_score
            doc.metadata["mmr_score"] = mmr_score
            yield doc
        else:
            msg = f"retrieved_docs should contain id: {doc_id}"
            raise RuntimeError(msg)
@override
def mmr_traversal_search(
    self,
    query: str,
    *,
    initial_roots: Sequence[str] = (),
    k: int = 4,
    depth: int = 2,
    fetch_k: int = 100,
    adjacent_k: int = 10,
    lambda_mult: float = 0.5,
    score_threshold: float = float("-inf"),
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Iterable[Document]:
    """Retrieve documents from this graph store using MMR-traversal.

    Synchronous wrapper around `ammr_traversal_search`; runs the async
    traversal to completion on a fresh event loop and returns the
    collected documents.

    Args:
        query: The query string to search for.
        initial_roots: Optional list of document IDs to use for initializing
            search. The top `adjacent_k` nodes adjacent to each initial root
            will be included in the set of initial candidates. To fetch only
            in the neighborhood of these nodes, set `fetch_k = 0`.
        k: Number of Documents to return. Defaults to 4.
        fetch_k: Number of initial Documents to fetch via similarity.
            Will be added to the nodes adjacent to `initial_roots`.
            Defaults to 100.
        adjacent_k: Number of adjacent Documents to fetch.
            Defaults to 10.
        depth: Maximum depth of a node (number of edges) from a node
            retrieved via similarity. Defaults to 2.
        lambda_mult: Number between 0 and 1 that determines the degree
            of diversity among the results with 0 corresponding to maximum
            diversity and 1 to minimum diversity. Defaults to 0.5.
        score_threshold: Only documents with a score greater than or equal
            this threshold will be chosen. Defaults to -infinity.
        filter: Optional metadata to filter the results.
        **kwargs: Additional keyword arguments.
    """

    async def collect_docs() -> Iterable[Document]:
        async_iter = self.ammr_traversal_search(
            query=query,
            initial_roots=initial_roots,
            k=k,
            depth=depth,
            fetch_k=fetch_k,
            adjacent_k=adjacent_k,
            lambda_mult=lambda_mult,
            score_threshold=score_threshold,
            filter=filter,
            **kwargs,
        )
        return [doc async for doc in async_iter]

    # NOTE: asyncio.run requires that no event loop is already running in
    # this thread.
    return asyncio.run(collect_docs())
@override
async def atraversal_search(  # noqa: C901
    self,
    query: str,
    *,
    k: int = 4,
    depth: int = 1,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> AsyncIterable[Document]:
    """Retrieve documents from this knowledge store.

    First, `k` nodes are retrieved using a vector search for the `query` string.
    Then, additional nodes are discovered up to the given `depth` from those
    starting nodes.

    Args:
        query: The query string.
        k: The number of Documents to return from the initial vector search.
            Defaults to 4.
        depth: The maximum depth of edges to traverse. Defaults to 1.
        filter: Optional metadata to filter the results.
        **kwargs: Additional keyword arguments.

    Returns:
        Collection of retrieved documents.
    """
    # Depth 0:
    #   Query for `k` nodes similar to the question.
    #   Retrieve `content_id` and `outgoing_links()`.
    #
    # Depth 1:
    #   Query for nodes that have an incoming link in the `outgoing_links()`
    #   set. Combine node IDs.
    #   Query for `outgoing_links()` of those "new" node IDs.
    #
    # ...

    # Map from visited ID to depth
    visited_ids: dict[str, int] = {}
    # Map from visited link to depth
    visited_links: dict[Link, int] = {}
    # Map from id to Document
    retrieved_docs: dict[str, Document] = {}

    async def visit_nodes(d: int, docs: Iterable[Document]) -> None:
        """Recursively visit nodes and their outgoing links."""
        nonlocal visited_ids, visited_links, retrieved_docs

        # Iterate over nodes, tracking the *new* outgoing links for this
        # depth. These are links that are either new, or newly discovered at
        # a lower depth.
        outgoing_links: set[Link] = set()
        for doc in docs:
            if doc.id is not None:
                if doc.id not in retrieved_docs:
                    retrieved_docs[doc.id] = doc

                # If this node is at a closer depth, update visited_ids
                if d <= visited_ids.get(doc.id, depth):
                    visited_ids[doc.id] = d

                    # If we can continue traversing from this node,
                    if d < depth:
                        node = _doc_to_node(doc=doc)
                        # Record any new (or newly discovered at a lower
                        # depth) links to the set to traverse.
                        for link in _outgoing_links(node=node):
                            if d <= visited_links.get(link, depth):
                                # Record that we'll query this link at the
                                # given depth, so we don't fetch it again
                                # (unless we find it an earlier depth)
                                visited_links[link] = d
                                outgoing_links.add(link)

        if outgoing_links:
            metadata_search_tasks = []
            for outgoing_link in outgoing_links:
                metadata_filter = self._get_metadata_filter(
                    metadata=filter,
                    outgoing_link=outgoing_link,
                )
                metadata_search_tasks.append(
                    asyncio.create_task(
                        self.vector_store.ametadata_search(
                            filter=metadata_filter, n=1000
                        )
                    )
                )
            results = await asyncio.gather(*metadata_search_tasks)

            # Visit targets concurrently
            visit_target_tasks = [
                visit_targets(d=d + 1, docs=docs) for docs in results
            ]
            await asyncio.gather(*visit_target_tasks)

    async def visit_targets(d: int, docs: Iterable[Document]) -> None:
        """Visit target nodes retrieved from outgoing links."""
        nonlocal visited_ids, retrieved_docs

        new_ids_at_next_depth = set()
        for doc in docs:
            if doc.id is not None:
                if doc.id not in retrieved_docs:
                    retrieved_docs[doc.id] = doc

                if d <= visited_ids.get(doc.id, depth):
                    new_ids_at_next_depth.add(doc.id)

        if new_ids_at_next_depth:
            visit_node_tasks = [
                visit_nodes(d=d, docs=[retrieved_docs[doc_id]])
                for doc_id in new_ids_at_next_depth
                if doc_id in retrieved_docs
            ]

            fetch_tasks = [
                asyncio.create_task(
                    self.vector_store.aget_by_document_id(document_id=doc_id)
                )
                for doc_id in new_ids_at_next_depth
                if doc_id not in retrieved_docs
            ]

            new_docs: list[Document | None] = await asyncio.gather(*fetch_tasks)

            visit_node_tasks.extend(
                visit_nodes(d=d, docs=[new_doc])
                for new_doc in new_docs
                if new_doc is not None
            )
            await asyncio.gather(*visit_node_tasks)

    # Start the traversal.
    # Use the async similarity search here: the original synchronous
    # `similarity_search` call would block the event loop from inside this
    # coroutine, defeating the concurrency of the traversal.
    initial_docs = await self.vector_store.asimilarity_search(
        query=query,
        k=k,
        filter=filter,
    )
    await visit_nodes(d=0, docs=initial_docs)

    for doc_id in visited_ids:
        if doc_id in retrieved_docs:
            yield self._restore_links(retrieved_docs[doc_id])
        else:
            msg = f"retrieved_docs should contain id: {doc_id}"
            raise RuntimeError(msg)
@override
def traversal_search(
    self,
    query: str,
    *,
    k: int = 4,
    depth: int = 1,
    filter: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Iterable[Document]:
    """Retrieve documents from this knowledge store.

    Synchronous wrapper around `atraversal_search`; runs the async
    traversal to completion on a fresh event loop.

    First, `k` nodes are retrieved using a vector search for the `query`
    string. Then, additional nodes are discovered up to the given `depth`
    from those starting nodes.

    Args:
        query: The query string.
        k: The number of Documents to return from the initial vector search.
            Defaults to 4.
        depth: The maximum depth of edges to traverse. Defaults to 1.
        filter: Optional metadata to filter the results.
        **kwargs: Additional keyword arguments.

    Returns:
        Collection of retrieved documents.
    """

    async def collect_docs() -> Iterable[Document]:
        async_iter = self.atraversal_search(
            query=query,
            k=k,
            depth=depth,
            filter=filter,
            **kwargs,
        )
        return [doc async for doc in async_iter]

    # NOTE: asyncio.run requires that no event loop is already running in
    # this thread.
    return asyncio.run(collect_docs())
async def _get_outgoing_links(self, source_ids: Iterable[str]) -> set[Link]:
    """Return the set of outgoing links for the given source IDs asynchronously.

    Args:
        source_ids: The IDs of the source nodes to retrieve outgoing links for.

    Returns:
        A set of `Link` objects representing the outgoing links from the
        source nodes.
    """
    links = set()

    # Create coroutine objects without scheduling them yet
    coroutines = [
        self.vector_store.aget_by_document_id(document_id=source_id)
        for source_id in source_ids
    ]

    # Schedule and await all coroutines
    docs = await asyncio.gather(*coroutines)

    for doc in docs:
        if doc is not None:
            node = _doc_to_node(doc=doc)
            links.update(_outgoing_links(node=node))

    return links


async def _get_adjacent(
    self,
    links: set[Link],
    query_embedding: list[float],
    retrieved_docs: dict[str, Document],
    k_per_link: int | None = None,
    filter: dict[str, Any] | None = None,  # noqa: A002
) -> Iterable[AdjacentNode]:
    """Return the target nodes with incoming links from any of the given links.

    Args:
        links: The links to look for.
        query_embedding: The query embedding. Used to rank target nodes.
        retrieved_docs: A cache of retrieved docs. This will be added to.
        k_per_link: The number of target nodes to fetch for each link.
        filter: Optional metadata to filter the results.

    Returns:
        Iterable of adjacent edges.
    """
    targets: dict[str, AdjacentNode] = {}

    tasks = []
    for link in links:
        metadata_filter = self._get_metadata_filter(
            metadata=filter,
            outgoing_link=link,
        )
        tasks.append(
            self.vector_store.asimilarity_search_with_embedding_id_by_vector(
                embedding=query_embedding,
                k=k_per_link or 10,
                filter=metadata_filter,
            )
        )
    results = await asyncio.gather(*tasks)

    for result in results:
        for doc, embedding, doc_id in result:
            if doc_id not in retrieved_docs:
                retrieved_docs[doc_id] = doc
            if doc_id not in targets:
                node = _doc_to_node(doc=doc)
                targets[doc_id] = AdjacentNode(node=node, embedding=embedding)

    # TODO: Consider a combined limit based on the similarity and/or
    # predicated MMR score?
    return targets.values()


@staticmethod
def _build_docs_from_texts(
    texts: List[str],
    metadatas: Optional[List[dict]] = None,
    ids: Optional[List[str]] = None,
) -> List[Document]:
    """Build Documents from parallel lists of texts, metadatas, and ids."""
    docs: List[Document] = []
    for i, text in enumerate(texts):
        doc = Document(page_content=text)
        if metadatas is not None:
            doc.metadata = metadatas[i]
        if ids is not None:
            doc.id = ids[i]
        docs.append(doc)
    return docs
@classmethod
def from_texts(
    cls: Type[CGVST],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    *,
    session: Optional[Session] = None,
    keyspace: Optional[str] = None,
    table_name: str = "",
    ids: Optional[List[str]] = None,
    ttl_seconds: Optional[int] = None,
    body_index_options: Optional[List[Tuple[str, Any]]] = None,
    metadata_deny_list: Optional[list[str]] = None,
    **kwargs: Any,
) -> CGVST:
    """Create a CassandraGraphVectorStore from raw texts.

    Args:
        texts: Texts to add to the vectorstore.
        embedding: Embedding function to use.
        metadatas: Optional list of metadatas associated with the texts.
        session: Cassandra driver session.
            If not provided, it is resolved from cassio.
        keyspace: Cassandra key space.
            If not provided, it is resolved from cassio.
        table_name: Cassandra table (required).
        ids: Optional list of IDs associated with the texts.
        ttl_seconds: Optional time-to-live for the added texts.
        body_index_options: Optional options used to create the body index.
            Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
        metadata_deny_list: Optional list of metadata keys to not index.
            i.e. to fine-tune which of the metadata fields are indexed.
            Note: if you plan to have massive unique text metadata entries,
            consider not indexing them for performance
            (and to overcome max-length limitations).
            Note: the `metadata_indexing` parameter from
            langchain_community.utilities.cassandra.Cassandra is not
            exposed since CassandraGraphVectorStore only supports the
            deny_list option.
        **kwargs: Additional keyword arguments passed through to the
            constructor.

    Returns:
        a CassandraGraphVectorStore.
    """
    docs = cls._build_docs_from_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
    )

    return cls.from_documents(
        documents=docs,
        embedding=embedding,
        session=session,
        keyspace=keyspace,
        table_name=table_name,
        ttl_seconds=ttl_seconds,
        body_index_options=body_index_options,
        metadata_deny_list=metadata_deny_list,
        **kwargs,
    )
@classmethod
async def afrom_texts(
    cls: Type[CGVST],
    texts: List[str],
    embedding: Embeddings,
    metadatas: Optional[List[dict]] = None,
    *,
    session: Optional[Session] = None,
    keyspace: Optional[str] = None,
    table_name: str = "",
    ids: Optional[List[str]] = None,
    ttl_seconds: Optional[int] = None,
    body_index_options: Optional[List[Tuple[str, Any]]] = None,
    metadata_deny_list: Optional[list[str]] = None,
    **kwargs: Any,
) -> CGVST:
    """Create a CassandraGraphVectorStore from raw texts, asynchronously.

    Args:
        texts: Texts to add to the vectorstore.
        embedding: Embedding function to use.
        metadatas: Optional list of metadatas associated with the texts.
        session: Cassandra driver session.
            If not provided, it is resolved from cassio.
        keyspace: Cassandra key space.
            If not provided, it is resolved from cassio.
        table_name: Cassandra table (required).
        ids: Optional list of IDs associated with the texts.
        ttl_seconds: Optional time-to-live for the added texts.
        body_index_options: Optional options used to create the body index.
            Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
        metadata_deny_list: Optional list of metadata keys to not index.
            i.e. to fine-tune which of the metadata fields are indexed.
            Note: if you plan to have massive unique text metadata entries,
            consider not indexing them for performance
            (and to overcome max-length limitations).
            Note: the `metadata_indexing` parameter from
            langchain_community.utilities.cassandra.Cassandra is not
            exposed since CassandraGraphVectorStore only supports the
            deny_list option.
        **kwargs: Additional keyword arguments passed through to the
            constructor.

    Returns:
        a CassandraGraphVectorStore.
    """
    docs = cls._build_docs_from_texts(
        texts=texts,
        metadatas=metadatas,
        ids=ids,
    )

    return await cls.afrom_documents(
        documents=docs,
        embedding=embedding,
        session=session,
        keyspace=keyspace,
        table_name=table_name,
        ttl_seconds=ttl_seconds,
        body_index_options=body_index_options,
        metadata_deny_list=metadata_deny_list,
        **kwargs,
    )
@classmethod
def from_documents(
    cls: Type[CGVST],
    documents: List[Document],
    embedding: Embeddings,
    *,
    session: Optional[Session] = None,
    keyspace: Optional[str] = None,
    table_name: str = "",
    ids: Optional[List[str]] = None,
    ttl_seconds: Optional[int] = None,
    body_index_options: Optional[List[Tuple[str, Any]]] = None,
    metadata_deny_list: Optional[list[str]] = None,
    **kwargs: Any,
) -> CGVST:
    """Create a CassandraGraphVectorStore from a document list.

    Args:
        documents: Documents to add to the vectorstore.
        embedding: Embedding function to use.
        session: Cassandra driver session.
            If not provided, it is resolved from cassio.
        keyspace: Cassandra key space.
            If not provided, it is resolved from cassio.
        table_name: Cassandra table (required).
        ids: Optional list of IDs associated with the documents.
        ttl_seconds: Optional time-to-live for the added documents.
        body_index_options: Optional options used to create the body index.
            Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
        metadata_deny_list: Optional list of metadata keys to not index.
            i.e. to fine-tune which of the metadata fields are indexed.
            Note: if you plan to have massive unique text metadata entries,
            consider not indexing them for performance
            (and to overcome max-length limitations).
            Note: the `metadata_indexing` parameter from
            langchain_community.utilities.cassandra.Cassandra is not
            exposed since CassandraGraphVectorStore only supports the
            deny_list option.
        **kwargs: Additional keyword arguments passed through to the
            constructor.

    Returns:
        a CassandraGraphVectorStore.
    """
    store = cls(
        embedding=embedding,
        session=session,
        keyspace=keyspace,
        table_name=table_name,
        ttl_seconds=ttl_seconds,
        body_index_options=body_index_options,
        metadata_deny_list=metadata_deny_list,
        **kwargs,
    )
    store.add_documents(documents=cls._add_ids_to_docs(docs=documents, ids=ids))
    return store
@classmethod
async def afrom_documents(
    cls: Type[CGVST],
    documents: List[Document],
    embedding: Embeddings,
    *,
    session: Optional[Session] = None,
    keyspace: Optional[str] = None,
    table_name: str = "",
    ids: Optional[List[str]] = None,
    ttl_seconds: Optional[int] = None,
    body_index_options: Optional[List[Tuple[str, Any]]] = None,
    metadata_deny_list: Optional[list[str]] = None,
    **kwargs: Any,
) -> CGVST:
    """Create a CassandraGraphVectorStore from a document list, asynchronously.

    Args:
        documents: Documents to add to the vectorstore.
        embedding: Embedding function to use.
        session: Cassandra driver session.
            If not provided, it is resolved from cassio.
        keyspace: Cassandra key space.
            If not provided, it is resolved from cassio.
        table_name: Cassandra table (required).
        ids: Optional list of IDs associated with the documents.
        ttl_seconds: Optional time-to-live for the added documents.
        body_index_options: Optional options used to create the body index.
            Eg. body_index_options = [cassio.table.cql.STANDARD_ANALYZER]
        metadata_deny_list: Optional list of metadata keys to not index.
            i.e. to fine-tune which of the metadata fields are indexed.
            Note: if you plan to have massive unique text metadata entries,
            consider not indexing them for performance
            (and to overcome max-length limitations).
            Note: the `metadata_indexing` parameter from
            langchain_community.utilities.cassandra.Cassandra is not
            exposed since CassandraGraphVectorStore only supports the
            deny_list option.
        **kwargs: Additional keyword arguments passed through to the
            constructor.

    Returns:
        a CassandraGraphVectorStore.
    """
    # Table creation is deferred to the async setup path.
    store = cls(
        embedding=embedding,
        session=session,
        keyspace=keyspace,
        table_name=table_name,
        ttl_seconds=ttl_seconds,
        setup_mode=SetupMode.ASYNC,
        body_index_options=body_index_options,
        metadata_deny_list=metadata_deny_list,
        **kwargs,
    )
    await store.aadd_documents(
        documents=cls._add_ids_to_docs(docs=documents, ids=ids)
    )
    return store