Source code for langchain_elasticsearch.client

from typing import Any, Dict, Optional

from elasticsearch import AsyncElasticsearch, Elasticsearch


def create_elasticsearch_client(
    url: Optional[str] = None,
    cloud_id: Optional[str] = None,
    api_key: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
) -> Elasticsearch:
    """Build a synchronous Elasticsearch client and verify it can connect.

    Args:
        url: Cluster address; passed to the client as a single-entry
            ``hosts`` list. Mutually exclusive with ``cloud_id``.
        cloud_id: Cloud deployment identifier. Mutually exclusive with
            ``url``.
        api_key: API key. When truthy it is used and ``username`` /
            ``password`` are ignored.
        username: Basic-auth user name; only used when ``password`` is
            also truthy.
        password: Basic-auth password; only used when ``username`` is
            also truthy.
        params: Extra keyword arguments for the ``Elasticsearch``
            constructor. They are applied last, so they override any
            value derived from the arguments above.

    Returns:
        A client on which ``info()`` has already succeeded.

    Raises:
        ValueError: If both ``url`` and ``cloud_id`` are truthy, or if
            neither is.
        Exception: Anything raised by the ``info()`` connection test is
            re-raised unchanged after the client has been closed.
    """
    if url and cloud_id:
        raise ValueError(
            "Both es_url and cloud_id are defined. Please provide only one."
        )

    connection_params: Dict[str, Any] = {}

    if url:
        connection_params["hosts"] = [url]
    elif cloud_id:
        connection_params["cloud_id"] = cloud_id
    else:
        raise ValueError("Please provide either elasticsearch_url or cloud_id.")

    # An API key takes precedence over basic auth; a lone username or a lone
    # password is silently ignored.
    if api_key:
        connection_params["api_key"] = api_key
    elif username and password:
        connection_params["basic_auth"] = (username, password)

    if params is not None:
        connection_params.update(params)

    es_client = Elasticsearch(**connection_params)
    try:
        es_client.info()  # test connection
    except Exception:
        # The caller never receives this client, so release its transport
        # here instead of leaking it, then surface the original error.
        es_client.close()
        raise
    return es_client
def create_async_elasticsearch_client(
    url: Optional[str] = None,
    cloud_id: Optional[str] = None,
    api_key: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    params: Optional[Dict[str, Any]] = None,
) -> AsyncElasticsearch:
    """Build an ``AsyncElasticsearch`` client from connection settings.

    No request is issued here; the client is only constructed and returned.

    Args:
        url: Cluster address; forwarded as a single-entry ``hosts`` list.
            Mutually exclusive with ``cloud_id``.
        cloud_id: Cloud deployment identifier. Mutually exclusive with
            ``url``.
        api_key: API key. When truthy it wins over ``username`` /
            ``password``.
        username: Basic-auth user name; used only if ``password`` is also
            truthy.
        password: Basic-auth password; used only if ``username`` is also
            truthy.
        params: Extra constructor keyword arguments, merged last so they
            override anything derived from the other arguments.

    Returns:
        The newly constructed ``AsyncElasticsearch`` instance.

    Raises:
        ValueError: If both ``url`` and ``cloud_id`` are truthy, or if
            neither is.
    """
    # Guard clauses: exactly one connection target must be supplied.
    if url and cloud_id:
        raise ValueError(
            "Both es_url and cloud_id are defined. Please provide only one."
        )
    if not (url or cloud_id):
        raise ValueError("Please provide either elasticsearch_url or cloud_id.")

    client_kwargs: Dict[str, Any] = (
        {"hosts": [url]} if url else {"cloud_id": cloud_id}
    )

    # Credentials: API key first, otherwise a complete user/password pair.
    if api_key:
        client_kwargs["api_key"] = api_key
    elif username and password:
        client_kwargs["basic_auth"] = (username, password)

    # Caller-supplied overrides are merged last.
    if params is not None:
        client_kwargs.update(params)

    return AsyncElasticsearch(**client_kwargs)