datalad / datalad-registry

MIT License

"cron job" to keep dataset updated #112

Closed yarikoptic closed 9 months ago

yarikoptic commented 1 year ago

I sorted by "Last update" to find a few URLs that seem to have never been analyzed, but the sample one I visited exists and looks fine:

| URL | Dataset | Commit | Annex keys | Annexed files in working tree | Size of Annexed files in working tree (bytes) |
|---|---|---|---|---|---|
| https://github.com/anthfm/afm.git | None | | None | None | |
| https://github.com/ReproNim/ReproTube.git | None | | None | None | |
| https://github.com/dguibert/nur-packages.git | None | | None | None | |
| https://github.com/adswa/ml-books-gitannex.git | None | | None | None | |
| https://github.com/datalad-datasets/machinelearning-books.git | None | | None | None | |
| https://github.com/anthfm/s3-test.git | None | | None | None | |
| https://github.com/anthfm/dd2.git | None | | None | None | |
| https://github.com/soichih/datalad-o3d-test.git | None | | None | None | |
| https://github.com/TrueTube/Andriy_Popyk.git | None | | | | |

I even wonder whether we should add a "Last update error" DB field to store information about an execution that errored out, and show it as the last column of the web UI table to ease troubleshooting... WDYT?

candleindark commented 1 year ago

I am aware of this. Last time I checked, the update-handling logic was not fully implemented. The idea of using a central dispatcher with a dedicated queue, which I mentioned today, should solve the problem, but I may use a more expedient solution for now.

yarikoptic commented 1 year ago

These are suggested changes to the model, which IMHO should make all the desired check operations reasonably efficient:

❯ git diff
diff --git a/datalad_registry/models.py b/datalad_registry/models.py
index 75c0faf..b9748d9 100644
--- a/datalad_registry/models.py
+++ b/datalad_registry/models.py
@@ -15,13 +15,20 @@ class URL(db.Model):  # type: ignore
     annex_key_count = db.Column(db.Integer)
     annexed_files_in_wt_count = db.Column(db.Integer)
     annexed_files_in_wt_size = db.Column(db.BigInteger)
-    info_ts = db.Column(db.DateTime(timezone=True))
-    update_announced = db.Column(db.Boolean, default=False, nullable=False)
     head = db.Column(db.Text)
     head_describe = db.Column(db.Text)
+    head_dt = db.Column(db.DateTime(timezone=True))
     branches = db.Column(db.Text)
     tags = db.Column(db.Text)
     git_objects_kb = db.Column(db.BigInteger)
+
+    #: Last time the DB record was updated
+    last_update_dt = db.Column(db.DateTime(timezone=True))  # former info_ts
+    #: Last time we decided to check this URL. null at the beginning since never checked
+    last_check_dt = db.Column(db.DateTime(timezone=True), nullable=True)
+    #: Non-null value means there is a pending check request, with a dt on when it was requested.
+    #: If non-null set, and another request comes in, ignore new request - keep old check_request value
+    check_request_dt = db.Column(db.DateTime(timezone=True), default=None, nullable=True)  # former bool update_announced
+    #:  To increment if check failed for some reason, and reset if check succeeded
+    n_failed_checks = db.Column(db.Integer, default=0)
     #: Whether initial data has been collected for this URL
     processed = db.Column(db.Boolean, default=False, nullable=False)

Logic for #198 (update trigger):

if url.check_request_dt is not None:
    pass  # as in the comment -- do nothing, keep the old request time
else:
    url.check_request_dt = now()
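
A minimal runnable sketch of the trigger logic above. `UrlRecord` and `request_check` are hypothetical names used only for illustration; the real model is the SQLAlchemy `URL` class from the diff, and the actual endpoint may look different.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass
class UrlRecord:
    """Hypothetical stand-in for the URL model; only the field used here."""
    check_request_dt: Optional[datetime] = None


def request_check(url: UrlRecord, now: Optional[datetime] = None) -> bool:
    """Record a check request; return True only if a new request was stored."""
    if url.check_request_dt is not None:
        # A request is already pending: ignore the new one, keep the old time.
        return False
    url.check_request_dt = now or datetime.now(timezone.utc)
    return True
```

Keeping the oldest request time means repeated announcements for the same URL do not push it back in the queue.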

Logic for regular updates -- do it regularly unless there is REALLY nothing to do:


# ideally this all should become configuration for the instance
# (in addition to those about "read-only")
check_minimal_time = 3600  # seconds -- minimal time between checks of a URL
n_run_in_parallel = 10
sleep_waiting = 60
n_max_failed_checks = 10
while True:
    # pseudocode: celery.get_tasks stands for "list the running check_url tasks"
    n_currently_running = len(celery.get_tasks(check_url))
    n_can_run = n_run_in_parallel - n_currently_running
    if n_can_run <= 0:
        sleep(sleep_waiting)
        continue
    # explicitly requested checks go first, oldest request first
    urls_to_check = sorted(
        [url for url in URLs if url.check_request_dt and url.processed],
        key=lambda url: url.check_request_dt)[:n_can_run]
    if not urls_to_check:
        # otherwise pick the URLs which were not checked for the longest time
        urls_to_check = sorted(
            [url for url in URLs
             if url.processed
             and url.n_failed_checks <= n_max_failed_checks
             and url.last_check_dt
             and (now() - url.last_check_dt).total_seconds() > check_minimal_time],
            key=lambda url: url.last_check_dt)[:n_can_run]
    if urls_to_check:
        for url in urls_to_check:
            url.last_check_dt = now()
        celery.map(check_url, urls_to_check)  # pseudocode: dispatch and wait to complete
    else:
        sleep(sleep_waiting)

And in check_url, increment n_failed_checks if the check fails for some reason, and reset it to 0 if it succeeds.
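
The selection rule and the failure counter can be sketched as plain functions that are easy to test without Celery or a database. `UrlRecord`, `select_urls_to_check` and `record_check_result` are hypothetical names; clearing `check_request_dt` after a successful check is an assumption, since the comment above does not say when a pending request gets cleared.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List, Optional


@dataclass
class UrlRecord:
    """Hypothetical stand-in for the URL model with the proposed fields."""
    url: str
    processed: bool = True
    check_request_dt: Optional[datetime] = None
    last_check_dt: Optional[datetime] = None
    n_failed_checks: int = 0


def select_urls_to_check(urls: List[UrlRecord], now: datetime, n_can_run: int,
                         check_minimal_time: int = 3600,
                         n_max_failed_checks: int = 10) -> List[UrlRecord]:
    """Pick up to n_can_run URLs: pending requests first, otherwise stale ones."""
    requested = [u for u in urls if u.processed and u.check_request_dt is not None]
    if requested:
        return sorted(requested, key=lambda u: u.check_request_dt)[:n_can_run]
    min_age = timedelta(seconds=check_minimal_time)
    stale = [
        u for u in urls
        if u.processed
        and u.n_failed_checks <= n_max_failed_checks
        and u.last_check_dt is not None
        and now - u.last_check_dt > min_age
    ]
    return sorted(stale, key=lambda u: u.last_check_dt)[:n_can_run]


def record_check_result(url: UrlRecord, succeeded: bool) -> None:
    """Reset the failure counter on success, increment it on failure."""
    if succeeded:
        url.n_failed_checks = 0
        url.check_request_dt = None  # assumption: success satisfies a pending request
    else:
        url.n_failed_checks += 1
```

Note that, as in the pseudocode, a URL whose `last_check_dt` is still null is only picked up through an explicit check request.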

yarikoptic commented 1 year ago

Here is ChatGPT-produced code:

import time
from datetime import datetime, timezone

from celery import Celery
from sqlalchemy import MetaData, Table, create_engine, select

# Set up the Celery application
app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Set up the SQLAlchemy engine
engine = create_engine('postgresql://user:password@localhost/dbname')

# SQLAlchemy models
metadata = MetaData()
URLs = Table('urls', metadata, autoload_with=engine)

check_minimal_time = 3600  # seconds -- minimal time between checks of a URL
n_run_in_batch = 10
sleep_waiting = 60
n_max_failed_checks = 10


@app.task
def check_url(url_id):
    pass  # Implement your task here
    # The task takes the row id, not the row: task arguments must be serializable
    query = URLs.update().where(URLs.c.id == url_id).values(
        last_check_dt=datetime.now(timezone.utc))
    with engine.begin() as connection:  # commits on success
        connection.execute(query)


def run_dispatcher():
    while True:
        # Get all running tasks; inspect.active() returns None if no worker replies
        inspect = app.control.inspect()
        active_tasks = inspect.active() or {}
        n_currently_running = len([task for tasks in active_tasks.values()
                                   for task in tasks if task['name'] == 'tasks.check_url'])

        n_can_run = n_run_in_batch - n_currently_running

        # may be later after we figure out if all prior ones actually succeeded, not just "done"
        # which could be due to error or rate limit and then we do not want to increment
        #   may be check should be based on number of succeeded with the  last_update_dt within the e.g. 10*sleep_waiting
        #   thus getting it to a moving average, may be with some "extra" to stay positive.
        # if n_can_run == n_run_in_batch:
        #      n_run_in_batch = min(100, n_run_in_batch + 1)  # next time run more but no more than upper limit

        if n_can_run <= 0:
            time.sleep(sleep_waiting)
            continue

        with engine.connect() as connection:
            # SQLAlchemy query (1.4+/2.0 style select)
            query = select(URLs).where(URLs.c.processed.is_(True))
            urls = connection.execute(query).fetchall()

        urls_to_check = sorted(
            [url for url in urls if url.check_request_dt and url.processed],
            key=lambda url: url.check_request_dt)[:n_can_run]

        if not urls_to_check:
            # The columns are timezone-aware, so compare against an aware "now"
            now = datetime.now(timezone.utc)
            urls_to_check = sorted(
                [url for url in urls
                 if url.processed
                 and url.n_failed_checks <= n_max_failed_checks
                 and url.last_check_dt
                 and (now - url.last_check_dt).total_seconds() > check_minimal_time],
                key=lambda url: url.last_check_dt)[:n_can_run]

        # Adding tasks to celery; expires is an apply_async option, not a task argument
        for url in urls_to_check:
            check_url.apply_async(args=[url.id], expires=60)
        time.sleep(sleep_waiting)


if __name__ == '__main__':
    # Guarded so that workers importing this module do not start the loop
    run_dispatcher()

yarikoptic commented 1 year ago

Here is the code avoiding the while True loop:

```python
from datetime import datetime, timezone

from celery import Celery
from sqlalchemy import MetaData, Table, create_engine, select

# Set up the Celery application
app = Celery('tasks', broker='pyamqp://guest@localhost//')

# Set up the SQLAlchemy engine
engine = create_engine('postgresql://user:password@localhost/dbname')

# SQLAlchemy models
metadata = MetaData()
URLs = Table('urls', metadata, autoload_with=engine)

check_minimal_time = 3600  # seconds -- minimal time between checks of a URL
n_run_in_parallel = 10
n_max_failed_checks = 10


@app.task
def check_url(url_id):
    pass  # Implement your task here


@app.task
def schedule_url_checks():
    # Get all running tasks; inspect.active() returns None if no worker replies
    inspect = app.control.inspect()
    active_tasks = inspect.active() or {}
    n_currently_running = len([task for tasks in active_tasks.values()
                               for task in tasks if task['name'] == 'tasks.check_url'])

    n_can_run = n_run_in_parallel - n_currently_running
    if n_can_run <= 0:
        return

    # The columns are timezone-aware, so compare against an aware "now"
    now = datetime.now(timezone.utc)
    with engine.begin() as connection:  # commits the updates on success
        # SQLAlchemy query (1.4+/2.0 style select)
        query = select(URLs).where(URLs.c.processed.is_(True))
        urls = connection.execute(query).fetchall()

        urls_to_check = sorted(
            [url for url in urls if url.check_request_dt and url.processed],
            key=lambda url: url.check_request_dt)[:n_can_run]

        if not urls_to_check:
            urls_to_check = sorted(
                [url for url in urls
                 if url.processed
                 and url.n_failed_checks <= n_max_failed_checks
                 and url.last_check_dt
                 and (now - url.last_check_dt).total_seconds() > check_minimal_time],
                key=lambda url: url.last_check_dt)[:n_can_run]

        for url in urls_to_check:
            # SQLAlchemy update
            query = URLs.update().where(URLs.c.id == url.id).values(last_check_dt=now)
            connection.execute(query)

    # Adding tasks to celery; pass the row id, which is serializable
    for url in urls_to_check:
        check_url.delay(url.id)


# Configure periodic task
app.conf.beat_schedule = {
    'schedule-url-checks-every-60-seconds': {
        'task': 'tasks.schedule_url_checks',
        'schedule': 60.0,
    },
}
```

yarikoptic commented 1 year ago

Re update: since some evil people could do "git push --force" and rewrite history, we cannot just use git pull or even git pull --ff-only, since neither is guaranteed to work. I think the safest strategy would be:

Try git fetch. If that succeeds, the connection is good etc., so do git pull --ff-only; if that one fails, rm the cached copy and redo the cloning from scratch for that URL. If git fetch fails, announce that the check failed and let the cron job above retry later.
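
A minimal sketch of that decision flow, assuming a plain `git` executable and an existing clone at `path`. `update_clone`, `run_git` and the return values are hypothetical names; the actual removal and re-cloning is left to the caller, and the git runner is injectable so the flow can be exercised without a repository.

```python
import subprocess
from typing import Callable, Sequence


class TryAgainLater(Exception):
    """Fetch failed (network, rate limit, ...): keep the clone and retry later."""


def run_git(args: Sequence[str], cwd: str) -> bool:
    """Run a git command in cwd; return True if it exited with status 0."""
    result = subprocess.run(["git", *args], cwd=cwd, capture_output=True)
    return result.returncode == 0


def update_clone(path: str,
                 run: Callable[[Sequence[str], str], bool] = run_git) -> str:
    """Return 'updated' or 'reclone'; raise TryAgainLater if the fetch fails."""
    if not run(["fetch"], path):
        raise TryAgainLater(path)
    if run(["pull", "--ff-only"], path):
        return "updated"
    # History was probably rewritten: caller should remove the copy and clone anew
    return "reclone"
```

A caller would count a `TryAgainLater` as a failed check and treat `'reclone'` as a request to rebuild the cached copy from scratch.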

You can use git commands to find out the current branch and the corresponding upstream remote branch dynamically. Here's how you can do it:

```shell
# Fetch all the remotes
git fetch || raise TryAgainLater  # skip to the next attempt
# either use ds.repo.call_git(['fetch']) or just ds.repo.fetch() . I believe both would be raising CommandError if failure.

# Fetch information about default remote branch
current_remote_branch=$(git ls-remote --symref --refs origin HEAD | awk '/ref:/{print $2;}' | sed -e 's,refs/heads/,,g')
# if fails -- raise TryAgainLater

# Get the name of the remote tracking branch
remote_branch=$(git rev-parse --abbrev-ref --symbolic-full-name "@{u}")
# if this fails -- our assumption is wrong and we need to fix the code. Raise exception to propagate
# in python:
# ❯ python -c 'from datalad.api import Dataset; ds=Dataset("."); print(ds.repo.call_git_oneline("rev-parse --abbrev-ref --symbolic-full-name @{u}".split()))'
# gh-yarikoptic/temp-testbm

if $current_remote_branch != $remote_branch: raise RedoThisUrlError

# Do the merge
git merge --ff-only "${remote_branch}" || raise RedoThisUrlError
# if this did not result in any update -- we would need to update. but that logic is outside atm according to ito
# ds.repo.call_git(["merge", "--ff-only", remote_branch])
# and capture CommandError, and log somehow the stdout stderr so we could inspect if desired

# if isinstance(ds.repo, AnnexRepo):
git annex merge
# ds.repo.call_annex(['merge'])
# if this fails -- bubble up since there should be no failure
```

The `git rev-parse --abbrev-ref --symbolic-full-name @{u}` command fetches the name of the remote tracking branch for the current branch. If there's no tracking information for the current branch, the command will fail.

The git branch --show-current command gives the name of the currently checked out branch. This command is available in git version 2.22 and later. If you are on an older version of git, you can use git symbolic-ref --short HEAD or git rev-parse --abbrev-ref HEAD instead.

This script assumes that your current branch is set up to track the correct upstream branch. If it's not, you'll need to set that up first using git branch -u .
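
One detail worth noting about the branch comparison in the script above: the ls-remote pipeline yields a bare branch name, while `@{u}` resolves to a remote-prefixed name (the sample output is `gh-yarikoptic/temp-testbm`), so the two strings need normalizing before they are compared. A small sketch under that assumption, working on captured command output; `default_remote_branch` and `tracks_default_branch` are hypothetical helper names.

```python
from typing import Optional


def default_remote_branch(ls_remote_output: str) -> Optional[str]:
    """Extract <name> from a 'ref: refs/heads/<name><TAB>HEAD' line, if any."""
    prefix = "ref: refs/heads/"
    for line in ls_remote_output.splitlines():
        if line.startswith(prefix):
            return line[len(prefix):].split("\t")[0].strip()
    return None


def tracks_default_branch(ls_remote_output: str, upstream: str,
                          remote: str = "origin") -> bool:
    """True if upstream (e.g. 'origin/master') is the remote's default branch."""
    branch = default_remote_branch(ls_remote_output)
    return branch is not None and upstream == f"{remote}/{branch}"
```

Here `ls_remote_output` would be the text printed by `git ls-remote --symref <remote> HEAD`, and `upstream` the text printed by the `rev-parse` command.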

candleindark commented 1 year ago

+ head_dt = db.Column(db.DateTime(timezone=True))

@yarikoptic is head_dt the authordate of the current HEAD?

yarikoptic commented 1 year ago

Yes
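
For illustration, one way such a value could be obtained, assuming a plain `git` executable rather than whatever the registry actually uses. `head_author_date` and `parse_author_date` are hypothetical names; `%aI` is git's strict ISO 8601 author-date format.

```python
import subprocess
from datetime import datetime


def parse_author_date(iso_text: str) -> datetime:
    """Parse the strict ISO 8601 author date printed by git's %aI format."""
    text = iso_text.strip()
    if text.endswith("Z"):
        # Some git versions may print 'Z' for UTC; older Pythons need an offset
        text = text[:-1] + "+00:00"
    return datetime.fromisoformat(text)


def head_author_date(repo_path: str) -> datetime:
    """Return the timezone-aware author date of HEAD in the given repository."""
    out = subprocess.run(
        ["git", "log", "-1", "--format=%aI", "HEAD"],
        cwd=repo_path, check=True, capture_output=True, text=True,
    ).stdout
    return parse_author_date(out)
```

The result is timezone-aware, which matches the `DateTime(timezone=True)` column type proposed for `head_dt`.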

candleindark commented 9 months ago

This issue is closed by #238.