本文介绍如何使用 Java 语言,通过 opensearch-java 客户端连接 OpenSearch 实例,并为您提供示例代码。
Maven
<!-- OpenSearch Java client -->
<dependency>
  <groupId>org.opensearch.client</groupId>
  <artifactId>opensearch-java</artifactId>
  <version>2.21.0</version>
</dependency>
<!-- Apache HttpClient 5; provides the org.apache.hc.client5 classes imported by the examples -->
<dependency>
  <groupId>org.apache.httpcomponents.client5</groupId>
  <artifactId>httpclient5</artifactId>
  <version>5.2.1</version>
</dependency>
Gradle
dependencies {
    // OpenSearch Java client
    implementation 'org.opensearch.client:opensearch-java:2.21.0'
    // Apache HttpClient 5; provides the org.apache.hc.client5 classes imported by the examples
    implementation 'org.apache.httpcomponents.client5:httpclient5:5.2.1'
}
运行以下程序,返回如下类似信息:
current cluster status is: GREEN
该场景适用于连接访问方式为 HTTPS 的 OpenSearch 实例,且需要认证实例的 HTTPS 证书。
示例代码如下:
import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import org.apache.hc.client5.http.auth.AuthScope; import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.core5.function.Factory; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch.cat.HealthRequest; import org.opensearch.client.opensearch.cluster.HealthResponse; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.security.KeyStore; import java.security.cert.CertificateFactory; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; public class OSinitClientWithCA { public OpenSearchClient initClientWithCA(List<HttpHost> hosts, String caPath, String user, String password) throws Exception { if (hosts == null || hosts.isEmpty()) { throw new IllegalArgumentException("invalid empty hosts"); } BasicCredentialsProvider provider = new BasicCredentialsProvider(); for (HttpHost host : hosts) { provider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, password.toCharArray())); } SSLContext sslContext; try { InputStream is = Files.newInputStream(Paths.get(caPath)); KeyStore 
trustStore = KeyStore.getInstance("pkcs12"); trustStore.load(null,null); trustStore.setCertificateEntry("ca", CertificateFactory.getInstance("X.509").generateCertificate(is)); sslContext = new SSLContextBuilder().loadTrustMaterial(trustStore, null).build(); } catch (Exception e) { // add your own log here throw e; } final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(host); builder.setHttpClientConfigCallback(httpClientBuilder -> { final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() .setSslContext(sslContext) // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 .setTlsDetailsFactory(new Factory<SSLEngine, TlsDetails>() { @Override public TlsDetails create(final SSLEngine sslEngine) { return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); } }) .build(); final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder .create() .setTlsStrategy(tlsStrategy) .setMaxConnTotal(1000) .setMaxConnPerRoute(1000) .setDefaultConnectionConfig(ConnectionConfig.custom() .setTimeToLive(300, TimeUnit.SECONDS) .setConnectTimeout(Timeout.ofSeconds(5)) .setSocketTimeout(Timeout.ofSeconds(60)) .build()) .build(); return httpClientBuilder .setDefaultCredentialsProvider(provider) .setConnectionManager(connectionManager); }); final OpenSearchTransport transport = builder.build(); return new OpenSearchClient(transport); } public static void main(String[] args) throws Exception { String file = "<证书保存地址>"; OpenSearchClient client = new OSinitClientWithCA().initClientWithCA( Arrays.asList(new HttpHost("https","<访问域名>", 9200)), file, "admin", "<用户密码>"); try { HealthResponse health = client.cluster().health(); System.out.println("current cluster status is: " + health.status()); // 输出 - current cluster status is: GREEN } catch (Exception e) { e.printStackTrace(); } client.shutdown(); } }
该场景适用于连接访问方式为 HTTPS 的 OpenSearch 实例,连接过程中忽略 HTTPS 证书。
示例代码如下:
import org.apache.hc.client5.http.auth.AuthScope; import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.core5.function.Factory; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch.cluster.HealthResponse; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.util.concurrent.TimeUnit; public class OSinitClientIgnoreCA { public OpenSearchClient initClientIgnoreCA(HttpHost[] hosts, String user, String password) throws Exception { if (hosts == null || hosts.length == 0) { throw new IllegalArgumentException("invalid empty hosts"); } BasicCredentialsProvider provider = new BasicCredentialsProvider(); for (HttpHost host : hosts) { provider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, password.toCharArray())); } SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build(); // 跳过 SSL 证书校验 final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(hosts); builder.setHttpClientConfigCallback(httpClientBuilder -> { final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() .setSslContext(sslContext) // See 
https://issues.apache.org/jira/browse/HTTPCLIENT-2219 .setTlsDetailsFactory(new Factory<SSLEngine, TlsDetails>() { @Override public TlsDetails create(final SSLEngine sslEngine) { return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); } }) .build(); final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder .create() .setTlsStrategy(tlsStrategy) .setMaxConnTotal(1000) //最大连接数 .setMaxConnPerRoute(1000) //route的最大连接数 .setDefaultConnectionConfig(ConnectionConfig.custom() .setTimeToLive(300, TimeUnit.SECONDS) //链接最大存活时间,实例重启后自动平衡负载周期 .setConnectTimeout(Timeout.ofSeconds(5)) //连接超时时间 .setSocketTimeout(Timeout.ofSeconds(60)) .build()) .build(); return httpClientBuilder .setDefaultCredentialsProvider(provider) .setConnectionManager(connectionManager); }); final OpenSearchTransport transport = builder.build(); return new OpenSearchClient(transport); } public static void main(String[] args) throws Exception { OpenSearchClient client = new OSinitClientIgnoreCA().initClientIgnoreCA( new HttpHost[]{new HttpHost("https","<访问域名>", 9200)}, "admin", "<用户密码>"); try { HealthResponse health = client.cluster().health(); System.out.println("current cluster status is: " + health.status()); // 输出 - current cluster status is: GREEN } catch (Exception e) { e.printStackTrace(); } client.shutdown(); } }
该场景适用于连接访问方式为 HTTP 的 OpenSearch 实例。
示例代码如下:
import org.apache.hc.client5.http.auth.AuthScope; import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.core5.function.Factory; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch.cluster.HealthResponse; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.util.concurrent.TimeUnit; public class OSinitClientHttp { public OpenSearchClient initClientHttp(HttpHost[] hosts, String user, String password) throws Exception { if (hosts == null || hosts.length == 0) { throw new IllegalArgumentException("invalid empty hosts"); } BasicCredentialsProvider provider = new BasicCredentialsProvider(); for (HttpHost host : hosts) { provider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, password.toCharArray())); } final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(hosts); builder.setHttpClientConfigCallback(httpClientBuilder -> { final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder .create() .setMaxConnTotal(1000) //最大连接数 .setMaxConnPerRoute(1000) //route的最大连接数 .setDefaultConnectionConfig(ConnectionConfig.custom() .setTimeToLive(300, 
TimeUnit.SECONDS) //链接最大存活时间,实例重启后自动平衡负载周期 .setConnectTimeout(Timeout.ofSeconds(5)) // 连接超时时间 .setSocketTimeout(Timeout.ofSeconds(60)) .build()) .build(); return httpClientBuilder .setDefaultCredentialsProvider(provider) .setConnectionManager(connectionManager); }); final OpenSearchTransport transport = builder.build(); return new OpenSearchClient(transport); } public static void main(String[] args) throws Exception { OpenSearchClient client = new OSinitClientHttp().initClientHttp( new HttpHost[]{new HttpHost("https","<访问域名>", 9200)}, "admin", "<用户密码>"); try { HealthResponse health = client.cluster().health(); System.out.println("current cluster status is: " + health.status()); // 输出 - current cluster status is: GREEN } catch (Exception e) { e.printStackTrace(); } client.shutdown(); } }
执行以下代码,可以向目标索引中写入文档。
// 最小化创建索引 String index = "sample-index"; CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); client.indices().create(createIndexRequest); IndexSettings indexSettings = new IndexSettings.Builder().autoExpandReplicas("0-all").build(); PutIndicesSettingsRequest putIndicesSettingsRequest = new PutIndicesSettingsRequest.Builder().index(index).value(indexSettings).build(); client.indices().putSettings(putIndicesSettingsRequest); // 创建disann 向量索引 String index = "sample-index"; final KnnVectorProperty nameVector = KnnVectorProperty.of( v -> v.dimension(4) .method(m -> m.spaceType("cosinesimil") .engine("diskann") .name("diskann") .parameters("L", JsonData.of(30)) .parameters("R", JsonData.of(32)) ) ); client.indices().create(ci -> ci.index(index) .mappings( builder -> builder.properties( "firstName", p -> p.text(t -> t.analyzer("ik_smart")) ).properties("nameVector", p -> p.knnVector(nameVector)) ) .settings(s -> s.index(indexSetting -> indexSetting.knn(true)))); // 更新索引配置 client.indices().putSettings(ps -> ps.index(index) .settings(s -> s.autoExpandReplicas("0-all")) );
这里只展示向量和文本的基本增删改查样例,其他高级特性(如利用 ingest pipeline 无感写入向量等)请参考最佳实践中的其他文档。
增删改查所有的文档结构基于以下类。
/**
 * Document model shared by the CRUD examples: a text field plus a vector field.
 */
class IndexData {

    /** Text value stored in the {@code firstName} field. */
    private String firstName;

    /** Vector value stored in the {@code nameVector} field. */
    private Float[] nameVector;

    /** Creates an empty document; both fields stay {@code null} until set. */
    public IndexData() {
    }

    /**
     * Creates a document with both fields populated.
     *
     * @param firstName  text value
     * @param nameVector vector value; the array is stored by reference, not copied
     */
    public IndexData(String firstName, Float[] nameVector) {
        this.firstName = firstName;
        this.nameVector = nameVector;
    }

    /** @return the text value, or {@code null} if not set */
    public String getFirstName() {
        return firstName;
    }

    /** @param firstName the new text value */
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    /** @return the stored vector array itself, or {@code null} if not set */
    public Float[] getNameVector() {
        return nameVector;
    }

    /** @param nameVector the new vector value; stored by reference */
    public void setNameVector(Float[] nameVector) {
        this.nameVector = nameVector;
    }
}
// Build a document with a text value and a 4-element vector.
IndexData indexData = new IndexData( "云搜索", new Float[]{ 1.0f,1.0f,1.0f,1.0f});
// Write the document into the index under id "1".
client.index(ib -> ib.index(index).id("1").document(indexData));
// Refresh the index after the write.
client.indices().refresh(r -> r.index(index));
// 文本搜索 SearchResponse<IndexData> search_res = client.search( r -> r.query( q -> q.match( match -> match.field("firstName") .query(f -> f.stringValue("云搜索")) .analyzer("ik_smart") ) ) , IndexData.class); System.out.println(search_res.hits().hits().get(0).source().getFirstName()); // 向量搜索 SearchResponse<IndexData> vector_res = client.search( r -> r.query( q -> q.knn( knn -> knn.field("nameVector") .k(10) .vector(new float[]{ 1.0f, 1.0f, 1.0f, 1.0f}) ) ) , IndexData.class); System.out.println(vector_res.hits().hits().get(0).source().getFirstName()); // 使用原始的json 搜索 String json_query = "{\"match\":{ \"firstName\": \"云搜索\"}}"; String encodedJSON = Base64.getEncoder().encodeToString(json_query.getBytes()); SearchResponse<IndexData> search_by_json = client.search( r -> r.query( q -> q.wrapper(m -> m.query(encodedJSON))), IndexData.class ); System.out.println(search_by_json.hits().hits().get(0).source().getFirstName()); //class 参数传递 com.fasterxml.jackson.databind.node.ObjectNode 可以获取原始json
//进行数据更新 IndexData updateData = new IndexData(); updateData.setFirstName("OpenSearch"); client.update(r -> r.id("1").doc(updateData).index(index),IndexData.class); //刷盘 client.indices().refresh(r -> r.index(index)); //查询数据是否更新成功 GetResponse<IndexData> getRes = client.get(r -> r.id("1").index(index), IndexData.class); System.out.println(getRes.source().getFirstName());
// Delete the document with id "1" from the index.
client.delete(b -> b.index(index).id("1"));
import org.apache.hc.client5.http.auth.AuthScope; import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; import org.apache.hc.client5.http.config.ConnectionConfig; import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.apache.hc.core5.function.Factory; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.nio.ssl.TlsStrategy; import org.apache.hc.core5.reactor.ssl.TlsDetails; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.apache.hc.core5.util.Timeout; import org.opensearch.client.json.JsonData; import org.opensearch.client.opensearch.OpenSearchClient; import org.opensearch.client.opensearch._types.FieldValue; import org.opensearch.client.opensearch._types.mapping.KnnVectorProperty; import org.opensearch.client.opensearch._types.mapping.Property; import org.opensearch.client.opensearch._types.mapping.TextProperty; import org.opensearch.client.opensearch._types.mapping.TypeMapping; import org.opensearch.client.opensearch.cluster.HealthResponse; import org.opensearch.client.opensearch.core.*; import org.opensearch.client.opensearch.indices.CreateIndexRequest; import org.opensearch.client.opensearch.indices.IndexSettings; import org.opensearch.client.opensearch.indices.PutIndicesSettingsRequest; import org.opensearch.client.opensearch.indices.RefreshRequest; import org.opensearch.client.transport.OpenSearchTransport; import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLEngine; import java.util.Base64; import java.util.concurrent.TimeUnit; public class OSinitClientTest { public OpenSearchClient initClientIgnoreCA(HttpHost[] hosts, String user, String password) throws Exception { if 
(hosts == null || hosts.length == 0) { throw new IllegalArgumentException("invalid empty hosts"); } BasicCredentialsProvider provider = new BasicCredentialsProvider(); for (HttpHost host : hosts) { provider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials(user, password.toCharArray())); } SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, (x509Certificates, s) -> true).build(); // 跳过 SSL 证书校验 final ApacheHttpClient5TransportBuilder builder = ApacheHttpClient5TransportBuilder.builder(hosts); builder.setHttpClientConfigCallback(httpClientBuilder -> { final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() .setSslContext(sslContext) // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 .setTlsDetailsFactory(new Factory<SSLEngine, TlsDetails>() { @Override public TlsDetails create(final SSLEngine sslEngine) { return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()); } }) .build(); final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder .create() .setTlsStrategy(tlsStrategy) .setMaxConnTotal(1000) //最大连接数 .setMaxConnPerRoute(1000) //route的最大连接数 .setDefaultConnectionConfig(ConnectionConfig.custom() .setTimeToLive(300, TimeUnit.SECONDS) //链接最大存活时间,实例重启后自动平衡负载周期 .setConnectTimeout(Timeout.ofSeconds(5)) // 连接超时时间 .setSocketTimeout(Timeout.ofSeconds(60)) .build()) .build(); return httpClientBuilder .setDefaultCredentialsProvider(provider) .setConnectionManager(connectionManager); }); final OpenSearchTransport transport = builder.build(); return new OpenSearchClient(transport); } public static void main(String[] args) throws Exception { /** * 连接实例 */ OpenSearchClient client = new OSinitClientTest().initClientIgnoreCA( new HttpHost[]{new HttpHost("<https|http>","<接入点地址>", 9200)}, "<账号>", "<密码>"); try { HealthResponse health = client.cluster().health(); System.out.println("current cluster status is: " + health.status()); // 输出 - current cluster 
status is: GREEN } catch (Exception e) { e.printStackTrace(); } /** * 索引创建,构建一个带有vector和text的索引 */ String index = "sample-index"; final KnnVectorProperty nameVector = KnnVectorProperty.of( v -> v.dimension(4) .method(m -> m.spaceType("cosinesimil") .engine("diskann") .name("diskann") .parameters("L", JsonData.of(30)) .parameters("R", JsonData.of(32)) ) ); client.indices().create(ci -> ci.index(index) .mappings( builder -> builder.properties( "firstName", p -> p.text(t -> t.analyzer("ik_smart")) ).properties("nameVector", p -> p.knnVector(nameVector)) ) .settings(s -> s.index(indexSetting -> indexSetting.knn(true)))); /** * 更新索引配置 */ client.indices().putSettings(ps -> ps.index(index) .settings(s -> s.autoExpandReplicas("0-all")) ); /** * 写入文档 */ IndexData indexData = new IndexData( "云搜索", new Float[]{ 1.0f,1.0f,1.0f,1.0f}); client.index(ib -> ib.index(index).id("1").document(indexData)); // 刷新索引保证文档立即可见 client.indices().refresh(r -> r.index(index)); /** * 用json进行搜索 */ String json_query = "{\"match\":{ \"firstName\": \"云搜索\"}}"; String encodedJSON = Base64.getEncoder().encodeToString(json_query.getBytes()); SearchResponse<IndexData> search_by_json = client.search( r -> r.query( q -> q.wrapper(m -> m.query(encodedJSON))), IndexData.class ); System.out.println(search_by_json.hits().hits().get(0).source().getFirstName()); /** * 进行全文搜索 */ SearchResponse<IndexData> search_res = client.search( r -> r.query( q -> q.match( match -> match.field("firstName") .query(f -> f.stringValue("云搜索")) .analyzer("ik_smart") ) ) , IndexData.class); System.out.println(search_res.hits().hits().get(0).source().getFirstName()); /** * 进行向量搜索 */ SearchResponse<IndexData> vector_res = client.search( r -> r.query( q -> q.knn( knn -> knn.field("nameVector") .k(10) .vector(new float[]{ 1.0f, 1.0f, 1.0f, 1.0f}) ) ) , IndexData.class); System.out.println(vector_res.hits().hits().get(0).source().getFirstName()); /** * 更新文档 */ IndexData updateData = new IndexData(); 
updateData.setFirstName("OpenSearch"); client.update(r -> r.id("1").doc(updateData).index(index),IndexData.class); //刷新索引保证文档立即可见 client.indices().refresh(r -> r.index(index)); //确定更新结果 GetResponse<IndexData> getRes = client.get(r -> r.id("1").index(index), IndexData.class); System.out.println(getRes.source().getFirstName()); /** * 删除文档 */ client.delete(b -> b.index(index).id("1")); /** * 删除索引 */ client.indices().delete( r -> r.index(index)); client.shutdown(); } } class IndexData { private String firstName; private Float[] nameVector; public IndexData() { } public IndexData(String firstName, Float[] nameVector) { this.firstName = firstName; this.nameVector = nameVector; } public void setFirstName(String firstName) { this.firstName = firstName; } public void setNameVector(Float[] nameVector) { this.nameVector = nameVector; } public String getFirstName() { return this.firstName; } public Float[] getNameVector() { return this.nameVector; } }