/ / JAVA Continue enviando tarefas para o pool de threads e o pool de threads de desligamento quando não houver mais tarefas em execução? - java, multithreading, java-8, threadpool, threadpoolexecutor

Continuando Enviando tarefas para o conjunto de encadeamentos e o conjunto de encadeamentos do Encerramento quando não há mais tarefas sendo executadas? - java, multithreading, java-8, threadpool, threadpoolexecutor

Estou experimentando com threads de Java JAVA e quero saber se existe alguma maneira de desligar o pool de threads quando não houver mais threads enviados e enviar threads quando um segmento terminar a execução.

Para ser mais claro, eu tenho um THREAD_LIMIT variável de Integer tipo que é o número máximo de threads podeexecute algoritmos paralelos e recursivos que supostamente verificam o número ativo de threads antes de se chamar. se o número ativo de encadeamentos for menor que o limite de encadeamentos, ele enviará um novo encadeamento para o conjunto de encadeamentos, caso contrário, chama a recursão no mesmo encadeamento.

Os problemas que estou enfrentando é acompanhar os threads ativos e desligar o pool de threads quando nenhum novo thread for enviado. Quero obter o máximo desempenho do meu código usando o multi-threading.

eu segui esta tutorial para criar meu próprio pool de threads e usado

   public int getTaskQueueSize() {
return taskQueue.size();
}

dentro ThreadPool classe para obter um número ativo de threads.

Na classe principal eu estou usando

     void shutdownExecutor() {
while (tp.getTaskQueueSize() != 0) {
try {
Thread.sleep(100);
} catch (InterruptedException ex) {
Logger.getLogger(HamiltonPaths.class.getName()).log(Level.SEVERE, null, ex);
}
//  System.out.println("Not STopping tp");
}
tp.shutdown();
// System.out.println("Stopped tp");

}

na classe principal para desligar o pool de threads. Mas depois de algum tempo, ele parou de gerar novos tópicos. Mas o único segmento continua fazendo o trabalho em recursão.

Atualização 1: Código adicionado Então eu descobri que enviar tarefas parathreadpool está funcionando bem. Mas acidentalmente adicionei um bug com uma alteração recente no código, o que me impede de enviar mais tarefas ao pool de threads e desligar o pool de threads. shutdownExecutor() funciona como tp.getTaskQueueSize() está retornando o tamanho inicial da fila ou as tarefas não estão sendo removidas da fila.

Estou usando a lógica a seguir para decidir se deve enviar uma nova tarefa ou continuar trabalhando em recursão.

if ((this.tp.getTaskQueueSize() < threadLimit) && (!this.tp.isPoolShutDownInitiated())) {
spawnNewThread(new PTask(cp, get));//Class that implements the Runnable and do the same thing as the function called in below statement.

} else {
PPath(cp, get);// call to the function
}

BlockingQueueCustom.java

 package threadpool;

abstract interface BlockingQueueCustom<E>
{
public abstract void put(E paramE)
throws InterruptedException;

public abstract E take()
throws InterruptedException;

public abstract int size();
}

LinkedBlockingQueueCustom.java

package threadpool;

import java.util.LinkedList;
import java.util.List;

class LinkedBlockingQueueCustom<E>
implements BlockingQueueCustom<E> {

private List<E> queue;
private int maxSize;

public LinkedBlockingQueueCustom(int maxSize) {
this.maxSize = maxSize;
this.queue = new LinkedList();
}

public synchronized void put(E item)
throws InterruptedException {
if (this.queue.size() == this.maxSize) {
wait();
}

this.queue.add(item);
notifyAll();
}

public synchronized E take()
throws InterruptedException {
if (this.queue.isEmpty()) {
wait();
}

notifyAll();
if (this.queue.isEmpty()) {
return null;
}
return (E) this.queue.remove(0);
}

public synchronized int size() {
return this.queue.size();
}
}

ThreadPool.java

package threadpool;

import java.util.logging.Level;
import java.util.logging.Logger;

public class ThreadPool {

private BlockingQueueCustom<Runnable> taskQueue;
int size = 0;
int taskExecuted = 0;
ThreadPoolsThread[] threadPoolsThread;

public int getTaskExecuted() {
return this.taskExecuted;
}

public synchronized void dectaskExec() {
this.taskExecuted -= 1;
}

public int getSize() {
return this.size;
}

public BlockingQueueCustom<Runnable> getTaskQueue() {
return this.taskQueue;
}

public int getTaskQueueSize() {
return this.taskQueue.size();
}

private boolean poolShutDownInitiated = false;

public ThreadPool(int nThreads) {
this.taskQueue = new LinkedBlockingQueueCustom(nThreads);
this.size = nThreads;
this.threadPoolsThread = new ThreadPoolsThread[nThreads + 1];

for (int i = 1; i <= nThreads; i++) {
this.threadPoolsThread[i] = new ThreadPoolsThread(this.taskQueue, this);
this.threadPoolsThread[i].setName("" + i);

this.threadPoolsThread[i].start();
}
}

public synchronized void execute(Runnable task) {
if (this.poolShutDownInitiated) {
try {
throw new Exception("ThreadPool has been shutDown, no further tasks can be added");
} catch (Exception ex) {
Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, null, ex);
}
}
this.taskExecuted += 1;

try {
this.taskQueue.put(task);
} catch (InterruptedException ex) {
Logger.getLogger(ThreadPool.class.getName()).log(Level.SEVERE, null, ex);
}
}

public boolean isPoolShutDownInitiated() {
return this.poolShutDownInitiated;
}

public synchronized void shutdown() {
this.poolShutDownInitiated = true;
}
}

ThreadPoolsThread.java

package threadpool;

import java.util.logging.Level;
import java.util.logging.Logger;

class ThreadPoolsThread
extends Thread {

private BlockingQueueCustom<Runnable> taskQueue;
private ThreadPool threadPool;

public ThreadPoolsThread(BlockingQueueCustom<Runnable> queue, ThreadPool threadPool) {
this.taskQueue = queue;
this.threadPool = threadPool;
}

public void run() {
for (;;) {
Runnable runnable = null;
while ((!this.threadPool.isPoolShutDownInitiated()) && (this.taskQueue.size() == 0)) {
}

if ((this.threadPool.isPoolShutDownInitiated()) && (this.taskQueue.size() == 0)) {
break;
}

try {
runnable = (Runnable) this.taskQueue.take();
} catch (InterruptedException ex) {
Logger.getLogger(ThreadPoolsThread.class.getName()).log(Level.SEVERE, null, ex);
break;
}

if (runnable == null) {
break;
}

runnable.run();

if ((this.threadPool.isPoolShutDownInitiated())
&& (this.taskQueue.size() == 0)) {
interrupt();

try {
Thread.sleep(1L);
} catch (InterruptedException ex) {
Logger.getLogger(ThreadPoolsThread.class.getName()).log(Level.SEVERE, null, ex);
break;
}
}
}
}
}

Respostas:

1 para resposta № 1

Parece haver um mal-entendido fundamental. Você não "envia um novo tópico para o conjunto de threads", enviará Tarefas para um serviço executor (que pode serimplementado como pool de threads). O serviço executor gerencia os threads e decide se deve iniciá-los, pará-los ou reutilizá-los. Se você configurar o executor para ter um número máximo de threads, ele não usará mais que o número especificado. Se você enviar mais tarefas que os threads disponíveis, essas tarefas serão enfileiradas. Portanto, o tamanho da fila não é o número de threads.

Geralmente, você não deve implementar seu próprio pool de encadeamentos, principalmente quando não entendeu completamente o que eles fazem. Java implementa implementações há mais de dez anos. Você pode apenas usar Executors.newFixedThreadPool(threadCount) para obter um executor de conjunto de encadeamentos usando o número especificado de encadeamentos.

Se você deseja garantir que as tarefas não sejam enfileiradas, mas sempre executadas pelo chamador quando todos os threads estiverem ocupados, não é muito difícil de realizar. Crie um ThreadPoolExecutor explicitamente, usando o construtor que permite especificar uma fila personalizada e RejectedExecutionHandler junto com um SynchronousQueue e CallerRunsPolicy:

ThreadPoolExecutor e=new ThreadPoolExecutor(threadCount, threadCount,
1, TimeUnit.MINUTES, new SynchronousQueue<>(), Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());

Antes do Java 8, você precisa explicitar o tipo de fila: new SynchronousQueue<Runnable>()
Isso passará as tarefas enviadas para um thread inativo,se houver, mas execute-o no encadeamento de envio, caso contrário. Observe que quando o tamanho do núcleo e o tamanho máximo são idênticos, como no exemplo acima, o valor do tempo limite é irrelevante.

Se você enviou todas as tarefas, pode simplesmente chamar shutDown() sem verificações adicionais. Ele ainda executará tarefas pendentes antes de parar todos os threads.