【Java】JUC - Monitor监控ThreadPoolExecutor

JUC - Monitor监控ThreadPoolExecutor

一个自定义Monitor监控ThreadPoolExecutor的执行情况

TASK

WokerTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class WorkerTask implements Runnable{
private String command;

public WorkerTask(String command) {
this.command = command;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
processCommand();
System.out.println(Thread.currentThread().getName()+" End.");
}

private void processCommand(){
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

@Override
public String toString() {
return "WorkerTask{" +
"command='" + command + '\'' +
'}';
}
}

MonitorTask(监听器)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class MonitorTask implements Runnable{
// 被监控的executor
private final ThreadPoolExecutor executor;
// 监控间隔
private final int seconds;
// 监控开关
private boolean run = true;

public MonitorTask(ThreadPoolExecutor executor, int seconds) {
this.executor = executor;
this.seconds = seconds;
}

public void shutdown(){
this.run = false;
}
@Override
public void run() {
while (run) {
System.out.println(
String.format("%s - [monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, Queue: %d, isShutdown: %s, isTerminated: %s",
Thread.currentThread().getName(),
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.getQueue().size(),
this.executor.isShutdown(),
this.executor.isTerminated()));
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

RejectedExecutionHandler(拒绝策略)

LogRejectedExecutionHandler

1
2
3
4
5
6
7
class LogRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 当任务队列满了,并且达到maximumPoolSize时的拒绝策略
System.out.println(r.toString() + " is rejected");
}
}

ThreadPoolExecutor

1
2
3
4
5
6
7
8
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
4,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new LogRejectedExecutionHandler()
);

完整的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
public class ThreadPoolExecutorMonitorSimple {
public static void main(String[] args) throws InterruptedException {
final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
2,
4,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new LogRejectedExecutionHandler()
);

// 创建监控任务,间隔3s
final MonitorTask monitorTask = new MonitorTask(poolExecutor, 3);

// 监听任务启动
new Thread(monitorTask).start();

for (int i = 0; i < 10; i++) {
poolExecutor.execute(new WorkerTask("cmd-"+i));
}

TimeUnit.SECONDS.sleep(60);

poolExecutor.shutdown();

TimeUnit.SECONDS.sleep(5);

monitorTask.shutdown();
}

static class LogRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 当任务队列满了,并且达到maximumPoolSize时的拒绝策略
System.out.println(r.toString() + " is rejected");
}
}

static class WorkerTask implements Runnable{
private String command;

public WorkerTask(String command) {
this.command = command;
}

@Override
public void run() {
System.out.println(Thread.currentThread().getName()+" Start. Command = "+command);
processCommand();
System.out.println(Thread.currentThread().getName()+" End.");
}

private void processCommand(){
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}

}

@Override
public String toString() {
return "WorkerTask{" +
"command='" + command + '\'' +
'}';
}
}

static class MonitorTask implements Runnable{
// 被监控的executor
private final ThreadPoolExecutor executor;
// 监控间隔
private final int seconds;
// 监控开关
private boolean run = true;

public MonitorTask(ThreadPoolExecutor executor, int seconds) {
this.executor = executor;
this.seconds = seconds;
}

public void shutdown(){
this.run = false;
}
@Override
public void run() {
while (run) {
System.out.println(
String.format("%s - [monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, Queue: %d, isShutdown: %s, isTerminated: %s",
Thread.currentThread().getName(),
this.executor.getPoolSize(),
this.executor.getCorePoolSize(),
this.executor.getActiveCount(),
this.executor.getCompletedTaskCount(),
this.executor.getTaskCount(),
this.executor.getQueue().size(),
this.executor.isShutdown(),
this.executor.isTerminated()));
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

日志:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
pool-1-thread-1 Start. Command = cmd-0
pool-1-thread-2 Start. Command = cmd-1
pool-1-thread-3 Start. Command = cmd-4
WorkerTask{command='cmd-6'} is rejected
WorkerTask{command='cmd-7'} is rejected
WorkerTask{command='cmd-8'} is rejected
WorkerTask{command='cmd-9'} is rejected
pool-1-thread-4 Start. Command = cmd-5
Thread-0 - [monitor] [0/2] Active: 0, Completed: 0, Task: 1, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [4/2] Active: 4, Completed: 0, Task: 6, Queue: 2, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-1 Start. Command = cmd-2
pool-1-thread-2 End.
pool-1-thread-2 Start. Command = cmd-3
pool-1-thread-3 End.
pool-1-thread-4 End.
Thread-0 - [monitor] [4/2] Active: 2, Completed: 4, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [4/2] Active: 2, Completed: 4, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
pool-1-thread-1 End.
pool-1-thread-2 End.
Thread-0 - [monitor] [4/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [2/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: false, isTerminated: false
Thread-0 - [monitor] [0/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: true, isTerminated: true
Thread-0 - [monitor] [0/2] Active: 0, Completed: 6, Task: 6, Queue: 0, isShutdown: true, isTerminated: true

参考阅读