medic / cht-pipeline

Data models for transforming the raw data from CouchDB into models that can then be queried to build dashboards.
GNU General Public License v3.0

BRAC - CHP Follow Up - Set up "PostgreSQL (BRAC UG)- Hierarchy" data source #64

Closed. njuguna-n closed this 2 months ago.

njuguna-n commented 9 months ago

Mirror the data source but reading from the cht-sync database

lorerod commented 7 months ago

The problem with this data source:

The possible solutions: Option 1:

Option 2:

cc: @njuguna-n @witash @andrablaj

njuguna-n commented 7 months ago

I added this update from yesterday to the wrong ticket 🤦

I tested a theory I had that views based on incremental tables are not dropped while dbt is running. The theory turned out to be wrong, but it does seem like they take a shorter period to be dropped and refreshed. I converted the couchdb view into a model, which is working well and does not take as much time as other incremental models such as impact_anc_danger_sign. I noted that some views, while available, were timing out when queried. I will test whether the performance issues persist once they are converted to incremental tables.

lorerod commented 7 months ago

I changed the couchdb model to be incremental and also added a primary key and indexes to improve performance. However, the primary key and indexes are not being created on the Postgres database. @njuguna-n, do you know how to recreate the model? To change the configuration of an incremental model, I think we need to recreate the model or run a full refresh, maybe?
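For reference, the configuration described above can be sketched roughly like this in the model file (a sketch only; the source name and the incremental filter are assumptions, but dbt-postgres does support an indexes config):

```sql
-- Sketch of models/root/couchdb.sql (names assumed)
{{ config(
    materialized='incremental',
    unique_key='_id',
    indexes=[
      {'columns': ['_id', '_rev'], 'unique': True},
      {'columns': ['reported']}
    ]
) }}

select * from {{ source('v1', 'medic') }}
{% if is_incremental() %}
  -- only pick up rows newer than what is already in the table
  where "@timestamp" > (select max("@timestamp") from {{ this }})
{% endif %}
```

Note that on Postgres, dbt only uses unique_key to build the incremental merge/delete+insert logic; it does not create a constraint or index, which matches the behaviour seen here.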

witash commented 7 months ago

Apparently we do need to do a full refresh to recreate incremental models whenever the schema changes: https://docs.getdbt.com/docs/build/incremental-models
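The full refresh can be limited to a single model with a selector; dbt's --full-refresh flag drops and rebuilds the incremental table from scratch:

```shell
# Rebuild only the couchdb incremental model from scratch
dbt run --full-refresh --select couchdb

# Or rebuild it together with everything downstream of it
dbt run --full-refresh --select couchdb+
```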

This might be a moot point if we convert everything to materialized incremental tables, but I did some analysis of the slow tables and will post it here for anyone interested. Here is the current query plan for impact_ancview_dangersign:

 Gather  (cost=1000.00..591138.40 rows=304 width=204) (actual time=925.176..27172.666 rows=105108 loops=1)
   Workers Planned: 2
   Workers Launched: 2
   ->  Parallel Append  (cost=0.00..590108.00 rows=304 width=204) (actual time=9034.430..27054.405 rows=35036 loops=3)
         ->  Subquery Scan on "*SELECT* 1"  (cost=0.42..189961.73 rows=270 width=204) (actual time=344.933..10279.362 rows=33465 loops=3)
               ->  Subquery Scan on ancview_pregnancy  (cost=0.42..189959.03 rows=270 width=204) (actual time=344.929..10270.924 rows=33465 loops=3)
                     ->  Nested Loop  (cost=0.42..189956.33 rows=112 width=213) (actual time=344.919..10262.321 rows=33465 loops=3)
                           ->  Nested Loop  (cost=0.42..189918.95 rows=4 width=857) (actual time=344.661..7323.796 rows=881 loops=3)
                                 ->  Parallel Seq Scan on couchdb  (cost=0.00..189885.14 rows=4 width=821) (actual time=342.850..6762.851 rows=2111 loops=3)
                                       Filter: (((doc #>> '{fields,danger_signs}'::text[]) <> ''::text) AND ((doc ->> 'type'::text) = 'data_record'::text) AND ((doc ->> 'form'::text) = 'pregnancy'::text) AND (to_timestamp((((NULLIF((doc ->> 'reported_date'::text), ''::text))::bigint / 1000))::double precision) > '2016-05-26 09:54:29+00'::timestamp with time zone))
                                       Rows Removed by Filter: 418888
                                 ->  Index Scan using "81d3b09f024ed0ff9628965b627d2822" on contactview_metadata contact  (cost=0.42..8.44 rows=1 width=72) (actual time=0.236..0.237 rows=0 loops=6333)
                                       Index Cond: (uuid = (couchdb.doc #>> '{contact,_id}'::text[]))
                           ->  Seq Scan on configuration  (cost=0.00..8.38 rows=27 width=0) (actual time=0.005..0.068 rows=38 loops=2642)
                                 Filter: ((value ? 'lmp_calcs'::text) AND (key = 'anc'::text))
                                 Rows Removed by Filter: 152
         ->  Subquery Scan on "*SELECT* 2"  (cost=0.00..400144.74 rows=34 width=200) (actual time=13035.007..25153.407 rows=2356 loops=2)
               ->  Parallel Append  (cost=0.00..400144.40 rows=34 width=200) (actual time=13035.005..25151.012 rows=2356 loops=2)
                     ->  Subquery Scan on "*SELECT* 2_1"  (cost=0.00..201714.78 rows=3 width=200) (actual time=12660.836..12660.836 rows=0 loops=2)
                           ->  Parallel Seq Scan on couchdb couchdb_1  (cost=0.00..201714.75 rows=1 width=241) (actual time=12660.834..12660.834 rows=0 loops=2)
                                 Filter: (((doc #>> '{fields,danger_signs}'::text[]) IS NOT NULL) AND ((doc #>> '{fields,patient_age_in_years}'::text[]) <> ''::text) AND ((doc #>> '{fields,danger_signs}'::text[]) <> ''::text) AND ((doc ->> 'form'::text) = 'assessment'::text) AND (CASE WHEN ((doc ->> 'form'::text) = 'assessment'::text) THEN 'iccm'::text ELSE NULL::text END = 'anc'::text) AND ((NULLIF((doc #>> '{fields,patient_age_in_years}'::text[]), ''::text))::integer <= 5) AND (to_timestamp((((NULLIF((doc ->> 'reported_date'::text), ''::text))::bigint / 1000))::double precision) > '2016-05-26 09:54:29+00'::timestamp with time zone))
                                 Rows Removed by Filter: 631499
                     ->  Subquery Scan on "*SELECT* 1_1"  (cost=0.00..198429.46 rows=31 width=200) (actual time=924.381..24977.733 rows=4712 loops=1)
                           ->  Parallel Seq Scan on couchdb couchdb_2  (cost=0.00..198429.15 rows=13 width=241) (actual time=924.377..24973.722 rows=4712 loops=1)
                                 Filter: (((doc #>> '{fields,danger_signs}'::text[]) IS NOT NULL) AND ((doc #>> '{fields,danger_signs}'::text[]) <> ''::text) AND ((doc ->> 'form'::text) = ANY ('{pregnancy_visit,postnatal_care,immunization_follow_up}'::text[])) AND (CASE WHEN ((doc ->> 'form'::text) = 'pregnancy_visit'::text) THEN 'anc'::text WHEN ((doc ->> 'form'::text) = 'postnatal_care'::text) THEN 'pnc'::text WHEN ((doc ->> 'form'::text) = 'immunization_follow_up'::text) THEN 'imm'::text ELSE NULL::text END = 'anc'::text) AND (to_timestamp((((NULLIF((doc ->> 'reported_date'::text), ''::text))::bigint / 1000))::double precision) > '2016-05-26 09:54:29+00'::timestamp with time zone))
                                 Rows Removed by Filter: 1258286
 Planning Time: 1.035 ms
 JIT:
   Functions: 72
   Options: Inlining true, Optimization true, Expressions true, Deforming true
   Timing: Generation 15.014 ms, Inlining 190.426 ms, Optimization 1609.352 ms, Emission 1181.689 ms, Total 2996.480 ms
 Execution Time: 27185.991 ms
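(Plans like the one above can be reproduced against the view's query with EXPLAIN; the view name follows the ticket's naming:)

```sql
-- Capture the actual plan with timings and buffer usage
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM impact_ancview_dangersign;
```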

So the main problem is that it's scanning the couchdb table (the largest table) and testing the condition against every row; because impact_ancview_dangersign is unioning two other queries, it's doing this twice, although it's able to do both in parallel.

It has to table scan even when the incremental condition on reported would filter out almost every row, because the reported column is not indexed. reported is defined separately on useview_pregnancy and useview_visit as to_timestamp((NULLIF(couchdb.doc ->> 'reported_date', '')::bigint / 1000)::double precision) AS reported. Because both of these tables are non-materialized views (and the intermediate views are also non-materialized), it cannot be indexed directly, and because it is a function rather than a plain column of couchdb, the query planner can't use an index on couchdb.
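One way to move the calculation onto the couchdb table itself is a stored generated column plus a btree index (a sketch only, with assumed names; in practice this would be expressed in the dbt model rather than a manual ALTER):

```sql
-- Sketch: persist reported on the root table so it can be indexed
ALTER TABLE couchdb
  ADD COLUMN reported timestamp with time zone
  GENERATED ALWAYS AS (
    to_timestamp((NULLIF(doc ->> 'reported_date', '')::bigint / 1000)::double precision)
  ) STORED;

CREATE INDEX couchdb_reported_idx ON couchdb (reported);
```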

Moving this calculated column to the couchdb table directly and indexing it changes the query plan to:

 Append  (cost=0.86..252761.52 rows=295 width=192) (actual time=86.148..5339.363 rows=1835 loops=1)
   ->  Subquery Scan on "*SELECT* 1"  (cost=0.86..12647.72 rows=131 width=204) (actual time=86.147..367.973 rows=1784 loops=1)
         ->  Subquery Scan on ancview_pregnancy  (cost=0.86..12646.41 rows=131 width=204) (actual time=86.144..367.739 rows=1784 loops=1)
               ->  Nested Loop  (cost=0.86..12645.10 rows=131 width=213) (actual time=86.140..367.517 rows=1784 loops=1)
                     ->  Nested Loop  (cost=0.86..12602.46 rows=1 width=805) (actual time=85.918..302.517 rows=8 loops=1)
                           ->  Index Scan using "188faf839e6a944e4a0ebb5a03ea9bda" on couchdb  (cost=0.43..12594.01 rows=1 width=769) (actual time=57.861..296.167 rows=58 loops=1)
                                 Index Cond: (reported > '2017-08-01 07:53:55+00'::timestamp with time zone)
                                 Filter: (((doc #>> '{fields,danger_signs}'::text[]) <> ''::text) AND ((doc ->> 'type'::text) = 'data_record'::text) AND ((doc ->> 'form'::text) = 'pregnancy'::text))
                                 Rows Removed by Filter: 12245
                           ->  Index Scan using "95572172d358924e8605c27113fa092e" on contactview_metadata contact  (cost=0.43..8.45 rows=1 width=72) (actual time=0.091..0.091 rows=0 loops=58)
                                 Index Cond: (uuid = (couchdb.doc #>> '{contact,_id}'::text[]))
                     ->  Seq Scan on configuration  (cost=0.00..40.35 rows=131 width=0) (actual time=0.024..0.273 rows=223 loops=8)
                           Filter: ((value ? 'lmp_calcs'::text) AND (key = 'anc'::text))
                           Rows Removed by Filter: 892
   ->  Subquery Scan on "*SELECT* 2"  (cost=2067.98..240112.32 rows=164 width=182) (actual time=4924.963..4971.240 rows=51 loops=1)
         ->  Gather  (cost=2067.98..240110.68 rows=164 width=182) (actual time=4924.960..4971.210 rows=51 loops=1)
               Workers Planned: 2
               Workers Launched: 2
               ->  Parallel Append  (cost=1067.98..239094.28 rows=163 width=182) (actual time=4906.918..4938.160 rows=17 loops=3)
                     ->  Subquery Scan on "*SELECT* 2_1"  (cost=2335.50..7574.61 rows=2 width=182) (actual time=133.225..133.229 rows=0 loops=1)
                           ->  Parallel Bitmap Heap Scan on couchdb couchdb_1  (cost=2335.50..7574.59 rows=1 width=223) (actual time=133.222..133.224 rows=0 loops=1)
                                 Recheck Cond: ((reported > '2017-08-01 07:53:55+00'::timestamp with time zone) AND (form = 'assessment'::text))
                                 Rows Removed by Index Recheck: 18
                                 Filter: (((doc #>> '{fields,danger_signs}'::text[]) IS NOT NULL) AND ((doc #>> '{fields,patient_age_in_years}'::text[]) <> ''::text) AND ((doc #>> '{fields,danger_signs}'::text[]) <> ''::text) AND (CASE WHEN ((doc ->> 'form'::text) = 'assessment'::text) THEN 'iccm'::text ELSE NULL::text END = 'anc'::text) AND ((NULLIF((doc #>> '{fields,patient_age_in_years}'::text[]), ''::text))::integer <= 5))
                                 Rows Removed by Filter: 1878
                                 ->  BitmapAnd  (cost=2335.50..2335.50 rows=1375 width=0) (actual time=46.379..46.381 rows=0 loops=1)
                                       ->  Bitmap Index Scan on "188faf839e6a944e4a0ebb5a03ea9bda"  (cost=0.00..240.22 rows=15439 width=0) (actual time=1.824..1.825 rows=12303 loops=1)
                                             Index Cond: (reported > '2017-08-01 07:53:55+00'::timestamp with time zone)
                                       ->  Bitmap Index Scan on cfbf7cdb8c12694da5a0dc9a6dc1870c  (cost=0.00..2095.03 rows=191813 width=0) (actual time=43.119..43.120 rows=188097 loops=1)
                                             Index Cond: (form = 'assessment'::text)
                     ->  Subquery Scan on "*SELECT* 1_1"  (cost=1067.98..231518.85 rows=162 width=182) (actual time=4862.507..4893.738 rows=17 loops=3)
                           ->  Parallel Bitmap Heap Scan on couchdb couchdb_2  (cost=1067.98..231517.23 rows=68 width=223) (actual time=4862.501..4893.716 rows=17 loops=3)
                                 Recheck Cond: (form = ANY ('{pregnancy_visit,postnatal_care,immunization_follow_up}'::text[]))
                                 Rows Removed by Index Recheck: 84453
                                 Filter: (((doc #>> '{fields,danger_signs}'::text[]) IS NOT NULL) AND ((doc #>> '{fields,danger_signs}'::text[]) <> ''::text) AND (CASE WHEN ((doc ->> 'form'::text) = 'pregnancy_visit'::text) THEN 'anc'::text WHEN ((doc ->> 'form'::text) = 'postnatal_care'::text) THEN 'pnc'::text WHEN ((doc ->> 'form'::text) = 'immunization_follow_up'::text) THEN 'imm'::text ELSE NULL::text END = 'anc'::text) AND (to_timestamp((((NULLIF((doc ->> 'reported_date'::text), ''::text))::bigint / 1000))::double precision) > '2017-08-01 07:53:55+00'::timestamp with time zone))
                                 Rows Removed by Filter: 34072
                                 Heap Blocks: exact=16545 lossy=11582
                                 ->  Bitmap Index Scan on cfbf7cdb8c12694da5a0dc9a6dc1870c  (cost=0.00..1067.94 rows=97953 width=0) (actual time=24.950..24.950 rows=102267 loops=1)
                                       Index Cond: (form = ANY ('{pregnancy_visit,postnatal_care,immunization_follow_up}'::text[]))
 Planning Time: 0.912 ms
 JIT:
   Functions: 58
   Options: Inlining false, Optimization false, Expressions true, Deforming true
   Timing: Generation 10.039 ms, Inlining 0.000 ms, Optimization 3.244 ms, Emission 60.423 ms, Total 73.705 ms
 Execution Time: 5342.035 ms

This is for a reported value close to the maximum. The new plan does no table scans and is overall much better, but it is still slow (~5 seconds vs. ~30 before, on my local copy). So something else is going on that is not obvious to me from the query plan; looking at the timings, there are about 4 seconds unaccounted for.

So it might not be necessary to materialize ALL the views; it may be enough to materialize the root models, pull common columns like reported, form, type and patient_id out of the JSON doc column, index them, and then avoid redefining them in downstream non-materialized views where they cannot be indexed.

But even doing that, there is some other performance issue that needs more investigation.
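As a sketch, a root model along the lines suggested above might look like this (a materialized table with the common columns pulled out of doc and indexed; all names and the source reference are illustrative):

```sql
-- Sketch of a materialized root model with extracted, indexable columns
{{ config(
    materialized='table',
    indexes=[
      {'columns': ['reported']},
      {'columns': ['form']},
      {'columns': ['type']},
      {'columns': ['patient_id']}
    ]
) }}

select
  doc,
  doc ->> '_id'  as uuid,
  doc ->> 'type' as type,
  doc ->> 'form' as form,
  doc #>> '{fields,patient_id}' as patient_id,
  to_timestamp((NULLIF(doc ->> 'reported_date', '')::bigint / 1000)::double precision) as reported
from {{ source('v1', 'medic') }}
```

Downstream views would then select these columns instead of re-deriving them from doc, so the planner can use the indexes.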

njuguna-n commented 7 months ago

@lorerod like @witash mentions, we have to do a full refresh to pick up the new changes. There is no easy way to do this currently, so I have created a ticket to make sure we look into it. In the meantime, if you change the model to a view and then change it back, it will be recreated with the new schema. Very hacky, but it works 😄

njuguna-n commented 7 months ago

@witash that is great analysis, and even if we are moving to incremental tables your main points still stand. There is definitely a lot of unoptimised querying currently that we can greatly improve. I will start by turning the couchdb model into a materialized view and seeing what impact that has on the other models.

lorerod commented 7 months ago

Thank you, @witash, for the detailed analysis of query performance; we need to do this detailed work for every query we have, starting from the root tables and working up. We also need to create indexes for the POSTGRES_TABLE, the one the couchdb model reads from.
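As a sketch of that (the raw table name and useful columns are assumptions, following the models discussed above), indexes on the table cht-sync writes to could look like:

```sql
-- Sketch: indexes on the raw table the couchdb model reads from
-- (assumes a jsonb column named doc, as in the plans above)
CREATE INDEX IF NOT EXISTS medic_id_rev_idx ON v1.medic (_id, _rev);
CREATE INDEX IF NOT EXISTS medic_form_idx   ON v1.medic ((doc ->> 'form'));
```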

lorerod commented 7 months ago

I received some error logs on my local cht-sync. The logs are from PostgreSQL:

2024-04-11 20:21:24.030 UTC [3356] ERROR:  duplicate key value violates unique constraint "medic__id__rev_key"
2024-04-11 20:21:24.030 UTC [3356] DETAIL:  Key (_id, _rev)=(%{[doc][_id]}, %{[doc][_rev]}) already exists.
2024-04-11 20:21:24.030 UTC [3356] STATEMENT:  WITH pgrst_source AS (INSERT INTO "v1"."medic"("@timestamp", "@version", "_id", "_rev") SELECT "pgrst_body"."@timestamp", "pgrst_body"."@version", "pgrst_body"."_id", "pgrst_body"."_rev" FROM (SELECT $1 AS json_data) pgrst_payload, LATERAL (SELECT CASE WHEN json_typeof(pgrst_payload.json_data) = 'array' THEN pgrst_payload.json_data ELSE json_build_array(pgrst_payload.json_data) END AS val) pgrst_uniform_json, LATERAL (SELECT "@timestamp", "@version", "_id", "_rev" FROM json_to_recordset(pgrst_uniform_json.val) AS _("@timestamp" timestamp without time zone, "@version" text, "_id" text, "_rev" text) ) pgrst_body  RETURNING 1) SELECT '' AS total_result_set, pg_catalog.count(_postgrest_t) AS page_total, array[]::text[] AS header, ''::text AS body, nullif(current_setting('response.headers', true), '') AS response_headers, nullif(current_setting('response.status', true), '') AS response_status, '' AS response_inserted FROM (SELECT * FROM pgrst_source) _postgrest_t

@njuguna-n, does this make any sense? We have the unique key defined in v1.medic. Are the duplicates for updates? But at that level, if I understand it correctly, we are just inserting, right?
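If the inserts are meant to be idempotent on (_id, _rev), one option (an assumption about intent, not a description of cht-sync's current behaviour) is to make the insert a no-op on conflict:

```sql
-- Sketch: skip rows whose (_id, _rev) already exists instead of erroring
INSERT INTO v1.medic ("@timestamp", "@version", "_id", "_rev")
VALUES (now(), '1', 'some-doc-id', '1-abc')
ON CONFLICT (_id, _rev) DO NOTHING;
```

PostgREST can also express this via its Prefer: resolution=ignore-duplicates header, if that fits the sync flow.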

lorerod commented 7 months ago

Also, I could finally define the couchdb model to have the unique constraint {'columns': ['_id', '_rev'], 'unique': True} as an index, because defining the unique_key on the model (unique_key=['_id', '_rev']) did not create a unique key in the Postgres table. And now I'm getting the following error when running dbt:

Database Error in model couchdb (models/root/couchdb.sql)
  duplicate key value violates unique constraint "fa11a0b69cc892e8cc69dbe1db73fd70"
  DETAIL:  Key (_id, _rev)=(763F8625-BEA2-60AC-A24C-0CFC15E9D5F6, 1-39d7f3f08b3eb1f8fec395aeda4bdc4c) already exists.
  compiled Code at target/run/pipeline/models/root/couchdb.sql

This also made me question the uniqueness of the key. I removed the unique index on the couchdb model to prevent dbt failures.
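To check whether (_id, _rev) really is unique in the raw table, a quick aggregate helps (table name assumed):

```sql
-- Find (_id, _rev) pairs that appear more than once
SELECT _id, _rev, count(*) AS copies
FROM v1.medic
GROUP BY _id, _rev
HAVING count(*) > 1
ORDER BY copies DESC
LIMIT 20;
```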

njuguna-n commented 7 months ago

The RDBMS instance seems to be full, and that is affecting dbt runs. I will work on setting up a new Postgres instance using EKS.
