You need to enable JavaScript to run this app.
导航
通过 Python 连接 ES 实例
最近更新时间:2025.01.08 14:57:46首次发布时间:2023.08.24 17:53:35

本文介绍如何使用 Python 语言连接 ES 实例,并为您提供示例代码和注意事项。

准备工作

  • 提前创建火山引擎 ES 实例,并确保为正常运行状态。创建实例的具体操作,请参见创建实例
  • 确保运行 Python 代码的服务器与 ES 实例网络互通。
    • 如果运行 Python 代码的服务器与 ES 实例在相同的私有网络 VPC 中,则可以通过实例的私网地址进行连接。连接前,需要确保 VPC 私网访问白名单(默认为 0.0.0.0/0)中已添加了服务器的私网 IP 地址。具体操作步骤,请参见配置实例私网 IP 白名单
    • 如果运行 Python 代码的服务器在公网环境下,则可以通过实例的公网地址进行连接。连接前,需要开启实例公网地址,并修改公网地址访问白名单,将服务器的公网 IP 地址加入白名单中。具体操作步骤,请参见配置实例公网访问配置实例公网 IP 白名单

安装 Elasticsearch Python 客户端

建议 Elasticsearch Python 客户端和 ES 实例的版本保持一致。
例如需要访问的 ES 实例版本是7.10.2,则需要安装 7.10 版本的 Elasticsearch Python 客户端。
安装 Elasticsearch Python 客户端的命令如下:

pip install Elasticsearch==7.10

连接 HTTP 实例

该场景适用于连接访问方式为 HTTP 的 ES 实例。
示例代码如下:

from elasticsearch import Elasticsearch
from elasticsearch.connection import Urllib3HttpConnection
import socket

# 创建 Elasticsearch 客户端连接。
client = Elasticsearch(
    hosts=[
        {
            'host': "{实例访问域名}",
            'port': "9200",
        }
    ],
    scheme="http",
    http_auth=('admin', '{admin用户密码}'),
    connection_class=Urllib3HttpConnection,
    connection_params={
        'socket_options': [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 300),
            (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
        ]
    },
)

# 创建索引,根据需要设置 settings、mappings,以及字段信息。
index_name = '{索引名称}'
index_body = {
    'settings': {
        'number_of_shards': 3,
        'number_of_replicas': 1
    },
    'mappings': {
        'properties': {
            'name': {
                'type': 'text'
            },
            'age': {
                'type': 'integer'
            }
        }
    }
}

response = client.indices.create(index=index_name, body=index_body)
print(response)

运行程序,返回如下类似信息:

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'custom-index'}

连接 HTTPS 实例

使用证书

该场景适用于连接访问方式为 HTTPS 的 ES 实例,且连接过程需要校验证书。
示例代码如下:

from elasticsearch import Elasticsearch
from elasticsearch.connection import Urllib3HttpConnection
import socket

# 创建 Elasticsearch 客户端连接。
client = Elasticsearch(
    hosts=[
        {
            'host': "{实例访问域名}",
            'port': "9200",
        }
    ],
    scheme="https",
    http_auth=('admin', '{admin用户密码}'),
    ca_certs = '{证书保存地址}',
    connection_class=Urllib3HttpConnection,
    connection_params={
        'socket_options': [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 300),
            (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
        ]
    },
)

# 创建索引,根据需要设置 settings、mappings,以及字段信息。
index_name = '{索引名称}'
index_body = {
    'settings': {
        'number_of_shards': 3,
        'number_of_replicas': 1
    },
    'mappings': {
        'properties': {
            'name': {
                'type': 'text'
            },
            'age': {
                'type': 'integer'
            }
        }
    }
}

response = client.indices.create(index=index_name, body=index_body)
print(response)

运行程序,返回如下类似信息:

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'custom-index'}

忽略证书

该场景适用于连接访问方式为 HTTPS 的 ES 实例,且连接过程忽略证书。
示例代码如下:

from elasticsearch import Elasticsearch
from elasticsearch.connection import Urllib3HttpConnection
import socket

# 创建 Elasticsearch 客户端连接。
client = Elasticsearch(
    hosts=[
        {
            'host': "{实例访问域名}",
            'port': "9200",
        }
    ],
    scheme="https",
    http_auth=('admin', '{admin用户密码}'),
    use_ssl=True,
    verify_certs=False,
    ssl_assert_hostname=False,
    ssl_show_warn=False,
    connection_class=Urllib3HttpConnection,
    connection_params={
        'socket_options': [
            (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
            (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 300),
            (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 5)
        ]
    },
)

# 创建索引,根据需要设置 settings、mappings,以及字段信息。
index_name = '{索引名称}'
index_body = {
    'settings': {
        'number_of_shards': 3,
        'number_of_replicas': 1
    },
    'mappings': {
        'properties': {
            'name': {
                'type': 'text'
            },
            'age': {
                'type': 'integer'
            }
        }
    }
}

response = client.indices.create(index=index_name, body=index_body)
print(response)

运行程序,返回如下类似信息:

{'acknowledged': True, 'shards_acknowledged': True, 'index': 'custom-index'}