PHP笔记网

革命尚未成功,同志仍须努力下载JDK17

作者:Albert.Wen  添加时间:2024-05-21 09:32:49  修改时间:2025-01-09 00:29:29  分类:04.大数据/Elasticsearch  编辑

1、 Java连接Elasticsearch 8.12.2

本文示例均以 Elasticsearch 8.12.2 服务端为例(下文简称 ES8);8.x 各小版本之间差异不大,示例可直接使用。注意:下方 Maven 依赖中的 Java 客户端版本为 8.13.4。

1.1 Maven依赖

<properties>
    <!-- Elasticsearch search engine: versions of the client libraries declared in <dependencies> -->
    <elasticsearch-java.version>8.13.4</elasticsearch-java.version>
    <elasticsearch-rest.version>8.13.4</elasticsearch-rest.version>
    <jackson-databind.version>2.17.1</jackson-databind.version>
    <jakarta-json.version>2.1.3</jakarta-json.version>
    <commons-pool2.version>2.12.0</commons-pool2.version>
</properties>
 
<dependencies>
    <!--++++++++++++++++++++++++++++++++++++++++++++++++++++-->
    <!-- Elasticsearch search engine -->
    <!--++++++++++++++++++++++++++++++++++++++++++++++++++++-->
    <!-- Elasticsearch Java API client (provides ElasticsearchClient) -->
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>${elasticsearch-java.version}</version>
    </dependency>
     
    <!-- Low-level REST client (provides RestClient) -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-client</artifactId>
        <version>${elasticsearch-rest.version}</version>
    </dependency>
 
    <!-- Jackson; presumably required by the JacksonJsonpMapper used in the examples - confirm -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson-databind.version}</version>
    </dependency>
 
    <!-- Jakarta JSON API; see the referenced article on NoClassDefFoundError: jakarta/json/JsonException -->
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>${jakarta-json.version}</version>
    </dependency>
 
    <!-- Apache Commons Pool 2, the basis of the client pool in section 1.3 -->
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
        <version>${commons-pool2.version}</version>
    </dependency>
</dependencies>

1.2 普通连接

Elastic官网:https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.13/connecting.html

// Connection settings
String serverUrl = "https://localhost:9200";    // Elasticsearch service URL
String apiKey = "VnVhQ2ZHY0JDZGJrU...";         // Elasticsearch API key

// Default headers sent with every request: the API key goes in the Authorization header
Header[] defaultHeaders = new Header[]{
    new BasicHeader("Authorization", "ApiKey " + apiKey)
};

// Build the low-level REST client
RestClient restClient = RestClient
    .builder(HttpHost.create(serverUrl))
    .setDefaultHeaders(defaultHeaders)
    .build();

// Build the transport on top of the REST client, using a Jackson-based JSON mapper
JacksonJsonpMapper jsonMapper = new JacksonJsonpMapper();
ElasticsearchTransport transport = new RestClientTransport(restClient, jsonMapper);

// Finally, build the typed API client
ElasticsearchClient esClient = new ElasticsearchClient(transport);

1.3 连接池

基于 Apache commons-pool2,自定义了一套 ES8 客户端连接池。

(1) ESClientPoolFactory.java

package com.wanma.framework_api.sdk.elasticsearch;
 
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
 
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
 
/**
 * commons-pool2 factory that creates and destroys pooled {@link ElasticsearchClient} instances.
 *
 * <p>Every client produced by {@link #makeObject()} is built on its own low-level
 * {@link RestClient}. {@link #destroyObject(PooledObject)} closes the client's transport so the
 * underlying HTTP resources are released when the pool discards a client.
 *
 * @author dddsoft
 * @date 2024/05/21
 */
@Slf4j
public class ESClientPoolFactory implements PooledObjectFactory<ElasticsearchClient> {
    /** Placeholder used by the no-arg constructor; replace with real {@code host:port} pairs. */
    private static final String DEFAULT_ES_SERVER_HOSTS = "ip:port,ip:port";

    /** URI scheme used for every configured host. */
    private static final String SCHEME = "http";

    /** Comma-separated list of {@code host:port} entries. */
    private final String esServerHost;

    /**
     * Creates a factory that uses the built-in placeholder host list.
     */
    public ESClientPoolFactory() {
        this(DEFAULT_ES_SERVER_HOSTS);
    }

    /**
     * Creates a factory for the given Elasticsearch nodes.
     *
     * @param esServerHost comma-separated {@code host:port} entries, e.g. {@code "10.0.0.1:9200,10.0.0.2:9200"}
     * @throws IllegalArgumentException if {@code esServerHost} is null or blank
     */
    public ESClientPoolFactory(String esServerHost) {
        if (esServerHost == null || esServerHost.trim().isEmpty()) {
            throw new IllegalArgumentException("esServerHost must not be null or blank");
        }
        this.esServerHost = esServerHost;
    }

    /**
     * Creates a new {@link ElasticsearchClient} wrapped for the pool.
     *
     * @return a pooled wrapper around a freshly created client
     * @throws IllegalArgumentException if a host entry is not of the form {@code host:port}
     *         (a non-numeric port surfaces as {@link NumberFormatException})
     */
    @Override
    public PooledObject<ElasticsearchClient> makeObject() throws Exception {
        // Low-level client talking to all configured nodes
        RestClient restClient = RestClient.builder(parseHosts(esServerHost)).build();

        // Transport layer using the Jackson JSON mapper
        ElasticsearchTransport transport = new RestClientTransport(
            restClient, new JacksonJsonpMapper()
        );

        return new DefaultPooledObject<>(new ElasticsearchClient(transport));
    }

    /**
     * Releases the resources held by a client that the pool is discarding.
     *
     * <p>Closing the transport is what frees the underlying REST client; without it each
     * destroyed pool entry would leave its HTTP resources open.
     */
    @Override
    public void destroyObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        ElasticsearchClient elasticsearchClient = pooledObject.getObject();
        if (elasticsearchClient != null) {
            elasticsearchClient._transport().close();
        }
    }

    /**
     * Always reports the client as valid; no health check is performed.
     */
    @Override
    public boolean validateObject(PooledObject<ElasticsearchClient> pooledObject) {
        return true;
    }

    /**
     * No-op: clients need no preparation when borrowed.
     */
    @Override
    public void activateObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        // intentionally empty
    }

    /**
     * No-op: clients need no clean-up when returned.
     */
    @Override
    public void passivateObject(PooledObject<ElasticsearchClient> pooledObject) throws Exception {
        // intentionally empty
    }

    /** Parses a comma-separated {@code host:port} list into {@link HttpHost} objects. */
    private static HttpHost[] parseHosts(String hosts) {
        List<HttpHost> httpHosts = new ArrayList<>();
        for (String entry : hosts.split(",")) {
            String host = entry.trim();
            if (host.isEmpty()) {
                continue; // tolerate stray commas such as "a:9200,,b:9200"
            }
            int separator = host.indexOf(':');
            if (separator <= 0 || separator == host.length() - 1) {
                throw new IllegalArgumentException(
                    "Invalid Elasticsearch host entry (expected host:port): " + host);
            }
            httpHosts.add(
                new HttpHost(
                    host.substring(0, separator),
                    Integer.parseInt(host.substring(separator + 1)),
                    SCHEME
                )
            );
        }
        if (httpHosts.isEmpty()) {
            throw new IllegalArgumentException("No Elasticsearch hosts configured: " + hosts);
        }
        return httpHosts.toArray(new HttpHost[0]);
    }
}

(2) ESClientPool.java

package com.wanma.framework_api.sdk.elasticsearch;
 
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 
import java.time.Duration;
 
/**
 * Static, process-wide pool of {@link ElasticsearchClient} instances backed by commons-pool2.
 *
 * <p>Borrow a client with {@link #getClient()} and always hand it back with
 * {@link #returnClient(ElasticsearchClient)}, ideally in a {@code finally} block, otherwise the
 * pool runs out of clients once {@code maxTotal} of them are checked out.
 *
 * @author Albert
 * @date 2024/05/21
 */
@Slf4j
public class ESClientPool {
    /** Maximum number of clients the pool will create (overrides the library default). */
    private static final int MAX_TOTAL = 20;

    /** Idle time after which a client becomes eligible for eviction. */
    private static final Duration MIN_EVICTABLE_IDLE = Duration.ofMillis(1000L * 3L);

    // Factory for the pooled objects
    private static final ESClientPoolFactory esClientPoolFactory = new ESClientPoolFactory();

    // The pool itself, built from the factory and the configuration below
    private static final GenericObjectPool<ElasticsearchClient> clientPool
        = new GenericObjectPool<>(esClientPoolFactory, buildPoolConfig());

    /** Builds the pool configuration used by {@link #clientPool}. */
    private static GenericObjectPoolConfig<ElasticsearchClient> buildPoolConfig() {
        GenericObjectPoolConfig<ElasticsearchClient> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(MAX_TOTAL);
        // The idle count can never exceed the total, so cap maxIdle at maxTotal.
        poolConfig.setMaxIdle(MAX_TOTAL);

        // setMinEvictableIdleTimeMillis() is deprecated; the Duration-based setter replaces it.
        // NOTE(review): per the commons-pool2 docs this threshold is only applied by the idle
        // evictor, which does not run unless a time between eviction runs is configured.
        // None is set here, so idle clients are presumably never evicted; confirm intent.
        poolConfig.setMinEvictableIdleDuration(MIN_EVICTABLE_IDLE);
        return poolConfig;
    }

    /**
     * Borrows a client from the pool.
     *
     * @return a client that must later be passed to {@link #returnClient(ElasticsearchClient)}
     * @throws Exception if the pool cannot supply or create a client
     */
    public static ElasticsearchClient getClient() throws Exception {
        return clientPool.borrowObject();
    }

    /**
     * Returns a previously borrowed client to the pool.
     *
     * @param client the client obtained from {@link #getClient()}; {@code null} is ignored so the
     *               method can be called unconditionally from a {@code finally} block
     * @throws Exception if the pool rejects the object
     */
    public static void returnClient(ElasticsearchClient client) throws Exception {
        if (client == null) {
            return;
        }
        clientPool.returnObject(client);
    }
}

1.4 使用

// Borrow a client from the pool
ElasticsearchClient client = ESClientPool.getClient();

Boolean acknowledged;
try {
    // Create the index
    acknowledged = client.indices().create(c -> c.index(indexName)).acknowledged();
} finally {
    // Always give the client back, even when the request throws; otherwise the pool
    // eventually has no clients left to lend
    ESClientPool.returnClient(client);
}

 

 

参考文章:

  1. 整合Elasticsearch时,报错:java.lang.NoClassDefFoundError: jakarta/json/JsonException