Created
August 18, 2017 08:44
-
-
Save taeguk/0c0a44fd62713108417be7eb5400a97b to your computer and use it in GitHub Desktop.
scanpartitioner_with_removing_finalitems.cpp
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright (c) 2007-2017 Hartmut Kaiser | |
// Copyright (c) 2015 Daniel Bourgeois | |
// Copyright (c) 2017 Taeguk Kwon | |
// | |
// Distributed under the Boost Software License, Version 1.0. (See accompanying | |
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
#if !defined(HPX_PARALLEL_UTIL_SCAN_PARTITIONER_DEC_30_2014_0227PM) | |
#define HPX_PARALLEL_UTIL_SCAN_PARTITIONER_DEC_30_2014_0227PM | |
#include <hpx/config.hpp> | |
#include <hpx/dataflow.hpp> | |
#include <hpx/exception_list.hpp> | |
#include <hpx/lcos/wait_all.hpp> | |
#include <hpx/runtime/launch_policy.hpp> | |
#include <hpx/util/bind.hpp> | |
#include <hpx/util/decay.hpp> | |
#include <hpx/util/unused.hpp> | |
#include <hpx/parallel/execution_policy.hpp> | |
#include <hpx/parallel/executors/executor_parameter_traits.hpp> | |
#include <hpx/parallel/executors/execution.hpp> | |
#include <hpx/parallel/traits/extract_partitioner.hpp> | |
#include <hpx/parallel/util/detail/algorithm_result.hpp> | |
#include <hpx/parallel/util/detail/chunk_size.hpp> | |
#include <hpx/parallel/util/detail/handle_local_exceptions.hpp> | |
#include <hpx/parallel/util/detail/scoped_executor_parameters.hpp> | |
#include <algorithm> | |
#include <cstddef> | |
#include <exception> | |
#include <list> | |
#include <memory> | |
#include <utility> | |
#include <vector> | |
/////////////////////////////////////////////////////////////////////////////// | |
namespace hpx { namespace parallel { namespace util | |
{ | |
/// Tag selecting the scan partitioner variant that attaches f3 to every
/// chunk as a dataflow continuation and passes both the intermediate
/// (Result1) futures and the final (Result2) futures to f4.
struct scan_partitioner_normal_tag
{
};

/// Tag selecting the scan partitioner variant that invokes f3 for the
/// chunks one after another on the calling thread (after waiting for the
/// neighbouring intermediate results) and passes only the intermediate
/// (Result1) futures to f4.
struct scan_partitioner_sequential_f3_tag
{
};
/////////////////////////////////////////////////////////////////////////// | |
namespace detail | |
{ | |
///////////////////////////////////////////////////////////////////////
// Asynchronous core of the static scan partitioner. The input range is
// split into chunks by get_bulk_iteration_shape (chunk size presumably
// derived from the number of cores in use - see that helper), and the
// first scan step of every chunk is launched on the policy's executor.
// Only the specializations for the ScanPartTag values defined in this
// header exist; the primary template is intentionally left undefined.
template <typename R, typename Result1, typename Result2,
    typename ScanPartTag>
struct static_scan_partitioner_async;
template <typename R, typename Result1, typename Result2> | |
struct static_scan_partitioner_async<R, Result1, Result2, | |
scan_partitioner_normal_tag> | |
{ | |
template <typename ExPolicy, typename FwdIter, typename T, | |
typename F1, typename F2, typename F3, typename F4> | |
static hpx::future<R> call(ExPolicy && policy, FwdIter first, | |
std::size_t count, T && init, F1 && f1, F2 && f2, F3 && f3, | |
F4 && f4) | |
{ | |
typedef typename | |
hpx::util::decay<ExPolicy>::type::executor_parameters_type | |
parameters_type; | |
typedef executor_parameter_traits<parameters_type> | |
parameters_traits; | |
typedef scoped_executor_parameters<parameters_type> | |
scoped_executor_parameters; | |
// inform parameter traits | |
std::shared_ptr<scoped_executor_parameters> | |
scoped_param(std::make_shared< | |
scoped_executor_parameters | |
>(policy.parameters())); | |
std::vector<hpx::shared_future<Result1> > workitems; | |
std::vector<hpx::future<Result2> > finalitems; | |
std::list<std::exception_ptr> errors; | |
try { | |
// pre-initialize first intermediate result | |
workitems.push_back(make_ready_future(std::forward<T>(init))); | |
HPX_ASSERT(count > 0); | |
FwdIter first_ = first; | |
std::size_t count_ = count; | |
// estimate a chunk size based on number of cores used | |
typedef typename parameters_traits::has_variable_chunk_size | |
has_variable_chunk_size; | |
auto shape = get_bulk_iteration_shape(policy, workitems, | |
f1, first, count, 1, has_variable_chunk_size()); | |
// schedule every chunk on a separate thread | |
std::size_t size = hpx::util::size(shape); | |
workitems.reserve(size + 1); | |
finalitems.reserve(size); | |
// If the size of count was enough to warrant testing for a | |
// chunk, pre-initialize second intermediate result and | |
// start f3. | |
if (workitems.size() == 2) | |
{ | |
HPX_ASSERT(count_ > count); | |
hpx::shared_future<Result1> curr = workitems[1]; | |
finalitems.push_back(dataflow(hpx::launch::sync, | |
f3, first_, count_ - count, workitems[0], curr)); | |
workitems[1] = dataflow(hpx::launch::sync, | |
f2, workitems[0], curr); | |
} | |
// Schedule first step of scan algorithm, step 2 is | |
// performed as soon as the current partition and the | |
// partition to the left is ready. | |
for(auto const& elem: shape) | |
{ | |
FwdIter it = hpx::util::get<0>(elem); | |
std::size_t size = hpx::util::get<1>(elem); | |
hpx::shared_future<Result1> prev = workitems.back(); | |
auto curr = execution::async_execute( | |
policy.executor(), f1, it, size).share(); | |
finalitems.push_back(dataflow(hpx::launch::sync, | |
f3, it, size, prev, curr)); | |
workitems.push_back(dataflow(hpx::launch::sync, | |
f2, prev, curr)); | |
} | |
} | |
catch (std::bad_alloc const&) { | |
return hpx::make_exceptional_future<R>( | |
std::current_exception()); | |
} | |
catch (...) { | |
errors.push_back(std::current_exception()); | |
} | |
// wait for all tasks to finish | |
return dataflow( | |
[errors, f4, scoped_param]( | |
std::vector<hpx::shared_future<Result1> >&& witems, | |
std::vector<hpx::future<Result2> >&& fitems | |
) mutable -> R | |
{ | |
HPX_UNUSED(scoped_param); | |
handle_local_exceptions<ExPolicy>::call(witems, errors); | |
handle_local_exceptions<ExPolicy>::call(fitems, errors); | |
return f4(std::move(witems), std::move(fitems)); | |
}, | |
std::move(workitems), std::move(finalitems)); | |
} | |
}; | |
template <typename R, typename Result1, typename Result2> | |
struct static_scan_partitioner_async<R, Result1, Result2, | |
scan_partitioner_sequential_f3_tag> | |
{ | |
template <typename ExPolicy, typename FwdIter, typename T, | |
typename F1, typename F2, typename F3, typename F4> | |
static hpx::future<R> call(ExPolicy && policy, FwdIter first, | |
std::size_t count, T && init, F1 && f1, F2 && f2, F3 && f3, | |
F4 && f4) | |
{ | |
typedef typename | |
hpx::util::decay<ExPolicy>::type::executor_parameters_type | |
parameters_type; | |
typedef executor_parameter_traits<parameters_type> | |
parameters_traits; | |
typedef scoped_executor_parameters<parameters_type> | |
scoped_executor_parameters; | |
// inform parameter traits | |
std::shared_ptr<scoped_executor_parameters> | |
scoped_param(std::make_shared< | |
scoped_executor_parameters | |
>(policy.parameters())); | |
std::vector<hpx::shared_future<Result1> > workitems; | |
std::list<std::exception_ptr> errors; | |
try { | |
// pre-initialize first intermediate result | |
workitems.push_back(make_ready_future(std::forward<T>(init))); | |
HPX_ASSERT(count > 0); | |
FwdIter first_ = first; | |
std::size_t count_ = count; | |
bool tested = false; | |
// estimate a chunk size based on number of cores used | |
typedef typename parameters_traits::has_variable_chunk_size | |
has_variable_chunk_size; | |
auto shape = get_bulk_iteration_shape(policy, workitems, | |
f1, first, count, 1, has_variable_chunk_size()); | |
// schedule every chunk on a separate thread | |
std::size_t size = hpx::util::size(shape); | |
workitems.reserve(size + 1); | |
finalitems.reserve(size); | |
// If the size of count was enough to warrant testing for a | |
// chunk, pre-initialize second intermediate result. | |
if (workitems.size() == 2) | |
{ | |
tested = true; | |
hpx::shared_future<Result1> curr = workitems[1]; | |
workitems[1] = dataflow(hpx::launch::sync, | |
f2, workitems[0], curr); | |
} | |
// Schedule first step of scan algorithm, step 2 is | |
// performed as soon as the current partition and the | |
// partition to the left is ready. | |
for (auto const& elem : shape) | |
{ | |
FwdIter it = hpx::util::get<0>(elem); | |
std::size_t size = hpx::util::get<1>(elem); | |
hpx::shared_future<Result1> prev = workitems.back(); | |
auto curr = execution::async_execute( | |
policy.executor(), f1, it, size).share(); | |
workitems.push_back(dataflow(hpx::launch::sync, | |
f2, prev, curr)); | |
} | |
std::size_t idx = 0ul; | |
if (tested) | |
{ | |
HPX_ASSERT(count_ > count); | |
hpx::wait_all(workitems[0], workitems[1]); | |
hpx::util::invoke(f3, first_, count_ - count, | |
workitems[0], workitems[1]); | |
++idx; | |
} | |
// Performs step 3 sequentially. | |
for (auto const& elem : shape) | |
{ | |
FwdIter it = hpx::util::get<0>(elem); | |
std::size_t size = hpx::util::get<1>(elem); | |
hpx::wait_all(workitems[idx], workitems[idx + 1]); | |
// push ready future to finaliterms? | |
hpx::util::invoke(f3, it, size, | |
workitems[idx], workitems[idx + 1]); | |
++idx; | |
} | |
} | |
catch (std::bad_alloc const&) { | |
return hpx::make_exceptional_future<R>( | |
std::current_exception()); | |
} | |
catch (...) { | |
errors.push_back(std::current_exception()); | |
} | |
// wait for all tasks to finish | |
return dataflow( | |
[errors, f4, scoped_param]( | |
std::vector<hpx::shared_future<Result1> >&& witems | |
) mutable -> R | |
{ | |
HPX_UNUSED(scoped_param); | |
handle_local_exceptions<ExPolicy>::call(witems, errors); | |
return f4(std::move(witems)); | |
}, | |
std::move(workitems)); | |
} | |
}; | |
template <typename ExPolicy_, typename R, typename Result1, | |
typename Result2, typename ScanPartTag> | |
struct static_scan_partitioner | |
{ | |
template <typename ExPolicy, typename FwdIter, typename T, | |
typename F1, typename F2, typename F3, typename F4> | |
static R call(ExPolicy && policy, FwdIter first, | |
std::size_t count, T && init, F1 && f1, F2 && f2, F3 && f3, | |
F4 && f4) | |
{ | |
return static_scan_partitioner_async< | |
R, Result1, Result2, ScanPartTag | |
>::call( | |
std::forward<ExPolicy>(policy), | |
first, count, std::forward<T>(init), | |
std::forward<F1>(f1), std::forward<F2>(f2), | |
std::forward<F3>(f3), std::forward<F4>(f4)).get(); | |
} | |
}; | |
template <typename R, typename Result1, typename Result2, | |
typename ScanPartTag> | |
struct static_scan_partitioner< | |
execution::parallel_task_policy, R, Result1, Result2, ScanPartTag> | |
{ | |
template <typename ExPolicy, typename FwdIter, typename T, | |
typename F1, typename F2, typename F3, typename F4> | |
static hpx::future<R> call(ExPolicy && policy, | |
FwdIter first, std::size_t count, T && init, F1 && f1, | |
F2 && f2, F3 && f3, F4 && f4) | |
{ | |
return static_scan_partitioner_async< | |
R, Result1, Result2, ScanPartTag | |
>::call( | |
std::forward<ExPolicy>(policy), | |
first, count, std::forward<T>(init), | |
std::forward<F1>(f1), std::forward<F2>(f2), | |
std::forward<F3>(f3), std::forward<F4>(f4)); | |
} | |
}; | |
template <typename Executor, typename Parameters, typename R, | |
typename Result1, typename Result2, typename ScanPartTag> | |
struct static_scan_partitioner< | |
execution::parallel_task_policy_shim<Executor, Parameters>, | |
R, Result1, Result2, ScanPartTag> | |
: static_scan_partitioner<execution::parallel_task_policy, R, | |
Result1, Result2, ScanPartTag> | |
{}; | |
///////////////////////////////////////////////////////////////////////
// Dispatch point of the scan partitioner. The primary template is only
// declared; behaviour comes from the partial specializations below.
//
// Template parameters:
//   ExPolicy    - execution policy type (the public wrapper passes the
//                 decayed policy type)
//   R           - overall result type
//   Result1     - intermediate result type of the first and second step
//   Result2     - intermediate result type of the third step
//   ScanPartTag - scan partitioner variant (scan_partitioner_normal_tag
//                 or scan_partitioner_sequential_f3_tag)
//   PartTag     - partitioner kind (static / default / auto tag)
template <typename ExPolicy, typename R, typename Result1,
    typename Result2, typename ScanPartTag, typename PartTag>
struct scan_partitioner;
/////////////////////////////////////////////////////////////////////// | |
template <typename ExPolicy_, typename R, typename Result1, | |
typename Result2, typename ScanPartTag> | |
struct scan_partitioner<ExPolicy_, R, Result1, Result2, ScanPartTag, | |
parallel::traits::static_partitioner_tag> | |
{ | |
template <typename ExPolicy, typename FwdIter, typename T, | |
typename F1, typename F2, typename F3, typename F4> | |
static R call(ExPolicy && policy, FwdIter first, | |
std::size_t count, T && init, F1 && f1, F2 && f2, F3 && f3, | |
F4 && f4) | |
{ | |
return static_scan_partitioner< | |
typename hpx::util::decay<ExPolicy>::type, | |
R, Result1, Result2, ScanPartTag | |
>::call( | |
std::forward<ExPolicy>(policy), | |
first, count, std::forward<T>(init), | |
std::forward<F1>(f1), std::forward<F2>(f2), | |
std::forward<F3>(f3), std::forward<F4>(f4)); | |
} | |
}; | |
template <typename R, typename Result1, typename Result2, | |
typename ScanPartTag> | |
struct scan_partitioner<execution::parallel_task_policy, R, Result1, | |
Result2, ScanPartTag, parallel::traits::static_partitioner_tag> | |
{ | |
template <typename ExPolicy, typename FwdIter, typename T, | |
typename F1, typename F2, typename F3, typename F4> | |
static hpx::future<R> call(ExPolicy && policy, FwdIter first, | |
std::size_t count, T && init, F1 && f1, F2 && f2, F3 && f3, | |
F4 && f4) | |
{ | |
return static_scan_partitioner< | |
typename hpx::util::decay<ExPolicy>::type, | |
R, Result1, Result2, ScanPartTag | |
>::call( | |
std::forward<ExPolicy>(policy), | |
first, count, std::forward<T>(init), | |
std::forward<F1>(f1), std::forward<F2>(f2), | |
std::forward<F3>(f3), std::forward<F4>(f4)); | |
} | |
}; | |
template <typename Executor, typename Parameters, typename R, | |
typename Result1, typename Result2, typename ScanPartTag> | |
struct scan_partitioner< | |
execution::parallel_task_policy_shim<Executor, Parameters>, | |
R, Result1, Result2, ScanPartTag, | |
parallel::traits::static_partitioner_tag> | |
: scan_partitioner<execution::parallel_task_policy, R, Result1, | |
Result2, ScanPartTag, | |
parallel::traits::static_partitioner_tag> | |
{}; | |
template <typename Executor, typename Parameters, typename R, | |
typename Result1, typename Result2, typename ScanPartTag> | |
struct scan_partitioner< | |
execution::parallel_task_policy_shim<Executor, Parameters>, | |
R, Result1, Result2, ScanPartTag, | |
parallel::traits::auto_partitioner_tag> | |
: scan_partitioner<execution::parallel_task_policy, R, Result1, | |
Result2, ScanPartTag, | |
parallel::traits::auto_partitioner_tag> | |
{}; | |
template <typename Executor, typename Parameters, typename R, | |
typename Result1, typename Result2, typename ScanPartTag> | |
struct scan_partitioner< | |
execution::parallel_task_policy_shim<Executor, Parameters>, | |
R, Result1, Result2, ScanPartTag, | |
parallel::traits::default_partitioner_tag> | |
: scan_partitioner<execution::parallel_task_policy, R, Result1, | |
Result2, ScanPartTag, | |
parallel::traits::static_partitioner_tag> | |
{}; | |
/////////////////////////////////////////////////////////////////////// | |
template <typename ExPolicy, typename R, typename Result1, | |
typename Result2, typename ScanPartTag> | |
struct scan_partitioner<ExPolicy, R, Result1, | |
Result2, ScanPartTag, parallel::traits::default_partitioner_tag> | |
: scan_partitioner<ExPolicy, R, Result1, | |
Result2, ScanPartTag, parallel::traits::static_partitioner_tag> | |
{}; | |
} | |
/////////////////////////////////////////////////////////////////////////// | |
template <typename ExPolicy, typename R = void, typename Result1 = R, | |
typename Result2 = void, | |
typename ScanPartTag = scan_partitioner_normal_tag, | |
typename PartTag = typename parallel::traits::extract_partitioner< | |
typename hpx::util::decay<ExPolicy>::type | |
>::type> | |
struct scan_partitioner | |
: detail::scan_partitioner< | |
typename hpx::util::decay<ExPolicy>::type, R, Result1, | |
Result2, ScanPartTag, PartTag> | |
{}; | |
}}} | |
#endif |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment