I'm trying to read a POSTGRESQL dump file with INSERTs statements and convert it to COPY statements
I would like to know if this package could help me with this?
#!/usr/bin/env python3
import sys
import sqlparse
from sqlparse.sql import Identifier, Function, Values
from sqlparse.tokens import DML, Keyword
def main():
if len(sys.argv) != 3:
print('Uso: python pg_dump_converter.py entrada.sql saida.sql')
return
input_file_path = sys.argv[1]
output_file_path = sys.argv[2]
try:
with open(input_file_path, 'r', encoding='utf-8') as input_file, \
open(output_file_path, 'w', encoding='utf-8') as output_file:
process_file(input_file, output_file)
except FileNotFoundError:
print('O arquivo de entrada não existe.')
return
def process_file(input_file, output_file):
statements = sqlparse.parsestream(input_file)
current_table = None
columns = None
values_list = []
for statement in statements:
print(f'statement: {statement}')
if statement.get_type() == 'INSERT':
table_name, cols, values = process_insert_statement(statement)
if current_table != table_name or columns != cols:
# Escreve os dados acumulados anteriores
if current_table and values_list:
write_copy_statement(output_file, current_table, columns, values_list)
values_list = []
current_table = table_name
columns = cols
values_list.extend(values)
else:
# Escreve os dados acumulados antes de processar outras instruções
if current_table and values_list:
write_copy_statement(output_file, current_table, columns, values_list)
values_list = []
current_table = None
columns = None
output_file.write(str(statement).strip() + '\n')
# Escreve quaisquer dados restantes
if current_table and values_list:
write_copy_statement(output_file, current_table, columns, values_list)
def process_insert_statement(statement):
tokens = statement.tokens
table_name = ''
columns = []
values = []
idx = 0
while idx < len(tokens):
token = tokens[idx]
if token.ttype is DML and token.value.upper() == 'INSERT':
idx += 1 # Avança para o próximo token
continue
elif isinstance(token, Identifier):
table_name = token.get_name()
elif token.ttype is Keyword and token.value.upper() == 'VALUES':
# Coleta os valores
if idx + 1 < len(tokens) and isinstance(tokens[idx+1], Values):
values.extend(parse_values(tokens[idx+1]))
idx += 1 # Pula o token 'Values'
elif isinstance(token, sqlparse.sql.Parenthesis):
# Pode ser a lista de colunas ou valores
if not columns:
# Assume que é a lista de colunas
columns = [str(id).strip('"') for id in token.get_identifiers()]
else:
# Valores adicionais
values.extend(parse_values(token))
idx += 1
return table_name, columns, values
def parse_values(token):
values = []
if isinstance(token, Values):
for parenthesis in token.get_sublists():
values.append(parse_value_list(parenthesis))
elif isinstance(token, sqlparse.sql.Parenthesis):
values.append(parse_value_list(token))
return values
def parse_value_list(parenthesis):
value_list = []
for token in parenthesis.tokens:
if isinstance(token, sqlparse.sql.IdentifierList):
for id in token.get_identifiers():
value_list.append(process_value(id))
elif not token.is_whitespace and token.ttype != sqlparse.tokens.Punctuation:
value_list.append(process_value(token))
return value_list
def process_value(token):
value = token.value
value = value.strip()
if value.upper() == 'NULL':
return '\\N'
elif value.startswith("'") and value.endswith("'"):
# Remove as aspas e trata caracteres especiais
value = value[1:-1].replace("''", "'")
value = value.replace("\\", "\\\\").replace("\n", "\\n").replace("\r", "\\r")
return value
else:
return value
def write_copy_statement(output_file, table_name, columns, values_list):
output_file.write(f'COPY {table_name} ({", ".join(columns)}) FROM stdin;\n')
for values in values_list:
values_line = '\t'.join(values)
output_file.write(values_line + '\n')
output_file.write('\\.\n')
if __name__ == '__main__':
try:
import sqlparse
except ImportError:
print('A biblioteca sqlparse é necessária. Instale-a usando "pip install sqlparse".')
else:
main()
I'm trying to read a POSTGRESQL dump file with INSERTs statements and convert it to COPY statements I would like to know if this package could help me with this?