读前思考

不使用CompletableFuture,如何使用Future 来优雅实现异步超时处理,错误处理,以及异步任务的依赖?

一、背景

为什么需要多线程

cpu负责调度系统执行任务,cpu的效率极高,
如果一个cpu对应一个进程的一个线程来执行任务的话,那CPU的效率会大大降低。

底层依靠什么运转

  • 时间片轮转:

    操作系统将CPU时间划分为多个时间片,每个时间片分配给不同的线程。当一个线程的时间片用完时,操作系统会暂停该线程的执行,并将CPU分配给下一个线程。

  • 上下文切换:

    当操作系统暂停一个线程并切换到另一个线程时,需要保存当前线程的上下文(如寄存器状态、程序计数器等),并恢复下一个线程的上下文。
    上下文切换会带来一定的开销,但它使得单核CPU能够在多个线程之间快速切换,从而模拟并发执行。

二、实战

线程实现

实现方式 优点 缺点
继承Thread类 简单易用,直接使用Thread类的方法 不支持多继承,不利于资源共享
实现Runnable接口 支持多继承,利于资源共享 需要额外创建Thread对象,稍微复杂
使用Callable和Future 可以返回结果和抛出异常,适用于需要获取结果的场景 需要使用ExecutorService和Future,代码稍微复杂
使用线程池 高效管理线程,灵活性高,便于资源管理 复杂性增加,可能导致资源耗尽

future的几个实现

特性 Future FutureTask CompletableFuture
概念 接口,用于表示异步计算的结果 Future接口的一个具体实现 Future接口的一个具体实现
功能 提供了一些方法来检查计算是否完成、等待计算完成并获取结果 作为一个任务提交给ExecutorService执行,也可以直接在一个线程中执行 许多方法来进行异步操作、组合多个异步任务
引入版本 Java 5 Java 5 Java 8
是否实现Future接口
是否实现Runnable接口
是否支持链式调用
是否支持组合多个任务
是否支持异步超时处理 是(1.9开始支持)

多线程实战

  • 背景:

    • 业务:C端页面,很多功能模块需要异步获取结果
    • 技术:公司的skywalking 组件是5.x版本的,对于CompletableFuture 的异步任务,无法追踪到trace。
  • 问题分析:

    • 业务:1、如果多个模块之间有依赖怎么办?如下一个模块依赖于上一个模块的结果才决定出与不出。
    • 技术:无法追踪TRACE,意味着无法使用CompletableFuture,那技术上如何适应Future 来优雅实现异步超时处理,错误处理,以及异步任务的依赖?
  • 思路:

    • 异步超时处理:
      起一个定时的任务,该任务的作用是到时间点检查 提交的任务的线程是否还在运行,如果运行,则记录为超时,同时停止任务。
      伪代码如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
        FutureTask<?> future = new FutureTask<>(callable);
      executorService.submit(future);
      Runnable r = () -> {
      try {
      boolean b = !future.isDone() ;
      if (b) {
      try {
      LOGGER.warn("{}任务超时啦",xxx);
      future.get(0, TimeUnit.MILLISECONDS);
      } catch (Exception e) {
      future.isMayStopIfRunning(true);
      }
      }
      } catch (Exception e) {
      future.cancel(true);
      LOGGER.error(Strings.EMPTY, e);
      }
      };
      timeoutExecutor.schedule(r, xxxTime, TimeUnit.MILLISECONDS);
    • 错误处理:
      定义一个对象,该对象包含执行线程过程中的全部信息,封装该对象的时机为:在get时候的上下文中来封装
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
       private boolean success;
      private boolean timeout;
      /**
      * 执行器名称
      */
      private String executorName;
      /**
      * 失败原因
      */
      private String errorMessage;
      private Exception ex;
      private long cost;
      private T obj;
  • 异步任务的依赖
    其实开源界有liteflow之类的流程编排框架,这块简化,大概思路就是,传参进来后,把每个模块的处理结果放在一个转盘(对象)内,给该对象起一个监听器,一旦监听到该属性有值,且符合其他模块的出发条件,立即执行其他模块。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    @Getter
    @Setter
    @Accessors(chain = true)
    public class LombokBean {
    private String field;
    private PropertyChangeSupport support = new PropertyChangeSupport(this);

    public void addPropertyChangeListener(PropertyChangeListener listener) {
    support.addPropertyChangeListener(listener);
    }

    public void removePropertyChangeListener(PropertyChangeListener listener) {
    support.removePropertyChangeListener(listener);
    }

    public void setField(String field) {
    String oldField = this.field;
    this.field = field;
    support.firePropertyChange("field", oldField, field);
    }
    }

三、AQS实战

为什么提起

异步超时获取,异步获取消息,这两块都需要主线程 、线程池的其他线程的并发获取结果。
因为涉及两个线程的操作,属于多线程并发范畴,自然想到了使用AQS来管理任务的状态和数据情况
重新定义的任务管理对象,这里参考的是开源框架[async](https://github.com/dromara/gobrs-async)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
package xxxx;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

@Slf4j
public class FutureTask<V> implements RunnableFuture<V> {
public static final int STOP_STAMP = 5;
private final Sync sync;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}

@Override
public boolean isCancelled() {
return sync.innerIsCancelled();
}

@Override
public boolean isDone() {
return sync.innerIsDone();
}

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}

/**
* 线程跑的时候,关闭
*
*/
public boolean isMayStopIfRunning(boolean mayStopIfRunning) {
if (sync.runner != null && Thread.State.RUNNABLE != sync.runner.getState()) {
return cancel(mayStopIfRunning);
}
return sync.innerStop(mayStopIfRunning);
}

/**
* @throws CancellationException {@inheritDoc}
* 每个任务都有可能抛出异常
*/
@Override
public V get() throws ExecutionException, InterruptedException {

return sync.innerGet();
}

/**
* @throws CancellationException {@inheritDoc}
* 最多延迟 xx 获取结果
*/
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}

/**
* Sets the result of this Future to the given value unless this future
* has already been set or has been cancelled. This method is invoked
* internally by the <tt>run</tt> method upon successful completion of
* the computation.
*
* @param v the value
*/
protected void set(V v) {
sync.innerSet(v);
}

/**
* @param t the cause of failure
*/
protected void setException(Throwable t) {
sync.innerSetException(t);
}

@Override
public void run() {
sync.innerRun();
}

/**
* Synchronization control for FutureTask. Note that this must be a
* non-static inner class in order to invoke the protected <tt>done</tt>
* method. For clarity, all inner class support methods are same as
* outer, prefixed with "inner".
* <p>
* Uses AQS sync state to represent run status
*/
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;

/**
* State value representing that task is ready to run
*/
private static final int READY = 0;
/**
* State value representing that task is running
*/
private static final int RUNNING = 1;
/**
* State value representing that task ran
*/
private static final int RAN = 2;
/**
* State value representing that task was cancelled
*/
private static final int CANCELLED = 4;

/**
* The underlying callable
*/
private final Callable<V> callable;
/**
* The result to return from get()
*/
private V result;
/**
* The exception to throw from get()
*/
private Throwable exception;

/**
* The thread running task. When nulled after set/cancel, this
* indicates that the results are accessible. Must be volatile, to
* ensure visibility upon completion.
*/
private volatile Thread runner;

Sync(Callable<V> callable) {
this.callable = callable;
}

private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}

/**
* Implements AQS base acquire to succeed if ran or cancelled
*/
@Override
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1;
}

/**
* Implements AQS base release to always signal after setting final
* done status by nulling runner thread.
*/
@Override
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}

/**
* Inner is cancelled boolean.
*
* @return the boolean
*/
boolean innerIsCancelled() {
return getState() == CANCELLED;
}

/**
* Inner is done boolean.
*
* @return the boolean
*/
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}

/**
* Inner get v.
*
* @return the v
* @throws InterruptedException the interrupted exception
* @throws ExecutionException the execution exception
*/
V innerGet() throws InterruptedException, ExecutionException {
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

/**
* Inner get v.
*
* @param nanosTimeout the nanos timeout
* @return the v
* @throws InterruptedException the interrupted exception
* @throws ExecutionException the execution exception
* @throws TimeoutException the timeout exception
*/
V innerGet(long nanosTimeout) throws InterruptedException,
ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}

/**
* Inner set.
*
* @param v the v
*/
void innerSet(V v) {
for (; ; ) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
return;
}
}
}

/**
* 设置异常
*
* @param t the t
*/
void innerSetException(Throwable t) {
for (; ; ) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
releaseShared(0);
return;
}
}
}

/**
* 避免死循环
*
* @param mayStopIfRunning the may stop if running
* @return boolean boolean
*/
boolean innerStop(boolean mayStopIfRunning) {
for (; ; ) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayStopIfRunning) {
Thread r = runner;
if (r != null) {
r.stop();//这里调用线程stop方法
setState(STOP_STAMP);
}
}
releaseShared(0);

return true;
}

/**
* 设置关闭
*
* @param mayInterruptIfRunning the may interrupt if running
* @return the boolean
*/
boolean innerCancel(boolean mayInterruptIfRunning) {
for (; ; ) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null) {
r.interrupt();
}
}
releaseShared(0);
return true;
}

/**
* Inner run.
*/
void innerRun() {
if (!compareAndSetState(READY, RUNNING))
return;

runner = Thread.currentThread();
if (getState() == RUNNING) { // recheck after setting thread
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result);
} else {
releaseShared(0); // cancel
}
}
}
}

定义

AQS是一个用于构建锁和同步器的框架

实现

AQS的核心思想是使用一个FIFO(先进先出)的等待队列来管理线程的同步状态。
AQS通过一个int类型的状态变量(state)来表示同步状态,并提供了原子操作来修改这个状态。

核心组件

  • 同步状态(state):一个int类型的变量,用于表示同步状态。可以通过getState()、setState(int newState)和compareAndSetState(int expect, int update)方法来访问和修改。
  • 等待队列(Wait Queue):一个FIFO队列,用于存放等待获取同步状态的线程。每个节点(Node)表示一个线程,节点之间通过prev和next指针连接
  • 节点(Node):AQS中的内部类,用于表示等待队列中的每个节点。每个节点包含线程引用、等待状态、前驱节点和后继节点等信息。

主要方法

  • 独占模式(Exclusive Mode)
  • 共享模式(Shared Mode)
  • 队列操作:添加队尾,唤醒下个节点

常见的实现

类名称 实现原理 使用场景
CountDownLatch 维护一个state(初始定义),同时包含await方法,该方法尝试获取共享锁,同时进入等待队列,一旦state为0,所有任务一下全部激活。 每次调用 countDown() 方法时,计数器减一;当计数器减到零时,所有等待的线程都会被唤醒
Semaphore 维护一个state(初始定义),同时包含acquire()方法 和release(),acquire对state -1 ,如果成功,则运行任务,失败,则等待,release()对应的+1 尝试获取一个许可,如果没有可用的许可,则阻塞当前线程,释放一个许可,并唤醒等待的线程
ReentrantReadWriteLock state的低 16 位(0-15 位)表示写锁的重入次数。高 16 位(16-31 位)表示读锁的重入次数。 读写锁允许多个读线程同时访问共享资源,但在写线程访问时,所有的读线程和其他写线程都被阻塞,特别注意:当一个线程获取读锁,也不允许它继续获取写锁

四、总结

AQS 开放了 抽离了获取state的方法,释放state的方法,同时封装了接入队列的方法,基于不同的后去和释放逻辑,
不同的实现类,封装自身对获取和释放state的逻辑,可以创造出对线程等待顺序不同的并发操作,非常巧妙的抽象模板模式。