jkuhl-uni / git-annex-remote-zenodo

Use Zenodo as a special remote for git-annex
GNU General Public License v3.0
0 stars 1 forks source link

minimal special remote from scratch #2

Open adswa opened 2 months ago

adswa commented 2 months ago

@mih and I took a look at the code and Zenodo's API and had the urge to create a minimal special remote from scratch, to better understand why some parts of the original authors' code are rather convoluted and slow (e.g., parsing through all available deposits, looping through all files, or making many requests again and again).

Here's that code as FYI

#!/usr/bin/env python

from annexremote import Master
from annexremote import SpecialRemote
from annexremote import RemoteError

from os import environ
import sys
import requests

class ZenodoRemote(SpecialRemote):
    """Minimal git-annex special remote that stores keys in one Zenodo deposit.

    Configuration (read through ``annex.getconfig``):

    * ``api`` -- base URL of the API; defaults to ``https://zenodo.org/api``.
    * ``deposit_id`` -- identifier of the deposit to use; when it is empty at
      ``initremote`` time a new deposit is created.

    The access token is read from the ``ZENODO_TOKEN`` environment variable
    and sent as the ``access_token`` query parameter of every request.

    For each stored key the remote records ``<bucket-id>/<key>`` as the key's
    state, and later addresses the file as ``<api>/files/<state>``.
    """

    # Bytes fetched per iteration when streaming a download to disk.
    # ``iter_content()`` defaults to 1 byte per chunk, which is far too slow.
    _DOWNLOAD_CHUNK_SIZE = 1024 * 1024

    def __init__(self, annex: Master):
        super().__init__(annex)
        # Both values are fetched lazily and cached for the process lifetime.
        self._api_token = None
        self._deposit_info = None

    def listconfigs(self):
        """Return the supported configuration keys and their descriptions."""
        return {'api': 'APIDESC', 'deposit_id': 'DEPOSITDESC'}

    # method to handle all the basic queries
    def query(
        self,
        query_method,
        url,
        parent_func="unknown",
        headers=None,
        data=None,
        stream=None,
    ):
        """Send one HTTP request and return the ``requests`` response.

        Args:
            query_method: One of ``'get'``, ``'post'``, ``'put'``, ``'head'``
                or ``'delete'``.
            url: Full URL to send the request to.
            parent_func: Name of the calling operation; only used in the
                debug message.
            headers: Headers for ``'post'`` requests; defaults to a JSON
                content type. Not used by the other methods.
            data: Request body for ``'put'`` requests (e.g. an open file).
            stream: Pass ``True`` to stream the body of a ``'get'`` response.

        Returns:
            The response object. The status code is NOT checked here; that is
            left to the caller.

        Raises:
            RuntimeError: If ``query_method`` is not supported.
            RemoteError: If no API token is available.
        """
        if headers is None:
            headers = {"Content-Type": "application/json"}

        # we use the same access key for all the queries.
        params = {'access_token': self.api_token}
        # depending on the query, some of the arguments might be null
        if query_method == 'get':
            # this is for when we want to retrieve and download the file
            if stream is True:
                request = requests.get(url, params=params, stream=True)
            else:
                request = requests.get(url, params=params)
        elif query_method == 'post':
            request = requests.post(url, params=params, json={},
                                    headers=headers)
        elif query_method == 'put':
            request = requests.put(url, params=params, json={}, data=data)
        elif query_method == 'head':
            request = requests.head(url, params=params)
        elif query_method == 'delete':
            request = requests.delete(url, params=params)
        else:
            raise RuntimeError('unsupported request')

        # informing the user of the current state of the operation
        self.annex.debug(
            f"[info]: {query_method} operation finished in {parent_func}. "
            f"The returned code: {request.status_code}"
        )
        return request

    @property
    def api(self) -> str:
        """Base URL of the API, cached in ``self.info['api']``."""
        if 'api' in self.info:
            return self.info['api']

        # Fall back to the public Zenodo instance when nothing is configured.
        api_endpoint = self.annex.getconfig('api') or 'https://zenodo.org/api'
        self.info['api'] = api_endpoint
        return api_endpoint

    @property
    def deposit_id(self) -> str:
        """Configured deposit identifier, cached in ``self.info``.

        May be empty when no deposit has been configured or created yet.
        """
        if 'deposit_id' in self.info:
            return self.info['deposit_id']

        deposit_id = self.annex.getconfig('deposit_id')
        self.info['deposit_id'] = deposit_id
        # The original fell off the end here and returned None on first use.
        return deposit_id

    @property
    def api_token(self) -> str:
        """Access token taken from the ``ZENODO_TOKEN`` environment variable.

        Raises:
            RemoteError: If the variable is unset or empty.
        """
        if not self._api_token:
            token = environ.get('ZENODO_TOKEN')
            if not token:
                raise RemoteError(
                    'The ZENODO_TOKEN environment variable must be set to a '
                    'Zenodo access token')
            self._api_token = token
        return self._api_token

    @property
    def deposition_info(self) -> dict:
        """Deposit metadata as returned by the API, fetched once and cached.

        Raises:
            RemoteError: If the API answers with a status code above 201.
        """
        if self._deposit_info:
            return self._deposit_info

        r = self.query(
            'get',
            f'{self.api}/deposit/depositions/{self.deposit_id}',
            "deposition_info",
        )
        if r.status_code > 201:
            raise self._error_from_response(
                r, f'Could not retrieve info for deposit {self.deposit_id}'
            )

        self._deposit_info = r.json()
        return self._deposit_info

    @property
    def deposit_bucket(self) -> str:
        """URL stored under ``links.bucket`` of the deposit metadata."""
        # this is the API endpoint for the bucket
        return self.deposition_info['links']['bucket']

    def _error_from_response(self, r, context: str) -> RemoteError:
        """Build (not raise) a ``RemoteError`` describing a failed response.

        Args:
            r: The response object of the failed request.
            context: Human-readable description of what was attempted.

        Returns:
            A ``RemoteError`` whose message contains ``context``, the HTTP
            status code and the ``message`` field of the JSON body. When the
            body is not a JSON object, the response's reason phrase is used.
        """
        try:
            body = r.json()
        except ValueError:
            # Empty or non-JSON body (e.g. HEAD responses, proxy error pages).
            body = None
        message = body.get("message") if isinstance(body, dict) else r.reason
        return RemoteError(f'{context}: HTTP{r.status_code} {message}')

    def _create_new_deposit(self):
        """Create a deposit and remember its metadata and identifier.

        Raises:
            RemoteError: If the API answers with a status code above 201.
        """
        r = self.query(
            'post',
            f'{self.api}/deposit/depositions',
            "initremote",
        )
        if r.status_code > 201:
            raise self._error_from_response(
                r, 'Failed to create a new deposit')
        self._deposit_info = r.json()
        # Stored as a string, matching the values returned by getconfig().
        self.info['deposit_id'] = str(self._deposit_info['id'])

    def initremote(self):
        """Create a deposit if none is configured and persist the settings."""
        api = self.annex.getconfig('api')
        if not self.deposit_id:
            self._create_new_deposit()
            self.annex.setconfig('deposit_id', self.deposit_id)
        if api:
            self.annex.setconfig('api', self.api)

    def prepare(self):
        """Populate ``self.info`` with the API endpoint and deposit id."""
        # need to run the next to get them into the
        # 'info' output
        self.api
        self.deposit_id

    def transfer_store(self, key, path):
        """Upload the file at ``path`` to the deposit bucket under ``key``.

        On success the key's state is set to ``<bucket-id>/<key>``, where the
        bucket id is the last path segment of the bucket URL.

        Raises:
            RemoteError: If the API answers with a status code above 201.
        """
        with open(path, "rb") as fp:
            r = self.query(
                'put',
                # TODO possibly the /key is not even needed
                f"{self.deposit_bucket}/{key}",
                "transfer_store",
                data=fp,
            )
        if r.status_code > 201:
            raise self._error_from_response(
                r, 'Failed to upload file')
        self.annex.setstate(
            key,
            f'{self.deposit_bucket.split("/")[-1]}/{key}',
        )

    def transfer_retrieve(self, key, filename):
        """Download ``key`` into ``filename``.

        Raises:
            RemoteError: If no state is recorded for the key or the server
                answers with an HTTP error status.
        """
        state = self.annex.getstate(key)
        if not state:
            raise RemoteError(f'No stored location is known for key {key}')
        q = self.query(
            'get',
            f'{self.api}/files/{state}',
            "transfer_retrieve",
            stream=True,
        )
        try:
            try:
                q.raise_for_status()
            except requests.exceptions.HTTPError as exc:
                # Report failures as RemoteError, like the other operations.
                raise self._error_from_response(
                    q, 'Failed to download file') from exc
            with open(filename, "wb") as f:
                for chunk in q.iter_content(
                        chunk_size=self._DOWNLOAD_CHUNK_SIZE):
                    f.write(chunk)
        finally:
            # Release the connection of the streamed response.
            q.close()

    def checkpresent(self, key):
        """Return whether a HEAD request for the key's file succeeds.

        Returns ``False`` when no state is recorded for the key or when the
        status code is above 201.
        """
        state = self.annex.getstate(key)
        if not state:
            return False
        r = self.query(
            'head',
            f'{self.api}/files/{state}',
            "checkpresent",
        )
        # NOTE(review): every status above 201 is treated as "not present",
        # including server or authentication errors -- confirm this is wanted.
        return r.status_code <= 201

    def remove(self, key):
        """Delete the key's file from the deposit.

        Raises:
            RemoteError: If the deposit is already submitted, or if the
                deletion failed and the file is still present afterwards.
        """
        if self.deposition_info['submitted']:
            raise RemoteError(
                'Deposition is published, cannot remove files anymore')

        state = self.annex.getstate(key)
        if not state:
            # Nothing was ever recorded for this key, so nothing to delete.
            return
        q = self.query(
            'delete',
            f'{self.api}/files/{state}',
            "remove",
        )
        # At present, the API returns a 500 when deleting, but nevertheless
        # does the deletion.
        if q.status_code >= 300 and self.checkpresent(key):
            raise self._error_from_response(q, 'Failed to remove key')

def main():
    """Start the special remote and serve git-annex requests.

    The original stdout is handed to ``Master``, while ``sys.stdout`` is
    pointed at stderr so that stray prints do not end up in that stream.
    """
    # Keep a handle on the real stdout before redirecting prints to stderr.
    protocol_stream = sys.stdout
    sys.stdout = sys.stderr

    master = Master(protocol_stream)
    master.LinkRemote(ZenodoRemote(master))
    master.Listen()

# Run the remote only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()

One can run git-annex's testremote --fast against a Zenodo special remote to get some basic testing.

jkuhl-uni commented 2 months ago

Thanks for sharing that! It's the first time I've written a special remote, so I'm happy for any help :) I know that performance is - for now - not great. I'll look through the code to find things that you did more efficiently. The loop through the deposits that you mention is needed, if I remember correctly, to find older versions of the data.