You need to enable JavaScript to run this app.
导航
通过 Java 连接 OpenSearch 实例
最近更新时间:2024.10.23 10:35:04 首次发布时间:2024.06.06 14:31:42

本文介绍如何使用 Java 语言,通过 Rest High level Client 连接 OpenSearch 实例,并为您提供示例代码。

准备工作

  • 提前创建 OpenSearch 实例,并确保为正常运行状态。创建实例的具体操作,请参见创建实例
  • 您在连接实例前,请先在实例详情页面获取实例访问地址、实例访问用户。
    Image

    说明

    • 对于需要使用证书连接的 HTTPS 协议实例,还需要下载证书并保存到本地路径。
    • 如果遗忘实例访问用户(admin)的密码,可以选择重置密码。具体操作步骤,请参见重置访问密码
    • 如果需要使用实例公网地址访问,您可以为实例开启公网访问,然后绑定一个弹性公网 EIP。具体操作步骤,请参见配置实例公网访问
  • 运行 Java 代码的服务器需要提前安装 Java 环境,安装 11 或以上版本 JDK;以及安装 3.5 或以上版本 Maven。具体操作,请参见安装 JDK 和安装 Maven。

注意事项

  • 使用 Java 连接 OpenSearch 实例,JDK 版本需要至少为 11,而在连接 ES 实例时 JDK 版本至少 1.8。
    如果您 JDK 版本太低,需要先升级 JDK 版本。
  • 建议 Rest High Level Client 客户端版本和火山引擎 OpenSearch 实例的版本保持一致。若您使用相比 OpenSearch 实例更高版本的 Rest High Level Client,则可能存在少量请求的兼容性问题。
  • 确保运行 Java 代码的服务器与 OpenSearch 实例网络互通。
    • 如果运行 Java 代码的服务器与实例在相同的私有网络 VPC 中,则可以通过实例的私网地址进行连接。
    • 如果运行 Java 代码的服务器在公网环境下,则可以通过实例的公网地址进行连接。具体操作步骤,请参见配置实例公网访问

添加依赖

在 pom.xml 文件中添加以下依赖。Rest High Level Client 客户端版本需要设置为 2.9.0 版本。

<dependency>
    <groupId>org.opensearch.client</groupId>
    <artifactId>opensearch-rest-high-level-client</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.15.2</version>
</dependency>

使用证书连接实例

连接实例

该场景适用于连接访问方式为 HTTPS 的 OpenSearch 实例,且需要认证实例的 HTTPS 证书。
示例代码如下:

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an HTTPS OpenSearch instance while trusting the certificate stored in a
 * local file, then print the cluster health status.
 */
public class OSinitClientWithCA {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and an SSL context
     * trusting the X.509 certificate read from {@code caPath}.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param caPath   local path of the certificate file
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     * @throws Exception                if the certificate cannot be read or the SSL context cannot be built
     */
    RestHighLevelClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        SSLContext sslContext;
        // try-with-resources closes the certificate stream on both the success and the failure path.
        try (InputStream is = Files.newInputStream(Paths.get(caPath))) {
            KeyStore trustStore = KeyStore.getInstance("pkcs12");
            trustStore.load(null, null);  // start from an empty in-memory store
            trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is));
            sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build();
        } catch (Exception e) {
            // add your own log here
            throw e;
        }

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider)  // authentication credentials
                        .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)))  // host name verification is skipped
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Connects with the placeholder endpoint, certificate path and password, and prints the cluster status.
     *
     * @param args unused
     * @throws Exception if the client cannot be created or closed
     */
    public static void main(String[] args) throws Exception {
        String file = "<证书保存地址>";
        // try-with-resources guarantees the client is closed even if the request throws.
        try (RestHighLevelClient client = new OSinitClientWithCA().initClientWithCA(
                Arrays.asList(new HttpHost("<访问域名>", 9200, "https")),
                file, "admin", "<用户密码>")) {

            ClusterHealthRequest request = new ClusterHealthRequest();
            try {
                ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT);
                System.out.println("current cluster status is: " + health.getStatus()); // e.g. current cluster status is: GREEN
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

运行程序,返回如下类似信息:

current cluster status is: GREEN

创建索引

执行以下代码,可以创建索引,您可以配置索引的 Settings 和 Mappings。

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.common.settings.Settings;

import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an HTTPS OpenSearch instance while trusting the certificate stored in a
 * local file, then create an index with explicit settings and mappings.
 */
public class OSinitClientWithCA {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and an SSL context
     * trusting the X.509 certificate read from {@code caPath}.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param caPath   local path of the certificate file
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     * @throws Exception                if the certificate cannot be read or the SSL context cannot be built
     */
    RestHighLevelClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        SSLContext sslContext;
        // try-with-resources closes the certificate stream on both the success and the failure path.
        try (InputStream is = Files.newInputStream(Paths.get(caPath))) {
            KeyStore trustStore = KeyStore.getInstance("pkcs12");
            trustStore.load(null, null);  // start from an empty in-memory store
            trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is));
            sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build();
        } catch (Exception e) {
            // add your own log here
            throw e;
        }

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider)  // authentication credentials
                        .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)))  // host name verification is skipped
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Creates the placeholder index with 3 shards, 1 replica and a {@code name}/{@code age} mapping,
     * then prints the acknowledgement flags and the index name.
     *
     * @param args unused
     * @throws Exception if the client cannot be created, the request fails, or the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        String file = "<证书保存地址>";
        // try-with-resources guarantees the client is closed even if index creation throws.
        try (RestHighLevelClient client = new OSinitClientWithCA().initClientWithCA(
                List.of(new HttpHost("<访问域名>", 9200, "https")),
                file, "admin", "<用户密码>")) {

            // Create the index.
            CreateIndexRequest createIndexRequest = new CreateIndexRequest("<索引名称>");
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 1));  // shard and replica counts
            createIndexRequest.mapping(Map.ofEntries(
                    Map.entry("properties", Map.ofEntries(
                            Map.entry("name", Map.of("type", "keyword")),
                            Map.entry("age", Map.of("type", "integer"))
                    ))));   // index mapping

            CreateIndexResponse createIndexResponse = client.indices()
                    .create(createIndexRequest, RequestOptions.DEFAULT);

            System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() +
                    "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() +
                    "\nindex=" + createIndexResponse.index());
        }
    }
}

运行程序,返回如下类似信息:

isAcknowledged=true
shards_acknowledged=true
index=custom-index

写入文档

执行以下代码,可以向目标索引中写入文档。

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;


import javax.net.ssl.SSLContext;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an HTTPS OpenSearch instance while trusting the certificate stored in a
 * local file, then write one document into an index.
 */
public class OSinitClientWithCA {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and an SSL context
     * trusting the X.509 certificate read from {@code caPath}.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param caPath   local path of the certificate file
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     * @throws Exception                if the certificate cannot be read or the SSL context cannot be built
     */
    RestHighLevelClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        SSLContext sslContext;
        // try-with-resources closes the certificate stream on both the success and the failure path.
        try (InputStream is = Files.newInputStream(Paths.get(caPath))) {
            KeyStore trustStore = KeyStore.getInstance("pkcs12");
            trustStore.load(null, null);  // start from an empty in-memory store
            trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is));
            sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build();
        } catch (Exception e) {
            // add your own log here
            throw e;
        }

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider)  // authentication credentials
                        .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)))  // host name verification is skipped
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Indexes a sample document with id {@code "1"} into the placeholder index and prints the
     * response status.
     *
     * @param args unused
     * @throws Exception if the client cannot be created, the request fails, or the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        String file = "<证书保存地址>";
        // The original sample never closed the client; try-with-resources closes it on every path.
        try (RestHighLevelClient client = new OSinitClientWithCA().initClientWithCA(
                List.of(new HttpHost("<访问域名>", 9200, "https")),
                file, "admin", "<用户密码>")) {

            // Write the document.
            IndexRequest request = new IndexRequest("<索引名称>");
            request.id("1"); // document id
            request.source(Map.of(
                    "age", 10,
                    "name", "test"
            )); // Place your content into the index's source.
            IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

            System.out.println("文档创建状态:" + indexResponse.status());  // e.g. 文档创建状态:CREATED
        }
    }

}

运行程序,返回如下类似信息:

文档创建状态:CREATED

忽略证书连接实例

连接实例

该场景适用于连接访问方式为 HTTPS 的 OpenSearch 实例,连接过程中忽略 HTTPS 证书。
示例代码如下:

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an HTTPS OpenSearch instance without validating its certificate,
 * then print the cluster health status.
 */
public class OSinitClientSkipVerification {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and an SSL context whose
     * trust strategy accepts every certificate chain; host name verification is skipped as well.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     * @throws NoSuchAlgorithmException if the SSL context cannot be built
     * @throws KeyStoreException        if the SSL context cannot be built
     * @throws KeyManagementException   if the SSL context cannot be built
     */
    RestHighLevelClient initClientSkipVerification(List<HttpHost> hosts, String user, String password)
            throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        // Trust strategy that always returns true: certificate validation is skipped.
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build();

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider)  // authentication credentials
                        .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)))  // host name verification is skipped
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Connects with the placeholder endpoint and password, and prints the cluster status.
     *
     * @param args unused
     * @throws Exception if the client cannot be created or closed
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources guarantees the client is closed even if the request throws.
        try (RestHighLevelClient client = new OSinitClientSkipVerification().initClientSkipVerification(
                Arrays.asList(new HttpHost("<访问域名>", 9200, "https")),
                "admin", "<用户密码>")) {

            ClusterHealthRequest request = new ClusterHealthRequest();
            try {
                ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT);
                System.out.println("current cluster status is: " + health.getStatus()); // e.g. current cluster status is: GREEN
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

运行程序,返回如下类似信息:

current cluster status is: GREEN

创建索引

执行以下代码,可以创建索引,您可以配置索引的 Settings 和 Mappings。

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.common.settings.Settings;

import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;


/**
 * Sample: connect to an HTTPS OpenSearch instance without validating its certificate,
 * then create an index with explicit settings and mappings.
 */
public class OSinitClientSkipVerification {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and an SSL context whose
     * trust strategy accepts every certificate chain; host name verification is skipped as well.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     * @throws NoSuchAlgorithmException if the SSL context cannot be built
     * @throws KeyStoreException        if the SSL context cannot be built
     * @throws KeyManagementException   if the SSL context cannot be built
     */
    RestHighLevelClient initClientSkipVerification(List<HttpHost> hosts, String user, String password)
            throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        // Trust strategy that always returns true: certificate validation is skipped.
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build();

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider)  // authentication credentials
                        .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)))  // host name verification is skipped
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Creates the placeholder index with 3 shards, 1 replica and a {@code name}/{@code age} mapping,
     * then prints the acknowledgement flags and the index name.
     *
     * @param args unused
     * @throws Exception if the client cannot be created, the request fails, or the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources guarantees the client is closed even if index creation throws.
        try (RestHighLevelClient client = new OSinitClientSkipVerification().initClientSkipVerification(
                List.of(new HttpHost("<访问域名>", 9200, "https")),
                "admin", "<用户密码>")) {

            // Create the index.
            CreateIndexRequest createIndexRequest = new CreateIndexRequest("<索引名称>");
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 1));  // shard and replica counts
            createIndexRequest.mapping(Map.ofEntries(
                    Map.entry("properties", Map.ofEntries(
                            Map.entry("name", Map.of("type", "keyword")),
                            Map.entry("age", Map.of("type", "integer"))
                    ))));   // index mapping

            CreateIndexResponse createIndexResponse = client.indices()
                    .create(createIndexRequest, RequestOptions.DEFAULT);

            System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() +
                    "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() +
                    "\nindex=" + createIndexResponse.index());
        }
    }

}

运行程序,返回如下类似信息:

isAcknowledged=true
shards_acknowledged=true
index=custom-index

写入文档

执行以下代码,可以向目标索引中写入文档。

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy;
import org.apache.http.ssl.SSLContextBuilder;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;


import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an HTTPS OpenSearch instance without validating its certificate,
 * then write one document into an index.
 */
public class OSinitClientSkipVerification {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and an SSL context whose
     * trust strategy accepts every certificate chain; host name verification is skipped as well.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     * @throws NoSuchAlgorithmException if the SSL context cannot be built
     * @throws KeyStoreException        if the SSL context cannot be built
     * @throws KeyManagementException   if the SSL context cannot be built
     */
    RestHighLevelClient initClientSkipVerification(List<HttpHost> hosts, String user, String password)
            throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));
        // Trust strategy that always returns true: certificate validation is skipped.
        SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build();

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider)  // authentication credentials
                        .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE)))  // host name verification is skipped
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Indexes a sample document with id {@code "1"} into the placeholder index and prints the
     * response status.
     *
     * @param args unused
     * @throws Exception if the client cannot be created, the request fails, or the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        // The original sample never closed the client; try-with-resources closes it on every path.
        try (RestHighLevelClient client = new OSinitClientSkipVerification().initClientSkipVerification(
                List.of(new HttpHost("<访问域名>", 9200, "https")),
                "admin", "<用户密码>")) {

            // Write the document.
            IndexRequest request = new IndexRequest("<索引名称>");
            request.id("1"); // document id
            request.source(Map.of(
                    "age", 10,
                    "name", "test"
            )); // Place your content into the index's source.
            IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

            System.out.println("文档创建状态:" + indexResponse.status());  // e.g. 文档创建状态:CREATED
        }
    }

}

运行程序,返回如下类似信息:

文档创建状态:CREATED

连接 HTTP 协议实例

连接实例

该场景适用于连接访问方式为 HTTP 的 OpenSearch 实例。
示例代码如下:

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest;
import org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an OpenSearch instance over plain HTTP and print the cluster health status.
 */
public class OSinitHttpClient {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and no SSL configuration.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     */
    RestHighLevelClient initHttpClient(List<HttpHost> hosts, String user, String password) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider))  // authentication credentials
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Connects with the placeholder endpoint and password, and prints the cluster status.
     *
     * @param args unused
     * @throws Exception if the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources guarantees the client is closed even if the request throws.
        try (RestHighLevelClient client = new OSinitHttpClient().initHttpClient(
                Arrays.asList(new HttpHost("<访问域名>", 9200, "http")),
                "admin", "<用户密码>")) {

            ClusterHealthRequest request = new ClusterHealthRequest();
            try {
                ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT);
                System.out.println("current cluster status is: " + health.getStatus()); // e.g. current cluster status is: GREEN
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

运行程序,返回如下类似信息:

current cluster status is: GREEN

创建索引

执行以下代码,可以创建索引,您可以配置索引的 Settings 和 Mappings。

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.common.settings.Settings;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an OpenSearch instance over plain HTTP and create an index with explicit
 * settings and mappings.
 */
public class OSinitHttpClient {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and no SSL configuration.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     */
    RestHighLevelClient initHttpClient(List<HttpHost> hosts, String user, String password) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider))  // authentication credentials
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Creates the placeholder index with 3 shards, 1 replica and a {@code name}/{@code age} mapping,
     * then prints the acknowledgement flags and the index name.
     *
     * @param args unused
     * @throws Exception if the request fails or the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources guarantees the client is closed even if index creation throws.
        try (RestHighLevelClient client = new OSinitHttpClient().initHttpClient(
                List.of(new HttpHost("<访问域名>", 9200, "http")),
                "admin", "<用户密码>")) {

            // Create the index.
            CreateIndexRequest createIndexRequest = new CreateIndexRequest("<索引名称>");
            createIndexRequest.settings(Settings.builder()
                    .put("index.number_of_shards", 3)
                    .put("index.number_of_replicas", 1));  // shard and replica counts
            createIndexRequest.mapping(Map.ofEntries(
                    Map.entry("properties", Map.ofEntries(
                            Map.entry("name", Map.of("type", "keyword")),
                            Map.entry("age", Map.of("type", "integer"))
                    ))));   // index mapping

            CreateIndexResponse createIndexResponse = client.indices()
                    .create(createIndexRequest, RequestOptions.DEFAULT);

            System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() +
                    "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() +
                    "\nindex=" + createIndexResponse.index());
        }
    }

}

运行程序,返回如下类似信息:

isAcknowledged=true
shards_acknowledged=true
index=custom-index

写入文档

执行以下代码,可以向目标索引中写入文档。

package com.bytedance.openplatform.imgr.core.client;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;


import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Sample: connect to an OpenSearch instance over plain HTTP and write one document into an index.
 */
public class OSinitHttpClient {

    /**
     * Builds a {@link RestHighLevelClient} that uses basic authentication and no SSL configuration.
     *
     * @param hosts    endpoints of the instance; must not be {@code null} or empty
     * @param user     user name for basic authentication
     * @param password password for basic authentication
     * @return a new client; the caller is responsible for closing it
     * @throws IllegalArgumentException if {@code hosts} is {@code null} or empty
     */
    RestHighLevelClient initHttpClient(List<HttpHost> hosts, String user, String password) {
        if (hosts == null || hosts.isEmpty()) {
            throw new IllegalArgumentException("invalid empty hosts");
        }
        // add more parameters check here

        // Credentials used for every request.
        CredentialsProvider provider = new BasicCredentialsProvider();
        provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password));

        RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0]))
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setConnectionTimeToLive(300, TimeUnit.SECONDS)
                        .setMaxConnPerRoute(10)
                        .setMaxConnTotal(20)
                        .setDefaultCredentialsProvider(provider))  // authentication credentials
                .setRequestConfigCallback(requestBuilder -> requestBuilder
                        .setConnectionRequestTimeout(5000)  // max wait for a connection from the pool; tune as needed
                        .setSocketTimeout(60000)  // request (socket) timeout; tune as needed
                        .setConnectTimeout(5000));  // connection establishment timeout; tune as needed

        return new RestHighLevelClient(builder);
    }

    /**
     * Indexes a sample document with id {@code "1"} into the placeholder index and prints the
     * response status.
     *
     * @param args unused
     * @throws Exception if the request fails or the client cannot be closed
     */
    public static void main(String[] args) throws Exception {
        // The original sample never closed the client; try-with-resources closes it on every path.
        try (RestHighLevelClient client = new OSinitHttpClient().initHttpClient(
                List.of(new HttpHost("<访问域名>", 9200, "http")),
                "admin", "<用户密码>")) {

            // Write the document.
            IndexRequest request = new IndexRequest("<索引名称>");
            request.id("1"); // document id
            request.source(Map.of(
                    "age", 10,
                    "name", "test"
            )); // Place your content into the index's source.
            IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT);

            System.out.println("文档创建状态:" + indexResponse.status());  // e.g. 文档创建状态:CREATED
        }
    }

}

运行程序,返回如下类似信息:

文档创建状态:CREATED

KNN查询

/**
 * Runs a k-NN query on the {@code embedding} field of {@code indexName}, filtered to documents
 * whose {@code userid} term matches {@code userId}.
 *
 * <p>Relies on {@code client} and {@code objectMapper}, which are declared outside this snippet
 * (presumably a {@code RestHighLevelClient} and a Jackson {@code ObjectMapper}; confirm against
 * the enclosing class).
 *
 * <p>{@code userId} is serialized through {@code objectMapper}, so it is emitted as a quoted and
 * escaped JSON string. Concatenating it raw produced invalid JSON for non-numeric ids and let the
 * id text change the query structure. NOTE(review): pass the plain id value, not a pre-quoted one.
 *
 * @param indexName index to search
 * @param userId    value for the {@code userid} term filter
 * @param vec       query vector
 * @param k         value of the {@code k} parameter of the k-NN query
 * @return the search response returned by the client
 * @throws IOException if serialization or the search request fails
 */
public SearchResponse search(String indexName, String userId, float[] vec, int k) throws IOException {
    // Serialize both dynamic values as JSON before splicing them into the query text.
    String vectorJson = objectMapper.writeValueAsString(vec);
    String userIdJson = objectMapper.writeValueAsString(userId);

    SearchRequest searchRequest = new SearchRequest();
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    searchSourceBuilder.query(new WrapperQueryBuilder(
            "{\n" +
                    "    \"knn\": {\n" +
                    "      \"embedding\": {\n" +
                    "        \"vector\": " + vectorJson + ",\n" +
                    "        \"k\": " + k + ",\n" +
                    "        \"filter\": {\n" +
                    "          \"bool\": {\n" +
                    "            \"must\": [\n" +
                    "              {\n" +
                    "                \"term\": {\n" +
                    "                  \"userid\": " + userIdJson + "\n" +
                    "                }\n" +
                    "              }\n" +
                    "            ]\n" +
                    "          }\n" +
                    "        }\n" +
                    "      }\n" +
                    "    }\n" +
                    "  }"
    ));
    searchRequest.indices(indexName).source(searchSourceBuilder);
    return client.search(searchRequest, RequestOptions.DEFAULT);
}