Processing tasks in parallel and sequentially in Java
Problem description
In my program, the user can trigger different tasks through an interface. The tasks take some time to process, so they are executed by threads. So far I have implemented this with an executor that has a single thread and runs all tasks one after the other. Now I would like to parallelize things a bit.
That is, I would like to run tasks in parallel, except when they have the same path, in which case I want to run them sequentially. For example, I have 10 threads in my pool, and when a task comes in, it should be assigned to the worker that is currently processing a task with the same path. If no worker is currently processing a task with the same path, the task should be processed by a currently free worker.
Additional info: a task is any operation that is executed on a file in the local file system, for example renaming a file. Therefore, each task has the attribute path. I don't want to execute two tasks on the same file at the same time, so tasks with the same path should be performed sequentially.
One of my problems is that I need a safe way to check whether a worker is currently running and to get the path of the task it is running. By safe I mean that no simultaneous-access or other threading problems occur.
Here is my sample code, which is still incomplete:
import com.google.common.base.Preconditions; // Guava
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        //Task code here
        String getPath();
    }

    private static class Worker implements Runnable {
        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
        //some variable or mechanic to give the actual path of the running tasks??
        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        //check here if a running worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
        return workers[task.getPath() //HERE I NEED HELP//];
    }
}
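One way the missing index in getWorker could be derived, without tracking which worker is currently running, is to hash the path onto a fixed set of single-threaded lanes. The sketch below is only an illustration under that assumption: the names StripedExecutor, laneIndex and lanes are hypothetical and do not come from the code above. It guarantees that tasks with the same path never overlap, but it does not pick a "free" worker, so two different paths that hash to the same lane are also serialized.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch: every path is pinned to one single-threaded lane. */
final class StripedExecutor {
    private final ExecutorService[] lanes;

    StripedExecutor(int lanesNr) {
        lanes = new ExecutorService[lanesNr];
        for (int i = 0; i < lanesNr; i++) {
            // A single-thread executor runs its tasks strictly in submission order.
            lanes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** floorMod keeps the index non-negative even for negative hash codes. */
    int laneIndex(String path) {
        return Math.floorMod(path.hashCode(), lanes.length);
    }

    void submit(String path, Runnable task) {
        lanes[laneIndex(path)].execute(task);
    }

    void stop() {
        // Already queued tasks still run; new submissions are rejected.
        for (ExecutorService lane : lanes) {
            lane.shutdown();
        }
    }
}
```

Because the lane is a pure function of the path, no shared "currently running path" state is needed, which sidesteps the thread-safety concern at the cost of occasionally serializing unrelated paths.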
All you need is a hash map of actors, with the file path as the key. Different actors run in parallel, and each actor handles its own tasks sequentially.
Your solution is problematic because the Worker class uses the blocking operation take but is executed in a limited thread pool, which may lead to thread starvation (a kind of deadlock). Actors do not block while waiting for the next message.
import org.df4j.core.dataflow.ClassicActor;

import java.util.Map;
import java.util.concurrent.*;

public class TasksOrderingExecutor {

    public static class Task implements Runnable {
        private final String path;
        private final String task;

        public Task(String path, String task) {
            this.path = path;
            this.task = task;
        }

        String getPath() {
            return path;
        }

        @Override
        public void run() {
            // Simulated work; a real task would operate on the file at path.
            System.out.println(path + "/" + task + " started");
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            System.out.println(path + "/" + task + " stopped");
        }
    }

    // One actor per path: it processes its incoming tasks one at a time.
    static class Worker extends ClassicActor<Task> {
        @Override
        protected void runAction(Task task) throws Throwable {
            task.run();
        }
    }

    private final ExecutorService executorService;
    // ConcurrentHashMap so that submit() may safely be called from several threads.
    private final Map<String, Worker> workers = new ConcurrentHashMap<>();

    /**
     * @param queuesNr nr of concurrent task queues (unused here: the common pool
     *                 determines the degree of parallelism)
     */
    public TasksOrderingExecutor(int queuesNr) {
        executorService = ForkJoinPool.commonPool();
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.onNext(task);
    }

    public void stop() throws InterruptedException {
        for (Worker w : workers.values()) {
            w.onComplete();
        }
        // Note: shutdown() has no effect on the common pool.
        executorService.shutdown();
        executorService.awaitTermination(10, TimeUnit.SECONDS);
    }

    private Worker getWorker(Task task) {
        // Return the actor for this path, creating and starting it on first use.
        return workers.computeIfAbsent(task.getPath(), k -> {
            Worker res = new Worker();
            res.setExecutor(executorService);
            res.start();
            return res;
        });
    }

    public static void main(String[] args) throws InterruptedException {
        TasksOrderingExecutor orderingExecutor = new TasksOrderingExecutor(20);
        orderingExecutor.submit(new Task("path1", "task1"));
        orderingExecutor.submit(new Task("path1", "task2"));
        orderingExecutor.submit(new Task("path2", "task1"));
        orderingExecutor.submit(new Task("path3", "task1"));
        orderingExecutor.submit(new Task("path2", "task2"));
        orderingExecutor.stop();
    }
}
The execution log shows that tasks with the same key are executed sequentially and tasks with different keys are executed in parallel:
path3/task1 started
path2/task1 started
path1/task1 started
path3/task1 stopped
path2/task1 stopped
path1/task1 stopped
path2/task2 started
path1/task2 started
path2/task2 stopped
path1/task2 stopped
I used my own actor library DF4J, but any other actor library can be used.
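For readers who do not want to add an actor library, the same idea (a map keyed by path, where each entry serializes its own tasks without blocking a thread while idle) can be approximated with the JDK alone. The following is a minimal sketch under that assumption, not the DF4J implementation: the class name PerPathExecutor and the field tails are hypothetical. Each path remembers the CompletableFuture of its most recently submitted task, and a new task for that path is chained behind it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical JDK-only sketch: tasks with the same path run one after another. */
final class PerPathExecutor {
    private final ExecutorService pool;
    // Most recently submitted task per path; a new task is chained behind it.
    private final Map<String, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    PerPathExecutor(int threads) {
        pool = Executors.newFixedThreadPool(threads);
    }

    CompletableFuture<Void> submit(String path, Runnable task) {
        // compute() is atomic per key, so concurrent submits for one path are ordered.
        CompletableFuture<Void> next = tails.compute(path, (p, tail) ->
                tail == null
                        ? CompletableFuture.runAsync(task, pool)
                        // handle() swallows a failure of the previous task so the chain continues.
                        : tail.handle((r, e) -> null).thenRunAsync(task, pool));
        // Drop the map entry once this task is done, unless a newer task replaced it.
        next.whenComplete((r, e) -> tails.remove(path, next));
        return next;
    }

    void shutdown() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        PerPathExecutor executor = new PerPathExecutor(4);
        CompletableFuture<?>[] all = {
            executor.submit("path1", () -> System.out.println("path1/task1")),
            executor.submit("path1", () -> System.out.println("path1/task2")),
            executor.submit("path2", () -> System.out.println("path2/task1")),
        };
        CompletableFuture.allOf(all).join(); // wait for every submitted task
        executor.shutdown();
    }
}
```

No pool thread is parked waiting for work here: a path with no pending tasks occupies only a map entry, and that entry is removed when its last task finishes. The interleaving of output between different paths is not deterministic; only the order within one path is.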