Created
May 24, 2013 16:49
-
-
Save deepcube/5644856 to your computer and use it in GitHub Desktop.
zeromq broker. Frontend client prepends messages with the ID of the final destination. Broker receives message through a ZMQ_ROUTER so message now has initial source ID prepended. Broker swaps first two frames and sends message out backend ZMQ_ROUTER, which routes message to final destination. Clients used as destinations must identify to broker…
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <czmq.h> | |
#include <stdlib.h> | |
#include <zmq.h> | |
#include "zerr.h" | |
/* | |
* read message, switch first two frames (from and to), send it back out | |
* arg is the socket to which the message should be sent. For the front_end arg | |
* is back_end, and vice versa. | |
* | |
* NOTE: If a zmsg_push() succeeds, the frame is owned by the message, and will | |
* be destroyed along with the message on zmsg_send(). | |
* If a zmsg_push() fails, we still own the frame and must destroy it | |
* I asked on the zmq mailing list about this, and will submit a patch | |
* so these calls nullify the reference. | |
* TODO: Log errors to syslog, log EHOSTUNREACH errors once per unreachable | |
* host until reconnect. | |
* Find out if any of these errors should be fatal. | |
*/ | |
int msg_handler(zloop_t *loop, zmq_pollitem_t *item, void *arg) | |
{ | |
zmsg_t *msg = NULL; | |
zframe_t *src = NULL; | |
zframe_t *dst = NULL; | |
void *out = arg; | |
zcheck_goto(cleanup, msg = zmsg_recv(item->socket), "zmsg_recv failed" ); | |
zcheck_goto(cleanup, 2 <= zmsg_size( msg) , "message too small (need 2 address frames)" ); | |
zcheck_goto(cleanup, src = zmsg_pop ( msg) , "zmsg_pop failed to pop source address" ); | |
zcheck_goto(cleanup, dst = zmsg_pop ( msg) , "zmsg_pop failed to pop destination address" ); | |
zcheck_goto(cleanup, 0 == zmsg_push( msg, src) , "zmsg_push failed to push source address" ); | |
src = NULL; | |
zcheck_goto(cleanup, 0 == zmsg_push( msg, dst) , "zmsg_push failed to push destination address"); | |
dst = NULL; | |
zcheck_goto(cleanup, 0 == zmsg_send(&msg, out) , "zmsg_send failed" ); | |
return (0); | |
cleanup: | |
if (src) { zframe_print(src, "source addr:"); zframe_destroy(&src); } | |
if (dst) { zframe_print(dst, "dest addr:"); zframe_destroy(&dst); } | |
if (msg) { zmsg_dump(msg) ; zmsg_destroy (&msg); } | |
return (0); // returning -1 causes the zloop to exit. we want to complain, but continue running | |
} | |
int main(int argc, char **argv) | |
{ | |
zctx_t *ctx = NULL; | |
void *front_end = NULL; | |
void *back_end = NULL; | |
zloop_t *loop = NULL; | |
zmq_pollitem_t front_poll = {}; | |
zmq_pollitem_t back_poll = {}; | |
int status = EXIT_FAILURE; | |
zcheck_err(EXIT_FAILURE, argc == 4, "Usage: %s <protocol://front_address> <protocol://back_address> <broker_id>", argv[0]); | |
zcheck_goto(cleanup, ctx = zctx_new() , "zctx_new failed to create zeromq context" ); | |
zcheck_goto(cleanup, front_end = zsocket_new(ctx, ZMQ_ROUTER), "zsocket_new failed to create front_end socket"); | |
zcheck_goto(cleanup, back_end = zsocket_new(ctx, ZMQ_ROUTER), "zsocket_new failed to create back_end socket" ); | |
zcheck_goto(cleanup, loop = zloop_new() , "zloop_new failed" ); | |
zsocket_set_router_mandatory(front_end, 1); | |
zsocket_set_router_mandatory(back_end , 1); | |
zsocket_set_identity(front_end, argv[3]); | |
zsocket_set_identity(back_end , argv[3]); | |
front_poll = (zmq_pollitem_t){ .socket = front_end, .fd = 0, .events = ZMQ_POLLIN, .revents = 0 }; | |
back_poll = (zmq_pollitem_t){ .socket = back_end , .fd = 0, .events = ZMQ_POLLIN, .revents = 0 }; | |
//TODO: use zeromq's perf tools to test IPC vs TCP for back_end | |
zcheck_goto(cleanup, 0 <= zsocket_bind(front_end, argv[1]) , "zsocket_bind failed to bind front_end socket" ); | |
zcheck_goto(cleanup, 0 <= zsocket_bind(back_end , argv[2]) , "zsocket_bind failed to bind back_end socket" ); | |
zcheck_goto(cleanup, 0 == zloop_poller(loop, &front_poll, msg_handler, back_end ), "zloop_poller failed to register front_end handler"); | |
zcheck_goto(cleanup, 0 == zloop_poller(loop, &back_poll , msg_handler, front_end), "zloop_poller failed to register back_end handler" ); | |
status = (zloop_start(loop) ? EXIT_FAILURE : EXIT_SUCCESS); | |
cleanup: | |
if (loop) zloop_destroy(&loop); | |
if (ctx ) zctx_destroy(&ctx); | |
return (status); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment