Rede-DSBR / PNP

Repositório do Projeto da Plataforma Nilo Peçanha
0 stars 0 forks source link

Coletar as fontes de dados do mundo do trabalho. #133

Closed bmarbueno closed 4 months ago

bmarbueno commented 8 months ago
marcelromano commented 8 months ago

Para a extração e sincronização das bases de dados do CAGED, proponho a utilização de um shell script que faça a tarefa (como exemplo do wget 1-liner abaixo) e uma sincronização para uma cloud utilizando o rclone:

wget -r -np -nH --cut-dirs=2 "ftp://ftp.mtps.gov.br/pdet/microdados/NOVO CAGED"

Script completo:

#!/bin/bash
PASTA="NOVO CAGED"
DEST="compilado"
#wget -r -np -nH --cut-dirs=2 "ftp://ftp.mtps.gov.br/pdet/microdados/NOVO CAGED"
for ano in "$PASTA"/*
do
    if [ -d "$ano" ]
    then
        for mes in "$ano"/*
        do
            if [ -d "$mes" ]
            then
                for zip in "$mes"/*.7z
                do
                    f=$(basename -- "$zip")
                    f="${f%.7z}"
                    if [ ! -f "$DEST/$f.txt.gz" ]
                    then
                        echo "Processando $f"
                        7za e "$zip" "-o$DEST"
                        gzip -9 "$DEST/${f%.7z}.txt"
                    fi
                done
            fi
        done
    fi
done
marcelromano commented 8 months ago

Podemos utilizar esse caso para teste da infraestrutura Azure, com o storage e disponibilização dos dados em lake.

damascenaluiz commented 8 months ago

Poderia ser uma oportunidade de testar o conector FTP e se não atender tentamos subir este script ou adaptar para alguma linguagem suportada em uma function.

bmarbueno commented 8 months ago

CAGED

Novo Caged

Arquivos de movimentação ("CAGEDMOV")

Anos: 2020 a 2023 31.697.827 linhas carregadas

rogerioluizsi commented 7 months ago

Junto com o @damascenaluiz começamos a desenvolver uma estratégia que pudesse fazer a ingestão incremental dos dados do CAGED de maneira automatizada no Azure data lake. O ADF da Azure mostrou-se inviável pela 1) complexidade das consultas recursivas e necessidade de descompactar os arquivos que estão em 7z (não realizado nativamente pelo Azure). Dessa forma partimos para a implementação de Azure functions aqui e aqui. Apesar de ambas as funções funcionarem localmente com teste feito no próprio CAGED e no ambiente simulado Azurite, em produção as coisas não funcionaram bem. Debruçando um pouco mais em um das funções percebi que parece ter um bloqueio para o FTP do CAGED a partir da Azure. O anexo mostra que a função tem acesso a internet e inclusive consegue acessar e listar os arquivos de um servidor FTP nos moldes do CAGED (speedtest que é público e tb escuta na porta 21), mas não cosegue nem mesmo dar um "ping" via conexão socket no CAGED na mesma porta. Irei avaliar se o mesmo ocorre a partir do ADF.

ftp_cged_error caged_ERRRO

rogerioluizsi commented 6 months ago

A conexão e o download a partir do Pipeline/Copy activity do Azure Data Factory funciona sem nenhum problema.

rogerioluizsi commented 6 months ago

Dados baixados, agregados conforme solicitado por @bmarbueno e disponibilizado pelo container do blob em compactado em gz.

Para acessá-los a partir do Power BI é necessário usar o powerquery com a seguinte consulta.

let Source = AzureStorage.Blobs("ACCOUNTNAME"), caged1 = Source{[Name="CONTAINER"]}[Data], FilteredRows = Table.SelectRows(caged1, each ([Name] = "<PATH/FILE.GZ>")), GetContent = FilteredRows{[#"Folder Path"="https://ACCOUNTNAME.blob.core.windows.net/caged/",Name="DIRECTORY/.gz"]}[Content], DecompressContent = Binary.Decompress(GetContent, Compression.GZip), ConvertToTable = Csv.Document(DecompressContent,[Delimiter=",", Columns=N_COLUMNS, Encoding=ENCODING, QuoteStyle=QuoteStyle.None]), PromotedHeaders = Table.PromoteHeaders(ConvertToTable, [PromoteAllScalars=true]) in PromotedHeaders

damascenaluiz commented 6 months ago

Terminada a adição de todos os anos ao Storage Account (2020/01-2024/02) convertidos em .gz através de function SevenZipTirigger implementada. Faltaria somente reexecutar as pipelines já criadas e/ou criar para anos sequentes. @bmarbueno e @rogerioluizsi. Os arquivos originais (7z) foram adicionados para camada de acesso frio para reduzir custos.

rogerioluizsi commented 6 months ago

Pipelines executados para todos os anos. Arquivos disponíveis no container [caged/output/<cagedaggregated.gz>]

rogerioluizsi commented 6 months ago

@damascenaluiz , O @bmarbueno percebeu que os dados agregados de 2023 tb possuem dados de 2024. Criei uma coluna que diz a partir de qual arquivo cada linha está sendo sendo criada. Observei que para algumas colunas indevidas referentes ao ano de 2024 nos dados de 2023 (ex; 202402) eram oriundas do mês 04 de 2023. Dessa forma imaginei que o problema pode ser na fonte dos dados ou no processo de conversão da AzureFunction SevenZipTirigger que está se confundindo com o 04.

damascenaluiz commented 6 months ago

Vou revisar a function, salvo engano foram executadas em momentos distantes diminuindo a possibilidade, mas acredito que atende a necessidade da forma como implementou. Outra possibilidade seria o @bmarbueno checar no arquivos originais se este problema acontece como citou.

rogerioluizsi commented 6 months ago

O @damascenaluiz corrigiu a entrada e o fluxo de dados foi executado novamente para o ano de 2023.

bmarbueno commented 3 months ago

Acesso aos dados resumidos:

let URL = "https://stpublicpnp.blob.core.windows.net/pnpdatasource/caged/caged_agg_2020.gz", BlobContent = Web.Contents(URL), DecompressedContent = Binary.Decompress(BlobContent, Compression.GZip), DecompressedText = Text.FromBinary(DecompressedContent), CSVTable = Csv.Document(DecompressedText, [Delimiter=";", Encoding=65001, QuoteStyle=QuoteStyle.None]), PromotedHeaders = Table.PromoteHeaders(CSVTable, [PromoteAllScalars=true]) in PromotedHeaders