Processing

A set o basic functions designed to be used in the processing section for the purpose of to read and manipulate event data and implement yur application logic.

Generic Functions

class krules_core.base_functions.processing.Process

Like Filter evaluate a given expression but does not return it. The best way to exploit it is to use it in combination with Argument Processors.

from krules_env import RULE_PROC_EVENT

#...

rulesdata = [
    {
        # Store processed events with Django ORM
        rulename: "processed-rules",
        subscibre_to: RULE_PROC_EVENT,
        ruledata: {
            processing: [
                Process(
                    lambda payload:(
                        ProcessedEvent.objects.create(
                            rule_name=payload["name"],
                            type=payload["type"],
                            subject=payload["subject"],
                            event_info=payload["event_info"],
                            payload=payload["payload"],
                            time=payload["event_info"].get("time", datetime.now().isoformat()),
                            filters=payload["filters"],
                            processing=payload["processing"],
                            got_errors=payload["got_errors"],
                            processed=payload["processed"],
                            origin_id=payload["event_info"].get("originid", "-")
                        )
                    )
                ),
            ]
        }
    }
]
execute(value)
class krules_core.base_functions.processing.Route

Produce an event inside and/or outside the ruleset, for “sending outside” the event we mean to deliver it to the dispatcher component. By default an event is dispatched outside only if there is no handler defined in the current ruleset. However it is possible to change this behavior using dispatch_policy. Available choices are defined in krules_core.route.router.DispatchPolicyConst as:

  • DEFAULT: Dispatched outside only when no handler is found in current ruleset;

  • ALWAYS: Always dispatched outside even if an handler is found and processed in the current ruleset;

  • NEVER: Never dispatched outside;

  • DIRECT: Skip to search for a local handler and send outside directly.

from krules_core.route.router.DispatchPolicyConst import DEFAULT, ALWAYS, NEVER, DIRECT
from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

# ...

rulesdata = [
    {
        rulename: "on-device-onboarded-dispatch-added-event",
        subscibre_to: "device-onboarded",
        ruledata: {
            processing: [
                # ...
                # do something with device
                Route(
                    subject=lambda payload: payload["device_id"],
                    payload=lambda payload: payload["device_data"],
                    event_type="device-added",
                    # no dispatch_policy is provided so will be used the DEFAULT one
                ),
            ]
        }
    },
    {
        rulename: "on-position-change-propagate-event",
        subscibre_to: SUBJECT_PROPERTY_CHANGED,
        ruledata: {
            filters: [
                OnSubjectPropertyChanged("position")
            ]
            processing: [
                Route(
                    dispatch_policy=DIRECT
                    # In this case we don't specify neither type, nor subject, nor payload.
                    # We use dispatch_policy DIRECT to propagate the received event outside, this increase
                    # efficiency because we want avoid useless check in other rules subscribed to SUBJECT_PROPERTY_CHANGED.
                    #  Note that the rules are processed following the order in which they were defined.
                )
            ]
        }
    },
    {
        rulename: "on-temp-change-propagate-event",
        subscibre_to: SUBJECT_PROPERTY_CHANGED,
        ruledata: {
            filters: [
                OnSubjectPropertyChanged("temp", value=lambda v: v > 30)
            ]
            processing: [
                Route(
                    event_type="device-overheated"
                    dispatch_policy=ALWAYS
                    # We want to handle device-overheated event both in the current container and outside, for example to send an external notification
                )
            ]
        }
    },
    {
        rulename: "on-device-overheated-schedule-check",
        subscribe_to: "device-overheated",
        ruledata: {
            # ...
        }
    },
]
execute(event_type=None, subject=None, payload=None, dispatch_policy='default')
Parameters
  • event_type – The event type. If None use current processing event type [default None]

  • subject – The event subject. If None use the current subject [default None]

  • payload – The event payload. If None use the current payload [default None]

  • dispatch_policy – Define the event dispatch policy as explained before. [default DispatchPolicyConst.DEFAULT]

class krules_core.base_functions.processing.RaiseException

Force the given exception raising

from .my_custom_exceptions import UnexpectedPayload # supposing we defined a module with custom exceptions

rulesdata = [
    {
        rulename: "on-unexpected-payload-raise-exception",
        subscibre_to: "device-onboarded",
        ruledata: {
            filters: [
                Return(lambda payload: "device_id" not in payload)
            ]
            processing: [
                RaiseException(
                    UnexpectedPayload("device_id missing!")
                )
            ]
        }
    },
]
execute(ex)
Parameters

ex – The exception to be raised

Acting on Subject

class krules_core.base_functions.processing.SetSubjectProperty

Set a single property of the subject, supporting atomic operation. By default, the property is reactive unless is muted (muted=True) or extended (extended=True)

rulesdata = [
    {
        rulename: "set-device-class",
        subscibre_to: "device-onboarded",
        ruledata: {
            filters: [
                ...
                # Check if device has characteristics of an heather
            ]
            processing: [
                SetSubjectProperty(
                    property_name="device_class",
                    value="heather"
                )
            ]
        }
    },
    {
        rulename: "on-new-checkup-increment-counter",
        subscibre_to: "checkup",
        ruledata: {
            processing: [
                SetSubjectProperty(
                    property_name="checkup_cnt",
                    value=lambda x: x is None and 1 or x + 1 # Operation is atomic
                )
            ]
        }
    }
]
execute(property_name, value, extended=False, muted=False, use_cache=True)
Parameters
  • property_name – Name of the property to set. It may or may not exist

  • value – Value to set. It can be a callable and receives (optionally) the current property value. If the property does not exist yet, it receives None. Note that value setting is an atomic operation.

  • extended – If True set an extended property instead a standard one. [default False]

  • muted – If True no subject-property-changed will be raised after property setting. Note that extended properties are always muted so, if extended is True, this parameter will be ignored. [default False]

  • use_cache – If False store the property value immediately on the storage, otherwise wait for the end of rule execution. [default False]

class krules_core.base_functions.processing.SetSubjectPropertyImmediately

Extends SetSubjectProperty setting a property directly to the storage without using the cache ( use_cache=False ). This could be very helpful to avoid concurrency issues by avoiding running into inconsistencies during the execution. The extension’s aim is to made code more readable.

execute(property_name, value, extended=False, muted=False, **kwargs)
class krules_core.base_functions.processing.SetSubjectExtendedProperty

Extends SetSubjectProperty setting an extended property of the subject( extended=True ). Note that muted is not present anymore in the arguments because an extended property is always muted. The extension’s aim is to made code more readable.

execute(property_name, value, use_cache=True, **kwargs)
class krules_core.base_functions.processing.SetSubjectProperties

Set multiple properties in subject from dictionary. This is allowed only by using cache and not for extended properties. Each property set in that way is muted but it is possible to unmute some of that using unmuted parameter

rulesdata = [
    {
        rulename: "on-device-oboarded-update",
        subscibre_to: "device-onboarded",
        ruledata: {
            filters: [
                ...
                # Check if device has characteristics of an heather
            ]
            processing: [
                SetSubjectProperties(
                    props=lambda: {
                        "device_class": "heather",
                        "on_boarding_tm": datetime.now(),
                    },
                    unmuted=["heather"]
                )
                # Thanks to ArgumentProcessor we can use a lambda, without that on_boarding_tm
                # would be always equal to the Rule instantiation's datetime while we need the execution's one.
            ]
        }
    }
]
execute(props, unmuted=[])
Parameters
  • props – The properties to set

  • unmuted – List of property names for which emit property changed events

class krules_core.base_functions.processing.StoreSubject

Store alla subject properties on the subject storage and then flush the cache. Usually this happens at the end of the ruleset execution.

class krules_core.base_functions.processing.FlushSubject

Remove all subject’s properties. It is important tho recall that a subject exists while it has at least a property, so remove all its properties means remove the subject itself.

rulesdata = [
    {
        rulename: "on-user-unsubscribe-delete-subject",
        subscibre_to: "user-unsubscribed",
        ruledata: {
            processing: [
                DeleteProfileFromDB(user_id=lambda subject: subject.user_id),
                FlushSubject()
            ]
        }
    },
    {
        rulename: "on-onboard-device-store-properties",
        subscribe_to: "onboard-device",
        ruledata: {
            processing: [
                FlushSubject(),
                SetSubjectProperties(lambda payload: payload["data"]),
                SetSubjectProperty('status', 'READY'),
            ],
        },
    },
]

Acting on Payload

class krules_core.base_functions.processing.SetPayloadProperties

Set the given properties in the payload, if some of that already exist will be overridden

rulesdata = [
    {
        rulename: "on-admin-login-update-payload",
        subscibre_to: "user-login",
        ruledata: {
            filters: [
                ...
                # Check if user is admin
            ]
            processing: [
                SetPayloadProperties(  # Definition with a dictionary
                    lambda: **{
                        "has_admin_access": True,
                        "last_login": datetime.now()
                    }
                )
            ]
        }
    },
    {
        rulename: "on-user-login-update-payload",
        subscibre_to: "user-login",
        ruledata: {
            filters: [
                ...
                # Check if user has not admin privileges
            ]
            processing: [
                SetPayloadProperties(  # Definition with named arguments
                    has_admin_access=False,
                    last_login=lambda:datetime.now()
                )
            ]
        }
    },
    # Thanks to ArgumentProcessor we can use a lambda, without that last_login
    # would be always equal to the Rule instantiation's datetime while we need the execution's one.
]
execute(**kwargs)
Parameters

**kwargs – Each named paramenter is the key and the value to update with.

class krules_core.base_functions.processing.SetPayloadProperty

Extends SetPayloadProperties expecting a single property to set

rulesdata = [
    {
        rulename: "on-heather-onboarded-set-class",
        subscibre_to: "device-onboarded",
        ruledata: {
            filters: [
                ...
                # Check if device has characteristics of an heather
            ]
            processing: [
                SetPayloadProperty(
                    property_name="device_class",
                    value="heather"
                )
            ]
        }
    },
]
execute(property_name, value)
Parameters
  • property_name – Name of property which will be to set,

  • value – Value to set.