# -*- coding: utf-8 -*-
#
# Copyright 2020 Univention GmbH
#
# http://www.univention.de/
#
# All rights reserved.
#
# The source code of this program is made available
# under the terms of the GNU Affero General Public License version 3
# (GNU AGPL V3) as published by the Free Software Foundation.
#
# Binary versions of this program provided by Univention to you as
# well as other copyrighted, protected or trademarked materials like
# Logos, graphics, fonts, specific documentations and configurations,
# cryptographic keys etc. are subject to a license agreement between
# you and Univention and not subject to the GNU AGPL V3.
#
# In the case you use this program under the terms of the GNU AGPL V3,
# the program is provided in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License with the Debian GNU/Linux or Univention distribution in file
# /usr/share/common-licenses/AGPL-3; if not, see
# <http://www.gnu.org/licenses/>.
import copy
import logging
from abc import ABC
from typing import Any, AsyncIterator, Dict, Iterable, List, TypeVar
from urllib.parse import unquote
from .exceptions import InvalidRequest, NoObject
from .session import Session
# Type variable used in return annotations for "a KelvinObject or one of its
# subclasses" (bound via forward reference to the class defined below).
KelvinObjectType = TypeVar("KelvinObjectType", bound="KelvinObject")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class KelvinObject(ABC):
    """
    Base class for client-side representations of objects of the Kelvin API.

    An instance carries the attributes listed in ``_kelvin_attrs`` plus ``dn``
    and ``url``, and talks to the API through ``self.session``. Everything that
    is specific to a resource type is looked up on ``self._resource_class``:
    ``Meta.required_get_attrs``, ``Meta.required_save_attrs``,
    ``collection_url`` and ``get()``.
    """

    _class_display_name = "Kelvin Object"
    _kelvin_attrs = ["name", "ucsschool_roles", "udm_properties"]

    def __init__(
        self,
        *,
        name: str = None,
        ucsschool_roles: List[str] = None,
        udm_properties: Dict[str, Any] = None,
        dn: str = None,
        url: str = None,
        session: Session = None,
        language: str = None,
        **kwargs,
    ):
        """
        :param str name: name of the object
        :param list ucsschool_roles: roles of the object
        :param dict udm_properties: additional properties, ``None`` is stored
            as an empty dict
        :param str dn: DN of the object
        :param str url: URL of the object, an unset URL makes :py:meth:`save`
            create the object instead of modifying it
        :param Session session: session used for all requests
        :param str language: if set, it is stored on ``session`` (which must
            not be ``None`` in that case)
        :param kwargs: accepted and ignored
        """
        self.name = name
        self.ucsschool_roles = ucsschool_roles
        self.udm_properties = udm_properties or {}
        self.dn = dn
        self.url = url
        self.session = session
        if language:
            self.session.language = language
        # NOTE(review): presumably replaced by subclasses with their own
        # resource class - verify against the subclasses.
        self._resource_class = KelvinResource
        self._fresh = True
        self._deleted = False
        self._old_attrs = {}
        self._update_old_attrs()

    def __repr__(self):
        """
        Return ``ClassName(attr=value, ..., dn=...)`` using the attributes
        required for retrieval, plus the DN if it is set.
        """
        # Attribute names are written verbatim (not repr()'d), so that they
        # are rendered the same way as the "dn" entry below.
        parts = [f"{attr}={value!r}" for attr, value in self._required_get_attrs.items()]
        if getattr(self, "dn", None):
            parts.append(f"dn={self.dn!r}")
        return f"{self.__class__.__name__}({', '.join(parts)})"

    async def reload(self) -> KelvinObjectType:
        """
        Reload properties of object from the Kelvin API.

        :raises ucsschool.kelvin.client.NoObject: if the object cannot be found
        :return: self
        """
        if self._deleted:
            logger.warning(
                "[%s] %s %s has been deleted! Trying to load it anyway...",
                self.session.request_id[:10],
                self._class_display_name,
                self,
            )
        resource = self._resource_class(session=self.session)
        loaded = await resource.get(**self._required_get_attrs)
        for attr, value in loaded.as_dict().items():
            setattr(self, attr, value)
        self._update_old_attrs()
        self._fresh = True
        return self

    async def save(self) -> KelvinObjectType:
        """
        Create (``url`` unset) or modify (``url`` set) the object in the
        Kelvin API and update this instance from the response.

        The instance is marked as not fresh afterwards.

        :raises RuntimeError: if the object has been deleted
        :raises AssertionError: if an attribute required for saving is unset
        :return: self
        """
        if self._deleted:
            raise RuntimeError(f"{self} has been deleted.")
        if not all(self._required_save_attrs.values()):
            raise AssertionError(
                f"{self.__class__.__name__}.save() requires attribute(s) to be set: "
                f"{', '.join(self._resource_class.Meta.required_save_attrs)}."
            )
        if not self._fresh:
            logger.debug(
                "[%s] Saving possibly stale Kelvin object instance.", self.session.request_id[:10]
            )
        data = self._to_kelvin_request_data()
        # assumption: if self.url was set, the object exists in the Kelvin API
        # so if it's not set, we'll try to create the object
        if self.url:
            # self.url was set -> modify object
            # TODO: or creation failed and this is the fall-back
            resp_json = await self.session.put(url=self.url, json=data)
        else:
            collection_url = self._resource_class(session=self.session).collection_url
            resp_json = await self.session.post(url=collection_url, json=data)
        saved = self._from_kelvin_response(resp_json)
        for attr, value in saved.as_dict().items():
            setattr(self, attr, value)
        self._fresh = False
        return self

    async def delete(self) -> None:
        """
        Delete the object in the Kelvin API and mark this instance as deleted.

        Deleting an instance that is already marked as deleted only logs a
        warning.

        :raises RuntimeError: if ``url`` is unset
        """
        if self._deleted:
            logger.warning("[%s] %s has already been deleted.", self.session.request_id[:10], self)
            return
        if not self.url:
            raise RuntimeError("Attribute 'url' unset. Run 'reload()' before 'delete()'.")
        await self.session.delete(self.url)
        self._deleted = True

    def as_dict(self) -> Dict[str, Any]:
        """
        Return a deep copy of the attributes in ``_kelvin_attrs`` plus ``dn``
        and ``url`` as a dictionary.
        """
        attrs = self._kelvin_attrs + ["dn", "url"]
        return copy.deepcopy({attr: getattr(self, attr) for attr in attrs})

    @classmethod
    def _from_kelvin_response(cls, response: Dict[str, Any]) -> KelvinObjectType:
        """
        Create an instance from the JSON object of a Kelvin API response.

        If the response has a string in ``school`` (a school URL), it is
        replaced by the school name: the URL-unquoted last path segment,
        without a query string. Other values of ``school`` are passed on as
        they are. The ``response`` dict itself is not modified.

        :param dict response: attributes of the object, passed as keyword
            arguments to the constructor
        :return: new instance of ``cls``
        """
        data = dict(response)  # shallow copy: leave the caller's dict untouched
        school_url = data.get("school")
        if isinstance(school_url, str):
            # school url to school name
            last_segment = unquote(school_url.rsplit("/", 1)[-1])
            data["school"] = last_segment.split("?")[0]
        return cls(**data)

    def _to_kelvin_request_data(self) -> Dict[str, Any]:
        """
        Return the data to send to the Kelvin API when saving the object.

        Based on :py:meth:`as_dict`, without ``dn`` and ``url`` and without an
        empty ``ucsschool_roles``. If there is a ``school`` entry, the school
        name is converted to a school URL.
        """
        data = self.as_dict()
        if not data["ucsschool_roles"]:
            del data["ucsschool_roles"]
        for attr in ("dn", "url"):
            del data[attr]
        try:
            # school name to school url
            data["school"] = f"{self.session.urls['school']}{data['school']}"
        except KeyError:
            # no "school" in the data (or no "school" URL known to the
            # session): send the data without it
            pass
        return data

    @property
    def _required_get_attrs(self) -> Dict[str, Any]:
        """Current values of the attributes the resource class requires for retrieval."""
        return {attr: getattr(self, attr) for attr in self._resource_class.Meta.required_get_attrs}

    @property
    def _required_save_attrs(self) -> Dict[str, Any]:
        """Current values of the attributes the resource class requires for saving."""
        return {attr: getattr(self, attr) for attr in self._resource_class.Meta.required_save_attrs}

    def _update_old_attrs(self):
        """Store the current values of the attributes required for retrieval in ``_old_attrs``."""
        self._old_attrs.update(self._required_get_attrs)
class KelvinResource(ABC):
    """
    Base class for accessing one type of resource of the Kelvin API.

    The methods read their configuration from ``self.Meta``
    (``kelvin_object``, ``required_get_attrs``, ``required_head_attrs``,
    ``required_search_attrs``), which this class does not define itself, and
    from ``collection_url`` and ``object_url``, which are initialized to empty
    strings here. ``object_url`` is used as a ``str.format()`` template that
    is filled with the keyword arguments of :py:meth:`get` and
    :py:meth:`exists`.
    """

    def __init__(self, session: Session, language: str = None):
        """
        :param Session session: session used for all requests
        :param str language: if set, it is stored on ``session``
        """
        self.session = session
        if language:
            self.session.language = language
        self.collection_url = ""
        self.object_url = ""

    async def get(self, **kwargs) -> KelvinObjectType:
        """
        Load a single object from the Kelvin API.

        :param kwargs: must contain all of ``Meta.required_get_attrs``, used
            to build the URL of the object
        :raises AssertionError: if a required argument is missing
        :raises ucsschool.kelvin.client.NoObject: if the object cannot be found
        :return: the loaded object
        """
        if not all(attr in kwargs for attr in self.Meta.required_get_attrs):
            raise AssertionError(
                f"{self.__class__.__name__}.get() requires argument(s): "
                f"{', '.join(self.Meta.required_get_attrs)}."
            )
        url = self.object_url.format(**kwargs)
        try:
            return await self.get_from_url(url)
        except NoObject as exc:
            # re-raise with a message that names the object type and the
            # attributes that were used for the lookup
            raise NoObject(
                f"{self.Meta.kelvin_object._class_display_name} not found at URL {url!r} using "
                f"attributes {kwargs!r}.",
                reason=exc.reason,
                status=exc.status,
                url=url,
            ) from exc

    async def exists(self, **kwargs) -> bool:
        """
        Check whether an object exists in the Kelvin API.

        Uses a HEAD request. If that returns neither 200 nor 404, a warning
        is logged and the check is repeated using :py:meth:`get`.

        :param kwargs: must contain all of ``Meta.required_head_attrs``, used
            to build the URL of the object
        :raises AssertionError: if a required argument is missing
        :return: whether the object exists
        """
        if not all(attr in kwargs for attr in self.Meta.required_head_attrs):
            raise AssertionError(
                f"{self.__class__.__name__}.exists() requires argument(s): "
                f"{', '.join(self.Meta.required_head_attrs)}."
            )
        url = self.object_url.format(**kwargs)
        status_code: int = await self.session.head(url)
        if status_code == 200:
            return True
        if status_code == 404:
            return False
        logger.warning(
            "There was a problem with HEAD for %s. Trying with GET instead. Status code: %s",
            url,
            status_code,
        )
        try:
            await self.get(**kwargs)
        except NoObject:
            return False
        return True

    async def get_from_url(self, url: str) -> KelvinObjectType:
        """
        Load a single object from the given URL.

        :param str url: URL of the object
        :return: object of type ``Meta.kelvin_object`` using the session of
            this resource
        """
        resp_json: Dict[str, Any] = await self.session.get(url)
        obj = self.Meta.kelvin_object._from_kelvin_response(resp_json)
        obj.session = self.session
        return obj

    async def search(self, **kwargs) -> AsyncIterator[KelvinObjectType]:
        """
        Search for objects at ``collection_url``.

        :param kwargs: search filters, sent as query parameters; must contain
            all of ``Meta.required_search_attrs``
        :raises ucsschool.kelvin.client.InvalidRequest: when there is a
            problem with the kwargs
        :return: asynchronous iterator over the found objects, each using the
            session of this resource
        """
        self._check_search_attrs(**kwargs)
        # not necessary, but will simplify the query string
        params = {k: v for k, v in kwargs.items() if v not in ("", "*")}
        resp_json: List[Dict[str, Any]] = await self.session.get(self.collection_url, params=params)
        for resp in resp_json:
            obj = self.Meta.kelvin_object._from_kelvin_response(resp)
            obj.session = self.session
            yield obj

    def _check_search_attrs(self, **kwargs) -> None:
        """
        Verify that all arguments required by `search()` were passed.

        :raises ucsschool.kelvin.client.InvalidRequest: when there is a problem with the kwargs
            for `search()`
        """
        if not all(attr in kwargs for attr in self.Meta.required_search_attrs):
            raise InvalidRequest(
                f"{self.__class__.__name__}.search() requires argument(s): "
                f"{', '.join(self.Meta.required_search_attrs)}."
            )