Created
October 16, 2023 23:07
-
-
Save stefb69/623a806bd3d1e4b6a5fa00de686e41eb to your computer and use it in GitHub Desktop.
Script to replicate or migrate indexes from an Elasticsearch cluster to an OpenSearch cluster.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3
import logging
import os
import socket
import sys

from elasticsearch import (
    Elasticsearch,
    exceptions as es_exceptions,
    Transport as es_transport,
    ConnectionPool as es_connection_pool,
)
from opensearchpy import (
    OpenSearch,
    exceptions as os_exceptions,
    Transport as os_transport,
    ConnectionPool as os_connection_pool,
)
from urllib3.connection import HTTPConnection
# Logging configuration: INFO and above, timestamped.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Cluster configuration.
# Source: Elasticsearch cluster. It is queried directly and is also used as the
# remote origin of the reindex requests sent to the destination.
SRC_CLUSTER = {
    'host': 'sourceserver',
    'port': 9210,
}

# Destination: OpenSearch cluster reached with basic authentication over SSL.
# The credentials can be supplied through the DEST_CLUSTER_USER and
# DEST_CLUSTER_PASSWORD environment variables, so secrets need not be edited
# into the script. The former hard-coded 'admin'/'admin' pair is the fallback.
DEST_CLUSTER = {
    'host': 'destserver',
    'port': 9200,
    'http_auth': (
        os.environ.get('DEST_CLUSTER_USER', 'admin'),
        os.environ.get('DEST_CLUSTER_PASSWORD', 'admin'),
    ),
    # NOTE(review): 'scheme' is not passed to the OpenSearch client in this
    # script; SSL is driven by 'use_ssl' below.
    'scheme': 'https',
    'use_ssl': True,
    # Certificate verification is disabled, presumably for a self-signed
    # certificate. Confirm this is acceptable outside a test setup.
    'verify_certs': False,
}
class KeepAliveHTTPConnection(HTTPConnection):
    """urllib3 HTTP connection that turns on TCP keep-alive for every new socket.

    Presumably meant to keep long-idle connections (e.g. during slow cluster
    calls) from being dropped by the network. Confirm the intent with the owner.
    """

    # (socket option name, value) pairs applied at the IPPROTO_TCP level.
    # These names are Linux-specific, so any the running platform does not
    # expose are skipped rather than raising AttributeError.
    _TCP_KEEPALIVE_OPTIONS = (
        ('TCP_KEEPIDLE', 60),   # seconds idle before the first probe
        ('TCP_KEEPINTVL', 60),  # seconds between probes
        ('TCP_KEEPCNT', 100),   # unanswered probes before the socket is dropped
    )

    def _new_conn(self):
        """Open the socket via urllib3, then enable keep-alive on it.

        self.source_address is deliberately left untouched. urllib3 binds the
        local end of the socket to it, so it must remain a local address (or
        None). Setting it to the remote (host, port) would make the bind fail.

        Returns:
            The connected socket produced by the parent class.
        """
        conn = super()._new_conn()
        conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        for option_name, value in self._TCP_KEEPALIVE_OPTIONS:
            option = getattr(socket, option_name, None)
            if option is not None:
                conn.setsockopt(socket.IPPROTO_TCP, option, value)
        return conn
class es_KeepAliveConnectionPool(es_connection_pool):
    """Elasticsearch connection pool tagged with the keep-alive connection class."""

    # NOTE(review): elasticsearch-py's ConnectionPool holds client-level
    # Connection objects rather than urllib3 sockets. Confirm that ConnectionCls
    # is actually consulted; otherwise keep-alive is not applied through here.
    ConnectionCls = KeepAliveHTTPConnection


class es_KeepAliveTransport(es_transport):
    """Elasticsearch transport whose default pool class is es_KeepAliveConnectionPool."""

    def __init__(self, *args, **kwargs):
        # The pool is built inside the base constructor from its
        # connection_pool_class argument. Assigning the attribute after
        # super().__init__() would only affect pools rebuilt later, so the class
        # is supplied up front. An explicit caller-provided value still wins.
        # NOTE(review): with a single host the client may use a dummy pool
        # regardless of this setting. Verify for the installed version.
        kwargs.setdefault('connection_pool_class', es_KeepAliveConnectionPool)
        super().__init__(*args, **kwargs)
class os_KeepAliveConnectionPool(os_connection_pool):
    """OpenSearch connection pool tagged with the keep-alive connection class."""

    # NOTE(review): opensearch-py's ConnectionPool holds client-level Connection
    # objects, and the destination uses SSL while KeepAliveHTTPConnection
    # derives from the plain-HTTP urllib3 class. Confirm that ConnectionCls is
    # consulted here at all before relying on keep-alive for this cluster.
    ConnectionCls = KeepAliveHTTPConnection


class os_KeepAliveTransport(os_transport):
    """OpenSearch transport whose default pool class is os_KeepAliveConnectionPool."""

    def __init__(self, *args, **kwargs):
        # The pool is built inside the base constructor from its
        # connection_pool_class argument, so the class must be supplied before
        # calling it. An explicit caller-provided value still wins.
        kwargs.setdefault('connection_pool_class', os_KeepAliveConnectionPool)
        super().__init__(*args, **kwargs)
# Connection to the clusters.
# Building a client object does not by itself send a request. A lightweight
# info() call is therefore made inside each try block, so an unreachable or
# misconfigured cluster stops the script here with a clear message instead of
# failing at the first real request.
try:
    src_es = Elasticsearch(
        [{'host': SRC_CLUSTER['host'], 'port': SRC_CLUSTER['port']}],
        transport_class=es_KeepAliveTransport,
        timeout=300,
    )
    src_es.info()
except es_exceptions.ElasticsearchException as e:
    logging.error(f"Connection error: {e}")
    sys.exit(1)

try:
    dest_os = OpenSearch(
        hosts=[{'host': DEST_CLUSTER['host'], 'port': DEST_CLUSTER['port']}],
        http_auth=DEST_CLUSTER['http_auth'],
        use_ssl=DEST_CLUSTER['use_ssl'],
        verify_certs=DEST_CLUSTER['verify_certs'],
        # Hostname checking and the SSL warning are silenced, consistent with
        # verify_certs being disabled in DEST_CLUSTER.
        ssl_assert_hostname=False,
        ssl_show_warn=False,
        timeout=86400,
        transport_class=os_KeepAliveTransport,
    )
    dest_os.info()
except os_exceptions.OpenSearchException as e:
    logging.error(f"Connection error: {e}")
    sys.exit(1)
# List the source cluster's indices, sorted alphabetically.
# get_alias(index="*") returns a mapping keyed by index name; only the keys
# are used.
# NOTE(review): depending on the server's expand_wildcards defaults this may
# include closed and hidden/system indices (e.g. names starting with '.').
# Confirm they are meant to be migrated.
try:
    src_indices = sorted(src_es.indices.get_alias(index="*").keys())
except es_exceptions.ElasticsearchException as e:
    logging.error(f"Erreur lors de la récupération des indices du cluster source : {e}")
    sys.exit(1)

# List the destination cluster's indices. They are kept as a set because they
# are only used for membership tests.
try:
    dest_indices = set(dest_os.indices.get_alias(index="*").keys())
except os_exceptions.OpenSearchException as e:
    logging.error(f"Erreur lors de la récupération des indices du cluster de destination : {e}")
    sys.exit(1)
# Select the indices that need reindexing: those absent from the destination,
# or holding fewer documents there than on the source.
indices_to_reindex = []
for index in src_indices:
    try:
        src_count = src_es.count(index=index)['count']
        dest_count = dest_os.count(index=index)['count'] if index in dest_indices else 0
    except (es_exceptions.ElasticsearchException, os_exceptions.OpenSearchException) as e:
        # One index that cannot be counted (presumably e.g. a closed index)
        # should not abort the whole migration. Log it and move on.
        logging.error(f"Erreur lors du comptage des documents de {index} : {e}")
        continue
    if dest_count < src_count:
        indices_to_reindex.append(index)
# Start a remote reindex on the destination for every selected index. Each
# destination index keeps the source index's name.
# With wait_for_completion=False each call only starts a server-side task, and
# the loop issues them back to back without waiting. The final log therefore
# reports how many tasks were launched, not finished work.
# NOTE(review): remote reindex requires the source host to be allow-listed on
# the destination (reindex.remote.allowlist / reindex.remote.whitelist). Confirm.
src_scheme = SRC_CLUSTER.get('scheme', 'http')
started = 0
for index in indices_to_reindex:
    logging.info(f"Reindexing {index}...")
    body = {
        "source": {
            "remote": {
                "host": f"{src_scheme}://{SRC_CLUSTER['host']}:{SRC_CLUSTER['port']}",
            },
            "index": index,
        },
        "dest": {
            "index": index,
        },
    }
    try:
        response = dest_os.reindex(body=body, wait_for_completion=False)
    except os_exceptions.OpenSearchException as e:
        logging.error(f"Erreur lors de la reindexation de {index} : {e}")
        continue
    started += 1
    # The returned task id allows the task to be tracked through the _tasks API.
    logging.info(f"Démarrage de l'indexation de {index} (tâche {response.get('task')}).")
logging.info(
    f"Reindexation lancée : {started}/{len(indices_to_reindex)} tâche(s) démarrée(s) ; "
    f"suivre leur avancement via l'API _tasks."
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment