1、 Java连接Elasticsearch 8.12.2
正文内容均以 Elasticsearch 8.12.2 版本为例(下文 Maven 依赖中的 Java 客户端版本为 8.13.4),小版本之间差异不大,可直接使用。后文以 es8 代指 Elasticsearch 8.12.2。
1.1 Maven依赖
<properties>
<!-- Elasticsearch search engine: versions of the client libraries and their supporting dependencies -->
<elasticsearch-java.version>8.13.4</elasticsearch-java.version>
<elasticsearch-rest.version>8.13.4</elasticsearch-rest.version>
<jackson-databind.version>2.17.1</jackson-databind.version>
<jakarta-json.version>2.1.3</jakarta-json.version>
<commons-pool2.version>2.12.0</commons-pool2.version>
</properties>
<dependencies>
<!--++++++++++++++++++++++++++++++++++++++++++++++++++++-->
<!-- Elasticsearch search engine -->
<!--++++++++++++++++++++++++++++++++++++++++++++++++++++-->
<!-- Elasticsearch Java API client (provides ElasticsearchClient) -->
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>${elasticsearch-java.version}</version>
</dependency>
<!-- Low-level REST client (provides RestClient) -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-client</artifactId>
<version>${elasticsearch-rest.version}</version>
</dependency>
<!-- Jackson data binding; presumably required by the JacksonJsonpMapper used in the examples below -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson-databind.version}</version>
</dependency>
<!-- Jakarta JSON Processing API -->
<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>${jakarta-json.version}</version>
</dependency>
<!-- Apache Commons Pool 2, the object-pool library the connection pool in section 1.3 is built on -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>${commons-pool2.version}</version>
</dependency>
</dependencies>
1.2 普通连接
Elastic官网:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.13/connecting.html
// Connection settings: the Elasticsearch endpoint and the API key used to authenticate
String serverUrl = "https://localhost:9200";
String apiKey = "VnVhQ2ZHY0JDZGJrU...";

// Every request carries the API key in its Authorization header
Header[] defaultHeaders = {
    new BasicHeader("Authorization", "ApiKey " + apiKey)
};

// Low-level REST client that talks to the server
RestClient restClient = RestClient
    .builder(HttpHost.create(serverUrl))
    .setDefaultHeaders(defaultHeaders)
    .build();

// Transport layer that maps JSON with Jackson
JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();
ElasticsearchTransport transport = new RestClientTransport(restClient, jsonMapper);

// High-level API client built on top of the transport
ElasticsearchClient esClient = new ElasticsearchClient(transport);
1.3 连接池
基于 commons-pool2,自定义了一套 ES8 客户端连接池。
(1) ESClientPoolFactory.java
package com.wanma.framework_api.sdk.elasticsearch;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
 * Factory that creates and destroys the {@link ElasticsearchClient} instances managed by a
 * commons-pool2 object pool.
 *
 * <p>Every client produced by {@link #makeObject()} is built on its own low-level
 * {@link RestClient}; {@link #destroyObject(PooledObject)} closes the client's transport so
 * that this connection is released when the pool discards the object.
 *
 * @author dddsoft
 * @date 2024/05/21
 */
@Slf4j
public class ESClientPoolFactory implements PooledObjectFactory<ElasticsearchClient> {

    /** Placeholder host list; replace with real comma-separated {@code host:port} entries. */
    private static final String DEFAULT_SERVER_HOSTS = "ip:port,ip:port";

    /** URI scheme used for every configured host. */
    private static final String SCHEME = "http";

    /** Comma-separated {@code host:port} entries the created clients connect to. */
    private final String esServerHost;

    /**
     * Creates a factory that uses the built-in placeholder host list.
     */
    public ESClientPoolFactory() {
        this(DEFAULT_SERVER_HOSTS);
    }

    /**
     * Creates a factory for the given Elasticsearch nodes.
     *
     * @param esServerHost comma-separated {@code host:port} entries, e.g. {@code "10.0.0.1:9200,10.0.0.2:9200"}
     * @throws IllegalArgumentException if {@code esServerHost} is null or blank
     */
    public ESClientPoolFactory(String esServerHost) {
        if (esServerHost == null || esServerHost.trim().isEmpty()) {
            throw new IllegalArgumentException("esServerHost must not be null or blank");
        }
        this.esServerHost = esServerHost;
    }

    /**
     * Builds a new {@link ElasticsearchClient} connected to all configured hosts and wraps it
     * for the pool.
     *
     * @return the pooled wrapper around the newly created client
     * @throws IllegalArgumentException if a host entry is not of the form {@code host:port}
     *         (a non-numeric port surfaces as {@link NumberFormatException})
     */
    @Override
    public PooledObject<ElasticsearchClient> makeObject() throws Exception {
        List<HttpHost> httpHosts = new ArrayList<>();
        for (String hostEntry : esServerHost.split(",")) {
            httpHosts.add(parseHost(hostEntry));
        }

        // Low-level client that performs the HTTP calls
        RestClient restClient = RestClient.builder(httpHosts.toArray(new HttpHost[0])).build();

        // Transport layer that maps JSON with Jackson
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper()
        );

        return new DefaultPooledObject<>(new ElasticsearchClient(transport));
    }

    /**
     * Releases a client that the pool no longer needs by closing its transport.
     *
     * @param pooledObject wrapper holding the client to destroy
     * @throws Exception if closing the transport fails
     */
    @Override
    public void destroyObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        ElasticsearchClient elasticsearchClient = pooledObject.getObject();
        if (elasticsearchClient != null) {
            // Without this, each destroyed pool entry would leave its connection open.
            elasticsearchClient._transport().close();
        }
    }

    /**
     * Reports every client as valid; no health check is performed.
     *
     * @param pooledObject wrapper holding the client to validate
     * @return always {@code true}
     */
    @Override
    public boolean validateObject(PooledObject<ElasticsearchClient> pooledObject) {
        return true;
    }

    /**
     * Called when a client is borrowed from the pool; nothing needs to be prepared.
     */
    @Override
    public void activateObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        // Intentionally empty.
    }

    /**
     * Called when a client is returned to the pool; nothing needs to be reset.
     */
    @Override
    public void passivateObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        // Intentionally empty.
    }

    /** Converts one {@code host:port} entry into an {@link HttpHost} using the http scheme. */
    private static HttpHost parseHost(String hostEntry) {
        String trimmed = hostEntry.trim();
        int colon = trimmed.indexOf(':');
        if (colon <= 0 || colon == trimmed.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + hostEntry);
        }
        String hostname = trimmed.substring(0, colon);
        int port = Integer.parseInt(trimmed.substring(colon + 1));
        return new HttpHost(hostname, port, SCHEME);
    }
}
(2) ESClientPool.java
package com.wanma.framework_api.sdk.elasticsearch;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import java.time.Duration;
/**
 * Static access point to a shared pool of {@link ElasticsearchClient} instances backed by
 * commons-pool2.
 *
 * <p>Borrow a client with {@link #getClient()} and hand it back with
 * {@link #returnClient(ElasticsearchClient)} once the work is done, ideally in a
 * {@code finally} block.
 *
 * @author Albert
 * @date 2024/05/21
 */
@Slf4j
public class ESClientPool {

    /** Pool settings, see {@link #buildPoolConfig()}. */
    private static final GenericObjectPoolConfig<ElasticsearchClient> poolConfig = buildPoolConfig();

    /** Factory that creates and destroys the pooled clients. */
    private static final ESClientPoolFactory esClientPoolFactory = new ESClientPoolFactory();

    /** The pool itself, built from the factory and the settings above. */
    private static final GenericObjectPool<ElasticsearchClient> clientPool =
        new GenericObjectPool<>(esClientPoolFactory, poolConfig);

    /** Builds the pool settings: at most 20 clients in total, idle cap of 200, 3 s minimum idle time. */
    private static GenericObjectPoolConfig<ElasticsearchClient> buildPoolConfig() {
        GenericObjectPoolConfig<ElasticsearchClient> config = new GenericObjectPoolConfig<>();
        // NOTE(review): maxIdle (200) is larger than maxTotal (20), so the total cap is
        // presumably the effective limit - confirm the intended values.
        config.setMaxIdle(200);
        config.setMaxTotal(20);
        // setMinEvictableIdleTimeMillis(long) is deprecated; the Duration-based setter replaces it.
        // NOTE(review): no eviction-run interval is configured here; verify against the
        // commons-pool2 documentation whether this threshold has any effect without one.
        config.setMinEvictableIdleDuration(Duration.ofMillis(1000L * 3L));
        return config;
    }

    /**
     * Borrows a client from the pool.
     *
     * @return a client that must later be given back via {@link #returnClient(ElasticsearchClient)}
     * @throws Exception if the pool cannot provide or create a client
     */
    public static ElasticsearchClient getClient() throws Exception {
        return clientPool.borrowObject();
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * @param client the client obtained from {@link #getClient()}; {@code null} is ignored so
     *        the call is safe in a {@code finally} block when borrowing failed
     * @throws Exception if the pool rejects the returned client
     */
    public static void returnClient(ElasticsearchClient client) throws Exception {
        if (client == null) {
            return;
        }
        clientPool.returnObject(client);
    }
}
1.4 使用
// 获取连接 ElasticsearchClient client = ESClientPool.getClient(); // 创建索引 Boolean acknowledged = client.indices().create(c -> c.index(indexName)).acknowledged(); // 归还连接对象 ESClientPool.returnClient(client);
参考文章: