Skip to content

Instantly share code, notes, and snippets.

@sqbing
Created September 24, 2014 06:03
Show Gist options
  • Save sqbing/076c8163e6d8ef7d4175 to your computer and use it in GitHub Desktop.
Save sqbing/076c8163e6d8ef7d4175 to your computer and use it in GitHub Desktop.
zmq example
#include <inttypes.h>
#include <signal.h>
#include <unistd.h>
#include <cstring>
#include <ctime>
#include <exception>
#include <iostream>
#include <map>
#include <sstream>
#include <string>
#include <typeinfo>
#include <vector>
#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/program_options.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/shared_ptr.hpp>
#ifdef __cplusplus
extern "C"{
#endif
#include <uuid/uuid.h>
#ifdef __cplusplus
}
#endif
#include "zmq.hpp"
// First byte of every grid message; also the prefix the SUB socket subscribes to.
#define GRID_MESSAGE_PREFIX 0x22
// Byte type used when reading and writing raw message buffers.
#ifndef uchar
#define uchar uint8_t
#endif
// Delete a heap object if the pointer is non-null, then reset the pointer to NULL.
#ifndef DELIF
#define DELIF(a) do{if(a){delete (a); (a) = NULL;}}while(0)
#endif
// Router and publish port numbers, each in numeric and in string form.
static int router_port = 4150;
static const char *router_port_str = "4150";
static int publish_port = 4151;
static const char *publish_port_str = "4151";
// Wall-clock time in seconds; refreshed at the start of every GridContext::poll().
static time_t current_time = time(0);
// Role this process plays in the grid (see process_role).
enum {
    ROLE_MASTER = 0,
    ROLE_WORKER = 1
};
// Message type codes; the value is carried in the second byte of every grid message.
enum {
    MSG_TYPE_HANDSHAKE      = 0,
    MSG_TYPE_HANDSHAKE_OK   = 1,
    MSG_TYPE_HANDSHAKE_FAIL = 2,
    MSG_TYPE_HEARTBEAT      = 3,
    MSG_TYPE_EXIT           = 4
};
// Printable names for logging, indexed by the MSG_TYPE_* value above.
const char *MessageTypeString[] = {
    /* MSG_TYPE_HANDSHAKE      */ "MESSAGE_TYPE_HANDSHAKE",
    /* MSG_TYPE_HANDSHAKE_OK   */ "MESSAGE_TYPE_HANDSHAKE_OK",
    /* MSG_TYPE_HANDSHAKE_FAIL */ "MESSAGE_TYPE_HANDSHAKE_FAIL",
    /* MSG_TYPE_HEARTBEAT      */ "MESSAGE_TYPE_HEARTBEAT",
    /* MSG_TYPE_EXIT           */ "MESSAGE_TYPE_EXIT"
};
// Role of this process (ROLE_MASTER or ROLE_WORKER); main() overwrites it from --role.
int process_role = ROLE_MASTER;
// Address this instance binds its sockets to; set from --local_ip.
std::string local_ip = "";
// Address and router port of the remote instance; set from --remote_ip / --remote_router_port.
std::string remote_ip = "";
std::string remote_router_port_str = "";
// Forward declarations of the two context structs defined further down.
typedef struct GridContext GridContext;
typedef struct GridInstanceContext GridInstanceContext;
// Argument passed to a pollin callback: the socket that became readable and
// the user data that was registered together with it.
struct GridMessageContext {
    zmq::socket_t *_sock;   // not owned
    boost::any _data;
};
struct GridInstanceContext{
GridInstanceContext():_last_heartbeat_time(0){
}
~GridInstanceContext(){
CloseAllSockets();
}
void CloseAllSockets(){
std::cout<<"info: closing all sockets for "<<_id<<std::endl;
if(_router_sock){
_router_sock->close();
_router_sock.reset();
}
if(_publish_sock){
_publish_sock->close();
_publish_sock.reset();
}
if(_subscrib_sock){
_subscrib_sock->close();
_subscrib_sock.reset();
}
if(_request_sock){
_request_sock->close();
_request_sock.reset();
}
}
time_t _last_heartbeat_time;
std::string _id;
std::string _ip;
std::string _router_port;
std::string _publish_port;
boost::shared_ptr<zmq::socket_t> _router_sock;
boost::shared_ptr<zmq::socket_t> _publish_sock;
boost::shared_ptr<zmq::socket_t> _subscrib_sock;
boost::shared_ptr<zmq::socket_t> _request_sock;
void add_sub(std::string &address){
_subscrib_sock->connect(address.c_str());
}
void del_sub(std::string &address){
_subscrib_sock->disconnect(address.c_str());
}
void get_jsonified_info(std::string &info){
boost::property_tree::ptree pt_root;
pt_root.put("id", _id);
pt_root.put("ip", _ip);
pt_root.put("publish_port", _publish_port);
pt_root.put("router_port", _router_port);
std::stringstream ss;
boost::property_tree::write_json(ss, pt_root);
info = ss.str();
}
};
// One entry of the poll set: a socket to watch for input, the callback to run
// when it becomes readable, and opaque user data forwarded to that callback.
struct PollinContext {
    boost::any _data;
    zmq::socket_t *_sock;   // not owned
    boost::function<void(GridMessageContext &)> _on_read;
};
struct GridContext{
GridContext():_sock_ctx(1){
std::cout<<"info: new GridContext"<<std::endl;
}
~GridContext(){
_all_instances.clear();
_pollins_vector.clear();
_ctx.CloseAllSockets();
std::cout<<"info: destroying zmq context"<<std::endl;
_sock_ctx.close();
}
// 本实例信息
GridInstanceContext _ctx;
// pollin句柄列表
std::vector<boost::shared_ptr<PollinContext> >
_pollins_vector;
// 实例列表
std::map<std::string, boost::shared_ptr<GridInstanceContext> >
_all_instances;
zmq::context_t _sock_ctx;
// 增加pollin句柄
void add_pollin(boost::shared_ptr<PollinContext> pollin_ctx){
_pollins_vector.insert(_pollins_vector.begin(), pollin_ctx);
}
// 删除pollin句柄
void del_pollin(boost::shared_ptr<PollinContext> pollin_ctx){
for(int i = 0; i < _pollins_vector.size(); i++){
if(_pollins_vector[i]->_sock == pollin_ctx->_sock){
_pollins_vector.erase(_pollins_vector.begin()+i);
}
}
}
void poll(){
current_time = time(0);
if(_pollins_vector.empty()){
return ;
}
zmq::pollitem_t *iterms = NULL, *piterm = NULL;
std::vector<boost::shared_ptr<PollinContext> > temp_vector(_pollins_vector);
// 注意释放!
try{
iterms = new zmq::pollitem_t[temp_vector.size()];
}catch(...){
std::cout<<"alloc poll iterms failed"<<std::endl;
return ;
}
for(int i = 0; i < temp_vector.size(); i++){
piterm = &iterms[i];
boost::shared_ptr<PollinContext> pollin_ctx = temp_vector[i];
piterm->socket = *pollin_ctx->_sock;
piterm->fd = 0;
piterm->events = ZMQ_POLLIN;
piterm->revents = 0;
}
try{
int poll_ret = zmq::poll(iterms, temp_vector.size(), 1000);
std::cout<<"info: poll return "<<poll_ret<<std::endl;
if(poll_ret == 0){
if(iterms){
delete[] iterms;
iterms = NULL;
}
return ;
}
for(int i = 0; i < temp_vector.size(); i++){
boost::shared_ptr<PollinContext> pollin_ctx = temp_vector[i];
piterm = &iterms[i];
if(piterm->revents & ZMQ_POLLIN){
std::cout<<"info: dispatching message from socket "<<pollin_ctx->_sock<<std::endl;
// 分发消息
GridMessageContext msg_ctx;
msg_ctx._sock = pollin_ctx->_sock;
msg_ctx._data = pollin_ctx->_data;
pollin_ctx->_on_read(msg_ctx);
}
}
}catch(std::exception &e){
std::cout<<"failed to poll sockets"<<std::endl;
std::cout<<e.what()<<std::endl;
if(iterms){
delete[] iterms;
iterms = NULL;
}
return ;
}
if(iterms){
delete[] iterms;
iterms = NULL;
}
}
};
// The single grid state object used by every handler in this file.
static GridContext grid_ctx;
typedef struct HandshakeContext{
zmq::socket_t *_sock;
char *_address;
size_t _address_len;
}HandshakeContext;
// Shutdown request flag; set by int_handler() and read by the main loop.
// volatile sig_atomic_t is the type that may safely be written from a signal handler.
static volatile sig_atomic_t exit_process = 0;
/**
 * @brief Signal handler: only raises the shutdown flag.
 *
 * The signal number is unused (and left unnamed so it does not shadow ::signal).
 */
void int_handler(int /*signum*/){
    exit_process = 1;
}
/**
 * @brief Build a ZeroMQ TCP endpoint string "tcp://<ip>:<port>".
 *
 * @param ip   host part of the endpoint
 * @param port port part of the endpoint
 * @param out  receives the endpoint; any previous content is replaced.
 *             May refer to the same string as ip or port.
 */
void make_tcp_address(const std::string &ip, const std::string &port, std::string &out){
    // Build into a temporary so the inputs stay intact even when out aliases one of them.
    std::string address("tcp://");
    address += ip;
    address += ":";
    address += port;
    out.swap(address);
}
/**
 * @brief Send the handshake request.
 *
 * The message is a 6-byte header (prefix, MSG_TYPE_HANDSHAKE, big-endian
 * 32-bit payload length) followed by the local instance's JSON description,
 * including its terminating NUL.
 *
 * @param handshake_ctx supplies the socket to send on
 * @return 0 on success, -1 if sending threw
 */
int send_handshake_init(HandshakeContext &handshake_ctx){
    std::string info;
    grid_ctx._ctx.get_jsonified_info(info);
    const uint32_t payload_len = info.size()+1;

    zmq::message_t frame(6+payload_len);
    uchar *bytes = static_cast<uchar *>(frame.data());
    bytes[0] = GRID_MESSAGE_PREFIX;
    bytes[1] = MSG_TYPE_HANDSHAKE;
    // Payload length, most significant byte first.
    for(int k = 0; k < 4; ++k){
        bytes[2+k] = static_cast<uchar>(payload_len >> (24 - 8*k));
    }
    memcpy(bytes+6, info.c_str(), payload_len);

    try{
        handshake_ctx._sock->send(frame);
    }catch(std::exception &e){
        std::cout<<"error: sending handshake failed "<<e.what()<<std::endl;
        return -1;
    }
    return 0;
}
/**
 * @brief Send the handshake-success reply.
 *
 * Sends three frames: the peer's address frame, an empty frame, and a
 * MSG_TYPE_HANDSHAKE_OK message carrying the local instance's JSON
 * description (terminating NUL included).
 *
 * @param handshake_ctx supplies the socket and the peer's address frame
 * @return 0 on success, -1 if sending threw
 */
int send_handshake_ok(HandshakeContext &handshake_ctx){
    // Frame 1: copy of the peer's address frame.
    zmq::message_t address_frame(handshake_ctx._address_len);
    memcpy(address_frame.data(), handshake_ctx._address, handshake_ctx._address_len);
    // Frame 2: empty frame.
    zmq::message_t empty_frame;

    // Frame 3: header + JSON payload.
    std::string info;
    grid_ctx._ctx.get_jsonified_info(info);
    const uint32_t payload_len = info.size()+1;
    zmq::message_t body_frame(6+payload_len);
    uchar *bytes = static_cast<uchar *>(body_frame.data());
    bytes[0] = GRID_MESSAGE_PREFIX;
    bytes[1] = MSG_TYPE_HANDSHAKE_OK;
    // Payload length, most significant byte first.
    for(int k = 0; k < 4; ++k){
        bytes[2+k] = static_cast<uchar>(payload_len >> (24 - 8*k));
    }
    memcpy(bytes+6, info.c_str(), payload_len);

    try{
        handshake_ctx._sock->send(address_frame, ZMQ_SNDMORE);
        handshake_ctx._sock->send(empty_frame, ZMQ_SNDMORE);
        handshake_ctx._sock->send(body_frame);
    }catch(std::exception &e){
        std::cout<<"error: sending handshake_ok failed "<<e.what()<<std::endl;
        return -1;
    }
    return 0;
}
/**
 * @brief Send the handshake-failure reply.
 *
 * Sends three frames: the peer's address frame, an empty frame, and a
 * MSG_TYPE_HANDSHAKE_FAIL message carrying the local instance's JSON
 * description (terminating NUL included).
 *
 * @param handshake_ctx supplies the socket and the peer's address frame;
 *                      both must have been filled in by the caller
 * @return 0 on success, -1 if sending threw
 */
int send_handshake_fail(HandshakeContext &handshake_ctx){
    zmq::message_t msg1(handshake_ctx._address_len);
    memcpy(msg1.data(), handshake_ctx._address, handshake_ctx._address_len);
    zmq::message_t msg2;
    std::string content;
    grid_ctx._ctx.get_jsonified_info(content);
    uint32_t content_len = content.size()+1;
    zmq::message_t msg3(6+content_len);
    uchar *data = static_cast<uchar *>(msg3.data());
    data[0] = GRID_MESSAGE_PREFIX;
    data[1] = MSG_TYPE_HANDSHAKE_FAIL;
    // Payload length, most significant byte first.
    data[2] = static_cast<uchar>(content_len >> 24);
    data[3] = static_cast<uchar>(content_len >> 16);
    data[4] = static_cast<uchar>(content_len >> 8);
    data[5] = static_cast<uchar>(content_len);
    memcpy(data+6, content.c_str(), content_len);
    try{
        handshake_ctx._sock->send(msg1, ZMQ_SNDMORE);
        handshake_ctx._sock->send(msg2, ZMQ_SNDMORE);
        handshake_ctx._sock->send(msg3);
    }catch(std::exception &e){
        // Name the right message: this is the failure reply, not handshake_ok.
        std::cout<<"error: sending handshake_fail failed "<<e.what()<<std::endl;
        return -1;
    }
    return 0;
}
/**
 * @brief Pollin callback for the request socket: consume the handshake reply.
 *
 * Expects msg_ctx._data to hold the heap-allocated HandshakeContext* created
 * for the request; that object is freed here on every path once it has been
 * extracted. The reply must be a handshake_ok or handshake_fail message whose
 * payload is the peer's JSON description. The peer is recorded in the
 * instance list and its publish endpoint is subscribed to.
 *
 * @param msg_ctx socket that became readable plus the registered user data
 */
void handle_handshake_response(GridMessageContext &msg_ctx){
    std::cout<<"info: handle_handshake_response()"<<std::endl;
    // Handles both handshake_ok and handshake_fail.
    if(msg_ctx._data.empty()
            || msg_ctx._data.type() != typeid(HandshakeContext *)){
        std::cout<<"error: message context invalid"<<std::endl;
        return ;
    }
    // Scoped owner: the context is deleted however this function is left.
    boost::shared_ptr<HandshakeContext> handshake_ctx(
            boost::any_cast<HandshakeContext *>(msg_ctx._data));
    try{
        // Stop polling the request socket.
        boost::shared_ptr<PollinContext> pollin_ctx(new PollinContext);
        pollin_ctx->_sock = msg_ctx._sock;
        grid_ctx.del_pollin(pollin_ctx);
        std::cout<<"info: pollin deleted "<<pollin_ctx->_sock<<std::endl;
    }catch(...){
        std::cout<<"error: failed to delete pollin"<<std::endl;
        return ;
    }
    // Receive the message.
    zmq::message_t msg;
    try{
        msg_ctx._sock->recv(&msg);
    }catch(std::exception &e){
        std::cout<<"error: failed to recv handshake message "<<e.what()<<std::endl;
        return ;
    }
    // Parse the header; never read past what was actually received.
    const size_t header_len = 6;
    if(msg.size() < header_len){
        std::cout<<"error: failed to parse handshake message, message too short"<<std::endl;
        return ;
    }
    const uchar *data = static_cast<const uchar *>(msg.data());
    const uchar prefix = data[0];
    const uchar type = data[1];
    if(prefix != GRID_MESSAGE_PREFIX
            || (type != MSG_TYPE_HANDSHAKE_OK
                && type != MSG_TYPE_HANDSHAKE_FAIL)){
        std::cout<<"error: failed to parse handshake message, format invalid"<<std::endl;
        return ;
    }
    // Big-endian payload length; widen before shifting to avoid signed overflow.
    const uint32_t content_len = (static_cast<uint32_t>(data[2]) << 24)
        | (static_cast<uint32_t>(data[3]) << 16)
        | (static_cast<uint32_t>(data[4]) << 8)
        | (static_cast<uint32_t>(data[5]));
    if(content_len > msg.size() - header_len){
        std::cout<<"error: failed to parse handshake message, content length exceeds message size"<<std::endl;
        return ;
    }
    // Parse the instance information.
    std::string content;
    try{
        content.assign(reinterpret_cast<const char *>(data + header_len), content_len);
        // The payload is a C string: keep only what precedes the first NUL.
        const std::string::size_type nul_pos = content.find('\0');
        if(nul_pos != std::string::npos){
            content.erase(nul_pos);
        }
    }catch(...){
        std::cout<<"error: failed to alloc content wrapper"<<std::endl;
        return ;
    }
    std::stringstream ss(content);
    boost::property_tree::ptree pt_root;
    try{
        boost::property_tree::read_json(ss, pt_root);
    }catch(std::exception &e){
        std::cout<<"error: failed to parse content "<<e.what()<<std::endl;
        return ;
    }
    boost::shared_ptr<GridInstanceContext> instance_ctx;
    try{
        instance_ctx.reset(new GridInstanceContext);
    }catch(std::exception &e){
        std::cout<<"error: failed to alloc grid instance context "<<e.what()<<std::endl;
        return ;
    }
    try{
        instance_ctx->_id = pt_root.get<std::string>("id");
        instance_ctx->_ip = pt_root.get<std::string>("ip");
        instance_ctx->_publish_port = pt_root.get<std::string>("publish_port");
        instance_ctx->_router_port = pt_root.get<std::string>("router_port");
    }catch(std::exception &e){
        std::cout<<"error: failed to parse content "<<e.what()<<std::endl;
        return ;
    }
    std::cout<<"info: ["<<MessageTypeString[type]<<"]"
        <<" id: "<<instance_ctx->_id
        <<" ip: "<<instance_ctx->_ip
        <<" publish port: "<<instance_ctx->_publish_port
        <<" router port: "<<instance_ctx->_router_port<<std::endl;
    // NOTE(review): the peer is registered and subscribed to even when the
    // reply is handshake_fail - confirm this is intended.
    grid_ctx._all_instances[instance_ctx->_id] = instance_ctx;
    // Subscribe to the peer's publish endpoint.
    std::string remote_publish_address;
    make_tcp_address(instance_ctx->_ip, instance_ctx->_publish_port, remote_publish_address);
    try{
        grid_ctx._ctx._subscrib_sock->connect(remote_publish_address.c_str());
    }catch(std::exception &e){
        std::cout<<"error: failed to connect to remote publish address ["<<remote_publish_address<<"] "
            <<e.what()<<std::endl;
        return ;
    }
    std::cout<<"info: succeeded to subscrib "<<remote_publish_address<<std::endl;
    return ;
}
/**
 * @brief Pollin callback for the router socket: process a handshake request.
 *
 * Receives three frames (peer address, empty, content), validates the content
 * frame, parses the peer's JSON description, records the peer in the instance
 * list, subscribes to its publish endpoint and replies with handshake_ok.
 * Once the peer address is known, every failure is answered with
 * handshake_fail.
 *
 * @param msg_ctx socket that became readable plus the registered user data
 */
void handle_handshake_request(GridMessageContext &msg_ctx){
    std::cout<<"info: handle_handshake_request()"<<std::endl;
    HandshakeContext handshake_ctx;
    zmq::message_t msg1, msg2, msg3;
    try{
        msg_ctx._sock->recv(&msg1);// address
        msg_ctx._sock->recv(&msg2);// empty
        msg_ctx._sock->recv(&msg3);// content
    }catch(std::exception &e){
        std::cout<<"error: failed to recv handshake message "<<e.what()<<std::endl;
        // No reply: handshake_ctx has not been filled in yet, so there is no
        // socket/address to answer on.
        return;
    }
    handshake_ctx._address = static_cast<char *>(msg1.data());
    handshake_ctx._address_len = msg1.size();
    handshake_ctx._sock = msg_ctx._sock;

    // Validate the header; never read past what was actually received.
    const size_t header_len = 6;
    if(msg3.size() < header_len){
        std::cout<<"error: invalid message, too short"<<std::endl;
        send_handshake_fail(handshake_ctx);
        return ;
    }
    const uchar *data = static_cast<const uchar *>(msg3.data());
    if(data[0] != GRID_MESSAGE_PREFIX
            || data[1] != MSG_TYPE_HANDSHAKE){
        std::cout<<"error: invalid message"<<std::endl;
        send_handshake_fail(handshake_ctx);
        return ;
    }
    // Big-endian payload length; widen before shifting to avoid signed overflow.
    const uint32_t content_len = (static_cast<uint32_t>(data[2]) << 24)
        | (static_cast<uint32_t>(data[3]) << 16)
        | (static_cast<uint32_t>(data[4]) << 8)
        | (static_cast<uint32_t>(data[5]));
    std::cout<<"info: content length is "<<content_len<<std::endl;
    if(content_len > msg3.size() - header_len){
        std::cout<<"error: invalid message, content length exceeds message size"<<std::endl;
        send_handshake_fail(handshake_ctx);
        return ;
    }

    // Parse the peer description and build its instance record.
    boost::shared_ptr<GridInstanceContext> instance_ctx;
    try{
        std::string content_str(reinterpret_cast<const char *>(data + header_len), content_len);
        // The payload is a C string: keep only what precedes the first NUL.
        const std::string::size_type nul_pos = content_str.find('\0');
        if(nul_pos != std::string::npos){
            content_str.erase(nul_pos);
        }
        std::cout<<"info: content ["<<content_str<<"]"<<std::endl;
        std::stringstream ss(content_str);
        boost::property_tree::ptree pt_root;
        boost::property_tree::read_json(ss, pt_root);
        // Read every field first; a missing key throws and is answered with a failure.
        const std::string peer_ip = pt_root.get<std::string>("ip");
        const std::string peer_id = pt_root.get<std::string>("id");
        const std::string peer_publish_port = pt_root.get<std::string>("publish_port");
        const std::string peer_router_port = pt_root.get<std::string>("router_port");
        instance_ctx.reset(new GridInstanceContext);
        instance_ctx->_ip = peer_ip;
        instance_ctx->_id = peer_id;
        instance_ctx->_publish_port = peer_publish_port;
        instance_ctx->_router_port = peer_router_port;
    }catch(std::exception &e){
        std::cout<<"error: "<<e.what()<<std::endl;
        send_handshake_fail(handshake_ctx);
        return ;
    }
    std::cout<<"info: id "<<instance_ctx->_id<<std::endl;
    std::cout<<"info: ip "<<instance_ctx->_ip<<std::endl;
    std::cout<<"info: publish port "<<instance_ctx->_publish_port<<std::endl;
    std::cout<<"info: router port "<<instance_ctx->_router_port<<std::endl;

    // Check whether this instance is already in the instance list.
    std::map<std::string, boost::shared_ptr<GridInstanceContext> >::iterator iter;
    iter = grid_ctx._all_instances.find(instance_ctx->_id);
    if(iter != grid_ctx._all_instances.end()){
        boost::shared_ptr<GridInstanceContext> old_instance_ctx = iter->second;
        // An instance with the same ID exists.
        // Same IP as well: update the existing instance's ports.
        if(old_instance_ctx->_ip.compare(instance_ctx->_ip) == 0){
            if(old_instance_ctx->_publish_port.compare(instance_ctx->_publish_port)){
                // The publish port changed: drop the old subscription before
                // subscribing to the new port below.
                std::string remote_publish_address;
                make_tcp_address(old_instance_ctx->_ip, old_instance_ctx->_publish_port, remote_publish_address);
                try{
                    grid_ctx._ctx._subscrib_sock->disconnect(remote_publish_address.c_str());
                }catch(std::exception &e){
                    std::cout<<"error: failed to disconnect old publish address ["<<remote_publish_address<<"] "
                        <<e.what()<<std::endl;
                }
                old_instance_ctx->_publish_port = instance_ctx->_publish_port;
                old_instance_ctx->_router_port = instance_ctx->_router_port;
            }
        }
        // Same ID but a different IP: reject the handshake.
        else{
            send_handshake_fail(handshake_ctx);
            return ;
        }
    }
    grid_ctx._all_instances[instance_ctx->_id] = instance_ctx;

    // Subscribe to the peer's publish endpoint.
    std::string remote_publish_address;
    make_tcp_address(instance_ctx->_ip, instance_ctx->_publish_port, remote_publish_address);
    try{
        grid_ctx._ctx._subscrib_sock->connect(remote_publish_address.c_str());
    }catch(std::exception &e){
        std::cout<<"error: failed to connect to publish address ["<<remote_publish_address<<"] "<<e.what()<<std::endl;
        send_handshake_fail(handshake_ctx);
        return ;
    }
    std::cout<<"info: succeeded to subscrib "<<remote_publish_address<<std::endl;
    send_handshake_ok(handshake_ctx);
    return ;
}
/**
 * @brief Handle an exit broadcast: remove the sender from the instance list.
 *
 * Expects msg_ctx._data to hold a zmq::message_t* whose message is
 * prefix byte, type byte, then the sender's 16-byte binary uuid.
 *
 * @param msg_ctx carries the received message in _data
 */
void handle_instance_exit(GridMessageContext &msg_ctx){
    // Pointer form of any_cast: yields NULL instead of throwing on a type mismatch.
    zmq::message_t **msg_slot = boost::any_cast<zmq::message_t*>(&msg_ctx._data);
    if(!msg_slot || !*msg_slot){
        std::cout<<"error: failed to handle ["<<MessageTypeString[MSG_TYPE_EXIT]<<"] message, message context invalid"<<std::endl;
        return ;
    }
    zmq::message_t *msg = *msg_slot;
    // 2 header bytes + binary uuid; refuse anything shorter rather than over-read.
    if(msg->size() < 2 + sizeof(uuid_t)){
        std::cout<<"error: failed to handle ["<<MessageTypeString[MSG_TYPE_EXIT]<<"] message, message too short"<<std::endl;
        return ;
    }
    const uchar *data = static_cast<const uchar *>(msg->data());
    uuid_t u;
    memcpy(u, data + 2, sizeof(uuid_t));
    char us[37];
    uuid_unparse_lower(u, us);
    us[36] = '\0';
    const std::string id = us;
    std::map<std::string, boost::shared_ptr<GridInstanceContext> >::iterator iter =
        grid_ctx._all_instances.find(id);
    if(iter == grid_ctx._all_instances.end()){
        std::cout<<"error: failed to handle ["<<MessageTypeString[MSG_TYPE_EXIT]<<"] message, instance ["<<id<<"] not found"<<std::endl;
        return ;
    }
    std::cout<<"info: remove instance "<<iter->second->_id<<" from instance list"<<std::endl;
    grid_ctx._all_instances.erase(iter);
    return ;
}
/**
 * @brief Handle a heartbeat broadcast: refresh the sender's last-heartbeat time.
 *
 * Expects msg_ctx._data to hold a zmq::message_t* whose message is
 * prefix byte, type byte, then the sender's 16-byte binary uuid.
 *
 * @param msg_ctx carries the received message in _data
 */
void handle_heartbeat(GridMessageContext &msg_ctx){
    // Pointer form of any_cast: yields NULL instead of throwing on a type mismatch.
    zmq::message_t **msg_slot = boost::any_cast<zmq::message_t*>(&msg_ctx._data);
    if(!msg_slot || !*msg_slot){
        std::cout<<"error: failed to handle heartbeat message, message context invalid"<<std::endl;
        return ;
    }
    zmq::message_t *msg = *msg_slot;
    // 2 header bytes + binary uuid; refuse anything shorter rather than over-read.
    if(msg->size() < 2 + sizeof(uuid_t)){
        std::cout<<"error: failed to handle heartbeat message, message too short"<<std::endl;
        return ;
    }
    const uchar *data = static_cast<const uchar *>(msg->data());
    uuid_t u;
    memcpy(u, data + 2, sizeof(uuid_t));
    char us[37];
    uuid_unparse_lower(u, us);
    us[36] = '\0';
    const std::string id = us;
    std::map<std::string, boost::shared_ptr<GridInstanceContext> >::iterator iter =
        grid_ctx._all_instances.find(id);
    if(iter == grid_ctx._all_instances.end()){
        std::cout<<"error: failed to handle heartbeat message, instance ["<<id<<"] not found"<<std::endl;
        return ;
    }
    iter->second->_last_heartbeat_time = current_time;
    std::cout<<"info: updated heartbeat for ["<<id<<"]"<<std::endl;
    return ;
}
/**
 * @brief Pollin callback for the subscribe socket.
 *
 * Receives one message, checks its prefix byte and dispatches on its type
 * byte to handle_heartbeat() or handle_instance_exit(). The received message
 * is handed to the handler through msg_ctx._data as a zmq::message_t* that is
 * only valid for the duration of the call.
 *
 * @param msg_ctx socket that became readable; _data is overwritten
 */
void handle_subscrib_message(GridMessageContext &msg_ctx){
    std::cout<<"info: handle_subscrib_message()"<<std::endl;
    zmq::message_t msg;
    zmq::socket_t *sock = msg_ctx._sock;
    try{
        sock->recv(&msg);
    }catch(std::exception &e){
        std::cout<<"error: failed to recv from socket "<<e.what()<<std::endl;
        return ;
    }
    if(msg.size() < 2){
        std::cout<<"error: failed to recv from socket, message too short"<<std::endl;
        return ;
    }
    const uchar *data = static_cast<const uchar *>(msg.data());
    const uchar prefix = data[0];
    const uchar type = data[1];
    if(prefix != GRID_MESSAGE_PREFIX){
        std::cout<<"error: failed to recv from socket, message prefix not found"<<std::endl;
        return ;
    }
    switch(type){
        case MSG_TYPE_HEARTBEAT:
            msg_ctx._data = &msg;
            handle_heartbeat(msg_ctx);
            break;
        case MSG_TYPE_EXIT:
            msg_ctx._data = &msg;
            handle_instance_exit(msg_ctx);
            break;
        default:
            // Print the type as a number; streaming an unsigned char would emit a raw character.
            std::cout<<"error: failed to recv from socket, message type ["<<static_cast<int>(type)<<"] handler incompleted"<<std::endl;
            break;
    }
    return ;
}
int init(){
// 生成uuid
uuid_t u;
char us[37];
uuid_generate(u);
memset(us, 0, 37);
uuid_unparse_lower(u, us);
GridInstanceContext &instance_ctx = grid_ctx._ctx;
instance_ctx._id = us;
instance_ctx._ip = local_ip;
instance_ctx._router_port = "4150";
instance_ctx._publish_port = "4151";
// 创建sockets
try{
instance_ctx._router_sock =
boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_ROUTER));
instance_ctx._publish_sock =
boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_PUB));
instance_ctx._subscrib_sock =
boost::shared_ptr<zmq::socket_t>(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_SUB));
}catch(std::exception &e){
std::cout<<"error: failed to alloc socket"<<std::endl;
std::cout<<e.what()<<std::endl;
return -1;
}
uchar prefix = GRID_MESSAGE_PREFIX;
instance_ctx._subscrib_sock->setsockopt(ZMQ_SUBSCRIBE, &prefix, 1);
std::string publish_address = "tcp://",
router_address = "tcp://";
publish_address += instance_ctx._ip;
router_address += instance_ctx._ip;
publish_address += ":";
router_address += ":";
publish_address += instance_ctx._publish_port;
router_address += instance_ctx._router_port;
// 绑定router和publish句柄
try{
instance_ctx._router_sock->bind(router_address.c_str());
}catch(std::exception &e){
std::cout<<"error: failed to bind router socket"<<std::endl;
std::cout<<e.what()<<std::endl;
return -1;
}
std::cout<<"info: bind router at "<<router_address<<std::endl;
try{
instance_ctx._publish_sock->bind(publish_address.c_str());
}catch(std::exception &e){
std::cout<<"error: failed to bing publish socket"<<std::endl;
std::cout<<e.what()<<std::endl;
return -1;
}
std::cout<<"info: bind publish at "<<publish_address<<std::endl;
// 添加router和subscrib到pollin句柄列表
boost::shared_ptr<PollinContext> router_pollin_ctx;
try{
router_pollin_ctx.reset(new PollinContext);
}catch(...){
return -1;
}
router_pollin_ctx->_sock = instance_ctx._router_sock.get();
router_pollin_ctx->_on_read = handle_handshake_request;
grid_ctx.add_pollin(router_pollin_ctx);
std::cout<<"info: router socket pollin added "<<router_pollin_ctx->_sock<<std::endl;
boost::shared_ptr<PollinContext> subscrib_pollin_ctx;
try{
subscrib_pollin_ctx.reset(new PollinContext);
}catch(...){
return -1;
}
subscrib_pollin_ctx->_sock = instance_ctx._subscrib_sock.get();
subscrib_pollin_ctx->_on_read = handle_subscrib_message;
grid_ctx.add_pollin(subscrib_pollin_ctx);
std::cout<<"info: subscrib socket pollin added "<<subscrib_pollin_ctx->_sock<<std::endl;
return 0;
}
/**
 * @brief Start a handshake with a remote instance.
 *
 * Creates the request socket (owned by grid_ctx._ctx._request_sock),
 * registers it for polling with handle_handshake_response as callback,
 * connects to remote_address and sends the handshake request.
 *
 * On success the heap-allocated HandshakeContext is handed to the response
 * handler, which frees it. On failure everything created here is released.
 *
 * @param remote_address ZeroMQ endpoint of the remote router socket
 * @return 0 if the request was sent, -1 otherwise
 */
int do_handshake(std::string remote_address){
    HandshakeContext *handshake_ctx = NULL;
    boost::shared_ptr<PollinContext> pollin_ctx;
    bool registered = false;
    bool failed = false;
    try{
        handshake_ctx = new HandshakeContext;
        handshake_ctx->_address = NULL;
        handshake_ctx->_address_len = 0;
        grid_ctx._ctx._request_sock.reset(new zmq::socket_t(grid_ctx._sock_ctx, ZMQ_REQ));
        int linger = 1;
        grid_ctx._ctx._request_sock->setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        handshake_ctx->_sock = grid_ctx._ctx._request_sock.get();

        // Poll the REQ socket for the reply.
        pollin_ctx.reset(new PollinContext);
        pollin_ctx->_sock = handshake_ctx->_sock;
        pollin_ctx->_data = handshake_ctx;
        pollin_ctx->_on_read = handle_handshake_response;
        grid_ctx.add_pollin(pollin_ctx);
        registered = true;

        handshake_ctx->_sock->connect(remote_address.c_str());
    }catch(std::exception &e){
        std::cout<<"error: failed to set up handshake with "<<remote_address<<" "<<e.what()<<std::endl;
        failed = true;
    }
    // Send the handshake message (it logs its own error on failure).
    if(!failed && send_handshake_init(*handshake_ctx)){
        failed = true;
    }
    if(failed){
        if(registered){
            grid_ctx.del_pollin(pollin_ctx);
            std::cout<<"info: pollin deleted "<<pollin_ctx->_sock<<std::endl;
        }
        // The socket belongs to the shared_ptr: release it there and never
        // delete it through the raw pointer in handshake_ctx.
        grid_ctx._ctx._request_sock.reset();
        DELIF(handshake_ctx);
        return -1;
    }
    std::cout<<"info: handshake message sent to "<<remote_address<<std::endl;
    return 0;
}
void broadcast_instance_exit(){
zmq::message_t msg(18);
uchar *data = (uchar *)msg.data();
data[0] = GRID_MESSAGE_PREFIX;
data[1] = MSG_TYPE_EXIT;
uuid_t u;
char us[37];
memcpy(us, grid_ctx._ctx._id.c_str(), 36);
us[36] = '\0';
uuid_parse(us, u);
memcpy(data+2, u, 16);
try{
grid_ctx._ctx._publish_sock->send(msg);
}catch(std::exception &e){
std::cout<<"error: failed to broadcast exit "<<e.what()<<std::endl;
}
std::cout<<"info: "<<MessageTypeString[MSG_TYPE_EXIT]<<" broadcasted"<<std::endl;
}
void broadcast_heartbeat(){
zmq::message_t msg(18);
uchar *data = (uchar *)msg.data();
data[0] = GRID_MESSAGE_PREFIX;
data[1] = MSG_TYPE_HEARTBEAT;
uuid_t u;
char us[37];
memcpy(us, grid_ctx._ctx._id.c_str(), 36);
us[36] = '\0';
uuid_parse(us, u);
memcpy(data+2, u, 16);
try{
grid_ctx._ctx._publish_sock->send(msg);
}catch(std::exception &e){
std::cout<<"error: failed to broadcast heartbeat "<<e.what()<<std::endl;
}
std::cout<<"info: heartbeat broadcasted"<<std::endl;
}
/**
 * @brief Entry point.
 *
 * Parses the command line, initialises the local instance, performs the
 * handshake with the remote instance when running as worker, then polls and
 * broadcasts a heartbeat roughly every 5 seconds until SIGINT is received.
 * An exit message is broadcast before the process terminates.
 *
 * @return 0 on normal exit, 1 on usage errors, -1 if init or handshake fails
 */
int main(int argc, char *argv[])
{
    // Parse the command line.
    namespace po = boost::program_options;
    po::options_description desc("Allowed options");
    desc.add_options()
        ("help,h", "produce help options")
        ("role", po::value<std::string>(), "set role, master or worker")
        ("local_ip", po::value<std::string>(), "local ip address to bind")
        ("remote_ip", po::value<std::string>(), "remote ip address")
        ("remote_router_port", po::value<std::string>(), "remote router port");
    po::variables_map vm;
    try{
        po::store(po::command_line_parser(argc, argv).options(desc).allow_unregistered().run(), vm);
        po::notify(vm);
    }catch(std::exception &e){
        // E.g. an option given without its value.
        std::cout<<"error: failed to parse command line: "<<e.what()<<std::endl;
        std::cout<<desc<<std::endl;
        return 1;
    }
    if(vm.count("help")){
        std::cout<<desc<<std::endl;
        return 1;
    }
    if(vm.count("role")){
        const std::string role = vm["role"].as<std::string>();
        if(role.compare("master") == 0){
            process_role = ROLE_MASTER;
            std::cout<<"info: role set to "<<role<<std::endl;
        }
        else if(role.compare("worker") == 0){
            process_role = ROLE_WORKER;
            std::cout<<"info: role set to "<<role<<std::endl;
        }
        else {
            // Refuse to run rather than silently falling back to the default role.
            std::cout<<"info: role ["<<role<<"] invalid"<<std::endl;
            std::cout<<desc<<std::endl;
            return 1;
        }
    }
    else {
        std::cout<<"Role need to be set."<<std::endl;
        std::cout<<desc<<std::endl;
        return 1;
    }
    if(vm.count("local_ip")){
        local_ip = vm["local_ip"].as<std::string>();
    }
    else {
        std::cout<<"Local ip need to be set."<<std::endl;
        std::cout<<desc<<std::endl;
        return 1;
    }
    if(vm.count("remote_ip")){
        remote_ip = vm["remote_ip"].as<std::string>();
    }
    else if(process_role == ROLE_WORKER){
        std::cout<<"Remote ip need to be set."<<std::endl;
        std::cout<<desc<<std::endl;
        return 1;
    }
    if(vm.count("remote_router_port")){
        remote_router_port_str = vm["remote_router_port"].as<std::string>();
    }
    else if(process_role == ROLE_WORKER){
        std::cout<<"Remote router port need to be set."<<std::endl;
        std::cout<<desc<<std::endl;
        return 1;
    }
    if(init()){
        return -1;
    }
    if(process_role == ROLE_WORKER){
        // Connect to the master's router endpoint and send the handshake message.
        std::string sock_router_address;
        make_tcp_address(remote_ip, remote_router_port_str, sock_router_address);
        if(do_handshake(sock_router_address)){
            std::cout<<"error: failed to do handshake with "<<sock_router_address<<std::endl;
            return -1;
        }
    }
    signal(SIGINT, int_handler);
    while(!exit_process){
        grid_ctx.poll();
        // current_time is refreshed by poll().
        if(current_time - grid_ctx._ctx._last_heartbeat_time > 5){
            broadcast_heartbeat();
            grid_ctx._ctx._last_heartbeat_time = current_time;
        }
        sleep(1);
    }
    broadcast_instance_exit();
    // Give the publish socket a moment to flush the exit message.
    sleep(1);
    std::cout<<"Process exit"<<std::endl;
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment