Source code for evokit.core.variator

from __future__ import annotations

from typing import TYPE_CHECKING

from .accelerator import parallelise_task

from .accelerator.parallelisers import __getstate__
from .accelerator.parallelisers import __deepcopy__

if TYPE_CHECKING:
    from typing import Optional
    from typing import Sequence
    from typing import Self
    from typing import Type
    from concurrent.futures import ProcessPoolExecutor

from logging import warning
from abc import abstractmethod
from abc import ABC
from typing import Any
from typing import Generic, TypeVar

from .population import Individual, Population
from typing import override

D = TypeVar("D", bound=Individual[Any])


[docs] class Variator(ABC, Generic[D]): """Base class for all selectors. Derive this class to create custom selectors. Tutorial: :doc:`../guides/examples/onemax`. """
[docs] def __new__(cls: Type[Self], *args: Any, **kwargs: Any) -> Self: """Machinery. :meta private: """ instance: Self = super().__new__(cls) instance.arity = None instance.processes = None instance.share_self = False return instance
[docs] def __init__(self: Self, *args: Any, processes: Optional[int | ProcessPoolExecutor] = None, share_self: bool = False, **kwargs: Any) -> None: """ See :class:`Variator` for parameters :arg:`processes` and :arg:`share_self`. """ #: Size of input to :meth:`vary` self.arity: Optional[int] #: Multiprocessing capabilities. See :meth:`__init__`. self.processes = processes """If attributes of this object will be shared when multiprocessing. See :meth:`__init__`. """ self.share_self = share_self
[docs] @abstractmethod def vary(self: Self, parents: Sequence[D], *args: Any, **kwargs: Any) -> tuple[D, ...]: """Apply the variator to a tuple of parents Produce a tuple of individuals from a sequence of individuals. The length of :arg:`.parents` is at most :attr:`.arity`. """ pass
def _group_to_parents(self: Self, population: Population[D])\ -> Sequence[Sequence[D]]: """Machinery. :meta private: Divide the population into sequences of the given size. """ # Tuple magic. Zipping an iterable with itself extracts a tuple of # that size. The "discarding" behaviour is implemented this way. parent_groups: Sequence[Sequence[D]] if self.arity is None: raise TypeError("Variator does not specify arity," "cannot create parent groups") else: parent_groups = tuple(zip(*(iter(population),) * self.arity)) return parent_groups
[docs] def vary_population(self: Self, population: Population[D], *args: Any, **kwargs: Any) -> Population[D]: """Vary the population. The default implementation separates ``population`` into groups of size `.arity`, call `.vary` with each group as argument, then collect and returns the result. Args: population: Population to vary. .. note:: The default implementation calls :meth:`.Individual.reset_fitness` on each offspring to clear its fitness. Any implementation that overrides this method should do the same. """ parent_groups: Sequence[Sequence[D]] =\ self._group_to_parents(population) if len(parent_groups) == 0: warning("Something is wrong. Population has fewer" " individuals than what is necessary to procreate.") for group in parent_groups: for individual in group: individual.reset_fitness() processes = self.processes share_self = self.share_self nested_results: Sequence[tuple[D, ...]] =\ parallelise_task(fn=type(self).vary, self=self, iterable=parent_groups, processes=processes, share_self=share_self) next_population = Population(list(sum(nested_results, ()))) return next_population
__getstate__ = __getstate__ __deepcopy__ = __deepcopy__
[docs] class NullVariator(Variator[D]): """Variator that does not change anything """
[docs] def __init__(self: Self) -> None: self.arity = 1
[docs] @override def vary(self: Self, parents: Sequence[D]) -> tuple[D, ...]: return tuple(parents)