79083909

Date: 2024-10-13 18:55:27
Score: 5

@marcinj Can you answer as a unit test?

I don't recommend unit testing the thread correctness of your code: how do you know how long such a test would have to run? 2 s, 10 minutes, or 10 h? Maybe it would fail only after 12 h of testing. Currently it fails after a few seconds in a multithreaded environment, but there are other subtle errors as well.

The truth is that if you kept working on it until it behaved correctly, you would probably end up with an implementation similar to the one behind java.util.concurrent.Executors, so why not just reuse what is well tested and official?

As for testing, I suggest checking other things, such as whether the exception reporting in your class (errorHandler) works as intended, or perhaps whether the scheduled work has finished by the time executeUntilDeplated returns.
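As a rough illustration of the second check, here is a minimal sketch of a deterministic test helper. It uses a plain fixed thread pool rather than your class, and DrainCheck and runAndCount are hypothetical names chosen for this example only. The idea is to assert on an outcome that must always hold (every submitted task has run once the pool reports termination) instead of hoping to catch a race.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration; not part of the code in the question.
final class DrainCheck {

    // Submits taskCount trivial tasks, shuts the pool down and waits for it to drain.
    // Returns the number of tasks that ran, or -1 if the pool did not terminate in time.
    static int runAndCount(int threads, int taskCount, long timeoutMs) {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            executor.submit(() -> { completed.incrementAndGet(); });
        }
        // shutdown() stops accepting new tasks but still runs the ones already queued
        executor.shutdown();
        try {
            boolean terminated = executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            return terminated ? completed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return -1;
        }
    }
}
```

A test could then call DrainCheck.runAndCount(4, 100, 5000) and expect 100. The same shape should carry over to your own pool by replacing the executor calls with addAndStartThread and executeUntilDeplated.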

Below is my attempt to rewrite your code with an Executors thread pool:

package org.example;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

public class DepleatingFiFoThreadPool<A> {

    public static final Logger LOG = Logger.getLogger(DepleatingFiFoThreadPool.class.getCanonicalName());

    private final ExecutorService executor;
    private final Consumer<Throwable> errorHandler;
    private final String prefix;
    private final Consumer<A> invoker;

    public DepleatingFiFoThreadPool(final int threadsRunningMax, final Consumer<Throwable> errorHandler,
                                    final String prefix, final Consumer<A> invoker) {
        this.errorHandler = errorHandler;
        this.prefix = prefix;
        this.invoker = invoker;
        this.executor = Executors.newFixedThreadPool(threadsRunningMax, new ThreadFactory() {
            // Atomic counter: newThread may be called from several submitting threads at once
            private final AtomicInteger count = new AtomicInteger();

            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, prefix + "-Thread-" + count.incrementAndGet());
                // Report anything that escapes a worker thread to errorHandler
                t.setUncaughtExceptionHandler((thread, e) -> {
                    try {
                        errorHandler.accept(e);
                    } catch (Throwable ta) {
                        // The handler itself failed: log it and keep the original error attached
                        ta.addSuppressed(e);
                        LOG.log(Level.SEVERE, ta.getMessage(), ta);
                    }
                });
                return t;
            }
        });
    }

    // Queues one unit of work; the pool runs at most threadsRunningMax of them at a time
    public void addAndStartThread(final A notRunningThread, final String threadPostfix) {
        executor.submit(() -> {
            Thread.currentThread().setName(prefix + "-Thread-" + threadPostfix); // Set thread name per task
            try {
                invoker.accept(notRunningThread);
            } catch (Throwable e) {
                // note: this catch keeps task exceptions from ever reaching errorHandler, is this intentional?
                LOG.log(Level.SEVERE, "Task execution error", e);
            }
        });
    }

    // Stops accepting new work and waits up to timeoutMs for the queued work to finish.
    // Returns true if everything finished in time, false if the timeout elapsed first.
    public boolean executeUntilDeplated(long timeoutMs) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    }

}
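One detail that matters if you test errorHandler: a task passed to submit() is wrapped in a Future, which stores any exception instead of letting it escape the worker thread, so a thread's UncaughtExceptionHandler is only reached for tasks started with execute(). The sketch below is a small standalone demonstration of that difference; HandlerDemo and handlerFired are hypothetical names made up for this example, and the wait times are arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo for illustration; not part of the class above.
final class HandlerDemo {

    // Runs one always-failing task and returns true if the worker thread's
    // UncaughtExceptionHandler fired within waitMs milliseconds.
    static boolean handlerFired(boolean useSubmit, long waitMs) {
        CountDownLatch fired = new CountDownLatch(1);
        ExecutorService executor = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r, "demo-Thread");
            t.setUncaughtExceptionHandler((thread, e) -> fired.countDown());
            return t;
        });
        Runnable failing = () -> { throw new IllegalStateException("boom"); };
        try {
            if (useSubmit) {
                executor.submit(failing);  // exception is captured by the returned Future
            } else {
                executor.execute(failing); // exception escapes and kills the worker thread
            }
            // Wait on the latch rather than on pool termination, because the pool can
            // report termination before the dying thread has invoked its handler
            return fired.await(waitMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }
}
```

With this helper, handlerFired(true, 300) is expected to return false and handlerFired(false, 5000) to return true. If you want errorHandler to see task failures in the class above, one option (an assumption about your intent, not something your question states) would be to call errorHandler.accept(e) directly in the catch block of the task.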

Reasons:
  • Blacklisted phrase (1): how do you
  • RegEx Blacklisted phrase (2.5): Can you answer
  • RegEx Blacklisted phrase (2.5): do you know how
  • Contains signature (1):
  • Long answer (-1):
  • Has code block (-0.5):
  • Contains question mark (0.5):
  • User mentioned (1): @marcinj
  • High reputation (-2):
Posted by: marcinj