Created
December 12, 2015 08:39
-
-
Save amedama41/c11c59e9493a99cda651 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <exception>
#include <iostream>
#include <random>
#include <stdexcept>
#include <thread>
#include <vector>

#include <boost/asio/io_service.hpp>
#include <boost/asio/strand.hpp>
#include <boost/program_options.hpp>
#include <boost/thread/barrier.hpp>
#include <boost/timer/timer.hpp>

namespace asio = boost::asio;
/// Unit of simulated CPU work that can be posted to an executor.
///
/// Each invocation spins through an empty loop whose iteration count is drawn
/// uniformly from the closed range [nloop_max * 0.9 (truncated), nloop_max].
struct work
{
    /// Upper bound of the per-invocation loop count.
    static std::size_t nloop_max;

    /// Per-thread random engine. It is default-constructed, so every thread
    /// starts from the same default seed.
    static thread_local std::mt19937 rnd;

    /// Draws a loop count and burns that many iterations.
    void operator()()
    {
        // Truncate the 90% lower bound explicitly rather than relying on an
        // implicit double -> std::size_t narrowing conversion.
        auto const lower = static_cast<std::size_t>(nloop_max * 0.9);

        // `volatile` makes every comparison re-read the bound, which keeps
        // the otherwise empty loop from being optimized away.
        auto const volatile count
            = std::uniform_int_distribution<std::size_t>(lower, nloop_max)(rnd);
        for (auto i = std::size_t{}; i < count; ++i) {}
    }
};

thread_local std::mt19937 work::rnd{};
std::size_t work::nloop_max{};
auto create_asio_work(std::vector<asio::io_service>& io_service) | |
-> std::vector<asio::io_service::work> | |
{ | |
auto works = std::vector<asio::io_service::work>{}; | |
works.reserve(io_service.size()); | |
for (auto&& ios : io_service) { | |
works.emplace_back(ios); | |
} | |
return works; | |
} | |
auto create_consumer_threads( | |
std::size_t const nconsumer | |
, std::vector<asio::io_service>& io_service | |
, boost::barrier& start_barrier, boost::barrier& consumer_stop_barrier) | |
-> std::vector<std::thread> | |
{ | |
std::cout << "setup " << nconsumer << " consumer thread(s)..." << std::endl; | |
auto consumers = std::vector<std::thread>{}; | |
consumers.reserve(nconsumer); | |
for (auto i = std::size_t{}; i < nconsumer; ++i) { | |
consumers.emplace_back([&, i]{ | |
start_barrier.wait(); | |
try { | |
io_service[i % io_service.size()].run(); | |
} | |
catch (std::exception const& e) { | |
std::cerr << "consumer " << i << ':' << e.what() << std::endl; | |
} | |
consumer_stop_barrier.wait(); | |
}); | |
} | |
return consumers; | |
} | |
template <class Executor> | |
auto create_producer_threads( | |
std::size_t const nproducer, std::size_t const nwork | |
, std::vector<Executor>& executor | |
, boost::barrier& start_barrier, boost::barrier& producer_stop_barrier) | |
-> std::vector<std::thread> | |
{ | |
auto const nwork_per_thread = (nproducer > 0) ? nwork / nproducer : 0; | |
std::cout | |
<< "setup " << nproducer << " producer thread(s)" | |
<< " with " << nwork_per_thread << " work(s) per thread..." | |
<< std::endl; | |
auto producers = std::vector<std::thread>{}; | |
producers.reserve(nproducer); | |
for (auto i = std::size_t{}; i < nproducer; ++i) { | |
producers.emplace_back([&, i, nwork_per_thread]{ | |
start_barrier.wait(); | |
try { | |
for (auto j = std::size_t{}; j < nwork_per_thread; ++j) { | |
executor[i % executor.size()].post(work{}); | |
} | |
} | |
catch (std::exception const& e) { | |
std::cerr << "producer " << i << ':' << e.what() << std::endl; | |
} | |
producer_stop_barrier.wait(); | |
}); | |
} | |
return producers; | |
} | |
template <class Executor> | |
void run(std::vector<asio::io_service>& io_service | |
, std::size_t const nconsumer | |
, std::vector<Executor>& executor | |
, std::size_t const nproducer, std::size_t const nwork) | |
{ | |
auto asio_works = create_asio_work(io_service); | |
boost::barrier start_barrier(nconsumer + nproducer + 1); | |
boost::barrier consumer_stop_barrier(nconsumer + 1); | |
boost::barrier producer_stop_barrier(nproducer + 1); | |
auto consumers = create_consumer_threads( | |
nconsumer, io_service, start_barrier, consumer_stop_barrier); | |
auto producers = create_producer_threads( | |
nproducer, nwork, executor, start_barrier, producer_stop_barrier); | |
{ | |
std::cout << "running..." << std::endl; | |
start_barrier.wait(); | |
boost::timer::auto_cpu_timer timer{}; | |
producer_stop_barrier.wait(); | |
asio_works.clear(); | |
consumer_stop_barrier.wait(); | |
} | |
for (auto&& t : producers) { | |
t.join(); | |
} | |
for (auto&& t : consumers) { | |
t.join(); | |
} | |
} | |
template <class Executor> | |
void set_prework(std::vector<Executor>& executor, std::size_t const nwork) | |
{ | |
std::cout << "setup " << nwork << " prework(s)..." << std::endl; | |
for (auto i = std::size_t{}; i < nwork; ++i) { | |
executor[i % executor.size()].post(work{}); | |
} | |
} | |
int main(int argc, char const* argv[]) | |
{ | |
try { | |
namespace popts = boost::program_options; | |
popts::options_description desc{"perform io_service on consumer/producer model"}; | |
desc.add_options() | |
("help,h", "display help message") | |
("nio_service,i", popts::value<std::size_t>()->default_value(1), "the number of io_services") | |
("nconsumer,c", popts::value<std::size_t>()->default_value(1), "the number of consumers") | |
("nproducer,p", popts::value<std::size_t>()->default_value(0), "the number of producers") | |
("nwork,w", popts::value<std::size_t>()->default_value(0), "the number of works that producers post to io_services") | |
("nprework,r", popts::value<std::size_t>()->default_value(0), "the number of works which are posted to io_services in advance") | |
("nloop,l", popts::value<std::size_t>(&work::nloop_max)->default_value(1000), "max loop count per work") | |
("strand,s", "use strand as executor") | |
; | |
auto vm = popts::variables_map{}; | |
popts::store(popts::parse_command_line(argc, argv, desc), vm); | |
popts::notify(vm); | |
if (vm.count("help")) { | |
std::cout << desc << std::endl; | |
return 0; | |
} | |
auto const nio_service = vm["nio_service"].as<std::size_t>(); | |
auto const nconsumer | |
= std::max(vm["nconsumer"].as<std::size_t>(), nio_service); | |
auto const nproducer = vm["nproducer"].as<std::size_t>(); | |
auto const nwork = vm["nwork"].as<std::size_t>(); | |
auto const nprework = vm["nprework"].as<std::size_t>(); | |
if (!vm.count("strand")) { | |
std::cout << "use " << nio_service << " io_service(s)" << std::endl; | |
auto io_service = std::vector<asio::io_service>(nio_service); | |
set_prework(io_service, nprework); | |
run(io_service, nconsumer, io_service, nproducer, nwork); | |
} | |
else { | |
std::cout << "use " << nio_service << " strand(s)" << std::endl; | |
auto io_service = std::vector<asio::io_service>(1); | |
auto strand = std::vector<asio::io_service::strand>{}; | |
strand.reserve(nio_service); | |
for (auto i = std::size_t{}; i < nio_service; ++i) { | |
strand.emplace_back(io_service[0]); | |
} | |
set_prework(strand, nprework); | |
run(io_service, nconsumer, strand, nproducer, nwork); | |
} | |
} | |
catch (std::exception const& e) { | |
std::cerr << e.what() << std::endl; | |
} | |
return 0; | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment