"""This is the SQL Server module.This module provides the SQLServer_VectorStore class for managingvectorstores in SQL Server."""from__future__importannotationsimportjsonimportloggingimportreimportstructimportuuidfromenumimportEnumfromtypingimport(Any,Callable,Dict,Iterable,List,MutableMapping,Optional,Sequence,Tuple,Type,Union,)fromurllib.parseimporturlparseimportnumpyasnpimportsqlalchemyfromazure.identityimportDefaultAzureCredentialfromlangchain_core.documentsimportDocumentfromlangchain_core.embeddingsimportEmbeddingsfromlangchain_core.vectorstoresimportVectorStorefromlangchain_core.vectorstores.utilsimportmaximal_marginal_relevancefromsqlalchemyimport(Column,ColumnElement,Dialect,Index,Numeric,PrimaryKeyConstraint,SQLColumnExpression,Uuid,asc,bindparam,cast,create_engine,event,func,insert,label,select,text,)fromsqlalchemy.dialects.mssqlimportJSON,NVARCHAR,VARCHARfromsqlalchemy.dialects.mssql.baseimportMSTypeCompilerfromsqlalchemy.engineimportURL,Connection,Enginefromsqlalchemy.excimportDBAPIError,ProgrammingErrorfromsqlalchemy.ext.compilerimportcompilesfromsqlalchemy.ormimportSession,declarative_basefromsqlalchemy.poolimportConnectionPoolEntryfromsqlalchemy.sqlimportoperatorsfromsqlalchemy.typesimportUserDefinedTypeCOMPARISONS_TO_NATIVE:Dict[str,Callable[[ColumnElement,object],ColumnElement]]={"$eq":operators.eq,"$ne":operators.ne,}NUMERIC_OPERATORS:Dict[str,Callable[[ColumnElement,object],ColumnElement]]={"$lt":operators.lt,"$lte":operators.le,"$gt":operators.gt,"$gte":operators.ge,}SPECIAL_CASED_OPERATORS={"$in","$nin","$like",}BETWEEN_OPERATOR={"$between"}LOGICAL_OPERATORS={"$and","$or"}SUPPORTED_OPERATORS=(set(COMPARISONS_TO_NATIVE).union(NUMERIC_OPERATORS).union(SPECIAL_CASED_OPERATORS).union(BETWEEN_OPERATOR).union(LOGICAL_OPERATORS))
class DistanceStrategy(str, Enum):
    """Distance Strategy class for SQLServer_VectorStore.

    Enumerator of the distance strategies for calculating distances
    between vectors.
    """

    EUCLIDEAN = "euclidean"
    COSINE = "cosine"
    DOT = "dot"
class VectorType(UserDefinedType):
    """Custom SQLAlchemy type mapping to SQL Server's `vector(n)` type."""

    cache_ok = True

    def __init__(self, length: int) -> None:
        """Store the vector dimension used in the column specification."""
        self.length = length

    def get_col_spec(self, **kw: Any) -> str:
        """Return the DDL column specification, e.g. `vector(1536)`."""
        return f"vector({self.length})"

    def bind_processor(self, dialect: Any) -> Any:
        """Return a pass-through processor for bound parameters."""

        def process(value: Any) -> Any:
            return value

        return process

    def result_processor(self, dialect: Any, coltype: Any) -> Any:
        """Return a pass-through processor for result-row values."""

        def process(value: Any) -> Any:
            return value

        return process
# String Constants
#
AZURE_TOKEN_URL = "https://database.windows.net/.default"  # Token URL for Azure DBs.
DISTANCE = "distance"
DEFAULT_DISTANCE_STRATEGY = DistanceStrategy.COSINE
DEFAULT_TABLE_NAME = "sqlserver_vectorstore"
DISTANCE_STRATEGY = "distancestrategy"
EMBEDDING = "embedding"
EMBEDDING_LENGTH = "embedding_length"
EMBEDDING_VALUES = "embeddingvalues"
EMPTY_IDS_ERROR_MESSAGE = "Empty list of ids provided"
EXTRA_PARAMS = ";Trusted_Connection=Yes"
INVALID_IDS_ERROR_MESSAGE = "Invalid list of ids provided"
INVALID_INPUT_ERROR_MESSAGE = "Input is not valid."
INVALID_FILTER_INPUT_EXPECTED_DICT = """Invalid filter condition. Expected a dictionary
but got an empty dictionary"""
INVALID_FILTER_INPUT_EXPECTED_AND_OR = """Invalid filter condition.
Expected $and or $or but got: {}"""
SQL_COPT_SS_ACCESS_TOKEN = 1256  # Connection option defined by microsoft in msodbcsql.h
DEFAULT_BATCH_SIZE = 100
MAX_BATCH_SIZE = 419

# Query Constants
#
JSON_TO_VECTOR_QUERY = f"cast (:{EMBEDDING_VALUES} as vector(:{EMBEDDING_LENGTH}))"
SERVER_JSON_CHECK_QUERY = "select name from sys.types where system_type_id = 244"
VECTOR_DISTANCE_QUERY = f"""VECTOR_DISTANCE(:{DISTANCE_STRATEGY},
cast (:{EMBEDDING} as vector(:{EMBEDDING_LENGTH})), embeddings)"""
class SQLServer_VectorStore(VectorStore):
    """SQL Server Vector Store.

    This class provides a vector store interface for adding texts and performing
        similarity searches on the texts in SQL Server.
    """

    def __init__(
        self,
        *,
        connection: Optional[Connection] = None,
        connection_string: str,
        db_schema: Optional[str] = None,
        distance_strategy: DistanceStrategy = DEFAULT_DISTANCE_STRATEGY,
        embedding_function: Embeddings,
        embedding_length: int,
        relevance_score_fn: Optional[Callable[[float], float]] = None,
        table_name: str = DEFAULT_TABLE_NAME,
        batch_size: int = DEFAULT_BATCH_SIZE,
    ) -> None:
        """Initialize the SQL Server vector store.

        Args:
            connection: Optional SQLServer connection.
            connection_string: SQLServer connection string. If the connection
                string does not contain a username & password or
                `TrustedConnection=yes`, Entra ID authentication is used.
                Sample formats:
                - "Driver=<drivername>;Server=<servername>;Database=<dbname>;
                  Uid=<username>;Pwd=<password>;TrustServerCertificate=no;"
                - "mssql+pyodbc://username:password@servername/dbname?other_params"
            db_schema: Existing schema in which the vector store table is created;
                the user must have permissions on it.
            distance_strategy: Distance strategy for comparing embeddings
                (COSINE, DOT or EUCLIDEAN). Defaults to COSINE.
            embedding_function: Any embedding function implementing the
                `langchain.embeddings.base.Embeddings` interface.
            embedding_length: Dimension of the stored vectors; only vectors
                of this size can be added.
            relevance_score_fn: Optional relevance score function.
            table_name: Table used for storing embeddings.
                Defaults to `sqlserver_vectorstore`.
            batch_size: Number of documents/texts inserted per round trip,
                at most MAX_BATCH_SIZE (419).
        """
        # Validate user-supplied settings before touching the database.
        batch_size = self._validate_batch_size(batch_size)
        self.connection_string = self._get_connection_url(connection_string)

        # Plain attribute assignments; all heavy lifting happens below.
        self._distance_strategy = distance_strategy
        self.embedding_function = embedding_function
        self._embedding_length = embedding_length
        self.schema = db_schema
        self.override_relevance_score_fn = relevance_score_fn
        self.table_name = table_name
        self._batch_size = batch_size

        # Reuse the caller's connection when given, else build an engine.
        self._bind: Union[Connection, Engine] = (
            connection if connection else self._create_engine()
        )
        self._prepare_json_data_type()
        self._embedding_store = self._get_embedding_store(self.table_name, self.schema)
        self._create_table_if_not_exists()
def_validate_batch_size(self,batch_size:int)->int:ifbatch_size<=0orbatch_size>MAX_BATCH_SIZE:logging.error("The request contains an invalid batch_size.")raiseValueError(f"""The request contains an invalid batch_size {batch_size}. The server supports a maximum batch_size of {MAX_BATCH_SIZE}. Please reduce the batch_size and resend the request.""")elifbatch_sizeisNone:returnDEFAULT_BATCH_SIZEelse:returnbatch_sizedef_get_connection_url(self,conn_string:str)->str:ifconn_stringisNoneorlen(conn_string)==0:logging.error("Connection string value is None or empty.")raiseValueError("Connection string value cannot be None.")ifconn_string.startswith("mssql+pyodbc"):# Connection string is in a format that we can parse.#returnconn_stringtry:args=conn_string.split(";")arg_dict={}forarginargs:if"="inarg:# Split into key value pairs by the first positioned `=` found.# Key-Value pairs are inserted into the dictionary.#key,value=arg.split("=",1)arg_dict[key.lower().strip()]=value.strip()# This will throw a key error if server or database keyword# is not present in arg_dict from the connection string.#database=arg_dict.pop("database")# If `server` is present in the dictionary, we split by# `,` to obtain host and port details.#server=arg_dict.pop("server").split(",",1)server_host=server[0]server_port=None# Server details in SQLServer connection string from Azure portal# might be of the form `Server=tcp:servername`. 
In scenarios like this,# we remove the first part (tcp:) because `urlparse` function invoked in# `_can_connect_with_entra_id` expects an IP address when it sees `tcp:`# We can remove this without fear of a failure because it is omittable in# the connection string value.#if":"inserver_host:server_host=server_host.split(":",1)[1]# Check if port is provided in server details,if true,# cast value to int if possible.#iflen(server)>1andserver[1].isdigit():server_port=int(server[1])# Args needed to be checked#username=arg_dict.pop("uid",None)password=arg_dict.pop("pwd",None)if"driver"inarg_dict.keys():# Extract driver value from curly braces if present.driver=re.search(r"\{([^}]*)\}",arg_dict["driver"])ifdriverisnotNone:arg_dict["driver"]=driver.group(1)# Create connection URL for SQLAlchemy#url=URL.create("mssql+pyodbc",username=username,password=password,database=database,host=server_host,port=server_port,query=arg_dict,)exceptKeyErrorask:logging.error(f"Server, DB details were not provided in the connection string.\n{k}")raiseException("Server, DB details should be provided in connection string.")exceptExceptionase:logging.error(f"An error has occurred.\n{e.__cause__}")raise# Return string version of the URL and ensure password# passed in is not obfuscated.#returnurl.render_as_string(hide_password=False)def_can_connect_with_entra_id(self)->bool:"""Determine if Entra ID authentication can be used. Check the components of the connection string to determine if connection via Entra ID authentication is possible or not. 
The connection string is of expected to be of the form: "mssql+pyodbc://username:password@servername/dbname?other_params" which gets parsed into -> <scheme>://<netloc>/<path>?<query> """parsed_url=urlparse(self.connection_string)ifparsed_urlisNone:logging.error("Unable to parse connection string.")returnFalseinvalid_keywords=["trusted_connection=yes","trustedconnection=yes","authentication","integrated security",]if(parsed_url.usernameorparsed_url.passwordorany(keywordinparsed_url.query.lower()forkeywordininvalid_keywords)):returnFalsereturnTruedef_create_engine(self)->Engine:ifself._can_connect_with_entra_id():# Use Entra ID auth. Listen for a connection event# when `_create_engine` function from this class is called.#event.listen(Engine,"do_connect",self._provide_token,once=True)logging.info("Using Entra ID Authentication.")returncreate_engine(url=self.connection_string)def_create_table_if_not_exists(self)->None:logging.info(f"Creating table {self.table_name}.")try:withSession(self._bind)assession:self._embedding_store.__table__.create(session.get_bind(),checkfirst=True)session.commit()exceptProgrammingErrorase:logging.error(f"Create table {self.table_name} failed.")raiseException(e.__cause__)fromNonedef_get_embedding_store(self,name:str,schema:Optional[str])->Any:DynamicBase=declarative_base(class_registry=dict())# type: Anyifself._embedding_lengthisNoneorself._embedding_length<1:raiseValueError("`embedding_length` value is not valid.")classEmbeddingStore(DynamicBase):"""This is the base model for SQL vector store."""__tablename__=name__table_args__=(PrimaryKeyConstraint("id",mssql_clustered=False),Index("idx_custom_id","custom_id",mssql_clustered=False,unique=True),{"schema":schema},)id=Column(Uuid,primary_key=True,default=uuid.uuid4)custom_id=Column(VARCHAR(1000),nullable=True)# column for user defined ids.content_metadata=Column(JSON,nullable=True)content=Column(NVARCHAR,nullable=False)# defaults to 
NVARCHAR(MAX)embeddings=Column(VectorType(self._embedding_length),nullable=False)returnEmbeddingStoredef_prepare_json_data_type(self)->None:"""Prepare for JSON data type usage. Check if the server has the JSON data type available. If it does, we compile JSON data type as JSON instead of NVARCHAR(max) used by sqlalchemy. If it doesn't, this defaults to NVARCHAR(max) as specified by sqlalchemy. """try:withSession(self._bind)assession:result=session.scalar(text(SERVER_JSON_CHECK_QUERY))session.close()ifresultisnotNone:@compiles(JSON,"mssql")defcompile_json(element:JSON,compiler:MSTypeCompiler,**kw:Any)->str:# return JSON when JSON data type is specified in this class.returnresult# json data type name in sql serverexceptProgrammingErrorase:logging.error(f"Unable to get data types.\n{e.__cause__}\n")@propertydefembeddings(self)->Embeddings:"""`embeddings` property for SQLServer_VectorStore class."""returnself.embedding_function@propertydefdistance_strategy(self)->str:"""distance_strategy property for SQLServer_VectorStore class."""# Value of distance strategy passed in should be one of the supported values.ifisinstance(self._distance_strategy,DistanceStrategy):returnself._distance_strategy.value# Match string value with appropriate enum value, if supported.distance_strategy_lower=str.lower(self._distance_strategy)ifdistance_strategy_lower==DistanceStrategy.EUCLIDEAN.value:returnDistanceStrategy.EUCLIDEAN.valueelifdistance_strategy_lower==DistanceStrategy.COSINE.value:returnDistanceStrategy.COSINE.valueelifdistance_strategy_lower==DistanceStrategy.DOT.value:returnDistanceStrategy.DOT.valueelse:raiseValueError(f"{self._distance_strategy} is not supported.")@distance_strategy.setterdefdistance_strategy(self,value:DistanceStrategy)->None:self._distance_strategy=value@propertydefbatch_size(self)->int:"""`batch_size` property for SQLServer_VectorStore class."""returnself._batch_size
[docs]@classmethoddeffrom_texts(cls:Type[SQLServer_VectorStore],texts:List[str],embedding:Embeddings,metadatas:Optional[List[dict]]=None,connection_string:str=str(),embedding_length:int=0,table_name:str=DEFAULT_TABLE_NAME,db_schema:Optional[str]=None,distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,ids:Optional[List[str]]=None,batch_size:int=DEFAULT_BATCH_SIZE,**kwargs:Any,)->SQLServer_VectorStore:"""Create a SQL Server vectorStore initialized from texts and embeddings. Args: texts: Iterable of strings to add into the vectorstore. embedding: Any embedding function implementing `langchain.embeddings.base.Embeddings` interface. metadatas: Optional list of metadatas (python dicts) associated with the input texts. connection_string: SQLServer connection string. If the connection string does not contain a username & password or `TrustedConnection=yes`, Entra ID authentication is used. SQL Server ODBC connection string can be retrieved from the `Connection strings` pane of the database in Azure portal. Sample connection string format: - "Driver=<drivername>;Server=<servername>;Database=<dbname>; Uid=<username>;Pwd=<password>;TrustServerCertificate=no;" - "mssql+pyodbc://username:password@servername/dbname?other_params" embedding_length: The length (dimension) of the vectors to be stored in the table. Note that only vectors of same size can be added to the vector store. table_name: The name of the table to use for storing embeddings. db_schema: The schema in which the vector store will be created. This schema must exist and the user must have permissions to the schema. distance_strategy: The distance strategy to use for comparing embeddings. Default value is COSINE. Available options are: - COSINE - DOT - EUCLIDEAN ids: Optional list of IDs for the input texts. batch_size: Number of texts to be inserted at once to Db, max MAX_BATCH_SIZE. **kwargs: vectorstore specific parameters. Returns: SQLServer_VectorStore: A SQL Server vectorstore. 
"""store=cls(connection_string=connection_string,db_schema=db_schema,distance_strategy=distance_strategy,embedding_function=embedding,embedding_length=embedding_length,table_name=table_name,batch_size=batch_size,**kwargs,)store.add_texts(texts,metadatas,ids,**kwargs)returnstore
[docs]@classmethoddeffrom_documents(cls:Type[SQLServer_VectorStore],documents:List[Document],embedding:Embeddings,connection_string:str=str(),embedding_length:int=0,table_name:str=DEFAULT_TABLE_NAME,db_schema:Optional[str]=None,distance_strategy:DistanceStrategy=DEFAULT_DISTANCE_STRATEGY,ids:Optional[List[str]]=None,batch_size:int=DEFAULT_BATCH_SIZE,**kwargs:Any,)->SQLServer_VectorStore:"""Create a SQL Server vectorStore initialized from texts and embeddings. Args: documents: Documents to add to the vectorstore. embedding: Any embedding function implementing `langchain.embeddings.base.Embeddings` interface. connection_string: SQLServer connection string. If the connection string does not contain a username & password or `TrustedConnection=yes`, Entra ID authentication is used. SQL Server ODBC connection string can be retrieved from the `Connection strings` pane of the database in Azure portal. Sample connection string format: - "Driver=<drivername>;Server=<servername>;Database=<dbname>; Uid=<username>;Pwd=<password>;TrustServerCertificate=no;" - "mssql+pyodbc://username:password@servername/dbname?other_params" embedding_length: The length (dimension) of the vectors to be stored in the table. Note that only vectors of same size can be added to the vector store. table_name: The name of the table to use for storing embeddings. Default value is `sqlserver_vectorstore`. db_schema: The schema in which the vector store will be created. This schema must exist and the user must have permissions to the schema. distance_strategy: The distance strategy to use for comparing embeddings. Default value is COSINE. Available options are: - COSINE - DOT - EUCLIDEAN ids: Optional list of IDs for the input texts. batch_size: Number of documents to be inserted at once to Db, max MAX_BATCH_SIZE. **kwargs: vectorstore specific parameters. Returns: SQLServer_VectorStore: A SQL Server vectorstore. 
"""texts,metadatas=[],[]fordocindocuments:ifnotisinstance(doc,Document):raiseValueError(f"Expected an entry of type Document, but got {type(doc)}")texts.append(doc.page_content)metadatas.append(doc.metadata)store=cls(connection_string=connection_string,db_schema=db_schema,distance_strategy=distance_strategy,embedding_function=embedding,embedding_length=embedding_length,table_name=table_name,batch_size=batch_size,**kwargs,)store.add_texts(texts,metadatas,ids,**kwargs)returnstore
[docs]defget_by_ids(self,ids:Sequence[str],/)->List[Document]:"""Get documents by their IDs from the vectorstore. Args: ids: List of IDs to retrieve. Returns: List of Documents """documents=[]ifidsisNoneorlen(ids)==0:logging.info(EMPTY_IDS_ERROR_MESSAGE)else:result=self._get_documents_by_ids(ids)foriteminresult:ifitemisnotNone:documents.append(Document(id=item.custom_id,page_content=item.content,metadata=item.content_metadata,))returndocuments
def_get_documents_by_ids(self,ids:Sequence[str],/)->Sequence[Any]:result:Sequence[Any]=[]try:withSession(bind=self._bind)assession:statement=select(self._embedding_store.custom_id,self._embedding_store.content,self._embedding_store.content_metadata,).where(self._embedding_store.custom_id.in_(ids))result=session.execute(statement).fetchall()exceptDBAPIErrorase:logging.error(e.__cause__)returnresultdef_select_relevance_score_fn(self)->Callable[[float],float]:"""Determine relevance score function. The 'correct' relevance function may differ depending on a few things, including: - the distance / similarity metric used by the VectorStore - the scale of your embeddings (OpenAI's are unit normed. Many others are not!) - embedding dimensionality - etc. If no relevance function is provided in the class constructor, selection is based on the distance strategy provided. """ifself.override_relevance_score_fnisnotNone:returnself.override_relevance_score_fn# If the relevance score function is not provided, we default to using# the distance strategy specified by the user.ifself._distance_strategy==DistanceStrategy.COSINE:returnself._cosine_relevance_score_fnelifself._distance_strategy==DistanceStrategy.DOT:returnself._max_inner_product_relevance_score_fnelifself._distance_strategy==DistanceStrategy.EUCLIDEAN:returnself._euclidean_relevance_score_fnelse:raiseValueError("There is no supported normalization function for"f" {self._distance_strategy} distance strategy.""Consider providing relevance_score_fn to ""SQLServer_VectorStore construction.")
[docs]defmax_marginal_relevance_search(self,query:str,k:int=4,fetch_k:int=20,lambda_mult:float=0.5,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: query: Text to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. Default is 20. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. **kwargs: Arguments to pass to the search method. Returns: List of Documents selected by maximal marginal relevance. """embedded_query=self.embedding_function.embed_query(query)returnself.max_marginal_relevance_search_by_vector(embedded_query,k=k,fetch_k=fetch_k,lambda_mult=lambda_mult,**kwargs)
[docs]defmax_marginal_relevance_search_by_vector(self,embedding:list[float],k:int=4,fetch_k:int=20,lambda_mult:float=0.5,**kwargs:Any,)->List[Document]:"""Return docs selected using the maximal marginal relevance. Maximal marginal relevance optimizes for similarity to query AND diversity among selected documents. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. fetch_k: Number of Documents to fetch to pass to MMR algorithm. Default is 20. lambda_mult: Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5. **kwargs: Arguments to pass to the search method. Returns: List of Documents selected by maximal marginal relevance. """results=self._search_store(embedding,k=fetch_k,marginal_relevance=True,**kwargs)embedding_list=[json.loads(result[0])forresultinresults]mmr_selects=maximal_marginal_relevance(np.array(embedding,dtype=np.float32),embedding_list,lambda_mult=lambda_mult,k=k,)results_as_docs=self._docs_from_result(self._docs_and_scores_from_result(results))# Return list of Documents from results_as_docs whose position# corresponds to the indices in mmr_selects.return[valueforidx,valueinenumerate(results_as_docs)ifidxinmmr_selects]
[docs]defsimilarity_search(self,query:str,k:int=4,**kwargs:Any)->List[Document]:"""Return docs most similar to given query. Args: query: Text to look up the most similar embedding to. k: Number of Documents to return. Defaults to 4. **kwargs: Values for filtering on metadata during similarity search. Returns: List of Documents most similar to the query provided. """embedded_query=self.embedding_function.embed_query(query)returnself.similarity_search_by_vector(embedded_query,k,**kwargs)
[docs]defsimilarity_search_by_vector(self,embedding:List[float],k:int=4,**kwargs:Any)->List[Document]:"""Return docs most similar to the embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. **kwargs: Values for filtering on metadata during similarity search. Returns: List of Documents most similar to the embedding provided. """similar_docs_with_scores=self.similarity_search_by_vector_with_score(embedding,k,**kwargs)returnself._docs_from_result(similar_docs_with_scores)
[docs]defsimilarity_search_with_score(self,query:str,k:int=4,**kwargs:Any)->List[Tuple[Document,float]]:"""Similarity search with score. Run similarity search with distance and return docs most similar to the embedding vector. Args: query: Text to look up the most similar embedding to. k: Number of Documents to return. Defaults to 4. **kwargs: Values for filtering on metadata during similarity search. Returns: List of tuple of Document and an accompanying score in order of similarity to the query provided. Note that, a smaller score implies greater similarity. """embedded_query=self.embedding_function.embed_query(query)returnself.similarity_search_by_vector_with_score(embedded_query,k,**kwargs)
[docs]defsimilarity_search_by_vector_with_score(self,embedding:List[float],k:int=4,**kwargs:Any)->List[Tuple[Document,float]]:"""Similarity search by vector with score. Run similarity search with distance, given an embedding and return docs most similar to the embedding vector. Args: embedding: Embedding to look up documents similar to. k: Number of Documents to return. Defaults to 4. **kwargs: Values for filtering on metadata during similarity search. Returns: List of tuple of Document and an accompanying score in order of similarity to the embedding provided. Note that, a smaller score implies greater similarity. """similar_docs=self._search_store(embedding,k,**kwargs)docs_and_scores=self._docs_and_scores_from_result(similar_docs)returndocs_and_scores
[docs]defadd_texts(self,texts:Iterable[str],metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,**kwargs:Any,)->List[str]:"""`add_texts` function for SQLServer_VectorStore class. Compute the embeddings for the input texts and store embeddings in the vectorstore. Args: texts: Iterable of strings to add into the vectorstore. metadatas: List of metadatas (python dicts) associated with the input texts. ids: List of IDs for the input texts. **kwargs: vectorstore specific parameters. Returns: List of IDs generated from adding the texts into the vectorstore. """iftextsisNone:return[]# Initialize a list to store results from each batchembedded_texts=[]# Loop through the list of texts and process in batchestexts=list(texts)# Validate batch_size again to confirm if it is still valid.batch_size=self._validate_batch_size(self._batch_size)foriinrange(0,len(texts),batch_size):batch=texts[i:i+batch_size]batch_ids=ids[i:i+batch_size]ifidsisnotNoneelseNonebatch_metadatas=(metadatas[i:i+batch_size]ifmetadatasisnotNoneelseNone)batch_result=self.embedding_function.embed_documents(list(batch))embeddings=self._insert_embeddings(batch,batch_result,batch_metadatas,batch_ids)embedded_texts.extend(embeddings)returnembedded_texts
[docs]defdrop(self)->None:"""Drops every table created during initialization of vector store."""logging.info(f"Dropping vector store: {self.table_name}")try:withSession(bind=self._bind)assession:# Drop the table associated with the session bind.self._embedding_store.__table__.drop(session.get_bind())session.commit()logging.info(f"Vector store `{self.table_name}` dropped successfully.")exceptProgrammingErrorase:logging.error(f"Unable to drop vector store.\n{e.__cause__}.")
def_search_store(self,embedding:List[float],k:int,filter:Optional[dict]=None,marginal_relevance:Optional[bool]=False,)->List[Any]:try:withSession(self._bind)assession:filter_by=[]filter_clauses=self._create_filter_clause(filter)iffilter_clausesisnotNone:filter_by.append(filter_clauses)subquery=label(DISTANCE,text(VECTOR_DISTANCE_QUERY).bindparams(bindparam(DISTANCE_STRATEGY,self.distance_strategy,literal_execute=True,),bindparam(EMBEDDING,json.dumps(embedding),literal_execute=True,),bindparam(EMBEDDING_LENGTH,self._embedding_length,literal_execute=True,),),)# Results for marginal relevance includes additional# column for embeddings.ifmarginal_relevance:query=(select(text("cast (embeddings as NVARCHAR(MAX))"),subquery,self._embedding_store,).filter(*filter_by).order_by(asc(text(DISTANCE))).limit(k))results=list(session.execute(query).fetchall())else:results=(session.query(self._embedding_store,subquery,).filter(*filter_by).order_by(asc(text(DISTANCE))).limit(k).all())exceptProgrammingErrorase:logging.error(f"An error has occurred during the search.\n{e.__cause__}")raiseException(e.__cause__)fromNonereturnresultsdef_create_filter_clause(self,filters:Any)->Any:"""Create a filter clause. Convert LangChain Information Retrieval filter representation to matching SQLAlchemy clauses. At the top level, we still don't know if we're working with a field or an operator for the keys. After we've determined that we can call the appropriate logic to handle filter creation. Args: filters: Dictionary of filters to apply to the query. Returns: SQLAlchemy clause to apply to the query. 
Ex: For a filter, {"$or": [{"id": 1}, {"name": "bob"}]}, the result is JSON_VALUE(langchain_vector_store_tests.content_metadata, :JSON_VALUE_1) = :JSON_VALUE_2 OR JSON_VALUE(langchain_vector_store_tests.content_metadata, :JSON_VALUE_3) = :JSON_VALUE_4 """iffiltersisnotNone:ifnotisinstance(filters,dict):raiseValueError(f"Expected a dict, but got {type(filters)} for value: {filter}")iflen(filters)==1:# The only operators allowed at the top level are $AND and $OR# First check if an operator or a fieldkey,value=list(filters.items())[0]ifkey.startswith("$"):# Then it's an operatorifkey.lower()notinLOGICAL_OPERATORS:raiseValueError(INVALID_FILTER_INPUT_EXPECTED_AND_OR.format(key))else:# Then it's a fieldreturnself._handle_field_filter(key,filters[key])# Here we handle the $and and $or operatorsifnotisinstance(value,list):raiseValueError(f"Expected a list, but got {type(value)} for value: {value}")ifkey.lower()=="$and":and_=[self._create_filter_clause(el)forelinvalue]iflen(and_)>1:returnsqlalchemy.and_(*and_)eliflen(and_)==1:returnand_[0]else:raiseValueError(INVALID_FILTER_INPUT_EXPECTED_DICT)elifkey.lower()=="$or":or_=[self._create_filter_clause(el)forelinvalue]iflen(or_)>1:returnsqlalchemy.or_(*or_)eliflen(or_)==1:returnor_[0]else:raiseValueError(INVALID_FILTER_INPUT_EXPECTED_DICT)eliflen(filters)>1:# Then all keys have to be fields (they cannot be operators)forkeyinfilters.keys():ifkey.startswith("$"):raiseValueError(f"Invalid filter condition. Expected a field but got: {key}")# These should all be fields and combined using an $and operatorand_=[self._handle_field_filter(k,v)fork,vinfilters.items()]iflen(and_)>1:returnsqlalchemy.and_(*and_)eliflen(and_)==1:returnand_[0]else:raiseValueError(INVALID_FILTER_INPUT_EXPECTED_DICT)else:raiseValueError("Got an empty dictionary for filters.")else:logging.info("No filters are passed, returning")returnNonedef_handle_field_filter(self,field:str,value:Any,)->SQLColumnExpression:"""Create a filter for a specific field. 
Args: field: name of field value: value to filter If provided as is then this will be an equality filter If provided as a dictionary then this will be a filter, the key will be the operator and the value will be the value to filter by Returns: sqlalchemy expression Ex: For a filter, {"id": 1}, the result is JSON_VALUE(langchain_vector_store_tests.content_metadata, :JSON_VALUE_1) = :JSON_VALUE_2 """iffield.startswith("$"):raiseValueError(f"Invalid filter condition. Expected a field but got an operator: "f"{field}")# Allow [a-zA-Z0-9_], disallow $ for now until we support escape charactersifnotfield.isidentifier():raiseValueError(f"Invalid field name: {field}. Expected a valid identifier.")ifisinstance(value,dict):# This is a filter specification that only 1 filter will be for a given# field, if multiple filters they are mentioned separately and used with# an AND on the top if nothing is specifiediflen(value)!=1:raiseValueError("Invalid filter condition. Expected a value which ""is a dictionary with a single key that corresponds to an operator "f"but got a dictionary with {len(value)} keys. The first few "f"keys are: {list(value.keys())[:3]}")operator,filter_value=list(value.items())[0]# Verify that operator is an operatorifoperatornotinSUPPORTED_OPERATORS:raiseValueError(f"Invalid operator: {operator}. 
"f"Expected one of {SUPPORTED_OPERATORS}")else:# Then we assume an equality operatoroperator="$eq"filter_value=valueifoperatorinCOMPARISONS_TO_NATIVE:operation=COMPARISONS_TO_NATIVE[operator]native_result=func.JSON_VALUE(self._embedding_store.content_metadata,f"$.{field}")native_operation_result=operation(native_result,str(filter_value))returnnative_operation_resultelifoperatorinNUMERIC_OPERATORS:operation=NUMERIC_OPERATORS[str(operator)]numeric_result=func.JSON_VALUE(self._embedding_store.content_metadata,f"$.{field}")numeric_operation_result=operation(numeric_result,filter_value)ifnotisinstance(filter_value,str):numeric_operation_result=operation(cast(numeric_result,Numeric(10,2)),filter_value)returnnumeric_operation_resultelifoperatorinBETWEEN_OPERATOR:# Use AND with two comparisonslow,high=filter_value# Assuming lower_bound_value is a ColumnElementcolumn_value=func.JSON_VALUE(self._embedding_store.content_metadata,f"$.{field}")greater_operation=NUMERIC_OPERATORS["$gte"]lesser_operation=NUMERIC_OPERATORS["$lte"]lower_bound=greater_operation(column_value,low)upper_bound=lesser_operation(column_value,high)# Conditionally cast if filter_value is not a stringifnotisinstance(filter_value,str):lower_bound=greater_operation(cast(column_value,Numeric(10,2)),low)upper_bound=lesser_operation(cast(column_value,Numeric(10,2)),high)returnsqlalchemy.and_(lower_bound,upper_bound)elifoperatorinSPECIAL_CASED_OPERATORS:# We'll do force coercion to textifoperatorin{"$in","$nin"}:forvalinfilter_value:ifnotisinstance(val,(str,int,float)):raiseNotImplementedError(f"Unsupported type: {type(val)} for value: {val}")queried_field=func.JSON_VALUE(self._embedding_store.content_metadata,f"$.{field}")ifoperatorin{"$in"}:returnqueried_field.in_([str(val)forvalinfilter_value])elifoperatorin{"$nin"}:returnqueried_field.nin_([str(val)forvalinfilter_value])elifoperatorin{"$like"}:returnqueried_field.like(str(filter_value))else:raiseNotImplementedError(f"Operator is not implemented: {operator}. 
")else:raiseNotImplementedError()def_docs_from_result(self,results:Any)->List[Document]:"""Formats the input into a result of type List[Document]."""docs=[docfordoc,_inresultsifdocisnotNone]returndocsdef_docs_and_scores_from_result(self,results:List[Any])->List[Tuple[Document,float]]:"""Formats the input into a result of type Tuple[Document, float]. If an invalid input is given, it does not attempt to format the value and instead logs an error. """docs_and_scores=[]forresultinresults:if(resultisnotNoneandresult.EmbeddingStoreisnotNoneandresult.distanceisnotNone):docs_and_scores.append((Document(page_content=result.EmbeddingStore.content,metadata=result.EmbeddingStore.content_metadata,),result.distance,))else:logging.error(INVALID_INPUT_ERROR_MESSAGE)returndocs_and_scoresdef_insert_embeddings(self,texts:Iterable[str],embeddings:List[List[float]],metadatas:Optional[List[dict]]=None,ids:Optional[List[str]]=None,**kwargs:Any,)->List[str]:"""Insert the embeddings and the texts in the vectorstore. Args: texts: Iterable of strings to add into the vectorstore. embeddings: List of list of embeddings. metadatas: List of metadatas (python dicts) associated with the input texts. ids: List of IDs for the input texts. **kwargs: vectorstore specific parameters. Returns: List of IDs generated from adding the texts into the vectorstore. 
"""ifmetadatasisNone:metadatas=[{}for_intexts]try:ifidsisNone:# Get IDs from metadata if available.ids=[metadata.get("id",uuid.uuid4())formetadatainmetadatas]withSession(self._bind)assession:documents=[]foridx,queryinenumerate(texts):# For a query, if there is no corresponding ID,# we generate a uuid and add it to the list of IDs to be returned.ifidx<len(ids):custom_id=ids[idx]else:ids.append(str(uuid.uuid4()))custom_id=ids[-1]embedding=embeddings[idx]metadata=metadatas[idx]ifidx<len(metadatas)else{}# Construct text, embedding, metadata as EmbeddingStore model# to be inserted into the table.sqlquery=select(text(JSON_TO_VECTOR_QUERY).bindparams(bindparam(EMBEDDING_VALUES,json.dumps(embedding),literal_execute=True,# when unique is set to true, the name of the key# for each bindparameter is made unique, to avoid# using the wrong bound parameter during compile.# This is especially needed since we're creating# and storing multiple queries to be bulk inserted# later on.unique=True,),bindparam(EMBEDDING_LENGTH,self._embedding_length,literal_execute=True,),))# `embedding_store` is created in a dictionary format instead# of using the embedding_store object from this class.# This enables the use of `insert().values()` which can only# take a dict and not a custom object.embedding_store={"custom_id":custom_id,"content_metadata":metadata,"content":query,"embeddings":sqlquery,}documents.append(embedding_store)session.execute(insert(self._embedding_store).values(documents))session.commit()exceptDBAPIErrorase:logging.error(f"Add text failed:\n{e.__cause__}\n")raiseException(e.__cause__)fromNoneexceptAttributeError:logging.error("Metadata must be a list of dictionaries.")raisereturnids
[docs]defdelete(self,ids:Optional[List[str]]=None,**kwargs:Any)->Optional[bool]:"""Delete embeddings in the vectorstore by the ids. Args: ids: List of IDs to delete. If None, delete all. Default is None. No data is deleted if empty list is provided. kwargs: vectorstore specific parameters. Returns: Optional[bool] """ifidsisnotNoneandlen(ids)==0:logging.info(EMPTY_IDS_ERROR_MESSAGE)returnFalseresult=self._delete_texts_by_ids(ids)ifresult==0:logging.info(INVALID_IDS_ERROR_MESSAGE)returnFalselogging.info(f"{result} rows affected.")returnTrue
def_delete_texts_by_ids(self,ids:Optional[List[str]]=None)->int:try:withSession(bind=self._bind)assession:ifidsisNone:logging.info("Deleting all data in the vectorstore.")result=session.query(self._embedding_store).delete()else:result=(session.query(self._embedding_store).filter(self._embedding_store.custom_id.in_(ids)).delete())session.commit()exceptDBAPIErrorase:logging.error(e.__cause__)returnresultdef_provide_token(self,dialect:Dialect,conn_rec:Optional[ConnectionPoolEntry],cargs:List[str],cparams:MutableMapping[str,Any],)->None:"""Function to retreive access token for connection. Get token for SQLServer connection from token URL, and use the token to connect to the database. """credential=DefaultAzureCredential()# Remove Trusted_Connection param that SQLAlchemy adds to# the connection string by default.cargs[0]=cargs[0].replace(EXTRA_PARAMS,str())# Create credential tokentoken_bytes=credential.get_token(AZURE_TOKEN_URL).token.encode("utf-16-le")token_struct=struct.pack(f"<I{len(token_bytes)}s",len(token_bytes),token_bytes)# Apply credential token to keyword argumentcparams["attrs_before"]={SQL_COPT_SS_ACCESS_TOKEN:token_struct}