🌈 个人主页:Zfox_
🔥 系列专栏:C++从入门到精通

一:🔥 服务端 - RpcRouter 实现

rpc_router.hpp

  • 提供 Rpc 请求处理回调函数
  • 内部的服务管理
    • ⽅法名称
    • 参数信息
    • 对外提供参数校验接⼝
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"

namespace rpc
{
    namespace server
    {
        enum class VType
        {
            BOOL = 0,
            INTEGRAL,
            NUMERIC,
            SIRING,
            ARRAY,
            OBJECT
        };

        class ServiceDescribe
        {
        public:
            using ptr = std::shared_ptr<ServiceDescribe>;
            using ServiceCallback = std::function<void(const Json::Value& , Json::Value &)>;
            using ParamsDescirbe = std::pair<std::string, VType>;

            // 由于 mname 和 desc 是 const 的,你实际上不能对它们进行移动操作,因为移动语义通常会修改源对象的状态。因此,这里的 const 是有问题的。
            ServiceDescribe(std::string &&mname, std::vector<ParamsDescirbe> &&desc, VType vtype, ServiceCallback &&handler)
                : _method_name(std::move(mname)),
                  _params_desc(std::move(desc)),
                  _return_type(vtype),
                  _callback(std::move(handler))
            {}

            const std::string& method(){ return _method_name; }

            // 针对收到请求中的参数进行校验
            bool paramCheck(const Json::Value &params)
            {
                // 对params进行参数校验 --- 判断所描述的参数字段是否存在,类型是否一致
                for(auto &desc : _params_desc)
                {
                    if(params.isMember(desc.first) == false) {
                        LOG(LogLevel::ERROR) << "参数字段完整性校验失败! " << desc.first << " 字段缺失";
                        return false;
                    }
                    if(check(desc.second, params[desc.first]) == false) {
                        LOG(LogLevel::ERROR) << desc.first << " 参数类型校验失败! ";
                        return false;
                    }
                }
                return true;
            }

            bool call(const Json::Value &params, Json::Value &result)
            {
                _callback(params, result);
                if(rtypecheck(result) == false) {
                    LOG(LogLevel::ERROR) << "回调处理函数中的响应信息校验失败! ";
                    return false;
                }
                return true;
            }
        private:
            bool rtypecheck(const Json::Value &val) {
                return check(_return_type, val); 
            }
            
            bool check(VType vtype, const Json::Value &val) {
                switch (vtype)
                {
                    case VType::BOOL : return val.isBool();
                    case VType::INTEGRAL : return val.isIntegral();
                    case VType::NUMERIC : return val.isNumeric();
                    case VType::SIRING : return val.isString();
                    case VType::ARRAY : return val.isArray();
                    case VType::OBJECT : return val.isObject();
                }
                return false;
            }
        private:
            std::string _method_name;                    // 方法名称
            ServiceCallback _callback;                   // 实际业务的回调函数
            std::vector<ParamsDescirbe> _params_desc;    // 参数字段格式描述
            VType _return_type;                          // 结果作为返回值类型的描述
        };

        class SDescribeFactory
        {
        public:
            void setMethodName(const std::string &name) {
                _method_name = name;
            }

            void setReturnType(VType vtype) {
                _return_type = vtype;
            }

            void setParamsDesc(const std::string &pname, VType vtype)
            {
                _params_desc.push_back(ServiceDescribe::ParamsDescirbe(pname, vtype));
            }

            void setCallback(const ServiceDescribe::ServiceCallback &cb)
            {
                _callback = cb;
            }

            ServiceDescribe::ptr build() {
                return std::make_shared<ServiceDescribe>(std::move(_method_name), std::move(_params_desc), _return_type, std::move(_callback));
            }

        private:
            std::string _method_name;
            ServiceDescribe::ServiceCallback _callback;                     // 实际业务的回调函数
            std::vector<ServiceDescribe::ParamsDescirbe> _params_desc;      // 参数字段格式描述
            VType _return_type;                                             // 结果作为返回值类型的描述
        };

        class ServiceManager
        {
        public:
            using ptr = std::shared_ptr<ServiceManager>;
            void insert(const ServiceDescribe::ptr &desc)
            {   
                std::unique_lock<std::mutex> lock(_mutex);
                _services.insert(std::make_pair(desc->method(), desc));
            }

            ServiceDescribe::ptr select(const std::string &method_name)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _services.find(method_name);
                if(it == _services.end()) {
                    return ServiceDescribe::ptr();
                }
                return it->second;
            }

            void remove(const std::string &method_name)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                if(_services.find(method_name) != _services.end())
                {
                    _services.erase(method_name);
                }
            }
        private:
            std::mutex _mutex;
            std::unordered_map<std::string, ServiceDescribe::ptr> _services;
        };

        class RpcRouter
        {
        public:
            using ptr = std::shared_ptr<RpcRouter>;
            RpcRouter() : _service_manager(std::make_shared<ServiceManager>())
            {}

            // 这是注册到Dispatcher模块针对rpc请求进行回调函数的业务处理
            void onRpcRequest(const BaseConnection::ptr &conn, RpcRequest::ptr &request)
            {
                // 1. 查询客户端请求的方法描述 --- 判断当前服务端是否能提供对应的服务
                ServiceDescribe::ptr service = _service_manager->select(request->method());
                if(service.get() == nullptr) {
                    LOG(LogLevel::INFO) << request->method().c_str() << " 服务未找到! ";
                    return response(conn, request, Json::Value(), RCode::RCODE_NOT_FOUND_SERVICE);
                }
                // 2. 进行参数校验,确定能否提供服务
                if(service->paramCheck(request->params()) == false) {
                    LOG(LogLevel::INFO) << request->method().c_str() << " 服务参数校验失败! ";
                    return response(conn, request, Json::Value(), RCode::RCODE_INVALID_PARAMS);
                }
                // 3. 调用业务回调接口进行业务处理
                Json::Value result;
                bool ret = service->call(request->params(), result);
                if(ret == false) {
                    LOG(LogLevel::INFO) << request->method().c_str() << " 服务内部错误! ";
                    return response(conn, request, Json::Value(), RCode::RCODE_INTERNAL_ERROR);
                }
                // 4. 处理完毕得到结果,组织响应,向客户端发送
                return response(conn, request, result, RCode::RCODE_OK);
            }
            void registerMethod(const ServiceDescribe::ptr &service)
            {
                return _service_manager->insert(service);
            }
        private:
            void response(const BaseConnection::ptr &conn, const RpcRequest::ptr &req, const Json::Value &res, RCode rcode)
            {
                auto msg = MessageFactory::create<RpcResponse>();
                msg->setId(req->rid());
                msg->setMType(rpc::MType::RSP_RPC);
                msg->setRCode(rcode);
                msg->setResult(res);
                conn->send(msg);
            }
        private:
            ServiceManager::ptr _service_manager;
        };
    }
}

二:🔥 服务端 - Publish & Subscribe 实现

  • 对外提供主题操作处理回调函数
  • 对外提供消息发布处理回调函数
  • 内部进⾏主题及订阅者的管理
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"
#include <unordered_set>

namespace rpc
{
    namespace server
    {
        class TopicManager
        {
        public:
            using ptr = std::shared_ptr<TopicManager>;
            TopicManager() {}

            void onTopicRequest(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                bool ret = true;
                switch (msg->optype())
                {
                    // 主题的创建
                    case TopicOptype::TOPIC_CREATE : topicCreate(conn, msg); break;
                    // 主题的删除
                    case TopicOptype::TOPIC_REMOVE : topicRemove(conn, msg); break;
                    // 主题的订阅
                    case TopicOptype::TOPIC_SUBSCRIBE : ret = topicSubscribe(conn, msg); break;
                    // 主题的取消订阅
                    case TopicOptype::TOPIC_CANCEL : topicCancle(conn, msg); break;
                    // 主题消息的发布
                    case TopicOptype::TOPIC_PUBLISH : ret = topicPublish(conn, msg); break;
                    default : return errorResponse(conn, msg, RCode::RCODE_INVALID_OPTYPE);
                }
                if(!ret) return errorResponse(conn, msg, RCode::RCODE_NOT_FOUND_TOPIC);
                return topicResponse(conn, msg);
            }

            // 订阅者在连接断开时的处理 --- 删除其关联的数据
            void onShutdown(const BaseConnection::ptr &conn)
            {
                // 消息发布者断开连接,不需要任何操作   消息订阅者断开连接需要删除管理数据
                // 1. 判断断开连接的是否是订阅者,不是的话直接返回 
                std::vector<Topic::ptr> topics;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _subscribers.find(conn);
                    if(it == _subscribers.end()) {
                        return ;
                    }
                    subscriber = it->second;
                    // 2. 获取订阅者退出受影响的主题对象
                    for(auto &topic_name : subscriber->topics) {
                        auto topic_it = _topics.find(topic_name);
                        if(topic_it == _topics.end()) continue;
                        topics.push_back(topic_it->second);
                    }
                    // 3. 从订阅者信息当中,删除订阅者
                    _subscribers.erase(it);
                }
                // 4. 从受影响的主题对象中移除订阅者
                for(auto &topic : topics){
                    topic->removeSubscriber(subscriber);
                }
            }
        private:
                void errorResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg, RCode rcode)
                {
                    auto msg_rsp = MessageFactory::create<TopicResponse>();
                    msg_rsp->setId(msg->rid());
                    msg_rsp->setMType(MType::RSP_TOPIC);
                    msg_rsp->setRCode(rcode);
                    conn->send(msg_rsp);
                }
                void topicResponse(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
                {
                    auto msg_rsp = MessageFactory::create<TopicResponse>();
                    msg_rsp->setId(msg->rid());
                    msg_rsp->setMType(MType::RSP_TOPIC);
                    msg_rsp->setRCode(RCode::RCODE_OK);
                    conn->send(msg_rsp);
                }

            void topicCreate(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                // 构造一个主题对象,添加映射关系的管理
                std::string topic_name = msg->topicKey();
                auto topic = std::make_shared<Topic>(topic_name);
                _topics.insert(std::make_pair(topic_name, topic));
            }

            void topicRemove(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                // 1. 查看当前主题,有哪些订阅者,然后从订阅者中将主题信息删除
                // 2. 删除主题的数据 -- 主题名称与主题对象的映射关系
                std::string topic_name = msg->topicKey();
                std::unordered_set<Subscriber::ptr> subscribers;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    // 在删除主题之前,先找出会受到影响的订阅者
                    auto it = _topics.find(topic_name);
                    if(it == _topics.end()) {
                        return ;
                    }
                    subscribers = it->second->subscribers;
                    _topics.erase(it);  // 删除当前的主题映射关系
                }
                for(auto &subscriber : subscribers) {
                    subscriber->removeTopic(topic_name);
                }
            }

            bool topicSubscribe(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                // 1. 先找出主题对象,以及订阅者对象
                //    如果没有找到主题 -- 就要报错  如果没有找到订阅者对象,那就要构造一个订阅者
                std::string topic_name = msg->topicKey();
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(topic_name);
                    if(topic_it == _topics.end()) {
                        return false;
                    }
                    topic = topic_it->second;
                    auto sub_it = _subscribers.find(conn);
                    if(sub_it != _subscribers.end()) {
                        subscriber = sub_it->second;
                    } else {
                        subscriber = std::make_shared<Subscriber>(conn);
                        _subscribers.insert(std::make_pair(conn, subscriber));
                    }
                }
                // 2. 在主题对象中,新增一个订阅者对象关联的连接;  在订阅者对象中新增一个订阅的主题
                topic->appendSubscriber(subscriber);
                subscriber->appendTopic(topic_name);
                return true;
            }

            void topicCancle(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                // 1. 先找出主题对象 和订阅者对象
                //    主题不存在就报错,订阅者不存在则返回
                // 2. 从主题对象中删除订阅者连接    从订阅者信息中删除我们所订阅的主题名称
                std::string topic_name = msg->topicKey();
                Topic::ptr topic;
                Subscriber::ptr subscriber;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(topic_name);
                    if(topic_it != _topics.end()) {
                        topic = topic_it->second;
                    }
                    auto sub_it = _subscribers.find(conn);
                    if(sub_it != _subscribers.end()) {
                        subscriber = sub_it->second;
                    }
                }
                if(subscriber.get()) subscriber->removeTopic(topic_name);
                if(topic.get() && subscriber.get()) topic->removeSubscriber(subscriber);
            }

            bool topicPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr &msg)
            {
                std::string topic_name = msg->topicKey();
                Topic::ptr topic;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto topic_it = _topics.find(topic_name);
                    if(topic_it == _topics.end()) {
                        return false;
                    }
                    topic = topic_it->second;
                }
                topic->pushMessage(msg);
                return true;
            }
        private:
            struct Subscriber
            {
                using ptr = std::shared_ptr<Subscriber>;
                std::mutex _mutex;
                BaseConnection::ptr conn;
                std::unordered_set<std::string> topics;               // 订阅者所订阅的主题名称

                Subscriber(const BaseConnection::ptr &c) : conn(c)
                {}

                // 订阅主题的时候调用
                void appendTopic(const std::string &topic_name)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    topics.insert(topic_name);
                }

                // 主题被删除 或者 取消订阅的时候,调用
                void removeTopic(const std::string &topic_name)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    topics.erase(topic_name);
                }
            };
            struct Topic
            {
                using ptr = std::shared_ptr<Topic>;
                std::mutex _mutex;
                std::string topic_name;
                std::unordered_set<Subscriber::ptr> subscribers;     // 当前主题的订阅者

                Topic(const std::string &name) : topic_name(name)
                {}

                // 新增订阅的时候调用
                void appendSubscriber(const Subscriber::ptr &subscriber)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    subscribers.insert(subscriber);
                }

                // 取消订阅 或者 订阅者连接断开 调用
                void removeSubscriber(const Subscriber::ptr &subscriber)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    subscribers.erase(subscriber);
                }

                // 收到消息发布请求的时候调用
                void pushMessage(const BaseMessage::ptr &msg)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    for(auto &subscriber : subscribers) {
                        subscriber->conn->send(msg);
                    }
                }
            };
        private:
            std::mutex _mutex;
            std::unordered_map<std::string, Topic::ptr> _topics;
            std::unordered_map<BaseConnection::ptr, Subscriber::ptr> _subscribers;
        };
    }
}

三:🔥 服务端 - Registry & Discovery实现

  • 对外提供服务操作(注册/发现)消息处理回调函数
  • 内部进⾏服务发现者的管理
  • 内部进⾏服务提供者的管理
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"
#include <set>

namespace rpc
{
    namespace server
    {
        class ProviderManager
        {
        public:
            using ptr = std::shared_ptr<ProviderManager>;
            struct Provider
            {
                using ptr = std::shared_ptr<Provider>;
                std::mutex _mutex;
                BaseConnection::ptr conn;
                Address host;
                std::vector<std::string> methods;

                Provider(const BaseConnection::ptr &c, const Address &h) : conn(c), host(h)
                {
                }
                void appendMethod(const std::string &method)
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    methods.emplace_back(method);
                }
            };
            // 当一个新的服务提供者进行服务注册的时候调用
            void addProvider(const BaseConnection::ptr &c, const Address &h, const std::string &method)
            {
                Provider::ptr provider;
                // 查找连接所关联的服务提供者对象,找到则获取,找不到则创建,并建立关联
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _conns.find(c);
                    if (it != _conns.end()) {
                        provider = it->second;
                    } else {
                        provider = std::make_shared<Provider>(c, h);
                        _conns.insert(std::make_pair(c, provider));
                    }
                    // method方法的提供主机 _providers要新增信息
                    auto &providers = _providers[method];       // 有的话直接获取 没有的话也直接创建了
                    providers.insert(provider);
                }
                // 向服务对象中新增一个所能提供的服务名称
                provider->appendMethod(method);
            }
            // 当一个服务提供者断开连接的时候,获取它的信息 -- 用于进行服务下线通知
            Provider::ptr getProvider(const BaseConnection::ptr &c) 
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(c);
                if (it != _conns.end()) {
                    return it->second;
                }
                return Provider::ptr();
            }
            // 当一个服务提供者断开连接的时候,删除它的关联信息
            void delProvider(const BaseConnection::ptr &c) 
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(c);
                if (it == _conns.end()) {
                    // 当前断开连接的不是一个服务提供者
                    return ;
                }
                // 如果是提供者 看看提供了什么服务 从服务者提供信息中 删除当前服务提供者
                for(auto &method : it->second->methods) {
                    auto &provider = _providers[method];
                    provider.erase(it->second);
                }
                // 删除连接与服务提供者的关联关系
                _conns.erase(it);
            }

            std::vector<Address> methodHosts(const std::string &method)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _providers.find(method);
                if(it == _providers.end()) {
                    return std::vector<Address>();
                }
                std::vector<Address> result;
                for(auto& providers : it->second) {
                    result.push_back(providers->host);
                }
                return result;
            }

        private:
            std::mutex _mutex;
            std::unordered_map<std::string, std::set<Provider::ptr>> _providers;    // 方法的提供者    用于服务发现
            std::unordered_map<BaseConnection::ptr, Provider::ptr> _conns;          // 连接对应的方法  用于服务下线
        };

        class DiscovererManager
        {
        public:
            using ptr = std::shared_ptr<DiscovererManager>;
            struct Discoverer
            {
                using ptr = std::shared_ptr<Discoverer>;
                std::mutex _mutex;
                BaseConnection::ptr conn;         // 发现者关联的客户端连接
                std::vector<std::string> methods; // 发现过的服务名称

                Discoverer(const BaseConnection::ptr &c) : conn(c)
                {}
                void appendMethod(const std::string &method) 
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    methods.push_back(method);
                }
            };

            // 当每次客户端进行服务发现的时候新增发现者, 新增服务名称
            Discoverer::ptr addDiscoverer(const BaseConnection::ptr &c, const std::string &method) 
            {
                Discoverer::ptr discoverer;
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _conns.find(c);
                    if(it != _conns.end()) {
                        discoverer = _conns[c];
                    }else {
                        discoverer = std::make_shared<Discoverer>(c);
                        _conns.insert(std::make_pair(c, discoverer));
                    }
                    auto &discoverers = _discoverers[method];       // 有的话直接获取 没有的话也直接创建了
                    discoverers.insert(discoverer);
                }
                discoverer->appendMethod(method);
                return discoverer;
            }
            // 发现者客户端断开连接时,找到发现者信息,删除关联数据
            void delDiscoverer(const BaseConnection::ptr &c) 
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _conns.find(c);
                if(it == _conns.end()) {
                    // 没有找到连接对应的发现者信息,代表客户端不是一个服务发现者
                    return ;
                }
                for(auto &method : it->second->methods) {
                    auto& discovers = _discoverers[method];
                    discovers.erase(it->second);
                }
                _conns.erase(it);
            }
            // 当有一个新的服务提供者上线,则进行上线通知
            void onlineNotify(const std::string &method, const Address &host) 
            {
                return notify(method, host, ServiceOptype::SERVICE_ONLINE);
            }
            // 当有一个服务提供者断开连接,则进行下线通知
            void offlineNotify(const std::string &method, const Address &host) 
            {
                return notify(method, host, ServiceOptype::SERVICE_OFFLINE);
            }
        private:
            void notify(const std::string &method, const Address &host, ServiceOptype optype)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _discoverers.find(method);
                if(it == _discoverers.end()) {
                    // 代表这个服务当前没有发现者
                    return ;
                }
                auto msg_req = MessageFactory::create<ServiceRequest>();
                msg_req->setId(UUID::uuid());
                msg_req->setMType(MType::REQ_SERVICE);
                msg_req->setMethod(method);
                msg_req->setHost(host);
                msg_req->setOptype(optype);
                for(auto &discoverer : it->second) {
                    discoverer->conn->send(msg_req);
                }
            }
        private:
            std::mutex _mutex;
            std::unordered_map<std::string, std::set<Discoverer::ptr>> _discoverers; // 哪些服务被哪些主机发现过
            std::unordered_map<BaseConnection::ptr, Discoverer::ptr> _conns;          // 连接对应的发现者
        };

        class PDManager
        {
        public:
            using ptr = std::shared_ptr<PDManager>;
            PDManager() 
                : _providers(std::make_shared<ProviderManager>()),
                  _discoverers(std::make_shared<DiscovererManager>())
            {}

            // 提供给dispatcher的服务注册/发现回调函数
            void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr msg)
            {
                // 服务操作请求:服务注册 服务发现
                ServiceOptype optype = msg->optype();
                if(optype == ServiceOptype::SERVICE_REGISTRY) {
                    // 服务注册: 
                    // 1. 新增服务提供者    2. 进行服务上线通知
                    LOG(LogLevel::INFO) << msg->host().first << ":" << msg->host().second << " 注册服务 " << msg->method();
                    _providers->addProvider(conn, msg->host(), msg->method());
                    _discoverers->onlineNotify(msg->method(), msg->host());
                    return registryResponse(conn, msg);
                }else if(optype == ServiceOptype::SERVICE_DISCOVERY) {
                    // 服务发现
                    // 1. 新增服务发现者
                    LOG(LogLevel::INFO) << " 客户端要进行 " << msg->method() << " 服务发现 ";
                    _discoverers->addDiscoverer(conn, msg->method());

                    // 应该要通知发现者现在上线的服务 
                    return discoveryResponse(conn, msg); 
                }else {
                    LOG(LogLevel::ERROR) << "收到服务操作请求,但是操作类型错误! ";
                    return errorResponse(conn, msg);
                }
            }
            void onConnShutdown(const BaseConnection::ptr &conn) 
            {
                auto provider = _providers->getProvider(conn);
                if(provider.get() != nullptr) {
                    LOG(LogLevel::INFO) << provider->host.first << ":" << provider->host.second << " 服务下线 ";
                    for(auto &method : provider->methods) {
                        _discoverers->offlineNotify(method, provider->host);
                    }
                    _providers->delProvider(conn);
                    return ;
                }
                _discoverers->delDiscoverer(conn);
            }
        private:
            void errorResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<ServiceResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_SERVICE);
                msg_rsp->setRCode(RCode::RCODE_INVALID_OPTYPE);
                msg_rsp->setOptype(ServiceOptype::SERVICE_UNKNOW);
                conn->send(msg_rsp);
            }
            void registryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<ServiceResponse>();
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_SERVICE);
                msg_rsp->setRCode(RCode::RCODE_OK);
                msg_rsp->setOptype(ServiceOptype::SERVICE_REGISTRY);
                conn->send(msg_rsp);
            }

            void discoveryResponse(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg)
            {
                auto msg_rsp = MessageFactory::create<ServiceResponse>();
                std::vector<Address> hosts = _providers->methodHosts(msg->method());
                msg_rsp->setId(msg->rid());
                msg_rsp->setMType(MType::RSP_SERVICE);
                msg_rsp->setOptype(ServiceOptype::SERVICE_DISCOVERY);
                if(hosts.empty()) {
                    msg_rsp->setRCode(RCode::RCODE_NOT_FOUND_SERVICE);
                    return conn->send(msg_rsp);
                }
                msg_rsp->setRCode(RCode::RCODE_OK);
                msg_rsp->setMethod(msg->method());
                msg_rsp->setHost(std::move(hosts));
                conn->send(msg_rsp);
            }
        private:
            ProviderManager::ptr _providers;
            DiscovererManager::ptr _discoverers;
        };
    }
}

四:🔥 服务端 - 整合封装 Server

rpc_server.hpp

#pragma once

#include "../common/dispatcher.hpp"
#include "../client/rpc_client.hpp"
#include "rpc_router.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"

namespace rpc
{
    namespace server
    {
        // 注册中心服务器: 只需要针对服务器注册与发现请求处理即可 
        class RegistryServer
        {
        public:
            using ptr = std::shared_ptr<RegistryServer>;
            RegistryServer(int port)
                : _pd_manager(std::make_shared<PDManager>()),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                auto service_cb = std::bind(&PDManager::onServiceRequest, _pd_manager.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<ServiceRequest>(MType::REQ_SERVICE, service_cb);

                _server = ServerFactory::create(port);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _server->setMessageCallback(message_cb);

                auto close_cb = std::bind(&RegistryServer::onConnShutdown, this, std::placeholders::_1);
                _server->setCloseCallback(close_cb);
            }

            void start()
            {
                _server->start();
            }
        private:
            void onConnShutdown(const BaseConnection::ptr &conn) 
            {
                _pd_manager->onConnShutdown(conn);
            }
        private:
            PDManager::ptr _pd_manager;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };

        class Rpcserver
        {
        public:
            using ptr = std::shared_ptr<Rpcserver>;
            // rpcserver端有两套地址信息
            // 1. rpc服务提供端地址信息 -- 必须是rpc服务器对外访问的地址信息
            // 2. 注册中心服务端地址信息 -- 连接服务注册中心
            Rpcserver(const Address &access_addr, bool enableRegistry = false, const Address &registry_server_addr = Address()) 
                : _enableRegistry(enableRegistry),
                  _access_addr(access_addr),
                  _router(std::make_shared<RpcRouter>()),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                if(enableRegistry) {
                    _reg_client = std::make_shared<client::RegistryClient>(registry_server_addr.first, registry_server_addr.second);
                }
                // 当前成员server是一个rpcserver,用于提供rpc服务的
                auto rpc_cb = std::bind(&RpcRouter::onRpcRequest, _router.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<RpcRequest>(MType::REQ_RPC, rpc_cb);

               _server = ServerFactory::create(_access_addr.second);

               auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _server->setMessageCallback(message_cb);
            }

            void registerMethod(const ServiceDescribe::ptr &service)
            {
                if(_enableRegistry) {
                    // 向服务中心注册
                    _reg_client->registryMethod(service->method(), _access_addr);
                }
                _router->registerMethod(service);
            }

            void start()
            {
                _server->start();
            }
        private:
            bool _enableRegistry;
            Address _access_addr;
            Address _registry_server_addr;
            client::RegistryClient::ptr _reg_client;
            RpcRouter::ptr _router;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };

        // Topic服务器
        class TopicServer
        {
        public:
            using ptr = std::shared_ptr<TopicServer>;
            TopicServer(int port)
                : _topic_manager(std::make_shared<TopicManager>()),
                  _dispatcher(std::make_shared<Dispatcher>())
            {
                auto topic_cb = std::bind(&TopicManager::onTopicRequest, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<TopicRequest>(MType::REQ_TOPIC, topic_cb);

                _server = ServerFactory::create(port);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _server->setMessageCallback(message_cb);

                auto close_cb = std::bind(&TopicServer::onConnShutdown, this, std::placeholders::_1);
                _server->setCloseCallback(close_cb);
            }

            void start()
            {
                _server->start();
            }
        private:
            void onConnShutdown(const BaseConnection::ptr &conn) 
            {
                _topic_manager->onShutdown(conn);
            }
        private:
            TopicManager::ptr _topic_manager;
            Dispatcher::ptr _dispatcher;
            BaseServer::ptr _server;
        };
    }
}

五:🔥 客户端 - Requestor 实现

requestor.hpp

  • 提供发送请求的接⼝
  • 内部进⾏请求 & 响应的管理
#pragma once

#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>

namespace rpc
{
    namespace client
    {
        class Requestor
        {
        public:
            using ptr = std::shared_ptr<Requestor>;
            using RequestCallback = std::function<void(const BaseMessage::ptr&)>; 
            using AsyncResponse = std::future<BaseMessage::ptr>;
            struct RequestDescribe
            {
                using ptr = std::shared_ptr<RequestDescribe>;
                BaseMessage::ptr request;
                rpc::RType rtype;
                std::promise<BaseMessage::ptr> response;
                RequestCallback callback;
            };

            // 注册给dispatcher的
            void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg)
            {
                std::string rid = msg->rid();
                RequestDescribe::ptr rdp = getDescribe(rid);
                if(rdp.get() == nullptr) {
                    LOG(LogLevel::ERROR) << "收到响应:" << msg->rid() << "但未找到对应的请求描述";
                    return ;
                }
                if(rdp->rtype == RType::REQ_ASYNC) {
                    rdp->response.set_value(msg);
                }else if(rdp->rtype == RType::REQ_CALLBACK) {
                    if(rdp->callback) rdp->callback(msg); 
                }else {
                    LOG(LogLevel::ERROR) << "响应类型未知";
                } 
                // 这里del是没有问题的 此时result已经传出去了 再删除的 所以不会出问题
                delDescribe(rid);
            }

            bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
            {
                RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);
                if(rdp.get() == nullptr) {
                    LOG(LogLevel::FATAL) << "构造请求描述对象失败! ";
                    return false;
                }
                conn->send(req);
                // ononResponse 就会将响应设置到 async_rsp里
                async_rsp = rdp->response.get_future();
                return true;
            }

            bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
            {
                AsyncResponse rsp_future;
                bool ret = send(conn, req, rsp_future);
                if(ret == false) {
                    return false;
                }
                rsp = rsp_future.get();
                return true;
            }

            bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
            {
                RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);
                if(rdp.get() == nullptr) {
                    LOG(LogLevel::FATAL) << "构造请求描述对象失败! ";
                    return false;
                }
                conn->send(req);
                return true;
            }
        private:
            RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback())
            {
                std::unique_lock<std::mutex> lock(_mutex);
                RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();
                rd->request = req;
                rd->rtype = rtype;
                // 这里不需要管异步操作的结果 onresponse处理了
                if(rtype == RType::REQ_CALLBACK && cb) {
                    rd->callback = cb;
                }
                _request_desc.insert(std::make_pair(req->rid(), rd));
                return rd;
            }

            RequestDescribe::ptr getDescribe(const std::string &rid)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _request_desc.find(rid);
                if(it == _request_desc.end()) {
                    return RequestDescribe::ptr();
                }
                return it->second;
            }

            void delDescribe(const std::string &rid)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                if(_request_desc.find(rid) != _request_desc.end()) {
                    _request_desc.erase(rid);
                }
            }
        private:
            std::mutex _mutex;
            std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;
        };
    };
}

六:🔥 客户端 - RpcCaller实现

  • 提供 Rpc 请求接⼝
#pragma once

#include "requestor.hpp"

namespace rpc
{
    namespace client
    {
        class RpcCaller
        {
        public:
            RpcCaller(const Requestor::ptr &requestor) : _requestor(requestor) {}
            using ptr = std::shared_ptr<RpcCaller>;
            using JsonAsyncResponse = std::future<Json::Value>;
            using JsonResponseCallback = std::function<void(const Json::Value&)>;

            // requestor中的处理是针对BaseMessage进行处理的 由于在rpccaller中针对结果的处理是针对 RpcResponse里边的result进行的
            // 同步
            bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, Json::Value &result)
            {
                // 1. 组织请求
                auto req_msg = MessageFactory::create<RpcRequest>();
                req_msg->setId(UUID::uuid());
                req_msg->setMType(MType::REQ_RPC);
                req_msg->setMethod(method);
                req_msg->setParams(params);
                // 2. 发送请求
                BaseMessage::ptr rsp_msg;
                bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), rsp_msg);    // 阻塞
                if(ret == false) {
                    LOG(LogLevel::ERROR) << "同步Rpc请求失败! ";
                    return false;
                }
                // 3. 等待响应
                RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(rsp_msg);
                if(!rpc_rsp_msg) {
                    LOG(LogLevel::ERROR) << "Rpc响应, 向下类型转换失败! ";
                    return false;
                }
                if(rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
                    LOG(LogLevel::ERROR) << "Rpc请求出错: " << errReason(rpc_rsp_msg->rcode());
                    return false;
                }
                result = rpc_rsp_msg->result();
                return true;
            }

            // 异步
            bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, JsonAsyncResponse &result)
            {
                // 1. 向服务器发送异步回调请求,设置回调函数,回调函数会传入一个promise对象,在回调函数中对promise设置数据
                auto req_msg = MessageFactory::create<RpcRequest>();
                req_msg->setId(UUID::uuid());
                req_msg->setMType(MType::REQ_RPC);
                req_msg->setMethod(method);
                req_msg->setParams(params);
                // 2. 发送请求  这里必须是智能指针 不然json_promise释放掉 future会抛出异常
                auto json_promise = std::make_shared<std::promise<Json::Value>>();
                result = json_promise->get_future();
                Requestor::RequestCallback cb = std::bind(&RpcCaller::Callback, this, json_promise, std::placeholders::_1);
                bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), cb);
                if(ret == false) {
                    LOG(LogLevel::ERROR) << "异步Rpc请求失败! ";
                    return false;
                }
                return true;
            }

            // 回调
            bool call(const BaseConnection::ptr &conn, const std::string &method, const Json::Value &params, const JsonResponseCallback &cb)        // 回调函数必须写const 因为函数地址是常量
            {
                auto req_msg = MessageFactory::create<RpcRequest>();
                req_msg->setId(UUID::uuid());
                req_msg->setMType(MType::REQ_RPC); 
                req_msg->setMethod(method);
                req_msg->setParams(params);

                Requestor::RequestCallback req_cb = std::bind(&RpcCaller::Callback1, this, cb, std::placeholders::_1);      // 回调函数里绑定了回调函数
                bool ret = _requestor->send(conn, std::dynamic_pointer_cast<BaseMessage>(req_msg), req_cb);
                if(ret == false) {
                    LOG(LogLevel::ERROR) << "回调Rpc请求失败! ";
                    return false;
                }
                return true;
            }
        private:
            void Callback1(const JsonResponseCallback &cb, const BaseMessage::ptr &msg)
            {
                RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
                if(!rpc_rsp_msg) {
                    LOG(LogLevel::ERROR) << "Rpc响应, 向下类型转换失败! ";
                    return ;
                }
                if(rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
                    LOG(LogLevel::ERROR) << "Rpc回调请求出错: " << errReason(rpc_rsp_msg->rcode());
                    return ;
                }
                cb(rpc_rsp_msg->result());
            }

            void Callback(std::shared_ptr<std::promise<Json::Value>> result, const BaseMessage::ptr &msg)
            {
                RpcResponse::ptr rpc_rsp_msg = std::dynamic_pointer_cast<RpcResponse>(msg);
                if(!rpc_rsp_msg) {
                    LOG(LogLevel::ERROR) << "Rpc响应, 向下类型转换失败! ";
                    return ;
                }
                if(rpc_rsp_msg->rcode() != RCode::RCODE_OK) {
                    LOG(LogLevel::ERROR) << "Rpc异步请求出错: " << errReason(rpc_rsp_msg->rcode());
                    return ;
                }
                result->set_value(rpc_rsp_msg->result());
            }
        private:
            Requestor::ptr _requestor;
        };
    }
}

七:🔥 客户端 - Publish & Subscribe实现

rpc_topic.hpp

  • 提供消息发布接⼝
  • 提供主题操作接⼝
  • 内部进⾏主题及订阅者的管理
#pragma once

#include "requestor.hpp"
#include <unordered_set>

namespace rpc
{
    namespace client
    {
        class TopicManager
        {
        public:
            using SubCallback = std::function<void(const std::string &key, const std::string &msg)>;
            using ptr = std::shared_ptr<TopicManager>;
            TopicManager(const Requestor::ptr &requestor) : _requestor(requestor)
            {}

            bool create(const BaseConnection::ptr &conn, const std::string &key)
            {
                return commonRequestor(conn, key, TopicOptype::TOPIC_CREATE);
            }

            bool remove(const BaseConnection::ptr &conn, const std::string &key)
            {
                return commonRequestor(conn, key, TopicOptype::TOPIC_REMOVE);
            }

            // 订阅后收到消息的回调处理函数
            bool subscribe(const BaseConnection::ptr &conn, const std::string &key, const SubCallback &cb)
            {
                // 先建立映射关系 因为可能一订阅就有消息来了 此时回调函数还没注册
                addSubscribe(key, cb);
                bool ret = commonRequestor(conn, key, TopicOptype::TOPIC_SUBSCRIBE);
                if(ret == false) {
                    delSubscribe(key);
                    return false;
                }
                return true;
            }

            bool cancel(const BaseConnection::ptr &conn, const std::string &key)
            {
                delSubscribe(key);
                return commonRequestor(conn, key, TopicOptype::TOPIC_CANCEL);
            }

            bool publish(const BaseConnection::ptr &conn, const std::string &key, const std::string &msg)
            {
                return commonRequestor(conn, key, TopicOptype::TOPIC_PUBLISH, msg);
            }

            // 服务端推送给过来的消息请求
            void onPublish(const BaseConnection::ptr &conn, const TopicRequest::ptr& msg)
            {
                // 1. 从消息中取出操作类型进行判断,是否是消息请求
                auto type = msg->optype();
                if(type != TopicOptype::TOPIC_PUBLISH) {
                    LOG(LogLevel::ERROR) << "收到了错误类型的主题操作! ";
                    return ;
                }
                // 2. 取出消息主题名称,以及消息内容
                std::string topic_key = msg->topicKey();
                std::string topic_msg = msg->topicMsg();
                // 3. 通过主题名称,查找对应主题的回调处理函数 有则处理 无则报错
                auto callback = getSubscribe(topic_key);
                if(!callback) {
                    LOG(LogLevel::ERROR) << "收到了 " << topic_key << " 主题消息,但是该消息无主题处理回调! ";
                    return ;
                }
                return callback(topic_key, topic_msg);
            }
        private:
            void addSubscribe(const std::string &key, const SubCallback &cb)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _topic_callbacks.insert(std::make_pair(key, cb));
            }

            void delSubscribe(const std::string &key)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _topic_callbacks.erase(key);
            }

            const SubCallback getSubscribe(const std::string &key)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _topic_callbacks.find(key);
                if(it == _topic_callbacks.end()) return SubCallback();
                return it->second;
            }

            bool commonRequestor(const BaseConnection::ptr &conn, const std::string &key, TopicOptype type, const std::string &msg = "")
            {
                // 1. 构造请求对象,并填充数据
                auto msg_req = MessageFactory::create<TopicRequest>();
                msg_req->setId(UUID::uuid());
                msg_req->setMType(MType::REQ_TOPIC);
                msg_req->setOptype(type);
                msg_req->setTopicKey(key);
                if(type == TopicOptype::TOPIC_PUBLISH) {
                    msg_req->setTopicMsg(msg);
                }
                // 2. 向服务端发送请求,等待响应
                BaseMessage::ptr msg_rsp;
                bool ret = _requestor->send(conn, msg_req, msg_rsp);
                if(ret == false) {
                    LOG(LogLevel::ERROR) << "主题操作请求失败! ";
                    return false;
                }
                // 3. 判断请求处理是否成功
                TopicResponse::ptr topic_rsp_msg = std::dynamic_pointer_cast<TopicResponse>(msg_rsp);
                if(!topic_rsp_msg) {
                    LOG(LogLevel::ERROR) << "主题操作响应, 向下类型转换失败! ";
                    return false;
                }
                if(topic_rsp_msg->rcode() != RCode::RCODE_OK) {
                    LOG(LogLevel::ERROR) << "主题操作请求出错: " << errReason(topic_rsp_msg->rcode());
                    return false;
                }
                return true;
            }
        private:
            std::mutex _mutex;
            std::unordered_map<std::string, SubCallback> _topic_callbacks;
            Requestor::ptr _requestor;
        };
    }
}

八:🔥 客户端 - Registry & Discovery实现

rpc_registry.hpp

  • 提供服务发现接⼝
  • 提供服务注册接⼝
  • 提供服务操作(上线/下线)通知处理回调函数
  • 内部进⾏发现的服务与主机信息管理
#pragma once

#include "requestor.hpp"
#include <unordered_set>

namespace rpc
{
    namespace client
    {
        class Provider 
        {
        public: 
            using ptr = std::shared_ptr<Provider>;
            Provider(const Requestor::ptr &requestor) : _requestor(requestor)
            {}

            bool registryMethod(const BaseConnection::ptr &conn, const std::string &method, const Address &host) 
            {
                auto msg_req = MessageFactory::create<ServiceRequest>();
                msg_req->setId(UUID::uuid());
                msg_req->setMType(MType::REQ_SERVICE);
                msg_req->setMethod(method);
                msg_req->setHost(host);
                msg_req->setOptype(ServiceOptype::SERVICE_REGISTRY);
                BaseMessage::ptr msg_rsp;
                bool ret = _requestor->send(conn, msg_req, msg_rsp);
                if(ret == false) {
                    LOG(LogLevel::ERROR) << method << " 服务注册失败! ";
                    return false;
                }
                auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
                if(service_rsp.get() == nullptr) {
                    LOG(LogLevel::ERROR) << "响应类型向下转换失败! ";
                    return false;
                }
                if(service_rsp->rcode() != RCode::RCODE_OK) {
                    LOG(LogLevel::ERROR) << "服务注册失败, 原因: " << errReason(service_rsp->rcode());
                    return false;
                }
                return true;
            }
        private:
            Requestor::ptr _requestor;
        };

        class MethodHost
        {
        public:
            using ptr = std::shared_ptr<MethodHost>;
            MethodHost() : _idx(0) {}
            MethodHost(const std::vector<Address> &hosts) : _hosts(hosts), _idx(0)
            {}

            void appendHost(const Address &host)
            {   
                // 中途收到了服务上线请求后调用
                std::unique_lock<std::mutex> lock(_mutex);
                _hosts.push_back(host);
            }

            void removeHost(const Address& host)
            {
                // 中途收到了服务下线请求后调用
                std::unique_lock<std::mutex> lock(_mutex);
                for(std::vector<Address>::iterator it = _hosts.begin(); it != _hosts.end(); ++it) {
                    if(*it == host) {
                        _hosts.erase(it);
                        return ;
                    }
                }
            }

            Address chooseHost()
            {
                std::unique_lock<std::mutex> lock(_mutex);
                if(_idx >= _hosts.size()) _idx = 0;
                return _hosts[_idx++];
            }

            bool empty() 
            { 
                std::unique_lock<std::mutex> lock(_mutex);
                return _hosts.empty(); 
            }
        private:
            std::mutex _mutex;
            size_t _idx;                    // 负载均衡
            std::vector<Address> _hosts;
        };

        class Discoverer
        {
        public:
            using OfflineCallback = std::function<void(const Address&)>;
            using ptr = std::shared_ptr<Discoverer>;
            Discoverer(const Requestor::ptr &requestor, const OfflineCallback &cb) 
                : _requestor(requestor),
                  _offline_callback(cb)
            {}

            bool serviceDiscovery(const BaseConnection::ptr &conn, const std::string &method, Address &host)
            {
                // 当前所保管的提供者信息存在,则直接返回地址
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _method_hosts.find(method);
                    if(it != _method_hosts.end()) {
                        if(it->second->empty() == false) {
                            host = it->second->chooseHost();
                            return true;
                        }
                    }
                }
                // 当前服务提供者为空
                auto msg_req = MessageFactory::create<ServiceRequest>();
                msg_req->setId(UUID::uuid());
                msg_req->setMType(MType::REQ_SERVICE);
                msg_req->setMethod(method);
                msg_req->setOptype(ServiceOptype::SERVICE_DISCOVERY);
                BaseMessage::ptr msg_rsp;
                bool ret = _requestor->send(conn, msg_req, msg_rsp);
                if(ret == false) {
                    LOG(LogLevel::ERROR) << "服务发现失败! ";
                    return false;
                }
                auto service_rsp = std::dynamic_pointer_cast<ServiceResponse>(msg_rsp);
                if(service_rsp.get() == nullptr) {
                    LOG(LogLevel::ERROR) << "响应类型向下转换失败! ";
                    return false;
                }
                if(service_rsp->rcode() != RCode::RCODE_OK) {
                    LOG(LogLevel::ERROR) << "服务发现失败! " << errReason(service_rsp->rcode());
                    return false;
                }

                // 能走到这里,代表当前_method_hosts 没有对应的服务提供主机
                {
                    std::unique_lock<std::mutex> lock(_mutex);
                    MethodHost::ptr method_host = std::make_shared<MethodHost>(service_rsp->hosts());
                    if(method_host->empty()) {
                        LOG(LogLevel::ERROR) << method << "服务发现失败! 没有能提供服务的主机" << errReason(service_rsp->rcode());
                        return false;
                    }
                    host = method_host->chooseHost();
                    _method_hosts[method] = method_host;
                }
                
                return true;
            }

            // 提供给dispatcher 进行服务上线下线请求处理的回调函数  服务器告诉客户端
            void onServiceRequest(const BaseConnection::ptr &conn, const ServiceRequest::ptr &msg) 
            {
                // 1. 判断是上线还是下线请求,如果都不是就不用处理了
                auto optype = msg->optype();
                std::string method = msg->method();
                if(optype == ServiceOptype::SERVICE_ONLINE) {
                    // 2. 上线请求: 找到 _method_hosts, 向其中新增一个主机地址
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _method_hosts.find(msg->method());
                    if(it == _method_hosts.end()) {
                        MethodHost::ptr method_host = std::make_shared<MethodHost>();
                        method_host->appendHost(msg->host());
                        _method_hosts[method] = method_host;
                    } else {
                        it->second->appendHost(msg->host());
                    }
                } else if(optype == ServiceOptype::SERVICE_OFFLINE) {
                    // 3. 下线请求: 找到 _method_hosts, 从其中删除一个主机地址
                    std::unique_lock<std::mutex> lock(_mutex);
                    auto it = _method_hosts.find(msg->method());
                    if(it == _method_hosts.end()) {
                        return ;
                    } else {
                        it->second->removeHost(msg->host());
                        _offline_callback(msg->host());
                    }
                }
            }
        private:
            OfflineCallback _offline_callback;
            std::mutex _mutex;
            std::unordered_map<std::string, MethodHost::ptr> _method_hosts;
            Requestor::ptr _requestor;
        };
    }
}

九:🔥 客户端 - 整合封装 Client

rpc_client.hpp

#include "../common/dispatcher.hpp"
#include "rpc_caller.hpp" 
#include "requestor.hpp"
#include "rpc_registry.hpp"
#include "rpc_topic.hpp"

namespace rpc
{
    namespace client
    {
        class RegistryClient
        {
        public:
            using ptr = std::shared_ptr<RegistryClient>;
            // 构造函数传入注册中心的地址信息,用于连接注册中心
            RegistryClient(const std::string &ip, int port) 
                : _requestor(std::make_shared<Requestor>()),
                  _provider(std::make_shared<client::Provider>(_requestor)),
                  _dispatcher(std::make_shared<rpc::Dispatcher>())
            {
                // 获取响应的回调函数 获取响应
                auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(rpc::MType::RSP_SERVICE, rsp_cb);

                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _client = rpc::ClientFactory::create(ip, port);
                _client->setMessageCallback(message_cb);
                _client->connect();
            }
            // 向外提供的服务注册接口 发送请求
            bool registryMethod(const std::string &method, const Address &host) 
            {
                return _provider->registryMethod(_client->connection(), method, host);
            }
        private:
            Requestor::ptr _requestor;
            client::Provider::ptr _provider;
            Dispatcher::ptr _dispatcher;
            BaseClient::ptr _client;
        };

        class DiscoveryClient
        {
        public:
            using ptr = std::shared_ptr<DiscoveryClient>;
            // 构造函数传入注册中心的地址信息,用于连接注册中心
            DiscoveryClient(const std::string &ip, int port, const Discoverer::OfflineCallback &cb) 
                : _requestor(std::make_shared<Requestor>()),
                 _discoverer(std::make_shared<client::Discoverer>(_requestor, cb)),
                 _dispatcher(std::make_shared<rpc::Dispatcher>())
            {
                // 获取响应的回调函数 获取响应
                auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(MType::RSP_SERVICE, rsp_cb);

                auto req_cb = std::bind(&client::Discoverer::onServiceRequest, _discoverer.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<rpc::ServiceRequest>(MType::REQ_SERVICE, req_cb);
 
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _client = rpc::ClientFactory::create(ip, port);
                _client->setMessageCallback(message_cb);
                _client->connect();
            }
            
            // 向外提供的服务发现接口
            bool serviceDiscovery(const std::string &method, Address &host) 
            {
                return _discoverer->serviceDiscovery(_client->connection(), method, host);
            }
        private:
            Requestor::ptr _requestor;
            client::Discoverer::ptr _discoverer;
            Dispatcher::ptr _dispatcher;
            BaseClient::ptr _client;
        };

        class RpcClient
        {
        public:
            using ptr = std::shared_ptr<RpcClient>;
            // enableDiscovery 表示是否启用服务发现功能, 也决定了传入的地址信息是注册中心的地址,还是服务提供者的地址
            RpcClient(bool enableDiscovery, const std::string &ip, int port) 
                : _enableDiscovery(enableDiscovery),
                _requestor(std::make_shared<Requestor>()),
                _dispatcher(std::make_shared<Dispatcher>()),
                _caller(std::make_shared<RpcCaller>(_requestor))
            {
                // 针对rpc请求后的响应进行的回调处理
                auto rsp_cb = std::bind(&client::Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(MType::RSP_RPC, rsp_cb);

                // 如果启用了服务发现,地址信息是注册中心的地址,是服务发现客户端需要连接的地址,则通过地址信息实例化 discovery_client
                // 如果没有启用服务发现,则地址信息是服务提供者的地址,则直接实例化好rpc_client
                if(_enableDiscovery) {
                    auto offline_cb = std::bind(&RpcClient::delClient, this,  std::placeholders::_1);
                    _discovery_client = std::make_shared<DiscoveryClient>(ip, port, offline_cb);
                } else {
                    _rpc_client = ClientFactory::create(ip, port);
                    auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                    _rpc_client->setMessageCallback(message_cb);
                    _rpc_client->connect();
                }
            }
            // 同步
            bool call(const std::string &method, const Json::Value &params, Json::Value &result)
            {
                // 获取服务提供者 1. 服务发现   2. 固定的服务发现者
                BaseClient::ptr client = getClient(method);
                if(client.get() == nullptr) return false;
                // 3. 通过客户端连接,发送rpc请求
                return _caller->call(client->connection(), method, params, result);
            }
            // 异步
            bool call( const std::string &method, const Json::Value &params, RpcCaller::JsonAsyncResponse &result)
            {
                BaseClient::ptr client = getClient(method);
                if(client.get() == nullptr) return false;
                // 3. 通过客户端连接,发送rpc请求
                return _caller->call(client->connection(), method, params, result);
            }
            // 回调
            bool call(const std::string &method, const Json::Value &params, const RpcCaller::JsonResponseCallback &cb)
            {
                BaseClient::ptr client = getClient(method);
                if(client.get() == nullptr) return false;
                // 3. 通过客户端连接,发送rpc请求
                return _caller->call(client->connection(), method, params, cb);
            }
        private:
            BaseClient::ptr newClient(const Address &host)
            {
                auto client = ClientFactory::create(host.first, host.second);
                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                client->setMessageCallback(message_cb);
                client->connect();
                putClient(host, client);
                return client;
            }

            BaseClient::ptr getClient(const Address &host)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                auto it = _rpc_clients.find(host);
                if(it == _rpc_clients.end()) return BaseClient::ptr();
                return it->second;
            }

            BaseClient::ptr getClient(const std::string &method)
            {
                BaseClient::ptr client;
                if(_enableDiscovery) {
                    // 1. 通过服务发现获取服务提供者地址信息
                    Address host;
                    bool ret = _discovery_client->serviceDiscovery(method, host);
                    if(ret == false) {
                        LOG(LogLevel::INFO) << "当前 " << method << " 服务, 没有找到服务提供者! ";
                        return BaseClient::ptr();
                    }
                    // 2. 查看服务提供者是否已有实例化客户端,有则直接使用,没有则创建
                    client = getClient(host);
                    if(client.get() == nullptr) {   // 没有找到已经实例化的客户端,则创建
                        client = newClient(host);
                    }
                } else {
                    client = _rpc_client;
                }
                return client;
            }

            void putClient(const Address &host, BaseClient::ptr &client)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _rpc_clients.insert(std::make_pair(host, client));
            }

            void delClient(const Address &host)
            {
                std::unique_lock<std::mutex> lock(_mutex);
                _rpc_clients.erase(host);
            }

            struct AddressHash
            {
                size_t operator()(const Address& host) const
                {
                    std::string addr = host.first + std::to_string(host.second);
                    return std::hash<std::string>{}(addr);      // 创建一个临时对象 调用仿函数
                }
            };
        private:
            bool _enableDiscovery;
            Requestor::ptr _requestor;
            DiscoveryClient::ptr _discovery_client;
            RpcCaller::ptr _caller;
            Dispatcher::ptr _dispatcher;
            BaseClient::ptr _rpc_client;        // 用于未启用服务发现
            std::mutex _mutex;
            // <"127.0.0.1", client1>
            std::unordered_map<Address, BaseClient::ptr, AddressHash> _rpc_clients;     // 用于服务发现的客户端连接池
        };

        class TopicClient
        {
        public:
            using ptr = std::shared_ptr<TopicClient>;
            // 构造函数传入注册中心的地址信息,用于连接注册中心
            TopicClient(const std::string &ip, int port) 
                : _requestor(std::make_shared<Requestor>()),
                  _dispatcher(std::make_shared<rpc::Dispatcher>()),
                  _topic_manager(std::make_shared<TopicManager>(_requestor))
            {
                // 获取响应的回调函数 获取响应
                auto rsp_cb = std::bind(&Requestor::onResponse, _requestor.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<BaseMessage>(rpc::MType::RSP_TOPIC, rsp_cb);

                auto msg_cb = std::bind(&TopicManager::onPublish, _topic_manager.get(), std::placeholders::_1, std::placeholders::_2);
                _dispatcher->registerHandler<TopicRequest>(rpc::MType::REQ_TOPIC, msg_cb);

                auto message_cb = std::bind(&Dispatcher::onMessage, _dispatcher.get(), std::placeholders::_1, std::placeholders::_2);
                _rpc_client = rpc::ClientFactory::create(ip, port);
                _rpc_client->setMessageCallback(message_cb);
                _rpc_client->connect();
            }

            bool create(const std::string &key)
            {
                return _topic_manager->create(_rpc_client->connection(), key);
            }

            bool remove(const std::string &key)
            {
                return _topic_manager->remove(_rpc_client->connection(), key);
            }

            bool subscribe(const std::string &key, const TopicManager::SubCallback &cb)
            {
                return _topic_manager->subscribe(_rpc_client->connection(), key, cb);
            }

            bool cancel(const std::string &key)
            {
                return _topic_manager->cancel(_rpc_client->connection(), key);
            }

            bool publish(const std::string &key, const std::string &msg)
            {
                return _topic_manager->publish(_rpc_client->connection(), key, msg);
            }

            void shutdown()
            {
                _rpc_client->shutdown();
            }
        private:
            Requestor::ptr _requestor;
            Dispatcher::ptr _dispatcher;
            TopicManager::ptr _topic_manager;
            BaseClient::ptr _rpc_client;
        };
    }
}

十:🔥 整合封装的使⽤代码样例

🦋 简单 Rpc 使⽤

test_client.cpp

#include "../../common/detail.hpp"
#include "../../client/rpc_client.hpp"

void callback(const Json::Value &result)
{
   rpc::LOG(rpc::LogLevel::INFO) << "callback result: " << result.asInt();
}

int main()
{
   rpc::client::RpcClient client(false, "127.0.0.1", 9090);

   Json::Value params, result;
   params["num1"] = 11;
   params["num2"] = 22;
   bool ret = client.call("Add", params, result);    
   if(ret != false) {
      std::cout << "result: " << result.asInt() << std::endl;
   }

   // 异步
   rpc::client::RpcCaller::JsonAsyncResponse res_future;
   params["num1"] = 33;
   params["num2"] = 44;
   ret = client.call("Add", params, res_future);    
   if(ret != false) {
      result = res_future.get();
      std::cout << "result: " << result.asInt() << std::endl;
   }


   // 异步
   params["num1"] = 55;
   params["num2"] = 66;
   ret = client.call("Add", params, callback);
   std::cout << "----------------------------------------------\n" << std::endl;

   sleep(1000);

   return 0;
}

test_server.cpp

#include "../../common/detail.hpp"
#include "../../server/rpc_server.hpp"


void Add(const Json::Value &req, Json::Value &rsp)
{
   int num1 = req["num1"].asInt();
   int num2 = req["num2"].asInt();
   rsp = num1 + num2;
}

int main()
{
   auto router = std::make_shared<rpc::server::RpcRouter>();
   std::unique_ptr<rpc::server::SDescribeFactory> desc_factory(new rpc::server::SDescribeFactory());
   desc_factory->setMethodName("Add");
   desc_factory->setParamsDesc("num1", rpc::server::VType::INTEGRAL);
   desc_factory->setParamsDesc("num2", rpc::server::VType::INTEGRAL);
   desc_factory->setReturnType(rpc::server::VType::INTEGRAL);
   desc_factory->setCallback(Add);
   
   rpc::server::Rpcserver server(rpc::Address("127.0.0.1", 9090));
   server.registerMethod(desc_factory->build());
   server.start();

   return 0;
}

Makefile

CFLAG= -g -std=c++17 -I ../../../build/release-install-cpp11/include/
LFLAG= -L ../../../build/release-install-cpp11/lib -ljsoncpp -lmuduo_net -lmuduo_base -lpthread
.PHONY:all
all: server client
server:test_server.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
client:test_client.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean:
	rm -f server client

🦋 基于服务注册发现的 Rpc 调⽤

rpc_client.cpp

#include "../../common/detail.hpp"
#include "../../client/rpc_client.hpp"

void callback(const Json::Value &result)
{
   rpc::LOG(rpc::LogLevel::INFO) << "callback result: " << result.asInt();
}

// 第二次测试的
int main()
{
   rpc::client::RpcClient client(true, "127.0.0.1", 8080);

   Json::Value params, result;
   params["num1"] = 11;
   params["num2"] = 22;
   bool ret = client.call("Add", params, result);    
   if(ret != false) {
      std::cout << "result: " << result.asInt() << std::endl;
   }

   // 异步
   rpc::client::RpcCaller::JsonAsyncResponse res_future;
   params["num1"] = 33;
   params["num2"] = 44;
   ret = client.call("Add", params, res_future);    
   if(ret != false) {
      result = res_future.get();
      std::cout << "result: " << result.asInt() << std::endl;
   }


   // 异步
   params["num1"] = 55;
   params["num2"] = 66;
   ret = client.call("Add", params, callback);
   std::cout << "----------------------------------------------\n" << std::endl;

   sleep(1000);

   return 0;
}

rpc_server.cpp

#include "../../common/detail.hpp"
#include "../../server/rpc_server.hpp"

void Add(const Json::Value &req, Json::Value &rsp)
{
   int num1 = req["num1"].asInt();
   int num2 = req["num2"].asInt();
   rsp = num1 + num2;
}

int main()
{
    auto router = std::make_shared<rpc::server::RpcRouter>();
    std::unique_ptr<rpc::server::SDescribeFactory> desc_factory(new rpc::server::SDescribeFactory());
    desc_factory->setMethodName("Add");
    desc_factory->setParamsDesc("num1", rpc::server::VType::INTEGRAL);
    desc_factory->setParamsDesc("num2", rpc::server::VType::INTEGRAL);
    desc_factory->setReturnType(rpc::server::VType::INTEGRAL);
    desc_factory->setCallback(Add);
   
    rpc::server::Rpcserver server(rpc::Address("127.0.0.1", 9090), true, rpc::Address("127.0.0.1", 8080));
    server.registerMethod(desc_factory->build());
    server.start();

    return 0;
}

registry_server.cpp

#include "../../common/detail.hpp"
#include "../../server/rpc_server.hpp"

int main()
{
    rpc::server::RegistryServer reg_server(8080);

    reg_server.start();

    return 0;
}

Makefile

CFLAG= -g -std=c++17 -I ../../../build/release-install-cpp11/include/
LFLAG= -L ../../../build/release-install-cpp11/lib -ljsoncpp -lmuduo_net -lmuduo_base -lpthread
.PHONY:all
all: reg_server rpc_server rpc_client
reg_server:registry_server.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
rpc_server:rpc_server.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
rpc_client:rpc_client.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean:
	rm -f reg_server rpc_server rpc_client

🦋 基于⼴播的发布订阅

publish_client.cpp

#include "../../client/rpc_client.hpp"

int main()
{
    // 1. 实例化客户端对象
    rpc::client::TopicClient::ptr client = std::make_shared<rpc::client::TopicClient>("127.0.0.1", 7070);
    // 2. 创建主题
    bool ret = client->create("hello");
    if(ret == false) {
        rpc::LOG(rpc::LogLevel::ERROR) << "创建主题失败! ";
    }
    // 3. 向主题发布消息
    for(int i = 0; i < 10; i++) {
        client->publish("hello", "Hello World" + std::to_string(i));
    }

    client->shutdown();
    
    return 0;
}

server.cpp

#include "../../server/rpc_server.hpp"

int main()
{
    auto server = std::make_shared<rpc::server::TopicServer>(7070);

    server->start();

    return  0;
}

subscribe_client.cpp

#include "../../client/rpc_client.hpp"

void callback(const std::string &key, const std::string &msg)
{
    rpc::LOG(rpc::LogLevel::INFO) << key << " 主题收到推送过来的消息: " << msg;
}

int main()
{
    // 1. 实例化客户端对象
    rpc::client::TopicClient::ptr client = std::make_shared<rpc::client::TopicClient>("127.0.0.1", 7070);
    // 2. 创建主题
    bool ret = client->create("hello");
    if(ret == false) {
        rpc::LOG(rpc::LogLevel::ERROR) << "创建主题失败! ";
    }
    // 3. 订阅主题
    ret = client->subscribe("hello", callback);
    // 4. 等待 -> 退出
    std::this_thread::sleep_for(std::chrono::seconds(10));
    client->shutdown();
    
    return 0;
}

Makefile

CFLAG= -g -std=c++17 -I ../../../build/release-install-cpp11/include/
LFLAG= -L ../../../build/release-install-cpp11/lib -ljsoncpp -lmuduo_net -lmuduo_base -lpthread
.PHONY:all
all: server publish_client subscribe_client
server : server.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
publish_client : publish_client.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
subscribe_client : subscribe_client.cpp
	g++ -o $@ $^ $(CFLAG) $(LFLAG)
.PHONY:clean
clean:
	rm -f server publish_client subscribe_client

十一:🔥 共勉

😋 以上就是我对 【C++项目】从零实现RPC框架「四」:业务层实现与项目使用 的理解, 觉得这篇博客对你有帮助的,可以点赞收藏关注支持一波~ 😉
在这里插入图片描述

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐