C++ Workflow框架
2025-06-28 22:34:08

Workflow源码及官方文档

Sogou 公司 C++服务器引擎,不止是网络库,支持大部分 C++后端服务。

  1. 抽象粒度比较大的异步任务;
  2. 任务流组织多个异步任务;
  3. 不占用线程协调任务。

编程范式:程序 = 协议 + 算法 + 任务流:

协议:

  • 内置通用网络协议,包括 Http、Redis 和各种 RPC。
  • 用户可自定义网络协议,但需要提供序列化和反序列化协议。

算法:

  • 算法是与协议对称的,协议调用是rpc,算法调用就是一次apc。
  • 提供的通用算法可以直接使用,亦可以自定义算法。

任务流:

  • 任务流是实际的业务逻辑,即把协议和算法放在流程图中使用起来。
  • 典型的任务流是一个闭合的串并联图,任务流图可以根据每一步的结果动态生成,任务异步执行。

异步任务

异步任务一般借助回调实现。
需要建立 Redis 连接,以 Reactor 为例:

  1. 建立连接(可能涉及多次回调:DNS 解析、写回调等)
  2. 发送命令,等待返回值(异步可能涉及多次读回调,一次没读完)
  • 同步的话线程会阻塞等待 Redis 返回值

即当服务器去访问 Redis,可能涉及n个回调,不太适合做业务(使用类似 Reactor 的网络库可能一个业务会有多次回调)。

workflow 对异步任务进行抽象,其抽象的是一个粒度比较大的异步任务,n个回调抽象成 1次回调,隐藏底层实现。

workflow 抽象了哪些粒度大的异步任务?

  1. 网络
  • 服务端 server
  • 客户端 redis\mysql\kafka\http
  1. 磁盘 IO
  • readwritefsyncfdsync
  1. 定时任务
  2. 计算任务

任务处理

如何组织异步任务?

  1. 组织:串联、并联、DAG。
  2. 协调:计数器、资源池、条件任务。

线程模型

一般而言:

  1. 1 个主线程
  2. 4 个 poller 线程:检测条件是否满足(网络事件触发、定时任务触发)。
  3. 20 个 worker 线程:调用回调函数。
  4. 8 个 go 线程:执行耗时计算的任务。

以下述代码为例:main 线程让 poller 线程监听 8888 端口,基于任务,创建listenfdlistenfd对 4取余,交由某个 poller 线程负责,该 poller 线程检测客户端是否建立连接,建立连接后得到clientfd,同样对4取余,交由某个 poller 线程负责,该 poller 线程检测客户端是否发送数据,收到数据后分割完整的数据包并解析协议,向一个队列中添加任务,worker 线程池中的某个线程从中获取任务,该 worker 线程会处理回调,如果任务中有耗时任务,不同的耗时任务有不同的队列,交给 go 线程池中的线程来处理,整个任务处理完成之后,将结果通过 poller 线程发给客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
#include <stdio.h>
#include "workflow/WFHttpServer.h"
int main()
{
WFHttpServer server([](WFHttpTask *task) {
task->get_resp()->append_output_body("<html>Hello World!</html>");
});
if (server.start(8888) == 0) { // start server on port 8888
getchar(); // press "Enter" to end.
server.stop();
}
return 0;
}

并发任务流,可能会面临操作临界资源的问题,如果用加锁的方式,可能需要业务层加锁操作,因此 workflow 提出了几种任务协调策略:

  1. counter:计数器,用于控制工作流。
    10 个任务,调用counter->count(),每个任务完成后,计数器减 1,当计数器为0即所有任务完成时,会回调一个函数。
  2. resource pool:资源池 =》“信号量”。

任务运行时需要从资源池里获得一个资源,任务运行结束将资源放回资源池,不希望占用线程等待。

100 个任务,资源池 10 个资源,初始时 10 个任务,有一个执行完成,剩余的 90 个任务就可以有 1 个获取到资源池中的 1 个资源来执行。如果用信号量实现会阻塞线程,而资源池不需要阻塞线程,资源池相当于用一个“条件”来控制。

  1. 条件任务:任务包装器,通过对条件任务发送信号触发被包装任务的执行。

个人浅析

workflow框架通过任务流的方式,把粒度较大的任务划分为小粒度任务,并通过合理调度(取模)将这些小粒度任务分给各线程处理,实质就是尽可能让各线程负载均衡的思路。

  • 思路有点类似Golang的GMP模型:就是通过合理调度(窃取、本地队列)让各M(工作线程)能尽可能均衡的负载协程G!
Prev
2025-06-28 22:34:08
Next