ooni / data

OONI Data CLI and Pipeline v5
https://docs.ooni.org/data

Refactor of schema for pipeline #97

Closed hellais closed 2 months ago

hellais commented 2 months ago

~I am going to run the change to alter the table to add the observation_idx column like so:~

ALTER TABLE obs_web ADD COLUMN observation_idx UInt8 DEFAULT toInt8(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1

ALTER TABLE obs_web MODIFY ORDER BY (measurement_start_time, hostname, probe_cc, probe_asn, measurement_uid, observation_idx)

The above will not actually work, since the observation_id column is part of the primary key. Instead, I am going to take a different approach: create a new table, copy the data into it, and then rename the new table to take the place of the old one.
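The create, copy, rename sequence described above can be sketched as a small Python helper that only builds the ClickHouse statements in order. This is a minimal sketch and not the script that was actually run: the helper name `build_swap_statements`, the `_old` suffix for the retired table, and the placeholder column definitions in the usage example are assumptions made for illustration.

```python
from typing import List


def build_swap_statements(table: str, create_body: str, select_sql: str) -> List[str]:
    """Return the statements for a create / copy / rename migration, in order.

    Hypothetical helper: `create_body` is everything that follows
    `CREATE TABLE <name>`, and `select_sql` is the SELECT that reshapes the rows.
    """
    new_table = f"{table}_new"  # temporary name for the new table
    old_table = f"{table}_old"  # assumed: keep the old table around after the swap
    return [
        # 1. Create the new table with the new primary key / ORDER BY.
        f"CREATE TABLE {new_table} {create_body}",
        # 2. Copy every row, computing any new columns in the SELECT.
        f"INSERT INTO {new_table} {select_sql}",
        # 3. Swap the names with a single RENAME TABLE statement.
        f"RENAME TABLE {table} TO {old_table}, {new_table} TO {table}",
    ]


if __name__ == "__main__":
    # Placeholder schema and query, for illustration only.
    for stmt in build_swap_statements(
        "obs_web",
        "(measurement_uid String, observation_idx UInt16) "
        "ENGINE = ReplacingMergeTree ORDER BY (measurement_uid, observation_idx)",
        "SELECT measurement_uid, 1 AS observation_idx FROM obs_web",
    ):
        print(stmt + ";")
```

Keeping the old table under a separate name until the new one has been checked is a design choice of this sketch; it could just as well be dropped once the copy is verified.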

codecov[bot] commented 2 months ago

Codecov Report

Attention: Patch coverage is 72.22222% with 45 lines in your changes missing coverage. Please review.

Project coverage is 82.24%. Comparing base (ff1fc86) to head (2a59271). Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| oonipipeline/src/oonipipeline/db/maintenance.py | 31.42% | 24 Missing :warning: |
| oonipipeline/src/oonipipeline/cli/commands.py | 69.23% | 16 Missing :warning: |
| ...ine/src/oonipipeline/temporal/activities/common.py | 60.00% | 2 Missing :warning: |
| oonidata/src/oonidata/models/base.py | 66.66% | 1 Missing :warning: |
| ...e/src/oonipipeline/temporal/activities/analysis.py | 0.00% | 1 Missing :warning: |
| ...oonipipeline/transforms/measurement_transformer.py | 80.00% | 1 Missing :warning: |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##             main      #97      +/-   ##
==========================================
+ Coverage   82.14%   82.24%   +0.09%
==========================================
  Files          82       83       +1
  Lines        6351     6347       -4
==========================================
+ Hits         5217     5220       +3
+ Misses       1134     1127       -7
```

| [Flag](https://app.codecov.io/gh/ooni/data/pull/97/flags?src=pr&el=flags&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=ooni) | Coverage Δ | |
|---|---|---|
| [oonidata](https://app.codecov.io/gh/ooni/data/pull/97/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=ooni) | `77.36% <66.66%> (-0.08%)` | :arrow_down: |
| [oonipipeline](https://app.codecov.io/gh/ooni/data/pull/97/flags?src=pr&el=flag&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=ooni) | `84.34% <72.32%> (+0.17%)` | :arrow_up: |

Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=ooni#carryforward-flags-in-the-pull-request-comment) to find out more.


hellais commented 2 months ago

In the end, the table migrations were run using the following manual migration script:

    CREATE TABLE obs_web_new (
     measurement_uid String,
     observation_idx UInt16,
     input Nullable(String),
     report_id String,
     measurement_start_time Datetime64(3, 'UTC'),
     software_name String,
     software_version String,
     test_name String,
     test_version String,
     bucket_date String,
     probe_asn UInt32,
     probe_cc String,
     probe_as_org_name String,
     probe_as_cc String,
     probe_as_name String,
     network_type String,
     platform String,
     origin String,
     engine_name String,
     engine_version String,
     architecture String,
     resolver_ip String,
     resolver_asn UInt32,
     resolver_cc String,
     resolver_as_org_name String,
     resolver_as_cc String,
     resolver_is_scrubbed UInt8,
     resolver_asn_probe UInt32,
     resolver_as_org_name_probe String,
     created_at Nullable(Datetime('UTC')),
     target_id Nullable(String),
     hostname Nullable(String),
     transaction_id Nullable(UInt16),
     ip Nullable(String),
     port Nullable(UInt16),
     ip_asn Nullable(UInt32),
     ip_as_org_name Nullable(String),
     ip_as_cc Nullable(String),
     ip_cc Nullable(String),
     ip_is_bogon Nullable(UInt8),
     dns_query_type Nullable(String),
     dns_failure Nullable(String),
     dns_engine Nullable(String),
     dns_engine_resolver_address Nullable(String),
     dns_answer_type Nullable(String),
     dns_answer Nullable(String),
     dns_answer_asn Nullable(UInt32),
     dns_answer_as_org_name Nullable(String),
     dns_t Nullable(Float64),
     tcp_failure Nullable(String),
     tcp_success Nullable(UInt8),
     tcp_t Nullable(Float64),
     tls_failure Nullable(String),
     tls_server_name Nullable(String),
     tls_version Nullable(String),
     tls_cipher_suite Nullable(String),
     tls_is_certificate_valid Nullable(UInt8),
     tls_end_entity_certificate_fingerprint Nullable(String),
     tls_end_entity_certificate_subject Nullable(String),
     tls_end_entity_certificate_subject_common_name Nullable(String),
     tls_end_entity_certificate_issuer Nullable(String),
     tls_end_entity_certificate_issuer_common_name Nullable(String),
     tls_end_entity_certificate_san_list Array(String),
     tls_end_entity_certificate_not_valid_after Nullable(Datetime64(3, 'UTC')),
     tls_end_entity_certificate_not_valid_before Nullable(Datetime64(3, 'UTC')),
     tls_certificate_chain_length Nullable(UInt16),
     tls_certificate_chain_fingerprints Array(String),
     tls_handshake_read_count Nullable(UInt16),
     tls_handshake_write_count Nullable(UInt16),
     tls_handshake_read_bytes Nullable(UInt32),
     tls_handshake_write_bytes Nullable(UInt32),
     tls_handshake_last_operation Nullable(String),
     tls_handshake_time Nullable(Float64),
     tls_t Nullable(Float64),
     http_request_url Nullable(String),
     http_network Nullable(String),
     http_alpn Nullable(String),
     http_failure Nullable(String),
     http_request_body_length Nullable(UInt32),
     http_request_method Nullable(String),
     http_runtime Nullable(Float64),
     http_response_body_length Nullable(Int32),
     http_response_body_is_truncated Nullable(UInt8),
     http_response_body_sha1 Nullable(String),
     http_response_status_code Nullable(UInt16),
     http_response_header_location Nullable(String),
     http_response_header_server Nullable(String),
     http_request_redirect_from Nullable(String),
     http_request_body_is_truncated Nullable(UInt8),
     http_t Nullable(Float64),
     probe_analysis Nullable(String)
    )
    ENGINE = ReplacingMergeTree
    -- Partition by the month of the bucket
    PARTITION BY concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
    PRIMARY KEY (measurement_uid, observation_idx)
    ORDER BY (measurement_uid, observation_idx, measurement_start_time, probe_cc, probe_asn)
    SETTINGS index_granularity = 8192;
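
-- Copy the existing rows into the new table, deriving observation_idx from the old observation_id.
INSERT INTO obs_web_new (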
measurement_uid,
observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
target_id,
hostname,
transaction_id,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_query_type,
dns_failure,
dns_engine,
dns_engine_resolver_address,
dns_answer_type,
dns_answer,
dns_answer_asn,
dns_answer_as_org_name,
dns_t,
tcp_failure,
tcp_success,
tcp_t,
tls_failure,
tls_server_name,
tls_version,
tls_cipher_suite,
tls_is_certificate_valid,
tls_end_entity_certificate_fingerprint,
tls_end_entity_certificate_subject,
tls_end_entity_certificate_subject_common_name,
tls_end_entity_certificate_issuer,
tls_end_entity_certificate_issuer_common_name,
tls_end_entity_certificate_san_list,
tls_end_entity_certificate_not_valid_after,
tls_end_entity_certificate_not_valid_before,
tls_certificate_chain_length,
tls_certificate_chain_fingerprints,
tls_handshake_read_count,
tls_handshake_write_count,
tls_handshake_read_bytes,
tls_handshake_write_bytes,
tls_handshake_last_operation,
tls_handshake_time,
tls_t,
http_request_url,
http_network,
http_alpn,
http_failure,
http_request_body_length,
http_request_method,
http_runtime,
http_response_body_length,
http_response_body_is_truncated,
http_response_body_sha1,
http_response_status_code,
http_response_header_location,
http_response_header_server,
http_request_redirect_from,
http_request_body_is_truncated,
http_t,
probe_analysis
)
SELECT
measurement_uid,
IF(
    observation_id = '', 1,
    toUInt16(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1
) as observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
target_id,
hostname,
transaction_id,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_query_type,
dns_failure,
dns_engine,
dns_engine_resolver_address,
dns_answer_type,
dns_answer,
dns_answer_asn,
dns_answer_as_org_name,
dns_t,
tcp_failure,
tcp_success,
tcp_t,
tls_failure,
tls_server_name,
tls_version,
tls_cipher_suite,
tls_is_certificate_valid,
tls_end_entity_certificate_fingerprint,
tls_end_entity_certificate_subject,
tls_end_entity_certificate_subject_common_name,
tls_end_entity_certificate_issuer,
tls_end_entity_certificate_issuer_common_name,
tls_end_entity_certificate_san_list,
tls_end_entity_certificate_not_valid_after,
tls_end_entity_certificate_not_valid_before,
tls_certificate_chain_length,
tls_certificate_chain_fingerprints,
tls_handshake_read_count,
tls_handshake_write_count,
tls_handshake_read_bytes,
tls_handshake_write_bytes,
tls_handshake_last_operation,
tls_handshake_time,
tls_t,
http_request_url,
http_network,
http_alpn,
http_failure,
http_request_body_length,
http_request_method,
http_runtime,
http_response_body_length,
http_response_body_is_truncated,
http_response_body_sha1,
http_response_status_code,
http_response_header_location,
http_response_header_server,
http_request_redirect_from,
http_request_body_is_truncated,
http_t,
probe_analysis
FROM obs_web;

CREATE TABLE obs_http_middlebox_new (
     measurement_uid String,
     observation_idx UInt16,
     input Nullable(String),
     report_id String,
     measurement_start_time Datetime64(3, 'UTC'),
     software_name String,
     software_version String,
     test_name String,
     test_version String,
     bucket_date String,
     probe_asn UInt32,
     probe_cc String,
     probe_as_org_name String,
     probe_as_cc String,
     probe_as_name String,
     network_type String,
     platform String,
     origin String,
     engine_name String,
     engine_version String,
     architecture String,
     resolver_ip String,
     resolver_asn UInt32,
     resolver_cc String,
     resolver_as_org_name String,
     resolver_as_cc String,
     resolver_is_scrubbed UInt8,
     resolver_asn_probe UInt32,
     resolver_as_org_name_probe String,
     created_at Nullable(Datetime64(3, 'UTC')),
     hirl_sent_0 Nullable(String),
     hirl_sent_1 Nullable(String),
     hirl_sent_2 Nullable(String),
     hirl_sent_3 Nullable(String),
     hirl_sent_4 Nullable(String),
     hirl_received_0 Nullable(String),
     hirl_received_1 Nullable(String),
     hirl_received_2 Nullable(String),
     hirl_received_3 Nullable(String),
     hirl_received_4 Nullable(String),
     hirl_failure Nullable(String),
     hirl_success Nullable(UInt8),
     hfm_diff Nullable(String),
     hfm_failure Nullable(String),
     hfm_success Nullable(UInt8)
)
ENGINE = ReplacingMergeTree
PARTITION BY concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
PRIMARY KEY (measurement_uid, observation_idx)
ORDER BY (measurement_uid, observation_idx, measurement_start_time)
SETTINGS index_granularity = 8192;

INSERT INTO obs_http_middlebox_new (
measurement_uid,
observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
hirl_sent_0,
hirl_sent_1,
hirl_sent_2,
hirl_sent_3,
hirl_sent_4,
hirl_received_0,
hirl_received_1,
hirl_received_2,
hirl_received_3,
hirl_received_4,
hirl_failure,
hirl_success,
hfm_diff,
hfm_failure,
hfm_success
)
SELECT
measurement_uid,
IF(
    observation_id = '', 1,
    toUInt16(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1
) as observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
probe_asn,
probe_cc,
probe_as_org_name,
probe_as_cc,
probe_as_name,
network_type,
platform,
origin,
engine_name,
engine_version,
architecture,
resolver_ip,
resolver_asn,
resolver_cc,
resolver_as_org_name,
resolver_as_cc,
resolver_is_scrubbed,
resolver_asn_probe,
resolver_as_org_name_probe,
bucket_date,
created_at,
hirl_sent_0,
hirl_sent_1,
hirl_sent_2,
hirl_sent_3,
hirl_sent_4,
hirl_received_0,
hirl_received_1,
hirl_received_2,
hirl_received_3,
hirl_received_4,
hirl_failure,
hirl_success,
hfm_diff,
hfm_failure,
hfm_success
FROM obs_http_middlebox;

CREATE TABLE obs_web_ctrl_new (
     measurement_uid String,
     observation_idx UInt16,
     input Nullable(String),
     report_id String,
     measurement_start_time Datetime64(3, 'UTC'),
     software_name String,
     software_version String,
     test_name String,
     test_version String,
     bucket_date String,
     hostname String,
     created_at Nullable(Datetime64(3, 'UTC')),
     ip String,
     port Nullable(UInt16),
     ip_asn Nullable(UInt32),
     ip_as_org_name Nullable(String),
     ip_as_cc Nullable(String),
     ip_cc Nullable(String),
     ip_is_bogon Nullable(UInt8),
     dns_failure Nullable(String),
     dns_success Nullable(UInt8),
     tcp_failure Nullable(String),
     tcp_success Nullable(UInt8),
     tls_failure Nullable(String),
     tls_success Nullable(UInt8),
     tls_server_name Nullable(String),
     http_request_url Nullable(String),
     http_failure Nullable(String),
     http_success Nullable(UInt8),
     http_response_body_length Nullable(Int32)
)
ENGINE = ReplacingMergeTree
PARTITION BY concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2))
PRIMARY KEY (measurement_uid, observation_idx)
ORDER BY (measurement_uid, observation_idx, measurement_start_time, hostname)
SETTINGS index_granularity = 8192;

INSERT INTO obs_web_ctrl_new (
measurement_uid,
observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
hostname,
bucket_date,
created_at,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_failure,
dns_success,
tcp_failure,
tcp_success,
tls_failure,
tls_success,
tls_server_name,
http_request_url,
http_failure,
http_success,
http_response_body_length
)
SELECT
measurement_uid,
IF(
    observation_id = '', 1,
    toUInt16(arraySlice(splitByChar('_', observation_id), -1, 1)[1]) + 1
) as observation_idx,
input,
report_id,
measurement_start_time,
software_name,
software_version,
test_name,
test_version,
hostname,
bucket_date,
created_at,
ip,
port,
ip_asn,
ip_as_org_name,
ip_as_cc,
ip_cc,
ip_is_bogon,
dns_failure,
dns_success,
tcp_failure,
tcp_success,
tls_failure,
tls_success,
tls_server_name,
http_request_url,
http_failure,
http_success,
http_response_body_length
FROM obs_web_ctrl;
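
The `observation_idx` expression and the `PARTITION BY` expression used in the statements above are compact, so a plain-Python rendering of what they compute may help when reviewing them. This is an illustrative sketch only: the function names are made up, the sample `observation_id` and `bucket_date` values are invented, and the `YYYY-MM-DD` shape of `bucket_date` is an assumption inferred from the substring offsets.

```python
def observation_idx_from_id(observation_id: str) -> int:
    """Mirror IF(observation_id = '', 1, toUInt16(<last '_' token>) + 1)."""
    if observation_id == "":
        return 1
    # arraySlice(splitByChar('_', id), -1, 1)[1] picks the last '_'-separated token.
    last_token = observation_id.split("_")[-1]
    # The SQL adds 1 to the parsed numeric suffix.
    return int(last_token) + 1


def partition_key(bucket_date: str) -> str:
    """Mirror concat(substring(bucket_date, 1, 4), substring(bucket_date, 6, 2)).

    ClickHouse substring() is 1-based, so these are characters 1-4 and 6-7.
    """
    return bucket_date[0:4] + bucket_date[5:7]


# Invented sample values, for illustration only.
print(observation_idx_from_id(""))          # 1
print(observation_idx_from_id("abc123_0"))  # 1
print(observation_idx_from_id("abc123_4"))  # 5
print(partition_key("2024-03-15"))          # 202403
```

Note that an empty `observation_id` and an id ending in `_0` both map to index 1 under this expression.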