I am querying an Elasticsearch index that holds more than 100K documents from Java code, using RestClient.
While fetching the documents I get: Exception in thread "main" java.io.IOException: listener timeout after waiting for [30000] ms
The full output is below:
D:\Karthikeyan\ElasticSearch\ElasticSearch_Tesing\target>java -jar ElasticSearchUtility-1.0.0-SNAPSHOT-jar-with-dependencies.jar
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
Jul 13, 2018 2:46:37 PM com.es.utility.DocumentIndex main
INFO: Started Indexing the Document.....
Request --->SearchRequest{searchType=QUERY_THEN_FETCH, indices=[documents], indicesOptions=IndicesOptions[id=38, ignore_unavailable=false, allow_no_indices=true, expand_wildcards_open=true, expand_wildcards_closed=false, allow_aliases_to_multiple_indices=true, forbid_closed_indices=true, ignore_aliases=false], types=[doc], routing='null', preference='null', requestCache=null, scroll=null, maxConcurrentShardRequests=0, batchedReduceSize=512, preFilterShardSize=128, source={"size":120000,"query":{"match_all":{"boost":1.0}}}}
Jul 13, 2018 2:47:20 PM com.es.utility.DocumentIndex main
Else block
Exception in thread "main" java.io.IOException: listener timeout after waiting for [30000] ms
    at org.elasticsearch.client.RestClient$SyncResponseListener.get(RestClient.java:663)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:222)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:194)
    at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:443)
    at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:429)
    at org.elasticsearch.client.RestHighLevelClient.index(RestHighLevelClient.java:312)
    at com.es.utility.DocumentIndex.main(DocumentIndex.java:273)
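The top frame shows where the 30000 ms comes from: RestClient$SyncResponseListener.get blocks the calling thread and gives up after a fixed wait, even if the server is still working. The stdlib-only sketch below mimics that pattern with a CountDownLatch, roughly as the 6.x client does; it is not the client's source, the names TimeoutSketch, BlockingListener and call are hypothetical, and the timings are scaled down so it runs quickly.

```java
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimeoutSketch {

    // Hypothetical stand-in for a synchronous listener: the caller waits on a latch
    // that the I/O thread releases when the response arrives.
    static final class BlockingListener {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final long timeoutMillis;
        private volatile String response;

        BlockingListener(long timeoutMillis) {
            this.timeoutMillis = timeoutMillis;
        }

        // Called by the (simulated) I/O thread when the response is complete.
        void onSuccess(String response) {
            this.response = response;
            latch.countDown();
        }

        // Called by the request thread; fails with the same message format as the trace.
        String get() throws IOException {
            try {
                if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    throw new IOException("listener timeout after waiting for [" + timeoutMillis + "] ms");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted while waiting", e);
            }
            return response;
        }
    }

    // Simulates a server that needs serverMillis to answer a request.
    static String call(long timeoutMillis, long serverMillis) throws IOException {
        BlockingListener listener = new BlockingListener(timeoutMillis);
        Thread io = new Thread(() -> {
            try {
                Thread.sleep(serverMillis);
                listener.onSuccess("ok");
            } catch (InterruptedException ignored) {
                // the simulated server was interrupted; nothing to deliver
            }
        });
        io.setDaemon(true); // do not keep the JVM alive for a late response
        io.start();
        return listener.get();
    }

    public static void main(String[] args) throws IOException {
        // Slow server, short limit: fails the same way as the stack trace above.
        try {
            call(100, 1000);
        } catch (IOException e) {
            System.out.println(e.getMessage()); // listener timeout after waiting for [100] ms
        }
        // Same kind of server, longer limit: the response arrives in time.
        System.out.println(call(2000, 100)); // ok
    }
}
```

The point of the sketch is that the limit belongs to the waiting client, not to Elasticsearch: a request that simply takes longer than the configured wait fails on the client side.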
Here is my Java code:
package com.es.utility;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.ParseException;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.HttpAsyncResponseConsumerFactory;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.json.JSONArray;
import org.json.JSONObject;

// Document is this application's own model class (not shown), assumed to be in this package.
public class DocumentIndex {
    private static final String DOCUMENT = "documents";
    private static final String ATTACHMENT = "document_attachment";
    private static final String TYPE = "doc";
    private static final Logger logger = Logger.getLogger(DocumentIndex.class.getName());
    // Response buffer limit of 500 MB instead of the default 100 MB.
    private static final int BUFFER_SIZE = 500 * 1024 * 1024;

    public static void main(String[] args) throws IOException {
        // High-level client for indexing, low-level client for the search (custom buffer limit).
        // Both are closed automatically when main finishes.
        try (RestHighLevelClient restHighLevelClient = new RestHighLevelClient(RestClient.builder(
                     new HttpHost("localhost", 9200, "http"),
                     new HttpHost("localhost", 9201, "http")));
             RestClient restClient = RestClient.builder(
                     new HttpHost("localhost", 9200, "http"),
                     new HttpHost("localhost", 9201, "http")).build()) {

            Document doc = new Document();
            logger.info("Started Indexing the Document.....");

            // match_all query that asks for up to 120000 hits in a single response.
            SearchRequest contentSearchRequest = new SearchRequest(DOCUMENT);
            SearchSourceBuilder contentSearchSourceBuilder = new SearchSourceBuilder();
            contentSearchRequest.types(TYPE);
            contentSearchSourceBuilder.query(QueryBuilders.matchAllQuery());
            contentSearchSourceBuilder.size(120000);
            contentSearchRequest.source(contentSearchSourceBuilder);
            System.out.println("Request --->" + contentSearchRequest);

            Map<String, String> params = Collections.emptyMap();
            HttpEntity entity = new NStringEntity(contentSearchSourceBuilder.toString(), ContentType.APPLICATION_JSON);
            HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory consumerFactory =
                    new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(BUFFER_SIZE);
            String responseBody;
            try {
                Response contentSearchResponse =
                        restClient.performRequest("GET", "/documents/doc/_search", params, entity, consumerFactory);
                responseBody = EntityUtils.toString(contentSearchResponse.getEntity());
            } catch (IOException | ParseException e) {
                e.printStackTrace();
                return; // nothing to process without a search response
            }

            // Start each run with empty report files.
            File all_files_path = new File("d:\\All_Files_Path.txt");
            File available_files = new File("d:\\Available_Files.txt");
            File missing_files = new File("d:\\Missing_Files.txt");
            for (File f : new File[] {all_files_path, available_files, missing_files}) {
                Files.deleteIfExists(f.toPath());
                f.createNewFile();
            }
            int totalFilePath = 1;
            int totalAvailableFile = 1;
            int missingFilecount = 1;

            JSONObject json = new JSONObject(responseBody);
            JSONArray hitsArray = json.getJSONObject("hits").getJSONArray("hits");

            // Index errors are appended to one file, opened once for the whole loop.
            try (PrintStream printStream = new PrintStream(new FileOutputStream("d:\\exception.txt", true))) {
                for (int i = 0; i < hitsArray.length(); i++) {
                    JSONObject sourceJObj = hitsArray.getJSONObject(i).getJSONObject("_source");
                    // Build the file path from this hit's own "path" and "filename" fields.
                    String filepath = sourceJObj.optString("path").concat(sourceJObj.optString("filename"));
                    doc.setId(sourceJObj.getInt("id"));
                    doc.setApp_language(String.valueOf(sourceJObj.opt("app_language")));
                    logger.info("ID---> " + doc.getId() + "File Path --->" + filepath);
                    try (PrintWriter out = new PrintWriter(new FileOutputStream(all_files_path, true))) {
                        out.println("FilePath Count ---" + totalFilePath + ":::::::ID---> " + doc.getId() + "File Path --->" + filepath);
                    }

                    String encodedfile = null;
                    File file = new File(filepath);
                    if (file.exists() && !file.isDirectory()) {
                        try (PrintWriter out = new PrintWriter(new FileOutputStream(available_files, true))) {
                            out.println("Available File Count --->" + totalAvailableFile + ":::::::ID---> " + doc.getId() + "File Path --->" + filepath);
                        }
                        totalAvailableFile++;
                        // Read the whole file and Base64-encode it for the attachment pipeline.
                        encodedfile = Base64.getEncoder().encodeToString(Files.readAllBytes(file.toPath()));
                    } else {
                        System.out.println("Else block");
                        try (PrintWriter out = new PrintWriter(new FileOutputStream(missing_files, true))) {
                            out.println("Missing File Count --->" + missingFilecount + ":::::::ID---> " + doc.getId() + "File Path --->" + filepath);
                        }
                        missingFilecount++;
                    }

                    Map<String, Object> jsonMap = new HashMap<>();
                    jsonMap.put("id", doc.getId());
                    jsonMap.put("app_language", doc.getApp_language());
                    jsonMap.put("fileContent", encodedfile);
                    IndexRequest request = new IndexRequest(ATTACHMENT, TYPE, String.valueOf(doc.getId()))
                            .source(jsonMap)
                            .setPipeline(ATTACHMENT);
                    try {
                        restHighLevelClient.index(request);
                    } catch (ElasticsearchException e) {
                        if (e.status() == RestStatus.CONFLICT) {
                            // version conflict: a document with this id already exists
                        }
                        e.printStackTrace(printStream);
                    }
                    totalFilePath++;
                }
            }
            logger.info("Indexing done.....");
        }
    }
}
I had also tried fetching the same 100K documents using RestHighLevelClient with the Scroll API. With that approach I got the error below, which is why I switched from RestHighLevelClient to RestClient:
Suppressed: org.apache.http.ContentTooLongException: entity content is too long [223328895] for the configured buffer limit [104857600]
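The two numbers in this message can be checked directly: 104857600 bytes is exactly 100 × 1024 × 1024 (100 MiB), which matches the default limit of the heap-buffered response consumer in the 6.x low-level client, while the response was 223328895 bytes (about 213 MiB). This small stdlib sketch compares the reported length with that default and with the 500 MiB BUFFER_SIZE used in the RestClient code. The names BufferLimitCheck and fits are hypothetical, and the "length greater than limit" comparison is an assumption about how the consumer applies its limit.

```java
public class BufferLimitCheck {

    // Default limit of the 6.x low-level client's heap-buffered consumer: 100 MiB.
    static final int DEFAULT_LIMIT = 100 * 1024 * 1024;
    // The 500 MiB BUFFER_SIZE passed to HeapBufferedResponseConsumerFactory in the question.
    static final int RAISED_LIMIT = 500 * 1024 * 1024;

    // Hypothetical helper: true if a response of this many bytes can be buffered under the limit.
    static boolean fits(long contentLength, int limit) {
        return contentLength <= limit;
    }

    public static void main(String[] args) {
        long observed = 223_328_895L; // length reported by ContentTooLongException
        System.out.println(DEFAULT_LIMIT);                 // 104857600
        System.out.println(fits(observed, DEFAULT_LIMIT)); // false
        System.out.println(fits(observed, RAISED_LIMIT));  // true
    }
}
```

Raising the limit makes this response fit, but the whole body is still held in heap in one piece, which is why fetching in smaller pages is usually the safer option.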
Here is the full error from the RestHighLevelClient run:
D:\Karthikeyan\ElasticSearch\ElasticSearch_Tesing\target>java -jar ElasticSearchUtility-1.0.0-SNAPSHOT-jar-with-dependencies.jar
Jul 13, 2018 3:11:59 PM com.es.utility.DocumentIndex main
INFO: Started Indexing the Document.....
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console. Set system property 'log4j2.debug' to show Log4j2 internal initialization logging.
Exception in thread "main" java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvent(DefaultConnectingIOReactor.java:171)
    at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.processEvents(DefaultConnectingIOReactor.java:145)
    at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:348)
    at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
    at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
    at java.lang.Thread.run(Unknown Source)
    Suppressed: org.apache.http.ContentTooLongException: entity content is too long [223328895] for the configured buffer limit [104857600]
        at org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.onEntityEnclosed(HeapBufferedAsyncResponseConsumer.java:76)
        at org.apache.http.nio.protocol.AbstractAsyncResponseConsumer.responseReceived(AbstractAsyncResponseConsumer.java:131)
        at org.apache.http.impl.nio.client.MainClientExec.responseReceived(MainClientExec.java:315)
        at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseReceived(DefaultClientExchangeHandlerImpl.java:147)
        at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.responseReceived(HttpAsyncRequestExecutor.java:303)
        at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:255)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81)
        at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39)
        at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
        at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
        at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
        at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
        ... 1 more
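The ContentTooLongException means that one single response exceeded the buffer. With scrolling, every page is a separate response, so a modest page size keeps each one well under the 100 MB default without raising the limit. A minimal sketch, assuming the 6.x high-level client (the same request-only call style as client.index(request) in the question, before the RequestOptions overloads); the class name ScrollAll, the page size of 500 and the one-minute keep-alive are illustrative assumptions to tune, not values from the question.

```java
import java.io.IOException;
import java.util.Map;

import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class ScrollAll {

    // Assumed values: keep one page well below the 100 MB buffer limit.
    private static final int PAGE_SIZE = 500;
    private static final TimeValue KEEP_ALIVE = TimeValue.timeValueMinutes(1L);

    /** Visits every document in the "documents" index one scroll page at a time. */
    public static long fetchAll(RestHighLevelClient client) throws IOException {
        SearchRequest searchRequest = new SearchRequest("documents");
        searchRequest.scroll(KEEP_ALIVE);
        searchRequest.source(new SearchSourceBuilder()
                .query(QueryBuilders.matchAllQuery())
                .size(PAGE_SIZE));

        SearchResponse response = client.search(searchRequest);
        String scrollId = response.getScrollId();
        SearchHit[] hits = response.getHits().getHits();
        long seen = 0;

        while (hits != null && hits.length > 0) {
            for (SearchHit hit : hits) {
                Map<String, Object> source = hit.getSourceAsMap();
                // Process one document here, e.g. build and send its IndexRequest.
                seen++;
            }
            // Ask for the next page with the same scroll context.
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(KEEP_ALIVE);
            response = client.searchScroll(scrollRequest);
            scrollId = response.getScrollId();
            hits = response.getHits().getHits();
        }

        // Release the scroll context on the server instead of waiting for it to expire.
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        client.clearScroll(clearScrollRequest);
        return seen;
    }
}
```

If a single page of 500 hits is still too large (for example because _source carries big fields), lowering PAGE_SIZE is the first knob to try.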
RestClientBuilder, which is used implicitly to construct the RestClient that RestHighLevelClient uses internally, has a default timeout of 30 s. You need to raise it with setMaxRetryTimeoutMillis, whose value is passed through to the SyncResponseListener. That is not obvious, but it makes sense once you consider that a synchronous call needs some upper limit on how long it waits.
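In the first stack trace the timeout is thrown from RestHighLevelClient.index(...), that is, while indexing the attachments rather than during the search, so it is the high-level client's builder that needs the longer timeout. A minimal sketch, assuming the 6.x Java REST client used in the question (setMaxRetryTimeoutMillis was removed in 7.0). It also raises the socket timeout through setRequestConfigCallback, because that defaults to 30 s as well. The class name ClientFactory, the method builderWithTimeouts and the 5-minute value are illustrative assumptions.

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class ClientFactory {

    // Assumed value: long enough for the slowest request you expect (here 5 minutes).
    private static final int TIMEOUT_MILLIS = 5 * 60 * 1000;

    // One builder configuration shared by the low-level and the high-level client.
    static RestClientBuilder builderWithTimeouts() {
        return RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http"))
            // Socket timeout: how long to wait for data on an open connection.
            .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                .setConnectTimeout(5000)
                .setSocketTimeout(TIMEOUT_MILLIS))
            // Upper bound used by the synchronous listener (6.x client only).
            .setMaxRetryTimeoutMillis(TIMEOUT_MILLIS);
    }

    public static RestHighLevelClient highLevel() {
        return new RestHighLevelClient(builderWithTimeouts());
    }

    public static RestClient lowLevel() {
        return builderWithTimeouts().build();
    }
}
```

Setting the socket timeout and the max retry timeout to the same value keeps one of them from silently cutting off a request that the other would still allow.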