Processing
A set of basic functions designed to be used in the processing section to read and manipulate event data and implement your application logic.
Generic Functions

class krules_core.base_functions.processing.Process

Like Filter, evaluates a given expression but does not return its result. It is best used in combination with Argument Processors.
    from datetime import datetime

    from krules_env import RULE_PROC_EVENT

    # ...

    rulesdata = [
        {
            # Store processed events with Django ORM
            rulename: "processed-rules",
            subscribe_to: RULE_PROC_EVENT,
            ruledata: {
                processing: [
                    Process(
                        lambda payload: (
                            ProcessedEvent.objects.create(
                                rule_name=payload["name"],
                                type=payload["type"],
                                subject=payload["subject"],
                                event_info=payload["event_info"],
                                payload=payload["payload"],
                                time=payload["event_info"].get("time", datetime.now().isoformat()),
                                filters=payload["filters"],
                                processing=payload["processing"],
                                got_errors=payload["got_errors"],
                                processed=payload["processed"],
                                origin_id=payload["event_info"].get("originid", "-")
                            )
                        )
                    ),
                ]
            }
        }
    ]
execute(value)

class krules_core.base_functions.processing.Route

Produces an event inside and/or outside the ruleset. "Sending outside" means delivering the event to the dispatcher component. By default, an event is dispatched outside only if no handler for it is defined in the current ruleset. This behavior can be changed using dispatch_policy. The available choices are defined in krules_core.route.router.DispatchPolicyConst:
DEFAULT: Dispatched outside only when no handler is found in the current ruleset;
ALWAYS: Always dispatched outside, even if a handler is found and processed in the current ruleset;
NEVER: Never dispatched outside;
DIRECT: Skips the search for a local handler and sends the event outside directly.
    from krules_core.route.router import DispatchPolicyConst
    from krules_core.event_types import SUBJECT_PROPERTY_CHANGED

    # ...

    rulesdata = [
        {
            rulename: "on-device-onboarded-dispatch-added-event",
            subscribe_to: "device-onboarded",
            ruledata: {
                processing: [
                    # ...
                    # do something with device
                    Route(
                        subject=lambda payload: payload["device_id"],
                        payload=lambda payload: payload["device_data"],
                        event_type="device-added",
                        # No dispatch_policy is provided, so DEFAULT is used
                    ),
                ]
            }
        },
        {
            rulename: "on-position-change-propagate-event",
            subscribe_to: SUBJECT_PROPERTY_CHANGED,
            ruledata: {
                filters: [
                    OnSubjectPropertyChanged("position")
                ],
                processing: [
                    Route(
                        # Here we specify neither type, nor subject, nor payload.
                        # We use dispatch_policy DIRECT to propagate the received event outside.
                        # This is more efficient because it avoids useless checks in other
                        # rules subscribed to SUBJECT_PROPERTY_CHANGED.
                        # Note that rules are processed in the order in which they were defined.
                        dispatch_policy=DispatchPolicyConst.DIRECT
                    )
                ]
            }
        },
        {
            rulename: "on-temp-change-propagate-event",
            subscribe_to: SUBJECT_PROPERTY_CHANGED,
            ruledata: {
                filters: [
                    OnSubjectPropertyChanged("temp", value=lambda v: v > 30)
                ],
                processing: [
                    Route(
                        event_type="device-overheated",
                        # We want to handle the device-overheated event both in the current
                        # container and outside, for example to send an external notification
                        dispatch_policy=DispatchPolicyConst.ALWAYS
                    )
                ]
            }
        },
        {
            rulename: "on-device-overheated-schedule-check",
            subscribe_to: "device-overheated",
            ruledata: {
                # ...
            }
        },
    ]
execute(event_type=None, subject=None, payload=None, dispatch_policy='default')
Parameters
event_type – The event type. If None, the type of the event currently being processed is used. [default None]
subject – The event subject. If None, the current subject is used. [default None]
payload – The event payload. If None, the current payload is used. [default None]
dispatch_policy – The event dispatch policy, as explained above. [default DispatchPolicyConst.DEFAULT]
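
The four dispatch policies described above can be summarised as a small decision function. This is only a toy model of the documented behaviour, not the KRules router: the function name plan_routing and the string values other than 'default' (which appears in the execute signature) are assumptions made for illustration.

```python
# Toy model only: not the KRules router implementation.
# Only the value "default" appears in the documented signature;
# the other three strings are placeholders chosen for this sketch.
DEFAULT, ALWAYS, NEVER, DIRECT = "default", "always", "never", "direct"


def plan_routing(has_local_handler, dispatch_policy=DEFAULT):
    """Return (process_locally, dispatch_outside) for one event."""
    if dispatch_policy == DIRECT:
        # Local handlers are not even looked up.
        return (False, True)
    process_locally = has_local_handler
    if dispatch_policy == ALWAYS:
        return (process_locally, True)
    if dispatch_policy == NEVER:
        return (process_locally, False)
    if dispatch_policy == DEFAULT:
        # Go outside only when nothing handles the event locally.
        return (process_locally, not has_local_handler)
    raise ValueError(f"unknown dispatch policy: {dispatch_policy!r}")
```

Under this model, DIRECT differs from ALWAYS only in that the local lookup is skipped, which matches the efficiency remark in the position-change example.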

class krules_core.base_functions.processing.RaiseException

Forces the given exception to be raised.
    # Supposing we defined a module with custom exceptions
    from .my_custom_exceptions import UnexpectedPayload

    rulesdata = [
        {
            rulename: "on-unexpected-payload-raise-exception",
            subscribe_to: "device-onboarded",
            ruledata: {
                filters: [
                    Return(lambda payload: "device_id" not in payload)
                ],
                processing: [
                    RaiseException(
                        UnexpectedPayload("device_id missing!")
                    )
                ]
            }
        },
    ]
execute(ex)
Parameters
ex – The exception to be raised
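
The example above imports UnexpectedPayload from a user-defined module. A minimal sketch of what that module might contain is shown here. The module is hypothetical and check_payload is an illustrative helper that is not part of KRules; it only mirrors, in plain Python, what the filter and RaiseException pair express.

```python
# my_custom_exceptions.py (hypothetical user module, not part of KRules)
class UnexpectedPayload(Exception):
    """Raised when an event payload lacks a required key."""


def check_payload(payload):
    # Plain-Python equivalent of the filter + RaiseException pair:
    # raise when "device_id" is missing, otherwise pass the payload on.
    if "device_id" not in payload:
        raise UnexpectedPayload("device_id missing!")
    return payload
```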

Acting on Subject

class krules_core.base_functions.processing.SetSubjectProperty

Sets a single property of the subject, supporting atomic operations. By default the property is reactive, unless it is muted (muted=True) or extended (extended=True).
    rulesdata = [
        {
            rulename: "set-device-class",
            subscribe_to: "device-onboarded",
            ruledata: {
                filters: [
                    ...  # Check if device has the characteristics of a heater
                ],
                processing: [
                    SetSubjectProperty(
                        property_name="device_class",
                        value="heater"
                    )
                ]
            }
        },
        {
            rulename: "on-new-checkup-increment-counter",
            subscribe_to: "checkup",
            ruledata: {
                processing: [
                    SetSubjectProperty(
                        property_name="checkup_cnt",
                        # The operation is atomic
                        value=lambda x: x is None and 1 or x + 1
                    )
                ]
            }
        }
    ]
execute(property_name, value, extended=False, muted=False, use_cache=True)
Parameters
property_name – Name of the property to set. It may or may not already exist.
value – Value to set. It can be a callable, which (optionally) receives the current property value. If the property does not exist yet, it receives None. Note that setting the value is an atomic operation.
extended – If True, set an extended property instead of a standard one. [default False]
muted – If True, no subject-property-changed event is raised after the property is set. Note that extended properties are always muted, so this parameter is ignored if extended is True. [default False]
use_cache – If False, store the property value on the storage immediately; otherwise wait until the end of rule execution. [default True]
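
The callable form of value can be pictured with a small stand-in. The sketch below is not KRules code: ToySubject, increment and run_checkups are hypothetical names, and a lock is just one simple way to imitate the atomic read-and-update the documentation describes. Unlike the real function, the toy always passes the current value to the callable.

```python
import threading


class ToySubject:
    """Toy stand-in for a subject; not the KRules implementation."""

    def __init__(self):
        self._props = {}
        self._lock = threading.Lock()

    def set(self, property_name, value):
        # Read, compute and write under one lock, so that concurrent
        # callers never work from a stale current value.
        with self._lock:
            if callable(value):
                # A missing property is passed to the callable as None.
                value = value(self._props.get(property_name))
            self._props[property_name] = value
            return value

    def get(self, property_name):
        with self._lock:
            return self._props.get(property_name)


def increment(current):
    # Same result as `x is None and 1 or x + 1`, written as a
    # conditional expression.
    return 1 if current is None else current + 1


def run_checkups(subject, workers=8, per_worker=100):
    """Increment checkup_cnt from several threads and return the total."""
    def work():
        for _ in range(per_worker):
            subject.set("checkup_cnt", increment)

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return subject.get("checkup_cnt")
```

Because every update happens inside the lock, no increment is lost even when many threads update the same counter.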

class krules_core.base_functions.processing.SetSubjectPropertyImmediately

Extends SetSubjectProperty, setting a property directly on the storage without using the cache (use_cache=False). This can help avoid concurrency issues caused by inconsistencies arising during the execution. The extension's aim is to make code more readable.

execute(property_name, value, extended=False, muted=False, **kwargs)

class krules_core.base_functions.processing.SetSubjectExtendedProperty

Extends SetSubjectProperty, setting an extended property of the subject (extended=True). Note that muted is no longer among the arguments, because an extended property is always muted. The extension's aim is to make code more readable.

execute(property_name, value, use_cache=True, **kwargs)

class krules_core.base_functions.processing.SetSubjectProperties

Sets multiple properties of the subject from a dictionary. This is allowed only when using the cache, and not for extended properties. Each property set this way is muted, but some of them can be unmuted using the unmuted parameter.
    rulesdata = [
        {
            rulename: "on-device-onboarded-update",
            subscribe_to: "device-onboarded",
            ruledata: {
                filters: [
                    ...  # Check if device has the characteristics of a heater
                ],
                processing: [
                    # Thanks to ArgumentProcessor we can use a lambda. Without it,
                    # on_boarding_tm would always equal the datetime of the Rule's
                    # instantiation, while we need the one at execution.
                    SetSubjectProperties(
                        props=lambda: {
                            "device_class": "heater",
                            "on_boarding_tm": datetime.now(),
                        },
                        # unmuted takes property names
                        unmuted=["device_class"]
                    )
                ]
            }
        }
    ]
execute(props, unmuted=[])
Parameters
props – The properties to set
unmuted – List of property names for which property-changed events are emitted
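
The interplay between props and unmuted can be sketched with a plain dictionary standing in for the subject. This is a toy model, not the KRules implementation: set_properties and the shape of the returned event tuples are assumptions, and the toy reports an event for every unmuted name without checking whether the value actually changed.

```python
def set_properties(subject, props, unmuted=()):
    """Set every item of props on subject (a dict in this toy model).

    Returns the list of (name, old_value, new_value) tuples for which a
    property-changed event would be emitted: only names in unmuted.
    """
    events = []
    for name, new_value in props.items():
        old_value = subject.get(name)
        subject[name] = new_value
        if name in unmuted:
            # Every other property is set silently (muted).
            events.append((name, old_value, new_value))
    return events
```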

class krules_core.base_functions.processing.StoreSubject

Stores all subject properties on the subject storage and then flushes the cache. Usually this happens at the end of the ruleset execution.

class krules_core.base_functions.processing.FlushSubject

Removes all of the subject's properties. It is important to recall that a subject exists only while it has at least one property, so removing all its properties means removing the subject itself.
    rulesdata = [
        {
            rulename: "on-user-unsubscribe-delete-subject",
            subscribe_to: "user-unsubscribed",
            ruledata: {
                processing: [
                    DeleteProfileFromDB(user_id=lambda subject: subject.user_id),
                    FlushSubject()
                ]
            }
        },
        {
            rulename: "on-onboard-device-store-properties",
            subscribe_to: "onboard-device",
            ruledata: {
                processing: [
                    FlushSubject(),
                    SetSubjectProperties(lambda payload: payload["data"]),
                    SetSubjectProperty('status', 'READY'),
                ],
            },
        },
    ]

Acting on Payload

class krules_core.base_functions.processing.SetPayloadProperties

Sets the given properties in the payload. Any that already exist are overwritten.
    rulesdata = [
        {
            rulename: "on-admin-login-update-payload",
            subscribe_to: "user-login",
            ruledata: {
                filters: [
                    ...  # Check if user is admin
                ],
                processing: [
                    SetPayloadProperties(  # Definition with a dictionary
                        **{
                            "has_admin_access": True,
                            "last_login": lambda: datetime.now()
                        }
                    )
                ]
            }
        },
        {
            rulename: "on-user-login-update-payload",
            subscribe_to: "user-login",
            ruledata: {
                filters: [
                    ...  # Check if user does not have admin privileges
                ],
                processing: [
                    SetPayloadProperties(  # Definition with named arguments
                        has_admin_access=False,
                        last_login=lambda: datetime.now()
                    )
                ]
            }
        },
        # Thanks to ArgumentProcessor we can use a lambda. Without it, last_login
        # would always equal the datetime of the Rule's instantiation, while we
        # need the one at execution.
    ]
execute(**kwargs)
Parameters
**kwargs – Each named parameter gives a key and the value to update the payload with.
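
The remark about lambdas and instantiation time can be shown without KRules at all. The sketch below is a simplified assumption of how deferred arguments behave: ToySetPayloadProperties is a hypothetical class, it only handles zero-argument callables, and a counter replaces datetime.now() so the result is deterministic.

```python
import itertools


class ToySetPayloadProperties:
    """Toy model of deferred argument evaluation; not KRules code."""

    def __init__(self, **kwargs):
        # Arguments are stored as given when the rule is defined.
        self._kwargs = kwargs

    def execute(self, payload):
        for key, value in self._kwargs.items():
            # Callables are resolved only now, at execution time.
            payload[key] = value() if callable(value) else value
        return payload


_ticks = itertools.count(1)


def now():
    # Stand-in for datetime.now(): returns 1, 2, 3, ... on successive calls.
    return next(_ticks)


# Evaluated once, while the rule is being defined.
eager = ToySetPayloadProperties(last_login=now())
# Evaluated again on every execution.
lazy = ToySetPayloadProperties(last_login=lambda: now())

eager_result = eager.execute({})
lazy_first = lazy.execute({})
lazy_second = lazy.execute({})
```

The eager variant keeps returning the value captured at definition time, while the lazy one picks up a fresh value on each execution.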

class krules_core.base_functions.processing.SetPayloadProperty

Extends SetPayloadProperties, expecting a single property to set.
    rulesdata = [
        {
            rulename: "on-heater-onboarded-set-class",
            subscribe_to: "device-onboarded",
            ruledata: {
                filters: [
                    ...  # Check if device has the characteristics of a heater
                ],
                processing: [
                    SetPayloadProperty(
                        property_name="device_class",
                        value="heater"
                    )
                ]
            }
        },
    ]
execute(property_name, value)
Parameters
property_name – Name of the property to set.
value – Value to set.

2021, Airspot s.r.l. Registered office: Via Ormea 33, 10125 Torino, TO, Italy. Tax code and VAT number: 12141910013.