79355943

Date: 2025-01-14 18:04:49
Score: 1
Natty:
Report link

Thanks very much Szymon, I want to update my interim solution for this, cause neither .parallelProcessing nor newSingleThreadExecutor nor synchrnous() is working without multithreading.

So what works is:

 .aggregate(constant(true), new GroupedBodyAggregationStrategy())
                    .completionSize(ELASTICSEARCH_BULK_SIZE)
                    .completionPredicate(someDataCache.getEndOfPostfilePredicate())
                    .process(this::processBulkRequest)
                    .end();

with

private final AtomicInteger bulkRequestCount = new AtomicInteger(0);

 private void processBulkRequest(Exchange exchange) {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            bulkRequestCount.incrementAndGet();
            exchange.getContext().createProducerTemplate().send("elasticsearch://elasticsearch?operation=Bulk&hostAddresses=#elasticsearchHostAddressess", exchange);
        });

        try {
            future.get();
            int totalBulkRequestsNeeded = someDataCache.getEndOfFilePredicate().getDocumentCount() / ELASTICSEARCH_BULK_SIZE;
            if (bulkRequestCount.get() > totalBulkRequestsNeeded) {
                logger.info("Bulk request completed. Updating alias");
                exchange.getContext().createProducerTemplate().send("direct:updateAlias", exchange);
            }
        } catch (Exception e) {
            logger.error("Error while processing bulk request", e);
        }
    }

in this case its guranteed that the alias will be created after the bulk upload.

But still would be great to know why this is not working as expected with Camel 4 and Java 21.

Reasons:
  • Blacklisted phrase (0.5): Thanks
  • RegEx Blacklisted phrase (1): I want
  • Long answer (-1):
  • Has code block (-0.5):
  • Self-answer (0.5):
  • Low reputation (0.5):
Posted by: Splioo