[docs]classPineconeSparseVectorStore(PineconeVectorStore):"""Pinecone sparse vector store integration. This class extends PineconeVectorStore to support sparse vector representations. It requires a Pinecone sparse index and PineconeSparseEmbeddings. Setup: ```python # Install required packages pip install langchain-pinecone pinecone-client ``` Key init args - indexing params: text_key (str): The metadata key where the document text will be stored. namespace (str): Pinecone namespace to use. distance_strategy (DistanceStrategy): Strategy for computing distances. Key init args - client params: index (pinecone.Index): A Pinecone sparse index. embedding (PineconeSparseEmbeddings): A sparse embeddings model. pinecone_api_key (str): The Pinecone API key. index_name (str): The name of the Pinecone index. See full list of supported init args and their descriptions in the params section. Instantiate: ```python from pinecone import Pinecone from langchain_pinecone import PineconeSparseVectorStore from langchain_pinecone.embeddings import PineconeSparseEmbeddings # Initialize Pinecone client pc = Pinecone(api_key="your-api-key") # Get your sparse index index = pc.Index("your-sparse-index-name") # Initialize embedding function embeddings = PineconeSparseEmbeddings() # Create vector store vectorstore = PineconeSparseVectorStore( index=index, embedding=embeddings, text_key="content", namespace="my-namespace" ) ``` Add Documents: ```python from langchain_core.documents import Document docs = [ Document(page_content="This is a sparse vector example"), Document(page_content="Another document for testing") ] # Option 1: Add from Document objects vectorstore.add_documents(docs) # Option 2: Add from texts texts = ["Text 1", "Text 2"] metadatas = [{"source": "source1"}, {"source": "source2"}] vectorstore.add_texts(texts, metadatas=metadatas) ``` Update Documents: Update documents by re-adding them with the same IDs. ```python ids = ["id1", "id2"] texts = ["Updated text 1", "Updated text 2"] metadatas = [{"source": "updated_source1"}, {"source": "updated_source2"}] vectorstore.add_texts(texts, metadatas=metadatas, ids=ids) ``` Delete Documents: ```python # Delete by IDs vectorstore.delete(ids=["id1", "id2"]) # Delete by filter vectorstore.delete(filter={"source": "source1"}) # Delete all documents in a namespace vectorstore.delete(delete_all=True, namespace="my-namespace") ``` Search: ```python # Search for similar documents docs = vectorstore.similarity_search("query text", k=5) # Search with filters docs = vectorstore.similarity_search( "query text", k=5, filter={"source": "source1"} ) # Maximal marginal relevance search for diversity docs = vectorstore.max_marginal_relevance_search( "query text", k=5, fetch_k=20, lambda_mult=0.5 ) ``` Search with score: ```python # Search with relevance scores docs_and_scores = vectorstore.similarity_search_with_score( "query text", k=5 ) for doc, score in docs_and_scores: print(f"Score: {score}, Document: {doc.page_content}") ``` Use as Retriever: ```python # Create a retriever retriever = vectorstore.as_retriever() # Customize retriever retriever = vectorstore.as_retriever( search_type="mmr", search_kwargs={"k": 5, "fetch_k": 20, "lambda_mult": 0.5}, filter={"source": "source1"} ) # Use the retriever docs = retriever.get_relevant_documents("query text") ``` """
[docs]def__init__(self,index:Optional[Any]=None,embedding:Optional[PineconeSparseEmbeddings]=None,text_key:Optional[str]="text",namespace:Optional[str]=None,distance_strategy:Optional[DistanceStrategy]=DistanceStrategy.COSINE,*,pinecone_api_key:Optional[str]=None,index_name:Optional[str]=None,):ifindexandindex.describe_index_stats()["vector_type"]!="sparse":raiseValueError("PineconeSparseVectorStore can only be used with Sparse Indexes")super().__init__(index,embedding,text_key,namespace,distance_strategy,pinecone_api_key=pinecone_api_key,index_name=index_name,)
@propertydefembeddings(self)->PineconeSparseEmbeddings:ifnotself._embedding:raiseValueError("Must provide a PineconeSparseEmbeddings to the PineconeSparseVectorStore")ifnotisinstance(self._embedding,PineconeSparseEmbeddings):raiseValueError("PineconeSparseVectorStore can only be used with PineconeSparseEmbeddings")returnself._embedding
[docs]defadd_texts(self,texts:Iterable[str],metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,namespace:Optional[str]=None,batch_size:int=32,embedding_chunk_size:int=1000,*,id_prefix:Optional[str]=None,**kwargs:Any,)->List[str]:ifnamespaceisNone:namespace=self._namespacetexts=list(texts)ids=idsor[str(uuid.uuid4())for_intexts]ifid_prefix:ids=[id_prefix+"#"+idifid_prefix+"#"notinidelseidforidinids]metadatas=metadatasor[{}for_intexts]formetadata,textinzip(metadatas,texts):metadata[self._text_key]=text# For loops to avoid memory issues and optimize when using HTTP based embeddings# The first loop runs the embeddings, it benefits when using OpenAI embeddingsforiinrange(0,len(texts),embedding_chunk_size):chunk_texts=texts[i:i+embedding_chunk_size]chunk_ids=ids[i:i+embedding_chunk_size]chunk_metadatas=metadatas[i:i+embedding_chunk_size]embeddings=self.embeddings.embed_documents(chunk_texts)vectors=[Vector(id=chunk_id,sparse_values=value,metadata=metadata)for(chunk_id,value,metadata)inzip(chunk_ids,embeddings,chunk_metadatas)]self.index.upsert(vectors=vectors,namespace=namespace,**kwargs,)returnids
[docs]asyncdefaadd_texts(self,texts:Iterable[str],metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,namespace:Optional[str]=None,batch_size:int=32,embedding_chunk_size:int=1000,*,id_prefix:Optional[str]=None,**kwargs:Any,)->list[str]:"""Asynchronously run more texts through the embeddings and add to the vectorstore. Upsert optimization is done by chunking the embeddings and upserting them. This is done to avoid memory issues and optimize using HTTP based embeddings. For OpenAI embeddings, use pool_threads>4 when constructing the pinecone.Index, embedding_chunk_size>1000 and batch_size~64 for best performance. Args: texts: Iterable of strings to add to the vectorstore. metadatas: Optional list of metadatas associated with the texts. ids: Optional list of ids to associate with the texts. namespace: Optional pinecone namespace to add the texts to. batch_size: Batch size to use when adding the texts to the vectorstore. embedding_chunk_size: Chunk size to use when embedding the texts. id_prefix: Optional string to use as an ID prefix when upserting vectors. Returns: List of ids from adding the texts into the vectorstore. """ifnamespaceisNone:namespace=self._namespacetexts=list(texts)ids=idsor[str(uuid.uuid4())for_intexts]ifid_prefix:ids=[id_prefix+"#"+idifid_prefix+"#"notinidelseidforidinids]metadatas=metadatasor[{}for_intexts]formetadata,textinzip(metadatas,texts):metadata[self._text_key]=text# For loops to avoid memory issues and optimize when using HTTP based embeddingsforiinrange(0,len(texts),embedding_chunk_size):chunk_texts=texts[i:i+embedding_chunk_size]chunk_ids=ids[i:i+embedding_chunk_size]chunk_metadatas=metadatas[i:i+embedding_chunk_size]embeddings=awaitself.embeddings.aembed_documents(chunk_texts)vector_tuples=zip(chunk_ids,embeddings,chunk_metadatas)asyncwithself.async_indexasidx:# Split into batches and upsert asynchronouslytasks=[]forbatch_vector_tuplesinbatch_iterate(batch_size,vector_tuples):task=idx.upsert(vectors=[Vector(id=chunk_id,sparse_values=sparse_values,metadata=metadata,)forchunk_id,sparse_values,metadatainbatch_vector_tuples],namespace=namespace,**kwargs,)tasks.append(task)# Wait for all upserts to completeawaitasyncio.gather(*tasks)returnids
[docs]defsimilarity_search_with_score(self,query:str,k:int=4,filter:Optional[dict]=None,namespace:Optional[str]=None,)->List[Tuple[Document,float]]:"""Return pinecone documents most similar to query, along with scores. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter: Dictionary of argument(s) to filter on metadata namespace: Namespace to search in. Default will search in '' namespace. Returns: List of Documents most similar to the query and score for each """returnself.similarity_search_by_vector_with_score(self.embeddings.embed_query(query),k=k,filter=filter,namespace=namespace)
[docs]asyncdefasimilarity_search_with_score(self,query:str,k:int=4,filter:Optional[dict]=None,namespace:Optional[str]=None,)->list[tuple[Document,float]]:"""Asynchronously return pinecone documents most similar to query, along with scores. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter: Dictionary of argument(s) to filter on metadata namespace: Namespace to search in. Default will search in '' namespace. Returns: List of Documents most similar to the query and score for each """returnawaitself.asimilarity_search_by_vector_with_score((awaitself.embeddings.aembed_query(query)),k=k,filter=filter,namespace=namespace,)
[docs]defsimilarity_search_by_vector_with_score(self,embedding:SparseValues,*,k:int=4,filter:Optional[dict]=None,namespace:Optional[str]=None,)->List[Tuple[Document,float]]:"""Return pinecone documents most similar to embedding, along with scores."""ifnamespaceisNone:namespace=self._namespacedocs=[]results=self.index.query(sparse_vector=embedding,top_k=k,include_metadata=True,namespace=namespace,filter=filter,)forresinresults["matches"]:metadata=res["metadata"]id=res.get("id")ifself._text_keyinmetadata:text=metadata.pop(self._text_key)score=res["score"]docs.append((Document(id=id,page_content=text,metadata=metadata),score))else:logger.warning(f"Found document with no `{self._text_key}` key. Skipping.")returndocs
[docs]asyncdefasimilarity_search_by_vector_with_score(self,embedding:SparseValues,*,k:int=4,filter:Optional[dict]=None,namespace:Optional[str]=None,)->List[Tuple[Document,float]]:"""Return pinecone documents most similar to embedding, along with scores asynchronously."""ifnamespaceisNone:namespace=self._namespacedocs=[]asyncwithself.async_indexasidx:results=awaitidx.query(sparse_vector=embedding,top_k=k,include_metadata=True,namespace=namespace,filter=filter,)forresinresults["matches"]:metadata=res["metadata"]id=res.get("id")ifself._text_keyinmetadata:text=metadata.pop(self._text_key)score=res["score"]docs.append((Document(id=id,page_content=text,metadata=metadata),score))else:logger.warning(f"Found document with no `{self._text_key}` key. Skipping.")returndocs
[docs]defsimilarity_search(self,query:str,k:int=4,filter:Optional[dict]=None,namespace:Optional[str]=None,**kwargs:Any,)->List[Document]:"""Return pinecone documents most similar to query. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. filter: Dictionary of argument(s) to filter on metadata namespace: Namespace to search in. Default will search in '' namespace. Returns: List of Documents most similar to the query and score for each """docs_and_scores=self.similarity_search_with_score(query,k=k,filter=filter,namespace=namespace,**kwargs)return[docfordoc,_indocs_and_scores]
[docs]defmax_marginal_relevance_search_by_vector(self,embedding:SparseValues,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:Optional[dict]=None,namespace:Optional[str]=None,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. filter: Dictionary of argument(s) to filter on metadata namespace: Namespace to search in. Default will search in '' namespace. Returns: List of Documents selected by maximal marginal relevance. """ifnamespaceisNone:namespace=self._namespaceresults=self.index.query(sparse_vector=embedding,top_k=fetch_k,include_values=True,include_metadata=True,namespace=namespace,filter=filter,)mmr_selected=sparse_maximal_marginal_relevance(query_embedding=embedding,embedding_list=[SparseValues.from_dict(item["sparse_values"])foriteminresults["matches"]# type: ignore],k=k,lambda_mult=lambda_mult,)selected=[results["matches"][i]["metadata"]foriinmmr_selected]return[Document(page_content=metadata.pop((self._text_key)),metadata=metadata)formetadatainselected]
[docs]asyncdefamax_marginal_relevance_search_by_vector(self,embedding:SparseValues,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:Optional[dict]=None,namespace:Optional[str]=None,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance asynchronously. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. filter: Dictionary of argument(s) to filter on metadata namespace: Namespace to search in. Default will search in '' namespace. Returns: List of Documents selected by maximal marginal relevance. """ifnamespaceisNone:namespace=self._namespaceasyncwithself.async_indexasidx:results=awaitidx.query(sparse_vector=embedding,top_k=fetch_k,include_values=True,include_metadata=True,namespace=namespace,filter=filter,)mmr_selected=sparse_maximal_marginal_relevance(query_embedding=embedding,embedding_list=[SparseValues.from_dict(item["sparse_values"])foriteminresults["matches"]# type: ignore],k=k,lambda_mult=lambda_mult,)selected=[results["matches"][i]["metadata"]foriinmmr_selected]return[Document(page_content=metadata.pop(self._text_key),metadata=metadata)formetadatainselected]
[docs]defmax_marginal_relevance_search(self,query:str,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,filter:Optional[dict]=None,namespace:Optional[str]=None,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. filter: Dictionary of argument(s) to filter on metadata namespace: Namespace to search in. Default will search in '' namespace. Returns: List of Documents selected by maximal marginal relevance. """embedding=self.embeddings.embed_query(query)returnself.max_marginal_relevance_search_by_vector(embedding,k,fetch_k,lambda_mult,filter,namespace)
[docs]defdelete(self,ids:Optional[List[str]]=None,delete_all:Optional[bool]=None,namespace:Optional[str]=None,filter:Optional[dict]=None,**kwargs:Any,)->None:"""Delete by vector IDs or filter. Args: ids: List of ids to delete. delete_all: Whether delete all vectors in the index. filter: Dictionary of conditions to filter vectors to delete. namespace: Namespace to search in. Default will search in '' namespace. """ifnamespaceisNone:namespace=self._namespaceifdelete_all:self.index.delete(delete_all=True,namespace=namespace,**kwargs)elifidsisnotNone:chunk_size=1000foriinrange(0,len(ids),chunk_size):chunk=ids[i:i+chunk_size]self.index.delete(ids=chunk,namespace=namespace,**kwargs)eliffilterisnotNone:self.index.delete(filter=filter,namespace=namespace,**kwargs)else:raiseValueError("Either ids, delete_all, or filter must be provided.")returnNone
[docs]asyncdefadelete(self,ids:Optional[List[str]]=None,delete_all:Optional[bool]=None,namespace:Optional[str]=None,filter:Optional[dict]=None,**kwargs:Any,)->None:ifnamespaceisNone:namespace=self._namespaceifdelete_all:asyncwithself.async_indexasidx:awaitidx.delete(delete_all=True,namespace=namespace,**kwargs)elifidsisnotNone:chunk_size=1000asyncwithself.async_indexasidx:tasks=[]foriinrange(0,len(ids),chunk_size):chunk=ids[i:i+chunk_size]tasks.append(idx.delete(ids=chunk,namespace=namespace,**kwargs))awaitasyncio.gather(*tasks)eliffilterisnotNone:asyncwithself.async_indexasidx:awaitidx.delete(filter=filter,namespace=namespace,**kwargs)else:raiseValueError("Either ids, delete_all, or filter must be provided.")returnNone