@stefb69
Created October 16, 2023 23:07
Script to replicate/migrate indices from an Elasticsearch cluster to an OpenSearch cluster.
#!/usr/bin/python3
import socket
from elasticsearch import Elasticsearch, exceptions as es_exceptions, Transport as es_transport, ConnectionPool as es_connection_pool
from opensearchpy import OpenSearch, exceptions as os_exceptions, Transport as os_transport, ConnectionPool as os_connection_pool
import logging
from urllib3.connection import HTTPConnection
# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Cluster configuration
SRC_CLUSTER = {
    'host': 'sourceserver',
    'port': 9210
}
DEST_CLUSTER = {
    'host': 'destserver',
    'port': 9200,
    'http_auth': ('admin', 'admin'),  # Basic authentication
    'scheme': 'https',
    'use_ssl': True,
    'verify_certs': False,
}
class KeepAliveHTTPConnection(HTTPConnection):
    """urllib3 HTTP connection that enables TCP keepalive probes on its socket."""
    def _new_conn(self):
        conn = super()._new_conn()
        conn.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60)
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 60)
        conn.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 100)
        return conn

class es_KeepAliveConnectionPool(es_connection_pool):
    ConnectionCls = KeepAliveHTTPConnection

class es_KeepAliveTransport(es_transport):
    connection_pool_class = es_KeepAliveConnectionPool

class os_KeepAliveConnectionPool(os_connection_pool):
    ConnectionCls = KeepAliveHTTPConnection

class os_KeepAliveTransport(os_transport):
    connection_pool_class = os_KeepAliveConnectionPool
# Connect to the clusters
try:
    src_es = Elasticsearch([{'host': SRC_CLUSTER['host'], 'port': SRC_CLUSTER['port']}],
                           transport_class=es_KeepAliveTransport,
                           timeout=300)
except es_exceptions.ConnectionError as e:
    logging.error(f"Connection error: {e}")
    exit(1)

try:
    dest_os = OpenSearch(
        hosts=[{'host': DEST_CLUSTER['host'], 'port': DEST_CLUSTER['port']}],
        http_auth=DEST_CLUSTER['http_auth'],
        use_ssl=DEST_CLUSTER['use_ssl'],
        verify_certs=DEST_CLUSTER['verify_certs'],
        ssl_assert_hostname=False,
        ssl_show_warn=False,
        timeout=86400,
        transport_class=os_KeepAliveTransport
    )
except os_exceptions.ConnectionError as e:
    logging.error(f"Connection error: {e}")
    exit(1)
# Fetch the list of indices from the source cluster, sorted alphabetically
try:
    src_indices = sorted(src_es.indices.get_alias(index="*").keys())
except es_exceptions.ElasticsearchException as e:
    logging.error(f"Error while fetching indices from the source cluster: {e}")
    exit(1)

# Fetch the list of indices from the destination cluster
try:
    dest_indices = dest_os.indices.get_alias(index="*").keys()
except os_exceptions.OpenSearchException as e:
    logging.error(f"Error while fetching indices from the destination cluster: {e}")
    exit(1)

# Keep only the indices whose destination document count is behind the source
indices_to_reindex = []
for index in src_indices:
    src_count = src_es.count(index=index)['count']
    dest_count = 0
    if index in dest_indices:
        dest_count = dest_os.count(index=index)['count']
    if dest_count < src_count:
        indices_to_reindex.append(index)
# Reindex the filtered indices into the destination cluster (remote reindex)
for index in indices_to_reindex:
    logging.info(f"Reindexing {index}...")
    body = {
        "source": {
            "remote": {
                "host": f"http://{SRC_CLUSTER['host']}:{SRC_CLUSTER['port']}",
            },
            "index": index
        },
        "dest": {
            "index": index
        }
    }
    try:
        dest_os.reindex(body=body, wait_for_completion=False)
        logging.info(f"Started reindexing of {index}.")
    except os_exceptions.OpenSearchException as e:
        logging.error(f"Error while reindexing {index}: {e}")

logging.info("Reindexing complete.")