RocketMQ源码详解 | Broker篇 · 其五:高可用之主从架构

发布于 2022年 01月 12日 08:47

概述

对于一个消息中间件来讲,高可用功能是极其重要的,RocketMQ 当然也具有其对应的高可用方案。

在 RocketMQ 中,有主从架构和 Dledger 两种高可用方案:

第一种通过主 Broker 将消息发送到从 Broker 实现高可用,在主 Broker IO 压力大或宕机的时候,从 Broker 可以接管读请求,但这种方案不支持在主 Broker 宕机后自动进行故障转移,且从 Broker 不支持写请求,也就是说在主 Broker 宕机后我们只能手动处理。

第二种是在 RocketMQ 4.5.X 的时候才加入的新的方案,其为基于 Raft 算法实现的一个高可用方案,支持集群自动选主与故障转移,但 TPS 低于第一种方案。

本文主要介绍前者的实现



HAService

RocketMQ 的主从高可用的实现的代码量比较少,大概就一两千行,其主要在 HAService 类和 HAConnection 类。


HAService 有三个内部类:

  • AcceptSocketService

    用来监听 HAClient 的连接请求,接收请求后将建立好的 channel 包装成 HAConnection 保存起来

  • GroupTransferService

    用以监听与处理分发请求,当外部发送了异步的分发请求后,该类中的线程将同步的处理该请求,并将其执行结果交给 Future 以执行回调函数

  • HAClient

    高可用客户端,在从 Broker 上启动,用以从主 Broker 拉取消息


HAService 主要是对上面三个类的包装,通过控制它们来对外提供服务。


AcceptSocketService

首先创建了一个注册了 OP_ACCEPT 事件的 selector ,用以监听绑定在 HA 服务端口上的 ServerSocketChannel(也就是一个标准的 NIO 服务器)

public void beginAccept() throws Exception {
  this.serverSocketChannel = ServerSocketChannel.open();
  this.selector = RemotingUtil.openSelector();
  this.serverSocketChannel.socket().setReuseAddress(true);
  this.serverSocketChannel.socket().bind(this.socketAddressListen);
  this.serverSocketChannel.configureBlocking(false);
  // 监听 accept 事件
  this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}

不过需要注意 RemotingUtil.openSelector() 方法,这里如果在 Linux 平台上,会使用 Epoll 来做多路复用的 selector

public static Selector openSelector() throws IOException {
  Selector result = null;

  // 如果在 linux 平台, 则使用 Epoll 作为多路复用的 selector
  if (isLinuxPlatform()) {
    try {
      final Class<?> providerClazz = Class.forName("sun.nio.ch.EPollSelectorProvider");
      if (providerClazz != null) {
        // pass: 这里通过反射调用 provider 方法获取 SelectorProvider 以创建 epoll 的 selector
      }
    } catch (final Exception e) {
      // ignore
    }
  }

  // 否则如果在其他平台上使用 nio 默认的实现
  if (result == null) {
    result = Selector.open();
  }

  return result;
}

在创建完成后,就开始通过使用 selector 监听 accpet 事件发生,然后进行以下处理

SocketChannel sc = ((ServerSocketChannel) k.channel()).accept();

if (sc != null) {
  try {
    // 在创建 socket 后, 将该创建好的 channel 包装成 HAConnection 类
    // 放入主类进行管理
    HAConnection conn = new HAConnection(HAService.this, sc);
    conn.start();
    HAService.this.addConnection(conn);
  } catch (Exception e) {
    log.error("new HAConnection exception", e);
    sc.close();
  }
}

在创建了 HAConnection 并启动后,这个服务就能自动的从存储服务中拉取已经持久化的消息 (准确来讲,是否已经持久化取决于使用的刷盘方案) ,并发送给该 Channel 对应的从 Broker ,且响应从 Broker 发送过来的请求。


GroupTransferService

如同简介介绍的,这个服务主要用来处理上层发过来的分发请求

public void putRequest(final CommitLog.GroupCommitRequest request) {
  lock.lock();
  try {
    // 添加到等待写队列
    this.requestsWrite.add(request);
  } finally {
    lock.unlock();
  }
  this.wakeup();
}

其中对于请求的存放具有两个队列,分别为 requestsWriterequestsRead ,这种设计方式也是一种比较常见的"无锁编程"的方式,即在遍历一个"读队列"里的请求时,使用另外一个队列来接受新到来的请求,而只需要在两个变量交换队列时与"写队列"写入时加锁即可。避免了在每次写入与读取都需要加锁,或直接使用一个阻塞队列所带来的消耗。


然后在线程中,会不断的对"读队列"中的请求进行处理

for (CommitLog.GroupCommitRequest req : this.requestsRead) {
  // 当 HAService 的已同步的进度超过了该请求要求的进度时
  // 视为已提交
  boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();

  // 设置等待的超时时间
  long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
    + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();

  // 等待直到主从进度到达要求的位置或超过指定时间未到达
  while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
    this.notifyTransferObject.waitForRunning(1000);
    transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
  }

  // 由于在 broker 中持久化是串行的, 所以下一个请求要求到达的偏移量
  // 一定大于当前请求, 因此我们的处理也能串行化
  if (!transferOK) {
    log.warn("transfer messsage to slave timeout, " + req.getNextOffset());
  }

  // complete 执行结果
  req.wakeupCustomer(transferOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
}

可以看出,这个类其实也不会做实质上的备份到从 Broker 的操作,而是进行对已经同步到 salve 进度进行监控,等待分发请求需要到达的偏移量到达,或者超时时,将执行结果交给异步请求。


HAClient

最后是高可用客户端的设计,这个客户端会被从 Broker 用来从主 Broker 拉取消息。

先来看它的成员属性

private static final int READ_MAX_BUFFER_SIZE = 1024 * 1024 * 4;
// 主 Broker 的地址
private final AtomicReference<String> masterAddress = new AtomicReference<>();
// 向主 Broker 发起拉取的起始 offset
private final ByteBuffer reportOffset = ByteBuffer.allocate(8);

private SocketChannel socketChannel;
private Selector selector;

// 上一次写入消息的时间戳
private long lastWriteTimestamp = System.currentTimeMillis();
// 本从 Broker 的当前复制进度
private long currentReportedOffset = 0;
// 读缓冲区中已经处理到的位置的指针
private int dispatchPosition = 0;
// 读缓冲区大小 (4MB)
private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);

byteBufferReadbyteBufferBackup 你想到了什么?嗯没错,上文提到的无锁编程中的读写分离队列


这个客户端类大致的执行流程如下:

while (!this.isStopped()) {
  try {
    // 如果还没有建立连接,则尝试连接到主 Broker
    if (this.connectMaster()) {

      // 检查当前时间是否需要上报偏移量
      // 同时, 上报偏移量又充当了主从 broker 之间的心跳的角色
      if (this.isTimeToReportOffset()) {
        // 向已经建立好的连接写入当前偏移量
        boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
          this.closeMaster();
        }
      }

      // 等待事件的发生
      // 而这里注册的事件为 OP_READ, 即为等待主 Broker 发送消息
      this.selector.select(1000);

      // 处理到来的读事件
      boolean ok = this.processReadEvent();
      // 出现异常时关闭连接
      if (!ok) {
        this.closeMaster();
      }

      // 从本地 store 服务中读取已经提交的偏移量, 报告给主 Broker
      if (!reportSlaveMaxOffsetPlus()) {
        continue;
      }

			// pass: 超时后的关闭 channel
    } else {
      this.waitForRunning(1000 * 5);
    }
  } catch (Exception e) {
    log.warn(this.getServiceName() + " service has exception. ", e);
    this.waitForRunning(1000 * 5);
  }
}

在主从 Broker 的通信中,上报偏移量充当了心跳的角色,在主 Broker 那边,超过指定时间没有接收到心跳,则会判断为断开连接

private boolean isTimeToReportOffset() {
  long interval =
    HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
  boolean needHeart = interval > HAService.this.defaultMessageStore.getMessageStoreConfig()
    .getHaSendHeartbeatInterval();

  return needHeart;
}

且这个心跳的超时时间默认配置为 5s


如果超过了指定的需要上报的间隔,就会通过已经建立好的 channel 写入当前已经持久化偏移量

private boolean reportSlaveMaxOffset(final long maxOffset) {
  this.reportOffset.position(0);
  this.reportOffset.limit(8);
  this.reportOffset.putLong(maxOffset);
  this.reportOffset.position(0);
  this.reportOffset.limit(8);

  for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
    try {
      this.socketChannel.write(this.reportOffset);
    } catch (IOException e) {
      log.error(this.getServiceName()
                + "reportSlaveMaxOffset this.socketChannel.write exception", e);
      return false;
    }
  }

  lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
  return !this.reportOffset.hasRemaining();
}

这段代码很好理解,即直接将偏移量写入到 buffer,然后将 buffer 写入到 channel

吐槽下这种控制 buffer 位置的方式,直接调用 clear 后 put 然后 filp 不比直接操作位置直观吗...


写入这里可能会有疑惑,为什么需要多次写入(write),难道一次写不完吗?

SocketChannel 的注释表示可能还真写不完

Unless otherwise specified, a write operation will return only after writing all of the r requested bytes. Some types of channels, depending upon their state, may write only some of the bytes or possibly none at all. A socket channel in non-blocking mode, for example, cannot write any more bytes than are free in the socket's output buffer.

由于我们使用的是 NIO,也叫非阻塞 IO,所以在写入的时候是不会阻塞的,我们也不知道具体写入成功了没有。而在 TCP 中是具有流量控制的,如果对面 socket 的接受缓冲区已经满了,就会触发流量控制,在本端的 socket 中的发送缓冲区由于不会发送任何数据,所以会很快的堆满,直到对方恢复接收为止。

而这时如果是传统的 BIO,会阻塞在 write(..) 这直到完全的写入为止,而 NIO 就可能需要多次 write 保证写入完成。


然后在写入完成后,会更新已经通信的时间,并让 selector 等待可读事件发生,也就是等待对面发送需要拉取的数据。

在事件发生或到达 1s 后,就会执行以下代码

private boolean processReadEvent() {
  int readSizeZeroTimes = 0;
  while (this.byteBufferRead.hasRemaining()) {
    try {
      int readSize = this.socketChannel.read(this.byteBufferRead);
      if (readSize > 0) {
        // 读取到的需要拉取的消息, 进行写入
        readSizeZeroTimes = 0;
        boolean result = this.dispatchReadRequest();
        if (!result) {
          log.error("HAClient, dispatchReadRequest error");
          return false;
        }
      } else if (readSize == 0) {
        // 如果连续三次读到的 size 都为 0 则结束
        if (++readSizeZeroTimes >= 3) {
          break;
        }
      } else {
        log.info("HAClient, processReadEvent read socket < 0");
        return false;
      }
    } catch (IOException e) {
      log.info("HAClient, processReadEvent read socket exception", e);
      return false;
    }
  }

  return true;
}

首先会先读一个 long 的长度(8B)来判断是否有数据,如有则通过 dispatchReadRequest 方法接着读取读取具体的消息

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
         // pass: 读消息,然后同步 append 到 commitlog   
        }

        if (!this.byteBufferRead.hasRemaining()) {
            // 将当前正在读的 byteBuffer 和 backupByteBuffer 进行交换
            this.reallocateByteBuffer();
        }
        break;
    }
    return true;
}

在读取完成并持久化成功后,会提交一次已同步的偏移量,并接着循环以上的过程



HAConnection

该类为 SocketChannel 的包装类,直接负责对 socket 的读写。

并且这个类也和 HAService 类似,将功能实现交给了两个内部服务类去做:

  • ReadSocketService

    负责 socket 的读,读的是 HAClient 发过来的已经提交的偏移量(也是心跳)

  • WriteSocketService

    负责 socket 的写,写的是需要同步到从 Broker 的消息


ReadSocketService

和 HAClient 类似,也是简单的通过在注册了 OP_READ 事件的 selector 来获取发送过来的偏移量

while (!this.isStopped()) {
  try {
    this.selector.select(1000);
    boolean ok = this.processReadEvent();
    if (!ok) {
      HAConnection.log.error("processReadEvent error");
      break;
    }

    // 检查心跳时间, 超时则停止服务且移除这个连接
    long interval = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
    if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
      log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + interval);
      break;
    }
  } catch (Exception e) {
    HAConnection.log.error(this.getServiceName() + " service has exception.", e);
    break;
  }
}

在有可读事件后的核心源码如下

if ((this.byteBufferRead.position() - this.processPosition) >= 8) {
  // 在这里读取到到的消息为从 Broker 当前已经持久化的偏移量
  int pos = this.byteBufferRead.position() - (this.byteBufferRead.position() % 8);
  long readOffset = this.byteBufferRead.getLong(pos - 8);
  this.processPosition = pos;

  // 更新自身对于从 Broker 的持久化位置的信息
  HAConnection.this.slaveAckOffset = readOffset;
  if (HAConnection.this.slaveRequestOffset < 0) {
    HAConnection.this.slaveRequestOffset = readOffset;
    log.info("slave[" + HAConnection.this.clientAddr + "] request offset " + readOffset);
  }

  // 通知主 Broker 从 Broker 的提交偏移量已经更新
  HAConnection.this.haService.notifyTransferSome(HAConnection.this.slaveAckOffset);
}

在这里通知了已提交偏移量更新了以后,之前在 GroupTransferService 类中看见的阻塞线程就会被唤醒,然后进行请求的响应。


WriteSocketService

该类用于定期的将主 Broker 的消息写入到该连接对应的从 Broker。

首先先初始化需要拉取的开始位置

if (-1 == this.nextTransferFromWhere) {
  // 初始化从主 Broker 当前已写入位置推送或为从 Broker 请求的位置
  if (0 == HAConnection.this.slaveRequestOffset) {
    // 计算需要推送的偏移量, 这个偏移量是 MappedFile 中的物理偏移量
    long masterOffset = HAConnection.this.haService.getDefaultMessageStore().getCommitLog().getMaxOffset();
    masterOffset =
      masterOffset
      - (masterOffset % HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
         .getMappedFileSizeCommitLog());

    if (masterOffset < 0) {
      masterOffset = 0;
    }

    this.nextTransferFromWhere = masterOffset;
  } else {
    this.nextTransferFromWhere = HAConnection.this.slaveRequestOffset;
  }

  log.info("master transfer data from " + this.nextTransferFromWhere + " to slave[" + HAConnection.this.clientAddr
           + "], and slave request " + HAConnection.this.slaveRequestOffset);
}

默认的起点为从 Broker 的请求位置,如果没有则从自身当前已经写入的物理位置为起点写入


确定好即将进行 append 的位置后,则先将还没写完的数据进行写入,直到所有剩余的数据写完后,才会进行下一批数据的拉取并写入

if (this.lastWriteOver) {
  // 计算上一次写入的时间, 超过指定间隔才写入
  long interval =
    HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;

  if (interval > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig()
      .getHaSendHeartbeatInterval()) {
  	// pass: 写入头部字段
  }
} else {
  // 如果上一次写入还有没有写完的数据, 需要进行写入
  this.lastWriteOver = this.transferData();
  if (!this.lastWriteOver)
    continue;
}

然后进行新数据的拉取

// 所有之前拉取的数据都已经写完了, 现在继续获取新的数据以写入从 Broker
SelectMappedBufferResult selectResult =
  HAConnection.this.haService.getDefaultMessageStore().getCommitLogData(this.nextTransferFromWhere);
if (selectResult != null) {
  // 分批推送过去, 默认为 32k
  int size = selectResult.getSize();
  if (size > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize()) {
    size = HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaTransferBatchSize();
  }

  long thisOffset = this.nextTransferFromWhere;
  this.nextTransferFromWhere += size;
  
  // pass: 写入头部

  this.lastWriteOver = this.transferData();
} else {

  HAConnection.this.haService.getWaitNotifyObject().allWaitForRunning(100);
}

而我们可以看见,这里的写入并没有将所有的数据进行直接的写入,而是以默认 32K 的数据量进行分批的推送,一次没有写完的数据将会在下一次循环中写入。



总结

最后我们再梳理一下主备同步的完整过程:

  1. HAService 启动,监听指定端口;HAClient 连接到指定端口。
  2. HAClient 如果有指定的偏移量,则从这个偏移量开始发送;否则从主 Broker 的 CommitLog 的尾部开始发送消息。
  3. HAClient 收到来自 HAService 的消息后,将其持久化到存储层,然后更新已持久化索引
  4. HAClient 的已提交的 Log 的位置会定时上报
  5. HAService 发现位置已经更新了后,会处理上层应用之前提交的请求

以上过程不断的重复,就完成了主备同步。


可以看的出来,这里的实现十分的简单,设计也非常高效,但是对于宕机却没有做多少处理,即使从 Broker 可以在主 Broker 宕机以后接管读请求,但却不能做到自动的主备转移。

从 Broker 接管读的具体实现已经在前几篇文章提过


所以 RocketMQ 有了另一种高可用模式:DLedger 集群,这种新的方案基于 Raft 算法实现了 Broker 组内的自动故障转移功能,实现了高可用。



Authoren_oc

Posted2021-12-17 18:24

License本文使用知识共享署名 4.0 国际许可协议进行许可(使用需署名)

推荐文章