本文介绍如何使用 Java 语言,通过 Rest High level Client 连接 OpenSearch 实例,并为您提供示例代码。
在 pom.xml
文件中添加以下依赖。Rest High Level Client 客户端版本需要设置为 2.9.0 版本。
<dependency> <groupId>org.opensearch.client</groupId> <artifactId>opensearch-rest-high-level-client</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.15.2</version> </dependency>
该场景适用于连接访问方式为 HTTPS 的 OpneSearch 实例,且需要认证实例的 HTTPS 证书。
示例代码如下:
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; public class OSinitClientWithCA { RestHighLevelClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception{ if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); SSLContext sslContext; try { InputStream is = Files.newInputStream(Paths.get(caPath)); KeyStore trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null,null); trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is)); sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build(); } catch (Exception e) { // add your own log here throw e; } RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider) // 设置鉴权凭证 .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))) // 跳过 HostName 校验 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { String file = "<证书保存地址>"; RestHighLevelClient client = new OSinitClientWithCA().initClientWithCA( Arrays.asList(new HttpHost("<访问域名>", 9200, "https")), file, "admin", "<用户密码>"); ClusterHealthRequest request = new ClusterHealthRequest(); try { ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT); System.out.println("current cluster status is: " + health.getStatus()); // 输出 - current cluster status is: GREEN } catch (IOException e) { e.printStackTrace(); } client.close(); } }
运行程序,返回如下类似信息:
current cluster status is: GREEN
执行以下代码,可以创建索引,您可以配置索引的 Settings 和 Mappings。
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.common.settings.Settings; import javax.net.ssl.SSLContext; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class OSinitClientWithCA { RestHighLevelClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception{ if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); SSLContext sslContext; try { InputStream is = Files.newInputStream(Paths.get(caPath)); KeyStore trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null,null); trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is)); sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build(); } catch (Exception e) { // add your own log here throw e; } RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider) // 设置鉴权凭证 .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))) // 跳过 HostName 校验 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { String file = "<证书保存地址>"; RestHighLevelClient client = new OSinitClientWithCA().initClientWithCA( List.of(new HttpHost("<访问域名>", 9200, "https")), file, "admin", "<用户密码>"); // 创建索引 CreateIndexRequest createIndexRequest = new CreateIndexRequest("<索引名称>"); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 3) .put("index.number_of_replicas", 1)); // 配置索引分片和副本数 createIndexRequest.mapping( Map.ofEntries( Map.entry("properties", Map.ofEntries( Map.entry("name", Map.of("type", "keyword")), Map.entry("age", Map.of("type", "integer")) )))); // 配置索引的 mapping CreateIndexResponse createIndexResponse = client.indices() .create(createIndexRequest, RequestOptions.DEFAULT); System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() + "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() + "\nindex=" + createIndexResponse.index()); client.close(); } }
运行程序,返回如下类似信息:
isAcknowledged=true shards_acknowledged=true index=custom-index
执行以下代码,可以向目标索引中写入文档。
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import javax.net.ssl.SSLContext; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class OSinitClientWithCA { RestHighLevelClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception{ if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); SSLContext sslContext; try { InputStream is = Files.newInputStream(Paths.get(caPath)); KeyStore trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null,null); trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is)); sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build(); } catch (Exception e) { // add your own log here throw e; } RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider) // 设置鉴权凭证 .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))) // 跳过 HostName 校验 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { String file = "<证书保存地址>"; RestHighLevelClient client = new OSinitClientWithCA().initClientWithCA( List.of(new HttpHost("<访问域名>", 9200, "https")), file, "admin", "<用户密码>"); // 写入文档 IndexRequest request = new IndexRequest("<索引名称>"); request.id("1"); // 设置文档 id request.source(Map.of( "age", 10, "name", "test" )); // Place your content into the index's source. IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); System.out.println("文档创建状态:" + indexResponse.status()); // 输出 - 文档创建状态:CREATED } }
运行程序,返回如下类似信息:
文档创建状态:CREATED
该场景适用于连接访问方式为 HTTPS 的 OpenSearch 实例,连接过程中忽略 HTTPS 证书。
示例代码如下:
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import javax.net.ssl.SSLContext; import java.io.IOException; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; public class OSinitClientSkipVerification { RestHighLevelClient initClientSkipVerification(List<HttpHost> hosts, String user, String password) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build(); // 跳过 SSL 证书校验 RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider) // 设置鉴权凭证 .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))) // 跳过 HostName 校验 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { RestHighLevelClient client = new OSinitClientSkipVerification().initClientSkipVerification( Arrays.asList(new HttpHost("<访问域名>", 9200, "https")), "admin", "<用户密码>"); ClusterHealthRequest request = new ClusterHealthRequest(); try { ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT); System.out.println("current cluster status is: " + health.getStatus()); // 输出 - current cluster status is: GREEN } catch (IOException e) { e.printStackTrace(); } client.close(); } }
运行程序,返回如下类似信息:
current cluster status is: GREEN
执行以下代码,可以创建索引,您可以配置索引的 Settings 和 Mappings。
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.common.settings.Settings; import javax.net.ssl.SSLContext; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class OSinitClientSkipVerification { RestHighLevelClient initClientSkipVerification(List<HttpHost> hosts, String user, String password) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build(); // 跳过 SSL 证书校验 RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider) // 设置鉴权凭证 .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))) // 跳过 HostName 校验 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { RestHighLevelClient client = new OSinitClientSkipVerification().initClientSkipVerification( List.of(new HttpHost("<访问域名>", 9200, "https")), "admin", "<用户密码>"); // 创建索引 CreateIndexRequest createIndexRequest = new CreateIndexRequest("<索引名称>"); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 3) .put("index.number_of_replicas", 1)); // 配置索引分片和副本数 createIndexRequest.mapping( Map.ofEntries( Map.entry("properties", Map.ofEntries( Map.entry("name", Map.of("type", "keyword")), Map.entry("age", Map.of("type", "integer")) )))); // 配置索引的 mapping CreateIndexResponse createIndexResponse = client.indices() .create(createIndexRequest, RequestOptions.DEFAULT); System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() + "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() + "\nindex=" + createIndexResponse.index()); client.close(); } }
运行程序,返回如下类似信息:
isAcknowledged=true shards_acknowledged=true index=custom-index
执行以下代码,可以向目标索引中写入文档。
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.conn.ssl.NoopHostnameVerifier; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.nio.conn.ssl.SSLIOSessionStrategy; import org.apache.http.ssl.SSLContextBuilder; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import javax.net.ssl.SSLContext; import java.security.KeyManagementException; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class OSinitClientSkipVerification { RestHighLevelClient initClientSkipVerification(List<HttpHost> hosts, String user, String password) throws NoSuchAlgorithmException, KeyStoreException, KeyManagementException { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build(); // 跳过 SSL 证书校验 RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider) // 设置鉴权凭证 .setSSLStrategy(new SSLIOSessionStrategy(sslContext, NoopHostnameVerifier.INSTANCE))) // 跳过 HostName 校验 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { RestHighLevelClient client = new OSinitClientSkipVerification().initClientSkipVerification( List.of(new HttpHost("<访问域名>", 9200, "https")), "admin", "<用户密码>"); // 写入文档 IndexRequest request = new IndexRequest("<索引名称>"); request.id("1"); // 设置文档 id request.source(Map.of( "age", 10, "name", "test" )); // Place your content into the index's source. IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); System.out.println("文档创建状态:" + indexResponse.status()); // 输出 - 文档创建状态:CREATED } }
运行程序,返回如下类似信息:
文档创建状态:CREATED
该场景适用于连接访问方式为 HTTP 的 OpenSearch 实例。
示例代码如下:
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import java.io.IOException; import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; public class OSinitHttpClient { RestHighLevelClient initHttpClient(List<HttpHost> hosts, String user, String password) { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider)) // 设置鉴权凭证 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { RestHighLevelClient client = new OSinitHttpClient().initHttpClient( Arrays.asList(new HttpHost("<访问域名>", 9200, "http")), "admin", "<用户密码>"); ClusterHealthRequest request = new ClusterHealthRequest(); try { ClusterHealthResponse health = client.cluster().health(request, RequestOptions.DEFAULT); System.out.println("current cluster status is: " + health.getStatus()); // 输出 - current cluster status is: GREEN } catch (IOException e) { e.printStackTrace(); } client.close(); } }
运行程序,返回如下类似信息:
current cluster status is: GREEN
执行以下代码,可以创建索引,您可以配置索引的 Settings 和 Mappings。
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.indices.CreateIndexRequest; import org.opensearch.client.indices.CreateIndexResponse; import org.opensearch.common.settings.Settings; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class OSinitHttpClient { RestHighLevelClient initHttpClient(List<HttpHost> hosts, String user, String password) { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider)) // 设置鉴权凭证 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { RestHighLevelClient client = new OSinitHttpClient().initHttpClient( List.of(new HttpHost("<访问域名>", 9200, "http")), "admin", "<用户密码>"); // 创建索引 CreateIndexRequest createIndexRequest = new CreateIndexRequest("<索引名称>"); createIndexRequest.settings(Settings.builder() .put("index.number_of_shards", 3) .put("index.number_of_replicas", 1)); // 配置索引分片和副本数 createIndexRequest.mapping( Map.ofEntries( Map.entry("properties", Map.ofEntries( Map.entry("name", Map.of("type", "keyword")), Map.entry("age", Map.of("type", "integer")) )))); // 配置索引的 mapping CreateIndexResponse createIndexResponse = client.indices() .create(createIndexRequest, RequestOptions.DEFAULT); System.out.println("isAcknowledged=" + createIndexResponse.isAcknowledged() + "\nshards_acknowledged=" + createIndexResponse.isShardsAcknowledged() + "\nindex=" + createIndexResponse.index()); client.close(); } }
运行程序,返回如下类似信息:
isAcknowledged=true shards_acknowledged=true index=custom-index
执行以下代码,可以向目标索引中写入文档。
package com.bytedance.openplatform.imgr.core.client; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestClient; import org.opensearch.client.RestClientBuilder; import org.opensearch.client.RestHighLevelClient; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class OSinitHttpClient { RestHighLevelClient initHttpClient(List<HttpHost> hosts, String user, String password) { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } // add more parameters check here // 身份信息相关 CredentialsProvider provider = new BasicCredentialsProvider(); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(user, password)); RestClientBuilder builder = RestClient.builder(hosts.toArray(new HttpHost[0])) .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder .setConnectionTimeToLive(300, TimeUnit.SECONDS) .setMaxConnPerRoute(10) .setMaxConnTotal(20) .setDefaultCredentialsProvider(provider)) // 设置鉴权凭证 .setRequestConfigCallback(requestBuilder -> requestBuilder .setConnectionRequestTimeout(5000) // 从 connect 池中获取 connect 最大等待时间,根据实际情况调整 .setSocketTimeout(60000) // 请求超时时间,根据实际情况调整 .setConnectTimeout(5000)); // 建连超时时间,根据实际情况调整 return new RestHighLevelClient(builder); } public static void main(String[] args) throws Exception { RestHighLevelClient client = new OSinitHttpClient().initHttpClient( List.of(new HttpHost("<访问域名>", 9200, "http")), "admin", "<用户密码>"); // 写入文档 IndexRequest request = new IndexRequest("<索引名称>"); request.id("1"); // 设置文档 id request.source(Map.of( "age", 10, "name", "test" )); // Place your content into the index's source. IndexResponse indexResponse = client.index(request, RequestOptions.DEFAULT); System.out.println("文档创建状态:" + indexResponse.status()); // 输出 - 文档创建状态:CREATED } }
运行程序,返回如下类似信息:
文档创建状态:CREATED
public SearchResponse search(String indexName, String userId, float[] vec, int k) throws IOException { SearchRequest searchRequest = new SearchRequest(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.query(new WrapperQueryBuilder( "{\n" + " \"knn\": {\n" + " \"embedding\": {\n" + " \"vector\": " + objectMapper.writeValueAsString(vec) + ",\n" + " \"k\": " + k + ",\n" + " \"filter\": {\n" + " \"bool\": {\n" + " \"must\": [\n" + " {\n" + " \"term\": {\n" + " \"userid\": " + userId + "\n" + " }\n" + " }\n" + " ]\n" + " }\n" + " }\n" + " }\n" + " }\n" + " }" )); searchRequest.indices(indexName).source(searchSourceBuilder); return client.search(searchRequest, RequestOptions.DEFAULT); }