The tags features is not yet supported by this lib.
Given i need this feature for a project, for now, i just added a plugin.
Below the code.
Shall i fork and PR the mod ? (i don't know if the project still active)
import re
import requests
import json
from nextcloud import NextCloud as _Nextcloud
from nextcloud.api_wrappers import OCS_API_CLASSES
from nextcloud.api_wrappers.webdav import File, WebDAV
from nextcloud.base import WithRequester
from nextcloud.requester import WebDAVRequester, WebDAVResponse
from xml.etree import ElementTree as ET
# to simplify code parts with DAV requests
XML_REQUEST_PROPFIND_TEMPLATE = """<?xml version="1.0"?>
<d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"
xmlns:nc="http://nextcloud.org/ns">
<d:prop>
{0}
</d:prop>
</d:propfind>
"""
def build_xml_propfind_query(oc_fields=[], d_fields=[]):
props_xml = ""
for field in d_fields:
props_xml += "<d:{} />".format(field)
for field in oc_fields:
props_xml += "<oc:{} />".format(field)
return XML_REQUEST_PROPFIND_TEMPLATE.format(props_xml)
"""
Stick new classes to nextcloud_api lib
"""
WebDAVResponse.METHODS_SUCCESS_CODES['POST'] = WebDAVResponse.METHODS_SUCCESS_CODES['PUT']
class WebDAVExtented(WebDAV):
def get_file_property(self, uid, path, field, tag='oc'):
get_file_prop_xpath = '{DAV:}propstat/d:prop/%s:%s' % (tag, field)
data = build_xml_propfind_query([field])
additional_url = uid
additional_url = "{}/{}".format(uid, path)
resp = self.requester.propfind(
additional_url=additional_url,
headers={"Depth": str(0)},
data=data)
response_data = resp.data
resp.data = None
if not resp.is_ok:
return resp
response_xml_data = ET.fromstring(response_data)
for xml_data in response_xml_data:
for prop in xml_data.findall(get_file_prop_xpath, File.xml_namespaces_map):
resp.data = prop.text
break
return resp
class NextCloud(_Nextcloud):
def __init__(self, endpoint, user, password, json_output=True):
super().__init__(endpoint, user, password, json_output=json_output)
webdav_requester = WebDAVRequester(endpoint, user, password)
functionality_classes = [
WebDAVExtented(webdav_requester, json_output=json_output),
SystemTags(webdav_requester, json_output=json_output),
SystemTagsRelation(webdav_requester, json_output=json_output,
client=self), # require to fetch both tag and files
]
self.functionality_classes += functionality_classes
for functionality_class in functionality_classes:
for potential_method in dir(functionality_class):
if(
potential_method.startswith('_')
or not callable(getattr(functionality_class, potential_method))
):
continue
setattr(self, potential_method, getattr(
functionality_class, potential_method))
"""
Define properties models
"""
class Prop():
def __init__(self, xml_name, json=None, default=None):
self.attr_name = self._xml_name_to_py_name(xml_name)
self.json_key = json
self.xml_key = xml_name
self.default_val = default
@staticmethod
def _xml_name_to_py_name(name):
return name.replace('-', '_')
@staticmethod
def _py_name_to_xml_name(name):
return name.replace('_', '-')
class DProp(Prop):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.type = 'd'
class OCProp(Prop):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.type = 'oc'
"""
Define generic result class
that could be common to File and Tag
"""
class AttributeCollection():
SUCCESS_STATUS = 'HTTP/1.1 200 OK'
COLLECTION_RESOURCE_TYPE = 'collection'
_attrs = []
_xml_namespaces_map = File.xml_namespaces_map
@property
def _fields(self):
return [v.attr_name for v in self._attrs]
@property
def _properties(self):
return [v.xml_key for v in self._attrs]
def __init__(self, xml_data):
self.href = xml_data.find('d:href', self._xml_namespaces_map).text
for attr in self._attrs:
setattr(self, attr.attr_name, None)
for propstat in xml_data.iter('{DAV:}propstat'):
if propstat.find('d:status', self._xml_namespaces_map).text != self.SUCCESS_STATUS:
continue
for xml_property in propstat.find('d:prop', self._xml_namespaces_map):
property_name = re.sub("{.*}", "", xml_property.tag)
if property_name not in self._properties:
continue
value = self._get_property_value(xml_property)
setattr(self, Prop._xml_name_to_py_name(property_name), value)
@classmethod
def _get_property_value(self, xml_property):
return xml_property.text
@classmethod
def default_get(cls, key_format='json', **kwargs):
vals = {getattr(v, "%s_key" % key_format): v.default_val
for v in cls._attrs
if getattr(v, "%s_key" % key_format, False)}
vals.update(kwargs)
return vals
@classmethod
def build_xml_propfind_query(cls, fields=None):
if not fields:
fields = [attr.xml_key for attr in cls._attrs]
return build_xml_propfind_query(oc_fields=fields)
@classmethod
def from_response(cls, resp, json_output=None, filtered=None):
if not resp.is_ok:
resp.data = None
return resp
response_data = resp.data
response_xml_data = ET.fromstring(response_data)
attr_datas = [cls(xml_data) for xml_data in response_xml_data]
if filtered and callable(filtered):
attr_datas = [attr_data for attr_data in attr_datas
if filtered(attr_data)]
resp.data = attr_datas if not json_output else [
attr_data.as_dict() for attr_data in attr_datas]
return resp
def as_dict(self):
attrs = [v.attr_name for v in self._attrs]
return {key: value
for key, value in self.__dict__.items()
if key in attrs}
"""
Define main processings
"""
class Tag(AttributeCollection):
_attrs = [
OCProp("id"),
OCProp("display-name", json="name", default='default_tag_name'),
OCProp("user-visible", json="userVisible", default=True),
OCProp("can-assign", json="canAssign", default=True),
OCProp("user-assignable", json="userAssignable", default=True)
]
class SystemTags(WithRequester):
API_URL = '/remote.php/dav/systemtags'
CREATED_CODE = 201
def __init__(self, *args, **kwargs):
super(SystemTags, self).__init__(*args)
self.json_output = kwargs.get('json_output')
def get_sytemtag(self, name, fields=None, json_output=None):
if not fields:
fields = Tag._fields
resp = self.requester.propfind(
data=Tag.build_xml_propfind_query(
set(['display-name', *fields])
)
)
if json_output is None:
json_output = self.json_output
return Tag.from_response(
resp,
json_output=json_output,
filtered=lambda t: t.display_name == name)
def get_systemtags(self, name=None):
"""
Get list of all tags
Returns: response with <list>Tag in data
"""
resp = self.requester.propfind(data=Tag.build_xml_propfind_query())
return Tag.from_response(resp, json_output=self.json_output)
def create_systemtag(self, name, **kwargs):
"""
Create a new system tag from name.
Returns requester response with file id as data
"""
data = Tag.default_get(name=name, **kwargs)
url = self.requester.get_full_url()
# i use requests because headers, can't be set in requester.post
# resp = self.requester.post(data=data)
res = requests.post(url, auth=self.requester.auth_pk,
data=json.dumps(data), headers={
"Content-Type": "application/json"
})
resp = self.requester.rtn(res)
if res.status_code == self.CREATED_CODE:
resp.data = int(res.headers['Content-Location'].split('/')[-1])
return resp
def delete_systemtag(self, name=None, tag_id=None):
"""
Delete systemtag
Args:
name (str): tag name
OR
tag_id (int): tag id
Returns response
"""
if not tag_id:
resp = self.get_sytemtag(name, ['id'],
json_output=False)
if resp.data:
tag_id = resp.data[0].id
if tag_id:
resp = self.requester.delete(url=str(tag_id))
return resp
class FluidArgumentsMixin():
"""
a stupid mixin to avoid code repetition
using dangerous getattr functions
with major counter side of loosing
some transaction infos
"""
def _arguments_get(self, varnames, vals):
if 'kwargs' in vals:
vals.update(vals['kwargs'])
ret = []
for varname in varnames:
val = vals.get(varname, None)
if val is None:
getter_func_name = '_default_get_%s' % varname
if hasattr(self, getter_func_name):
val = getattr(self, getter_func_name)(vals)
ret.append(val)
return ret
class SystemTagsRelation(WithRequester, FluidArgumentsMixin):
API_URL = '/remote.php/dav/systemtags-relations/files'
def __init__(self, *args, **kwargs):
super(SystemTagsRelation, self).__init__(*args)
self.json_output = kwargs.get('json_output')
self.client = kwargs.get('client')
def _get_fileid_from_path(self, uid, path):
""" Tricky function to fetch file """
resp = self.client.get_file_property(uid, path, 'fileid')
id_ = None
if resp.data:
id_ = int(resp.data)
return id_
def _get_systemtag_id_from_name(self, name):
resp = self.client.get_sytemtag(name, ['id'],
json_output=False)
tag_id = None
if resp.data:
tag_id = int(resp.data[0].id)
return tag_id
def _default_get_file_id(self, vals):
uid = vals.get('uid', None)
path = vals.get('path', None)
if not (uid and path):
raise ValueError("Insufficient infos about the file")
return self._get_fileid_from_path(uid, path)
def _default_get_tag_id(self, vals):
tag_name = vals.get('tag_name', None)
if not tag_name:
raise ValueError("Insufficient infos about the tag")
return self._get_systemtag_id_from_name(tag_name)
def get_systemtags_relation(self, file_id=None, **kwargs):
"""
Get all tags from a given file/folder
Args:
file_id (int): file id found from file object
OR
uid (str): user (to know from where fetch file)
path (str): path to file/folder
Returns:
"""
(file_id,) = self._arguments_get(['file_id'], locals())
data = Tag.build_xml_propfind_query()
resp = self.requester.propfind(additional_url=file_id,
data=data)
return Tag.from_response(resp, json_output=self.json_output)
def delete_systemtags_relation(self, file_id=None, tag_id=None, **kwargs):
"""
Delete a tag from a given file/folder
Args:
file_id (int): id found in file object
tag_id (int): id found in tag object
Returns:
"""
(file_id, tag_id) = self._arguments_get(['file_id', 'tag_id'], locals())
resp = self.requester.delete(
url="{}/{}".format(file_id, tag_id))
return resp
def add_systemtags_relation(self, file_id=None, tag_id=None, **kwargs):
"""
set a tag from a given file/folder
Args:
file_id (int): id found in file object
tag_id (int): id found in tag object
(if you didn't provided file_id)
uid (str): user (to know from where fetch file)
path (str): path to file/folder
(if you didn't provided tag_id)
tag_name (str): tag_name to search or create
Returns:
"""
(file_id, tag_id) = self._arguments_get(['file_id', 'tag_id'], locals())
if not tag_id and 'tag_name' in kwargs:
resp = self.client.create_systemtag(kwargs['tag_name'])
if not resp.is_ok:
return resp
tag_id = resp.data
if not file_id:
raise ValueError('No file found')
data = Tag.build_xml_propfind_query()
resp = self.requester.put(url="{}/{}".format(file_id, tag_id))
return resp
Hello,
The tags features is not yet supported by this lib. Given i need this feature for a project, for now, i just added a plugin. Below the code.
Shall i fork and PR the mod ? (i don't know if the project still active)