I tried it out on the basis of an online tutorial. In this tutorial, a queue is used in the context object for the elements to be processed.
I don't really like my solution. But switching to a solution with a list did not work.
<?xml version="1.0" encoding="UTF-8"?>
<job id="hugeImport" xmlns="https://jakarta.ee/xml/ns/jakartaee" version="2.0">
<step id="dummyItems" next="chunkProcessor">
<batchlet ref="dummyItemsBatchlet">
<properties>
<property name="numberOfDummyItems" value="10"/>
</properties>
</batchlet>
</step>
<step id="chunkProcessor" next="reloadItemsQueue_001">
<chunk>
<reader ref="itemReader">
<properties>
<property name="numberOfItems" value="2"/>
</properties>
</reader>
<processor ref="itemMockProcessor"/>
<writer ref="itemJpaWriter"/>
</chunk>
<partition>
<plan partitions="2"></plan>
</partition>
</step>
<step id="reloadItemsQueue_001" next="chunkProcessortest">
<batchlet ref="reloadItemQueueBatchlet">
</batchlet>
</step>
<step id="chunkProcessortest">
<chunk>
<reader ref="itemReader">
<properties>
<property name="numberOfItems" value="3"/>
</properties>
</reader>
<processor ref="itemMockProcessor"/>
<writer ref="itemJpaWriter"/>
</chunk>
<partition>
<plan partitions="2"></plan>
</partition>
</step>
</job>
public class ImportItem {
private Long id;
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public ImportItem(long id, String name) {
this.id = id;
this.name = name;
}
@Override
public String toString() {
return "ImportItem{" + "id=" + id + ", name=" + name + '}';
}
}
import java.util.List;
public class ImportItems {
private List<ImportItem> items;
public List<ImportItem> getItems() {
return items;
}
public void setItems(List<ImportItem> items) {
this.items = items;
}
}
import jakarta.batch.runtime.context.JobContext;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
@Named
public class ImportJobContext {
@Inject
private JobContext jobContext;
private final Queue<ImportItem> itemsToDo = new ConcurrentLinkedQueue<>();
private final Queue<ImportItem> itemsForNextStep = new ConcurrentLinkedQueue<>();
public void addItems(List<ImportItem> items) {
getImportJobContext().itemsToDo.addAll(items);
}
public synchronized void reloadQueue(){
getImportJobContext().itemsToDo.clear();
getImportJobContext().itemsToDo.addAll(getImportJobContext().itemsForNextStep);
getImportJobContext().itemsForNextStep.clear();
}
public synchronized List<ImportItem> getItems(int count) {
List<ImportItem> items = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
var item = getImportJobContext().itemsToDo.poll();
if(item == null) {
continue;
}
items.add(item);
getImportJobContext().itemsForNextStep.add(item);
}
return items.isEmpty() ? null : items;
}
private ImportJobContext getImportJobContext() {
if (jobContext.getTransientUserData() == null) {
jobContext.setTransientUserData(this);
}
return (ImportJobContext) jobContext.getTransientUserData();
}
}
import jakarta.batch.api.AbstractBatchlet;
import jakarta.batch.api.BatchProperty;
import jakarta.batch.runtime.BatchStatus;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.ArrayList;
import java.util.List;
@Named
public class DummyItemsBatchlet extends AbstractBatchlet {
@Inject
private ImportJobContext jobContext;
@Inject
@BatchProperty
private String numberOfDummyItems;
@Override
public String process() throws Exception {
List<ImportItem> list = new ArrayList<>();
for(int i=0; i<Integer.parseInt(numberOfDummyItems); i++){
list.add(new ImportItem(i, "dummyItem" + i));
}
jobContext.addItems(list);
return BatchStatus.COMPLETED.name();
}
}
import jakarta.batch.api.BatchProperty;
import jakarta.batch.api.chunk.AbstractItemReader;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.List;
@Named
public class ItemReader extends AbstractItemReader {
@Inject
ImportJobContext importJobContext;
@Inject
@BatchProperty
private String numberOfItems;
@Override
public List<ImportItem> readItem() throws Exception {
int numberOfWorkerItems = 2;
if(numberOfItems != null){
numberOfWorkerItems = Integer.parseInt(numberOfItems);
}
return importJobContext.getItems(numberOfWorkerItems);
}
}
import jakarta.batch.api.chunk.ItemProcessor;
import jakarta.inject.Named;
@Named
public class ItemMockProcessor implements ItemProcessor {
@Override
public Object processItem(Object o) throws Exception {
System.out.println("--> processing " + o);
return o;
}
}
import jakarta.batch.api.chunk.AbstractItemWriter;
import jakarta.inject.Named;
import java.util.List;
@Named
public class ItemJpaWriter extends AbstractItemWriter {
@Override
public void writeItems(List<Object> list) throws Exception {
for (Object obj : list) {
List<ImportItem> item = (List<ImportItem>) obj;
System.out.println("--> Persisting " + item);
}
}
}
@Named
public class ReloadItemQueueBatchlet extends AbstractBatchlet {
@Inject
private ImportJobContext jobContext;
@Override
public String process() throws Exception {
System.out.println("ReloadItemQueueBatchlet.process");
jobContext.reloadQueue();
return BatchStatus.COMPLETED.name();
}
}
Can you please give me tips on how I can optimize the code?