| 
          #!/usr/bin/env python3 | 
        
        
           | 
          import subprocess | 
        
        
           | 
          import logging | 
        
        
           | 
          from itertools import chain | 
        
        
           | 
          
 | 
        
        
           | 
          logging.basicConfig(level=logging.INFO) | 
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          def frr_execute(commands): | 
        
        
           | 
              arguments = list(chain.from_iterable(('-c', command) for command in commands)) | 
        
        
           | 
              process = None | 
        
        
           | 
          
 | 
        
        
           | 
              try: | 
        
        
           | 
                  logging.debug('Executing FRR commands: ' + ', '.join(commands)) | 
        
        
           | 
                  process = subprocess.run([FRR_VTYSH_PATH] + arguments, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | 
        
        
           | 
                  if process.returncode != 0: | 
        
        
           | 
                      raise subprocess.CalledProcessError(process.returncode, process.args, output=process.stdout, | 
        
        
           | 
                                                          stderr=process.stderr) | 
        
        
           | 
          
 | 
        
        
           | 
                  return [str(process.stdout), str(process.stderr)] | 
        
        
           | 
              except subprocess.CalledProcessError: | 
        
        
           | 
                  logging.warning('command execution failed: ' + ', '.join(commands)) | 
        
        
           | 
                  logging.warning('> process stdout: ' + str(process.stdout)) | 
        
        
           | 
                  logging.warning('> process stderr: ' + str(process.stderr)) | 
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          def frr_verify(matcher): | 
        
        
           | 
              stdout, stderr = frr_execute(['show running-config']) | 
        
        
           | 
              return matcher in str(stdout) | 
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          class BgpAttribute(): | 
        
        
           | 
              def __init__(self, name, address_family=None): | 
        
        
           | 
                  self.name = name | 
        
        
           | 
                  self.address_family = address_family | 
        
        
           | 
          
 | 
        
        
           | 
              def description(self): | 
        
        
           | 
                  return '{address_family} > {name}'.format(address_family=self.address_family or 'global', name=self.name) | 
        
        
           | 
          
 | 
        
        
           | 
              def configure(self, neighbor, params, invert=False): | 
        
        
           | 
                  cmd = 'neighbor {neighbor} {name} {param_string}'.format( | 
        
        
           | 
                      neighbor=neighbor, name=self.name, param_string=' '.join([str(param) for param in params])) | 
        
        
           | 
                  if invert: | 
        
        
           | 
                      cmd = 'no ' + cmd | 
        
        
           | 
          
 | 
        
        
           | 
                  return self.configure_raw(cmd) | 
        
        
           | 
          
 | 
        
        
           | 
              def configure_raw(self, cmd): | 
        
        
           | 
                  # Sanitize command | 
        
        
           | 
                  cmd = cmd.strip() | 
        
        
           | 
          
 | 
        
        
           | 
                  # Jump into configuration of BGP instance | 
        
        
           | 
                  cmds = [ | 
        
        
           | 
                      'configure terminal', | 
        
        
           | 
                      'router bgp {instance}'.format(instance=FRR_BGP_INSTANCE) | 
        
        
           | 
                  ] | 
        
        
           | 
          
 | 
        
        
           | 
                  # Jump into address family configuration if applicable | 
        
        
           | 
                  if self.address_family: | 
        
        
           | 
                      cmds.append('address-family {address_family}'.format(address_family=self.address_family)) | 
        
        
           | 
          
 | 
        
        
           | 
                  # Execute the prepared commands | 
        
        
           | 
                  frr_execute(cmds + [cmd]) | 
        
        
           | 
          
 | 
        
        
           | 
                  # Return the executed command | 
        
        
           | 
                  return cmd | 
        
        
           | 
          
 | 
        
        
           | 
          
 | 
        
        
           | 
          FRR_VTYSH_PATH = "/home/ppmathis/development/frr/vtysh/vtysh" | 
        
        
           | 
          FRR_BGP_INSTANCE = 65500 | 
        
        
           | 
          FRR_BGP_NEIGHBOR = '1.1.1.1' | 
        
        
           | 
          FRR_BGP_PEER_GROUP = 'PG-FUZZ' | 
        
        
           | 
          FRR_BGP_ATTRIBUTES = [ | 
        
        
           | 
              [BgpAttribute('advertisement-interval'), [10], [20]], | 
        
        
           | 
              [BgpAttribute('bfd'), [10, 100, 1000], [20, 200, 2000]], | 
        
        
           | 
              [BgpAttribute('capability dynamic'), [], None], | 
        
        
           | 
              [BgpAttribute('capability extended-nexthop'), [], None], | 
        
        
           | 
              [BgpAttribute('description'), ['TEST #1'], ['TEST #2']], | 
        
        
           | 
              [BgpAttribute('disable-connected-check'), [], None], | 
        
        
           | 
              [BgpAttribute('dont-capability-negotiate'), [], None], | 
        
        
           | 
              [BgpAttribute('ebgp-multihop'), [10], [20]], | 
        
        
           | 
              [BgpAttribute('enforce-multihop'), [], None], | 
        
        
           | 
              [BgpAttribute('local-as'), [100], [200]], | 
        
        
           | 
              [BgpAttribute('override-capability'), [], None], | 
        
        
           | 
              [BgpAttribute('passive'), [], None], | 
        
        
           | 
              [BgpAttribute('password'), ['HelloWorld'], ['GoodbyeWorld']], | 
        
        
           | 
              # [BgpAttribute('remote-as'), [100], [200]], | 
        
        
           | 
              # ^ FRR Restriction: Peers of a peer group may not have a different AS | 
        
        
           | 
              # [BgpAttribute('shutdown'), [], None], | 
        
        
           | 
              # ^ FRR Restriction: Peers of a shutdown peer group can not be unshutted | 
        
        
           | 
              [BgpAttribute('solo'), [], None], | 
        
        
           | 
              [BgpAttribute('timers'), [10, 100], [20, 200]], | 
        
        
           | 
              [BgpAttribute('timers connect'), [10], [20]], | 
        
        
           | 
              [BgpAttribute('ttl-security hops'), [10], [20]], | 
        
        
           | 
              [BgpAttribute('update-source'), ['1.1.1.1'], ['2.2.2.2']], | 
        
        
           | 
          
 | 
        
        
           | 
              [BgpAttribute('activate', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('addpath-tx-all-paths', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('addpath-tx-bestpath-per-AS', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('allowas-in', 'ipv4 unicast'), [1], [2]], | 
        
        
           | 
              [BgpAttribute('allowas-in origin', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('as-override', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('attribute-unchanged', 'ipv4 unicast'), ['as-path', 'next-hop', 'med'], None], | 
        
        
           | 
              [BgpAttribute('capability orf prefix-list', 'ipv4 unicast'), ['both'], ['send']], | 
        
        
           | 
              [BgpAttribute('default-originate', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('default-originate route-map', 'ipv4 unicast'), ['RM-DO1'], ['RM-DO2']], | 
        
        
           | 
              [BgpAttribute('distribute-list', 'ipv4 unicast'), ['DL1-IN', 'in'], ['DL2-IN', 'in']], | 
        
        
           | 
              [BgpAttribute('distribute-list', 'ipv4 unicast'), ['DL1-OUT', 'out'], ['DL2-OUT', 'out']], | 
        
        
           | 
              [BgpAttribute('filter-list', 'ipv4 unicast'), ['FL1-IN', 'in'], ['FL2-IN', 'in']], | 
        
        
           | 
              [BgpAttribute('filter-list', 'ipv4 unicast'), ['FL1-OUT', 'out'], ['FL2-OUT', 'out']], | 
        
        
           | 
              [BgpAttribute('maximum-prefix', 'ipv4 unicast'), [10, 11, 'restart', 1], [20, 22, 'restart', 2]], | 
        
        
           | 
              [BgpAttribute('next-hop-self', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('next-hop-self force', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('prefix-list', 'ipv4 unicast'), ['PL1-IN', 'in'], ['PL2-IN', 'in']], | 
        
        
           | 
              [BgpAttribute('prefix-list', 'ipv4 unicast'), ['PL1-OUT', 'out'], ['PL2-OUT', 'out']], | 
        
        
           | 
              [BgpAttribute('remove-private-AS', 'ipv4 unicast'), ['all', 'replace-AS'], None], | 
        
        
           | 
              [BgpAttribute('route-map', 'ipv4 unicast'), ['RM1-IN', 'in'], ['RM2-IN', 'in']], | 
        
        
           | 
              [BgpAttribute('route-map', 'ipv4 unicast'), ['RM1-OUT', 'out'], ['RM2-OUT', 'out']], | 
        
        
           | 
              # TODO: route-reflector-client (more complicated to check due to internal-only) | 
        
        
           | 
              [BgpAttribute('route-server-client', 'ipv4 unicast'), [], None], | 
        
        
           | 
              # TODO: send-community (more complicated to check due to implicit all) | 
        
        
           | 
              [BgpAttribute('soft-reconfiguration inbound', 'ipv4 unicast'), [], None], | 
        
        
           | 
              [BgpAttribute('unsuppress-map', 'ipv4 unicast'), ['RM1-US'], ['RM2-US']], | 
        
        
           | 
              [BgpAttribute('weight', 'ipv4 unicast'), [100], [200]], | 
        
        
           | 
          
 | 
        
        
           | 
              [BgpAttribute('nexthop-local unchanged', 'ipv6 unicast'), [], None], | 
        
        
           | 
          ] | 
        
        
           | 
          
 | 
        
        
           | 
          # Clone attributes of IPv4 unicast AFI for IPv6 unicast | 
        
        
           | 
          for attribute in FRR_BGP_ATTRIBUTES: | 
        
        
           | 
              if attribute[0].address_family == 'ipv4 unicast': | 
        
        
           | 
                  FRR_BGP_ATTRIBUTES.append([BgpAttribute(attribute[0].name, 'ipv6 unicast'), attribute[1], attribute[2]]) | 
        
        
           | 
          
 | 
        
        
           | 
          # Sort attributes by AFI followed by name | 
        
        
           | 
          FRR_BGP_ATTRIBUTES = sorted(FRR_BGP_ATTRIBUTES, key=lambda k: (k[0].address_family or '.', k[0].name)) | 
        
        
           | 
          
 | 
        
        
           | 
          # Check all attributes | 
        
        
           | 
          for attribute in FRR_BGP_ATTRIBUTES: | 
        
        
           | 
              # (Re-)initialize FRR testing environment | 
        
        
           | 
              frr_execute([ | 
        
        
           | 
                  'configure terminal', | 
        
        
           | 
                  'no router bgp {instance}'.format(instance=FRR_BGP_INSTANCE), | 
        
        
           | 
                  'router bgp {instance}'.format(instance=FRR_BGP_INSTANCE), | 
        
        
           | 
                  'no bgp default ipv4-unicast', | 
        
        
           | 
                  'neighbor {neighbor} peer-group'.format(neighbor=FRR_BGP_PEER_GROUP), | 
        
        
           | 
                  'neighbor {neighbor} peer-group {peer_group}'.format(neighbor=FRR_BGP_NEIGHBOR, peer_group=FRR_BGP_PEER_GROUP) | 
        
        
           | 
              ]) | 
        
        
           | 
          
 | 
        
        
           | 
              # Attempt to set peer-group specific attribute | 
        
        
           | 
              cmd_peer_group = attribute[0].configure(FRR_BGP_PEER_GROUP, attribute[1]) | 
        
        
           | 
              if not frr_verify(cmd_peer_group): | 
        
        
           | 
                  logging.info('result: \u2716 {attribute} (failed peer-group config: {command})' | 
        
        
           | 
                               .format(attribute=attribute[0].description(), command=cmd_peer_group)) | 
        
        
           | 
                  continue | 
        
        
           | 
          
 | 
        
        
           | 
              # Attempt to set peer specific attribute | 
        
        
           | 
              cmd_peer = None | 
        
        
           | 
              if attribute[2]: | 
        
        
           | 
                  cmd_peer = attribute[0].configure(FRR_BGP_NEIGHBOR, attribute[2]) | 
        
        
           | 
              else: | 
        
        
           | 
                  cmd_peer = attribute[0].configure(FRR_BGP_NEIGHBOR, attribute[1], invert=True) | 
        
        
           | 
          
 | 
        
        
           | 
              if not frr_verify(cmd_peer): | 
        
        
           | 
                  logging.info('result: \u2716 {attribute} (failed peer config: {command})' | 
        
        
           | 
                               .format(attribute=attribute[0].description(), command=cmd_peer)) | 
        
        
           | 
                  continue | 
        
        
           | 
          
 | 
        
        
           | 
              # Attempt to set peer-group specific attribute a second time | 
        
        
           | 
              attribute[0].configure_raw(cmd_peer_group) | 
        
        
           | 
              if not frr_verify(cmd_peer_group): | 
        
        
           | 
                  logging.info('result: \u2716 {attribute} (failed peer-group re-config: {command})' | 
        
        
           | 
                               .format(attribute=attribute[0].description(), command=cmd_peer_group)) | 
        
        
           | 
                  continue | 
        
        
           | 
          
 | 
        
        
           | 
              # Verify that peer-specific attribute still exists | 
        
        
           | 
              if not frr_verify(cmd_peer): | 
        
        
           | 
                  logging.info('result: \u2716 {attribute} (lost peer-specific attribute: {command})' | 
        
        
           | 
                               .format(attribute=attribute[0].description(), command=cmd_peer_group)) | 
        
        
           | 
                  continue | 
        
        
           | 
          
 | 
        
        
           | 
              # Everything looks good! | 
        
        
           | 
              logging.info('result: \u2714 {attribute}'.format(attribute=attribute[0].description())) |