Thanks very much Szymon, I want to update my interim solution for this, cause neither .parallelProcessing nor newSingleThreadExecutor nor synchrnous() is working without multithreading.
So what works is:
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.completionSize(ELASTICSEARCH_BULK_SIZE)
.completionPredicate(someDataCache.getEndOfPostfilePredicate())
.process(this::processBulkRequest)
.end();
with
private final AtomicInteger bulkRequestCount = new AtomicInteger(0);
private void processBulkRequest(Exchange exchange) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
bulkRequestCount.incrementAndGet();
exchange.getContext().createProducerTemplate().send("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess", exchange);
});
try {
future.get();
int totalBulkRequestsNeeded = someDataCache.getEndOfFilePredicate().getDocumentCount() / ELASTICSEARCH_BULK_SIZE;
if (bulkRequestCount.get() > totalBulkRequestsNeeded) {
logger.info("Bulk request completed. Updating alias");
exchange.getContext().createProducerTemplate().send("direct:updateAlias", exchange);
}
} catch (Exception e) {
logger.error("Error while processing bulk request", e);
}
}
in this case its guranteed that the alias will be created after the bulk upload.
But still would be great to know why this is not working as expected with Camel 4 and Java 21.