
Migration recipes for mappings and reindexing #696


slint commented 3 months ago

Cold migration (with downtime)

Deploy the code and run the following:

invenio index destroy --yes-i-know
invenio index init
invenio rdm rebuild-all-indices

For the invenio-jobs changes, also run the database migrations:

invenio alembic upgrade
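
To sanity-check the migration, you can compare the database's current revisions against the expected heads (standard alembic subcommands exposed by the invenio CLI):

invenio alembic current
invenio alembic heads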

Live migration (no downtime)

Deploy the code in a separate environment and use the following script:

import json
import time
from datetime import datetime, timedelta
from textwrap import dedent

import humanize
from flask import current_app
from invenio_access.permissions import system_identity
from invenio_oaiserver.percolator import (
    PERCOLATOR_MAPPING,
    _build_percolator_index_name,
)
from invenio_rdm_records.proxies import current_rdm_records
from invenio_records_resources.services.custom_fields.mappings import Mapping
from invenio_search.proxies import current_search, current_search_client
from invenio_search.utils import build_alias_name

def get_index_info(index):
    write_alias = build_alias_name(index)
    indices = current_search_client.indices.get_alias(name=write_alias, ignore=[404])
    if indices.get("status") == 404:
        return None, None, None
    assert len(indices) == 1
    index_name = list(indices.keys())[0]
    aliases_resp = current_search_client.indices.get_alias(index=index_name)
    read_aliases = [
        a for a in aliases_resp[index_name]["aliases"].keys() if a != write_alias
    ]
    return index_name, write_alias, read_aliases
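
# For illustration (assuming a hypothetical "myprefix-" instance prefix and a
# timestamp suffix), get_index_info("affiliations-affiliation-v1.0.0") would
# return something like:
#   ("myprefix-affiliations-affiliation-v1.0.0-1689600000",
#    "myprefix-affiliations-affiliation-v1.0.0",
#    ["myprefix-affiliations"])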

def reindex(old_index_name, new_index_name):
    # Set replicas to 0
    print(f"Setting replicas to 0 for {new_index_name}")
    current_search_client.indices.put_settings(
        index=new_index_name,
        body={"index": {"number_of_replicas": 0}},
    )

    # Reindex all records (this will return a Task ID)
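    # Note: "version_type: external_gte" makes re-runs (and the delta syncs
    # below) idempotent: a destination document is only overwritten when the
    # incoming version is greater than or equal to the one already stored.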
    print(f"Reindexing {old_index_name} to {new_index_name}")
    task = current_search_client.reindex(
        body={
            "source": {"index": old_index_name},
            "dest": {
                "index": new_index_name,
                "version_type": "external_gte",
            },
        },
        wait_for_completion=False,
    )
    print(
        f"Task ID for reindexing {old_index_name} to {new_index_name}: {task['task']}"
    )
    return task

def reindex_delta(old_index_name, new_index_name, since):
    total_docs = current_search_client.count(
        index=old_index_name,
        body={"query": {"range": {"updated": {"gte": since}}}},
    )["count"]
    print(
        f"Reindexing {old_index_name} to {new_index_name} since {since} ({total_docs} docs)"
    )
    task = current_search_client.reindex(
        body={
            "source": {
                "index": old_index_name,
                "query": {"range": {"updated": {"gte": since}}},
            },
            "dest": {
                "index": new_index_name,
                "version_type": "external_gte",
            },
        },
        wait_for_completion=False,
    )
    print(
        f"Task ID for reindexing {old_index_name} to {new_index_name}: {task['task']}"
    )
    return task

def get_last_updated_ts(index_name):
    res = current_search_client.search(
        index=index_name,
        body={"size": 0, "aggs": {"last_updated": {"max": {"field": "updated"}}}},
    )
    return res["aggregations"]["last_updated"]["value_as_string"]
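
# Note: the delta sync relies on documents having an indexed "updated"
# timestamp field (the case for all the indices touched below).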

def check_progress(task_id):
    progress = current_search_client.tasks.get(task_id=task_id)

    if not progress["completed"]:
        status = progress["task"]["status"]
        total = status["total"]
        # Count all processed documents: the delta syncs mostly *update*
        # existing documents, so looking at "created" alone undercounts.
        processed = status["created"] + status["updated"] + status["deleted"]
        if total == 0 or processed == 0:
            print("Reindexing in progress: no records reindexed yet.")
            return False
        percentage = round((processed / total) * 100, 2)
        eta_seconds = (
            progress["task"]["running_time_in_nanos"]
            / processed
            * (total - processed)
            / 1_000_000_000
        )
        eta = datetime.now() + timedelta(seconds=eta_seconds)
        print(
            dedent(f"""\
        Reindexing in progress: {processed}/{total} ({percentage}%) records reindexed.
        ETA: {humanize.naturaldelta(eta_seconds)} ({eta.isoformat()})
        """)
        )
        return False

    # Refresh the index; parse the destination index name out of the task
    # description (formatted like "reindex from [old-index] to [new-index]")
    index_name = progress["task"]["description"].split(" to ")[1][1:-1]
    total_time = progress["task"]["running_time_in_nanos"] / 1_000_000_000
    print(f"Reindexing completed in {humanize.naturaldelta(total_time)}")
    print(f"Refreshing {index_name}...")
    current_search_client.indices.refresh(index=index_name)
    print(f"Refreshed {index_name}")

    # Restore replicas (2 here; adjust to match your cluster's defaults)
    print(f"Updating replicas for {index_name}")
    current_search_client.indices.put_settings(
        index=index_name,
        body={"index": {"number_of_replicas": 2}},
    )
    return True

def rollover_index(old_index, new_index):
    old_index_name, old_index_alias, read_aliases = get_index_info(old_index)
    new_index_name, new_index_alias, _ = get_index_info(new_index)

    # Update aliases
    alias_ops = []
    alias_ops.append({"remove": {"index": old_index_name, "alias": new_index_alias}})
    alias_ops.append({"add": {"index": new_index_name, "alias": new_index_alias}})
    for alias in read_aliases:
        # Skip aliases that are not part of the new index name
        if alias not in new_index_name:
            continue

        alias_ops.append({"remove": {"index": old_index_name, "alias": alias}})
        alias_ops.append({"add": {"index": new_index_name, "alias": alias}})
    current_search_client.indices.update_aliases(body={"actions": alias_ops})

def delete_old_index(old_index):
    old_index_name, _, _ = get_index_info(old_index)

    # Delete old index
    current_search_client.indices.delete(index=old_index_name)

def run_pre_deploy(old_index, new_index, custom_fields_cfg=None):
    old_index_name, old_index_alias, read_aliases = get_index_info(old_index)
    new_index_name, new_index_alias, _ = get_index_info(new_index)
    assert new_index_name is None, f"New index {new_index_name} already exists."

    # Create the new index
    (new_index_name, _), (new_index_alias, _) = current_search.create_index(
        index=new_index, create_write_alias=True
    )

    # Handle custom fields
    if custom_fields_cfg:
        custom_fields = current_app.config[custom_fields_cfg]
        properties = Mapping.properties_for_fields(None, custom_fields)
        current_search_client.indices.put_mapping(
            index=new_index_name,
            body={"properties": properties},
        )

    # Reindex all records
    task = reindex(old_index_name, new_index_name)
    while not check_progress(task["task"]):
        print("Waiting 10sec for reindexing to complete...")
        time.sleep(10)

def run_sync(old_index, new_index):
    old_index_name, old_index_alias, read_aliases = get_index_info(old_index)
    new_index_name, new_index_alias, _ = get_index_info(new_index)

    # Reindex all records since last update
    since = get_last_updated_ts(new_index_name)
    while True:
        task = reindex_delta(old_index_name, new_index_name, since)
        while not check_progress(task["task"]):
            print("Waiting 10sec for reindexing to complete...")
            time.sleep(10)

        # Refresh
        current_search_client.indices.refresh(index=new_index_name)

        # Check if there are newer documents
        new_index_latest = get_last_updated_ts(new_index_name)
        old_index_latest = get_last_updated_ts(old_index_name)
        if new_index_latest >= old_index_latest:
            print("No new documents to sync.")
            break

        print(f"More documents to sync: {new_index_latest} > {old_index_latest}")

        # Reindex since we started the current reindexing task
        since = new_index_latest
        # Give an opportunity to interrupt the sync
        print("Press Ctrl+C to stop the sync...")
        time.sleep(10)

def run_post_deploy(old_index, new_index):
    # Rollover the index
    rollover_index(old_index, new_index)

def update_records_percolator(index=None):
    index = index or current_app.config["OAISERVER_RECORD_INDEX"]
    percolator_index = _build_percolator_index_name(index)
    mapping_path = current_search.mappings[index]
    with open(mapping_path, "r") as body:
        mapping = json.load(body)
        mapping["mappings"]["properties"].update(PERCOLATOR_MAPPING["properties"])
        current_search_client.indices.create(index=percolator_index, body=mapping)
    # reindex all percolator queries from OAISets
    oaipmh_service = current_rdm_records.oaipmh_server_service
    oaipmh_service.rebuild_index(identity=system_identity)
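
These helpers are meant to be run in an `invenio shell` session on the new environment: either paste them in directly, or save them to a file (the name here is illustrative) and load it with:

invenio shell reindex_helpers.py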

Run the following commands:

#
# Affiliations
#
OLD_AFFILIATIONS_INDEX = "affiliations-affiliation-v1.0.0"
NEW_AFFILIATIONS_INDEX = "affiliations-affiliation-v2.0.0"

run_pre_deploy(OLD_AFFILIATIONS_INDEX, NEW_AFFILIATIONS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_AFFILIATIONS_INDEX, NEW_AFFILIATIONS_INDEX)

#
# Funders
#
OLD_FUNDERS_INDEX = "funders-funder-v1.0.0"
NEW_FUNDERS_INDEX = "funders-funder-v2.0.0"

run_pre_deploy(OLD_FUNDERS_INDEX, NEW_FUNDERS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_FUNDERS_INDEX, NEW_FUNDERS_INDEX)

#
# Names
#
OLD_NAMES_INDEX = "names-name-v1.0.0"
NEW_NAMES_INDEX = "names-name-v2.0.0"

run_pre_deploy(OLD_NAMES_INDEX, NEW_NAMES_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_NAMES_INDEX, NEW_NAMES_INDEX)

#
# Communities
#
OLD_COMMUNITIES_INDEX = "communities-communities-v1.0.0"
NEW_COMMUNITIES_INDEX = "communities-communities-v2.0.0"

run_pre_deploy(
    OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX,
    custom_fields_cfg="COMMUNITIES_CUSTOM_FIELDS",
)

# Sync new and updated documents
run_sync(OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX)
# Run a last sync to make sure all documents are up-to-date
run_sync(OLD_COMMUNITIES_INDEX, NEW_COMMUNITIES_INDEX)

#
# Users
#
OLD_USERS_INDEX = "users-user-v2.0.0"
NEW_USERS_INDEX = "users-user-v3.0.0"

run_pre_deploy(OLD_USERS_INDEX, NEW_USERS_INDEX)

# Sync new and updated documents
run_sync(OLD_USERS_INDEX, NEW_USERS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_USERS_INDEX, NEW_USERS_INDEX)
# Run a last sync to make sure all documents are up-to-date
run_sync(OLD_USERS_INDEX, NEW_USERS_INDEX)

#
# Records
#
OLD_RECORDS_INDEX = "rdmrecords-records-record-v6.0.0"
NEW_RECORDS_INDEX = "rdmrecords-records-record-v7.0.0"

run_pre_deploy(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX, custom_fields_cfg="RDM_CUSTOM_FIELDS")
update_records_percolator(index=NEW_RECORDS_INDEX)

# Sync new and updated documents
run_sync(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX)

# Once code is deployed, rollover the index
run_post_deploy(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX)
# Run a last sync to make sure all documents are up-to-date
run_sync(OLD_RECORDS_INDEX, NEW_RECORDS_INDEX)
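
Note that `delete_old_index` is deliberately not called in the steps above: only clean up once you're confident the rollover worked. A minimal sketch (the old write aliases are untouched by the rollover, so the helpers still resolve them):

old_index_name, _, _ = get_index_info(OLD_RECORDS_INDEX)
new_index_name, _, _ = get_index_info(NEW_RECORDS_INDEX)
old_count = current_search_client.count(index=old_index_name)["count"]
new_count = current_search_client.count(index=new_index_name)["count"]
# Counts can legitimately differ if documents were deleted mid-migration,
# so treat a mismatch as a prompt to investigate rather than a hard error.
assert old_count == new_count, f"Count mismatch: {old_count} != {new_count}"
delete_old_index(OLD_RECORDS_INDEX)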

#
# Update record view stats events template to add `is_machine`
#
from invenio_search.proxies import current_search
# This will update all templates (including the record view events one)
list(current_search.put_templates())
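
To double-check, you can fetch the stats templates back from the cluster. Depending on your invenio-search and search engine versions these may be registered as legacy templates (shown here) or composable index templates; the name pattern is an assumption, so adjust it for your instance's prefix:

current_search_client.indices.get_template(name="*stats-record-view*")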

# You'll also need to update the latest record view events index manually:
"""
PUT /events-stats-record-view-2024-09/_mapping
{
  "properties": {
    "is_machine": {
          "type": "boolean"
    }
  }
}
"""