#!/usr/bin/env python
# -*- coding: UTF-8 -*-
#
# Copyright 2014-2019European Commission (JRC);
# Licensed under the EUPL (the 'Licence');
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: http://ec.europa.eu/idabc/eupl
"""
The manager and the base for all :term:`backends` fetching cells from actual workbooks and sheets.
Prefer accessing the public members from the parent module.
.. currentmodule:: pandalone.xleash
"""
from abc import abstractmethod, ABC
from collections import namedtuple
import itertools as itt
import numpy as np
from .. import Coords, _capture, io_backends
from ... import utils
[docs]class ABCBackend(ABC):
"""A plugin for a :term:`backend` must implement and add instances into :data:`io_backends`."""
[docs] @abstractmethod
def bid(self, wb_url):
"""
Express the willingness to handle a workbook when deciding backends.
:param wb_url:
a full-url of a workbook, local('file://) or remote('http://')
:return:
- an integer signifying willingness; bigger integers take precedence.
- `None` signifies invalidness.
"""
[docs] @abstractmethod
def open_sheet(self, wb_url, sheet_id):
"""
Open a :class:`ABCSheet` subclass, if backend has won the bid.
:param wb_url:
a full-url of a workbook, local('file://) or remote('http://')
"""
[docs] @abstractmethod
def list_sheetnames(self, wb_url):
"""Returns a list of sheet-names, if bids "decided" this backend."""
[docs]class SimpleSheetsFactory(object):
"""
Asks :term:`backends` to bid for creating :class:`ABCSheet` instances - client should handle resources.
Backends are taken from :data:`io_backends` or specified during construction.
"""
[docs] def __init__(self, backends=None):
"""
:param backends:
The list of :class:`backends` to consider when opening sheets.
If it evaluates to false, :data:`io_backends` assumed.
:typ backends:
list or None
"""
self.backends = backends or io_backends
[docs] def fetch_sheet(self, wb_url, sheet_id, base_sheet=None):
"""
:param ABCSheet base_sheet:
The sheet used when unspecified `wb_url`.
"""
if wb_url is None and base_sheet:
if sheet_id is None:
sheet = base_sheet
else:
base_sheet.open_sibling_sheet(sheet_id)
else:
url = utils.path2url(wb_url)
sheet = self._open_sheet(url, sheet_id)
assert sheet, (wb_url, sheet_id)
return sheet
[docs] def decide_backend(self, wb_url):
"""Asks all :attr:`backends` to :term:`bid` for handling a :term:`xl-ref`. """
wb_url = utils.path2url(wb_url)
bids = [(be, be.bid(wb_url)) for be in self.backends]
bids = [(be, score) for be, score in bids if score is not None]
if not bids:
raise ValueError(
"No suitable xleash-backend found for(%r)!"
"\n Have you installed `xlrd` extra with this command?"
"\n pip install pandalone[xlrd]" % wb_url
)
winner = sorted(bids, key=lambda x: x[1])[-1][0]
assert isinstance(winner, ABCBackend), "Invalid backend(%r) class!" % winner
return winner
def list_sheetnames(self, wb_id):
url = utils.path2url(wb_id)
be = self.decide_backend(url)
return be.list_sheetnames(url)
def _open_sheet(self, url, sheet_id): # =None):
be = self.decide_backend(url)
sheet = be.open_sheet(url, sheet_id)
if not sheet:
raise ValueError("Backend(%s) found no sheet for(%r)!" % (be, url))
return sheet
[docs]class SheetsFactory(SimpleSheetsFactory):
"""
A caching-store of :class:`ABCSheet` instances, serving them based on (workbook, sheet) IDs, optionally creating them from backends.
:ivar dict _cached_sheets:
A cache of all _Spreadsheets accessed so far,
keyed by multiple keys generated by :meth:`_derive_sheet_keys`.
- To avoid opening non-trivial workbooks, use the :meth:`add_sheet()`
to pre-populate this cache with them.
- It is a resource-manager for contained sheets, so it can be used wth
a `with` statement.
"""
[docs] def __init__(self, backends=None):
"""
:param backends:
The list of :class:`backends` to consider when opening sheets.
If it evaluates to false, :data:`io_backends` assumed.
:typ backends:
list or None
"""
super(SheetsFactory, self).__init__(backends)
self._cached_sheets = {}
def _cache_get(self, key):
wb, sh = key
if wb in self._cached_sheets:
shs = self._cached_sheets[wb]
return shs.get(sh, None)
def _cache_put(self, key, sheet):
wb, sh = key
if wb in self._cached_sheets:
sh_dict = self._cached_sheets[wb]
else:
sh_dict = self._cached_sheets[wb] = {}
sh_dict[sh] = sheet
def _build_sheet_key(self, wb, sh):
assert wb is not None, (wb, sh)
return (wb, sh)
[docs] def _derive_sheet_keys(self, sheet, wb_ids=None, sh_ids=None):
"""
Retuns the product of user-specified and sheet-internal keys.
:param wb_ids:
a single or a sequence of extra workbook-ids (ie: file, url)
:param sh_ids:
a single or sequence of extra sheet-ids (ie: name, index, None)
"""
wb_id, sh_ids2 = sheet.get_sheet_ids()
assert wb_id is not None, (wb_id, sh_ids2)
wb_ids = [wb_id] + utils.as_list(wb_ids)
sh_ids = sh_ids2 + utils.as_list(sh_ids)
key_pairs = itt.product(wb_ids, sh_ids)
keys = list(set(self._build_sheet_key(*p) for p in key_pairs if p[0]))
assert keys, (keys, sheet, wb_ids, sh_ids)
return keys
def _close_sheet(self, key):
sheet = self._cache_get(key)
if sheet:
sheet._close()
for sh_dict in self._cached_sheets.values():
for sh_id, sh in list(sh_dict.items()):
if sh is sheet:
del sh_dict[sh_id]
[docs] def close(self):
"""Closes all contained sheets and empties cache."""
for sh_dict in self._cached_sheets.values():
for sh in sh_dict.values():
sh._close_all()
self._cached_sheets = {}
[docs] def add_sheet(self, sheet, wb_ids=None, sh_ids=None):
"""
Updates cache.
:param wb_ids:
a single or sequence of extra workbook-ids (ie: file, url)
:param sh_ids:
a single or sequence of extra sheet-ids (ie: name, index, None)
"""
assert sheet, (sheet, wb_ids, sh_ids)
keys = self._derive_sheet_keys(sheet, wb_ids, sh_ids)
for k in keys:
old_sheet = self._cache_get(k)
if old_sheet and old_sheet is not sheet:
self._close_sheet(k)
self._cache_put(k, sheet)
[docs] def fetch_sheet(self, wb_id, sheet_id, base_sheet=None):
"""
:param ABCSheet base_sheet:
The sheet used when unspecified `wb_id`.
"""
if wb_id is None and base_sheet:
if sheet_id is None:
sheet = base_sheet
else:
wb_id, _c_sh_ids = base_sheet.get_sheet_ids()
key = self._build_sheet_key(wb_id, sheet_id)
sheet = self._cache_get(key)
if not sheet:
sheet = base_sheet.open_sibling_sheet(sheet_id)
self.add_sheet(sheet, wb_id, sheet_id)
else:
url = utils.path2url(wb_id)
if wb_id is None:
sheet = self._open_sheet(url, sheet_id)
else:
key = self._build_sheet_key(wb_id, sheet_id)
sheet = self._cache_get(key)
if not sheet:
sheet = self._open_sheet(url, sheet_id)
self.add_sheet(sheet, wb_id, sheet_id)
assert sheet, (wb_id, sheet_id)
return sheet
def __enter__(self):
return self
def __exit__(self, typ, value, traceback):
self.close()
[docs]def margin_coords_from_states_matrix(states_matrix):
"""
Returns top-left/bottom-down margins of full cells from a :term:`state` matrix.
May be used by :meth:`ABCSheet.get_margin_coords()` if a backend
does not report the sheet-margins internally.
:param np.ndarray states_matrix:
A 2D-array with `False` wherever cell are blank or empty.
Use :meth:`ABCSheet.get_states_matrix()` to derrive it.
:return: the 2 coords of the top-left & bottom-right full cells
:rtype: (Coords, Coords)
Examples::
>>> states_matrix = np.asarray([
... [0, 0, 0],
... [0, 1, 0],
... [0, 1, 1],
... [0, 0, 1],
... ])
>>> margins = margin_coords_from_states_matrix(states_matrix)
>>> margins
(Coords(row=1, col=1), Coords(row=3, col=2))
Note that the botom-left cell is not the same as `states_matrix` matrix size::
>>> states_matrix = np.asarray([
... [0, 0, 0, 0],
... [0, 1, 0, 0],
... [0, 1, 1, 0],
... [0, 0, 1, 0],
... [0, 0, 0, 0],
... ])
>>> margin_coords_from_states_matrix(states_matrix) == margins
True
"""
if not states_matrix.any():
c = Coords(0, 0)
return c, c
indices = np.array(np.where(states_matrix), dtype=np.int32).T
# return indices.min(0), indices.max(0)
return Coords(*indices.min(0)), Coords(*indices.max(0))
SheetId = namedtuple("SheetId", ("book", "ids"))
[docs]class ABCSheet(ABC):
"""
A delegating to backend factory and sheet-wrapper with utility methods.
:param np.ndarray _states_matrix:
The :term:`states-matrix` cached, so recreate object
to refresh it.
:param dict _margin_coords:
limits used by :func:`_resolve_cell`, cached, so recreate object
to refresh it.
Resource management is outside of the scope of this class,
and must happen in the backend workbook/sheet instance.
*xlrd* examples::
>>> import xlrd # doctest: +SKIP
>>> with xlrd.open_workbook(self.tmp) as wb: # doctest: +SKIP
... sheet = xleash.xlrdSheet(wb.sheet_by_name('Sheet1'))
... ## Do whatever
*win32* examples::
>>> with dsgdsdsfsd as wb: # doctest: +SKIP
... sheet = xleash.win32Sheet(wb.sheet['Sheet1'])
TODO: Win32 Sheet example
"""
_states_matrix = None
_margin_coords = None
[docs] def _close(self):
""" Override it to release resources for this sheet."""
[docs] def _close_all(self):
""" Override it to release resources this and all sibling sheets."""
[docs] @abstractmethod
def get_sheet_ids(self):
"""
:return: a 2-tuple of its wb-name and a sheet-ids of this sheet i.e. name & indx
:rtype: SheetId or None
"""
[docs] @abstractmethod
def open_sibling_sheet(self, sheet_id):
"""Return a sibling sheet by the given index or name"""
[docs] @abstractmethod
def list_sheetnames(self):
"""Return a list of names"""
[docs] @abstractmethod
def _read_states_matrix(self):
"""
Read the :term:`states-matrix` of the wrapped sheet.
:return: A 2D-array with `False` wherever cell are blank or empty.
:rtype: ndarray
"""
[docs] def get_states_matrix(self):
"""
Read and cache the :term:`states-matrix` of the wrapped sheet.
:return: A 2D-array with `False` wherever cell are blank or empty.
:rtype: ndarray
:raise: EmptyCaptureException if sheet empty
"""
if self._states_matrix is None:
self._states_matrix = self._read_states_matrix()
return self._states_matrix
[docs] @abstractmethod
def read_rect(self, st, nd):
"""
Fecth the actual values from the backend Excel-sheet.
:param Coords st:
the top-left edge, inclusive
:param Coords, None nd:
the bottom-right edge, inclusive(!); when `None`,
must return a scalar value.
:type nd: Coords, None
:return:
Depends on whether both coords are given:
- If both given, 2D list-lists with the values of the rect,
which might be empty if beyond limits.
- If only 1st given, the scalar value, and if
beyond margins, raise error!
:rtype: list
:raise: EmptyCaptureException (optionally) if sheet empty
"""
[docs] def _read_margin_coords(self):
"""
Override if possible to read (any of the) limits directly from the sheet.
:return: the 2 coords of the top-left & bottom-right full cells;
anyone coords can be None.
By default returns ``(None, None)``.
:rtype: (Coords, Coords)
:raise: EmptyCaptureException if sheet empty
"""
return None, None # pragma: no cover
[docs] def get_margin_coords(self):
"""
Extract (and cache) margins either internally or from :func:`margin_coords_from_states_matrix()`.
:return: the resolved top-left and bottom-right :class:`.xleash.Coords`
:rtype: tuple
:raise: EmptyCaptureException if sheet empty
"""
if not self._margin_coords:
up, dn = self._read_margin_coords()
if up is None or dn is None:
sm = self.get_states_matrix()
up1, dn1 = margin_coords_from_states_matrix(sm)
up = up or up1
dn = dn or dn1
self._margin_coords = up, dn
return self._margin_coords
[docs] def __repr__(self):
args = (type(self).__name__,) + self.get_sheet_ids()
return "%s(book=%r, sheet_ids=%r)" % args
[docs]class ArraySheet(ABCSheet):
"""A sample :class:`ABCSheet` made out of 2D-list or numpy-arrays, for facilitating tests."""
[docs] def __init__(self, arr, ids=SheetId("wb", ["sh", 0])):
self._arr = np.asarray(arr)
self._ids = ids
[docs] def open_sibling_sheet(self, sheet_id):
raise NotImplementedError()
[docs] def get_sheet_ids(self):
return self._ids
[docs] def list_sheetnames(self):
return [self._ids.ids[0]]
[docs] def _read_states_matrix(self):
if not self._arr.size:
raise _capture.EmptyCaptureException("empty sheet")
return ~np.equal(self._arr, None)
[docs] def read_rect(self, st, nd):
if not self._arr.size:
raise _capture.EmptyCaptureException("empty sheet")
if nd is None:
return self._arr[st]
rect = np.array([st, nd]) + [[0, 0], [1, 1]]
return self._arr[slice(*rect[:, 0]), slice(*rect[:, 1])].tolist()
[docs] def __repr__(self):
return "ArraySheet(%s, \n%s)" % (self.get_sheet_ids(), self._arr)