跳到主要内容

ES 写入报错及连接池 KeepAlive 优化方案

简述

系统在写入 Elasticsearch 时,偶尔会出现 java.io.IOException: Connection reset by peer 异常。数据写入频率较低,通常每隔几个小时触发一次。

java.io.IOException: Connection reset by peer  
at org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:828)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:248)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:251)
at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:497)

原因分析

  1. 客户端 KeepAlive 策略
    客户端使用的连接池(如 Druid)中,KeepAlive 默认值为 -1,表示连接永不过期,可重复复用。这种持续连接策略在低频请求场景中可能导致连接失效而未及时释放。

  2. 服务端 TCP KeepAlive 设置
    尽管客户端保持长连接,但 Linux 系统层的 TCP KeepAlive 默认会在一段时间内(如 600 秒)探测连接活性。若此期间无任何数据交互,服务端将主动断开连接。

当服务端断开 TCP 连接后,客户端仍可能误判该连接有效。如果此时有新的 ES 请求使用了这个已失效的连接,就会触发 Connection reset by peer 异常。

解决方案

方法一:设置合理的连接空闲时间

在构造 Elasticsearch 客户端连接时设置 KeepAlive 策略,例如设定最小空闲时间为 300 秒。客户端在空闲超时后主动释放连接,避免使用已被服务端关闭的“僵尸连接”。

final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

RestClientBuilder builder = RestClient.builder(new HttpHost(host, 9200, "http"));
builder.setHttpClientConfigCallback(httpAsyncClientBuilder ->
httpAsyncClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
.setKeepAliveStrategy((response, context) -> TimeUnit.SECONDS.toMillis(300))
);

RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);

方法二:异常处理与重试机制

由于该异常发生频率不高,也可以在请求 ES 时对 IOException 进行捕获处理,必要时重新初始化连接并重试 1~3 次,确保请求成功或明确失败。

RestHighLevelClient client = null;
try {
client = esConf.getClient();
} catch (IOException e) {
log.error("IOException", e);
client = esConf.getClient(); // 重试一次
}