Source code for langchain_mongodb.pipelines

"""Aggregation pipeline components used in Atlas Full-Text, Vector, and Hybrid Search

See the following for more:
    - `Full-Text Search <https://www.mongodb.com/docs/atlas/atlas-search/aggregation-stages/search/#mongodb-pipeline-pipe.-search>`_
    - `MongoDB Operators <https://www.mongodb.com/docs/atlas/atlas-search/operators-and-collectors/#std-label-operators-ref>`_
    - `Vector Search <https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/>`_
    - `Filter Example <https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#atlas-vector-search-pre-filter>`_
"""

from typing import Any, Dict, List, Optional


[docs] def text_search_stage( query: str, search_field: str, index_name: str, limit: Optional[int] = None, filter: Optional[Dict[str, Any]] = None, include_scores: Optional[bool] = True, **kwargs: Any, ) -> List[Dict[str, Any]]: # noqa: E501 """Full-Text search using Lucene's standard (BM25) analyzer Args: query: Input text to search for search_field: Field in Collection that will be searched index_name: Atlas Search Index name limit: Maximum number of documents to return. Default of no limit filter: Any MQL match expression comparing an indexed field include_scores: Scores provide measure of relative relevance Returns: Dictionary defining the $search stage """ pipeline = [ { "$search": { "index": index_name, "text": {"query": query, "path": search_field}, } } ] if filter: pipeline.append({"$match": filter}) # type: ignore if include_scores: pipeline.append({"$set": {"score": {"$meta": "searchScore"}}}) if limit: pipeline.append({"$limit": limit}) # type: ignore return pipeline # type: ignore
[docs] def vector_search_stage( query_vector: List[float], search_field: str, index_name: str, top_k: int = 4, filter: Optional[Dict[str, Any]] = None, oversampling_factor: int = 10, **kwargs: Any, ) -> Dict[str, Any]: # noqa: E501 """Vector Search Stage without Scores. Scoring is applied later depending on strategy. vector search includes a vectorSearchScore that is typically used. hybrid uses Reciprocal Rank Fusion. Args: query_vector: List of embedding vector search_field: Field in Collection containing embedding vectors index_name: Name of Atlas Vector Search Index tied to Collection top_k: Number of documents to return oversampling_factor: this times limit is the number of candidates filter: MQL match expression comparing an indexed field. Some operators are not supported. See `vectorSearch filter docs <https://www.mongodb.com/docs/atlas/atlas-vector-search/vector-search-stage/#atlas-vector-search-pre-filter>`_ Returns: Dictionary defining the $vectorSearch """ stage = { "index": index_name, "path": search_field, "queryVector": query_vector, "numCandidates": top_k * oversampling_factor, "limit": top_k, } if filter: stage["filter"] = filter return {"$vectorSearch": stage}
[docs] def combine_pipelines( pipeline: List[Any], stage: List[Dict[str, Any]], collection_name: str ) -> None: """Combines two aggregations into a single result set in-place.""" if pipeline: pipeline.append({"$unionWith": {"coll": collection_name, "pipeline": stage}}) else: pipeline.extend(stage)
[docs] def reciprocal_rank_stage( score_field: str, penalty: float = 0, **kwargs: Any ) -> List[Dict[str, Any]]: """Stage adds Reciprocal Rank Fusion weighting. First, it pushes documents retrieved from previous stage into a temporary sub-document. It then unwinds to establish the rank to each and applies the penalty. Args: score_field: A unique string to identify the search being ranked penalty: A non-negative float. extra_fields: Any fields other than text_field that one wishes to keep. Returns: RRF score := \frac{1}{rank + penalty} with rank in [1,2,..,n] """ rrf_pipeline = [ {"$group": {"_id": None, "docs": {"$push": "$$ROOT"}}}, {"$unwind": {"path": "$docs", "includeArrayIndex": "rank"}}, { "$addFields": { f"docs.{score_field}": { "$divide": [1.0, {"$add": ["$rank", penalty, 1]}] }, "docs.rank": "$rank", "_id": "$docs._id", } }, {"$replaceRoot": {"newRoot": "$docs"}}, ] return rrf_pipeline # type: ignore
[docs] def final_hybrid_stage( scores_fields: List[str], limit: int, **kwargs: Any ) -> List[Dict[str, Any]]: """Sum weighted scores, sort, and apply limit. Args: scores_fields: List of fields given to scores of vector and text searches limit: Number of documents to return Returns: Final aggregation stages """ return [ {"$group": {"_id": "$_id", "docs": {"$mergeObjects": "$$ROOT"}}}, {"$replaceRoot": {"newRoot": "$docs"}}, {"$set": {score: {"$ifNull": [f"${score}", 0]} for score in scores_fields}}, {"$addFields": {"score": {"$add": [f"${score}" for score in scores_fields]}}}, {"$sort": {"score": -1}}, {"$limit": limit}, ]