Created
October 10, 2014 07:37
-
-
Save lnicola/f5a8c9daf5f8e983fdd9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <errno.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>

#include <array>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <iterator>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>
using namespace std; | |
/// Abstract interface for a unit of work.
///
/// Concrete message types implement process(); instances are meant to be
/// owned and destroyed through a pointer to this base class, hence the
/// virtual destructor.
class message
{
public:
    virtual ~message() = default;

    /// Carries out whatever action the concrete message represents.
    virtual void process() = 0;
};
class notify_message : public message | |
{ | |
string text_; | |
public: | |
notify_message(const string &text) | |
: text_(text) | |
{ | |
} | |
void process() override | |
{ | |
cerr << text_; | |
} | |
}; | |
class worker_thread | |
{ | |
public: | |
vector<unique_ptr<message>> messages_; | |
vector<unique_ptr<message>> messages_back_buffer_; | |
mutex message_mutex_; | |
condition_variable message_cond_; | |
thread wt_; | |
~worker_thread() | |
{ | |
wt_.join(); | |
} | |
void start() | |
{ | |
wt_ = thread(&worker_thread::run, this); | |
} | |
void run() | |
{ | |
unique_lock<mutex> lock(message_mutex_); | |
while (true) { | |
message_cond_.wait(lock, [&]() { | |
return !messages_.empty(); | |
}); | |
swap(messages_, messages_back_buffer_); | |
lock.unlock(); | |
for (const auto &msg : messages_back_buffer_) { | |
try { | |
msg->process(); | |
} | |
catch (const exception &e) { | |
cerr << e.what() << '\n'; | |
} | |
catch (...) { | |
cerr << "An error has occurred\n"; | |
} | |
} | |
messages_back_buffer_.clear(); | |
lock.lock(); | |
} | |
} | |
void add_messages(vector<unique_ptr<message>> messages) | |
{ | |
if (!messages.empty()) { | |
(lock_guard<mutex>(message_mutex_), | |
messages_.insert(begin(messages_), | |
make_move_iterator(begin(messages)), | |
make_move_iterator(end(messages)))); | |
message_cond_.notify_one(); | |
} | |
} | |
}; | |
/// Creates a non-blocking AF_UNIX datagram socket bound to the path "foo".
///
/// Any existing filesystem entry at that path is removed first so that bind()
/// does not fail on a stale socket file. On any failure the error is reported
/// with perror() and the process exits with status -1.
///
/// @return the bound socket descriptor.
int make_socket()
{
    const int sock = socket(AF_UNIX, SOCK_DGRAM | SOCK_NONBLOCK, 0);
    if (sock < 0) {
        std::perror("socket");
        std::exit(-1);
    }

    const char *sock_path = "foo";
    if (std::remove(sock_path) < 0 && errno != ENOENT) {
        std::perror("remove");
        std::exit(-1);
    }

    // Value-initialise so every byte of the address is zero before the
    // fields are filled in. (A memset with its value and length arguments
    // transposed would clear nothing.)
    sockaddr_un server_addr{};
    server_addr.sun_family = AF_UNIX;
    // Copy at most size-1 bytes so the zeroed last byte always terminates
    // the path.
    std::strncpy(server_addr.sun_path, sock_path, sizeof server_addr.sun_path - 1);

    if (bind(sock, reinterpret_cast<sockaddr *>(&server_addr), sizeof server_addr) < 0) {
        std::perror("bind");
        std::exit(-1);
    }
    return sock;
}
int main() | |
{ | |
int sock = make_socket(); | |
array<pollfd, 1> fds = { sock, POLLIN, 0 }; | |
worker_thread worker; | |
worker.start(); | |
while (true) { | |
int ret = poll(begin(fds), fds.size(), -1); | |
if (ret < 0 && errno == EINTR) { | |
continue; | |
} | |
if (fds[0].revents & POLLIN) { | |
vector<unique_ptr<message>> messages; | |
while (true) { | |
char buf[1024] = { }; | |
ssize_t len = recvfrom(sock, buf, sizeof buf, 0, NULL, NULL); | |
if (len > 0) { | |
messages.emplace_back(new notify_message(buf)); | |
continue; | |
} | |
else if (len < 0) { | |
if (errno != EINTR && errno != EWOULDBLOCK) { | |
perror("recvfrom"); | |
} | |
} | |
break; | |
} | |
worker.add_messages(move(messages)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment