Event Store Agent
import { EventAgent } from "@fstnetwork/loc-logic-sdk";
Emit and query LOC data events.
Once emitted, events are stored and can be searched in Elasticsearch. They are also rendered as data lineage graphs in Studio.
The data lineage (or data trail) is represented by the relationships in the graph below:
Availability
- ✓ Generic logic
- ✗ Aggregator logic
Emit Events
async EventAgent.emit(events: Event.Event[]): Promise<void>
Emit event(s). The parameter events is an array of events.
Event Schema
Type Event.Event has the following fields:
Field | Type | Description |
---|---|---|
labelName | string | Label name (event name) |
sourceDigitalIdentity or sourceDID | string | Source digital identity (DID) |
targetDigitalIdentity or targetDID | string | Target DID |
meta | string | Meta payload (additional data); max length 32768 bytes. |
type | string | Event type (group) |
The input parameter/value of a would-be event is also referred to as the event schema.
For now, type only supports "default".
Elements of events do not have to be of type Event.Event, but an error is thrown if the labelName, sourceDID or targetDID field is not present.
Source and target nodes are created in Elasticsearch if they do not exist yet. Any node can be both the source and the target of other events.
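Because a missing field only surfaces as an error at emit time, it can help to check the payload first. The sketch below is a hypothetical pre-check (findInvalidEvents is not part of the SDK) that reports which elements lack a required field. It accepts both the short and long DID field names from the table above, and it assumes that an empty string counts as missing.

```javascript
// Hypothetical helper, not part of the SDK: lists the problems found in an
// array of would-be events so they can be fixed before emitting.
const REQUIRED_FIELDS = [
  ["labelName"],
  ["sourceDID", "sourceDigitalIdentity"],
  ["targetDID", "targetDigitalIdentity"],
];

function findInvalidEvents(events) {
  const problems = [];
  events.forEach((event, index) => {
    for (const names of REQUIRED_FIELDS) {
      // Assumption: a field is present if any accepted name holds a non-empty string.
      const present = names.some(
        (name) => typeof event?.[name] === "string" && event[name].length > 0
      );
      if (!present) problems.push({ index, missing: names[0] });
    }
  });
  return problems;
}

const problems = findInvalidEvents([
  { labelName: "Event name 1", sourceDID: "Event source 1", targetDID: "Event target 1" },
  { labelName: "Event name 2", sourceDID: "Event source 2" }, // no target
]);
console.log(problems); // one entry: index 1 is missing targetDID
```

In a logic, EventAgent.emit() would then be called only when the returned array is empty.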
Example
- JavaScript
- TypeScript
const events = [
{
// event 1
labelName: "Event name 1",
sourceDID: "Event source 1",
targetDID: "Event target 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Event source 2",
targetDID: "Event target 2",
meta: "",
type: "default",
},
// ...
];
await EventAgent.emit(events);
import { ..., Event } from "@fstnetwork/loc-logic-sdk";
const events: Event.Event[] = [
{
// event 1
labelName: "Event name 1",
sourceDID: "Source DID 1",
targetDID: "Target DID 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Source DID 2",
targetDID: "Target DID 2",
meta: "",
type: "default",
},
// ...
];
await EventAgent.emit(events);
The events may not be properly emitted without using await.
You can also use JSON.stringify() to include a JSON object in the meta payload, and later decode it with JSON.parse().
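A minimal sketch of that round trip follows. The helper names encodeMeta and decodeMeta are hypothetical, and the size check assumes the 32768-byte limit from the schema table is measured on the UTF-8 encoded string.

```javascript
// Hypothetical helpers, not part of the SDK.
const MAX_META_BYTES = 32768; // limit stated in the event schema table

function encodeMeta(payload) {
  const meta = JSON.stringify(payload);
  // Assumption: the limit applies to UTF-8 bytes, not characters.
  const size = new TextEncoder().encode(meta).length;
  if (size > MAX_META_BYTES) {
    throw new RangeError(`meta is ${size} bytes, limit is ${MAX_META_BYTES}`);
  }
  return meta;
}

function decodeMeta(meta) {
  // Events emitted with meta: "" or plain text are not JSON, so fall back to null.
  try {
    return JSON.parse(meta);
  } catch {
    return null;
  }
}

const meta = encodeMeta({ orderId: 1001, items: ["apple", "orange"] });
const decoded = decodeMeta(meta);
console.log(decoded.items.length); // 2
```

The encoded string would go into the meta field of an emitted event, and decodeMeta() would be applied to event.meta of a searched event.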
Query Events
Query event(s) in Elasticsearch.
async EventAgent.search(request: Search): Promise<SearchResult>
Parameter request is of type Search and the function returns type SearchResult.
Search Parameter
Type: Search
Member | Type | Description |
---|---|---|
queries? | Query[] | null | Event query conditions |
excludes? | Query[] | null | Event exclude conditions |
filters? | Query[] | null | Event filter conditions |
sorts? | Query[] | null | Event sort operator |
aggregation? | Aggregation | null | Aggregation syntax |
from? | number | null | Event query starts from |
size? | number | null | Event query size |
All fields are optional. aggregation is the syntax for getting metrics, statistics, or other analytics from Elasticsearch; it is an advanced feature that is not demonstrated here.
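The from and size fields can be combined to read a large result set page by page. The sketch below assumes from is a zero-based offset (as the from: 0 in the examples suggests, and as in Elasticsearch's own from/size parameters); pageRequest is a hypothetical helper, not an SDK function.

```javascript
// Hypothetical helper, not part of the SDK: returns a copy of a search
// request that selects one page of results.
// Assumption: `from` is a zero-based offset into the matching events.
function pageRequest(baseRequest, page, pageSize) {
  if (!Number.isInteger(page) || page < 0) {
    throw new RangeError("page must be an integer >= 0");
  }
  if (!Number.isInteger(pageSize) || pageSize < 1) {
    throw new RangeError("pageSize must be an integer >= 1");
  }
  return { ...baseRequest, from: page * pageSize, size: pageSize };
}

const base = {
  queries: [{ field: "label_name", type: "match", value: "your event name" }],
};

const firstPage = pageRequest(base, 0, 100); // from: 0, size: 100
const thirdPage = pageRequest(base, 2, 100); // from: 200, size: 100
console.log(thirdPage.from, thirdPage.size);
```

Each page would be passed to EventAgent.search(); a page that returns fewer than pageSize events would normally indicate the last one.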
Search Result
Type: SearchResult
Member | Type | Description |
---|---|---|
events | Event[] | Queried events |
count | number | Queried number of events |
total | number | Queried number of events |
took | number | Query time (milliseconds) |
aggregation? | AggregationResult | null | Aggregation results |
count and total are similar metrics from Elasticsearch using different APIs; you can ignore them and simply use events.length instead.
Searched Events
Type: Event[]
An event in events is of type Event (different from the one used in emit):
Member | Type | Description |
---|---|---|
dataProcessIdentityContext | IdentityContext (see Context) | Data process ID and name |
logicIdentityContext | IdentityContext | Logic identity ID and name |
executionId | string | Execution ID |
taskId | string | Task ID |
sequence | number | Event sequence number |
label | Label , which is { id: string; name: string; } | Event label ID and name |
sourceDigitalIdentity | string | Source DID |
targetDigitalIdentity | string | Target DID |
meta | string | Meta payload |
type | string | Event group |
timestamp | string | Event emitted time |
Each queried event, other than the basic fields, also contains info about the execution, task, logic and data process where it was emitted.
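As an illustration of reading these fields, the sketch below flattens searched events into small records. summarizeEvents is a hypothetical helper, and the sample array is a hand-written stand-in for SearchResult.events with made-up values.

```javascript
// Hypothetical helper, not part of the SDK: flattens searched events into
// records that are easier to log or return from a logic.
function summarizeEvents(events) {
  return (events ?? []).map((event) => ({
    label: event.label.name,
    source: event.sourceDigitalIdentity,
    target: event.targetDigitalIdentity,
    executionId: event.executionId,
    taskId: event.taskId,
    emittedAt: event.timestamp,
  }));
}

// Stand-in for SearchResult.events; every value here is made up.
const sample = [
  {
    executionId: "exec-1",
    taskId: "task-1",
    sequence: 0,
    label: { id: "label-1", name: "Event name 1" },
    sourceDigitalIdentity: "Event source 1",
    targetDigitalIdentity: "Event target 1",
    meta: "",
    type: "default",
    timestamp: "2023-01-01T00:00:00Z",
  },
];

console.log(summarizeEvents(sample)[0].label); // "Event name 1"
```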
Example: query events
List of available query fields
Name | Description |
---|---|
data_process_permanent_identity | Data process permanent ID |
data_process_name | Data process name |
data_process_revision | Data process revision number |
logic_name | Logic name |
logic_permanent_identity | Logic permanent ID |
logic_revision | Logic revision number |
execution_id | Execution ID |
task_id | Task ID |
sequence | Event sequence number |
label_id | Label ID |
label_name | Label name (labelName in emit) |
source_digital_identity | Source DID (sourceDID in emit) |
target_digital_identity | Target DID (targetDID in emit) |
type | Type (type in emit) |
- JavaScript
- TypeScript
const requests = {
queries: [
{
field: "label_name", // field name
type: "match", // querying method
value: "your event name", // value
},
// match condition 2...
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
// iterate through events
events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [
{
field: "label_name", // field name
type: "match", // querying method
value: "your event name", // value
},
// match condition 2...
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
// iterate through events
events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
You can choose one of the following querying methods:
Querying Method | Meaning |
---|---|
"match" | Any word in the field matches words in your value. Standard full-text search. Suitable for most use cases. |
"term" | Field matches your value exactly. |
"match_phrase" | Words and their order in the field match the words in your value. (For example, value "has been" matches field "it has been raining".) |
These querying methods come directly from Elasticsearch: match, term and match_phrase.
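Since the querying method is a plain string, a typo is easy to miss. The sketch below wraps the three methods listed above in a hypothetical helper (buildQuery is not an SDK function) that rejects anything else before the request is sent.

```javascript
// Hypothetical helper, not part of the SDK.
const QUERY_METHODS = ["match", "term", "match_phrase"];

function buildQuery(field, type, value) {
  if (!QUERY_METHODS.includes(type)) {
    throw new TypeError(`unknown querying method "${type}"`);
  }
  return { field, type, value };
}

const requests = {
  queries: [
    buildQuery("label_name", "match_phrase", "has been"),
    buildQuery("type", "term", "default"),
  ],
  size: 1000,
};

console.log(JSON.stringify(requests.queries[0]));
```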
Example: exclude events
- JavaScript
- TypeScript
const requests = {
queries: [],
excludes: [
{
// exclude condition 1
field: "source_digital_identity",
type: "match",
value: "your source DID",
},
// exclude condition 2...
],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [],
excludes: [
{
// exclude condition 1
field: "source_digital_identity",
type: "match",
value: "your source DID",
},
// exclude condition 2...
],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
Example: filter events (range)
If a field of certain events is numeric data, you can apply a filter range:
- JavaScript
- TypeScript
const requests = {
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
gte: 9000, // value greater than or equal to
lte: null, // value smaller than or equal to
type: "range",
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
gte: 9000, // value greater than or equal to
lte: null, // value smaller than or equal to
type: "range",
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
Both the gte and lte fields are optional and can be set to either a number or null. The example above queries events whose target_digital_identity is greater than or equal to 9000.
Example: filter events (wildcard)
filters can apply a wildcard search on string names as well, using the following wildcard operators:
Wildcard operator | Description |
---|---|
? | Represents any single character |
* | Represents zero or more characters (including none) |
For example, event-?-* matches event-A-1 and event-B-123, and so on.
- JavaScript
- TypeScript
const requests = {
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
type: "wildcard",
value: "some?name*", // wildcard value
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
type: "wildcard",
value: "some?name*", // wildcard value
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
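To try out a wildcard value before using it in a filter, the two operators can be emulated locally with a regular expression. This is only an illustration of the ? and * rules described above, not how the event store evaluates them; wildcardToRegExp is a hypothetical helper, and anchoring the pattern to the whole value is an assumption.

```javascript
// Local illustration only, not part of the SDK.
function wildcardToRegExp(pattern) {
  const source = [...pattern]
    .map((ch) => {
      if (ch === "?") return "."; // any single character
      if (ch === "*") return ".*"; // zero or more characters
      return ch.replace(/[.+^${}()|[\]\\]/g, "\\$&"); // escape regex metacharacters
    })
    .join("");
  // Assumption: the pattern has to match the whole field value.
  return new RegExp(`^${source}$`);
}

const matcher = wildcardToRegExp("event-?-*");
console.log(matcher.test("event-A-1")); // true
console.log(matcher.test("event-B-123")); // true
console.log(matcher.test("event-AB-1")); // false: ? stands for exactly one character
```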
Example: sort events
- JavaScript
- TypeScript
const requests = {
queries: [],
excludes: [],
filters: [],
sorts: [
{
// sort condition 1
field: "source_digital_identity",
order: "desc",
},
// sort condition 2...
],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [],
excludes: [],
filters: [],
sorts: [
{
// sort condition 1
field: "source_digital_identity",
order: "desc",
},
// sort condition 2...
],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
order can be either "asc" (ascending order) or "desc" (descending order). Sorting works for both numeric data and non-numeric strings (which are sorted alphabetically).
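The condition kinds from the examples above can also appear in one request. The sketch below uses two hypothetical helpers (rangeFilter and sortBy are not SDK functions) to build it. How the event store combines several condition kinds, and whether earlier sort entries take precedence over later ones, is not stated here, so treat both as assumptions to verify.

```javascript
// Hypothetical helpers, not part of the SDK.
function rangeFilter(field, { gte = null, lte = null } = {}) {
  if (gte !== null && lte !== null && gte > lte) {
    throw new RangeError(`gte (${gte}) is greater than lte (${lte})`);
  }
  return { field, gte, lte, type: "range" };
}

function sortBy(field, order = "asc") {
  if (order !== "asc" && order !== "desc") {
    throw new TypeError(`order must be "asc" or "desc", got "${order}"`);
  }
  return { field, order };
}

// One request that combines queries, excludes, filters and sorts.
const requests = {
  queries: [{ field: "label_name", type: "match", value: "your event name" }],
  excludes: [{ field: "source_digital_identity", type: "match", value: "your source DID" }],
  filters: [rangeFilter("sequence", { gte: 0, lte: 9 })],
  sorts: [sortBy("label_name"), sortBy("sequence", "desc")],
  aggregation: null,
  from: 0,
  size: 100,
};

console.log(JSON.stringify(requests.sorts));
```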
Query Event Sequences
Search for sequences of events. The first event has to satisfy the first search condition, the second event the second condition, and so on.
async EventAgent.searchWithPattern(request: Pattern): Promise<PatternResult>
Sequence Search Parameter
Type: Pattern
Member | Type | Description |
---|---|---|
sequences | Sequence[] | Sequence of conditions |
filter? | Filter[] | Filter conditions (see here) |
maxSpan? | string | Search time span (for example, 30s = 30 secs and 15m = 15 mins) |
Sequence Parameter
Type: Sequence
Member | Type | Description |
---|---|---|
conditions? | Condition[] | null | Sequence query conditions |
sharedFields? | string[] | null | |
type? | string | null | |
The available field names in conditions are the same as those for search (see here). See the example for details.
Sequence Search Result
Type: PatternResult
Member | Type | Description |
---|---|---|
sequences | SequencesResult[] | Sequence of queried events |
count | number | Queried number of events |
total | number | Queried number of events |
took | number | Query time (milliseconds) |
Returned Sequence
Type: SequencesResult
PatternResult contains an array of such sequences. Each sequence contains one or more events:
Member | Type | Description |
---|---|---|
events | Event[] | Queried events (same as the one used by search) |
joinKeys | string[] | |
Example
- JavaScript
- TypeScript
// create sequence search pattern
const requests = {
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [
{
field: "label_name", // field name
op: "eq", // operator
value: "label name", // value
},
],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [
{
field: "source_digital_identity",
op: "gt",
value: "source DID",
},
{
field: "target_digital_identity",
op: "lt",
value: "target DID",
},
],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
};
const query = await EventAgent.searchWithPattern(requests);
const sequences = query?.sequences;
// iterate through sequences
sequences?.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
import { ..., Pattern } from "@fstnetwork/loc-logic-sdk";
// create sequence search pattern
const requests: Pattern = {
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [
{
field: "label_name", // field name
op: "eq", // operator
value: "label name", // value
},
],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [
{
field: "source_digital_identity",
op: "gt",
value: "source DID",
},
{
field: "target_digital_identity",
op: "lt",
value: "target DID",
},
],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
};
const query = await EventAgent.searchWithPattern(requests);
const sequences = query?.sequences;
// iterate through sequences
sequences?.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
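The request in the example above can also be assembled with small helpers. The sketch below is one hypothetical way to do it: condition and pattern are not SDK functions, the label names are made up, and the accepted op values are the ones listed in the operator table that follows.

```javascript
// Hypothetical helpers, not part of the SDK.
const OPERATORS = ["eq", "ne", "gt", "lt", "gte", "lte"];

function condition(field, op, value) {
  if (!OPERATORS.includes(op)) {
    throw new TypeError(`unknown operator "${op}"`);
  }
  return { field, op, value };
}

function pattern(steps, { filter = null, maxSpan = null } = {}) {
  // The example above notes that at least two event conditions are required.
  if (steps.length < 2) {
    throw new RangeError("a pattern needs at least two sequences");
  }
  return {
    sequences: steps.map((conditions) => ({
      conditions,
      sharedFields: [],
      type: "any",
    })),
    filter,
    maxSpan,
  };
}

// Two events in order, within a 15-minute span.
const requests = pattern(
  [
    [condition("label_name", "eq", "order created")],
    [condition("label_name", "eq", "order shipped")],
  ],
  { maxSpan: "15m" }
);

console.log(requests.sequences.length); // 2
```

The resulting object has the same shape as the requests value in the example and would be passed to EventAgent.searchWithPattern().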
The op operator in conditions accepts the following options:
Operator | Description |
---|---|
"eq" | equal to |
"ne" | not equal to |
"gt" | greater than |
"lt" | less than |
"gte" | greater than or equal to |
"lte" | less than or equal to |