Thread Pool & Connection Pooling

07. 并发调度与资源池化

有界线程池负责业务并发,MySQL/Redis 连接池负责连接复用与健康检查,形成可控资源模型。

6 个核心函数3 个重点文件基于当前工程源码

模块职责

有界线程池负责业务并发,MySQL/Redis 连接池负责连接复用与健康检查,形成可控资源模型。

重点文件

调用链

  1. 1网络线程接收任务
  2. 2tryEnqueue 非阻塞投递
  3. 3worker 并行执行
  4. 4连接池借出资源
  5. 5RAII 归还与统计
非阻塞任务投递

ThreadPool::tryEnqueue

在不等待队列空间的前提下尝试加入任务,队列满或停止时立即返回。

thread_pool.h · L136–L159
原型template <class F> bool tryEnqueue(F&& f)

调用时机

NetworkServer 收到 UDP 包后调用。

返回说明

成功入队返回 true。

参数

参数说明
f可调用任务对象

执行流程

  1. 构造 Task
  2. 锁定队列
  3. 检查 accepting 状态
  4. 检查队列上限
  5. 入队并通知 worker

工程说明

避免网络接收线程因业务积压被阻塞。

关联接口

查看完整实现
thread_pool.h
template <class F>
    bool tryEnqueue(F&& f)
    {
        Task task(std::forward<F>(f));

        {
            std::lock_guard<std::mutex> lock(m_mutex);

            if (!m_accepting) {
                ++m_rejected;
                return false;
            }

            if (m_maxQueueSize != 0 && m_tasks.size() >= m_maxQueueSize) {
                ++m_rejected;
                return false;
            }

            m_tasks.emplace_back(std::move(task));
        }

        m_taskCv.notify_one();
        return true;
    }
工作线程主循环

ThreadPool::workerLoop

等待任务、执行任务并维护 active/completed/rejected 统计。

thread_pool.h · L256–L291
原型void workerLoop()

调用时机

每个工作线程创建时运行。

返回说明

无返回值。

参数

无显式参数。

执行流程

  1. 等待条件变量
  2. 读取队首任务
  3. 更新活跃计数
  4. 捕获任务异常
  5. 更新完成计数

工程说明

单任务异常不会终止 worker 线程。

关联接口

查看完整实现
thread_pool.h
void workerLoop()
    {
        while (true) {
            Task task;

            {
                std::unique_lock<std::mutex> lock(m_mutex);

                m_taskCv.wait(lock, [this]() {
                    return m_stopping || !m_tasks.empty();
                });

                if (m_stopping && m_tasks.empty()) {
                    return;
                }

                task = std::move(m_tasks.front());
                m_tasks.pop_front();
                ++m_active;
            }

            m_spaceCv.notify_one();

            try {
                task();
            } catch (const std::exception& e) {
                std::cerr << "⚠️ [" << m_name << "] task exception: "
                          << e.what() << "\n";
            } catch (...) {
                std::cerr << "⚠️ [" << m_name << "] task unknown exception\n";
            }

            --m_active;
            ++m_completed;
        }
    }
MySQL 连接池初始化

MySQLConnectionPool::init

保存配置并创建最小连接数,建立可复用的空闲连接队列。

mysql_connection_pool.cpp · L58–L93
原型bool MySQLConnectionPool::init(const MySQLPoolConfig& config)

调用时机

DBManager 初始化时调用。

返回说明

初始化成功返回 true。

参数

参数说明
config连接池参数

执行流程

  1. 校验连接数范围
  2. 写入配置
  3. 创建最小连接
  4. 加入空闲队列
  5. 设置初始化状态

工程说明

连接创建失败时执行清理并返回失败。

关联接口

查看完整实现
mysql_connection_pool.cpp
bool MySQLConnectionPool::init(const MySQLPoolConfig& config)
{
    shutdown();

    MySQLPoolConfig cfg = config;

    if (cfg.maxConnections == 0) {
        cfg.maxConnections = 1;
    }

    if (cfg.minConnections > cfg.maxConnections) {
        cfg.minConnections = cfg.maxConnections;
    }

    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_config = cfg;
        m_shutdown = false;
        m_initialized = true;
    }

    for (std::size_t i = 0; i < cfg.minConnections; ++i) {
        Node* node = createNode();

        if (!node) {
            std::cerr << "⚠️ [MySQLPool] 预创建连接失败 index=" << i << "\n";
            continue;
        }

        std::lock_guard<std::mutex> lock(m_mutex);
        m_idle.push_back(node);
        ++m_total;
    }

    return true;
}
MySQL 连接借用

MySQLConnectionPool::acquire

从空闲队列获取连接,必要时按上限创建新连接,并在超时后返回空句柄。

mysql_connection_pool.cpp · L125–L193
原型MySQLConnectionPool::Connection MySQLConnectionPool::acquire(int timeoutMs)

调用时机

DBManager 每次访问 MySQL 前调用。

返回说明

返回 RAII Connection。

参数

参数说明
timeoutMs等待超时;负数使用默认值

执行流程

  1. 计算截止时间
  2. 优先获取空闲连接
  3. 按上限创建连接
  4. 执行健康检查
  5. 返回 RAII 句柄

工程说明

Connection 析构时自动归还连接。

关联接口

查看完整实现
mysql_connection_pool.cpp
MySQLConnectionPool::Connection
MySQLConnectionPool::acquire(int timeoutMs)
{
    if (timeoutMs < 0) {
        timeoutMs = m_config.acquireTimeoutMs;
    }

    const auto deadline = Clock::now() + std::chrono::milliseconds(timeoutMs);

    while (true) {
        Node* node = nullptr;
        bool shouldCreate = false;

        {
            std::unique_lock<std::mutex> lock(m_mutex);

            if (!m_initialized || m_shutdown) {
                return {};
            }

            if (!m_idle.empty()) {
                node = m_idle.front();
                m_idle.pop_front();
            } else if (m_total < m_config.maxConnections) {
                ++m_total;
                shouldCreate = true;
            } else {
                if (m_cv.wait_until(lock, deadline) == std::cv_status::timeout) {
                    return {};
                }

                continue;
            }
        }

        if (shouldCreate) {
            node = createNode();

            if (!node) {
                std::lock_guard<std::mutex> lock(m_mutex);
                if (m_total > 0) {
                    --m_total;
                }
                m_cv.notify_one();
                return {};
            }

            return Connection(this, node);
        }

        if (!node) {
            continue;
        }

        if (isAlive(node)) {
            return Connection(this, node);
        }

        closeNode(node);

        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_total > 0) {
                --m_total;
            }
            m_cv.notify_one();
        }
    }
}
Redis 连接池初始化

RedisConnectionPool::init

创建最小 Redis 连接并准备空闲连接队列。

redis_connection_pool.cpp · L58–L93
原型bool RedisConnectionPool::init(const RedisPoolConfig& config)

调用时机

DBManager 初始化 Redis 缓存服务时调用。

返回说明

初始化成功返回 true。

参数

参数说明
configRedis 连接参数

执行流程

  1. 校验配置
  2. 创建连接节点
  3. 执行认证与选库
  4. 加入空闲队列
  5. 标记可用

工程说明

初始化阶段即验证 Redis 服务可达性。

关联接口

查看完整实现
redis_connection_pool.cpp
bool RedisConnectionPool::init(const RedisPoolConfig& config)
{
    shutdown();

    RedisPoolConfig cfg = config;

    if (cfg.maxConnections == 0) {
        cfg.maxConnections = 1;
    }

    if (cfg.minConnections > cfg.maxConnections) {
        cfg.minConnections = cfg.maxConnections;
    }

    {
        std::lock_guard<std::mutex> lock(m_mutex);
        m_config = cfg;
        m_shutdown = false;
        m_initialized = true;
    }

    for (std::size_t i = 0; i < cfg.minConnections; ++i) {
        Node* node = createNode();

        if (!node) {
            std::cerr << "⚠️ [RedisPool] 预创建连接失败 index=" << i << "\n";
            continue;
        }

        std::lock_guard<std::mutex> lock(m_mutex);
        m_idle.push_back(node);
        ++m_total;
    }

    return true;
}
Redis 连接借用

RedisConnectionPool::acquire

获取空闲 Redis 连接,或在容量允许时创建新连接,并进行必要的 PING 健康检查。

redis_connection_pool.cpp · L125–L193
原型RedisConnectionPool::Connection RedisConnectionPool::acquire(int timeoutMs)

调用时机

私聊、在线状态和会话缓存访问前调用。

返回说明

返回 RAII Connection。

参数

参数说明
timeoutMs等待超时

执行流程

  1. 等待可用节点
  2. 按上限扩容
  3. 检查连接存活
  4. 重建失效连接
  5. 返回句柄

工程说明

连接池限制并发连接数量并自动回收。

关联接口

查看完整实现
redis_connection_pool.cpp
RedisConnectionPool::Connection
RedisConnectionPool::acquire(int timeoutMs)
{
    if (timeoutMs < 0) {
        timeoutMs = m_config.acquireTimeoutMs;
    }

    const auto deadline = Clock::now() + std::chrono::milliseconds(timeoutMs);

    while (true) {
        Node* node = nullptr;
        bool shouldCreate = false;

        {
            std::unique_lock<std::mutex> lock(m_mutex);

            if (!m_initialized || m_shutdown) {
                return {};
            }

            if (!m_idle.empty()) {
                node = m_idle.front();
                m_idle.pop_front();
            } else if (m_total < m_config.maxConnections) {
                ++m_total;
                shouldCreate = true;
            } else {
                if (m_cv.wait_until(lock, deadline) == std::cv_status::timeout) {
                    return {};
                }

                continue;
            }
        }

        if (shouldCreate) {
            node = createNode();

            if (!node) {
                std::lock_guard<std::mutex> lock(m_mutex);
                if (m_total > 0) {
                    --m_total;
                }
                m_cv.notify_one();
                return {};
            }

            return Connection(this, node);
        }

        if (!node) {
            continue;
        }

        if (isAlive(node)) {
            return Connection(this, node);
        }

        closeNode(node);

        {
            std::lock_guard<std::mutex> lock(m_mutex);
            if (m_total > 0) {
                --m_total;
            }
            m_cv.notify_one();
        }
    }
}