How do I transfer data from Elasticsearch to Postgres?

#postgresql #elasticsearch

Question:

I have a huge amount of data in Elasticsearch, and I want to write a script that creates a table corresponding to a particular index and transfers all the data into Postgres.

Comments:

1. Take a look at Logstash: elastic.co/guide/en/logstash/current/…

Answer #1:

Never mind, I found my answer. What I did was:

  1. create a connection to Postgres and to Elasticsearch
  2. create a table in PostgreSQL
  3. buffer the data in chunks of 10k records in a dictionary of lists
  4. transfer the data from that dictionary of lists into PostgreSQL, then clear the lists for the next iteration
import psycopg2
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from collections import defaultdict

data = defaultdict(list)  # column name -> list of values for the current batch

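# Elasticsearch connection settings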
t_host = "localhost"
t_port = "9200"
t_dbname_ES = "companydatabase" #index
t_user = "elastic"
t_pw = "changeme"
client_ES = Elasticsearch([t_host],http_auth=(t_user, t_pw),port=t_port)

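# PostgreSQL connection settings (the same t_* variable names are reused here)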
t_host = "localhost"
t_port = "5999"
t_dbname = "postgres"
t_user = "postgres"
t_pw = "postgres"
db_conn = psycopg2.connect(host=t_host, port=t_port, dbname=t_dbname, user=t_user, password=t_pw)
db_cursor = db_conn.cursor()

column_name_list = ["Address","Age","DateOfJoining","Designation","FirstName","Gender","Interests","LastName","MaritalStatus","Salary"]
column_type_list = ["text not null","integer","date","text","text","text","text","text","text","integer"]  # one type per column, aligned with column_name_list
table_name = 'sample_table2'  #table name to insert data into
column_names = ', '.join(column_name_list)
column_types = ", ".join(column_type_list)

#table creation
create_table_query = "CREATE TABLE {} (".format(table_name)
for i in range(len(column_name_list)):
    create_table_query += column_name_list[i]
    create_table_query += " "
    create_table_query += column_type_list[i]
    if i != len(column_name_list) - 1:
        create_table_query += ", "
create_table_query += ");"
try:
    db_cursor.execute(create_table_query)
    db_conn.commit()
except psycopg2.Error as e:
    t_message = "Database error: "   e

#data insertion
s = Search(index=t_dbname_ES).using(client_ES).query("match_all")
total_documents = s.count() #total count of records in the index
count=0
for hit in s.scan(): #looping over all records one at a time
    count += 1
    total_documents -= 1
    for i in range(len(column_name_list)):  # append the fields fetched from the document to the dictionary of lists
        data[column_name_list[i]].append(hit[column_name_list[i]])

    if count == 10000 or total_documents == 0:   # insert into Postgres 10k records at a time
        insert_query = "INSERT INTO " + table_name + " (" + column_names + ")" + " VALUES"
        for i in range(min(10000, count)):
            insert_query += "("
            for j in range(len(column_name_list)):
                if j != 0:
                    insert_query += ', ' + "'" + str(data[column_name_list[j]][i]) + "'"
                else:
                    insert_query += "'" + str(data[column_name_list[j]][i]) + "'"
            insert_query += "),"
        insert_query = insert_query[:-1]  # drop the trailing comma
        insert_query += ";"

        for i in range(len(column_name_list)):  # empty the lists for the next batch of 10k records
            data[column_name_list[i]] = []

        try:
            db_cursor.execute(insert_query)
            db_conn.commit()
            count=0
        except psycopg2.Error as e:
            t_message = "Database error: "   e

db_cursor.close()
db_conn.close()
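
For reference, here is a more compact sketch of the same batching idea that passes the values as query parameters via elasticsearch.helpers.scan and psycopg2.extras.execute_values instead of concatenating them into the SQL string by hand. It is only a sketch and assumes the client_ES, db_conn, db_cursor, table_name, column_name_list and t_dbname_ES objects defined in the script above:

from elasticsearch.helpers import scan
from psycopg2.extras import execute_values

# one multi-row INSERT statement per batch; execute_values expands the single %s
insert_sql = "INSERT INTO {} ({}) VALUES %s".format(table_name, ", ".join(column_name_list))

batch = []
for hit in scan(client_ES, index=t_dbname_ES, query={"query": {"match_all": {}}}):
    source = hit["_source"]
    # missing fields become NULL instead of breaking the insert
    batch.append(tuple(source.get(col) for col in column_name_list))
    if len(batch) >= 10000:
        execute_values(db_cursor, insert_sql, batch)
        db_conn.commit()
        batch = []

if batch:  # flush the final partial batch
    execute_values(db_cursor, insert_sql, batch)
    db_conn.commit()

Letting the driver handle quoting this way also avoids the broken-SQL problems you get when a field value itself contains a single quote.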