Created
July 26, 2017 08:46
-
-
Save li-ch/f46026975654e924852466d9eeb1f483 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
installation: | |
git clone [email protected]:heikoheiko/pydevp2p.git | |
git co sha3id | |
python setup.py develop | |
pip install statistics networkx matplotlib | |
usage: | |
cd devp2p/tests/ | |
python test_connections_stratgy.py <num_nodes> | |
you probably want to adjust `min_peer_options` and `klasses` | |
in the `main` function (end of file) | |
ls | |
implementing strategies: | |
inherit form CNodeBase and implement .setup_targets() | |
add your class to `klasses` in the `main` function. | |
""" | |
import time | |
import operator | |
import devp2p.kademlia | |
from test_kademlia_protocol import test_many, get_wired_protocol | |
from collections import OrderedDict | |
import random | |
import statistics | |
random.seed(42) | |
class CNodeBase(object): | |
""" | |
to implement your conenction strategy override | |
.select_targets (must) | |
.connect_peers | |
""" | |
k_max_node_id = devp2p.kademlia.k_max_node_id | |
def __init__(self, proto, network, min_peers=5, max_peers=10): | |
self.proto = proto | |
self.network = network | |
self.min_peers = min_peers | |
self.max_peers = max_peers | |
self.connections = [] | |
self.id = proto.this_node.id | |
# list of dict(address=long, tolerance=long, connected=(None or Node)) | |
# address is the id : long | |
self.targets = list() | |
@classmethod | |
def prepare_dht(cls, num_nodes): | |
return test_many(num_nodes) | |
def distance(self, other): | |
return self.id ^ other.id | |
def outbound(self): | |
"peers this node connected to" | |
return [n['connected'] for n in self.targets if n['connected']] | |
def inbound(self): | |
"peers that connected this node" | |
ob = self.outbound() | |
return [n for n in self.connections if n not in ob] | |
def receive_connect(self, other): | |
if len(self.connections) >= self.max_peers: | |
return False | |
else: | |
assert other not in self.connections | |
self.connections.append(other) | |
assert len(self.connections) <= self.max_peers | |
return True | |
def receive_disconnect(self, other): | |
assert other in self.connections | |
self.connections.remove(other) | |
for t in self.targets: | |
if t['connected'] == other: | |
t['connected'] = None | |
def _find_targets(self): | |
"call find node to fill buckets with addresses close to the target" | |
for t in self.targets: | |
self.proto.find_node(t['address']) | |
self.network.process() | |
def find_targets(self): | |
"call find node to fill buckets with addresses close to the target" | |
all = [n.proto for n in self.network.values()] | |
for t in self.targets: | |
self.proto.find_node(t['address']) | |
self.network.process() | |
def connect_peers(self, max_connects=0, random_within_distance=False, candidates=[]): | |
""" | |
random_within_distance: collect random node within distance form local buckets | |
candidates: directly specify a list of candidates | |
override to deal with situations where | |
- you enter the method and have not enough slots to conenct your targets | |
- your targets don't want to connect you | |
- targets are not within the tolerace | |
... | |
""" | |
assert self.targets | |
num_connected = 0 | |
# connect closest node to target id | |
for t in (t for t in self.targets if not t['connected']): | |
if len(self.connections) >= self.max_peers: | |
break | |
if candidates: | |
assert not random_within_distance | |
elif random_within_distance: | |
candidates = self.proto.routing.neighbours_within_distance( | |
t['address'], t['tolerance']) | |
random.shuffle(candidates) | |
else: | |
candidates = self.proto.routing.neighbours(t['address']) | |
for knode in candidates: | |
assert isinstance(knode, devp2p.kademlia.Node) | |
assert len(self.connections) < self.max_peers | |
# assure within tolerance | |
if knode.id_distance(t['address']) < t['tolerance']: | |
# make sure we are not connected yet | |
remote = self.network[knode.id] | |
if remote not in self.connections and remote != self: | |
if remote.receive_connect(self): | |
assert len(self.connections) < self.max_peers | |
t['connected'] = remote | |
self.connections.append(remote) | |
assert len(self.connections) <= self.max_peers | |
num_connected += 1 | |
if max_connects and num_connected == max_connects: | |
return num_connected | |
break | |
return num_connected | |
def setup_targets(self): | |
""" | |
calculate select target distances, addresses and tolerances | |
""" | |
for i in range(self.min_peers): | |
self.targets.append(dict(address=0, tolerance=0, connected=None)) | |
# NOT IMPLEMENTED HERE | |
class CNodeRandom(CNodeBase): | |
def setup_targets(self): | |
""" | |
connects random nodes | |
""" | |
for i in range(self.min_peers): | |
distance = random.randint(0, self.k_max_node_id) | |
address = (self.id + distance) % (self.k_max_node_id + 1) | |
tolerance = self.k_max_node_id / self.max_peers | |
self.targets.append(dict(address=address, tolerance=tolerance, connected=None)) | |
class CNodeRandomFast(CNodeRandom): | |
@classmethod | |
def prepare_dht(cls, num_nodes): | |
return [get_wired_protocol() for i in range(num_nodes)] | |
def find_targets(self): | |
self.candidates = [] | |
assert len(self.network.all_nodes) | |
tolerance = devp2p.kademlia.k_max_node_id / len(self.network.all_nodes) * 100 | |
for t in self.targets: | |
nodes = [n for n in self.network.all_nodes if n.id_distance(t['address']) < tolerance] | |
assert nodes | |
c = sorted(nodes, key=operator.methodcaller('id_distance', t['address']))[:4] | |
self.candidates.extend(c) | |
def connect_peers(self, max_connects, random_within_distance=False, candidates=[]): | |
return CNodeBase.connect_peers(self, max_connects, random_within_distance, self.candidates) | |
class CNodeFelixNoLimits(CNodeBase): | |
# https://github.com/ethereum/go-ethereum/wiki/RLPx-Peer-Selection-Proposal | |
""" | |
no id selection limits on incomming connections | |
""" | |
def connect_peers(self, max_connects=0): | |
return CNodeBase.connect_peers(self, max_connects=max_connects, random_within_distance=True) | |
def setup_targets(self): | |
""" | |
The 512 bit (256 bit) space is divided into N ranges. | |
The dialer maintains N slots, one for each range. | |
In order to find peers, it performs a node lookup with a random target | |
in reach range corresponding to an empty slot. | |
The results of the lookup are then dialed. | |
If dialing does not succeed for any node, another random lookup is | |
performed in that range. | |
""" | |
slot_width = self.k_max_node_id / self.min_peers | |
for i in range(self.min_peers): | |
distance = int((i + 0.5) * slot_width) | |
tolerance = slot_width / 2 | |
address = (self.id + distance) % (self.k_max_node_id + 1) | |
assert isinstance(address, long), address | |
self.targets.append(dict(address=address, tolerance=tolerance, connected=None)) | |
class CNodeRandomSelfLookup(CNodeBase): | |
""" | |
As proposed by Alex: | |
node joining the network | |
doing a self lookup | |
selects random peers from the returned peers | |
note: all repsonding nodes probably know the complete network, | |
but this is sensible, assuming they are way longer in the network | |
limits: if a node does not accept the connection, there is no backup (rarely) | |
""" | |
def setup_targets(self): | |
bootstrap_node = random.choice(list(self.proto.routing)) | |
# reset routing table | |
self.proto.routing = devp2p.kademlia.RoutingTable(self.proto.this_node) | |
self.proto.bootstrap([bootstrap_node]) | |
self.network.process() | |
selected = set() | |
for i in range(self.min_peers): | |
while True: | |
target = random.choice(list(self.proto.routing)) | |
if target not in selected: | |
selected.add(target) | |
break | |
self.targets.append(dict(address=target.id, tolerance=1, connected=None)) | |
class CNodeSelfOtherSide(CNodeBase): | |
""" | |
Daniel: | |
Thus, the self-lookup finds (enough) nodes that are closer to self than the bootstrap node, | |
while the "other side" lookup makes sure that it has neighbors that are sufficiently far. | |
These two lookups are both necessary and sufficient to make sure that the node becomes | |
"well connected", i.e. connected to enough nodes at every distance level. | |
Nodes must have enough neighbors both near and far in order to have short (i.e. O(log N) | |
length) path to every node in the network with a centrality roughly equaling that of | |
all other nodes. | |
""" | |
def setup_targets(self): | |
bootstrap_node = random.choice(list(self.proto.routing)) | |
# reset routing table | |
self.proto.routing = devp2p.kademlia.RoutingTable(self.proto.this_node) | |
# lookup self | |
self.proto.bootstrap([bootstrap_node]) | |
self.network.process() | |
# lookup not self | |
other_side_id = self.id ^ 0 | |
self.proto.find_node(other_side_id) | |
self.network.process() | |
selected = set() | |
for i in range(self.min_peers): | |
while True: | |
target = random.choice(list(self.proto.routing)) | |
if target not in selected: | |
selected.add(target) | |
break | |
self.targets.append(dict(address=target.id, tolerance=1, connected=None)) | |
class CNodeKademlia(CNodeBase): | |
def setup_targets(self): | |
""" | |
connects nodes according to a dht routing | |
""" | |
distance = self.k_max_node_id | |
for i in range(self.min_peers): | |
distance /= 2 | |
address = (self.id + distance) % (self.k_max_node_id + 1) | |
tolerance = distance / 2 | |
self.targets.append(dict(address=address, tolerance=tolerance, connected=None)) | |
class CNodeKademliaRandom(CNodeKademlia): | |
def connect_peers(self, max_connects=0): | |
return CNodeBase.connect_peers(self, max_connects=max_connects, random_within_distance=True) | |
def analyze(network): | |
import networkx as nx | |
G = nx.Graph() | |
def weight(a, b): | |
""" | |
Weight of edge. In dot, | |
the heavier the weight, the shorter, straighter and more vertical the edge is. | |
For other layouts, a larger weight encourages the layout to make the edge length | |
closer to that specified by the len attribute. | |
""" | |
# same node is weight == 1 | |
return (1 - a.distance(b) / devp2p.kademlia.k_max_node_id) * 10 | |
for node in network.values(): | |
for r in node.connections: | |
G.add_edge(node, r, weight=weight(node, r)) | |
num_peers = [len(n.connections) for n in network.values()] | |
metrics = OrderedDict(num_nodes=len(network)) | |
metrics['max_peers'] = max(num_peers) | |
metrics['min_peers'] = min(num_peers) | |
metrics['avg_peers'] = statistics.mean(num_peers) | |
metrics['rsd_peers'] = statistics.stdev(num_peers) / statistics.mean(num_peers) | |
# calc shortests paths | |
# lower is better | |
if nx.is_connected(G): | |
print 'calculating avg_shortest_path' | |
avg_shortest_paths = [] | |
for node in G: | |
path_length = nx.single_source_shortest_path_length(G, node) | |
avg_shortest_paths.append(statistics.mean(path_length.values())) | |
metrics['avg_shortest_path'] = statistics.mean(avg_shortest_paths) | |
metrics['rsd_shortest_path'] = statistics.stdev( | |
avg_shortest_paths) / metrics['avg_shortest_path'] | |
try: | |
# Closeness centrality at a node is 1/average distance to all other nodes. | |
# higher is better | |
print 'calculating closeness centrality' | |
vs = nx.closeness_centrality(G).values() | |
metrics['min_closeness_centrality'] = min(vs) | |
metrics['avg_closeness_centrality'] = statistics.mean(vs) | |
metrics['rsd_closeness_centrality'] = statistics.stdev( | |
vs) / metrics['avg_closeness_centrality'] | |
# The load centrality of a node is the fraction of all shortest paths that | |
# pass through that node | |
# Daniel: | |
# I recommend calculating (or estimating) the centrality of each node and making sure that | |
# there are no nodes with much higher centrality than the average. | |
# lower is better | |
print 'calculating load centrality' | |
vs = nx.load_centrality(G).values() | |
metrics['max_load_centrality'] = max(vs) | |
metrics['avg_load_centrality'] = statistics.mean(vs) | |
metrics['rsd_load_centrality'] = statistics.stdev(vs) / metrics['avg_load_centrality'] | |
print 'calculating node_connectivity' | |
# higher is better | |
metrics['node_connectivity'] = nx.node_connectivity(G) | |
print 'calculating diameter' | |
# lower is better | |
metrics['diameter '] = nx.diameter(G) | |
except nx.exception.NetworkXError as e: | |
metrics['ERROR'] = -1 | |
return metrics | |
def draw(G, metrics=dict()): | |
import matplotlib.pyplot as plt | |
""" | |
dot - "hierarchical" or layered drawings of directed graphs. This is the default tool to use if edges have directionality. | |
neato - "spring model'' layouts. This is the default tool to use if the graph is not too large (about 100 nodes) and you don't know anything else about it. Neato attempts to minimize a global energy function, which is equivalent to statistical multi-dimensional scaling. | |
fdp - "spring model'' layouts similar to those of neato, but does this by reducing forces rather than working with energy. | |
sfdp - multiscale version of fdp for the layout of large graphs. | |
twopi - radial layouts, after Graham Wills 97. Nodes are placed on concentric circles depending their distance from a given root node. | |
circo - circular layout, after Six and Tollis 99, Kauffman and Wiese 02. This is suitable for certain diagrams of multiple cyclic structures, such as certain telecommunications networks. | |
""" | |
print 'layouting' | |
text = '' | |
for k, v in metrics.items(): | |
text += '%s: %.4f\n' % (k.ljust(max(len(x) for x in metrics.keys())), v) | |
print text | |
#pos = nx.graphviz_layout(G, prog='dot', args='') | |
pos = nx.spring_layout(G) | |
plt.figure(figsize=(8, 8)) | |
nx.draw(G, pos, node_size=20, alpha=0.5, node_color="blue", with_labels=False) | |
plt.text(0.02, 0.02, text, transform=plt.gca().transAxes) # , font_family='monospace') | |
plt.axis('equal') | |
outfile = 'network_graph.png' | |
plt.savefig(outfile) | |
print 'saved visualization to', outfile | |
plt.ion() | |
plt.show() | |
while True: | |
time.sleep(0.1) | |
def simulate(node_class, set_num_nodes=20, set_min_peers=7, set_max_peers=14): | |
print 'running simulation', node_class.__name__, \ | |
dict(num_nodes=set_num_nodes, min_peers=set_min_peers, max_peers=set_max_peers) | |
print 'bootstrapping discovery protocols' | |
kademlia_protocols = node_class.prepare_dht(set_num_nodes) | |
# create ConnectableNode instances | |
print 'executing connection strategy' | |
network = OrderedDict() # node.id -> Node | |
# .process executes all messages on the network | |
network.process = lambda: kademlia_protocols[0].wire.process(kademlia_protocols) | |
network.all_nodes = [p.this_node for p in kademlia_protocols] | |
# wrap protos in connectable nodes and map via network | |
for p in kademlia_protocols: | |
cn = node_class(p, network, min_peers=set_min_peers, max_peers=set_max_peers) | |
network[cn.id] = cn | |
print 'setup targets' | |
# setup targets | |
for cn in network.values(): | |
cn.setup_targets() | |
print 'lookup targets' | |
# lookup targets | |
for cn in network.values(): | |
cn.find_targets() | |
print 'connect peers' | |
# connect peers (one client per round may connect) | |
while True: | |
n_connects = 0 | |
for cn in network.values(): | |
n_connects += cn.connect_peers(max_connects=1) | |
if n_connects == 0: | |
break | |
metrics = analyze(network) | |
return metrics | |
def print_results(results=[]): | |
labels = results[0].keys() | |
print '\t'.join(labels) | |
f = lambda x: str(x) if isinstance(x, (int, str)) else ('%.4f' % x) # .replace('.', ',') | |
for r in results: | |
print '\t'.join(f(r.get(k, 'n/a')) for k in labels) | |
def main(num_nodes): | |
# strategies to test | |
klasses = [CNodeRandom, | |
CNodeSelfOtherSide, | |
CNodeKademliaRandom, | |
CNodeRandomSelfLookup, | |
CNodeFelixNoLimits] | |
klasses = [CNodeRandomFast] | |
# min_peer settings to test | |
min_peer_options = (6,) | |
print 'running %d simulations' % (len(min_peer_options) * len(klasses)) | |
results = [] | |
for min_peers in min_peer_options: | |
max_peers = min_peers * 2 | |
for node_class in klasses: | |
p = OrderedDict(node_class=node_class) | |
p.update(OrderedDict(set_num_nodes=num_nodes, set_min_peers=min_peers, | |
set_max_peers=max_peers)) | |
metrics = simulate(**p) | |
p.update(metrics) | |
p['node_class'] = p['node_class'].__name__ | |
print p | |
results.append(p) | |
print_results(results) | |
if __name__ == '__main__': | |
# import pyethereum.slogging | |
# pyethereum.slogging.configure(config_string=':debug') | |
import sys | |
if not len(sys.argv) == 2: | |
print 'usage:%s <num_nodes>' % sys.argv[0] | |
sys.exit(1) | |
num_nodes = int(sys.argv[1]) | |
main(num_nodes) | |
""" | |
todos: | |
colorize nodes being closest to 0, 1/4, 1/2, 3/4 of the id space | |
validate graph (assert bidirectional connections, max_peers satisfactions) | |
support ananlytics about nodes added to an established network | |
""" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment