Last active
May 8, 2019 14:44
-
-
Save hairyhum/523e07932d1f82004138fa7bb1c76290 to your computer and use it in GitHub Desktop.
Simple perf test for rabbitmq schema storage.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(schema_test). | |
-export([declare_queues/3, add_binding/3]). | |
-include_lib("amqp_client/include/amqp_client.hrl"). | |
% -include_lib("rabbit_common/include/rabbit.hrl"). | |
-define(QNAME, <<"foo">>). | |
-define(XNAME, <<"foo">>). | |
%% This module contains functions, which create RabbitMQ entities in parallel | |
%% processes. Can be used to measure schema database performance. | |
%% This function declares queues and binds them to exchange "foo" | |
%% PerProc - number of queues, sequentially created by each process | |
%% NProcs - number of parallel processes | |
%% Tag - a suffix to add to queue names. If run twice with the same tag - | |
%% new queues will not be created. | |
declare_queues(PerProc, NProcs, Tag) -> | |
Self = self(), | |
Pids = [spawn_link(fun() -> do_declare_queues(PerProc, N, Self, Tag) end) | |
|| N <- lists:seq(1, NProcs)], | |
collect_responses(Pids). | |
do_declare_queues(PerProc, ProcN, Pid, Tag) -> | |
Exchange = #resource{virtual_host = <<"/">>, kind = exchange, name = <<"foo">>}, | |
Queue = #resource{virtual_host = <<"/">>, kind = queue}, | |
[begin | |
Name = <<"QUEUE_", (integer_to_binary(ProcN))/binary, "_", (integer_to_binary(N))/binary, Tag/binary>>, | |
rabbit_amqqueue:declare(Queue#resource{name = Name}, true, false, [], none, <<"none">>), | |
rabbit_binding:add(#binding{source = Exchange, destination = Queue#resource{name = Name}, key = <<"key">>}, <<"none">>) | |
end || | |
N <- lists:seq(1, PerProc)], | |
Pid ! {finished, self()}. | |
%% This function creates bindings between queue "foo" and exchange "foo" | |
%% PerProc - number of bindings, sequentially created by each process | |
%% NProcs - number of parallel processes | |
%% Tag - a suffix to add to routing keys. If run twice with the same tag - | |
%% new bindings will not be created. | |
add_bindings(PerProc, NProcs, Tag) -> | |
Self = self(), | |
Pids = [spawn_link(fun() -> do_add_bindings(PerProc, N, Self, Tag) end) | |
|| N <- lists:seq(1, NProcs)], | |
collect_responses(Pids). | |
do_add_bindings(PerProc, ProcN, Pid, Tag) -> | |
Resource = #resource{virtual_host = <<"/">>, name = <<"foo">>}, | |
[begin | |
RoutingKey = <<"RK_", (integer_to_binary(ProcN))/binary, "_", (integer_to_binary(N))/binary, Tag/binary>>, | |
Binging = #binding{source = Resource#resource{kind = exchange}, | |
destination = Resource#resource{kind = queue}, | |
key = RoutingKey}, | |
rabbit_binding:add(Binging, <<"none">>) | |
end || | |
N <- lists:seq(1, PerProc)], | |
Pid ! {finished, self()}. | |
collect_responses(Pids) -> collect_responses(Pids, []). | |
collect_responses([], Processed) -> | |
Processed; | |
collect_responses(Pids, Processed) -> | |
receive {finished, Pid} -> collect_responses(Pids -- [Pid], [Pid | Processed]) | |
after 500000 -> error(timeout) | |
end. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment