You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

432 lines
25 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 中间件研究
## 1. 中间件生态
- ![中间件图](pic/中间件图.png)
## 2. 数据结构
### 2.1 数组与链表: 存储设计的基石
- 数组的特性: 内存连续性和随机访问效率高
- 连续内存的好处: 数组的随机访问性能极好, 连续内存确保了地址空间的连续性, 寻址非常简单高效
- 数组可以根据下标计算出真实的物理地址, 寻找算法为 : baseOffset + index * size
- 这种高效的访问机制在中间件领域有使用: 比如 RocketMQ的文件设计中就应用了
- **RocketMQ - 数组使用剖析**
- RocketMQ 为了追求消息写入时极致的顺序写,**会把所有主题的消息全部顺序写入到 commitlog 文件中**。也就是说commitlog 文件中混杂着各个主题的消息,但消息消费
时,需要根据主题、队列、消费位置向消息服务器拉取消息。**如果想从 commitlog 文件中读取消息,则需要遍历 commitlog 文件中的所有消息,检索性能非常低下。**
- 一开始为了提高检索效率RocketMQ 引入了 **ConsumeQueue 文件**,可以理解为 **commitlog 文件按照主题创建索引**
- 为了在消费端支持消息**按 tag 进行消息过滤**,索引数据中需要包含消息的 tag 信息,它的数据类型是 String**索引文件遵循{topic}/{queueId}**,也就是按照主题、队列两级目录存
储。单个索引文件的存储结构设计如下图所示:
- ![RocketMQ-tag索引设计](pic/RocketMQ-tag索引设计.png)
- 索引文件中,每一条消息都包含**偏移量、消息长度和 tag 内容** 3 个字段。
- commitlog 偏移量
- 可以根据该值快速从 commitlog 文件中找到消息,这也是索引文件的意义。
- 消息长度
- 消息的长度,知道它可以方便我们快速提取一条完整的消息。
- tag 内容
- 由于消息的 tag 是由用户定义的,例如 tagA、createorder 等,它的长度可变。在文件存储领域,**一般存储可变长的数据**,通常会采用“**长度字段 + 具体内容**”的存储方式。
- 回到消息消费这个需求,我们根据主题、消费组,消息位置 (队列中存储的第 N 条消息),能否快速找到消息呢?例如输入 topic:order_topic、queueId:0,offset:2能不能马上找
到第 N 条消息?
- 答案是可以找到,但不那么高效。原因是,我们根据 topic、queueid能非常高效地找到对应的索引文件。我们只需要找到对应的 topic 文件夹,然后在它的子目录中找到对应的
队列 id 文件夹就可以了。但要想从索引文件中找到具体条目,我们还是必须遍历索引文件中的每一个条目,**直到到达 offset 的条目,才能取出对应的 commitlog 偏移量**。
- 那是否有更高效的索引方式呢?
- 当然有,我们可以将每一个条目设计成固定长度,然后按照数组下标的方式进行检索。
- 为了实现每一个条目定长,我们在这里不存储 tag 的原始字符串,而是**存储原始字符串的 hashCode这样就可以确保定长了**。你可以看看下面这张设计图:
- ![hashCode保证定长](pic/hashCode保证定长.png)
- 基于这种设计,如果给定一个 offset我们再想快速提取一条索引就变得非常简单了。
- 首先,根据 offset * 20(每一个条目的长度),定位到需要查找条目的 **起始位置**用startOffset 表示。
- 然后,从 startOffset 位置开始读取 20 个字节的长度,就可以得到 **物理偏移量、消息长度和 tag 的 hashCode** 了。
- 接着,我们可以通过 hashCode 进行第一次过滤,如果遇到 hash 冲突,**就让客户端再根据消息的 tag 字符串精确过滤一遍**
- 总之,正是由于数组具有内存连续性,具有随机访问的特性,它在存储设计领域的应用才非常广泛,我们后面介绍的 HashMap 也引入了数组。
---
- **ArrayList**
- **数组从严格意义上来说是面向过程编程中的产物**,而 Java 是一门面向对象编程的语言,所以,直接使用数组容易破坏面向对象的编程范式,故面向对象编程语言都会对数组
进行更高级别的抽象,在 Java 中对应的就是 ArrayList。
- 从**数据存储结构、扩容机制、数据访问特性**三个方面和你一起来探究一下 ArrayList。
- ArrayList 的底层数据直接使用了数组,是**对数组的抽象**。
- ArrayList 相比数组,增加了一个特性,它**支持自动扩容**。其扩容机制如下图所示:
- ![ArrayList自动扩容](pic/ArrayList自动扩容.png)
- 扩容的实现有三个要点
- 需要将原数组中的内容拷贝到新的数组,即**扩容过程中存在内存复制等较重的操作**
- 注意,只在当前无剩余空间时才会触发扩容。在实际的使用过程中,我们要**尽量做好容量评估,减少扩容的发生**。因为扩容的成本还是比较高的,**存储的数据越多,扩容的成本越高**。
- ArrayList 的数据访问特性
- 顺序添加元素的效率高
- ArrayList 顺序添加元素,如果不需要扩容,直接将新的数据添加到 elementData[size] 位置,然后 size 加一即可其中size 表示当前数组中存储的元素个数)
- ArrayList 添加元素的时间复杂度为 O(1),也就是说它不会随着存储数据的大小而改变,是非常高效的存储方式。
- 中间位置插入 / 删除元素的效率低
- 在插入元素时,我们将需要插入数据的下标用 index 表示,将 index 之后的依次向后移动(复制到 index + 1),然后将新数据存储在下标 index 的位置。
- 删除操作与插入类似,只是一个数据是往后移,而删除动作是往前移。
- ArrayList 在中间位置进行删除的时间复杂度为 O(n),这是一个比较低效的操作。
- 随机访问性能高
- 由于 ArrayList 的底层就是数组,因此它拥有高效的随机访问数据特性。
---
- LinkedList
- 一个 LinkedList 对象在内存中通常由两部分组成:**LinkedList 对象和由 Node 节点组成的链条**。
- 一个 LinkedList 对象在内存中主要包含 3 个字段。
- int size链表中当前存在的 Node 节点数,主要用来判断是否为空、判断随机访问位点是否存在;
- Node first指向链表的头节点
- Node last指向链表的尾节点。
- 再来说说由 Node 节点组成的链条。Node 节点用于存储真实的数据,并维护两个指针。
- E item拥有存储用户数据
- Node prev前驱节点指向当前节点的前一个指针
- Node last后继节点指向当前节点的下一个节点。
- 由这两部分构成的链表具有一个非常典型的特征:**内存的申请无须连续性**。这就减少了内存申请的限制。
- 对比
- ![ArrayList与LinkedList对比](pic/ArrayList与LinkedList对比.png)
---
- HashMap
- 无论是链表还是数组都是一维的,在现实世界中有一种关系也非常普遍:关联关系。**关联关系在计算机领域主要是用键值对来实现**HashMap 就是基于哈希表 Map 接口的具体实现。
- 在 HashMap 中,单个键值对用一个 Map.Entry 结构表示,具体字段信息如下。
- K key存储的 Key后续可以用该 Key 进行查找
- V value存储的 Value
- int hashKey 的哈希值;
- Ma.Entry next 链表。
- 当哈希槽中已经存在数据时,新加入的元素是存储在链表的头部还是尾部呢?
- 答案是 **放在头部**。代码如下:
```java
//假设新放入的槽位下标用 index 表示,哈希槽用 hashArray 表示
Map.Entry newEntry = new Map.Entry(key,value);
newEntry.next = hashArray[index];
hashArray[index] = newEntry;
```
- 我们将新增加的元素放到链表的头部,也就是直接放在哈希槽中,然后用 next 指向原先存在于哈希槽中的元素。
- ![HashMap头插](pic/HashMap头插.png)
- 这种方式的妙处在于,只涉及两个指针的修改。如果我们把新增加的元素放入链表的头部,链表的复杂度为 O(1)。相反,如果我们把新元素放到链表的尾部,那就需要遍历整条
链表,写入复杂度会有所提高,随着哈希表中存储的数据越来越多,那么**新增数据的性能将随着链表长度的增加而逐步降低**。
- 介绍完添加元素,我们来看一下元素的查找流程,也就是如何根据 Key 查找到指定的键值对。
- 首先,计算 Key 的 hashCode然后与哈希槽总数进行取模得到对应哈希槽下标。
- 然后,访问哈希槽中对应位置的数据。如果数据为空,则返回“未找到元素”。如果哈希槽对应位置的数据不为空,那我们就要判断 Key 值是否匹配了。如果匹配,则返回当前数
据;如果不匹配,则需要遍历哈希槽,如果遍历到链表尾部还没有匹配到任何元素,则返回“未找到元素”。
- 说到这里我们不难得出这样一个结论如果没有发生哈希槽冲突也就是说如果根据Key 可以直接命中哈希槽中的元素,数据读取访问性能非常高。但如果需要从链表中查找
数据,则性能下降非常明显,时间复杂度将从 O(1) 提升到 O(n),这对查找来说就是一个“噩梦”。
- 怎么解决这个问题呢JDK 的设计者们给出了两种优化策略。
- 第一种,对 Hash 槽进行扩容,让数据尽可能分布到哈希槽上,但不能解决因为哈希冲突导致的链表变长的问题。
- 第二种,当链表达到指定长度后,将链表结构转换为红黑树,提升检索性能 (JDK8 开始引入)。
- 我们先来通过源码深入探究一下 HashMap 的扩容机制。**HashMap 的扩容机制由 resize 方法实现**,该方法主要分成两个部分,**上半部分处理初始化或扩容容量计算,下半部分处
理扩容后的数据复制** (重新布局)。
- 具体源码如下:
```java
/**
* Initializes or doubles table size. If null, allocates in
* accord with initial capacity target held in field threshold.
* Otherwise, because we are using power-of-two expansion, the
* elements from each bin must either stay at same index, or move
* with a power of two offset in the new table.
*
* @return the table
*/
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using default
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACIT
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
//此处省略数据复制相关代码
}
```
- 与之对应的流程图:
- ![HashMap对应流程图](pic/HashMap对应流程图.png)
- HashMap 的容量并无限制,但超过 2 的 30 次幂后不再扩容哈希槽。
- 哈希槽是按倍数扩容的。
- HashMap 在不指定容量时,默认初始容量为 16。
- HashMap 并不是在无容量可用的时候才扩容。它会先设置一个扩容临界值,当 HashMap 中的存储的数据量达到设置的阔值时就触发扩容,这个阔值用 threshold 表示。
- 我们还引入了一个变量 loadFactor 来计算阔值,阔值 = 容量 *loadFactor。其中loadFactor 表示加载因子,默认为 0.75。
- 加载因子的引入与 HashMap 哈希槽的存储结构与存储算法有关。
- HashMap 在出现哈希冲突时,会引入一个链表,形成“数组 + 链表”的存储结构。这带来的效果就是,如果 HashMap 有 32 个哈希槽,当前存储的数据也刚好有 32 个,这些数
据却不一定全会落在哈希槽中,因为可能存在 hash 值一样但是不同 Key 的数据,这时,数据就会进入到链表中。
- 前面我们也提到过,数据放入链表就容易引起查找性能的下降,所以,**HashMap 的设计者为了将数据尽可能地存储到哈希槽中,会提前进行扩容,用更多的空间换来检索性能的提高**。
```java
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
```
- 这段代码不难理解,就是按照扩容后的容量创建一个新的哈希槽数组,遍历原先的哈希槽(数组),然后将数据重新放入到新的哈希槽中,为了保证链表中数据的顺序性,在**扩容时采用尾插法**。
- 除了扩容JDK8 之后的版本还有另外一种提升检索能力的措施,那就是在链表长度超过 8 时,将链表演变为**红黑树**。这时的时间复杂度为 O(2lgN),可以有效提升效率。
- 哈希表是我们使用得最多的数据结构,它的底层的设计也很具技巧性。**哈希表充分考虑到数组与链表的优劣,扬长避短**HashMap 就是这两者的组合体。为了解决链表检索性能
低下的问题HashMap 内部又引入了扩容与链表树化两种方式进行性能提升,提高了使用的便利性,降低了使用门槛。
## 3. 开源工具及底层
### 3.1 多线程编程有哪些常见的设计模式?
- 如何复用线程?
```java
Thread t = new Thread(new UserTask());
```
- 这段代码会创建一个操作系统线程吗?
- 答案是**不会**。这段代码只是创建了一个普通的 Java 对象,要想成为一个真实的线程,必须调用线程的 start 方法,让线程真正受操作系统调度。而线程的结束和 run 方法的执行情
况有关,一旦线程的 run 方法结束运行,线程就会进入消亡阶段,相关资源也会被操作系统回收。
- 所以**要想复用线程,一个非常可行的思路就是,不让 run 方法结束**。
- 通常我们会想到下面这种办法:
```java
class Task implements Runnable {
@Override
public void run() {
while(true) {
if( shouldRun() ) {// 符合业务规则就运行
doSomething();
} else {
try {
//休眠1s,继续去判断是否可运行
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
private void doSomething() {
}
}
```
- 通过一个 while(true) 死循环确保 run 方法不会结束,然后不断地判断当前是否可以执行业务逻辑;如果不符合执行条件,就让线程休眠一段时间,然后再次进行判断。
- 这个方法确实可以复用线程,但存在明显的缺陷。因为一旦不满足运行条件,就会进行反复无意义的判断,造成 CPU 资源的浪费。另外,在线程处于休眠状态时,就算满足执行条
件,也需要等休眠结束后才能触发检测,时效性会大打折扣。
- 那我们能不能一有任务就立马执行,没有任务就阻塞线程呢?毕竟,如果线程处于阻塞状态,就不会参与 CPU 调度,自然也就不会占用 CPU 时间了。
- 答案当然是可以的业界有一种非常经典的线程复用模型while 循环 + 阻塞队列,下面是一段示范代码:
```java
class Task implements Runnable {
private LinkedBlockingQueue taskQueue = new LinkedBlockingQueue();
private AtomicBoolean running = new AtomicBoolean(true);
public void submitTask(Object task) throws InterruptedException {
taskQueue.put(task);
}
@Override
public void run() {
while(running.get()) {
try {
Object task = taskQueue.take(); // 如果没有任务,会使线程阻塞,
doSomething(task);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
public void shutdown() {
if(running.compareAndSet(true, false)) {
System.out.println(Thread.currentThread() + " is stoped");
}
}
}
```
- 我们来解读一下。这里,我们**用 AtomicBoolean 变量来标识线程是否在运行中**,用 **while(running.get()) 替换 while(true),方便优雅地退出线程**
- 线程会从阻塞队列中获取待执行任务,如果当前没有可执行的任务,那么线程处于阻塞状态,不消耗 CPU 资源;一旦有任务进入到阻塞队列,线程会被唤醒执行任务,这就很好地
保证了时效性。
- 那怎么停止一个线程呢?调用线程的 shutdown 方法一定能停止线程吗?
- 答案是不一定。 如果任务队列中没有任务那么线程会一直处于阻塞状态不能被停止。而且Java 中 Thread 对象的 stop 方法被声明为已过期,直接调用并不能停止线程。那
怎么优雅地停止一个线程呢?
- 原来Java 中提供了中断机制,在 Thread 类中与中断相关的方法有三个。
- public void interrupt()Thread 实例方法,用于设置中断标记,但是不能立即中断线程。
- public boolean isInterrupted()Thread 实例方法,用于获取当前线程的中断标记。
- public static boolean interrupted()Thread 静态方法,用于获取当前线程的中断标记,并且会清除中断标记。
- 如果调用线程对象的 interrupt() 方法,会首先设置线程的中断位,这时又会出现两种情况:
- 如果线程阻塞在支持中断的方法上会立即结束阻塞并向外抛出InterruptedException(中断异常)
- 如果线程没有阻塞在支持中断的方法上,则该方法不能立即停止线程。
- 通常,我们需要在代码中添加显示的中断检测代码,我还是用前面的例子给出示例代码,你可以看一下:
```java
static class Task implements Runnable {
private LinkedBlockingQueue taskQueue = new LinkedBlockingQueue();
private AtomicBoolean running = new AtomicBoolean(true);
public void submitTask(Object task) throws InterruptedException {
taskQueue.put(task);
}
@Override
public void run() {
while(running.get()) {
try {
Object task = taskQueue.take(); // 如果没有任务,会使线程阻塞,
doSomething(task);
if(Thread.currentThread().isInterrupted()) {
//线程被中断,跳出循环,线程停止
break;
}
//这是一个耗时很长的方法
doSomething2(task);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
public void shutdown() {
if(running.compareAndSet(true, false)) {
System.out.println(Thread.currentThread() + " is stoped");
}
}
private void doSomething(Object task) {
}
private void doSomething2(Object task) {
}
}
```
- 非常不建议你直接使用 Executors 相关的 API 来创建线程池,因为通过这种方式创建的线程池内部会默认创建一个无界的阻塞队列,一旦使用不当就会造成内存泄露。
- 我更推荐你**使用 new 的方式创建线程,然后给线程指定一个可阅读的名称**
```java
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 0, TimeUnit.MILLISE,new LinkedBlockingQueue<>(), new ThreadFactory() {
private AtomicInteger threadNum = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("pull-service-" + threadNum.incrementAndGet());
return t;
}
});
```
- 这样当系统发生故障时如果我们想要分析线程栈信息就能很快定位各个线程的职责。例如RocketMQ 的消费线程我就会以“ConsumeMessageThread_”开头。
- 使用线程池另一个值得关注的问题是**怎么选择阻塞队列**,是使用无界队列还是有界队列。
- 通常,我们可以遵循这样的原则:**对于 Request-Response 等需要用户交互的场景,建议使用有界队列,避免内存溢出****对于框架内部线程之间的交互,可以根据实际情况加以选择**。
- 我们通过几个例子来看一下具体的场景。
- 项目开发中通常会遇到文件下载、DevOps 的系统发布等比较耗时的请求,这类场景就非常适合使用线程池。基本的工作方式如图:
- ![文件下载上传场景线程池解决](pic/文件下载上传场景线程池解决.png)
- 在与用户交互的场景中,如果几十万个文件下载请求同时提交到线程池,当线程池中的所有线程都在处理任务时,无法及时处理的请求就会存储到线程池中的阻塞队列中。这就很
容易使内存耗尽,从而触发 Full-GC导致系统无法正常运作。
- 因此,这类场景我建议使用有界队列,**直接拒绝**暂时处理不了的请求,并给用户返回一条消息“请求排队中,请稍后再试”,这就保证了系统的可用性。
- 在一个线程或多个线程向一个阻塞队列中添加数据时,通常也会使用有界队列。记得我在开发数据同步产品时,为了实现源端与目标端线程,就采用了阻塞队列,下面是一张示意图:
- ![源数据传输有界队列](pic/源数据传输有界队列.png)
- 为了实现 MySQL 增量同步Canal 线程源源不断地将 MySQL 数据写入到阻塞队列,然后目标端线程从队列中读取数据并写入到 MQ。如果写入端的写入速度变慢阻塞队列中
后,我们就能让源端线程处于阻塞状态,从而对源端进行限流。
- 但在选择阻塞队列时还可能有另外一种情况,那就是一个线程对应多个阻塞队列,这时我们一般会采用无界阻塞队列 +size 的机制,实现细粒度限流。当时,我设计的 RocketMQ
消费模型是下面这样:
- ![RocketMQ消费模型](pic/RocketMQ消费模型.png)
- 一个拉取线程轮流从 Broker 端队列 (q0、q1) 中拉取消息,然后根据队列分别放到不同的阻塞队列中,每一个阻塞队列会单独分配单个或多个线程去处理。
- 这个时候,采用有界队列可能出现问题。如果我们采用有界队列,一旦其中一个阻塞队列对应的下游消费者处理性能降低,阻塞队列中没有剩余空间存储消息,就会阻塞消息发送
线程,最终造成另外一个任务也无法拉取新的消息。显然,这会让整体并发度降低,影响性能。
- 那如果采用无界队列呢?单纯使用无界队列容易导致内存泄露,触发更严重的后果,好像也不是一个好的选择。
- 其实我们可以在无界队列的基础上额外引入一个参数,用它来控制阻塞队列中允许存放的消息条数。当阻塞队列中的数据大于允许存放的阔值时,新的消息还可以继续写入队列,
不会阻塞消息发送线程。但我们需要给消息拉取线程一个反馈,暂时停止从对应队列中拉取消息,从而实现限流。
- **多线程编程常用的设计模式**
## 4. 场景分析
## 5. 补充