Event Store Agent
import { EventAgent, Event, Search, Pattern } from "@fstnetwork/loc-logic-sdk";
Emit and query LOC data events.
Once emitted, events are stored and can be searched in Elasticsearch in a very short time. They will be generated to data lineage graphs in Studio.
The data lineage or data trail is represented by the relationship of the graph below:
Availability
- ✓ Generic logic
- ✗ Aggregator logic
Emit Events
async EventAgent.emit(events: Event.Event[]): Promise<void>
Emit event(s). The parameter events
is an array of events.
Event Schema
Type Event.Event
has the following fields:
Field | Type | Description |
---|---|---|
labelName | string | Label name (event name) |
sourceDigitalIdentity or sourceDID | string | Source digital identity (DID) |
targetDigitalIdentity or targetDID | string | Target DID |
meta | string | Meta payload (additional data); max length 215 (32768) bytes. |
type | string | Event type (group) |
The input parameter/value of a would-be event is also referred as event schema.
For now type
only supports "default"
.
Elements of events
does not have to be Event.Event
type, but an error would be thrown if label
, sourceDID
or targetDID
field is not present.
Source and target nodes will be created in Elasticsearch if not exist. Any nodes can be both source and target of other events.
Example
- JavaScript
- TypeScript
await EventAgent.emit([
{
// event 1
labelName: "Event name 1",
sourceDID: "Event source 1",
targetDID: "Event target 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Event source 2",
targetDID: "Event target 2",
meta: "",
type: "default",
},
// ...
]);
import { ..., Event } from "@fstnetwork/loc-logic-sdk";
await EventAgent.emit([
{
// event 1
labelName: "Event name 1",
sourceDID: "Source DID 1",
targetDID: "Target DID 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Source DID 2",
targetDID: "Target DID 2",
meta: "",
type: "default",
},
// ...
] as Event.Event[]);
The events may not be properly emitted without using await
.
You can also use JSON.stringify()
to include a JSON object in the meta payload, and later decode it with JSON.parse()
.
Query Events
Query event(s) in Elasticsearch.
async EventAgent.search(request: Search): Promise<SearchResult>
Parameter request
is of type Search
and the function returns type SearchResult
.
Search Parameter
Type: Search
Member | Type | Description |
---|---|---|
queries? | Query[] | null | Event query conditions |
excludes? | Query[] | null | Event exclude conditions |
filters? | Filter[] | null | Event filter conditions |
sort? | Sort[] | null | Event sort operator |
aggregation? | Aggregation | null | Aggregation syntax |
from? | number | null | Event query starts from |
size? | number | null | Event query size |
All fields are optional. aggregation
is the syntax for getting metrics, statistics, or other analytics from Elasticsearch, which is an advanved feature that we will not demostrate here.
- type Query
- type Filter
- type Sort
type Query =
| {
field: string; // event field - see "available fields names" below
type: "match"; // match operator
value: string; // field value to be matched
}
| {
field: string;
type: "match_phrase";
value: string;
}
| {
field: string;
type: "term";
value: string;
};
These querying methods are directly from Elasticsearch: match, term and match_phrase:
type (match operator) | Description |
---|---|
"match" | Any word in the field matches your value (fuzzy search). Standard full-text search. Suitable for most use cases. |
"term" | The field matchees exactly to your value (precise search). |
"match_phrase" | Words and their order matches words given in your value. (For example, value "has been" matches field it has been raining .) |
See query events example.
type Filter =
| {
// range filter
field: string; // event field - see "available fields names" below
gte?: number | null; // greater than or equal to (≥)
lte?: number | null; // less than or equal to (≤)
type: "range";
}
| {
// wilcard filter
field: string;
type: "wildcard";
value: string;
};
A filter can be either range or wildcard filter. Set gte
and/or lte
to null
means no filter.
The wildcard filter value can contain the following characters:
Wilcard operator | Description |
---|---|
? | Representing any single character |
* | Representing zero or more characters, including an empty one |
For example, event-?-*
matches event-A-1
and event-B-123
, and so on.
See filter events example (range) and filter events example (wildcard).
interface Sort {
field: string; // event field - see "available fields names" below
order: SortOrder;
}
type SortOrder = "asc" | "desc";
order
can be set to one of the following string:
"asc"
(sort by ascending order)"desc"
(sort by descending order)
This also works for both numeric data or non-numeric strings (sorting alphabetically).
See sort events example.
List of available fields names for query, filter or sort
Name | Description |
---|---|
label_id | label ID |
label_name | Label name (labelName in emit() ) |
source_digital_identity | Source DID (sourceDID in emit() ) |
target_digital_identity | Target DID (targetDID in emit() ) |
type | Type (type in emit() ) |
sequence | Event sequence number (the emit order in an array, starting from 0 ) |
timestamp | Event emitted time (unix timestamp) |
execution_id | Execution ID |
task_id | Task ID |
data_process_permanent_identity | Data process permanent ID |
data_process_name | Data process name |
data_process_revision | Data process revision number |
logic_name | Logic name |
logic_permanent_identity | Logic permanent ID |
logic_revision | Logic revision number |
Search Result
Type: SearchResult
Member | Type | Description |
---|---|---|
events | Event[] | Queried events |
count | number | Number of events to be queried (size parameter from Search ) |
total | number | Actual queried number of events |
took | number | Query time (milllisecond seconds) |
aggregation? | AggregationResult | null | Aggregation results |
count
and total
are similar metrics from Elasticsearch using different APIs; you can ignore them and simply use events.length
instead.
List of examples:
Example: query events
- JavaScript
- TypeScript
const requests = {
queries: [
{
field: "label_name", // field name
type: "match", // matching operater
value: "your event name", // value
},
// match condition 2...
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
// iterate through events
events.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [
{
field: "label_name", // field name
type: "match", // querying method
value: "your event name", // value
},
// match condition 2...
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
// iterate through events
events.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
Events require a little bit of time to be stored into Elasticsearch. If you query events almost immediately after they are emitted, EventAgent.search
may return an empty result.
One of the workaround is to use a timed loop:
let events = [];
const start = Date.now();
let now = Date.now();
// wait as long as 30 seconds to query events
do {
const query = await EventAgent.search({
queries: [
{
field: "label_name",
type: "match",
value: "label name",
},
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
events = query?.events;
now = Date.now();
} while (events.length == 0 || now - start < 30000);
The example above will keep query events until something is returned or the time exceeds 30 seconds.
Example: exclude events
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [
{
// exclude condition 1
field: "source_digital_identity",
type: "match",
value: "your source DID",
},
// match condition 2...
],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [
{
// exclude condition 1
field: "source_digital_identity",
type: "match",
value: "your source DID",
},
// exclude condition 2...
],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
Example: filter events (range)
If a field of certain events is numeric data, you can apply a filter range:
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
gte: 9000, // value greater than or equal to
lte: null, // value smaller than or equal to
type: "range",
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
gte: 9000, // value greater than or equal to 9000
lte: null, // value smaller than or equal to null
type: "range",
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
When filtering events within a time range with timestamp
field, convert the time to unix timestamp. For example:
filters: [ // filter events for the past hour
{
field: "timestamp",
gte: Date.now() - 60 * 60 * 1000, // starts from 1 hour ago (= 60 * 60 * 1000 ms)
lte: Date.now(),
type: "range",
}
],
Example: filter events (wildcard)
filters
can apply a wildcard search on string names as well, using the following wildcard operators:
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 2...
field: "target_digital_identity", // field name
type: "wildcard",
value: "some?name*", // wildcard value
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 2...
field: "target_digital_identity", // field name
type: "wildcard",
value: "some?name*", // wildcard value
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
Example: sort events
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [],
sorts: [
{
// sort condition 1
field: "source_digital_identity",
order: "desc",
},
// sort condition 2...
],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [],
sorts: [
{
// sort condition 1
field: "source_digital_identity",
order: "desc",
},
// sort condition 2...
],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
Queried Events
Type: Event
An event in events
is type of Event
(different from the one used in emit
):
Member | Type | Description |
---|---|---|
label | Label , which is { id: string; name: string; } | Event label ID and name |
sourceDigitalIdentity | string | Source DID |
targetDigitalIdentity | string | Target DID |
meta | string | Meta payload |
type | string | Event group |
sequence | number | Event sequence (the emit order in an array, starting from 0 ) |
timestamp | string | Event emitted datetime (ISO 8601 string) |
executionId | string | Execution ID |
taskId | string | Task ID |
dataProcessIdentityContext | VersionedIdentityContext | Data process ID and name |
logicIdentityContext | VersionedIdentityContext | Logic identity ID and name |
Each queried event, other than the basic fields, also contains info about the execution, task, logic and data process where it was emitted.
Query Event Sequences
Search sequence of events. The first event has to satisfy first search condition, and so on...
async EventAgent.searchWithPattern(request: Pattern): Promise<PatternResult>
Sequence Search Parameter
Type: Pattern
Member | Type | Description |
---|---|---|
sequences | Sequence[] | Sequence of conditions |
filter? | Filter[] | Filter conditions (see here) |
maxSpan? | string | Search time span (for example, 30s = 30 secs and 15m = 15 mins) |
Sequence Parameter
Type: Sequence
Member | Type | Description |
---|---|---|
conditions? | Condition[] | null | Sequence query conditions |
sharedFields? | string[] | null | |
type? | string | null |
The available field names in conditions?
are the same as search()
. See the example for details.
Sequence Search Result
Type: PatternResult
Member | Type | Description |
---|---|---|
sequences | SequencesResult[] | Sequence of queried events |
count | number | Number of events to be queried |
total | number | Actual queried number of events |
took | number | Query time (milllisecond seconds) |
Returned Sequence
Type: SequencesResult
PatternResult
contains an array of such sequences, each sequence would contain one or more events:
Member | Type | Description |
---|---|---|
events | Event[] | Queried events |
joinKeys | string[] |
Example
- JavaScript
- TypeScript
// create sequence search pattern
const query = await EventAgent.searchWithPattern({
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [
{
field: "label_name", // field name
op: "eq", // operator
value: "label name", // value
},
],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [
{
field: "source_digital_identity",
op: "gt",
value: "source DID",
},
{
field: "target_digital_identity",
op: "lt",
value: "target DID",
},
],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
});
const sequences = query?.sequences;
// iterate through sequences
sequences.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
import { ..., Pattern } from "@fstnetwork/loc-logic-sdk";
// create sequence search pattern
const query = await EventAgent.searchWithPattern({
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [
{
field: "label_name", // field name
op: "eq", // operator
value: "label name", // value
},
],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [
{
field: "source_digital_identity",
op: "gt",
value: "source DID",
},
{
field: "target_digital_identity",
op: "lt",
value: "target DID",
},
],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
} as Pattern);
const sequences = query?.sequences;
// iterate through sequences
sequences.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
op
operator in conditions
includes the following options:
Operator | Descriptionn |
---|---|
"eq" | equal to |
"ne" | not equal to |
"gt" | greater than |
"lt" | less than |
"gte" | greater than or equal to |
"lte" | less than or equal to |