Version: LOC v0.6 (legacy)

Event Store Agent

Emit and query LOC data events.

Once emitted, events are stored in Elasticsearch and can be searched there. They are also used to generate data lineage graphs in Studio.

Availability

  • ✓ Generic logic
  • ✗ Aggregator logic

Emit Events

async eventStore.emit(events: Event.Event[]): Promise<void>

Emit event(s). The parameter events is an array of events.

Each element of type Event.Event must have the following fields:

| Field | Type | Description |
| --- | --- | --- |
| labelName | string | Label name (event name) |
| sourceDigitalIdentity or sourceDID | string | Source digital identity (DID) |
| targetDigitalIdentity or targetDID | string | Target DID |
| meta | string | Meta payload (additional data); max length 32768 bytes. |
| type | string | Event type (group) |

note

For now the type only supports "default".

The data lineage or data trail is represented by the relationship between the source and target nodes of each event.

Source and target nodes will be created in Elasticsearch if they do not exist. Any node can be both the source and the target of other events.

Example

const events = [
  {
    // event 1
    labelName: "Event name 1",
    sourceDID: "Event source 1",
    targetDID: "Event target 1",
    meta: "",
    type: "default",
  },
  {
    // event 2
    labelName: "Event name 2",
    sourceDID: "Event source 2",
    targetDID: "Event target 2",
    meta: "",
    type: "default",
  },
  // ...
];

await ctx.agents.eventStore.emit(events);
note

The events may not be properly emitted without using await.
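
Because any node can be both the source and the target of other events, a data trail can be built by chaining events. The sketch below illustrates this; buildTrail and the node names are hypothetical and not part of the LOC SDK.

```javascript
// Hypothetical helper (not part of the LOC SDK): turns an ordered list of
// node names into events that link each node to the next one.
const buildTrail = (labelName, nodes) =>
  nodes.slice(0, -1).map((source, i) => ({
    labelName,
    sourceDID: source,
    targetDID: nodes[i + 1],
    meta: "",
    type: "default", // the only supported type for now
  }));

const trail = buildTrail("order flow", ["order", "payment", "shipment"]);
// "payment" is the target of the first event and the source of the second

// inside a logic, remember to await the call:
// await ctx.agents.eventStore.emit(trail);
```

Three nodes produce two events, so the resulting trail reads order -> payment -> shipment.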

tip

You can also use JSON.stringify() to include a JSON object in the meta payload, and later decode it with JSON.parse().
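
A minimal sketch of that round trip follows. The helpers encodeMeta and decodeMeta are hypothetical names, and the size check assumes the 32768-byte limit for meta is counted in UTF-8 bytes and that TextEncoder is available in the runtime (as it is in Node.js and modern browsers).

```javascript
const MAX_META_BYTES = 32768; // limit documented for the meta field

// Hypothetical helper: serialize an object into a meta string and check its size.
const encodeMeta = (payload) => {
  const meta = JSON.stringify(payload);
  const bytes = new TextEncoder().encode(meta).length; // UTF-8 byte length
  if (bytes > MAX_META_BYTES) {
    throw new Error(`meta payload is ${bytes} bytes; limit is ${MAX_META_BYTES}`);
  }
  return meta;
};

// Hypothetical helper: decode a meta string; returns null for empty or non-JSON values.
const decodeMeta = (meta) => {
  if (!meta) return null;
  try {
    return JSON.parse(meta);
  } catch {
    return null;
  }
};

const event = {
  labelName: "order created",
  sourceDID: "customer",
  targetDID: "order",
  meta: encodeMeta({ orderId: 1001, amount: 250 }),
  type: "default",
};

// later, after the event has been queried back:
const payload = decodeMeta(event.meta);
```

Returning null instead of throwing in decodeMeta is a design choice for events whose meta is empty or was not written as JSON.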

Query Events

Query event(s) in Elasticsearch.

async eventStore.search(request: Search): Promise<SearchResult>

Parameter request is of type Search (all fields are optional).

note

Due to the complexity of the search options, the details of the request and result types are not shown here. Please refer to the examples.

The returned events (an array) are in the events field of the search result. Each element has the type Event:

| Member | Type | Description |
| --- | --- | --- |
| dataProcessIdentityContext | IdentityContext (see Context) | Data process ID and name |
| logicIdentityContext | IdentityContext | Logic identity ID and name |
| executionId | string | Execution ID |
| taskId | string | Task ID |
| sequence | number | Event sequence number |
| label | Label, which is { id: string; name: string; } | Event label ID and name |
| sourceDigitalIdentity | string | Source DID |
| targetDigitalIdentity | string | Target DID |
| meta | string | Meta payload |
| type | string | Event group |
| timestamp | string | Event emitted time |

Example: query events

// helper function
const createMatch = (field, value) => ({ Match: { field, value } });

const requests = {
  queries: [
    createMatch(
      // match condition 1
      "label_name", // field name
      "event name", // value
    ),
    // match condition 2...
  ],
  excludes: [],
  filters: [],
  from: 0,
  size: 1000,
  sorts: [],
};

const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;

// iterate through events (events may be undefined if the query returned nothing)
events?.forEach((event) => {
  const label_name = event.label.name;
  const meta = event.meta;
  // ...
});
tip

You can combine conditions in queries, excludes, filters and sorts for a more precise search. These fields must also be present in the request even if you don't use them.
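
One way to keep every field present while combining conditions is a small request builder. This is only a sketch: buildSearchRequest is a hypothetical helper, and the default values simply mirror the ones used in the examples on this page.

```javascript
const createMatch = (field, value) => ({ Match: { field, value } });

// Hypothetical helper: start from empty defaults so no top-level field is missing,
// then override only the parts that are actually used.
const buildSearchRequest = (options = {}) => ({
  queries: [],
  excludes: [],
  filters: [],
  from: 0,
  size: 1000,
  sorts: [],
  ...options,
});

const requests = buildSearchRequest({
  queries: [createMatch("label_name", "event name")],
  excludes: [createMatch("source_digital_identity", "source DID")],
  sorts: [{ field: "sequence", orderBy: "Asc" }],
});

// inside a logic:
// const query = await ctx.agents.eventStore.search(requests);
```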

List of available query field names
| Name | Description |
| --- | --- |
| data_process_permanent_identity | Data process permanent ID |
| data_process_name | Data process name |
| data_process_revision | Data process revision number |
| logic_name | Logic name |
| logic_permanent_identity | Logic permanent ID |
| logic_revision | Logic revision number |
| execution_id | Execution ID |
| task_id | Task ID |
| sequence | Event sequence number |
| label_id | Label ID |
| label_name | Label name (labelName in emit) |
| source_digital_identity | Source DID (sourceDID in emit) |
| target_digital_identity | Target DID (targetDID in emit) |
| type | Type (type in emit) |

Example: exclude events

const createMatch = (field, value) => ({ Match: { field, value } });

const requests = {
  queries: [],
  excludes: [
    createMatch(
      // match condition 1
      "source_digital_identity", // field name
      "source DID", // exclude events with this source DID
    ),
    // match condition 2...
  ],
  filters: [],
  from: 0,
  size: 1000,
  sorts: [],
};

const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;

Example: filter events (compare)

const filterCondition = (field, type, value) => ({
  Range: { field, value: { [type]: value } },
});

const requests = {
  queries: [],
  excludes: [],
  filters: [
    filterCondition(
      // filter condition 1
      "target_digital_identity", // field name
      "gte", // compare type
      9000, // compared value
    ), // keep events whose target DID >= 9000
    // filter condition 2...
  ],
  from: 0,
  size: 1000,
  sorts: [],
};

const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;

The compared value has to be a number. The compare type can be either "gte" (greater than or equal) or "lte" (less than or equal).

tip

It is a good idea to filter on the timestamp field to leave out older events, so you won't have to process them twice.
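
A sketch of such a timestamp filter is shown below. It assumes the timestamp field can be compared against a numeric epoch value in milliseconds, which this page does not confirm; check the timestamp format of your own events first. The checkpoint value is hypothetical.

```javascript
const filterCondition = (field, type, value) => ({
  Range: { field, value: { [type]: value } },
});

// Assumption: timestamp accepts a numeric epoch value in milliseconds.
// Hypothetical checkpoint: the time up to which events were already processed.
const lastProcessed = Date.UTC(2023, 0, 1);

const requests = {
  queries: [],
  excludes: [],
  filters: [filterCondition("timestamp", "gte", lastProcessed)],
  from: 0,
  size: 1000,
  sorts: [],
};

// inside a logic:
// const query = await ctx.agents.eventStore.search(requests);
```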

Example: filter events (wildcard)

filters can apply a wildcard search as well:

const filterConditionWildCard = (field, value) => ({
  Wildcard: { field, value },
});

const requests = {
  queries: [],
  excludes: [],
  filters: [
    filterConditionWildCard(
      // filter condition 1
      "target_digital_identity", // field name
      "target DID", // value to be searched as wildcard
    ),
    // filter condition 2...
  ],
  from: 0,
  size: 1000,
  sorts: [],
};

const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;

Example: sort events

const requests = {
  queries: [],
  excludes: [],
  filters: [],
  from: 0,
  size: 1000,
  sorts: [
    {
      // sort condition 1
      field: "source_digital_identity",
      orderBy: "Desc",
    },
    // sort condition 2...
  ],
};

const query = await ctx.agents.eventStore.search(requests);
const events = query?.events;

orderBy can be either "Asc" (ascending order) or "Desc" (descending order). Sorting works for both numeric and non-numeric string values.
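
The from and size fields used in the examples are not described on this page. Assuming they behave like an offset and a page size (as in Elasticsearch), results could be read page by page as sketched here. searchAll is a hypothetical helper, and the search function is passed in so the loop does not depend on ctx.

```javascript
// Hypothetical helper: collect all matching events page by page.
// Assumption: `from` is the offset of the first result and `size` the page size.
// `search` is any async function shaped like ctx.agents.eventStore.search.
const searchAll = async (search, baseRequest, pageSize = 1000) => {
  const all = [];
  for (let from = 0; ; from += pageSize) {
    const result = await search({ ...baseRequest, from, size: pageSize });
    const events = result?.events ?? [];
    all.push(...events);
    if (events.length < pageSize) break; // a short page means no more results
  }
  return all;
};

// inside a logic (sketch):
// const events = await searchAll(
//   (request) => ctx.agents.eventStore.search(request),
//   { queries: [], excludes: [], filters: [], sorts: [] },
// );
```

Adding a stable sort condition (for example on sequence) is advisable so that pages do not overlap between calls.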

Query Event Sequences

Search for sequences of events. The first event has to satisfy the first search condition, the second event the second condition, and so on.

async eventStore.searchWithPattern(request: PatternRequest): Promise<PatternResponse>
note

As with search, the details of the types PatternRequest and PatternResponse are not shown here.

Example

// helper function
const sequenceCondition = (field, value, type) => {
  const operators = ["Eq", "NotEq", "Gt", "Lt", "Gte", "Lte"];
  if (!operators.includes(type)) type = "Eq"; // fall back to "Eq"
  return { [type]: { field, value } };
};

// create sequence search pattern
const requests = {
  sequences: [
    // must have at least two event conditions!
    {
      // sequence 1 event condition
      conditions: [sequenceCondition("label_name", "event name 1", "Eq")],
      sharedFields: [],
      type: "any",
    },
    {
      // sequence 2 event condition
      conditions: [sequenceCondition("label_name", "event name 2", "Eq")],
      sharedFields: [],
      type: "any",
    },
  ],
  filter: null,
  maxSpan: null,
};

const query = await ctx.agents.eventStore.searchWithPattern(requests);
const sequences = query?.sequences;

// iterate through sequences (sequences may be undefined if nothing matched)
sequences?.forEach((sequence) => {
  // iterate through events in each sequence
  sequence.events?.forEach((event) => {
    const label_name = event.label.name;
    const meta = event.meta;
    // ...
  });
});

Available field names are the same as for search.

The type (operator) options in sequenceCondition include:
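
Since conditions is an array, one event condition can presumably carry more than one operator. The sketch below mixes "Eq" and "NotEq" on that basis; how multiple conditions are combined is an assumption, not something this page states.

```javascript
// Simplified variant of the helper above: the operator defaults to "Eq".
const sequenceCondition = (field, value, type = "Eq") => ({
  [type]: { field, value },
});

const requests = {
  sequences: [
    {
      // first event: a given label, but not from a given source
      // (assumption: both conditions must hold for the event to match)
      conditions: [
        sequenceCondition("label_name", "event name 1"),
        sequenceCondition("source_digital_identity", "source DID", "NotEq"),
      ],
      sharedFields: [],
      type: "any",
    },
    {
      // second event: a given label
      conditions: [sequenceCondition("label_name", "event name 2")],
      sharedFields: [],
      type: "any",
    },
  ],
  filter: null,
  maxSpan: null,
};

// inside a logic:
// const query = await ctx.agents.eventStore.searchWithPattern(requests);
```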

| Operator | Description |
| --- | --- |
| "Eq" | equal |
| "NotEq" | not equal |
| "Gt" | greater than |
| "Lt" | less than |
| "Gte" | greater than or equal |
| "Lte" | less than or equal |