Skip to content

States

BaseState

Abstract base class for states.

Parameters:

Name Type Description Default
name str

Name of the state

required
is_actionable bool

If True, state can be changed by a policy

False
is_observable bool

If True, state can be observed by a policy

True
exception_on_nan bool

If True an exception is raised whenever an instance tries to access the value of the state and this value is None.

False
Source code in lineflow/simulation/states.py
class BaseState(metaclass=ABCMeta):
    """
    Abstract base class for states.

    Args:
        name (str): Name of the state
        is_actionable (bool): If `True`, state can be changed by a policy
        is_observable (bool): If `True`, state can be observed by a policy
        exception_on_nan (bool): If `True` an exception is raised whenever an instance tries to
            access the value of the state and this value is `None`.
    """

    def __init__(self, name, is_actionable=False, is_observable=True, exception_on_nan=False):
        self.name = name
        self.is_actionable = is_actionable
        self.is_observable = is_observable
        self._value = None
        self.exception_on_nan = exception_on_nan

    @property
    def value(self):
        """
        The (scalar) value of the state.
        """
        if self._value is None and self.exception_on_nan:
            raise ValueError('NAN value detected')
        return self._value

    def _change_value(self, value):
        """
        Method that has to be called whenever the value of the state should be changed.
        """
        self.assert_valid(value)
        self._value = value

    def apply(self, value):
        """
        Should be called from a policy to change the value of the state
        """
        assert self.is_actionable, 'Trying to set a non-action'
        self._change_value(value)

    def update(self, value):
        """
        Should be called from a `lineflow.simulation.base.LineObject` to change the value
        of the state
        """
        self._change_value(value)

    def to_str(self):
        return str(self.value)

    def print(self):
        return self.to_str()

    def reverse(self, series):
        """
        To be implemented for states that hold categorical values
        """
        return series

    def assert_valid(self, value):
        """
        Can be implemented by a downstream class to check whether value to be set is valid
        """
        pass

value property

The (scalar) value of the state.

apply(value)

Should be called from a policy to change the value of the state

Source code in lineflow/simulation/states.py
def apply(self, value):
    """
    Should be called from a policy to change the value of the state
    """
    assert self.is_actionable, 'Trying to set a non-action'
    self._change_value(value)

assert_valid(value)

Can be implemented by a downstream class to check whether value to be set is valid

Source code in lineflow/simulation/states.py
def assert_valid(self, value):
    """
    Can be implemented by a downstream class to check whether value to be set is valid
    """
    pass

reverse(series)

To be implemented for states that hold categorical values

Source code in lineflow/simulation/states.py
def reverse(self, series):
    """
    To be implemented for states that hold categorical values
    """
    return series

update(value)

Should be called from a lineflow.simulation.base.LineObject to change the value of the state

Source code in lineflow/simulation/states.py
def update(self, value):
    """
    Should be called from a `lineflow.simulation.base.LineObject` to change the value
    of the state
    """
    self._change_value(value)

CountState

Bases: NumericState

State to count discrete events.

Parameters:

Name Type Description Default
name str

Name of the state

required
vmin float

The allowed minimal value the state accepts

0
vmax float

The allowed maximal value the state accepts

inf
is_actionable bool

If True, state can be changed by a policy

False
is_observable bool

If True, state can be observed by a policy

True
exception_on_nan bool

If True an exception is raised whenever an instance tries to access the value of the state and this value is None.

False
Source code in lineflow/simulation/states.py
class CountState(NumericState):
    """
    State to count discrete events.

    Args:
        name (str): Name of the state
        vmin (float): The allowed minimal value the state accepts
        vmax (float): The allowed maximal value the state accepts
        is_actionable (bool): If `True`, state can be changed by a policy
        is_observable (bool): If `True`, state can be observed by a policy
        exception_on_nan (bool): If `True` an exception is raised whenever an instance tries to
            access the value of the state and this value is `None`.
    """
    def __init__(
        self,
        name,
        vmin=0,
        vmax=np.inf,
        is_actionable=False,
        is_observable=True,
        exception_on_nan=False,
    ):
        super().__init__(
            name=name,
            is_actionable=is_actionable,
            is_observable=is_observable,
            vmin=vmin,
            vmax=vmax,
            exception_on_nan=exception_on_nan,
        )

    def assert_valid(self, value):
        NumericState.assert_valid(self, value)
        assert int(value) == value, f"Value {value} is not integer"

Data

Bases: object

Source code in lineflow/simulation/states.py
class Data(object):
    def __init__(self, feature_names, observables=None):
        self.feature_names = feature_names

        if observables is None:
            # All features are observable
            observables = (len(self.feature_names))*[True]
        self.observables = np.array(observables)

        self.T = np.array([])
        self.X = np.array([]).reshape(0, len(feature_names))

        self.modes = []
        for feature in self.feature_names:
            if feature.endswith('mode'):
                self.modes.append(True)
            else:
                self.modes.append(False)

    def append(self, end_time, values):
        self.T = np.append(self.T, end_time)
        self.X = np.vstack([self.X, values])

    def get_modes(self, lookback=None):
        """
        Returns the percent of working mode of the cells of a line over the
        lookback period
        """
        if lookback is None:
            lookback = self.T.shape[0]

        return self.X[-lookback:, self.modes]

    def get_uptime(self, lookback=None):
        """
        Returns the percentage of the station being in working mode
        (mode=0) over the lookback period
        """
        modes = self.get_modes(lookback=lookback)
        uptimes = (modes == 0).mean(axis=0)
        return uptimes

    def get_observations(self, lookback=None, include_time=True):
        """
        Here, only observable values are returned
        """
        if lookback is None:
            lookback = self.T.shape[0]

        X = self.X[-lookback:, self.observables]

        if include_time:
            T = self.T[-lookback:].reshape(-1, 1)
            T = T - T.max()
            return np.hstack([X, T])
        else:
            return X

    def df(self):
        df = pd.DataFrame(
            data=self.X,
            columns=self.feature_names,
        )
        df['T_end'] = self.T
        return df

get_modes(lookback=None)

Returns the percent of working mode of the cells of a line over the lookback period

Source code in lineflow/simulation/states.py
def get_modes(self, lookback=None):
    """
    Returns the percent of working mode of the cells of a line over the
    lookback period
    """
    if lookback is None:
        lookback = self.T.shape[0]

    return self.X[-lookback:, self.modes]

get_observations(lookback=None, include_time=True)

Here, only observable values are returned

Source code in lineflow/simulation/states.py
def get_observations(self, lookback=None, include_time=True):
    """
    Here, only observable values are returned
    """
    if lookback is None:
        lookback = self.T.shape[0]

    X = self.X[-lookback:, self.observables]

    if include_time:
        T = self.T[-lookback:].reshape(-1, 1)
        T = T - T.max()
        return np.hstack([X, T])
    else:
        return X

get_uptime(lookback=None)

Returns the percentage of the station being in working mode (mode=0) over the lookback period

Source code in lineflow/simulation/states.py
def get_uptime(self, lookback=None):
    """
    Returns the percentage of the station being in working mode
    (mode=0) over the lookback period
    """
    modes = self.get_modes(lookback=lookback)
    uptimes = (modes == 0).mean(axis=0)
    return uptimes

DiscreteState

Bases: BaseState

State to handle discrete states, like integer numbers or categories.

Parameters:

Name Type Description Default
name str

Name of the state

required
categories list

List of values this state can take.

required
is_actionable bool

If True, state can be changed by a policy

False
is_observable bool

If True, state can be observed by a policy

True
exception_on_nan bool

If True an exception is raised whenever an instance tries to access the value of the state and this value is None.

False
Source code in lineflow/simulation/states.py
class DiscreteState(BaseState):
    """
    State to handle discrete states, like integer numbers or categories.

    Args:
        name (str): Name of the state
        categories (list): List of values this state can take.
        is_actionable (bool): If `True`, state can be changed by a policy
        is_observable (bool): If `True`, state can be observed by a policy
        exception_on_nan (bool): If `True` an exception is raised whenever an instance tries to
            access the value of the state and this value is `None`.
    """

    def __init__(
        self,
        name,
        categories,
        is_actionable=False,
        is_observable=True,
        exception_on_nan=False,
    ):
        super().__init__(
            name=name,
            is_actionable=is_actionable,
            is_observable=is_observable,
            exception_on_nan=exception_on_nan,
        )
        self.categories = categories
        self.n_categories = len(self.categories)
        self.values = np.arange(self.n_categories)

        self._mapping = dict(zip(self.categories, self.values))

    def update(self, value):
        mapped_value = self._mapping[value]
        self._change_value(mapped_value)

    def to_str(self):
        return self.categories[self.value]

    def reverse(self, series):
        return series.astype(int).apply(lambda i: self.categories[i])

    def assert_valid(self, value):
        assert not isinstance(value, bool), f"{value} should not be boolean, but one of {self.values}"
        assert value in self.values, f"{value}, not in {self.values}"

    def set_next(self):
        self._change_value((self.value+1) % self.n_categories)

LineStates

Bases: object

Bag of all ObjectStates of all LineObjectss of a line.

Parameters:

Name Type Description Default
objects dict

Dict where keys are the object name and the value are of type ObjectStates.

required
Source code in lineflow/simulation/states.py
class LineStates(object):
    """
    Bag of all ObjectStates of all [`LineObjects`][lineflow.simulation.states.LineStates]s of a line.

    Args:
        objects (dict): Dict where keys are the object name and the value are of type
            [`ObjectStates`][lineflow.simulation.states.ObjectStates].
    """

    def __init__(self, objects: dict, env):
        self.objects = objects
        self.env = env

        # Fix an ordering of the objects
        self.object_names = [
            name for name in self.objects.keys()
        ]

        # Fix an order of the features
        self.feature_names = []
        self.observables = []
        self.actionables = []
        for name in self.object_names:
            self.feature_names.extend(self.objects[name]._get_names_with_prefix(name))
            self.observables.extend(self.objects[name].observables)
            self.actionables.extend(self.objects[name].actionables)

        self.data = Data(
            feature_names=self.feature_names,
            observables=self.observables)

    @property
    def observable_features(self):
        return [f for f, o in zip(self.feature_names, self.observables) if o]

    @property
    def actionable_features(self):
        return [f for f, a in zip(self.feature_names, self.actionables) if a]

    def get_actions(self):
        """
        Returns a list of actions for policies to design valid outputs
        """
        actions = {}

        for object_name in self.object_names:

            object_states = []
            for state_name in self[object_name].names:
                state = self[object_name][state_name]
                if state.is_actionable:
                    object_states.append(state)

            if len(object_states) > 0:
                actions[object_name] = object_states
        return actions

    def __getitem__(self, name):
        return self.objects[name]

    def apply(self, values):
        for object_name, object_values in values.items():
            self[object_name].apply(object_values)

    def update(self, values):
        for object_name, object_values in values.items():
            self[object_name].update(object_values)

    @property
    def values(self):
        data = np.array([], dtype=np.float32)
        for name in self.object_names:
            data = np.append(data, self.objects[name].values)
        return data

    def log(self):
        """
        Appends the (current) values of all objects to the data class
        """

        self.data.append(
            end_time=self.env.now,
            values=self.values
        )

    def get_observations(self, lookback=None, include_time=True):
        return self.data.get_observations(lookback=lookback, include_time=include_time)

    def get_n_parts_produced(self):
        return int(sum(
            [v for f, v in self.to_dict().items() if f.endswith('n_parts_produced')]
        ))

    def get_n_scrap_parts(self):
        return int(sum(
            [v for f, v in self.to_dict().items() if f.endswith('n_scrap_parts')]
        ))

    def get_uptime(self, lookback=None):
        return self.data.get_uptime(lookback=lookback)

    def to_dict(self):
        return dict(zip(self.feature_names, self.values))

    def __iter__(self):
        for object_name in self.object_names:
            for state_name in self[object_name].names:
                yield object_name, state_name

    def df(self, reverse=True, lookback=None):
        """
        This function is expensive in time and should only be called after simulation is finished
        """
        df = self.data.df()

        if lookback is not None:
            df = df.iloc[-lookback:]

        if reverse:
            for object_name, state_name in self:
                state = self[object_name][state_name]
                feature = f"{object_name}_{state_name}"
                if isinstance(state, DiscreteState) or isinstance(state, TokenState):
                    df[feature] = state.reverse(df[feature])

        df['T_start'] = df['T_end'].shift(1).fillna(0.0)
        return df

df(reverse=True, lookback=None)

This function is expensive in time and should only be called after simulation is finished

Source code in lineflow/simulation/states.py
def df(self, reverse=True, lookback=None):
    """
    This function is expensive in time and should only be called after simulation is finished
    """
    df = self.data.df()

    if lookback is not None:
        df = df.iloc[-lookback:]

    if reverse:
        for object_name, state_name in self:
            state = self[object_name][state_name]
            feature = f"{object_name}_{state_name}"
            if isinstance(state, DiscreteState) or isinstance(state, TokenState):
                df[feature] = state.reverse(df[feature])

    df['T_start'] = df['T_end'].shift(1).fillna(0.0)
    return df

get_actions()

Returns a list of actions for policies to design valid outputs

Source code in lineflow/simulation/states.py
def get_actions(self):
    """
    Returns a list of actions for policies to design valid outputs
    """
    actions = {}

    for object_name in self.object_names:

        object_states = []
        for state_name in self[object_name].names:
            state = self[object_name][state_name]
            if state.is_actionable:
                object_states.append(state)

        if len(object_states) > 0:
            actions[object_name] = object_states
    return actions

log()

Appends the (current) values of all objects to the data class

Source code in lineflow/simulation/states.py
def log(self):
    """
    Appends the (current) values of all objects to the data class
    """

    self.data.append(
        end_time=self.env.now,
        values=self.values
    )

NumericState

Bases: BaseState

State to handle numeric values.

Parameters:

Name Type Description Default
name str

Name of the state

required
vmin float

The allowed minimal value the state accepts

-inf
vmax float

The allowed maximal value the state accepts

inf
is_actionable bool

If True, state can be changed by a policy

False
is_observable bool

If True, state can be observed by a policy

True
exception_on_nan bool

If True an exception is raised whenever an instance tries to access the value of the state and this value is None.

False
Source code in lineflow/simulation/states.py
class NumericState(BaseState):
    """
    State to handle numeric values.

    Args:
        name (str): Name of the state
        vmin (float): The allowed minimal value the state accepts
        vmax (float): The allowed maximal value the state accepts
        is_actionable (bool): If `True`, state can be changed by a policy
        is_observable (bool): If `True`, state can be observed by a policy
        exception_on_nan (bool): If `True` an exception is raised whenever an instance tries to
            access the value of the state and this value is `None`.
    """
    def __init__(
        self,
        name,
        vmin=-np.inf,
        vmax=np.inf,
        is_actionable=False,
        is_observable=True,
        exception_on_nan=False,
    ):
        super().__init__(
            name=name,
            is_actionable=is_actionable,
            is_observable=is_observable,
            exception_on_nan=exception_on_nan,
        )
        self.vmin = vmin
        self.vmax = vmax

    def assert_valid(self, value):
        assert (
            (self.vmin <= value) and
            (value <= self.vmax)
        ), f'Violated: {self.vmin}<={value}<={self.vmax}'

    def increment(self):
        """
        Increments the value by 1
        """
        self._change_value(self.value+1)

    def decrement(self):
        """
        Decrements the value by 1
        """
        self._change_value(self.value-1)

decrement()

Decrements the value by 1

Source code in lineflow/simulation/states.py
def decrement(self):
    """
    Decrements the value by 1
    """
    self._change_value(self.value-1)

increment()

Increments the value by 1

Source code in lineflow/simulation/states.py
def increment(self):
    """
    Increments the value by 1
    """
    self._change_value(self.value+1)

ObjectStates

Bases: object

Bag of all states of a LineObject

Parameters:

Name Type Description Default
states list

List of BaseState objects.

()
Source code in lineflow/simulation/states.py
class ObjectStates(object):
    """
    Bag of all states of a LineObject

    Args:
        states (list): List of [`BaseState`][lineflow.simulation.states.BaseState] objects.
    """

    def __init__(self, *states):
        self.names = [s.name for s in states]
        self.states = {
            state.name: state for state in states
        }

        self.observables = [self.states[n].is_observable for n in self.names]
        self.actionables = [self.states[n].is_actionable for n in self.names]

    def apply(self, values):
        """
        Applies the values to all states

        Args:
            values (dict): Dict where the keys are the names of the internal states and the values
                are the values to be applied.
        """
        for name, value in values.items():
            self[name].apply(value)

    def update(self, values):
        """
        Updates the values of all states

        Args:
            values (dict): Dict where the keys are the names of the internal states and the values
                are the values to be updated.
        """
        # TODO: Ugly code duplicate
        for name, value in values.items():
            self[name].update(value)

    @property
    def values(self):
        return np.array([self.states[n].value for n in self.names])

    def __getitem__(self, name):
        return self.states[name]

    def _get_names_with_prefix(self, prefix=None):
        prefix = "" if prefix is None else f"{prefix}_"
        return [f"{prefix}{n}" for n in self.names]

    def to_dict(self, prefix=None):
        return dict(
            zip(
                self._get_names_with_prefix(prefix),
                self.values
            )
        )

apply(values)

Applies the values to all states

Parameters:

Name Type Description Default
values dict

Dict where the keys are the names of the internal states and the values are the values to be applied.

required
Source code in lineflow/simulation/states.py
def apply(self, values):
    """
    Applies the values to all states

    Args:
        values (dict): Dict where the keys are the names of the internal states and the values
            are the values to be applied.
    """
    for name, value in values.items():
        self[name].apply(value)

update(values)

Updates the values of all states

Parameters:

Name Type Description Default
values dict

Dict where the keys are the names of the internal states and the values are the values to be updated.

required
Source code in lineflow/simulation/states.py
def update(self, values):
    """
    Updates the values of all states

    Args:
        values (dict): Dict where the keys are the names of the internal states and the values
            are the values to be updated.
    """
    # TODO: Ugly code duplicate
    for name, value in values.items():
        self[name].update(value)

TokenState

Bases: BaseState

State to handle discrete objects where its not clear from the begining which and how many objects need to be tracked.

Source code in lineflow/simulation/states.py
class TokenState(BaseState):
    """
    State to handle discrete objects where its not clear from the begining which and how many
    objects need to be tracked.
    """
    def __init__(self, name, mapping=None, is_actionable=False, is_observable=False):
        super().__init__(name=name, is_actionable=is_actionable, is_observable=is_observable)

        if mapping is None:
            self._mapping = {}
        else:
            self._mapping = mapping
        self.tokens = []

    def _get_next_value(self):

        if len(self._mapping.values()) == 0:
            return 0
        else:
            return max(self._mapping.values())+1

    def assert_valid(self, value):
        assert value in self._mapping.values()

    def update(self, token):
        if token not in self._mapping:
            # Generate new id for this token
            self._mapping[token] = self._get_next_value()
            self.tokens.append(token)

        value = self._mapping[token]
        self._change_value(value)

    def reverse(self, series):
        return series.astype(int).apply(lambda i: self.tokens[i])

Simulation

Line

Parameters:

Name Type Description Default
realtime bool

Only if visualize is True

False
factor float

visualization speed

0.5
info list

A list of line data that is retrivable over the get_info() method. That is info = [("A1", n_workers), ("A3", "assembly_time")]. Data will be logged in experiments.

None
Source code in lineflow/simulation/line.py
class Line:
    """
    Args:
        realtime (bool): Only if `visualize` is `True`
        factor (float): visualization speed
        info (list): A list of line data that is retrivable over the get_info() method.
            That is `info = [("A1", n_workers), ("A3", "assembly_time")]`.
            Data will be logged in experiments.
    """

    def __init__(
        self,
        realtime=False,
        factor=0.5,
        random_state=10,
        step_size=1,
        scrap_factor=1,
        info=None,
    ):

        # TODO: This attribute needs to be refactored in future as it is only used by the
        # gym-simulation
        self.scrap_factor = scrap_factor
        self.realtime = realtime
        self.factor = factor
        self.step_size = step_size
        if info is None:
            info = []
        self._info = info

        self.reset(random_state=random_state)

    @property
    def name(self):
        return self.__class__.__name__

    def info(self):
        """
        Returns additional Information about the line
        """
        general = {
            "name": self.name,
            "T": self.env.now,
            "n_parts": self.get_n_parts_produced(),
            "n_scrap_parts": self.get_n_scrap_parts(),
        }

        additional = {
            f"{station}_{attribute}": self.state.objects[station].states[attribute].value
            for station, attribute in self._info
        }
        return {**general, **additional}

    def _make_env(self):
        if self.realtime:
            self.env = simpy.rt.RealtimeEnvironment(factor=self.factor, strict=False)
        else:
            self.env = simpy.Environment()

    def _make_objects(self):
        """
        Builds the LineObjects
        """
        # Build the stations and connectors
        with StationaryObject() as objects:
            self.build()

        self._objects = {}

        for obj in objects:
            if obj.name in self._objects:
                raise ValueError(f'Multiple objects with name {obj.name} exist')
            self._objects[obj.name] = obj

    def _build_states(self):
        """
        Builds the states of the line objects as well as the LineState
        """
        object_states = {}

        for name, obj in self._objects.items():
            obj.init(self.random)
            object_states[name] = obj.state

        self.state = LineStates(object_states, self.env)

    def reset(self, random_state=None):
        """
        Resets the simulation.
        """
        self.random = np.random.RandomState(random_state)
        self._make_env()
        self._make_objects()

        self._build_states()
        self._register_objects_at_env()

        self.end_step = 0
        self.env.process(self.step_event())

    def _assert_one_sink(self):
        if len([c for c in self._objects.values() if isinstance(c, Sink)]) != 1:
            raise ValueError(
                "Number of sinks does not match"
                "Currently, only scenarios with exactly one sink are allowed"
            )

    def get_sink(self):
        sinks = [s for s in self._objects.values() if isinstance(s, Sink)]
        self._assert_one_sink()
        return sinks[0]

    def get_n_scrap_parts(self):
        """
        Returns the number of produced parts up to now
        """
        return self.state.get_n_scrap_parts()

    def get_n_parts_produced(self):
        """
        Returns the number of produced parts up to now
        """
        return self.state.get_n_parts_produced()

    def get_uptime(self, lookback=None):
        """
        Returns the uptime of the line 
        """
        return self.state.get_uptime(lookback=lookback)

    def build(self):
        """
        This function should add objects of the LineObject class as attributes
        """
        raise NotImplementedError()

    def _register_objects_at_env(self):
        """
        Registers all line objects at the simpy simulation environment.
        """
        for o in self._objects.values():
            o.register(self.env)

    def _draw(self, screen, actions=None):

        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                break

        screen.fill('white')

        font = pygame.font.SysFont(None, 20)

        time = font.render('T={:.2f}'.format(self.env.now), True, 'black')
        n_parts = font.render(
            f'#Parts={self.get_n_parts_produced()}', True, 'black'
        )

        screen.blit(time, time.get_rect(center=(30, 30)))
        screen.blit(n_parts, n_parts.get_rect(center=(30, 50)))

        # Draw objects, first connectors, then stations
        self._draw_connectors(screen)
        self._draw_stations(screen)
        if actions:
            self._draw_actions(screen, actions)
        pygame.display.flip()

    def _draw_actions(self, screen, actions):
        font = pygame.font.SysFont(None, 20)
        actions = font.render(f'{actions}', True, 'black')
        screen.blit(actions, actions.get_rect(center=(500, 30)))
        pygame.display.flip()

    def _draw_stations(self, screen):
        self._draw_objects_of_type(screen, Station)

    def _draw_connectors(self, screen):
        self._draw_objects_of_type(screen, Connector)

    def _draw_objects_of_type(self, screen, object_type):
        for name, obj in self._objects.items():
            if isinstance(obj, object_type):
                obj._draw(screen)

    def setup_draw(self):
        pygame.init()
        x = []
        y = []
        for o in self._objects.values():
            o.setup_draw()
            if isinstance(o, Station):
                assert hasattr(o, "position"), f"Please provide position for {Station.name}"
                x.append(o.position[0])
                y.append(o.position[1])

        return pygame.display.set_mode((max(x) + 100, max(y) + 100))

    def teardown_draw(self):
        pygame.quit()

    def apply(self, values):
        for object_name in values.keys():
            self._objects[object_name].apply(values[object_name])

    def step(self, simulation_end=None):
        """
        Step to the next state of the line
        Args:
            simulation_end (int):
                Time until terminated flag is returned as True. If None
                terminated is always False.
        """
        terminated = False

        # The end of the the current step, excluding the event execution
        # i.e. execute all events where scheudled_time < end_step
        self.end_step = self.end_step + self.step_size

        while True:
            if self.env.peek() > self.end_step:
                self.state.log()
                # If the next event is scheduled after simulation end
                if simulation_end is not None and self.env.peek() > simulation_end:
                    terminated = True

                return self.state, terminated

            self.env.step()

    def step_event(self):
        """
        Ensures that there is an Event scheduled for `self.step_size` intervals
        The step function is only able to stop the simulation if an Event is scheduled.
        """
        while True:
            yield self.env.timeout(self.step_size)

    def run(
        self,
        simulation_end,
        agent=None,
        show_status=True,
        visualize=False,
        capture_screen=False,
    ):
        """
        Args:
            simulation_end (float): Time until the simulation stops
            agent (lineflow.models.reinforcement_learning.agents): An Agent that interacts with a
                line. Can also be just a policy if an __call__ method exists like in the BaseAgent
                class.
            show_status (bool): Show progress bar for each simulation episode
            visualize (bool): If true, line visualization is opened
            capture_screen (bool): Captures last Time frame when screen should be recorded
        """

        if visualize:
            # Stations first, then connectors
            screen = self.setup_draw()

        # Register objects when simulation is initially started
        if len(self.env._queue) == 0:
            self._register_objects_at_env()

        now = 0
        actions = None
        pbar = tqdm(
            total=simulation_end,
            bar_format='{desc}: {percentage:3.2f}%|{bar:50}|',
            disable=not show_status,
        )

        while self.env.now < simulation_end:
            pbar.update(self.env.now - now)
            now = self.env.now
            try:
                self.step()
            except simpy.core.EmptySchedule:
                logger.warning('Simulation in dead-lock - end early')
                break

            if agent is not None:
                actions = agent(self.state, self.env)
                self.apply(actions)

            if visualize:
                if actions is not None:
                    self._draw(screen, actions)
                else:
                    self._draw(screen)

        if capture_screen and visualize:
            pygame.image.save(screen, f"{self.name}.png")

        if visualize:
            self.teardown_draw()

    def get_observations(self, object_name=None):
        """
        """

        df = self.state.df()

        if object_name is None:
            return df
        else:
            cols = [c for c in df.columns if c.startswith(object_name)]
            cols = cols + ['T_start', 'T_end']
            return df[cols].rename(
                columns={
                    c: c.replace(object_name + '_', '') for c in cols
                }
            )

    def __getitem__(self, name):
        return self._objects[name]

build()

This function should add objects of the LineObject class as attributes

Source code in lineflow/simulation/line.py
def build(self):
    """
    This function should add objects of the LineObject class as attributes
    """
    raise NotImplementedError()

get_n_parts_produced()

Returns the number of produced parts up to now

Source code in lineflow/simulation/line.py
def get_n_parts_produced(self):
    """
    Returns the number of produced parts up to now
    """
    return self.state.get_n_parts_produced()

get_n_scrap_parts()

Returns the number of produced parts up to now

Source code in lineflow/simulation/line.py
def get_n_scrap_parts(self):
    """
    Returns the number of produced parts up to now
    """
    return self.state.get_n_scrap_parts()

get_observations(object_name=None)

Source code in lineflow/simulation/line.py
def get_observations(self, object_name=None):
    """
    """

    df = self.state.df()

    if object_name is None:
        return df
    else:
        cols = [c for c in df.columns if c.startswith(object_name)]
        cols = cols + ['T_start', 'T_end']
        return df[cols].rename(
            columns={
                c: c.replace(object_name + '_', '') for c in cols
            }
        )

get_uptime(lookback=None)

Returns the uptime of the line

Source code in lineflow/simulation/line.py
def get_uptime(self, lookback=None):
    """
    Returns the uptime of the line 
    """
    return self.state.get_uptime(lookback=lookback)

info()

Returns additional Information about the line

Source code in lineflow/simulation/line.py
def info(self):
    """
    Returns additional Information about the line
    """
    general = {
        "name": self.name,
        "T": self.env.now,
        "n_parts": self.get_n_parts_produced(),
        "n_scrap_parts": self.get_n_scrap_parts(),
    }

    additional = {
        f"{station}_{attribute}": self.state.objects[station].states[attribute].value
        for station, attribute in self._info
    }
    return {**general, **additional}

reset(random_state=None)

Resets the simulation.

Source code in lineflow/simulation/line.py
def reset(self, random_state=None):
    """
    Resets the simulation.
    """
    self.random = np.random.RandomState(random_state)
    self._make_env()
    self._make_objects()

    self._build_states()
    self._register_objects_at_env()

    self.end_step = 0
    self.env.process(self.step_event())

run(simulation_end, agent=None, show_status=True, visualize=False, capture_screen=False)

Parameters:

Name Type Description Default
simulation_end float

Time until the simulation stops

required
agent agents

An Agent that interacts with a line. Can also be just a policy if an call method exists like in the BaseAgent class.

None
show_status bool

Show progress bar for each simulation episode

True
visualize bool

If true, line visualization is opened

False
capture_screen bool

Captures last Time frame when screen should be recorded

False
Source code in lineflow/simulation/line.py
def run(
    self,
    simulation_end,
    agent=None,
    show_status=True,
    visualize=False,
    capture_screen=False,
):
    """
    Args:
        simulation_end (float): Time until the simulation stops
        agent (lineflow.models.reinforcement_learning.agents): An Agent that interacts with a
            line. Can also be just a policy if an __call__ method exists like in the BaseAgent
            class.
        show_status (bool): Show progress bar for each simulation episode
        visualize (bool): If true, line visualization is opened
        capture_screen (bool): Captures last Time frame when screen should be recorded
    """

    if visualize:
        # Stations first, then connectors
        screen = self.setup_draw()

    # Register objects when simulation is initially started
    if len(self.env._queue) == 0:
        self._register_objects_at_env()

    now = 0
    actions = None
    pbar = tqdm(
        total=simulation_end,
        bar_format='{desc}: {percentage:3.2f}%|{bar:50}|',
        disable=not show_status,
    )

    while self.env.now < simulation_end:
        pbar.update(self.env.now - now)
        now = self.env.now
        try:
            self.step()
        except simpy.core.EmptySchedule:
            logger.warning('Simulation in dead-lock - end early')
            break

        if agent is not None:
            actions = agent(self.state, self.env)
            self.apply(actions)

        if visualize:
            if actions is not None:
                self._draw(screen, actions)
            else:
                self._draw(screen)

    if capture_screen and visualize:
        pygame.image.save(screen, f"{self.name}.png")

    if visualize:
        self.teardown_draw()

step(simulation_end=None)

Step to the next state of the line Args: simulation_end (int): Time until terminated flag is returned as True. If None terminated is always False.

Source code in lineflow/simulation/line.py
def step(self, simulation_end=None):
    """
    Step to the next state of the line
    Args:
        simulation_end (int):
            Time until terminated flag is returned as True. If None
            terminated is always False.
    """
    terminated = False

    # The end of the the current step, excluding the event execution
    # i.e. execute all events where scheudled_time < end_step
    self.end_step = self.end_step + self.step_size

    while True:
        if self.env.peek() > self.end_step:
            self.state.log()
            # If the next event is scheduled after simulation end
            if simulation_end is not None and self.env.peek() > simulation_end:
                terminated = True

            return self.state, terminated

        self.env.step()

step_event()

Ensures that there is an Event scheduled for self.step_size intervals The step function is only able to stop the simulation if an Event is scheduled.

Source code in lineflow/simulation/line.py
def step_event(self):
    """
    Ensures that there is an Event scheduled for `self.step_size` intervals
    The step function is only able to stop the simulation if an Event is scheduled.
    """
    while True:
        yield self.env.timeout(self.step_size)

Assembly

Bases: Station

Assembly takes a carrier from buffer_in and buffer_component, puts the parts of the component carrier on the carrier that came from buffer_in, and pushes that carrier to buffer_out and pushes the component carrier to buffer_return if a buffer return exists, otherwise these carriers are lost.

Parameters:

Name Type Description Default
name str

Name of the station

required
processing_time float

Time until parts are moved from component carrier to main carrier

5
position tuple

X and Y position in the visualization

None
buffer_return Buffer

The buffer to put the old component carriers on

None
processing_std float

The standard deviation of the processing time

None
Source code in lineflow/simulation/stations.py
class Assembly(Station):
    """
    Assembly takes a carrier from `buffer_in` and `buffer_component`, puts the parts of the component
    carrier on the carrier that came from buffer_in, and pushes that carrier to buffer_out and
    pushes the component carrier to buffer_return if a buffer return exists, otherwise these
    carriers are lost.

    Args:
        name (str): Name of the station
        processing_time (float): Time until parts are moved from component carrier to main carrier
        position (tuple): X and Y position in the visualization
        buffer_return (lineflow.simulation.connectors.Buffer): The buffer to
            put the old component carriers on
        processing_std (float): The standard deviation of the processing time
    """
    def __init__(
        self,
        name,
        buffer_in=None,
        buffer_out=None,
        buffer_component=None,
        processing_time=5,
        position=None,
        buffer_return=None,
        processing_std=None,
        NOK_part_error_time=2,
        worker_pool=None,
    ):

        super().__init__(
            name=name,
            position=position,
            processing_time=processing_time,
            processing_std=processing_std,
            worker_pool=worker_pool,
        )
        self.NOK_part_error_time = NOK_part_error_time

        if buffer_in is not None:
            self._connect_to_input(buffer_in)

        if buffer_out is not None:
            self._connect_to_output(buffer_out)

        if buffer_component is not None:
            self.buffer_component = buffer_component.connect_to_output(self)

        if buffer_return is not None:
            self.buffer_return = buffer_return.connect_to_input(self)

    def init_state(self):
        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            DiscreteState('mode', categories=['working', 'waiting', 'failing']),
            TokenState(name='carrier', is_observable=False),
            TokenState(name='carrier_component', is_observable=False),
            CountState('n_scrap_parts', is_actionable=False, is_observable=True),
            CountState('n_workers', is_actionable=False, is_observable=True, vmin=0),
            NumericState('processing_time', is_actionable=False, is_observable=True, vmin=0),
        )
        self.state['on'].update(True)
        self.state['mode'].update("waiting")
        self.state['carrier'].update(None)
        self.state['carrier_component'].update(None)
        self.state['n_scrap_parts'].update(0)
        self.state['processing_time'].update(self.processing_time)
        self.state['n_workers'].update(self.n_workers)

    def connect_to_component_input(self, station, *args, **kwargs):
        buffer = Buffer(name=f"Buffer_{station.name}_to_{self.name}", *args, **kwargs)
        self.buffer_component = buffer.connect_to_output(self)
        station._connect_to_output(buffer)

    def connect_to_component_return(self, station, *args, **kwargs):
        buffer = Buffer(name=f"Buffer_{self.name}_to_{station.name}", *args, **kwargs)
        self.buffer_return = buffer.connect_to_input(self)
        station._connect_to_input(buffer)

    def _has_invalid_components_on_carrier(self, carrier):
        """
        Checks if any of the components on the carrier is not valid for assembly. In this case,
        `True` is returned. Otherwise, `False` is returned.
        """
        for component in carrier:
            if not component.is_valid_for_assembly():
                return True
        return False

    def _draw_info(self, screen):
        self._draw_n_workers(screen)

    def run(self):

        while True:
            if self.is_on():

                yield self.env.process(self.request_workers())
                self.state['n_workers'].update(self.n_workers)
                # Wait to get part from buffer_in
                yield self.env.process(self.set_to_waiting())
                carrier = yield self.env.process(self.buffer_in())

                # Update current_carrier and count parts of carrier
                self.state['carrier'].update(carrier.name)

                # Run until carrier with components each having a valid assembly condition is
                # received
                while True:
                    # Wait to get component
                    carrier_component = yield self.env.process(self.buffer_component())
                    self.state['carrier_component'].update(carrier_component.name)

                    # Check component
                    if self._has_invalid_components_on_carrier(carrier_component):
                        yield self.env.process(self.set_to_error())
                        yield self.env.timeout(self.NOK_part_error_time)
                        self.state['n_scrap_parts'].increment()

                        # send carrier back
                        if hasattr(self, 'buffer_return'):
                            carrier_component.parts.clear()
                            yield self.env.process(self.buffer_return(carrier_component))
                        yield self.env.process(self.set_to_waiting())
                        continue

                    else:
                        # All components are valid, proceed with assembly
                        break

                # Process components
                yield self.env.process(self.set_to_work())
                processing_time = self._sample_exp_time(
                    time=self.processing_time,
                    scale=self.processing_std,
                )
                yield self.env.timeout(processing_time)
                self.state['processing_time'].update(processing_time)

                for component in carrier_component:
                    carrier.assemble(component)

                # Release workers
                self.release_workers()

                # Place carrier on buffer_out
                yield self.env.process(self.set_to_waiting())
                yield self.env.process(self.buffer_out(carrier))
                self.state['carrier'].update(None)

                # send component carrier back
                if hasattr(self, 'buffer_return'):
                    yield self.env.process(self.buffer_return(carrier_component))

                self.state['carrier_component'].update(None)

            else:
                yield self.turn_off()

Magazine

Bases: Station

Magazine station manages carriers.

The Magazine gets carriers from buffer_in and stores them in the magazine. Afterwards it takes a carrier from its magazine and pushes the carrier to buffer_out. If unlimited_carriers is True no buffer_in is needed.

Parameters:

Name Type Description Default
unlimited_carriers bool

If True, the Magazine will have an unlimited amount of carriers available

True
carriers_in_magazine int

Number of carriers in the magazine

0
carrier_getting_time float

Time to get a carrier from the magazine

2
actionable_magazine bool

If True, carriers in the magazine is in an actionable state

True
carrier_capacity int

Defines how many parts can be assembled on a carrier. If set to default (infinity) or > 15, carrier will be visualized with one part.

inf
Source code in lineflow/simulation/stations.py
class Magazine(Station):
    '''
    Magazine station manages carriers.

    The Magazine gets carriers from buffer_in and stores them in the
    magazine. Afterwards it takes a carrier from its magazine and pushes the
    carrier to buffer_out.
    If unlimited_carriers is True no buffer_in is needed.

    Args:
        unlimited_carriers (bool): If True, the Magazine will have an unlimited amount of carriers available
        carriers_in_magazine (int): Number of carriers in the magazine
        carrier_getting_time (float): Time to get a carrier from the magazine
        actionable_magazine (bool): If True, carriers in the magazine is in an actionable state
        carrier_capacity (int): Defines how many parts can be assembled on a carrier. If set to default (infinity) or > 15, carrier will be visualized with one part.
    '''
    def __init__(
        self,
        name,
        buffer_in=None,
        buffer_out=None,
        position=None,
        unlimited_carriers=True,
        carrier_capacity=np.inf,
        actionable_magazine=True,
        carrier_getting_time=2,
        carriers_in_magazine=0,
    ):
        super().__init__(
            name=name,
            position=position,
        )
        self._assert_init_args(buffer_in, unlimited_carriers, carriers_in_magazine, carrier_capacity)

        if buffer_in is not None:
            self._connect_to_input(buffer_in)
        if buffer_out is not None:
            self._connect_to_output(buffer_out)

        self.actionable_magazine = actionable_magazine
        self.init_carriers_in_magazine = carriers_in_magazine
        self.carrier_getting_time = carrier_getting_time
        self.unlimited_carriers = unlimited_carriers
        self.carrier_capacity = carrier_capacity

        self.carrier_id = 1

    def init_state(self):

        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            DiscreteState('mode', categories=['working', 'waiting', 'failing']),
            CountState('carriers_in_magazine', is_actionable=self.actionable_magazine, is_observable=True),
            TokenState(name='carrier', is_observable=False),
            TokenState(name='part', is_observable=False),
        )

        self.state['carriers_in_magazine'].update(self.init_carriers_in_magazine)
        self.state['on'].update(True)
        self.state['mode'].update("waiting")
        self.state['carrier'].update(None)
        self.state['part'].update(None)

    def _assert_init_args(self, buffer_in, unlimited_carriers, carriers_in_magazine, carrier_capacity):
        if carrier_capacity > 15:
            warnings.warn("If carrier_capacity > 15, visualization of parts on carriers is restriced and carrier will be visualized with one part")

        if not isinstance(carrier_capacity, int) and carrier_capacity != np.inf:
            raise AttributeError("Type of carrier capacity must be int or np.inf")

        if not unlimited_carriers and carriers_in_magazine == 0:
            raise AttributeError(f"unlimited_carriers is {unlimited_carriers} and cell also has 0 carriers in magazine")

        if unlimited_carriers and carriers_in_magazine > 0:
            raise AttributeError(f"unlimited_carriers is {unlimited_carriers} and cell has more than 0 carriers in magazine")

        if buffer_in and unlimited_carriers:
                raise AttributeError(f"Only magazine or unlimited_carriers {unlimited_carriers} is required")

    def create_carrier(self):
        name = f'{self.name}_cr_{self.carrier_id}'
        carrier = Carrier(self.env, name=name, capacity=self.carrier_capacity)
        self.carrier_id += 1
        return carrier

    def _initial_fill_magazine(self, n_carriers):
        # attribute needs to be set here as env is not available in __init__()
        self.magazine = simpy.Store(self.env)
        for i in range(n_carriers):
            carrier = self.create_carrier()
            self.magazine.put(carrier)

    def get_carrier_from_magazine(self):
        yield self.env.process(self._update_magazine())
        yield self.env.timeout(self.carrier_getting_time)

        while True:
            yield self.env.process(self._update_magazine())
            yield self.env.process(self.set_to_work())
            if len(self.magazine.items) > 0:
                carrier = yield self.magazine.get()
                break
            else:
                yield self.env.process(self.set_to_waiting())
                yield self.env.timeout(1)

        self.state['carriers_in_magazine'].decrement()
        return carrier

    def _buffer_in_to_magazine(self):
        while True:
            carrier = yield self.env.process(self.buffer_in())
            yield self.env.process(self.add_carrier_to_magazine(carrier))

    def add_carrier_to_magazine(self, carrier):
        yield self.magazine.put(carrier)
        self.state['carriers_in_magazine'].increment()

    def _update_magazine(self):
        '''
        update the magazine according to state
        '''
        should = self.state['carriers_in_magazine'].value
        current = len(self.magazine.items)
        diff = should - current
        if diff > 0:
            for i in range(diff):
                carrier = self.create_carrier()
                self.magazine.put(carrier)

        if diff < 0:
            for i in range(abs(diff)):
                carrier = yield self.magazine.get()

    def _draw_info(self, screen):
        self._draw_n_carriers(screen)

    def get_carrier(self):
        # First check if Magazine is allowed to create unlimited carriers
        if self.unlimited_carriers:
            yield self.env.timeout(self.carrier_getting_time)
            carrier = self.create_carrier()

        # Second check magazine
        else:
            carrier = yield self.env.process(self.get_carrier_from_magazine())
        self.state["carrier"].update(carrier.name)
        return carrier

    def run(self):
        # Initially fill the magazine with carriers
        self._initial_fill_magazine(self.state['carriers_in_magazine'].value)

        if hasattr(self, 'buffer_in'):
            self.env.process(self._buffer_in_to_magazine())

        while True:
            if self.is_on():
                # Get carrier from Magazine
                yield self.env.process(self.set_to_work())
                carrier = yield self.env.process(self.get_carrier())

                # Wait to place carrier to buffer_out
                yield self.env.process(self.set_to_waiting())
                yield self.env.process(self.buffer_out(carrier))
                self.state['carrier'].update(None)
            else:
                yield self.turn_off()

Process

Bases: Station

Process stations take a carrier as input, process the carrier, and push it onto buffer_out Args: processing_std: Standard deviation of the processing time rework_probability: Probability of a carrier to be reworked (takes 2x the time) position (tuple): X and Y position in visualization

Source code in lineflow/simulation/stations.py
class Process(Station):
    '''
    Process stations take a carrier as input, process the carrier, and push it onto buffer_out
    Args:
        processing_std: Standard deviation of the processing time
        rework_probability: Probability of a carrier to be reworked (takes 2x the time)
        position (tuple): X and Y position in visualization
    '''

    def __init__(
        self,
        name,
        buffer_in=None,
        buffer_out=None,
        processing_time=5,
        position=None,
        processing_std=None,
        rework_probability=0,
        worker_pool=None,

    ):

        super().__init__(
            name=name,
            position=position,
            processing_time=processing_time,
            processing_std=processing_std,
            rework_probability=rework_probability,
            worker_pool=worker_pool,
        )

        if buffer_in is not None:
            self._connect_to_input(buffer_in)

        if buffer_out is not None:
            self._connect_to_output(buffer_out)

    def init_state(self):

        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            DiscreteState('mode', categories=['working', 'waiting', 'failing']),
            TokenState(name='carrier', is_observable=False),
            NumericState('processing_time', is_actionable=False, is_observable=True, vmin=0),
            CountState('n_workers', is_actionable=False, is_observable=True, vmin=0),
        )
        self.state['on'].update(True)
        self.state['mode'].update("waiting")
        self.state['carrier'].update(None)
        self.state['processing_time'].update(self.processing_time)
        self.state['n_workers'].update(self.n_workers)

    def _draw_info(self, screen):
        self._draw_n_workers(screen)

    def run(self):

        while True:
            if self.is_on():
                yield self.env.process(self.request_workers())
                self.state['n_workers'].update(self.n_workers)
                # Wait to get part from buffer_in
                yield self.env.process(self.set_to_waiting())
                carrier = yield self.env.process(self.buffer_in())
                self.state['carrier'].update(carrier.name)

                yield self.env.process(self.set_to_work())

                processing_time = self._sample_exp_time(
                    time=self.processing_time,
                    scale=self.processing_std,
                    rework_probability=self.rework_probability,
                )
                yield self.env.timeout(processing_time)
                self.state['processing_time'].update(processing_time)

                # Release workers
                self.release_workers()

                # Wait to place carrier to buffer_out
                yield self.env.process(self.set_to_waiting())
                yield self.env.process(self.buffer_out(carrier))
                self.state['carrier'].update(None)

            else:
                yield self.turn_off()

Sink

Bases: Station

The Sink takes carriers from buffer_in. It removes the parts of the carrier and either destroys it or puts them to buffer_out if one exists.

Parameters:

Name Type Description Default
processing_std float

The standard deviation of the processing time.

None
position tuple

X and Y position in the visualization.

None
Source code in lineflow/simulation/stations.py
class Sink(Station):
    """
    The Sink takes carriers from buffer_in. It removes the parts of the carrier and either
    destroys it or puts them to buffer_out if one exists.

    Args:
        processing_std (float): The standard deviation of the processing time.
        position (tuple): X and Y position in the visualization.
    """
    def __init__(
        self,
        name,
        buffer_in=None,
        buffer_out=None,
        processing_time=2,
        processing_std=None,
        position=None,
    ):
        super().__init__(
            name=name,
            processing_time=processing_time,
            processing_std=processing_std,
            position=position,
        )

        if buffer_in is not None:
            self._connect_to_input(buffer_in)
        if buffer_out is not None:
            self._connect_to_output(buffer_out)

    def init_state(self):

        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            DiscreteState('mode', categories=['working', 'waiting', 'failing']),
            CountState('n_parts_produced', is_actionable=False, is_observable=False),
            TokenState(name='carrier', is_observable=False),
        )

        self.state['on'].update(True)
        self.state['mode'].update("waiting")
        self.state['n_parts_produced'].update(0)
        self.state['mode'].update("waiting")
        self.state['carrier'].update(None)

    def remove(self, carrier):

        processing_time = self._sample_exp_time(
            time=self.processing_time,
            scale=self.processing_std,
        )
        yield self.env.timeout(processing_time)
        self.state['n_parts_produced'].increment()

        if hasattr(self, 'buffer_out'):
            yield self.env.process(self.set_to_waiting())
            carrier.parts.clear()
            yield self.env.process(self.buffer_out(carrier))

    def run(self):

        while True:
            if self.is_on():
                yield self.env.process(self.set_to_waiting())
                carrier = yield self.env.process(self.buffer_in())
                yield self.env.process(self.set_to_work())
                self.state['carrier'].update(carrier.name)

                # Wait to place carrier to buffer_out
                yield self.env.process(self.remove(carrier))
                self.state['carrier'].update(None)

            else:
                yield self.turn_off()

Source

Bases: Station

Source station generating parts on carriers.

The Source takes carriers from buffer_in, creates a part, places that part onto the carrier, and pushes the carrier onto the buffer_out. If unlimited carriers is True, no buffer_in is needed and no magazine.

Parameters:

Name Type Description Default
name str

Name of the Cell

required
part_specs list

List of dictionaries. Each dict contain specification about a part that is assembled on the carrier i.e.: [{"assembly_condition": 5}]. Assembly condition defines the maxmim time a part can be on the line before not being able to be assembled.

None
buffer_in Buffer

Buffer in

None
buffer_out obj

Buffer out

None
processing_time float

Time it takes to put part on carrier (carrier needs to be available)

2
processing_std float

Standard deviation of processing time

None
waiting_time float

Time to wait between pushing a carrier out and taking the next one

0
position tuple

X and Y position in visualization

None
unlimited_carriers bool

If source has the ability to create unlimited carriers

False
carrier_capacity int

Defines how many parts can be assembled on a carrier. If set to default (infinity) or > 15, carrier will be visualized with one part.

inf
Source code in lineflow/simulation/stations.py
class Source(Station):
    '''
    Source station generating parts on carriers.

    The Source takes carriers from buffer_in, creates a part, places that part
    onto the carrier, and pushes the carrier onto the buffer_out.
    If unlimited carriers is True, no buffer_in is needed and no magazine.

    Args:
        name (str): Name of the Cell
        part_specs (list): List of dictionaries. Each dict contain specification about a part that
            is assembled on the carrier i.e.: [{"assembly_condition": 5}]. Assembly condition
            defines the maxmim time a part can be on the line before not being able to be assembled.
        buffer_in (lineflow.simulation.connectors.Buffer, optional): Buffer in
        buffer_out (obj): Buffer out
        processing_time (float): Time it takes to put part on carrier (carrier needs to be
            available)
        processing_std (float): Standard deviation of processing time
        waiting_time (float): Time to wait between pushing a carrier out and taking the next one
        position (tuple): X and Y position in visualization
        unlimited_carriers (bool): If source has the ability to create unlimited carriers
        carrier_capacity (int): Defines how many parts can be assembled on a carrier. If set to
            default (infinity) or > 15, carrier will be visualized with one part.

    '''
    def __init__(
        self,
        name,
        part_specs=None,
        buffer_in=None,
        buffer_out=None,
        processing_time=2,
        processing_std=None,
        waiting_time=0,
        waiting_time_step=0.5,
        position=None,
        actionable_magazin=True,
        actionable_waiting_time=True,
        unlimited_carriers=False,
        carrier_capacity=np.inf,
    ):
        super().__init__(
            name=name,
            position=position,
            processing_time=processing_time,
            processing_std=processing_std,
        )
        self._assert_init_args(unlimited_carriers, carrier_capacity, buffer_in)

        if buffer_in is not None:
            self._connect_to_input(buffer_in)
        if buffer_out is not None:
            self._connect_to_output(buffer_out)

        self.buffer_in_object = buffer_in
        self.unlimited_carriers = unlimited_carriers
        self.carrier_capacity = carrier_capacity
        self.waiting_time_step = waiting_time_step

        self.actionable_magazin = actionable_magazin
        self.actionable_waiting_time = actionable_waiting_time

        if part_specs is None:
            part_specs = [None]

        self.part_specs = part_specs
        self.part_id = 1

        self.unlimited_carriers = unlimited_carriers
        self.carrier_capacity = carrier_capacity
        self.carrier_id = 1

        self.init_waiting_time = waiting_time

    def init_state(self):

        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            DiscreteState('mode', categories=['working', 'waiting', 'failing']),
            DiscreteState('waiting_time', categories=np.arange(0, 100, self.waiting_time_step), is_actionable=self.actionable_waiting_time),
            TokenState(name='carrier', is_observable=False),
            TokenState(name='part', is_observable=False),
        )

        self.state['waiting_time'].update(self.init_waiting_time)
        self.state['on'].update(True)
        self.state['mode'].update("waiting")
        self.state['carrier'].update(None)
        self.state['part'].update(None)

    def _assert_init_args(self, unlimited_carriers, carrier_capacity, buffer_in):
        if unlimited_carriers:
            if carrier_capacity > 15:
                warnings.warn(
                    "If carrier_capacity > 15, visualization of parts"
                    "on carriers is restriced and carrier will be visualized with one part")
            if not isinstance(carrier_capacity, int) and carrier_capacity != np.inf:
                raise AttributeError("Type of carrier capacity must be int or np.inf")

    def create_carrier(self):
        name = f'{self.name}_cr_{self.carrier_id}'
        carrier = Carrier(self.env, name=name, capacity=self.carrier_capacity)
        self.carrier_id += 1
        return carrier

    def create_parts(self):
        """
        Creates the parts based on the part_specs attribute
        For each dict in the part_specs list one part is created
        """
        parts = []
        for part_spec in self.part_specs:
            part = Part(
                env=self.env,
                name=self.name + '_part_' + str(self.part_id),
                specs=part_spec,
            )
            self.part_id += 1
            part.create(self.position)
            parts.append(part)
        return parts

    def assemble_parts_on_carrier(self, carrier, parts):
        """
        Put parts onto carrier
        """
        for part in parts:
            carrier.assemble(part)

    def assemble_carrier(self, carrier):

        parts = self.create_parts()
        self.state['part'].update(parts[0].name)

        processing_time = self._sample_exp_time(
            time=self.processing_time,
            scale=self.processing_std,
        )
        self.state['carrier'].update(carrier.name)

        yield self.env.timeout(processing_time)
        self.assemble_parts_on_carrier(carrier, parts)

        return carrier

    def wait(self):

        waiting_time = self.state['waiting_time'].to_str()

        if waiting_time > 0:
            yield self.env.process(self.set_to_waiting())
            yield self.env.timeout(waiting_time)

    def run(self):

        while True:
            if self.is_on():
                yield self.env.process(self.set_to_waiting())
                yield self.env.process(self.wait())

                if self.unlimited_carriers:
                    carrier = self.create_carrier()
                else:
                    carrier = yield self.env.process(self.buffer_in())

                yield self.env.process(self.set_to_work())
                carrier = yield self.env.process(self.assemble_carrier(carrier))

                yield self.env.process(self.set_to_waiting())
                yield self.env.process(self.buffer_out(carrier))
                self.state['part'].update(None)
                self.state['carrier'].update(None)

            else:
                yield self.turn_off()

assemble_parts_on_carrier(carrier, parts)

Put parts onto carrier

Source code in lineflow/simulation/stations.py
def assemble_parts_on_carrier(self, carrier, parts):
    """
    Put parts onto carrier
    """
    for part in parts:
        carrier.assemble(part)

create_parts()

Creates the parts based on the part_specs attribute For each dict in the part_specs list one part is created

Source code in lineflow/simulation/stations.py
def create_parts(self):
    """
    Creates the parts based on the part_specs attribute
    For each dict in the part_specs list one part is created
    """
    parts = []
    for part_spec in self.part_specs:
        part = Part(
            env=self.env,
            name=self.name + '_part_' + str(self.part_id),
            specs=part_spec,
        )
        self.part_id += 1
        part.create(self.position)
        parts.append(part)
    return parts

Station

Bases: StationaryObject

Source code in lineflow/simulation/stations.py
class Station(StationaryObject):

    _width = 30
    _height = 30
    _color = 'black'

    def __init__(
        self,
        name,
        position=None,
        processing_time=5,
        processing_std=None,
        rework_probability=0,
        worker_pool=None,
    ):

        super().__init__()

        if position is None:
            position = (0, 0)

        self.name = name
        self.position = pygame.Vector2(position[0], position[1])

        self.worker_pool = worker_pool
        self.worker_requests = {}

        if self.worker_pool is not None:
            self.worker_pool.register_station(self)

        self.processing_time = processing_time
        self.rework_probability = rework_probability

        if self.rework_probability > 1 or self.rework_probability < 0:
            raise ValueError('rework_probability should should be between 0 and 1')

        if processing_std is None:

            self.processing_std = 0.1*self.processing_time
        else:
            assert processing_std >= 0 and processing_std <= 1
            self.processing_std = processing_std*self.processing_time

        self.worker_assignments = {}

    @property
    def is_automatic(self):
        return self.worker_pool is None

    @property
    def n_workers(self):
        if self.worker_pool is not None:
            return len(self.worker_assignments) + 1
        else:
            return 1

    def setup_draw(self):

        self._rect = pygame.Rect(
            self.position.x - self._width / 2,
            self.position.y - self._height / 2,
            self._width,
            self._height,
        )

        font = pygame.font.SysFont(None, 20)
        self._text = font.render(self.name, True, 'black')

    def _draw(self, screen):
        pygame.draw.rect(screen, self._color, self._rect, border_radius=5)
        self._draw_info(screen)
        screen.blit(
            self._text,
            self._text.get_rect(center=self.position + (0, -0.6 * self._height)),
        )

    def _draw_info(self, screen):
        pass

    def _draw_n_workers(self, screen):
        if not self.is_automatic:
            font = pygame.font.SysFont(None, 14)
            text = font.render(
                "W=" + str(self.n_workers),
                True,
                'black',
            )
            screen.blit(
                text,
                text.get_rect(center=self.position),
            )

    def _draw_n_carriers(self, screen):
        font = pygame.font.SysFont(None, 14)
        text = font.render(
            "C=" + self.state['carriers_in_magazine'].to_str(),
            True,
            'black',
        )
        screen.blit(
            text,
            text.get_rect(center=self.position),
        )

    def get_performance_coefficient(self):
        return compute_performance_coefficient(self.n_workers)

    def _sample_exp_time(self, time=None, scale=None, rework_probability=0):
        """
        Samples a time from an exponential distribution
        """

        coeff = self.get_performance_coefficient()

        t = time*coeff + self.random.exponential(scale=scale)

        rework = self.random.choice(
            [1, 2],
            p=[1-rework_probability, rework_probability],
        )

        return t*rework

    def set_to_waiting(self):
        yield self.env.timeout(0)
        self._color = 'yellow'
        self.state['mode'].update('waiting')
        yield self.env.timeout(0)

    def request_workers(self):
        """
        Requests (and blocks) the worker for the process coming up.
        """
        if not self.is_automatic:
            self.worker_assignments = self.worker_pool.get_worker(self.name)

            self.worker_requests = {
                name: worker.request() for name, worker in self.worker_assignments.items()
            }

            # Block workers for this process
            for request in self.worker_requests.values():
                yield request

        else:
            yield self.env.timeout(0)

    def release_workers(self):
        """
        Releases the worker, to they may follow a new assignment
        """
        if not self.is_automatic:

            for worker, request in self.worker_requests.items():
                self.worker_assignments[worker].release(request)
            self.worker_requests = {}
            self.worker_assignments = {}

    def set_to_error(self):
        yield self.env.timeout(0)
        self._color = 'red'
        self.state['mode'].update('failing')
        yield self.env.timeout(0)

    def set_to_work(self):
        yield self.env.timeout(0)
        self._color = 'green'
        self.state['mode'].update('working')
        yield self.env.timeout(0)

    def turn_off(self):
        self._color = 'gray'
        self.state['on'].update(False)
        self.turn_off_event = simpy.Event(self.env)
        return self.turn_off_event

    def is_on(self):
        return self.state['on'].to_str()

    def turn_on(self):
        event = self.turn_off_event

        self.state['on'].update(True)
        if not event.triggered:
            yield event.succeed()
        else:
            yield self.env.timeout(0)

    def connect_to_input(self, station, *args, **kwargs):
        buffer = Buffer(name=f"Buffer_{station.name}_to_{self.name}", *args, **kwargs)
        self._connect_to_input(buffer)
        station._connect_to_output(buffer)

    def connect_to_output(self, station, *args, **kwargs):
        buffer = Buffer(name=f"Buffer_{self.name}_to_{station.name}", *args, **kwargs)
        self._connect_to_output(buffer)
        station._connect_to_input(buffer)

    def _connect_to_input(self, buffer):
        if hasattr(self, 'buffer_in'):
            raise ValueError(f'Input of {self.name} already connected')
        self.buffer_in = buffer.connect_to_output(self)

    def _connect_to_output(self, buffer):
        if hasattr(self, 'buffer_out'):
            raise ValueError(f'Output of {self.name} already connected')
        self.buffer_out = buffer.connect_to_input(self)

    def register(self, env):
        self.env = env
        self.env.process(self.run())

    def _derive_actions_from_new_state(self, state):
        # Turn machine back on if needed
        if not self.is_on() and 'on' in state and hasattr(self, 'turn_off_event') and state['on'] == 0:
            self.env.process(self.turn_on())

    def apply(self, actions):
        self._derive_actions_from_new_state(actions)
        self.state.apply(actions)

release_workers()

Releases the worker, to they may follow a new assignment

Source code in lineflow/simulation/stations.py
def release_workers(self):
    """
    Releases the worker, to they may follow a new assignment
    """
    if not self.is_automatic:

        for worker, request in self.worker_requests.items():
            self.worker_assignments[worker].release(request)
        self.worker_requests = {}
        self.worker_assignments = {}

request_workers()

Requests (and blocks) the worker for the process coming up.

Source code in lineflow/simulation/stations.py
def request_workers(self):
    """
    Requests (and blocks) the worker for the process coming up.
    """
    if not self.is_automatic:
        self.worker_assignments = self.worker_pool.get_worker(self.name)

        self.worker_requests = {
            name: worker.request() for name, worker in self.worker_assignments.items()
        }

        # Block workers for this process
        for request in self.worker_requests.values():
            yield request

    else:
        yield self.env.timeout(0)

Switch

Bases: Station

A Switch distributes carriers onto buffer outs. In and out buffers can be provided to the constructor but can also be added to a switch using the connect_to_input and connect_to_output methods.

Parameters:

Name Type Description Default
buffers_in list

A list of buffers that lead into the Switch.

None
buffers_out list

A list of buffers that lead away from the Switch.

None
position tuple

X and Y position in the visualization.

None
alternate bool

If True, the Switch switches between the buffers_out; else, only one buffer_out is used.

False
Source code in lineflow/simulation/stations.py
class Switch(Station):
    """
    A Switch distributes carriers onto buffer outs. In and out buffers can be provided to
    the constructor but can also be added to a switch using the `connect_to_input` and `connect_to_output`
    methods.

    Args:
        buffers_in (list): A list of buffers that lead into the Switch.
        buffers_out (list): A list of buffers that lead away from the Switch.
        position (tuple): X and Y position in the visualization.
        alternate (bool): If True, the Switch switches between the buffers_out; else, only one buffer_out is used.
    """

    def __init__(
        self,
        name,
        buffers_in=None,
        buffers_out=None,
        position=None,
        processing_time=5,
        alternate=False,
    ):
        super().__init__(
            name=name,
            position=position,
            processing_time=processing_time,
            # We assume switches do not have variation here
            processing_std=0,
        )

        # time it takes for a model to change buffer_in or buffer_out
        self.readjustment_time = 10

        self.buffer_in = []
        self.buffer_out = []

        if buffers_in is not None:
            for buffer in buffers_in:
                self._connect_to_input(buffer)

        if buffers_out is not None:
            for buffer in buffers_out:
                self._connect_to_output(buffer)

        self.alternate = alternate

    def init_state(self):
        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            DiscreteState('mode', categories=['working', 'waiting', 'failing']),
            DiscreteState(
                name='index_buffer_in',
                categories=list(range(self.n_buffers_in)),
                is_actionable=not self.alternate and self.n_buffers_in > 1
            ),
            DiscreteState(
                name='index_buffer_out',
                categories=list(range(self.n_buffers_out)),
                is_actionable=not self.alternate and self.n_buffers_out > 1),
            TokenState(name='carrier', is_observable=False),
        )
        self.state['index_buffer_in'].update(0)
        self.state['index_buffer_out'].update(0)
        self.state['on'].update(True)
        self.state['mode'].update("waiting")
        self.state['carrier'].update(None)

    @property
    def n_buffers_in(self):
        return len(self.buffer_in)

    @property
    def n_buffers_out(self):
        return len(self.buffer_out)

    def _get_buffer_in_position(self):
        return self.buffer_in[
            self.state['index_buffer_in'].value
        ].__self__._positions_slots[-1]

    def _get_buffer_out_position(self):
        return self.buffer_out[
            self.state['index_buffer_out'].value
        ].__self__._positions_slots[0]

    def _draw_info(self, screen):
        pos_buffer_in = self._get_buffer_in_position()
        pos_buffer_out = self._get_buffer_out_position()

        pos_in = pos_buffer_in + 0.5*(self.position - pos_buffer_in)
        pos_out = pos_buffer_out + 0.5*(self.position - pos_buffer_out)

        pygame.draw.circle(screen, 'gray', self.position, 6)
        for pos in [pos_in, pos_out]:
            pygame.draw.line(screen, "gray", self.position, pos, width=5)

    def _connect_to_input(self, buffer):
        self.buffer_in.append(buffer.connect_to_output(self))

    def _connect_to_output(self, buffer):
        self.buffer_out.append(buffer.connect_to_input(self))

    def _alternate_indices(self):
        self.state['index_buffer_in'].set_next()
        self.state['index_buffer_out'].set_next()

    def look_in(self):
        """
        Checks if part at current buffer_in is available
        """
        buffer_in = self.buffer_in[self.state['index_buffer_in'].value].__self__
        while buffer_in.get_fillstate() == 0:
            yield self.env.timeout(1)
            buffer_in = self.buffer_in[self.state['index_buffer_in'].value].__self__
        return buffer_in

    def look_out(self):
        """
        Checks if space at current buffer_out is available
        """
        buffer_out = self.buffer_out[self.state['index_buffer_out'].value].__self__

        while buffer_out.get_fillstate() == 1:
            yield self.env.timeout(1)
            # check if buffer out changed
            buffer_out = self.buffer_out[self.state['index_buffer_out'].value].__self__
        return buffer_out

    def get(self):
        while True:
            yield self.env.process(self.set_to_waiting())
            buffer_in = yield self.env.process(self.look_in())
            self.getting_process = None
            yield self.env.process(self.set_to_work())
            carrier = yield self.env.process(
                buffer_in.get()
            )
            self.state['carrier'].update(carrier.name)
            return carrier

    def put(self, carrier):
        while True:
            yield self.env.process(self.set_to_waiting())
            buffer_out = yield self.env.process(self.look_out())
            yield self.env.process(buffer_out.put(carrier))
            self.state['carrier'].update(None)
            return

    def run(self):
        while True:
            if self.is_on():
                # Get carrier
                carrier = yield self.env.process(self.get())

                # Process
                yield self.env.process(self.set_to_work())
                yield self.env.timeout(self.processing_time)

                yield self.env.process(self.put(carrier))

                if self.alternate:
                    self._alternate_indices()

            else:
                yield self.turn_off()

look_in()

Checks if part at current buffer_in is available

Source code in lineflow/simulation/stations.py
def look_in(self):
    """
    Checks if part at current buffer_in is available
    """
    buffer_in = self.buffer_in[self.state['index_buffer_in'].value].__self__
    while buffer_in.get_fillstate() == 0:
        yield self.env.timeout(1)
        buffer_in = self.buffer_in[self.state['index_buffer_in'].value].__self__
    return buffer_in

look_out()

Checks if space at current buffer_out is available

Source code in lineflow/simulation/stations.py
def look_out(self):
    """
    Checks if space at current buffer_out is available
    """
    buffer_out = self.buffer_out[self.state['index_buffer_out'].value].__self__

    while buffer_out.get_fillstate() == 1:
        yield self.env.timeout(1)
        # check if buffer out changed
        buffer_out = self.buffer_out[self.state['index_buffer_out'].value].__self__
    return buffer_out

WorkerPool

Bases: StationaryObject

Source code in lineflow/simulation/stations.py
class WorkerPool(StationaryObject):

    def __init__(
        self,
        name,
        n_workers=None,
        transition_time=5,
    ):
        super().__init__()

        assert n_workers is not None, "Workers have to be set"

        self.name = name
        self.n_workers = n_workers
        self.transition_time = transition_time
        self.stations = []
        self._station_names = []
        self._worker_names = [f"W{i}" for i in range(self.n_workers)]
        self.workers = {
            name: Worker(
                name=name,
                transition_time=self.transition_time
            ) for name in self._worker_names
        }

    def register_station(self, station):
        self.stations.append(station)
        self._station_names.append(station.name)

    def init_state(self):
        for worker in self.workers.values():
            worker.init_state(self.stations)

        self.state = ObjectStates(
            *[
                worker.state for worker in self.workers.values()
            ]
        )
        # Distribute worker on stations in round robin fashion
        for worker, station in zip_cycle(self.n_workers, self.n_stations):
            self.state[f"W{worker}"].apply(station)

    @property
    def n_stations(self):
        return len(self.stations)

    def register(self, env):
        self.env = env

        for worker in self.workers.values():
            worker.register(env)

        for worker_n, station in zip_cycle(self.n_workers, self.n_stations):

            worker = self.workers[f"W{worker_n}"]
            # Start working
            self.env.process(worker.work())

    def apply(self, actions):
        """
        This should just update the state of the workers
        """
        for worker, station in actions.items():
            worker_obj = self.workers[worker]
            self.env.process(worker_obj.assign(station))

    def get_worker(self, name):
        # gather these workers assigned to these station
        station = self._station_names.index(name)
        requests = {}

        for worker in self.workers.values():
            # If state of worker equals the station, the worker is blocked for exactly this station
            if worker.state.value == station:
                requests[worker.name] = worker
        return requests

apply(actions)

This should just update the state of the workers

Source code in lineflow/simulation/stations.py
def apply(self, actions):
    """
    This should just update the state of the workers
    """
    for worker, station in actions.items():
        worker_obj = self.workers[worker]
        self.env.process(worker_obj.assign(station))

Buffer

Bases: Connector

Element that connects two different types of stations.

Parameters:

Name Type Description Default
name str

Name of the station

required
capacity int

Number of slots the buffer can hold

2
transition_time float

Time carriers need to traverse the full buffer

10
put_time float

Time the buffer needs to hand over a carrier to the next element

1
get_time float

Time the buffer needs to get a carrier to the previous element

1
Source code in lineflow/simulation/connectors.py
class Buffer(Connector):
    """
    Element that connects two different types of stations.

    Args:
        name (str): Name of the station
        capacity (int): Number of slots the buffer can hold
        transition_time (float): Time carriers need to traverse the full buffer
        put_time (float): Time the buffer needs to hand over a carrier to the next element
        get_time (float): Time the buffer needs to get a carrier to the previous element

    """

    def __init__(self, name, capacity=2, transition_time=10, put_time=1, get_time=1, put_std=None):
        super().__init__()
        self.name = name
        self.transition_time_between_slots = transition_time / (capacity - 1)
        self.transition_time = transition_time
        self.put_time = put_time
        self.get_time = get_time
        self.color = "gray"

        # Small registry of carriers
        self.carriers = {}
        self.capacity = capacity

        if put_std is None:
            self.put_std = 0.1*self.put_time
        else:
            self.put_std = put_std

        self._positions_slots = self.capacity * [None]
        self._positions_arrow = self.capacity * [None]

    def init_state(self):

        self.state = ObjectStates(
            DiscreteState('on', categories=[True, False], is_actionable=False, is_observable=False),
            NumericState(name='fill', vmin=0, vmax=1, is_observable=True, is_actionable=False),
        )

        self.state['on'].update(True)
        self.state['fill'].update(0)

    @property
    def n_carriers(self):
        return len(self.carriers)

    def setup_draw(self):

        vec_direction = np.array(
            [
                self._position_output.x - self._position_input.x,
                self._position_output.y - self._position_input.y
            ]
        ) / (self.capacity + 1)

        if vec_direction[0] == 0 and vec_direction[1] == 0:
            raise ValueError(f'Start and end position of {self.name} equal!')

        vec_start = np.array([self._position_input.x, self._position_input.y])

        for i in range(self.capacity):

            pos = vec_start + (i+1)*vec_direction
            self._positions_slots[i] = pygame.Vector2(pos[0], pos[1])

        # Calculate arrowhead points
        arrowhead_size = 3.5
        vec_normalized = vec_direction / np.linalg.norm(vec_direction)

        for i in range(self.capacity):
            positions_arrow = vec_start + (i+1.55) * vec_direction
            arrowhead = np.tile(positions_arrow, (3, 1))

            for j, index in enumerate([0, 2]):
                arrowhead[index] += arrowhead_size*np.array(
                    [
                        -vec_normalized[0]+(-1)**j*vec_normalized[1],
                        -vec_normalized[1]-(-1)**j*vec_normalized[0],
                    ]
                )

            self._positions_arrow[i] = arrowhead

    def _draw(self, screen):

        pygame.draw.line(
            screen,
            self.color,
            self._position_input,
            self._position_output,
            width=10,
        )

        # Draw slots
        for i, slot in enumerate(self._positions_slots):
            pygame.draw.circle(screen, 'gray', slot, 10)

        # Draw arrowheads
        for i, arrow in enumerate(self._positions_arrow[:-1]):
            pygame.draw.polygon(screen, 'black', arrow)

        # Draw carriers
        for carrier in self.carriers.values():
            carrier.draw(screen)

    def _sample_put_time(self):
        return self.put_time + self.random.exponential(scale=self.put_std)

    def get(self):
        self.carrier = yield self.slots[-1].get()
        # Release segment
        yield self.blockers[-1].get()

        # Remove carrier from registry
        self.carriers.pop(self.carrier.name)
        self.state['fill'].update(self.get_fillstate())
        yield self.env.timeout(self.get_time)
        return self.carrier

    def put(self, carrier):

        # Wait for segment to be released
        yield self.blockers[0].put(1)

        # Wait a bit to actually put part
        yield self.env.timeout(self._sample_put_time())

        yield self.slots[0].put(carrier)

        carrier.move(self._positions_slots[0])

        # Remove carrier from registry
        self.carriers[carrier.name] = carrier
        self.state['fill'].update(self.get_fillstate())

    def get_fillstate(self):
        return self.n_carriers/self.capacity

    def _move(self, i):

        slot_from = self.slots[i]
        slot_to = self.slots[i+1]

        blocker_self = self.blockers[i]
        blocker_next = self.blockers[i+1]

        while True:
            # Wait for carrier in slot
            carrier = yield slot_from.get()

            # Waiting for segment to be free
            yield blocker_next.put(1)

            carrier.move(self._positions_slots[i+1])

            # Release segment before
            yield blocker_self.get()

            # Start moving Transition
            yield self.env.timeout(self.transition_time_between_slots)

            # Push carrier to next segment
            yield slot_to.put(carrier)

    def register(self, env):
        self.env = env

        self.slots = [
            simpy.Store(
                env=self.env,
                capacity=1,
            ) for _ in range(self.capacity)
        ]

        self.blockers = [
            simpy.Store(
                env=self.env,
                capacity=1,
            ) for _ in range(self.capacity)
        ]

        for i in range(self.capacity - 1):
            self.env.process(self._move(i))

Carrier

Bases: MovableObject

Source code in lineflow/simulation/movable_objects.py
class Carrier(MovableObject):

    def __init__(self, env, name, color='Black', width=30, height=10, capacity=np.inf):
        super(Carrier, self).__init__(env, name, specs=None)
        self.capacity = capacity
        self._color = color
        self._width = width
        self._height = height

        self._width_part = 0.8*self._width
        if capacity < 15:
            self._width_part = self._width_part / self.capacity

        self._height_part = 0.7*self._height

        self.parts = {}

    def assemble(self, part):

        if part.name in self.parts:
            raise ValueError(f'A part with name {part.name} already contained')

        if not hasattr(part, "creation_time"):
            raise ValueError('Part not created')
        self.parts[part.name] = part

    def _draw_shape(self, screen):

        self._rect = pygame.Rect(
            self._position.x - self._width / 2,
            self._position.y - self._height / 2,
            self._width,
            self._height,
        )
        pygame.draw.rect(screen, self._color, self._rect, border_radius=2)

        for i, part in enumerate(self):
            part._draw(
                screen,
                x=self._position.x+0.1*self._width - self._width / 2 + i*(self._width_part),
                y=self._position.y - self._height_part / 2,
                width=self._width_part,
                height=self._height_part,
            )

    def move(self, position):
        """
        """

        # If no position has been given, no move is taking place
        if position is None:
            return

        if not isinstance(position, pygame.Vector2):
            raise ValueError('Expect pygame vector as position')

        self._position = position

        for part in self.parts.values():
            part.move(position)

    def __iter__(self):
        for part in self.parts.values():
            yield part

move(position)

Source code in lineflow/simulation/movable_objects.py
def move(self, position):
    """
    """

    # If no position has been given, no move is taking place
    if position is None:
        return

    if not isinstance(position, pygame.Vector2):
        raise ValueError('Expect pygame vector as position')

    self._position = position

    for part in self.parts.values():
        part.move(position)

Part

Bases: MovableObject

Source code in lineflow/simulation/movable_objects.py
class Part(MovableObject):
    def __init__(self, env, name, specs=None, color='Orange'):
        super(Part, self).__init__(env, name, specs=specs)
        self._color = color

    def is_valid_for_assembly(self):
        """
        If the part has an `assembly_condition` in its specification, then it checks whether the
        time between its creation and now is smaller than this condition. Otherwise it will just
        return true.
        """
        if "assembly_condition" in self.specs:
            return (self.env.now - self["creation_time"]) < self["assembly_condition"]
        else:
            return True

    def create(self, position):
        if not isinstance(position, pygame.Vector2):
            raise ValueError('Expect pygame vector as position')
        self.move(position)

    def _draw(self, screen, x, y, width, height):
        _part_rect = pygame.Rect(x, y, width, height)
        pygame.draw.rect(screen, self._color, _part_rect, border_radius=1)

is_valid_for_assembly()

If the part has an assembly_condition in its specification, then it checks whether the time between its creation and now is smaller than this condition. Otherwise it will just return true.

Source code in lineflow/simulation/movable_objects.py
def is_valid_for_assembly(self):
    """
    If the part has an `assembly_condition` in its specification, then it checks whether the
    time between its creation and now is smaller than this condition. Otherwise it will just
    return true.
    """
    if "assembly_condition" in self.specs:
        return (self.env.now - self["creation_time"]) < self["assembly_condition"]
    else:
        return True

Worker

Bases: object

Source code in lineflow/simulation/movable_objects.py
class Worker(object):
    def __init__(self, name, transition_time=5):
        self.name = name
        self.transition_time = transition_time

    def register(self, env):
        self.env = env
        self._working = Resource(self.env, capacity=1)
        self.assignment = Store(env=self.env, capacity=1)

    def release(self, request):
        self._working.release(request)

    def request(self):
        return self._working.request()

    def assign(self, station):
        """
        Assign worker to station.

        """
        if len(self.assignment.items) > 0:
            # Clean old assignment
            yield self.assignment.get()

        yield self.assignment.put(station)

    def init_state(self, stations):

        self.stations = stations
        self.state = DiscreteState(
            name=self.name,
            categories=[s.name for s in self.stations],
            is_observable=True,
            is_actionable=True,
        )

    def work(self):

        # Initially fill value of state to assignment
        yield self.assignment.put(self.state.value)

        while True:
            # Wait for new assignment
            station = yield self.assignment.get()

            # Wait until worker is released from current station
            transition_request = self.request()
            yield transition_request

            if self.state.value != station:
                # New cell-assignment, wait for transition
                # Move to new cell
                yield self.env.timeout(self.transition_time)

            self.release(transition_request)
            # Station now can create requests
            self.state.apply(station)

assign(station)

Assign worker to station.

Source code in lineflow/simulation/movable_objects.py
def assign(self, station):
    """
    Assign worker to station.

    """
    if len(self.assignment.items) > 0:
        # Clean old assignment
        yield self.assignment.get()

    yield self.assignment.put(station)

StationaryObject

Source code in lineflow/simulation/stationary_objects.py
class StationaryObject:
    _objects = []

    def __init__(self):
        StationaryObject._objects.append(self)

    def setup_draw(self):
        pass

    def _draw(self, screen):
        pass

    def register(self, env):
        raise NotImplementedError

    def __enter__(self):
        # Clean up line objects
        StationaryObject._objects = []

        return self._objects

    def init(self, random):
        """
        Function that is called after line is built, so all available information is present
        """
        self.random = random
        self.init_state()

    def init_state(self):
        """
        Should initialize the state object
        """
        raise NotImplementedError()

    def __exit__(self, type, value, traceback):
        StationaryObject._objects = []

init(random)

Function that is called after line is built, so all available information is present

Source code in lineflow/simulation/stationary_objects.py
def init(self, random):
    """
    Function that is called after line is built, so all available information is present
    """
    self.random = random
    self.init_state()

init_state()

Should initialize the state object

Source code in lineflow/simulation/stationary_objects.py
def init_state(self):
    """
    Should initialize the state object
    """
    raise NotImplementedError()

This file is a wrapper around our simulation such that it is compatible with the stable-baselines repo

LineSimulation

Bases: Env

A Gym-compatible environment for simulating production lines using LineFlow.

This environment wraps a LineFlow production line into a reinforcement learning setup, where agents can interact with stations via actions at discrete decision points, while the underlying process unfolds in continuous time using discrete-event simulation.

Parameters:

Name Type Description Default
line Line

A LineFlow Line object representing the production line layout and behavior.

required
simulation_end int

The simulation end time (in simulation time units).

required
reward str

The reward signal to use. Options are "parts" (default) for counting produced parts, or "uptime" for average utilization.

'parts'
part_reward_lookback int

Time window for computing average uptime-based rewards (used only if reward=uptime).

0
render_mode str or None

Optional rendering mode. Currently supports "human" for visual rendering or None.

None
Source code in lineflow/simulation/environment.py
class LineSimulation(gym.Env):
    """
    A Gym-compatible environment for simulating production lines using LineFlow.

    This environment wraps a LineFlow production line into a reinforcement learning setup, where
    agents can interact with stations via actions at discrete decision points, while the underlying
    process unfolds in continuous time using discrete-event simulation.

    Args:
        line (lineflow.simulation.line.Line): A LineFlow `Line` object representing the production
            line layout and behavior.
        simulation_end (int): The simulation end time (in simulation time units).
        reward (str, optional): The reward signal to use. Options are "parts" (default) for counting
            produced parts, or "uptime" for average utilization.
        part_reward_lookback (int, optional): Time window for computing average uptime-based rewards
            (used only if reward=`uptime`).
        render_mode (str or None, optional): Optional rendering mode. Currently supports "human" for
            visual rendering or None.
    """

    metadata = {"render_modes": [None, "human"]}

    def __init__(self, line, simulation_end, reward="parts", part_reward_lookback=0, render_mode=None):
        super().__init__()
        self.line = line
        self.simulation_end = simulation_end
        self.part_reward_lookback = part_reward_lookback

        self.render_mode = render_mode

        assert reward in ["uptime", "parts"]
        self.reward = reward

        # fix an order of states
        self.action_space = _build_action_space(self.line.state)
        self.observation_space = _build_observation_space(line_state=self.line.state)

        self.n_parts = 0
        self.n_scrap_parts = 0

    def _map_to_action_dict(self, actions):

        actions_iterator = filterfalse(
            lambda n: not self.line.state[n[0]][n[1]].is_actionable,
            self.line.state
        )

        actions_dict = {}
        for action, (station, action_name) in zip(actions, actions_iterator):
            if station not in actions_dict:
                actions_dict[station] = {}

            actions_dict[station][action_name] = action
        return actions_dict

    def step(self, actions):
        """
        Advances the simulation by one environment step.

        Args:
            actions (list or array): A list of agent actions corresponding to actionable features.

        Returns:
            observation (numpy.ndarray): Observation tensor representing the current state.
            reward (float): The computed reward for the current step.
            terminated (bool): Whether the episode has ended.
            truncated (bool): Whether the episode ended due to an internal error or simulation limit.
            info (dict): Additional diagnostic information.
        """
        actions = self._map_to_action_dict(actions)
        self.line.apply(actions)

        try:
            state, terminated = self.line.step(self.simulation_end)
            truncated = False
        except simpy.core.EmptySchedule:
            # TODO: not tested yet
            state = self.line.state
            terminated = True
            truncated = True

        observation = self._get_observations_as_tensor(state)

        if self.reward == "parts":
            reward = (self.line.get_n_parts_produced() - self.n_parts) - \
                self.line.scrap_factor*(self.line.get_n_scrap_parts() - self.n_scrap_parts)
        elif self.reward == "uptime":
            reward = self.line.get_uptime(lookback=self.part_reward_lookback).mean()
        else:
            assert False, f"Reward {reward} not implemented"

        self.n_parts = self.line.get_n_parts_produced()
        self.n_scrap_parts = self.line.get_n_scrap_parts()

        if self.render_mode == "human":
            self.render()

        return observation, reward, terminated, truncated, self._get_info()

    def _get_info(self):
        return self.line.info()

    def increase_scrap_factor(self, factor=0.1):
        """
        Sets the scrap penalty factor in the reward function.

        Args:
            factor (float): The multiplier applied to scrap parts in the parts-based reward.
        """
        self.line.scrap_factor = factor

    def reset(self, seed=None, options=None):
        gym.Env.reset(self, seed=seed)

        self.line.reset(random_state=seed)
        self.n_parts = 0
        self.n_scrap_parts = 0

        state, _ = self.line.step()
        observation = self._get_observations_as_tensor(state)

        if self.render_mode == "human":
            self.screen = self.line.setup_draw()
            self.render()
        return observation, self._get_info()

    @property
    def features(self):
        return self.line.state.observable_features

    def render(self):
        self.line._draw(self.screen)

    def close(self):
        if self.render_mode == 'human':
            self.line.teardown_draw()

    def _get_observations_as_tensor(self, state):

        X = state.get_observations(lookback=1, include_time=False)
        return np.array(X, dtype=np.float32)

increase_scrap_factor(factor=0.1)

Sets the scrap penalty factor in the reward function.

Parameters:

Name Type Description Default
factor float

The multiplier applied to scrap parts in the parts-based reward.

0.1
Source code in lineflow/simulation/environment.py
def increase_scrap_factor(self, factor=0.1):
    """
    Sets the scrap penalty factor in the reward function.

    Args:
        factor (float): The multiplier applied to scrap parts in the parts-based reward.
    """
    self.line.scrap_factor = factor

step(actions)

Advances the simulation by one environment step.

Parameters:

Name Type Description Default
actions list or array

A list of agent actions corresponding to actionable features.

required

Returns:

Name Type Description
observation ndarray

Observation tensor representing the current state.

reward float

The computed reward for the current step.

terminated bool

Whether the episode has ended.

truncated bool

Whether the episode ended due to an internal error or simulation limit.

info dict

Additional diagnostic information.

Source code in lineflow/simulation/environment.py
def step(self, actions):
    """
    Advances the simulation by one environment step.

    Args:
        actions (list or array): A list of agent actions corresponding to actionable features.

    Returns:
        observation (numpy.ndarray): Observation tensor representing the current state.
        reward (float): The computed reward for the current step.
        terminated (bool): Whether the episode has ended.
        truncated (bool): Whether the episode ended due to an internal error or simulation limit.
        info (dict): Additional diagnostic information.
    """
    actions = self._map_to_action_dict(actions)
    self.line.apply(actions)

    try:
        state, terminated = self.line.step(self.simulation_end)
        truncated = False
    except simpy.core.EmptySchedule:
        # TODO: not tested yet
        state = self.line.state
        terminated = True
        truncated = True

    observation = self._get_observations_as_tensor(state)

    if self.reward == "parts":
        reward = (self.line.get_n_parts_produced() - self.n_parts) - \
            self.line.scrap_factor*(self.line.get_n_scrap_parts() - self.n_scrap_parts)
    elif self.reward == "uptime":
        reward = self.line.get_uptime(lookback=self.part_reward_lookback).mean()
    else:
        assert False, f"Reward {reward} not implemented"

    self.n_parts = self.line.get_n_parts_produced()
    self.n_scrap_parts = self.line.get_n_scrap_parts()

    if self.render_mode == "human":
        self.render()

    return observation, reward, terminated, truncated, self._get_info()

Reinforcement Learning

train(config)

Function that handles RL training

Args: - train: Scores from the model update phase - rollout: Scores when a policy is rolled out to gather new experiences. - eval: Scores when a policy is evaluated on a separate environment

Notes

Size of rollout-buffer: n_steps*n_envs, then an model-update is done

Source code in scripts/train.py
def train(config):
    """
    Function that handles RL training

    Args:
    - `train`: Scores from the model update phase
    - `rollout`: Scores when a policy is rolled out to gather new experiences.
    - `eval`: Scores when a policy is evaluated on a separate environment

    Notes:
        Size of rollout-buffer: `n_steps*n_envs`, then an model-update is done
    """

    simulation_end = config['simulation_end'] + 1

    env_train = make_stacked_vec_env(
        line=_make_line(config['env'], config['n_cells'], config['info'], curriculum=config['curriculum']),
        simulation_end=simulation_end,
        reward=config["rollout_reward"],
        n_envs=config['n_envs'],
        n_stack=config['n_stack'] if not config['recurrent'] else 1,
    )

    env_eval = make_stacked_vec_env(
        line=_make_line(config['env'], config['n_cells'], config['info'], curriculum=config['curriculum']),
        simulation_end=simulation_end,
        reward=config["eval_reward"],
        n_envs=1,
        n_stack=config['n_stack'] if not config['recurrent'] else 1,
    )
    run = wandb.init(
        project='Lineflow',
        sync_tensorboard=True,
        config=config
    )
    log_path = os.path.join(config['log_dir'], run.id)

    if config['env'] == 'complex_line' and config['curriculum']:
        curriculum_callback = CurriculumLearningCallback(
            # Task marked as resolved if rewards is above 100
            threshold=100, 
            # Update of scrap factor
            update=(1/config["n_cells"])/5, 
            factor_max=1/config["n_cells"],
            look_back=3,
        )
    else:
        curriculum_callback = None

    eval_callback = EvalCallback(
        eval_env=env_eval,
        deterministic=config['deterministic'],
        n_eval_episodes=1,
        # Every (eval_freq*eval_envs) / (n_steps*train_envs)  step an update is done
        eval_freq=config["n_steps"]* config["n_envs"] * 10, # ever step in every env counts
        callback_after_eval=curriculum_callback,
    )

    model_args = {
        "policy": 'MlpPolicy' if not config['recurrent'] else 'MlpLstmPolicy',
        "env": env_train,
        "n_steps": config["n_steps"],
        "gamma": config['gamma'],  # discount factor
        "learning_rate": config["learning_rate"],
        "use_sde": False,
        "normalize_advantage": config['normalize_advantage'],
        "device": get_device(),
        "tensorboard_log": log_path,
        "stats_window_size": 10,
        "verbose": 0,
        "seed": config['seed'] if config['seed'] != 0 else None,
    }

    if "PPO" in config['model']:
        model_cls = PPO
        model_args["batch_size"] = config["n_steps"]  # mini-batch size
        model_args["n_epochs"] = 5  # number of times to go over experiences with mini-batches
        model_args["clip_range"] = config['clip_range']
        model_args["max_grad_norm"] = 0.5
        model_args["ent_coef"] = config['ent_coef']

        if config['recurrent']:
            model_cls = RecurrentPPO

    if "A2C" in config['model']:
        model_cls = A2C
        model_args["max_grad_norm"] = 0.5
        model_args["ent_coef"] = config['ent_coef']

    if "TRPO" in config['model']:
        model_cls = TRPO

    model = model_cls(**model_args)

    model.learn(
        total_timesteps=config["total_steps"],
        callback=CallbackList([
            WandbCallback(verbose=0),
            eval_callback,
        ])
    )
    run.finish()

CurriculumLearningCallback

Bases: BaseCallback

Parameters:

Name Type Description Default
look_back int

Number of last evaluations that should be used to check if task is solved

5
factor_max float

Factor with which the threshold is multiplied to get new threshold

inf
Source code in lineflow/learning/curriculum.py
class CurriculumLearningCallback(BaseCallback):
    """

    Args:
        look_back (int): Number of last evaluations that should be used to check if task is solved
        factor_max (float): Factor with which the threshold is multiplied to get new threshold
    """
    parent: EvalCallback

    def __init__(self, threshold, update=0.02, factor_max=np.inf, look_back=5):
        super().__init__(verbose=0)

        self.threshold = threshold
        self.look_back = look_back
        self.update = update
        self.factor = 0
        self.factor_max = factor_max

        self.rewards = []
        self.last_adjustment = 0

    def update_task(self):
        self.factor = min(self.factor + self.update, self.factor_max)
        self.parent.eval_env.env_method('increase_scrap_factor', (self.factor))
        self.parent.training_env.env_method('increase_scrap_factor', (self.factor))

    def _on_step(self) -> bool:
        self.rewards.append(self.parent.last_mean_reward)
        if self.n_calls >= self.last_adjustment + self.look_back:

            # Check if task is solved
            if np.mean(self.rewards[-self.look_back:]) >= self.threshold:
                self.update_task()
                self.last_adjustment = self.n_calls
        self.logger.record("eval/scrap_factor", self.factor)
        return True