#postgresql #elasticsearch
Question:
I have a huge volume of data in Elasticsearch, and I want to write a script that creates a table matching a particular index and transfers all the data into Postgres.
Comments:
1. Take a look at Logstash: elastic.co/guide/en/logstash/current/…
Answer #1:
Never mind, I got my answer. What I did was:
- create connections to Postgres and Elasticsearch
- create the table in PostgreSQL
- collect the data in batches of 10k records in a dictionary of lists
- transfer the data from that dictionary of lists into PostgreSQL, then clear the lists for the next iteration
import psycopg2
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from collections import defaultdict
dict = defaultdict(list) #dictionary of lists, one list per column (note: this shadows the built-in dict)
t_host = "localhost"
t_port = "9200"
t_dbname_ES = "companydatabase" #index
t_user = "elastic"
t_pw = "changeme"
client_ES = Elasticsearch([t_host],http_auth=(t_user, t_pw),port=t_port)
t_host = "localhost"
t_port = "5999"
t_dbname = "postgres"
t_user = "postgres"
t_pw = "postgres"
db_conn = psycopg2.connect(host=t_host, port=t_port, dbname=t_dbname, user=t_user, password=t_pw)
db_cursor = db_conn.cursor()
column_name_list = ["Address","Age","DateOfJoining","Designation","FirstName","Gender","Interests","LastName","MaritalStatus","Salary"]
column_type_list = ["text not null","integer","date","text","text","text","text","text","text","integer"] #one type per column
table_name = 'sample_table2' #table name to insert data into
column_names = ', '.join(column_name_list)
column_types = ", ".join(column_type_list)
#table creation
create_table_query = "CREATE TABLE {} (".format(table_name)
for i in range(len(column_name_list)):
    create_table_query += column_name_list[i]
    create_table_query += " "
    create_table_query += column_type_list[i]
    if i != len(column_name_list) - 1:
        create_table_query += ", "
create_table_query += ");"
try:
    db_cursor.execute(create_table_query)
    db_conn.commit()
except psycopg2.Error as e:
    t_message = "Database error: " + str(e)
#data insertion
s = Search(index=t_dbname_ES).using(client_ES).query("match_all")
total_documents = s.count() #total count of records in the index
count=0
for hit in s.scan(): #looping over all records one at a time
    count += 1
    total_documents -= 1
    for i in range(len(column_name_list)): #appending the data fetched from the document to the dictionary of lists
        dict[column_name_list[i]].append(hit[column_name_list[i]])
    if count == 10000 or total_documents == 0: #inserting into postgres 10k records at a time
        insert_query = "INSERT INTO " + table_name + " (" + column_names + ")" + " VALUES"
        for i in range(min(10000, count)):
            insert_query += "("
            for j in range(len(column_name_list)):
                if j != 0:
                    insert_query += ", " + "'" + str(dict[column_name_list[j]][i]) + "'"
                else:
                    insert_query += "'" + str(dict[column_name_list[j]][i]) + "'"
            insert_query += "),"
        insert_query = insert_query[:-1] #dropping the trailing comma
        insert_query += ";"
        for i in range(len(column_name_list)): #emptying the lists for the next batch of 10k records
            dict[column_name_list[i]] = []
        try:
            db_cursor.execute(insert_query)
            db_conn.commit()
            count = 0
        except psycopg2.Error as e:
            t_message = "Database error: " + str(e)
db_cursor.close()
db_conn.close()
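
A side note for anyone reusing this script: building the INSERT statement by concatenating raw values is fragile (a single quote inside any field breaks the query, and it is open to SQL injection). Below is a minimal sketch of the same transfer using elasticsearch.helpers.scan and psycopg2.extras.execute_values, which handle scrolling and parameterization for you; the index name, column list, and connection settings are assumed to match the script above.

#a minimal sketch of the same transfer with parameterized batch inserts;
#index, columns, and connection settings are assumed to match the script above
import psycopg2
from psycopg2.extras import execute_values
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

columns = ["Address","Age","DateOfJoining","Designation","FirstName","Gender","Interests","LastName","MaritalStatus","Salary"]
es = Elasticsearch(["localhost"], http_auth=("elastic", "changeme"), port="9200")
conn = psycopg2.connect(host="localhost", port="5999", dbname="postgres", user="postgres", password="postgres")
insert_sql = "INSERT INTO sample_table2 ({}) VALUES %s".format(", ".join(columns))

batch = []
with conn, conn.cursor() as cur:
    #scan() streams every document in the index via the scroll API
    for doc in scan(es, index="companydatabase", query={"query": {"match_all": {}}}):
        batch.append(tuple(doc["_source"].get(c) for c in columns))
        if len(batch) == 10000: #flush every 10k rows
            execute_values(cur, insert_sql, batch, page_size=10000) #values are parameterized, no manual quoting
            batch.clear()
    if batch: #flush the final partial batch
        execute_values(cur, insert_sql, batch, page_size=10000)
conn.close()

execute_values expands the single %s into a multi-row VALUES list, so each 10k batch still goes to Postgres in one statement, like in the original script.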