# Source code for langchain_community.chains.pebblo_retrieval.enforcement_filters
"""Identity & Semantic Enforcement filters for PebbloRetrievalQA chain:

This module contains methods for applying Identity and Semantic Enforcement filters
in the PebbloRetrievalQA chain.
These filters are used to control the retrieval of documents based on authorization and
semantic context.
The Identity Enforcement filter ensures that only authorized identities can access
certain documents, while the Semantic Enforcement filter controls document retrieval
based on semantic context.

The methods in this module are designed to work with different types of vector stores.
"""

import logging
from typing import Any, List, Optional, Union

from langchain_core.vectorstores import VectorStoreRetriever

from langchain_community.chains.pebblo_retrieval.models import (
    AuthContext,
    SemanticContext,
)

logger = logging.getLogger(__name__)

# Vector store class names; the functions in this module compare these against
# `retriever.vectorstore.__class__.__name__` to pick a filter format.
PINECONE = "Pinecone"
QDRANT = "Qdrant"
PGVECTOR = "PGVector"

# Set of the vector store class names listed above.
SUPPORTED_VECTORSTORES = {PINECONE, QDRANT, PGVECTOR}
def clear_enforcement_filters(retriever: VectorStoreRetriever) -> None:
    """
    Clear the identity and semantic enforcement filters in the retriever
    search_kwargs.

    Only PGVector retrievers are processed here; for any other vectorstore
    class this function does nothing.

    Args:
        retriever: Retriever whose ``search_kwargs["filter"]`` is cleaned up
            in place.
    """
    if retriever.vectorstore.__class__.__name__ != PGVECTOR:
        return

    search_kwargs = retriever.search_kwargs
    pebblo_filter_keys = (
        "authorized_identities",
        "pebblo_semantic_topics",
        "pebblo_semantic_entities",
    )
    for pebblo_filter_key in pebblo_filter_keys:
        if "filter" not in search_kwargs:
            break
        # Re-read the filter on every pass: the clear helper may rebind
        # search_kwargs["filter"] to a different object (for example when a
        # one-element `$and` is collapsed), and working on the stale reference
        # would leave later pebblo filters in place.
        _pgvector_clear_pebblo_filters(
            search_kwargs, search_kwargs["filter"], pebblo_filter_key
        )
def set_enforcement_filters(
    retriever: VectorStoreRetriever,
    auth_context: Optional[AuthContext],
    semantic_context: Optional[SemanticContext],
) -> None:
    """
    Set identity and semantic enforcement filters in the retriever.

    Previously applied enforcement filters are cleared first; then the
    identity filter (if ``auth_context`` is given) and the semantic filter
    (if ``semantic_context`` is given) are applied, in that order.
    """
    # Start from a clean slate so stale enforcement filters do not accumulate
    clear_enforcement_filters(retriever)

    # Identity enforcement is applied before semantic enforcement
    enforcement_steps = (
        (auth_context, _set_identity_enforcement_filter),
        (semantic_context, _set_semantic_enforcement_filter),
    )
    for context, apply_enforcement in enforcement_steps:
        if context is None:
            continue
        apply_enforcement(retriever, context)
def _apply_qdrant_semantic_filter(
    search_kwargs: dict, semantic_context: Optional[SemanticContext]
) -> None:
    """
    Set semantic enforcement filter in search_kwargs for Qdrant vectorstore.

    Denied topics/entities are added as ``must_not`` conditions on
    ``search_kwargs["filter"]``; existing semantic conditions are replaced.

    Raises:
        ValueError: If ``qdrant-client`` cannot be imported.
        TypeError: If an existing filter is not a qdrant-client ``Filter``.
    """
    try:
        from qdrant_client.http import models as rest
    except ImportError as e:
        raise ValueError(
            "Could not import `qdrant-client.http` python package. "
            "Please install it with `pip install qdrant-client`."
        ) from e

    # Create a semantic enforcement filter condition
    semantic_filters: List[
        Union[
            rest.FieldCondition,
            rest.IsEmptyCondition,
            rest.IsNullCondition,
            rest.HasIdCondition,
            rest.NestedCondition,
            rest.Filter,
        ]
    ] = []

    if (
        semantic_context is not None
        and semantic_context.pebblo_semantic_topics is not None
    ):
        semantic_topics_filter = rest.FieldCondition(
            key="metadata.pebblo_semantic_topics",
            match=rest.MatchAny(any=semantic_context.pebblo_semantic_topics.deny),
        )
        semantic_filters.append(semantic_topics_filter)

    if (
        semantic_context is not None
        and semantic_context.pebblo_semantic_entities is not None
    ):
        semantic_entities_filter = rest.FieldCondition(
            key="metadata.pebblo_semantic_entities",
            match=rest.MatchAny(any=semantic_context.pebblo_semantic_entities.deny),
        )
        semantic_filters.append(semantic_entities_filter)

    # If 'filter' already exists in search_kwargs
    if "filter" in search_kwargs:
        existing_filter: rest.Filter = search_kwargs["filter"]

        # Check if existing_filter is a qdrant-client filter
        if isinstance(existing_filter, rest.Filter):
            # If 'must_not' condition exists in the existing filter
            if isinstance(existing_filter.must_not, list):
                # Existing 'pebblo_semantic_topics' / 'pebblo_semantic_entities'
                # conditions are silently replaced by the new ones.
                new_must_not_conditions: List[
                    Union[
                        rest.FieldCondition,
                        rest.IsEmptyCondition,
                        rest.IsNullCondition,
                        rest.HasIdCondition,
                        rest.NestedCondition,
                        rest.Filter,
                    ]
                ] = []
                # Drop semantic filter conditions if already present
                for condition in existing_filter.must_not:
                    if hasattr(condition, "key"):
                        if condition.key == "metadata.pebblo_semantic_topics":
                            continue
                        if condition.key == "metadata.pebblo_semantic_entities":
                            continue
                    new_must_not_conditions.append(condition)
                # Add semantic enforcement filters to 'must_not' conditions
                existing_filter.must_not = new_must_not_conditions
                existing_filter.must_not.extend(semantic_filters)
            else:
                # Set 'must_not' condition with semantic enforcement filters
                existing_filter.must_not = semantic_filters
        else:
            raise TypeError(
                "Using dict as a `filter` is deprecated. "
                "Please use qdrant-client filters directly: "
                "https://qdrant.tech/documentation/concepts/filtering/"
            )
    else:
        # If 'filter' does not exist in search_kwargs, create it
        search_kwargs["filter"] = rest.Filter(must_not=semantic_filters)


def _apply_qdrant_authorization_filter(
    search_kwargs: dict, auth_context: Optional[AuthContext]
) -> None:
    """
    Set identity enforcement filter in search_kwargs for Qdrant vectorstore.

    Adds a ``must`` condition matching ``metadata.authorized_identities``
    against ``auth_context.user_auth``; an existing condition on that key is
    replaced. Does nothing (after the import check) when ``auth_context`` is
    None.

    Raises:
        ValueError: If ``qdrant-client`` cannot be imported.
        TypeError: If an existing filter is not a qdrant-client ``Filter``.
    """
    try:
        from qdrant_client.http import models as rest
    except ImportError as e:
        raise ValueError(
            "Could not import `qdrant-client.http` python package. "
            "Please install it with `pip install qdrant-client`."
        ) from e

    if auth_context is None:
        return

    # Create a identity enforcement filter condition
    identity_enforcement_filter = rest.FieldCondition(
        key="metadata.authorized_identities",
        match=rest.MatchAny(any=auth_context.user_auth),
    )

    # If 'filter' already exists in search_kwargs
    if "filter" in search_kwargs:
        existing_filter: rest.Filter = search_kwargs["filter"]

        # Check if existing_filter is a qdrant-client filter
        if isinstance(existing_filter, rest.Filter):
            # If 'must' exists in the existing filter
            if existing_filter.must:
                new_must_conditions: List[
                    Union[
                        rest.FieldCondition,
                        rest.IsEmptyCondition,
                        rest.IsNullCondition,
                        rest.HasIdCondition,
                        rest.NestedCondition,
                        rest.Filter,
                    ]
                ] = []
                # Drop 'authorized_identities' filter condition if already present
                for condition in existing_filter.must:
                    if (
                        hasattr(condition, "key")
                        and condition.key == "metadata.authorized_identities"
                    ):
                        continue
                    new_must_conditions.append(condition)

                # Add identity enforcement filter to 'must' conditions
                existing_filter.must = new_must_conditions
                existing_filter.must.append(identity_enforcement_filter)
            else:
                # Set 'must' condition with identity enforcement filter
                existing_filter.must = [identity_enforcement_filter]
        else:
            raise TypeError(
                "Using dict as a `filter` is deprecated. "
                "Please use qdrant-client filters directly: "
                "https://qdrant.tech/documentation/concepts/filtering/"
            )
    else:
        # If 'filter' does not exist in search_kwargs, create it
        search_kwargs["filter"] = rest.Filter(must=[identity_enforcement_filter])


def _apply_pinecone_semantic_filter(
    search_kwargs: dict, semantic_context: Optional[SemanticContext]
) -> None:
    """
    Set semantic enforcement filter in search_kwargs for Pinecone vectorstore.

    Each provided deny list becomes a ``$nin`` condition on the matching
    metadata key of ``search_kwargs["filter"]``.
    """
    # Check if semantic_context is provided
    if semantic_context is None:
        return

    if semantic_context.pebblo_semantic_topics is not None:
        # Add pebblo_semantic_topics filter to search_kwargs
        search_kwargs.setdefault("filter", {})["pebblo_semantic_topics"] = {
            "$nin": semantic_context.pebblo_semantic_topics.deny
        }

    if semantic_context.pebblo_semantic_entities is not None:
        # Add pebblo_semantic_entities filter to search_kwargs
        search_kwargs.setdefault("filter", {})["pebblo_semantic_entities"] = {
            "$nin": semantic_context.pebblo_semantic_entities.deny
        }


def _apply_pinecone_authorization_filter(
    search_kwargs: dict, auth_context: Optional[AuthContext]
) -> None:
    """
    Set identity enforcement filter in search_kwargs for Pinecone vectorstore.

    Adds an ``$in`` condition on ``authorized_identities`` built from
    ``auth_context.user_auth``; does nothing when ``auth_context`` is None.
    """
    if auth_context is not None:
        search_kwargs.setdefault("filter", {})["authorized_identities"] = {
            "$in": auth_context.user_auth
        }


def _pgvector_set_new_filter(search_kwargs: dict, pebblo_filter: dict) -> None:
    """
    Install pebblo_filter when there is no usable existing filter.
    """
    current = search_kwargs.get("filter")
    if current is None:
        # Covers both a missing key and an explicit `"filter": None`; the
        # latter used to fail with AttributeError on `None.update(...)`.
        search_kwargs["filter"] = dict(pebblo_filter)
    else:
        current.update(pebblo_filter)


def _pgvector_merge_with_field_filters(
    search_kwargs: dict, filters: dict, pebblo_filter: dict
) -> None:
    """
    Combine pebblo_filter with a filter dict that contains only field keys.
    """
    # Check if pebblo_filter is an operator or a field
    _key, _ = list(pebblo_filter.items())[0]
    if not _key.startswith("$"):
        # Then it's a field (this handles the Auth filter)
        filters.update(pebblo_filter)
        return
    if _key.lower() != "$not":
        # Only $not operator is supported in pebblo_filter
        raise ValueError(f"Invalid filter key. Expected '$not' but got: {_key}")
    # It's a $not operator (Semantic filter), move filters into '$and'
    search_kwargs["filter"] = {"$and": [filters, pebblo_filter]}


def _apply_pgvector_filter(
    search_kwargs: dict, filters: Optional[Any], pebblo_filter: dict
) -> None:
    """
    Apply pebblo filters in the search_kwargs filters.

    Args:
        search_kwargs: Retriever search kwargs; ``search_kwargs["filter"]``
            may be replaced or mutated in place.
        filters: The existing filter (a dict or None).
        pebblo_filter: Either a single-field filter (Auth filter) or a
            ``{"$not": [...]}`` filter (Semantic filter).

    Raises:
        ValueError: If ``filters`` is neither a dict nor None, if it uses an
            unsupported or malformed top-level operator, or if
            ``pebblo_filter`` uses an operator other than ``$not``.
    """
    if filters is None:
        # If filters is None, set pebblo_filter as a new filter
        _pgvector_set_new_filter(search_kwargs, pebblo_filter)
        return

    if not isinstance(filters, dict):
        raise ValueError(
            f"Invalid filter. Expected a dictionary/None but got type: {type(filters)}"
        )

    if not filters:
        # Got an empty dictionary for filters, set pebblo_filter in filter
        _pgvector_set_new_filter(search_kwargs, pebblo_filter)
        return

    if len(filters) > 1:
        # Then all keys have to be fields (they cannot be operators)
        for key in filters:
            if key.startswith("$"):
                raise ValueError(
                    f"Invalid filter condition. Expected a field but got: {key}"
                )
        _pgvector_merge_with_field_filters(search_kwargs, filters, pebblo_filter)
        return

    # Exactly one key: it is either a top-level operator or a field.
    # The only operators allowed at the top level are $and, $or, and $not
    key, value = list(filters.items())[0]
    if not key.startswith("$"):
        _pgvector_merge_with_field_filters(search_kwargs, filters, pebblo_filter)
        return

    operator = key.lower()
    if operator not in ["$and", "$or", "$not"]:
        raise ValueError(
            f"Invalid filter condition. Expected $and, $or or $not "
            f"but got: {key}"
        )
    if not isinstance(value, list):
        raise ValueError(
            f"Expected a list, but got {type(value)} for value: {value}"
        )

    if operator == "$and":
        # Add pebblo_filter to the $and list as it is
        value.append(pebblo_filter)
    elif operator == "$or":
        search_kwargs["filter"] = {"$and": [filters, pebblo_filter]}
    else:
        # Existing $not: check if pebblo_filter is an operator or a field
        _key, _value = list(pebblo_filter.items())[0]
        if not _key.startswith("$"):
            # Then it's a field (Auth filter), move filters into $and
            search_kwargs["filter"] = {"$and": [filters, pebblo_filter]}
        elif _key.lower() == "$not":
            # It's the Semantic filter: merge its conditions flat into the
            # existing $not list. Appending the list itself would nest a list
            # inside the list, which the clear helper cannot find again.
            if isinstance(_value, list):
                value.extend(_value)
            else:
                value.append(_value)
            logger.warning("Adding $not operator to the existing $not operator")
        else:
            # Only $not operator is supported in pebblo_filter
            raise ValueError(f"Invalid filter key. Expected '$not' but got: {_key}")


def _pgvector_clear_pebblo_filters(
    search_kwargs: dict, filters: dict, pebblo_filter_key: str
) -> None:
    """
    Remove pebblo filters from the search_kwargs filters.

    Args:
        search_kwargs: Retriever search kwargs; ``search_kwargs["filter"]``
            may be rebound when an operator list collapses.
        filters: The filter to clean (a dict or None); mutated in place.
        pebblo_filter_key: Metadata key of the pebblo filter to remove.

    Raises:
        ValueError: If ``filters`` is neither a dict nor None, or if it uses
            an unsupported or malformed top-level operator.
    """
    if filters is None:
        # If filters is None, ignore the filter
        return

    if not isinstance(filters, dict):
        raise ValueError(
            f"Invalid filter. Expected a dictionary/None but got type: {type(filters)}"
        )

    if not filters:
        # Got an empty dictionary for filters, ignore the filter
        return

    if len(filters) > 1:
        # Then all keys have to be fields (they cannot be operators)
        if pebblo_filter_key in filters:
            # This handles Auth filter
            filters.pop(pebblo_filter_key)
        return

    # The only operators allowed at the top level are $and, $or, and $not
    # First check if an operator or a field
    key, value = list(filters.items())[0]
    if not key.startswith("$"):
        # Then it's a field, check if it's a pebblo filter
        if key == pebblo_filter_key:
            filters.pop(key)
        return

    # Then it's an operator: validate the operator's key and value type
    operator = key.lower()
    if operator not in ["$and", "$or", "$not"]:
        raise ValueError(
            f"Invalid filter condition. Expected $and, $or or $not "
            f"but got: {key}"
        )
    if not isinstance(value, list):
        raise ValueError(
            f"Expected a list, but got {type(value)} for value: {value}"
        )

    if operator == "$and":
        # Remove the pebblo filter from the $and list
        for i, _filter in enumerate(value):
            if pebblo_filter_key in _filter:
                # This handles Auth filter
                value.pop(i)
                break
            # Check for $not operator with Semantic filter
            if "$not" in _filter:
                sem_filter_found = False
                # This handles Semantic filter
                for j, nested_filter in enumerate(_filter["$not"]):
                    if pebblo_filter_key in nested_filter:
                        if len(_filter["$not"]) == 1:
                            # If only one filter is left,
                            # then remove the $not operator
                            value.pop(i)
                        else:
                            value[i]["$not"].pop(j)
                        sem_filter_found = True
                        break
                if sem_filter_found:
                    break
        if len(value) == 1:
            # If only one filter is left, then remove the $and operator
            search_kwargs["filter"] = value[0]
        elif not value:
            # Nothing is left under $and: unset the filter rather than keep
            # an empty `{"$and": []}` around.
            search_kwargs["filter"] = {}
    elif operator == "$not":
        # Remove the pebblo filter from the $not list
        for i, _filter in enumerate(value):
            if pebblo_filter_key in _filter:
                # This removes Semantic filter
                value.pop(i)
                break
        if len(value) == 0:
            # If no filter is left, then unset the filter
            search_kwargs["filter"] = {}
    # If $or, pebblo filter will not be present: nothing to do


def _apply_pgvector_semantic_filter(
    search_kwargs: dict, semantic_context: Optional[SemanticContext]
) -> None:
    """
    Set semantic enforcement filter in search_kwargs for PGVector vectorstore.

    Builds one ``$eq`` condition per provided deny list, wraps them in a
    single ``$not`` filter and merges that into the existing filter.
    """
    # Check if semantic_context is provided
    if semantic_context is None:
        return

    _semantic_filters = []
    filters = search_kwargs.get("filter")

    if semantic_context.pebblo_semantic_topics is not None:
        # Add pebblo_semantic_topics filter to search_kwargs
        topic_filter: dict = {
            "pebblo_semantic_topics": {
                "$eq": semantic_context.pebblo_semantic_topics.deny
            }
        }
        _semantic_filters.append(topic_filter)

    if semantic_context.pebblo_semantic_entities is not None:
        # Add pebblo_semantic_entities filter to search_kwargs
        entity_filter: dict = {
            "pebblo_semantic_entities": {
                "$eq": semantic_context.pebblo_semantic_entities.deny
            }
        }
        _semantic_filters.append(entity_filter)

    if len(_semantic_filters) > 0:
        semantic_filter: dict = {"$not": _semantic_filters}
        _apply_pgvector_filter(search_kwargs, filters, semantic_filter)


def _apply_pgvector_authorization_filter(
    search_kwargs: dict, auth_context: Optional[AuthContext]
) -> None:
    """
    Set identity enforcement filter in search_kwargs for PGVector vectorstore.

    Adds an ``$eq`` condition on ``authorized_identities`` built from
    ``auth_context.user_auth``; does nothing when ``auth_context`` is None.
    """
    if auth_context is not None:
        auth_filter: dict = {"authorized_identities": {"$eq": auth_context.user_auth}}
        filters = search_kwargs.get("filter")
        _apply_pgvector_filter(search_kwargs, filters, auth_filter)


def _set_identity_enforcement_filter(
    retriever: VectorStoreRetriever, auth_context: Optional[AuthContext]
) -> None:
    """
    Set identity enforcement filter in search_kwargs.

    This method sets the identity enforcement filter in the search_kwargs
    of the retriever based on the type of the vectorstore. Vectorstore
    classes other than Pinecone, Qdrant and PGVector are left untouched.
    """
    search_kwargs = retriever.search_kwargs
    vectorstore_name = retriever.vectorstore.__class__.__name__
    if vectorstore_name == PINECONE:
        _apply_pinecone_authorization_filter(search_kwargs, auth_context)
    elif vectorstore_name == QDRANT:
        _apply_qdrant_authorization_filter(search_kwargs, auth_context)
    elif vectorstore_name == PGVECTOR:
        _apply_pgvector_authorization_filter(search_kwargs, auth_context)


def _set_semantic_enforcement_filter(
    retriever: VectorStoreRetriever, semantic_context: Optional[SemanticContext]
) -> None:
    """
    Set semantic enforcement filter in search_kwargs.

    This method sets the semantic enforcement filter in the search_kwargs
    of the retriever based on the type of the vectorstore. Vectorstore
    classes other than Pinecone, Qdrant and PGVector are left untouched.
    """
    search_kwargs = retriever.search_kwargs
    vectorstore_name = retriever.vectorstore.__class__.__name__
    if vectorstore_name == PINECONE:
        _apply_pinecone_semantic_filter(search_kwargs, semantic_context)
    elif vectorstore_name == QDRANT:
        _apply_qdrant_semantic_filter(search_kwargs, semantic_context)
    elif vectorstore_name == PGVECTOR:
        _apply_pgvector_semantic_filter(search_kwargs, semantic_context)