Version: LOC v0.10 (legacy)

Events and Multiple Data Processes

Learning Objective
  • To query events emitted from another data process.
  • To inspect a larger data lineage consisting of multiple types of events.

This is the follow-up to Emit and Inspect Events, in which we learned the basics of emitting events.

In reality, your company may have much longer business processes that span several different departments. LOC events play an important role in communication between data processes, as well as in tracking how data flows.

For example, now that we have some policy applications coming in, we need to assign them to insurance agents. We will, of course, build a second data process to achieve this.

info

How to trigger the second data process depends on your use case, but generally you can:

  1. Have the second data process run on a scheduler and check events periodically;
  2. Have the first data process invoke an API route or message queue trigger;
  3. Have it started manually by human operators.

In this example we will execute it manually.

Query Events

Use Case: Assigning Insurance Agents

This new data process will query the EventPolicyRequest events emitted in the previous tutorial, extract the request ID and policy code, then assign each request to the agent responsible for that specific type of policy.

We will use a mock-up agent mapping table (instead of querying one from a database):

Agent name | Responsible for
Marvin     | P100, P200
Trillian   | P300, P400, P500

So if the requested insurance policy is type P100, it will be assigned to Marvin, and so on.
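To make the mapping concrete, it can be kept as a plain JavaScript object with a small lookup helper. This is only a sketch; the names below (agentMapping, findAgent) are illustrative and not part of the LOC SDK:

// mock-up agent mapping table: agent name -> policy codes they are responsible for
const agentMapping = {
  Marvin: ["P100", "P200"],
  Trillian: ["P300", "P400", "P500"],
};

// return the agent responsible for a given policy code, or null if no one matches
function findAgent(policyCode) {
  for (const [agent, policies] of Object.entries(agentMapping)) {
    if (policies.includes(policyCode)) return agent;
  }
  return null;
}

findAgent("P100"); // "Marvin"
findAgent("P500"); // "Trillian"
findAgent("P999"); // null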

Code Walkthrough

This time we'll create the second data process Policy Assign:

Logic            | Name                       | Purpose
Generic logic #1 | Policy Query Logic         | Query EventPolicyRequest events and write into session
Generic logic #2 | Policy Assign Logic        | Read from session and emit EventAssignAgent events
Aggregator logic | Result Aggregator (source) | Finalise task result

We don't need Payload JSON Parser this time since this data process does not require a trigger payload. Instead, the Policy Query Logic will query the events we are looking for and write them into the parsed session variable.

import {
  LoggingAgent,
  SessionStorageAgent,
  EventAgent,
} from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // event query conditions
  const requests = {
    queries: [
      {
        field: "label_name", // field name
        type: "match", // matching operator
        value: "EventPolicyRequest", // value (event name)
      },
    ],
    excludes: [],
    filters: [],
    from: 0,
    size: 1000,
    sorts: [],
  };

  // query events
  const query = await EventAgent.search(requests);
  const events = query?.events;

  if (events) {
    // log queried events
    LoggingAgent.info({ queried_events: events });

    // overwrite the "parsed" session variable with the queried events
    await SessionStorageAgent.putJson("parsed", events);
  }
}

export async function handleError(ctx, error) {
  // error logging
  LoggingAgent.error({
    error: true,
    errorMessage: error.message,
    stack: error.stack,
    taskId: ctx.task.taskKey,
  });
}
Filter events within a timespan

In the example above, the event store agent would query as many as 1,000 events. It would be sensible to do some double-checking so that you won't process the same event twice. You can also filter events within a certain timespan:

...
const requests = {
  queries: [
    {
      field: "label_name",
      type: "match",
      value: "EventPolicyRequest",
    },
  ],
  excludes: [],
  filters: [ // add a filter condition
    {
      field: "timestamp",
      gte: Date.now() - 10 * 60 * 1000, // greater than or equal
      lte: Date.now(), // less than or equal
      type: "range",
    },
  ],
  from: 0,
  size: 1000,
  sorts: [],
};

...

Both gte and lte fields accept a number (a Unix timestamp in milliseconds, as returned by Date.now()).

The query parameters above would try to find events with the label name EventPolicyRequest emitted within the last 10 minutes.
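We also need the Policy Assign Logic, which reads the queried events back from the parsed session variable, looks up the responsible agent, and emits EventAssignAgent events. The following is a minimal sketch only: it assumes the EventAgent.emit usage from the previous tutorial, and the event field names (sourceDigitalIdentity, targetDigitalIdentity) and the assigned_events session key are assumptions you should check against the SDK reference for your LOC version:

import {
  LoggingAgent,
  SessionStorageAgent,
  EventAgent,
} from "@fstnetwork/loc-logic-sdk";

// mock-up agent mapping table (illustrative)
const agentMapping = {
  Marvin: ["P100", "P200"],
  Trillian: ["P300", "P400", "P500"],
};

export async function run(ctx) {
  // read the queried EventPolicyRequest events written by the Policy Query Logic
  const events = (await SessionStorageAgent.get("parsed")) || [];

  const assignEvents = [];
  for (const event of events) {
    // field names below are assumptions; verify them against your SDK version
    const requestId = event.sourceDigitalIdentity; // e.g. "R0042"
    const policyCode = event.targetDigitalIdentity; // e.g. "P100"

    // find the agent responsible for this policy code (skip if no one matches)
    const agent = Object.keys(agentMapping).find((name) =>
      agentMapping[name].includes(policyCode)
    );
    if (!agent) continue;

    assignEvents.push({
      labelName: "EventAssignAgent",
      sourceDID: requestId,
      targetDID: agent,
      meta: policyCode,
      type: "default",
    });
  }

  // emit the assignment events
  if (assignEvents.length > 0) {
    await EventAgent.emit(assignEvents);
  }
  LoggingAgent.info({ emitted_events: assignEvents });

  // store the emitted events for the aggregator (illustrative session key)
  await SessionStorageAgent.putJson("assigned_events", assignEvents);
}

export async function handleError(ctx, error) {
  LoggingAgent.error({
    error: true,
    errorMessage: error.message,
    stack: error.stack,
  });
}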

Invoke Data Process

note

This data process does not need any payload. You can execute it with a manual trigger or an API route without providing a payload.

After executing the data process, it should return a result similar to that of the first policy application data process:

{
  "status": "ok",
  "taskKey": {
    "executionId": "ZOMn3_2atIqBa1xcELqoYw",
    "taskId": "MgSowyszCqBNB2uPW33Lsg"
  },
  "data": {
    "emitted_length": 3,
    "events": [
      {
        "labelName": "EventAssignAgent",
        "sourceDID": "R0042",
        "targetDID": "Marvin",
        "meta": "P100",
        "type": "default"
      },
      {
        "labelName": "EventAssignAgent",
        "sourceDID": "R0043",
        "targetDID": "Trillian",
        "meta": "P300",
        "type": "default"
      },
      {
        "labelName": "EventAssignAgent",
        "sourceDID": "R0044",
        "targetDID": "Trillian",
        "meta": "P500",
        "type": "default"
      }
    ]
  }
}
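The result above is produced by the aggregator logic. Below is a minimal sketch of what it might look like, assuming the Policy Assign Logic stored its emitted events in the session under an illustrative key such as assigned_events (the key name and data layout are assumptions, not the tutorial's shared aggregator source):

import {
  ResultAgent,
  SessionStorageAgent,
} from "@fstnetwork/loc-logic-sdk";

export async function run(ctx) {
  // read the events stored by the Policy Assign Logic (illustrative session key)
  const events = (await SessionStorageAgent.get("assigned_events")) || [];

  // finalise the task result
  ResultAgent.finalize({
    status: "ok",
    taskKey: ctx.task.taskKey,
    data: {
      emitted_length: events.length,
      events: events,
    },
  });
}

export async function handleError(ctx, error) {
  // report the error as the task result
  ResultAgent.finalize({
    status: "error",
    taskKey: ctx.task.taskKey,
    data: { error: error.message },
  });
}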

Inspect Events

To narrow down events emitted from different tasks, we can use "Quick" mode to filter events within a specific time range:

[Screenshot: filtered event list]

After locating the six events we're looking for, we can switch to the data lineage graph (you'll need to drag the nodes around to your liking):

[Screenshot: data lineage graph]

This clearly shows the data flow between the two data processes: the applicant first submitted a request for a certain type of insurance policy, and each request was then assigned to an agent (if there is one). This can extend across the whole business process and be used to find incorrect business mappings or missing links.