# Source code for pandalone.mappings

#! python
# -*- coding: utf-8 -*-
#
# Copyright 2013-2019 European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
Hierarchical string-like objects useful for indexing, that can be renamed/relocated at a later stage.

.. autosummary::

    Pstep
    pmods_from_tuples
    Pmod

*Example*::

    >>> from pandalone.mappings import pmods_from_tuples

    >>> pmods = pmods_from_tuples([
    ...     ('',         'deeper/ROOT'),
    ...     ('/abc',     'ABC'),
    ...     ('/abc/foo', 'BAR'),
    ... ])
    >>> p = pmods.step()
    >>> p.abc.foo
    `BAR`
    >>> p._paths()
    ['deeper/ROOT/ABC/BAR']

- TODO: Implement "anywhere" pmods (`//`).
"""

from copy import copy
import logging
import re

import functools as ft
from pandalone import utils
from pandalone.pandata import (
    iter_jsonpointer_parts_relaxed,
    JSchema,
    unescape_jsonpointer_part,
    escape_jsonpointer_part,
)


# NOTE(review): always empty in this file; presumably stamped by release
# tooling -- confirm before relying on it.
__commit__ = ""

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class Pmod(object):
    r"""
    A path-step mapping forming the pmods-hierarchy.

    - The :term:`pmods` denotes the hierarchy of all :term:`mappings`,
      that either *rename* or *relocate* path-steps.

    - A single :term:`mapping` transforms an "origin" path to
      a "destination" one (also called as "from" and "to" paths).

    - A mapping always transforms the *final* path-step, like that::

        FROM_PATH       TO_PATH       RESULT_PATH
        ---------       -------       -----------
        /rename/path    foo       --> /rename/foo        ## renaming
        /relocate/path  foo/bar   --> /relocate/foo/bar  ## relocation
        ''              a/b/c     --> /a/b/c             ## Relocate all paths.
        /               a/b/c     --> /a/b/c             ## Relocates 1st "empty-str" step.

    - The :term:`pmod` is the mapping of that single path-step.

    - It is possible to match fully on path-steps using regular-expressions,
      and then to use any captured-groups from the *final* step into
      the mapped value::

        (/all(.*)/path, foo)   + all_1/path --> /all_1/foo
                               + all_XYZ    --> /all_XYZ        ## no change
        (/all(.*)/path, foo\1) + all_1/path --> /all_1/foo_1

      If more than one regex match, they are merged in the order declared
      (the latest one overrides a previous one).

    - Any exact child-name matches are applied and merged after regexs.

    - Use :func:`pmods_from_tuples()` to construct the pmods-hierarchy.

    - The pmods are used internally by :class:`Pstep` to correspond
      the component-paths of their input & output onto the actual
      value-tree paths.

    Example:

    .. Note::
        Do not manually construct instances from this class!
        To construct a hierarchy use the :func:`pmods_from_tuples()` or
        pass mappings as the 2nd argument in :class:`Pstep` constructor.

    You can either use it for massively map paths, either for *renaming* them::

        >>> pmods = pmods_from_tuples([
        ...         ('/a',           'A'),
        ...         ('/~b.*',        r'BB\g<0>'),  ## Previous match.
        ...         ('/~b.*/~c.(.*)', r'W\1ER'),    ## Capturing-group(1)
        ... ])
        >>> pmods.map_paths(['/a', '/a/foo'])     ## 1st rule
        ['/A', '/A/foo']

        >>> pmods.map_path('/big/stuff')          ## 2nd rule
        '/BBbig/stuff'

        >>> pmods.map_path('/born/child')         ## 2nd & 3rd rule
        '/BBborn/WildER'

    or to *relocate* them::

        >>> pmods = pmods_from_tuples([
        ...         ('/a',           'A/AA'),
        ...         ('/~b.*/~c(.*)',  r'../C/\1'),
        ...         ('/~b.*/~.*/~r.*', r'/\g<0>'),
        ... ])
        >>> pmods.map_paths(['/a/foo', '/big/child', '/begin/from/root'])
        ['/A/AA/foo', '/big/C/hild', '/root']

    Here is how you relocate "root" (notice that the `''` path is the root)::

        >>> pmods = pmods_from_tuples([('', '/NEW/ROOT')])
        >>> pmods.map_paths(['/a/foo', ''])
        ['/NEW/ROOT/a/foo', '/NEW/ROOT']
    """

    __slots__ = ["_alias", "_steps", "_regxs"]

    #: (optional) the mapped-name (alias) of this path-step
    _alias: str
    #: {original_name --> pmod}
    _steps: dict
    #: {regex_on_originals --> pmod}
    _regxs: dict

    def __init__(self, _alias=None, _steps={}, _regxs={}):  # noqa: B006
        """
        Args passed only for testing, remember `_regxs` to be (k,v) tuple-list!

        :param _alias: the mapped name of this step, or `None` when unmapped
        :param dict _steps: ``{child_name: Pmod}`` exact-name children
        :param _regxs: iterable of ``(pattern, Pmod)`` pairs; patterns are
            compiled and stored, in order, as keys of a dict

        .. Note:: Volatile arg-defaults (empty dicts) are knowingly used,
            to preserve memory; should never append in them!
            (the ``_append_into_*()`` methods replace an empty dict
            with a fresh one before inserting).
        """
        self._alias = _alias
        self._steps = _steps
        if _regxs:
            self._regxs = {re.compile(k): v for k, v in _regxs}
        else:
            self._regxs = _regxs

    def step(self, pname="", alias=None):
        """
        Create a new :class:`Pstep` having as mappings this pmod.

        If no `pname` specified, creates a *root* pstep.

        Delegates to :meth:`Pstep.__new__()`.
        """
        return Pstep(pname, maps=self, alias=alias)

    def _append_into_steps(self, key):
        """
        Return the child-pmod under `key` in `_steps`, creating it if missing.

        :param str key: the step-name to add
        :return: the (possibly new) child pmod
        :rtype: Pmod
        """
        cpmod = None
        d = self._steps
        if not d:
            self._steps = d = {}  # Do not modify init-defaults.
        else:
            cpmod = d.get(key)
        if not cpmod:
            d[key] = cpmod = Pmod()
        return cpmod

    def _append_into_regxs(self, key):
        """
        Return the child-pmod under regex `key` in `_regxs`, creating it if missing.

        An already existing entry is moved to the end of the dict, so that
        the most recently declared regex is also the last one.

        :param str key: the regex-pattern to add
        :return: the (possibly new) child pmod
        :rtype: Pmod
        """
        key = re.compile(key)
        cpmod = None
        d = self._regxs
        if not d:
            self._regxs = d = {}  # Do not modify init-defaults.
        else:
            cpmod = d.get(key)
            if cpmod:
                # Remove it, to append it at the end.
                del d[key]
        if not cpmod:
            cpmod = Pmod()
        d[key] = cpmod
        return cpmod

    def _override_steps(self, other):
        """
        Override this pmod's `_steps` dict with other's, recursively.

        Same as :meth:`_override_regxs()` but without caring for order.

        :param Pmod other: contains the dict with the overrides
        """
        opmods = other._steps
        if opmods:
            spmods = self._steps
            if spmods:
                # Like ``spmods.copy().update()`` but
                # recursive `_merge()` on common items.
                #
                spmods = spmods.copy()
                for name, opmod in opmods.items():
                    spmod = spmods.get(name)
                    if spmod:
                        opmod = spmod._merge(opmod)
                    spmods[name] = opmod  # Share other-pmod if not mine.
                opmods = spmods
            # Share other dict if self hadn't its own.
            self._steps = opmods

    def _override_regxs(self, other):
        """
        Override this pmod's `_regxs` dict with other's, recursively.

        - It may "share" (crosslink) the dict and/or its child-pmods
          between the two pmod args (`self` and `other`).
        - No dict is modified (apart from self, which must have been cloned
          previously by :meth:`Pmod._merge()`), to avoid side-effects
          in case they were "shared".
        - It preserves dict-ordering so that `other` order takes precedence
          (its elements are the last ones).

        :param Pmod self: contains the dict that would be overridden
        :param Pmod other: contains the dict with the overrides
        """
        opmods = other._regxs
        if opmods:
            spmods = self._regxs
            if spmods:
                # Like ``spmods.copy().update()`` but
                # with recursive `_merge()` on common items,
                # and preserve order.
                #
                opairs = []
                for name, opmod in opmods.items():
                    spmod = spmods.get(name)
                    if spmod:
                        mpmod = spmod._merge(opmod)
                    else:
                        mpmod = opmod  # Share other-pmod.
                    opairs.append((name, mpmod))

                okeys = opmods.keys()
                spairs = [
                    (name, spmod)  # Share self-pmod.
                    for name, spmod in spmods.items()
                    if name not in okeys
                ]
                opmods = type(spmods)(spairs + opairs)
            # Share other dict if self hadn't its own.
            self._regxs = opmods

    def _merge(self, other):
        """
        Clone and override all its props with props from other-pmod, recursively.

        Although it does not modify this, the `other` or their children pmods,
        it may "share" (crosslink) them, so pmods MUST NOT be modified later.

        :param Pmod other: contains the dicts with the overrides
        :return: the cloned merged pmod
        :rtype: Pmod

        Examples:

        Look how `_steps` are merged::

            >>> pm1 = Pmod(_alias='pm1', _steps={
            ...     'a':Pmod(_alias='A'), 'c':Pmod(_alias='C')})
            >>> pm2 = Pmod(_alias='pm2', _steps={
            ...     'b':Pmod(_alias='B'), 'a':Pmod(_alias='AA')})
            >>> pm = pm1._merge(pm2)
            >>> sorted(pm._steps.keys())
            ['a', 'b', 'c']

        And here it is `_regxs` merging, which preserves order::

            >>> pm1 = Pmod(_alias='pm1',
            ...            _regxs=[('d', Pmod(_alias='D')),
            ...                    ('a', Pmod(_alias='A')),
            ...                    ('c', Pmod(_alias='C'))])
            >>> pm2 = Pmod(_alias='pm2',
            ...            _regxs=[('b', Pmod(_alias='BB')),
            ...                    ('a', Pmod(_alias='AA'))])

            >>> pm1._merge(pm2)
            pmod('pm2', {re.compile('d'): pmod('D'), re.compile('c'): pmod('C'), re.compile('b'): pmod('BB'), re.compile('a'): pmod('AA')})

            >>> pm2._merge(pm1)
            pmod('pm1', {re.compile('b'): pmod('BB'), re.compile('d'): pmod('D'), re.compile('a'): pmod('A'), re.compile('c'): pmod('C')})
        """
        self = copy(self)  # shallow clone; the dicts are replaced, not mutated
        if other._alias is not None:
            self._alias = other._alias
        self._override_steps(other)
        self._override_regxs(other)

        return self

    def _match_regxs(self, cstep):
        """Return (pmod, regex.match) for those child-pmods matching `cstep`."""
        return [
            (rpmod, match)
            for rpmod, match in (
                (rpmod, re.fullmatch(regex, cstep))
                for regex, rpmod in self._regxs.items()
            )
            if match
        ]

    def descend(self, cstep):
        r"""
        Return child-pmod with merged any exact child with all matched regexps, along with its alias regex-expanded.

        :param str cstep: the child path-step cstep of the pmod to return
        :return: the merged-child pmod, along with the alias;
                 both might be None, if nothing matched, or no alias.
        :rtype: tuple(Pmod, str)

        Example::

            >>> pm = Pmod(
            ...     _steps={'a': Pmod(_alias='A')},
            ...     _regxs=[(r'a\w*', Pmod(_alias='AWord')),
            ...              (r'a(\d*)', Pmod(_alias=r'A_\1')),
            ...    ])
            >>> pm.descend('a')
            (pmod('A'), 'A')

            >>> pm.descend('abc')
            (pmod('AWord'), 'AWord')

            >>> pm.descend('a12')
            (pmod('A_\\1'), 'A_12')

            >>> pm.descend('BAD')
            (None, None)

        Notice how children of regexps are merged together::

            >>> pm = Pmod(
            ...     _steps={'a':
            ...        Pmod(_alias='A', _steps={1: 11})},
            ...     _regxs=[
            ...        (r'a\w*', Pmod(_alias='AWord',
            ...                      _steps={2: Pmod(_alias=22)})),
            ...        (r'a\d*', Pmod(_alias='ADigit',
            ...                     _steps={3: Pmod(_alias=33)})),
            ...    ])
            >>> sorted(pm.descend('a')[0]._steps)    ## All children and regexps match.
            [1, 2, 3]

            >>> pm.descend('aa')[0]._steps           ## Only r'a\w*' matches.
            {2: pmod(22)}

            >>> sorted(pm.descend('a1')[0]._steps )  ## Both regexps matches.
            [2, 3]

        So it is possible to say::

            >>> pm.descend('a1')[0].alias(2)
            22
            >>> pm.descend('a1')[0].alias(3)
            33
            >>> pm.descend('a1')[0].descend('BAD')
            (None, None)
            >>> pm.descend('a$')
            (None, None)

        but it is better to use :meth:`map_path()` for this.
        """
        alias = None
        cpmod = self._steps.get(cstep)
        pmods = self._match_regxs(cstep)

        if cpmod and cpmod._alias is not None:
            # An exact-name alias wins over any regex alias.
            alias = cpmod._alias
        else:
            # Otherwise the last-declared matching regex with an alias wins.
            for rpmod, match in reversed(pmods):
                if rpmod._alias is not None:
                    alias = match.expand(rpmod._alias)
                    break
        pmods = [pmod for pmod, _ in pmods]
        if cpmod:
            pmods.append(cpmod)  # exact child merged last, so it overrides

        if pmods:
            return (ft.reduce(Pmod._merge, pmods), alias)
        return (None, None)

    def alias(self, cstep):
        """
        Like :meth:`descend()` but without merging child-pmods.

        :return: the expanded alias from child/regexs or None
        """
        cpmod = self._steps.get(cstep)
        if cpmod and cpmod._alias is not None:
            return cpmod._alias

        pmods = self._match_regxs(cstep)

        for rpmod, match in reversed(pmods):
            if rpmod._alias is not None:
                return match.expand(rpmod._alias)
        return None

    def map_path(self, path):
        r"""
        Maps a '/rooted/path' using all aliases while descending its child pmods.

        It uses any aliases on all child pmods if found.

        :param str path: a rooted path to transform; a trailing slash
            (on paths longer than one char) is dropped before mapping
        :return: the mapped path
        :rtype: str

        Examples::

            >>> pmods = pmods_from_tuples([
            ...         ('/a',              'A/AA'),
            ...         ('/~a(\\w*)',       r'BB\1'),
            ...         ('/~a\\w*/~d.*',     r'D \g<0>'),
            ...         ('/~a(\\d+)',       r'C/\1'),
            ...         ('/~a(\\d+)/~(c.*)', r'CC-/\1'), # The 1st group is ignored!
            ...         ('/~a\\d+/~e.*',     r'/newroot/\g<0>'), # Rooted mapping.
            ... ])

            >>> pmods.map_path('/a')
            '/A/AA'

            >>> pmods.map_path('/a_hi')
            '/BB_hi'

            >>> pmods.map_path('/a12')
            '/C/12'

            >>> pmods.map_path('/a12/etc')
            '/newroot/etc'

        Notice how children from *all* matching prior-steps are merged::

            >>> pmods.map_path('/a12/dow')
            '/C/12/D dow'
            >>> pmods.map_path('/a12/cow')
            '/C/12/CC-/cow'

        To map *root* use '' which matches before the 1st slash('/')::

            >>> pmods = pmods_from_tuples([('', 'New/Root'),])  ## Relative
            >>> pmods
            pmod({'': pmod('New/Root')})

            >>> pmods.map_path('/for/plant')
            'New/Root/for/plant'

            >>> pmods_from_tuples([('', '/New/Root'),]).map_path('/for/plant')
            '/New/Root/for/plant'

        .. Note::
            Using slash('/') for "from" path will NOT map *root*::

                >>> pmods = pmods_from_tuples([('/', 'New/Root'),])
                >>> pmods
                pmod({'': pmod({'': pmod('New/Root')})})

                >>> pmods.map_path('/for/plant')
                '/for/plant'

                >>> pmods.map_path('//for/plant')
                '/New/Root/for/plant'

            '/root'

        but '' always remains unchanged (whole document)::

            >>> pmods.map_path('')
            ''
        """
        if len(path) > 1 and path.endswith("/"):
            path = path[:-1]

        steps = tuple(iter_jsonpointer_parts_relaxed(path))
        if self._alias is None:
            nsteps = ()
        else:
            nsteps = tuple(iter_jsonpointer_parts_relaxed(self._alias))

        if steps:
            pmod = self
            # Separate last-step from loop below, since
            #    merged child-pmods in `descend` are not needed.
            #
            for step in steps[:-1]:
                if pmod:
                    pmod, alias = pmod.descend(step)
                    if alias is not None:
                        if alias.startswith("."):
                            # Relative alias: keep the original step
                            # so that dots resolve against it.
                            nsteps += (step,)
                        step = alias
                # XXX: Monkey business here.
                if len(step) > 1 and step.endswith("/"):
                    step = step[:-1]
                nsteps += tuple(iter_jsonpointer_parts_relaxed(step))

            final_step = steps[-1]
            if pmod:
                alias = pmod.alias(final_step)
                if alias is not None:
                    if alias.startswith("."):
                        nsteps += (final_step,)
                    final_step = alias
                    # XXX: Monkey business here.
                    if len(final_step) > 1 and final_step.endswith("/"):
                        final_step = final_step[:-1]
            nsteps += tuple(iter_jsonpointer_parts_relaxed(final_step))

        # NOTE(review): this method used to end with
        # ``if is_folder: path += "%s/" % path`` which only touched the
        # (unused) `path` local and never the returned value.  Presumably
        # a trailing slash was meant to be restored on the result; the dead
        # statement was dropped and the returned value left as it always was
        # -- confirm the intent before adding trailing-slash preservation.
        return _join_paths(*nsteps)

    def map_paths(self, paths):
        """Apply :meth:`map_path()` on each of `paths`, returning a list."""
        return [self.map_path(p) for p in paths]

    def __repr__(self):
        # Only show the non-empty props (but do show an empty-string alias).
        args = [
            repr(a) for a in [self._alias, self._steps, self._regxs] if a or a == ""
        ]

        args = ", ".join(args)
        return "pmod({})".format(args)

    def __eq__(self, o):
        """Equal when alias, steps and regexes are equal; `False` for non-pmods."""
        try:
            return (self._alias, self._steps, self._regxs) == (
                o._alias,
                o._steps,
                o._regxs,
            )
        except AttributeError:
            # `o` is not pmod-like; a bare ``except`` would also have
            # swallowed KeyboardInterrupt/SystemExit and real bugs.
            return False
def pmods_from_tuples(pmods_tuples):
    r"""
    Turns a list of 2-tuples into a *pmods* hierarchy.

    - Each tuple defines the renaming-or-relocation of the *final* part
      of some component path onto another one into value-trees, such as::

          (/rename/path, foo)          --> rename/foo
          (relocate/path, foo/bar)    --> relocate/foo/bar

    - The "from" path may be:

      - relative,
      - absolute(starting with `/`), or
      - "anywhere"(starting with `//`).

    - In case a "step" in the "from" path starts with tilda(`~`),
      it is assumed to be a regular-expression, and it is removed from it.
      The "to" path can make use of any "from" capture-groups::

          ('/~all(.*)/path', 'foo')
          (r'~some[\d+]/path', 'foo\1')
          ('//~all(.*)/path', 'foo')

    :param pmods_tuples: any iterable of ``(from_path, to_path)`` string
        pairs (a list, a generator, a ``zip`` object, ...); pairs equal to
        ``('', '')`` or containing `None` are skipped with a logged warning
    :return: a root pmod
    :rtype: Pmod

    Example::

        >>> pmods_from_tuples([
        ...     ('/a', 'A1/A2'),
        ...     ('/a/b', 'B'),
        ... ])
        pmod({'': pmod({'a': pmod('A1/A2', {'b': pmod('B')})})})

        >>> pmods_from_tuples([
        ...     ('/~a*', 'A1/A2'),
        ...     ('/a/~b[123]', 'B'),
        ... ])
        pmod({'': pmod({'a': pmod({re.compile('b[123]'): pmod('B')})}, {re.compile('a*'): pmod('A1/A2')})})

    This is how you map *root*::

        >>> pmods = pmods_from_tuples([
        ...     ('', 'relative/Root'),        ## Make all paths relatives.
        ...     ('/a/b', '/Rooted/B'),        ## But map `b` would be "rooted".
        ... ])
        >>> pmods
        pmod({'': pmod('relative/Root', {'a': pmod({'b': pmod('/Rooted/B')})})})

        >>> pmods.map_path('/a/c')
        'relative/Root/a/c'

        >>> pmods.map_path('/a/b')
        '/Rooted/B'

    But note that '/' maps the 1st "empty-str" step after root::

        >>> pmods_from_tuples([
        ...     ('/', 'New/Root'),
        ... ])
        pmod({'': pmod({'': pmod('New/Root')})})

    TODO: Implement "anywhere" matches.
    """
    # Materialize once: the warning below reports the total count, and
    # ``len()`` would raise TypeError on iterators such as ``zip(...)``.
    pmods_tuples = list(pmods_tuples)
    total = len(pmods_tuples)

    root = Pmod()
    for i, (f, t) in enumerate(pmods_tuples, 1):
        if (f, t) == ("", "") or f is None or t is None:
            msg = 'pmod-tuple #%i of %i: Invalid "from-to" tuple (%r, %r).'
            log.warning(msg, i, total, f, t)
            continue

        # Walk (creating as needed) the pmod-chain for each "from" step ...
        pmod = root
        for srcstep in iter_jsonpointer_parts_relaxed(f):
            is_regex = srcstep.startswith("~")
            if is_regex:
                pmod = pmod._append_into_regxs(srcstep[1:])
            else:
                pmod = pmod._append_into_steps(srcstep)
        # ... and set the "to" path as alias of the final step only.
        pmod._alias = t

    return root
[docs]def _append_step(steps, step): """ Joins `step` at the right of `steps`, respecting '/', '..', '.', ''. :param tuple steps: where to append into ("absolute" when 1st-element is '') :param str step: what to append (may be: ``'foo', '.', '..', ''``) :rtype: tuple .. Note:: The empty-string('') is the "root" for both `steps` and `step`. An empty-tuple `steps` is considered "relative", equivalent to dot(`.`). Example:: >>> _append_step((), 'a') ('a',) >>> _append_step(('a', 'b'), '..') ('a',) >>> _append_step(('a', 'b'), '.') ('a', 'b') Not that an "absolute" path has the 1st-step empty(`''`), (so the previous paths above were all "relative"):: >>> _append_step(('a', 'b'), '') ('',) >>> _append_step(('',), '') ('',) >>> _append_step((), '') ('',) Dot-dots preserve "relative" and "absolute" paths, respectively, and hence do not coalesce when at the left:: >>> _append_step(('',), '..') ('',) >>> _append_step(('',), '.') ('',) >>> _append_step(('a',), '..') () >>> _append_step((), '..') ('..',) >>> _append_step(('..',), '..') ('..', '..') >>> _append_step((), '.') () Single-dots('.') just dissappear:: >>> _append_step(('.',), '.') () >>> _append_step(('.',), '..') ('..',) """ assert isinstance(steps, tuple), (steps, step) assert not step or isinstance(step, str), (steps, step) if step == "": return ("",) _last_pair_choices = { (".",): (), ("..",): ("..",), (".", "."): (), (".", ".."): ("..",), ("..", "."): ("..",), ("..", ".."): ("..", ".."), ("", "."): ("",), ("", ".."): ("",), } try: last_pair = steps[-1:] + (step,) steps = steps[:-1] + _last_pair_choices[last_pair] except KeyError: if step == ".": pass elif step == "..": steps = steps[:-1] else: steps += (step,) return steps
def _join_paths(*steps):
    """
    Fold all path-steps into a single string, resolving ``'/', '..', '.', ''``.

    :param str steps:
        single json-steps, from left to right
    :rtype: str

    .. Note::
        If you use :func:`iter_jsonpointer_parts_relaxed()` to generate
        path-steps, the "root" is signified by the empty(`''`) step;
        not the slash(`/`)!

        Hence a lone slash(`/`) gets splitted to an empty step after "root"
        like that: ``('', '')``, which generates just "root"(`''`).

        Therefore a "folder" (i.e. `some/folder/`) when splitted equals
        ``('some', 'folder', '')``, which results again in the "root"(`''`)!

    Examples::

        >>> _join_paths('r', 'a', 'b')
        'r/a/b'

        >>> _join_paths('', 'a', 'b', '..', 'bb', 'cc')
        '/a/bb/cc'

        >>> _join_paths('a', 'b', '.', 'c')
        'a/b/c'

    An empty-step "roots" the remaining path-steps::

        >>> _join_paths('a', 'b', '', 'r', 'aa', 'bb')
        '/r/aa/bb'

    All `steps` have to be already "splitted"::

        >>> _join_paths('a', 'b', '../bb')
        'a/b/../bb'

    Dot-doting preserves "relative" and "absolute" paths, respectively::

        >>> _join_paths('..')
        '..'

        >>> _join_paths('a', '..')
        '.'

        >>> _join_paths('a', '..', '..', '..')
        '../..'

        >>> _join_paths('', 'a', '..', '..')
        ''

    Some more special cases::

        >>> _join_paths('..', 'a')
        '../a'

        >>> _join_paths('', '.', '..', '..')
        ''

        >>> _join_paths('.', '..')
        '..'

        >>> _join_paths('..', '.', '..')
        '../..'

    .. seealso:: _append_step
    """
    joined = ()
    for step in steps:
        joined = _append_step(joined, step)

    # Nothing left means "current location", spelled as a dot.
    return "/".join(joined) if joined else "."
# Attribute names that `Pstep.__getattr__` refuses to auto-create as
# child-psteps (it re-raises the AttributeError for them instead).
_forbidden_pstep_attrs = ("get_values", "Series")
"""
Psteps attributes excluded from magic-creation,
because searched by pandas's indexing code.
"""
[docs]def _clone_attrs(obj): """Clone deeply any collection attributes of the passed-in object.""" attrs = vars(obj).copy() ccsteps = attrs.get("_csteps", None) if ccsteps: attrs["_csteps"] = ccsteps.copy() ctags = attrs["_tags"] if ctags: attrs["_tags"] = attrs["_tags"].copy() return attrs
class Pstep(str):
    """
    Automagically-constructed *relocatable* paths for accessing data-tree.

    The "magic" autocreates psteps as they are referenced, making writing
    code that accesses data-tree paths natural, while at the same time
    the "model" of those tree-data gets discovered.

    Each pstep keeps internally the *name* of a data-tree step, which, when
    created through recursive referencing, coincides with parent's branch
    leading to this step.  That name can be modified with :class:`Pmod`
    so the same data-accessing code can refer to differently-named values
    in the data-tree.

    :ivar dict _csteps: the child-psteps by their name (default `None`)
    :ivar Pmod _pmod:   path-modifications used to construct this and
                        relayed to children (default `None`)
    :ivar int _locked:  one of

                        - :const:`Pstep.CAN_RELOCATE` (default),
                        - :const:`Pstep.CAN_RENAME`,
                        - :const:`Pstep.LOCKED` (neither from the above).
    :ivar set _tags:    A set of strings (default `()`)
    :ivar dict _schema: json-schema data.

    See :meth:`__new__()` for internal constructor.

    **Usage:**

    - Use a :meth:`Pmod.step()` to construct a *root* pstep from mappings.
      Specify a string argument to construct a relative pstep-hierarchy.

    - Just referencing (non_private) attributes, creates them.

    - Private attributes and functions (starting with ``_``) exist for
      specific operations (ie for specifying json-schema, or
      for collecting all paths).

    - Assignments are only allowed for string-values, or to private attributes::

        >>> p = Pstep()
        >>> p.assignments = 12
        Traceback (most recent call last):
        AssertionError: Cannot assign '12' to '/assignments!

        >>> p._but_hidden = 'Ok'

    - Use :meth:`_paths()` to get all defined paths so far.

    - Construction::

        >>> Pstep()
        ``
        >>> Pstep('a')
        `a`

      Notice that psteps are surrounded with the back-tick char('`').

    - Paths are created implicitly as they are referenced::

        >>> m = {'a': 1, 'abc': 2, 'cc': 33}
        >>> p = Pstep('a')
        >>> assert m[p] == 1
        >>> assert m[p.abc] == 2
        >>> assert m[p.a321.cc] == 33

        >>> sorted(p._paths())
        ['a/a321/cc', 'a/abc']

    - Any "path-mappings" or "pmods" maybe specified during construction::

        >>> from pandalone.mappings import pmods_from_tuples

        >>> maps = [
        ...     ('', 'deeper/ROOT'),
        ...     ('/abc', 'ABC'),
        ...     ('/abc/foo', 'BAR'),
        ... ]
        >>> p = Pstep('', pmods_from_tuples(maps))

      OR

        >>> pmods = pmods_from_tuples(maps)
        >>> p = pmods.step()

        >>> p.abc.foo
        `BAR`

        >>> p._paths()
        ['deeper/ROOT/ABC/BAR']

    - but exceptions are thrown if mapping any step marked as "locked":

        >>> p.abc.foo._locked  ## 3: CAN_RELOCATE
        3

        >>> p.abc.foo._lock  ## Screams, because `foo` is already mapped.
        Traceback (most recent call last):
        ValueError: Cannot rename/relocate 'foo'-->'BAR' due to LOCKED!

    - .. Warning::
          Creating an empty(`''`) step in some paths will "root" the path::

              >>> p = Pstep()
              >>> _ = p.a1.b
              >>> _ = p.A2
              >>> p._paths()
              ['/A2', '/a1/b']

              >>> _ = p.a1.a2.c
              >>> _ = p.a1.a2 = ''
              >>> p._paths()
              ['/A2', '/a1/b', '/c']
    """

    CAN_RELOCATE = 3
    CAN_RENAME = 1
    LOCKED = 0

    @staticmethod
    def _lockstr(lock):
        """Return the name of the lock-level that the `lock` number falls in."""
        if lock >= Pstep.CAN_RELOCATE:
            return "CAN_RELOCATE"
        # The middle band used to be reported as "LOCKED" too, so that
        # "CAN_RENAME" could never appear in error messages.
        if lock >= Pstep.CAN_RENAME:
            return "CAN_RENAME"
        return "LOCKED"

    def __new__(cls, pname=None, maps=None, alias=None, *tags):
        """
        Constructs a string with str-content which may comes from the mappings.

        These are the valid argument combinations::

            pname='attr_name',
            pname='attr_name', _alias='Mass [kg]'

            pname='attr_name', maps=Pmod
            pname='attr_name', maps=Pstep
            pname='attr_name', maps=Pstep, _alias='Mass [kg]'

        :param str pname:
            this pstep's name which must coincide with the name of
            the parent-pstep's attribute holding this pstep.
            It is stored at `_orig` and if no `alias` and unmapped by pmod,
            this becomes the `alias`.
            To create an "absolute" pstep, do not set this or alias args.
        :param Pmod or Pstep maps:
            It can be either:

            - the mappings for this pstep (a :class:`Pmod`, or a list/tuple
              of pairs fed into :func:`pmods_from_tuples()`),
            - another pstep to clone attributes from
              (used when replacing an existing child-pstep), or
            - None.

            The mappings will apply only if :meth:`Pmod.descend()`
            match `pname` and will derive the alias.
        :param str alias:
            Will become the super-str object when given; otherwise the alias
            derived from `maps` (jsonpointer-unescaped, see
            :func:`pandata.unescape_jsonpointer_part()`) or else `pname`.
        :param tags:
            initial tags of the pstep (ignored when cloning another pstep).
        :raise ValueError:
            if `maps` is non-empty but none of the supported types
        """
        pmod = None
        attrs = None
        if pname is None:
            pname = ""
        if isinstance(maps, Pstep):
            # Checked by type, not truthiness, so that an empty-string pstep
            # is cloned as well, instead of being silently ignored.
            attrs = _clone_attrs(maps)
        elif maps:
            if isinstance(maps, (list, tuple, Pmod)):
                if isinstance(maps, (list, tuple)):
                    maps = pmods_from_tuples(maps)
                pmod, m_alias = maps.descend(pname)
                if alias is None and m_alias:
                    # TODO: Add Escape-path TCs.
                    alias = unescape_jsonpointer_part(m_alias)
            else:
                raise ValueError("Invalid type(%s) for `maps`!" % maps)
        if alias is None:
            alias = pname
        self = str.__new__(cls, alias)
        self.__dict__ = attrs or {
            "_orig": pname,
            "_pmod": pmod,
            "_csteps": None,
            "_locked": Pstep.CAN_RELOCATE,
            "_tags": set(tags) if tags else (),
        }

        return self

    def __make_cstep(self, ckey, alias=None, existing_cstep=None):
        """
        (Re)create the child-pstep stored under `ckey` and return it.

        :param str ckey: the attribute-name of the child
        :param str alias: (optional) the mapped-name to force on the child
        :param Pstep existing_cstep:
            (optional) a pstep to clone attributes from; when not given,
            any child already stored under `ckey` is used
        """
        csteps = self._csteps
        if not csteps:
            self._csteps = csteps = {}
        else:
            if existing_cstep is None:
                existing_cstep = csteps.get(ckey, None)

        # Update my mappings for `b` when ``self.b = "foo"``.
        #
        if alias is not None:
            pmod = self._pmod
            if pmod:
                pmod._alias = alias
            else:
                self._pmod = Pmod(_alias=alias)

        # Explicit None-check: an empty-string pstep is falsy, yet it is
        # a valid step to clone (otherwise its children would get lost).
        maps = self._pmod if existing_cstep is None else existing_cstep
        csteps[ckey] = child = Pstep(ckey, maps, alias)
        return child

    def __getattr__(self, attr):
        # Invoked only when regular lookup failed: auto-create child-psteps
        # for public names, but fail for private and pandas-probed ones.
        try:
            return str.__getattr__(self, attr)
        except AttributeError:
            if attr.startswith("_") or attr in _forbidden_pstep_attrs:
                raise
            csteps = self._csteps
            child = csteps.get(attr) if csteps else None
            if child is None:
                # Explicit None-check: an existing empty-string child is
                # falsy and must be returned, not be replaced by a new one.
                child = self.__make_cstep(attr)
            return child

    def __setattr__(self, attr, value):
        if attr.startswith("_"):
            str.__setattr__(self, attr, value)
        elif isinstance(value, Pstep):
            self.__make_cstep(attr, existing_cstep=value)
        elif isinstance(value, str):
            self.__make_cstep(attr, alias=value)
        else:
            raise self._ex_invalid_assignment(attr, value)

    def __dir__(self):
        """List regular attributes plus the names of any child-psteps."""
        d = super().__dir__()
        if self._csteps:
            d = sorted(d + list(self._csteps.keys()))
        return d

    def _ex_invalid_assignment(self, cpname, value):
        """Build (do not raise) the error for assigning a non-string `value`."""
        msg = "Cannot assign '%s' to '%s/%s!"
        return AssertionError(msg % (value, self, cpname))

    def __repr__(self):
        return "`%s`" % self

    @property
    def _locked(self):
        """
        Gets `_locked` internal flag or scream on set, when step already renamed/relocated

        Prefer using one of :attr:`_fix` or :attr:`_lock` instead.

        :param locked:
            One of :attr:`CAN_RELOCATE`, :attr:`CAN_RENAME`, :attr:`LOCKED`.
        :raise: ValueError when stricter lock-value on a renamed/relocated pstep
        """
        return vars(self)["_locked"]

    @_locked.setter
    def _locked(self, lock_state):
        if self != self._orig:
            # Already mapped: refuse a lock-level that forbids what happened
            # (any rename below CAN_RENAME, a relocation below CAN_RELOCATE).
            if lock_state < Pstep.CAN_RENAME or (
                lock_state < Pstep.CAN_RELOCATE and "/" in self
            ):
                msg = "Cannot rename/relocate '%s'-->'%s' due to %s!"
                raise ValueError(msg % (self._orig, self, Pstep._lockstr(lock_state)))
        vars(self)["_locked"] = int(lock_state)

    @property
    def _fix(self):
        """Sets :attr:`locked` = `CAN_RENAME`.

        :return: self
        :raise: ValueError if step has been relocated pstep
        """
        self._locked = Pstep.CAN_RENAME
        return self

    @property
    def _lock(self):
        """Set :attr:`locked` = `LOCKED`.

        :return: self, for chained use
        :raise: ValueError if step has been renamed/relocated pstep
        """
        self._locked = Pstep.LOCKED
        return self

    def _tag(self, *tags):
        """Add a "tag" for this pstep.

        :return: self, for chained use
        """
        _tags = self._tags
        if _tags:
            _tags.update(tags)
        elif tags:
            # The default is an (immutable) empty tuple; switch to a set.
            self._tags = set(tags)

        return self

    def _tag_remove(self, tag):
        """Delete a "tag" from this pstep.

        :return: self, for chained use
        """
        tags = self._tags
        if tags:
            tags.discard(tag)

        return self

    def _steps(self, keys=False, tag=None):
        """
        Return the direct children of this pstep.

        :param bool keys: if true, return the child-names instead of the psteps
        :param tag: accepted but currently not used for filtering
        :return: an empty list when there are no children, else a dict-view
        """
        csteps = self._csteps
        if not csteps:
            return []
        if keys:
            return csteps.keys()
        return csteps.values()

    def _paths(self, with_orig=False, tag=None):
        """
        Return all children-paths (str-list) constructed so far, in a list.

        :param bool with_orig: whether to include also orig-path, for debug.
        :param str tag: If not 'None', fetches all paths with `tag`
                        in their last step.
        :rtype: [str]


        Examples::

              >>> p = Pstep()
              >>> _ = p.a1._tag('inp').b._tag('inp').c
              >>> _ = p.a2.b2

              >>> p._paths()
              ['/a1/b/c', '/a2/b2']

              >>> p._paths(tag='inp')
              ['/a1', '/a1/b']

        For debugging set `with_orig` to `True`::

            >>> pmods = pmods_from_tuples([
            ...     ('',         'ROOT'),
            ...     ('/a',     'A/AA'),
            ... ])
            >>> p = pmods.step()
            >>> _ = p.a.b
            >>> p._paths(with_orig=True)
             ['(-->ROOT)/(a-->A/AA)/b']
        """

        def append_orig(s):
            orig = s._orig
            return "(%s-->%s)" % (orig, s) if s != orig else s

        paths = []
        for path in self._iter_hierarchy():
            send = path[-1]
            # Without a tag only leaves are reported; with one, tagged steps.
            if (tag and tag in send._tags) or (not tag and not send._csteps):
                if with_orig:
                    path = [append_orig(s) for s in path]
                paths.append(_join_paths(*path))

        return sorted(set(paths))

    def _derrive_map_tuples(self):
        """
        Recursively extract ``(cmap --> alias)`` pairs from the pstep-hierarchy.

        :return: ``(original_path, step_alias)`` pairs for every pstep of
                 the hierarchy (this one included), sorted by original-path
        :rtype: [(str, str)]
        """

        def orig_paths(psteps):
            return [p._orig for p in psteps]

        map_pairs = (
            (_join_paths(*orig_paths(p)), str(p[-1])) for p in self._iter_hierarchy()
        )

        return sorted(map_pairs, key=lambda p: p[0])

    def _iter_hierarchy(self, prefix_steps=()):
        """
        Depth-first (pre-order) traversing of pstep-hierarchy.

        :param tuple prefix_steps:
            the psteps leading to this one (excluding it)
        :return: yields, for each visited pstep, the tuple of psteps
                 from the start of the traversal down to (and including) it
        :rtype: (Pstep, ...)
        """
        prefix_steps += (self,)
        yield prefix_steps

        csteps = self._csteps
        if csteps:
            for v in csteps.values():
                for p in v._iter_hierarchy(prefix_steps):
                    yield p

    @property
    def _schema(self):
        """Updates json-schema-v4 on this pstep (see :class:`JSchema`)."""

        # Lazy create it
        #    (clients should check before`_schema_exists()`)
        #
        sdict = vars(self)
        jschema = sdict.get("_schema")
        if jschema is None:
            sdict["_schema"] = jschema = JSchema()

        return jschema

    def _schema_exists(self):
        """Always use this to avoid needless schema-instantiations."""
        return "_schema" in vars(self)
def pstep_from_df(columns_df, name_col="names"):
    """
    Build a root :class:`Pstep` out of a dataframe describing its children.

    :param pd.DataFrame columns_df:
        the pstep's mapped-names in the `name_col` column, indexed by paths,
        and any additional pstep-attributes in the rest columns.
        Example::

            ========  =========  ===================
            paths     names      renames
            ========  =========  ===================
            /A        foo        ['FOO', 'LL']
            /B        bar        []
            ========  =========  ===================

    :param str name_col:
        the column holding the mapped-names
    :return:
        the root pstep, having each remaining column set on the respective
        child as a private attribute (the column-name prefixed with ``_``)
    :rtype: Pstep

    NOTE(review): each index-path is turned into a *single* child attribute
    by just stripping its 1st char, so multi-step paths (i.e. ``/A/B``)
    presumably are not descended step-by-step -- verify against callers.
    """
    mappings = zip(columns_df.index, columns_df[name_col])
    root = pmods_from_tuples(mappings).step()

    extras = columns_df.drop(name_col, axis=1, errors="ignore")
    attr_names = extras.columns
    for row in extras.itertuples():
        # The 1st item of each row-tuple is the index (the path).
        path = row[0]
        child = getattr(root, path[1:])
        for attr_name, attr_value in zip(attr_names, row[1:]):
            setattr(child, "_%s" % attr_name, attr_value)

    return root
# This module is not meant to be executed as a script.
if __name__ == "__main__":  # pragma: no cover
    raise NotImplementedError