Last active
August 29, 2015 14:15
-
-
Save oktal/cfef32adff358e8f5d3b to your computer and use it in GitHub Desktop.
Multipart messaging protocol with nanomsg
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <nanomsg/nn.h> | |
#include <nanomsg/reqrep.h> | |
#include <string> | |
#include <cstring> | |
#include <cstdio> | |
#include <iostream> | |
#include <vector> | |
#include <memory> | |
#include <limits> | |
namespace Nano { | |
const char* encodeMesage(const char* s) { | |
return s; | |
} | |
std::string encodeMessage(const std::string& str) { | |
return str; | |
} | |
template<typename Int> | |
typename std::enable_if<std::is_integral<Int>::value, std::string>::type | |
encodeMessage(Int val) { | |
return std::to_string(val); | |
} | |
template<typename Float> | |
typename std::enable_if<std::is_floating_point<Float>::value, std::string>::type | |
encodeMessage(Float val) { | |
return std::to_string(val); | |
} | |
namespace Multipart { | |
typedef uint16_t SizeType; | |
static constexpr size_t MaxParts = std::numeric_limits<SizeType>::max(); | |
namespace details { | |
struct Packer { | |
std::vector<std::string> payload; | |
template<typename Head, typename... Tail> void pack(Head&& head, Tail&& ...rest) | |
{ | |
payload.push_back(Nano::encodeMessage(std::forward<Head>(head))); | |
pack(rest...); | |
} | |
void pack() | |
{ | |
} | |
}; | |
} | |
std::pair<char *, size_t> pack(const std::vector<std::string>& message) | |
{ | |
const auto partsCount = message.size(); | |
if (partsCount > MaxParts) | |
throw std::invalid_argument("The message contains more parts than allowed"); | |
size_t totalLen = sizeof(SizeType); /* The number of parts */ | |
for (const auto& msg: message) { | |
if (msg.size() > MaxParts) | |
throw std::invalid_argument("The message is too big"); | |
totalLen += (msg.size() + sizeof(SizeType)); /* Message size + bytes */ | |
} | |
auto encodeSize = [](SizeType size, char *&buf) { | |
*buf++ = (size >> 8) & 0xFF; | |
*buf++ = size & 0xFF; | |
}; | |
std::unique_ptr<char[]> buf(new char[totalLen]); | |
char* ptr = buf.get(); | |
encodeSize(partsCount, ptr); | |
for (const auto& msg: message) { | |
encodeSize(msg.size(), ptr); | |
std::memcpy(ptr, msg.c_str(), msg.size()); | |
ptr += msg.size(); | |
} | |
return std::make_pair(buf.release(), totalLen); | |
} | |
std::vector<std::string> unpack(const char* buf) | |
{ | |
auto decodeSize = [](const char* buf) -> SizeType { | |
return buf[0] << 8 | buf[1]; | |
}; | |
const SizeType parts = decodeSize(buf); | |
buf += 2; | |
std::vector<std::string> messages; | |
messages.reserve(parts); | |
for (SizeType i = 0; i < parts; ++i) { | |
const SizeType len = decodeSize(buf); | |
buf += 2; | |
messages.push_back(std::string(buf, buf + len)); | |
buf += len; | |
} | |
return messages; | |
} | |
int send(int sock, const std::vector<std::string>& message) { | |
char* msg = 0; | |
size_t len = 0; | |
std::tie(msg, len) = pack(message); | |
int nbytes = nn_send(sock, msg, len, 0); | |
return nbytes; | |
} | |
int recv(int sock, std::vector<std::string>& message) { | |
void* buf = NULL; | |
int nbytes = nn_recv(sock, &buf, NN_MSG, 0); | |
if (nbytes > 0) { | |
message = unpack(static_cast<const char*>(buf)); | |
} | |
return nbytes; | |
} | |
template<typename... Args> | |
int send(int sock, Args&& ...args) { | |
details::Packer packer; | |
packer.pack(args...); | |
char* msg = 0; | |
size_t len = 0; | |
std::tie(msg, len) = pack(packer.payload); | |
int nbytes = nn_send(sock, msg, len, 0); | |
return nbytes; | |
} | |
} // namespace Multipart | |
} // namespace Nano | |
void test_multipart(const std::string& url) | |
{ | |
int sock1 = nn_socket(AF_SP, NN_REQ); | |
if (sock1 == -1) | |
perror("nn_socket"); | |
if (nn_bind(sock1, url.c_str()) == -1) | |
perror("nn_bind"); | |
int sock2 = nn_socket(AF_SP, NN_REP); | |
if (sock2 == -1) | |
perror("nn_socket"); | |
if (nn_connect(sock2, url.c_str()) == -1) | |
perror("nn_connect"); | |
int nbytes = Nano::Multipart::send(sock1, "Hello", std::string("World"), 21); | |
if (nbytes == -1) | |
perror("send_multipart"); | |
else | |
std::cout << "Sent " << nbytes << " bytes" << std::endl; | |
std::vector<std::string> message; | |
nbytes = Nano::Multipart::recv(sock2, message); | |
if (nbytes == -1) | |
perror("recv_multipart"); | |
else { | |
std::cout << "Received " << nbytes << " bytes" << std::endl; | |
for (const auto& msg: message) std::cout << msg << std::endl; | |
} | |
} | |
int main() | |
{ | |
test_multipart("tcp://127.0.0.1:9090"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment