1、 Java连接Elasticsearch 8.12.2
正文内容均以 Elasticsearch 8.12.2 服务端版本为例,8.x 各小版本之间差距不大,可直接参考使用。注意:下文 Maven 依赖中的 Java 客户端版本为 8.13.4,请根据实际使用的服务端版本选择合适的客户端版本。后续以 es8 代替 Elasticsearch 8.12.2。
1.1 Maven依赖
<properties>
    <!-- Elasticsearch search engine: dependency versions -->
    <elasticsearch-java.version>8.13.4</elasticsearch-java.version>
    <elasticsearch-rest.version>8.13.4</elasticsearch-rest.version>
    <jackson-databind.version>2.17.1</jackson-databind.version>
    <jakarta-json.version>2.1.3</jakarta-json.version>
    <commons-pool2.version>2.12.0</commons-pool2.version>
</properties>

<dependencies>
    <!-- ==================================================== -->
    <!-- Elasticsearch search engine                          -->
    <!-- ==================================================== -->

    <!-- Elasticsearch Java API client -->
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>${elasticsearch-java.version}</version>
    </dependency>

    <!-- Low-level REST client used by the API client's transport -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch-rest.version}</version>
    </dependency>

    <!-- Jackson, used by JacksonJsonpMapper for JSON (de)serialization -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson-databind.version}</version>
    </dependency>

    <!-- Jakarta JSON Processing API -->
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>${jakarta-json.version}</version>
    </dependency>

    <!-- Apache Commons Pool 2, used for the client object pool -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>${commons-pool2.version}</version>
    </dependency>
</dependencies>
1.2 普通连接
Elastic官网:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.13/connecting.html
// URL and API key String serverUrl = "https://localhost:9200"; // Elasticsearch 服务URL String apiKey = "VnVhQ2ZHY0JDZGJrU..."; // Elasticsearch API密钥 // Create the low-level client RestClient restClient = RestClient .builder(HttpHost.create(serverUrl)) .setDefaultHeaders(new Header[]{ new BasicHeader("Authorization", "ApiKey " + apiKey) }) .build(); // Create the transport with a Jackson mapper ElasticsearchTransport transport = new RestClientTransport( restClient, new JacksonJsonpMapper()); // And create the API client (创建API客户端) ElasticsearchClient esClient = new ElasticsearchClient(transport);
1.3 连接池
基于 Apache commons-pool2,自定义了一套 ES8 客户端(ElasticsearchClient)对象池。
(1) ESClientPoolFactory.java
package com.wanma.framework_api.sdk.elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * commons-pool2 factory that creates and destroys pooled {@link ElasticsearchClient} instances.
 *
 * <p>Each pooled client owns its own {@link RestClient} and transport; they are closed when
 * the pool destroys the client.
 *
 * @author dddsoft
 * @date 2024/05/21
 */
@Slf4j
public class ESClientPoolFactory implements PooledObjectFactory<ElasticsearchClient> {

    /**
     * Comma-separated {@code host:port} list of Elasticsearch nodes.
     * This is a placeholder and must be replaced with real addresses: the literal
     * {@code "port"} is not a number, so client creation fails until it is replaced.
     */
    private static final String ES_SERVER_HOSTS = "ip:port,ip:port";

    /** Scheme used for every configured node. */
    private static final String SCHEME = "http";

    /**
     * Creates a new {@link ElasticsearchClient} connected to all hosts in
     * {@link #ES_SERVER_HOSTS} and wraps it for the pool.
     *
     * @return the new client wrapped in a {@link DefaultPooledObject}
     * @throws IllegalArgumentException if a host entry is not of the form {@code host:port}
     *         (a non-numeric port surfaces as {@link NumberFormatException})
     */
    @Override
    public PooledObject<ElasticsearchClient> makeObject() throws Exception {
        HttpHost[] httpHosts = parseHosts(ES_SERVER_HOSTS);

        // Low-level REST client
        RestClient restClient = RestClient.builder(httpHosts).build();

        // Transport layer using the Jackson JSON mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient,
                new JacksonJsonpMapper()
        );

        ElasticsearchClient client = new ElasticsearchClient(transport);
        // log.info("创建了对象:" + client);
        return new DefaultPooledObject<>(client);
    }

    /**
     * Destroys a pooled client by closing its transport, so the connections held by the
     * client created in {@link #makeObject()} are not leaked.
     *
     * @param pooledObject wrapper of the client being removed from the pool
     * @throws Exception if closing the transport fails
     */
    @Override
    public void destroyObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        ElasticsearchClient elasticsearchClient = pooledObject.getObject();
        if (elasticsearchClient != null) {
            // The original implementation dropped the client without closing anything.
            elasticsearchClient._transport().close();
        }
        // log.info("销毁了对象" + elasticsearchClient);
    }

    /**
     * Reports every pooled client as valid; no health check is performed.
     *
     * @param pooledObject wrapper of the client to validate
     * @return always {@code true}
     */
    @Override
    public boolean validateObject(PooledObject<ElasticsearchClient> pooledObject) {
        return true;
    }

    /** No-op: a client needs no preparation before being handed out. */
    @Override
    public void activateObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        // log.info("对象被激活了" + pooledObject.getObject());
    }

    /** No-op: a client needs no cleanup when it is returned to the pool. */
    @Override
    public void passivateObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        // log.info("对象被钝化了" + pooledObject.getObject());
    }

    /** Parses a comma-separated {@code host:port} list into {@link HttpHost}s. */
    private static HttpHost[] parseHosts(String hostList) {
        List<HttpHost> httpHosts = new ArrayList<>();
        for (String entry : Arrays.asList(hostList.split(","))) {
            String hostPort = entry.trim();
            int separator = hostPort.indexOf(':');
            if (separator <= 0 || separator == hostPort.length() - 1) {
                throw new IllegalArgumentException(
                        "Invalid Elasticsearch host entry (expected host:port): " + hostPort);
            }
            String hostName = hostPort.substring(0, separator);
            int port = Integer.parseInt(hostPort.substring(separator + 1));
            httpHosts.add(new HttpHost(hostName, port, SCHEME));
        }
        return httpHosts.toArray(new HttpHost[0]);
    }
}
(2) ESClientPool.java
package com.wanma.framework_api.sdk.elasticsearch;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import java.time.Duration;

/**
 * Static pool of {@link ElasticsearchClient} instances backed by commons-pool2.
 *
 * <p>Borrow a client with {@link #getClient()} and always hand it back with
 * {@link #returnClient(ElasticsearchClient)}, preferably in a {@code finally} block.
 *
 * @author Albert
 * @date 2024/05/21
 */
@Slf4j
public class ESClientPool {

    /** Pool configuration; values not set below keep the commons-pool2 defaults. */
    private static final GenericObjectPoolConfig<ElasticsearchClient> poolConfig =
            new GenericObjectPoolConfig<>();

    static {
        // At most 20 clients exist at any time.
        poolConfig.setMaxTotal(20);
        // Idle clients can never outnumber maxTotal, so the previous value of 200 was
        // misleading; keep the two limits consistent.
        poolConfig.setMaxIdle(20);
        // setMinEvictableIdleTimeMillis() is deprecated; use the Duration-based setter.
        // NOTE(review): this setting only matters when an idle-object evictor is running;
        // no eviction interval is configured in this class - confirm whether one is intended.
        poolConfig.setMinEvictableIdleDuration(Duration.ofMillis(1000L * 3L));
    }

    /** Factory that creates and destroys the pooled clients. */
    private static final ESClientPoolFactory esClientPoolFactory = new ESClientPoolFactory();

    /** The pool itself, built from the factory and configuration above. */
    private static final GenericObjectPool<ElasticsearchClient> clientPool =
            new GenericObjectPool<>(esClientPoolFactory, poolConfig);

    /**
     * Borrows a client from the pool.
     *
     * @return a pooled client; the caller must return it via {@link #returnClient}
     * @throws Exception if the pool cannot provide or create a client
     */
    public static ElasticsearchClient getClient() throws Exception {
        ElasticsearchClient client = clientPool.borrowObject();
        // log.info("从池中取一个对象" + client);
        return client;
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * @param client the client to return; {@code null} is ignored so this method is safe to
     *               call from a {@code finally} block even when borrowing failed
     * @throws Exception if the pool rejects the object
     */
    public static void returnClient(ElasticsearchClient client) throws Exception {
        if (client == null) {
            return;
        }
        // log.info("使用完毕后,归还对象" + client);
        clientPool.returnObject(client);
    }
}
1.4 使用
// 获取连接 ElasticsearchClient client = ESClientPool.getClient(); // 创建索引 Boolean acknowledged = client.indices().create(c -> c.index(indexName)).acknowledged(); // 归还连接对象 ESClientPool.returnClient(client);
参考文章: