Event Store Agent
import { EventAgent, Event, Search, Pattern } from "@fstnetwork/loc-logic-sdk";
Emit and query LOC data events.
Once emitted, events are stored and can be searched in Elasticsearch in a very short time. They will be generated to data lineage graphs in Studio.
The data lineage or data trail is represented by the relationship of the graph below:
Availability
- ✓ Generic logic
- ✗ Aggregator logic
Emit Events
async EventAgent.emit(events: Event.Event[]): Promise<void>
Emit event(s). The parameter events
is an array of events.
Event Schema
Type Event.Event
has the following fields:
Field | Type | Description |
---|---|---|
labelName | string | Label name (event name) |
sourceDigitalIdentity or sourceDID | string | Source digital identity (DID) |
targetDigitalIdentity or targetDID | string | Target DID |
meta | string | Meta payload (additional data); max length 215 (32768) bytes. |
type | string | Event type (group) |
The input parameter/value of a would-be event is also referred as event schema.
For now type
only supports "default"
.
Elements of events
does not have to be Event.Event
type, but an error would be thrown if label
, sourceDID
or targetDID
field is not present.
Source and target nodes will be created in Elasticsearch if not exist. Any nodes can be both source and target of other events.
Example
- JavaScript
- TypeScript
await EventAgent.emit([
{
// event 1
labelName: "Event name 1",
sourceDID: "Event source 1",
targetDID: "Event target 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Event source 2",
targetDID: "Event target 2",
meta: "",
type: "default",
},
// ...
]);
import { ..., Event } from "@fstnetwork/loc-logic-sdk";
await EventAgent.emit([
{
// event 1
labelName: "Event name 1",
sourceDID: "Source DID 1",
targetDID: "Target DID 1",
meta: "",
type: "default",
},
{
// event 2
labelName: "Event name 2",
sourceDID: "Source DID 2",
targetDID: "Target DID 2",
meta: "",
type: "default",
},
// ...
] as Event.Event[]);
The events may not be properly emitted without using await
.
You can also use JSON.stringify()
to include a JSON object in the meta payload, and later decode it with JSON.parse()
.
Query Events
Query event(s) in Elasticsearch.
async EventAgent.search(request: Search): Promise<SearchResult>
Parameter request
is of type Search
and the function returns type SearchResult
.
Search Parameter
Type: Search
Member | Type | Description |
---|---|---|
queries? | Query[] | null | Event query conditions |
excludes? | Query[] | null | Event exclude conditions |
filters? | Filter[] | null | Event filter conditions |
sort? | Sort[] | null | Event sort operator |
aggregation? | Aggregation | null | Aggregation syntax |
from? | number | null | Event query starts from |
size? | number | null | Event query size |
- type Query
- type Filter
- type Sort
type Query =
| {
field: string;
type: "match";
value: string;
}
| {
field: string;
type: "match_phrase";
value: string;
}
| {
field: string;
type: "term";
value: string;
};
type Filter =
| {
field: string;
gte?: number | null;
lte?: number | null;
type: "range";
}
| {
field: string;
type: "wildcard";
value: string;
};
interface Sort {
field: string;
order: SortOrder;
}
type SortOrder = "asc" | "desc";
See the examples below for explaining these parameters.
List of available query fields
Name | Description |
---|---|
data_process_permanent_identity | Data process permanent ID |
data_process_name | Data process name |
data_process_revision | Data process revision number |
logic_name | Logic name |
logic_permanent_identity | Logic permanent ID |
logic_revision | Logic revision number |
execution_id | Execution ID |
task_id | Task ID |
sequence | Event sequence number |
label_id | label ID |
label_name | Label name (labelName in emit) |
source_digital_identity | Source DID (sourceDID in emit ) |
target_digital_identity | Target DID (targetDID in emit ) |
type | Type (type in emit ) |
timestamp | Event emitted time (unix timestamp) |
All fields are optional. aggregation
is the syntax for getting metrics, statistics, or other analytics from Elasticsearch, which is an advanved feature that we will not demostrate here.
Search Result
Type: SearchResult
Member | Type | Description |
---|---|---|
events | Event[] | Queried events |
count | number | Number of events to be queried (size parameter from Search ) |
total | number | Actual queried number of events |
took | number | Query time (milllisecond seconds) |
aggregation? | AggregationResult | null | Aggregation results |
count
and total
are similar metrics from Elasticsearch using different APIs; you can ignore them and simply use events.length
instead.
Queried Events
Type: Event
An event in events
is type of Event
(different from the one used in emit
):
Member | Type | Description |
---|---|---|
dataProcessIdentityContext | VersionedIdentityContext | Data process ID and name |
logicIdentityContext | VersionedIdentityContext | Logic identity ID and name |
executionId | string | Execution ID |
taskId | string | Task ID |
sequence | number | Event sequence (the emit order in an array, started from 0 ) |
label | Label , which is { id: string; name: string; } | Event label ID and name |
sourceDigitalIdentity | string | Source DID |
targetDigitalIdentity | string | Target DID |
meta | string | Meta payload |
type | string | Event group |
timestamp | string | Event emitted datetime (ISO 8601 string) |
Each queried event, other than the basic fields, also contains info about the execution, task, logic and data process where it was emitted.
Example: query events
- JavaScript
- TypeScript
const requests = {
queries: [
{
field: "label_name", // field name
type: "match", // matching operater
value: "your event name", // value
},
// match condition 2...
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
// iterate through events
events.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const requests: Search = {
queries: [
{
field: "label_name", // field name
type: "match", // querying method
value: "your event name", // value
},
// match condition 2...
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
};
const query = await EventAgent.search(requests);
const events = query?.events;
// iterate through events
events.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
type
defines how should the value be used to query the specified field:
Query type | Description |
---|---|
"match" | Any word in the field matches words in your value. Standard full-text search. Suitable for most use cases. |
"term" | Field matches exactly your value. |
"match_phrase" | Words and their order in the field matches words in your value. (For example, value "has been" matches field it has been raining .) |
These querying methods are directly from Elasticsearch: match, term and match_phrase.
Events require a little bit of time to be stored into Elasticsearch. If you query events almost immediately after they are emitted, EventAgent.search
may return an empty result.
One of the workaround is to use a timed loop:
let events = [];
const start = Date.now();
let now = Date.now();
// wait as long as 30 seconds to query events
do {
const query = await EventAgent.search({
queries: [
{
field: "label_name",
type: "match",
value: "label name",
},
],
excludes: [],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
events = query?.events;
now = Date.now();
} while (events.length == 0 || now - start < 30000);
The example above will keep query events until something is returned or the time exceeds 30 seconds.
Example: exclude events
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [
{
// exclude condition 1
field: "source_digital_identity",
type: "match",
value: "your source DID",
},
// match condition 2...
],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [
{
// exclude condition 1
field: "source_digital_identity",
type: "match",
value: "your source DID",
},
// exclude condition 2...
],
filters: [],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
Example: filter events (range)
If a field of certain events is numeric data, you can apply a filter range:
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
gte: 9000, // value greater than or equal to
lte: null, // value smaller than or equal to
type: "range",
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 1
field: "target_digital_identity", // field name
gte: 9000, // value greater than or equal to
lte: null, // value smaller than or equal to
type: "range",
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
Both gte
and lte
fields are optional and can be set to either a number
or null
. The example above will query events that target_digital_identity
>= 9000.
When filtering events within a time range with timestamp
field, convert the time to unix timestamp. For example:
filters: [ // filter events for the past hour
{
field: "timestamp",
gte: Date.now() - 60 * 60 * 1000, // starts from 1 hour ago (= 60 * 60 * 1000 ms)
lte: Date.now(),
type: "range",
}
],
Example: filter events (wildcard)
filters
can apply a wildcard search on string names as well, using the following wildcard operators:
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 2...
field: "target_digital_identity", // field name
type: "wildcard",
value: "some?name*", // wildcard value
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
Wilcard operator | Description |
---|---|
? | Representing any single character |
* | Representing zero or more characters, including an empty one |
For example, event-?-*
matches event-A-1
and event-B-123
, and so on.
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [
{
// filter condition 2...
field: "target_digital_identity", // field name
type: "wildcard",
value: "some?name*", // wildcard value
},
// filter condition 2...
],
sorts: [],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
Example: sort events
- JavaScript
- TypeScript
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [],
sorts: [
{
// sort condition 1
field: "source_digital_identity",
order: "desc",
},
// sort condition 2...
],
aggregation: null,
from: 0,
size: 1000,
});
const events = query?.events;
import { ..., Search } from "@fstnetwork/loc-logic-sdk";
const query = await EventAgent.search({
queries: [],
excludes: [],
filters: [],
sorts: [
{
// sort condition 1
field: "source_digital_identity",
order: "desc",
},
// sort condition 2...
],
aggregation: null,
from: 0,
size: 1000,
} as Search);
const events = query?.events;
order
can be
"asc"
(ascending order)"desc"
(descending order)
This also works for both numeric data or non-numeric strings (sorting alphabetically).
Query Event Sequences
Search sequence of events. The first event has to satisfy first search condition, and so on...
async EventAgent.searchWithPattern(request: Pattern): Promise<PatternResult>
Sequence Search Parameter
Type: Pattern
Member | Type | Description |
---|---|---|
sequences | Sequence[] | Sequence of conditions |
filter? | Filter[] | Filter conditions (see here) |
maxSpan? | string | Search time span (for example, 30s = 30 secs and 15m = 15 mins) |
Sequence Parameter
Type: Sequence
Member | Type | Description |
---|---|---|
conditions? | Condition[] | null | Sequence query conditions |
sharedFields? | string[] | null | |
type? | string | null |
The available field names in conditions?
are the same as search()
. See the example for details.
Sequence Search Result
Type: PatternResult
Member | Type | Description |
---|---|---|
sequences | SequencesResult[] | Sequence of queried events |
count | number | Number of events to be queried |
total | number | Actual queried number of events |
took | number | Query time (milllisecond seconds) |
Returned Sequence
Type: SequencesResult
PatternResult
contains an array of such sequences, each sequence would contain one or more events:
Member | Type | Description |
---|---|---|
events | Event[] | Queried events |
joinKeys | string[] |
Example
- JavaScript
- TypeScript
// create sequence search pattern
const query = await EventAgent.searchWithPattern({
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [
{
field: "label_name", // field name
op: "eq", // operator
value: "label name", // value
},
],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [
{
field: "source_digital_identity",
op: "gt",
value: "source DID",
},
{
field: "target_digital_identity",
op: "lt",
value: "target DID",
},
],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
});
const sequences = query?.sequences;
// iterate through sequences
sequences.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
import { ..., Pattern } from "@fstnetwork/loc-logic-sdk";
// create sequence search pattern
const query = await EventAgent.searchWithPattern({
sequences: [
// must have at least two event conditions!
{
// sequence 1 event condition
conditions: [
{
field: "label_name", // field name
op: "eq", // operator
value: "label name", // value
},
],
sharedFields: [],
type: "any",
},
{
// sequence 2 event condition
conditions: [
{
field: "source_digital_identity",
op: "gt",
value: "source DID",
},
{
field: "target_digital_identity",
op: "lt",
value: "target DID",
},
],
sharedFields: [],
type: "any",
},
],
filter: null,
maxSpan: null,
} as Pattern);
const sequences = query?.sequences;
// iterate through sequences
sequences.forEach((sequence) => {
// iterate through events in each sequence
sequence.events?.forEach((event) => {
const label_name = event.label.name;
const meta = event.meta;
// ...
});
});
op
operator in conditions
includes the following options:
Operator | Descriptionn |
---|---|
"eq" | equal to |
"ne" | not equal to |
"gt" | greater than |
"lt" | less than |
"gte" | greater than or equal to |
"lte" | less than or equal to |