# Source code for plums.plot.engine.color_engine

from copy import deepcopy
from collections import defaultdict

import ordered_set

from plums.plot.engine.descriptor import Descriptor
from plums.plot.engine.color import CategoricalColorMap, LightnessColorMap, Color
from plums.commons.data import RecordCollection


class CategoricalRecordCollection(RecordCollection):
    """Data model class which aggregates multiple |Record| together and splits them according to a property value.

    On top of the regular |RecordCollection| accessors, it supports two-dimensional indexing where the first
    dimension selects a category and the second one selects |Record| inside that category:

    * ``collection[i, j]`` addresses the *i*-th category, in order of first appearance in :attr:`records`.
    * ``collection.loc[category, j]`` addresses the category by its monitored property value.

    Note:
        The split is computed at construction time. None of the accessors defined here update it when |Record|
        are replaced afterwards.

    Args:
        monitored_property (str): The |Record| property to monitor for the collection split.
        *records (|Record|): |Record| instances to aggregate.

    Attributes:
        id (str): The |RecordCollection| id. If not provided in the constructor, it is automatically generated.
        records (list): Stored |Record| instances.

    """

    def __init__(self, monitored_property, *records, **kwargs):
        self._category_to_index = defaultdict(list)
        self._category_set = ordered_set.OrderedSet()
        super(CategoricalRecordCollection, self).__init__(*records, **kwargs)
        self._split(monitored_property)

    @property
    def categories(self):
        """OrderedSet: The monitored property values found in the |RecordCollection|, in order of appearance."""
        return self._category_set

    @classmethod
    def from_record_collection(cls, monitored_property, record_collection):
        """Create a |CategoricalRecordCollection| from a |RecordCollection|.

        Args:
            monitored_property (str): The |Record| property to monitor for the collection split.
            record_collection (|RecordCollection|): |RecordCollection| to split into a |CategoricalRecordCollection|.

        Returns:
            |CategoricalRecordCollection|: The resulting split collection.

        """
        return cls(monitored_property, *record_collection.records, id=record_collection.id)

    def _split(self, monitored_property):
        """Record, for each value of ``monitored_property``, the positions of the |Record| carrying it.

        Raises:
            ValueError: If the collection was already split.
            AttributeError: If a |Record| does not expose ``monitored_property``.

        """
        if self._category_to_index:
            raise ValueError('CategoricalRecordCollection was already split.')

        for i, record in enumerate(self.records):
            property_value = getattr(record, monitored_property)
            self._category_set.add(property_value)
            self._category_to_index[property_value].append(i)

    @staticmethod
    def _check_2d_index(index):
        """Validate a tuple index: at most two dimensions and no slice on the category dimension.

        Raises:
            IndexError: If ``index`` has more than 2 dimensions or if its first dimension is a slice.

        """
        if len(index) > 2:
            raise IndexError('Invalid index provided: Expected at most 2 dimensions, got {}'.format(len(index)))

        if isinstance(index[0], slice):
            raise IndexError('Invalid index provided: First dimension does not support slice indexing.')

    def __getitem__(self, item):
        """Access the i-th stored |Record| or the j-th stored |Record| from the i-th category.

        Args:
            item: Either a plain index, forwarded to |RecordCollection|, or a ``(i, j)`` tuple where ``i`` is a
                category position (slices are not supported) and ``j`` is an integer or a slice applied to the
                |Record| of that category.

        Returns:
            (|Record|, List[|Record|]): The specified |Record| instance or list of |Record|. An empty tuple is
            returned if the category position is past the last category.

        Raises:
            IndexError: If the tuple index is malformed or if an index is out of range.

        """
        if not isinstance(item, tuple):
            return super(CategoricalRecordCollection, self).__getitem__(item)

        self._check_2d_index(item)

        if item[0] >= len(self.categories):
            return ()

        # Indexing the per-category position list gives an int (integer index) or a list (slice index).
        indices = self._category_to_index[self._category_set[item[0]]][item[1]]

        if isinstance(indices, list):
            return [self.records[i] for i in indices]
        return self.records[indices]

    @property
    def loc(self):
        """Label-based indexer: same as plain indexing except that the first dimension is a category value.

        ``collection.loc[category, j]`` returns the j-th |Record| (or a list of |Record| if ``j`` is a slice) of the
        ``category`` category, and an empty tuple if ``category`` is not part of :attr:`categories`.
        ``collection.loc[category, j] = value`` replaces the designated |Record|. Non-tuple indices are forwarded
        unchanged to the collection.

        Raises:
            IndexError: If the tuple index is malformed or if an index is out of range.

        """
        class _LocIndexer(object):
            def __init__(self, categorical_record_collection):
                self._collection = categorical_record_collection

            def __getitem__(self, item):
                if not isinstance(item, tuple):
                    return self._collection[item]

                self._collection._check_2d_index(item)

                categories = self._collection.categories
                # An unknown category holds no record: answer like positional access does for an
                # out-of-range category instead of letting the category lookup fail.
                if item[0] not in categories:
                    return ()

                return self._collection[categories.index(item[0]), item[1]]

            def __setitem__(self, key, value):
                if not isinstance(key, tuple):
                    self._collection[key] = value
                    return

                self._collection._check_2d_index(key)

                # Assigning into an unknown category is left to fail in the category lookup.
                self._collection[self._collection.categories.index(key[0]), key[1]] = value

        return _LocIndexer(self)

    def __setitem__(self, key, value):
        """Set the i-th stored |Record| or the j-th stored |Record| from the i-th category.

        Args:
            key: Either a plain index, forwarded to |RecordCollection|, or a ``(i, j)`` tuple where ``i`` is a
                category position (slices are not supported) and ``j`` is an integer or a slice applied to the
                |Record| of that category.
            value: A |Record| if ``j`` is an integer, an indexable sequence of |Record| if ``j`` is a slice.

        Raises:
            IndexError: If the tuple index is malformed or if an index is out of range.

        """
        if not isinstance(key, tuple):
            super(CategoricalRecordCollection, self).__setitem__(key, value)
            return

        self._check_2d_index(key)

        # Indexing the per-category position list gives an int (integer index) or a list (slice index).
        indices = self._category_to_index[self._category_set[key[0]]][key[1]]

        if isinstance(indices, list):
            for i, j in enumerate(indices):
                self.records[j] = value[i]
        else:
            self.records[indices] = value


class ByCategoryDescriptor(Descriptor):
    """Wrapper class to split a given |Descriptor| interface calls along a set of categories.

    A private deep copy of the wrapped |Descriptor| is created lazily for each category met, so that every
    category accumulates its own internal state.

    Args:
        monitored_property (str): The |Record| property to monitor for the collection and descriptor split.
        descriptor (|Descriptor|): A |Descriptor| to split along a set of categories.

    Raises:
        TypeError: If ``descriptor`` does not look like a |Descriptor|.

    """

    def __init__(self, monitored_property, descriptor):
        if not _is_descriptor(descriptor):
            raise TypeError('Invalid argument: descriptor is expected to be a Descriptor')

        super(ByCategoryDescriptor, self).__init__(name=descriptor.__descriptor__['name'])
        self._descriptor = descriptor
        self._monitored_property = monitored_property
        self._per_category_descriptors = defaultdict(lambda: deepcopy(self._descriptor))

    @property
    def property_name(self):
        """str: The |Record| inserted property name after :meth:`compute` call."""
        return self._descriptor.property_name

    @property
    def type(self):
        """str: Either ``categorical`` or ``continuous``. The enclosed |Descriptor| type."""
        return self._descriptor.__descriptor__['type']

    def _split_by_category(self, record_collections):
        """Split each |RecordCollection| along the monitored property and list the categories met.

        Returns:
            tuple: The split collections (a tuple of |CategoricalRecordCollection|) and the list of all their
            categories, without duplicates and in order of first appearance.

        Raises:
            ValueError: If the split fails with an ``AttributeError``, e.g. a |Record| lacks the property.

        """
        try:
            split_record_collections = tuple(
                CategoricalRecordCollection.from_record_collection(self._monitored_property, record_collection)
                for record_collection in record_collections
            )
        except AttributeError:
            # Report the property which was looked up on the records, not the wrapped descriptor name.
            raise ValueError('Invalid monitored record property name: {} was not found in record.'
                             ''.format(self._monitored_property))

        # Keep first-seen order so that results do not depend on hash ordering.
        categories = []
        known_categories = set()
        for record_collection in split_record_collections:
            for category in record_collection.categories:
                if category not in known_categories:
                    known_categories.add(category)
                    categories.append(category)

        return split_record_collections, categories

    @staticmethod
    def _category_subcollections(split_record_collections, category):
        """Build one |RecordCollection| per split collection holding at least one |Record| of ``category``."""
        subcollections = []
        for record_collection in split_record_collections:
            records = record_collection.loc[category, :]
            if records:
                subcollections.append(RecordCollection(*records, id=record_collection.id))

        return tuple(subcollections)

    def update(self, *record_collections):
        """Update internal values of category-split |Descriptor| from |Record| descriptions.

        Args:
            *record_collections (|RecordCollection|): |RecordCollection| of which |Record| will be used to update
                internals.

        Raises:
            ValueError: If the monitored property could not be read from the |Record|.

        """
        split_record_collections, categories = self._split_by_category(record_collections)

        # Update each sub descriptor with the RecordCollection subsets
        for category in categories:
            record_subcollections = self._category_subcollections(split_record_collections, category)
            self._per_category_descriptors[category].update(*record_subcollections)

    def compute(self, *record_collections):
        """Construct new |RecordCollection| where each enclosed |Record| is added a named description property.

        Each category is computed by its own sub descriptor, and the per-category results are concatenated
        position-wise into fresh |RecordCollection|. As a result, returned |Record| are grouped by category.

        Args:
            *record_collections (|RecordCollection|): |RecordCollection| used to construct new |RecordCollection|
                with described |Record|.

        Returns:
            (|RecordCollection|, ): A described |RecordCollection| tuple. The input tuple is returned untouched
            if no category was found.

        Raises:
            ValueError: If the monitored property could not be read from the |Record|.

        """
        split_record_collections, categories = self._split_by_category(record_collections)

        # No categories implies empty record collections, then no need to add descriptor property
        if not categories:
            return record_collections

        # Delay init return tuple until we know its length
        record_collections_return = None

        # Compute each sub descriptor with the RecordCollection subsets
        for category in categories:
            sub_record_collections = self._category_subcollections(split_record_collections, category)
            sub_record_collections_return = self._per_category_descriptors[category].compute(*sub_record_collections)

            # Init return tuple
            if record_collections_return is None:
                record_collections_return = [RecordCollection() for _ in range(len(sub_record_collections_return))]

            # Concatenate RecordCollection subsets into a valid return tuple
            for i, sub_record_collection in enumerate(sub_record_collections_return):
                record_collections_return[i].records.extend(sub_record_collection.records)

        return tuple(record_collections_return)

    def reset(self):
        """Reset |Descriptor| internals to factory values."""
        self._per_category_descriptors = defaultdict(lambda: deepcopy(self._descriptor))

    def _make_interface(self):
        # One sub schema per category met so far, keyed by the category string representation.
        schema = {
            str(category): descriptor.__descriptor__['schema']
            for category, descriptor in self._per_category_descriptors.items()
        }

        return {
            'type': "categorical",
            'schema': schema
        }
def _is_descriptor(candidate):
    """Tell whether ``candidate`` exposes the descriptor interface.

    The check is structural: ``candidate`` qualifies as soon as it has a ``__descriptor__`` attribute as well as
    ``update`` and ``compute`` attributes, whatever its actual class.

    Args:
        candidate: Any object.

    Returns:
        bool: True if all three attributes are present, False otherwise.

    """
    return all(hasattr(candidate, attribute) for attribute in ('__descriptor__', 'update', 'compute'))
class ColorEngine(Descriptor):
    """A special |Descriptor| used to generate |Color| from one or two |Descriptor|.

    More specifically, a |ColorEngine| is used in two phases:

    * First, enclosed |Descriptor| internals are updated and/or accumulated over the entire collection of
      |RecordCollection| tuples.
    * Then, the |ColorEngine| is used to construct a new |RecordCollection| where each enclosed |Record| is added a
      property called ``color`` containing a |Color| instance computed from the properties computed by the
      enclosed |Descriptor|.

    Args:
        main_descriptor (|Descriptor|): The principal |Descriptor|. It might be a *categorical* descriptor or a
            *continuous* descriptor, in which case no *secondary_descriptor* can be provided.
        secondary_descriptor (|Descriptor|): Optional. Default to None. A refinement |Descriptor|. It might be a
            *categorical* descriptor or a *continuous* descriptor.
        ctype (str): The color space the generated |Color| will live in (*e.g.* "sRGB255" or "JCh").

    Raises:
        TypeError: If one of the arguments does not look like a |Descriptor| or if a *secondary_descriptor* is
            provided along with a non-categorical *main_descriptor*.

    """

    def __init__(self, main_descriptor, secondary_descriptor=None, ctype='sRGB255'):
        self._ctype = ctype

        if not _is_descriptor(main_descriptor):
            raise TypeError('Invalid argument: main_descriptor is expected to be a Descriptor')

        if secondary_descriptor is not None:
            if not _is_descriptor(secondary_descriptor):
                raise TypeError('Invalid argument: secondary_descriptor is expected to be a Descriptor')

            if main_descriptor.__descriptor__['type'] != 'categorical':
                raise TypeError('Invalid argument: main_descriptor is expected to be a CategoricalDescriptor')

            # The refinement descriptor is evaluated independently inside each main category.
            secondary_descriptor = ByCategoryDescriptor(main_descriptor.property_name, secondary_descriptor)

        self._main_descriptor = main_descriptor
        self._secondary_descriptor = secondary_descriptor

        # Color maps are built lazily from the enclosed descriptors' interfaces (see _make_color_maps).
        self.__primary_color_map = None
        self.__secondary_color_map = None

        names = [interface['name']
                 for interface in (self._main_interface, self._secondary_interface)
                 if interface is not None]
        super(ColorEngine, self).__init__('color_engine({})'.format(', '.join(names)))

    @property
    def _primary_color_map(self):
        if self.__primary_color_map is None:
            self._make_color_maps()
        return self.__primary_color_map

    @_primary_color_map.setter
    def _primary_color_map(self, value):
        self.__primary_color_map = value

    @property
    def _secondary_color_map(self):
        # The primary map is the "maps were built" marker: the secondary map legitimately stays None when
        # there is no secondary descriptor, so testing it would rebuild the maps on every access.
        if self.__primary_color_map is None:
            self._make_color_maps()
        return self.__secondary_color_map

    @_secondary_color_map.setter
    def _secondary_color_map(self, value):
        self.__secondary_color_map = value

    @property
    def _main_interface(self):
        return self._main_descriptor.__descriptor__

    @property
    def _secondary_interface(self):
        if self._secondary_descriptor is not None:
            return self._secondary_descriptor.__descriptor__
        return None

    @property
    def property_name(self):
        """str: The |Record| inserted property name after :meth:`compute` call."""
        return 'color'

    @property
    def ctype(self):
        """str: The color space the |ColorMap| lives in (*e.g.* "sRGB255" or "JCh").

        See Also:
            For more information on valid color spaces, please see |Color|.

        """
        return self._ctype

    @ctype.setter
    def ctype(self, new_ctype):
        self._ctype = new_ctype
        self._primary_color_map.ctype = new_ctype

        secondary_color_map = self._secondary_color_map
        if secondary_color_map is not None:
            for colormap in secondary_color_map.values():
                colormap.ctype = new_ctype

    def update(self, *record_collections):
        """Update the enclosed |Descriptor| from |Record| property :attr:`name` value.

        The secondary |Descriptor|, if any, is updated from the |RecordCollection| computed by the main one.
        Cached color maps are discarded so that they are rebuilt from the updated descriptors.

        Args:
            *record_collections (|RecordCollection|): |RecordCollection| of which |Record| will be used to update
                the enclosed |Descriptor|.

        """
        self._main_descriptor.update(*record_collections)

        if self._secondary_descriptor is not None:
            self._secondary_descriptor.update(*self._main_descriptor.compute(*record_collections))

        self.__primary_color_map = None
        self.__secondary_color_map = None

    def compute(self, *record_collections):
        """Construct a new |RecordCollection| where each enclosed |Record| is added a |Color| as a property.

        Args:
            *record_collections (|RecordCollection|): |RecordCollection| used to construct new |RecordCollection|
                with described |Record|.

        Returns:
            (|RecordCollection|, ): A tuple containing one |RecordCollection| with colors.

        Raises:
            ValueError: If the enclosed descriptors return more than one |RecordCollection|.

        """
        record_collections = self._main_descriptor.compute(*record_collections)

        if self._secondary_descriptor is not None:
            record_collections = self._secondary_descriptor.compute(*record_collections)

        if len(record_collections) > 1:
            raise ValueError('Invalid description: Expected only one record collection is expected after running the '
                             'enclosed descriptors, got {}'.format(len(record_collections)))

        record_collection = record_collections[0]

        main_property = self._main_interface['property']
        if self._secondary_descriptor is not None:
            secondary_property = self._secondary_interface['property']
        else:
            secondary_property = None

        for record in record_collection:
            main_value = getattr(record, main_property)
            secondary_value = getattr(record, secondary_property) if secondary_property else None

            if secondary_value is not None:
                # Refine the main category color with the secondary description.
                color = self._secondary_color_map[main_value].get_color(secondary_value)
            else:
                color = self._primary_color_map.get_color(main_value)

            record.properties['color'] = color

        return (record_collection,)

    def reset(self):
        """Reset |Descriptor| enclosed |Descriptor| to factory values."""
        self._main_descriptor.reset()

        if self._secondary_descriptor is not None:
            self._secondary_descriptor.reset()

        self.__primary_color_map = None
        self.__secondary_color_map = None

    def _make_interface(self):
        main_interface = self._main_interface
        main_schema = main_interface['schema']

        if self._secondary_descriptor is None:
            if main_interface['type'] == 'categorical':
                # One color per main category
                schema = {
                    key: self._primary_color_map.get_color(value)
                    for key, value in main_schema.items()
                }
            else:
                schema = self._primary_color_map
        else:
            secondary_schema = self._secondary_interface['schema']

            if self._secondary_descriptor.type == 'categorical':
                # One color per (main category, secondary category) pair
                schema = {
                    key: {
                        s_key: self._secondary_color_map[value].get_color(s_value)
                        for s_key, s_value in secondary_schema[str(value)].items()
                    }
                    for key, value in main_schema.items()
                }
            else:
                # One color map per main category
                schema = {
                    key: self._secondary_color_map[value]
                    for key, value in main_schema.items()
                }

        return {
            'type': main_interface['type'],
            'schema': schema
        }

    def _make_secondary_color_map(self, color, category):
        # Secondary color maps only vary the lightness of the main category color.
        if self._secondary_descriptor.type == 'categorical':
            category_count = len(self._secondary_interface['schema'][category].keys())
            cmap = LightnessColorMap(color,
                                     ctype=self.ctype,
                                     lightness_range=(0.5, 0.7),
                                     data_range=(0, category_count - 1))
            return cmap.discretize(category_count or 1)

        return LightnessColorMap(color,
                                 ctype=self.ctype,
                                 lightness_range=(0.5, 0.7),
                                 data_range=self._secondary_interface['schema'][category]).discretize()

    def _make_color_maps(self):
        main_interface = self._main_interface

        if main_interface['type'] == 'categorical':
            schema = main_interface['schema']
            self._primary_color_map = CategoricalColorMap(len(schema.keys()) or 1, ctype=self.ctype)

            if self._secondary_descriptor is not None:
                self._secondary_color_map = {
                    value: self._make_secondary_color_map(self._primary_color_map.get_color(value), str(value))
                    for value in schema.values()
                }

        elif main_interface['type'] == 'continuous':
            self._primary_color_map = LightnessColorMap(Color(50, 78, 50, ctype='JCh'),
                                                        lightness_range=(0.8, 0.9),
                                                        chroma_range=(-0.3, -0.1),
                                                        data_range=main_interface['schema'],
                                                        ctype=self.ctype).discretize()