Here you can find a recipe with a hello-world example that will:
- Show you how to connect to the API
- Show you how to use the streams API
- Upload an example dataset as a stream
- Get the stream definition schema
- Show you how to (basically) use the Query API on this example dataset
The Query API is a request builder to get data from a stream. It supports multiple "modes":
- listing all records matching a criterion
- performing ad-hoc aggregations over records
- retrieving records distinct on a set of fields at a specific time
It reuses the principles of a SQL query and can be considered a (very) simplified layer above the data. A basic understanding of the main SQL principles will help you understand its capabilities, especially because we'll use SQL as a guideline to explain the different concepts.
This documentation also comes with a set of examples, each of which contains:
- A raw data example of the stream we are querying
- A sentence explaining what we're trying to achieve with the query
- The full payload corresponding to that sentence
- An extract of the response sent by the API:
  - the schema of the response columns
  - the first result row
You can find it HERE
The API already provides the stream definition of the columns on different endpoints ( GET zone/{zone_id}/streams/, GET zone/{zone_id}/streams/{stream_id} ). This stream definition gives us information about the data present in the stream:
- the column name
- the min/max of the column
- the number of rows
- the data type
Here is a stream definition example
"stream_fields": {
"additionalProp1": {
"index": "primary",
"extents": {
"min": 2345,
"max": 6789
},
"count": 0,
"data_type": "number"
},
...
},
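As an illustrative sketch, the definition can be inspected programmatically to see which fields support numeric aggregations. The helper below assumes only the JSON shape shown above; the field names in the sample are made up:

```python
# Minimal sketch: given a stream definition shaped like the example above,
# list the fields that support numeric aggregations (sum, average, ...).
def numeric_fields(stream_definition: dict) -> list:
    """Return names of stream fields whose data_type is "number"."""
    return [
        name
        for name, props in stream_definition["stream_fields"].items()
        if props.get("data_type") == "number"
    ]

# Illustrative definition, mirroring the example above
definition = {
    "stream_fields": {
        "od_count": {"index": "primary",
                     "extents": {"min": 2345, "max": 6789},
                     "count": 0, "data_type": "number"},
        "start_name": {"count": 0, "data_type": "string"},
    }
}
print(numeric_fields(definition))  # ['od_count']
```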
With this stream definition, you have the necessary information to build a request to answer the questions you have about the data.
A payload for the query endpoint ( POST zone/{zone_id}/streams/query ) looks like this:
{
"fields": [],
"aggregation": {
"time": {
"resolution": "minute",
"field": "string",
"label": "string"
},
"operations": []
},
"computed_fields": [],
"distinct_on": {
"fields": [],
"order_by": [],
"where": []
},
"joins": [],
"where": [],
"order_by": [],
"limit": 50000,
"sample": null
}
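A minimal sketch of assembling such a payload in Python, assuming only the skeleton above (the helper and its defaults are illustrative, not part of the API):

```python
# Sketch: build a query payload for POST zone/{zone_id}/streams/query by
# merging overrides into the documented defaults (limit 50000, sample null).
def build_payload(**overrides) -> dict:
    payload = {
        "fields": [],
        "computed_fields": [],
        "joins": [],
        "where": [],
        "order_by": [],
        "limit": 50000,
        "sample": None,
    }
    payload.update(overrides)
    return payload

payload = build_payload(fields=["start_name", "end_name"], limit=10)
print(payload["limit"])  # 10
```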
Let's look at the use case for each field.
- fields
  - Equivalent to a simple select in SQL: it chooses the fields to return.
  - If none of the attributes fields, aggregation, or computed_fields are provided in the payload, all fields in the stream are selected by default.
{
"fields": [
"start_name",
"end_name",
"geography",
]
...
}
- SQL translation :
```sql
select start_name, end_name, geography from stream_records
```
- aggregation
  - It's also a select in SQL, but it is used to compute aggregations on one specific field: sum, average, count, etc.
  - Specificities:
    - count does not need a field key associated with it; it will count all rows
    - All the other aggregations can only be performed on fields which have a number type (we obviously can't compute an average or a sum on a string)
  - There's no need to supply another value in the fields key; it will be merged into a single select
  - If you want to order by an aggregated field, you'll have to provide the label property, which can then be used in the order_by field.
  - It can contain:
    - A list of operations that will be applied on fields:
{
"aggregation": {
"operations": [
{
"field": "od_count",
"type": "sum"
}
]
}
...
},
- SQL translation :
```sql
select sum(od_count) from stream_records
```
- the date/time field to group the values by, with the possibility to change the resolution of the selected date field:
{
"aggregation": {
"time": {
"resolution": "month",
"field": "OCCURRED_ON_DATE"
},
"operations": [
{
"type": "count"
}
]
},
...
}
- SQL translation :
```sql
select DATE_TRUNC(MONTH, OCCURRED_ON_DATE)::TIMESTAMP_NTZ as "time", count(*) from stream_records group by "time"
```
- computed_fields
  - It's also a select in SQL, but it's used to perform operations between fields or values. Here we don't want to aggregate a field with an aggregation operation; we simply want to make simple computations between two fields (whereas an aggregation operates on a single field).
  - There's no need to supply another value in the fields key; it will be merged into a single select
  - It can be quite powerful because it can be used in a nested way. You can make any nested computation you want; the parser uses recursion to interpret the final operation correctly
  - If you want to order by a computed field, you'll have to provide the label property, which can then be used in the order_by field.
{
"computed_fields": [
{
"fields": [
{
"fields" : [
{
"fields": [
"end_time",
"start_time"
],
"type": "minus"
},
{
"fields": [
10,
100
],
"type": "multiply"
}
],
"type": "divide"
},
99
],
"type": "plus",
"label": "trip_duration_recomputed"
}
],
...
}
- SQL translation :
```sql
select ((end_time - start_time) / (10 * 100)) + 99 as "trip_duration_recomputed" from stream_records
```
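The nested structure can be walked with a short recursive function, which is essentially what the parser's recursion does. Here is an illustrative sketch (the helper is hypothetical, not part of the API) that renders a computed_fields node as a SQL-like expression:

```python
# Sketch: recursively render a computed_fields node as a SQL-like string.
# Operator names are the ones shown in this document.
OPS = {"plus": "+", "minus": "-", "multiply": "*", "divide": "/"}

def to_sql(node) -> str:
    """A dict node combines its sub-fields; anything else is a field/literal."""
    if isinstance(node, dict):
        parts = [to_sql(f) for f in node["fields"]]
        return "(" + f" {OPS[node['type']]} ".join(parts) + ")"
    return str(node)

# The example payload from above, without the label
expr = {
    "fields": [
        {
            "fields": [
                {"fields": ["end_time", "start_time"], "type": "minus"},
                {"fields": [10, 100], "type": "multiply"},
            ],
            "type": "divide",
        },
        99,
    ],
    "type": "plus",
}
print(to_sql(expr))  # (((end_time - start_time) / (10 * 100)) + 99)
```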
- where
  - Used to filter the rows based on the values of the fields. Each condition is made of one field to compare (multiple operators available) with a value or with another field
  - **WARNING: it's mandatory to provide at least one where condition with the stream_primary_key in it, to filter on it**
{
"where": [
{
"field": "stream_primary_key",
"condition": "equals",
"value": "1111111-b0ff-4610-986f-bf3a25154971"
},
{
"field": "time",
"condition": "greater_than",
"value": "2022-03-20"
},
{
"field": "start_date",
"condition": "equals",
"value": {
"source": "end_date"
}
}
],
...
}
- SQL Translation
```sql
from table stream_records
where
stream_primary_key = '1111111-b0ff-4610-986f-bf3a25154971'
and
time > '2022-03-20'
and
start_date = end_date -- check rows where the events happened on the same day, for example
```
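To make the mapping from condition objects to SQL concrete, here is a hedged sketch; the condition names are the ones used in this document, while the rendering helper itself is hypothetical:

```python
# Sketch: render a where condition as a SQL-like string.
SQL_OPS = {"equals": "=", "not_equals": "!=", "greater_than": ">", "in_": "in"}

def where_to_sql(cond: dict) -> str:
    value = cond["value"]
    if isinstance(value, dict):        # {"source": ...} refers to another field
        rendered = value["source"]
    elif isinstance(value, list):      # in_ takes a list of values
        rendered = "(" + ", ".join(repr(v) for v in value) + ")"
    elif isinstance(value, str):
        rendered = repr(value)         # quote string literals
    else:
        rendered = str(value)
    return f"{cond['field']} {SQL_OPS[cond['condition']]} {rendered}"

print(where_to_sql({"field": "time", "condition": "greater_than",
                    "value": "2022-03-20"}))  # time > '2022-03-20'
```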
- order_by
  - The fields to order the results by. It can be cumulative (order first by field x, then y, then ...)
  - You can also provide the direction: from highest value to lowest (ascending:false) or from lowest value to highest (ascending:true)
  - You can order by a simple field
{
"order_by": [
{
"field": "index",
"ascending": false
}
],
...
}
- SQL translation :
```sql
order by index desc
```
- You can also order by a computed/aggregated field, but the field will need to have a label associated with it.
{
"order_by": [
{
"label": "computed_field",
"ascending": false
}
],
...
}
- SQL translation :
```sql
order by computed_field desc
```
- limit
  - The maximum number of rows that will be returned in the output
{
"limit": 10,
...
}
SQL translation :
```sql
LIMIT 10
```
- sample
  - Data will be queried on n rows chosen randomly in the stream
  - Use with caution: the computation runs on only a sample of the raw data, so the query results represent only a subset of the truth. Use it when you don't need absolute precision.
{
"sample": 500
...
}
SQL translation :
```sql
FROM table stream_records SAMPLE(500 ROWS)
```
- distinct_on
  - You can select some fields to be unique in the output, and decide on what criteria the specific row should be chosen, by reusing two concepts already seen (order_by and where). In the following example you will:
    - select the rows in the dataset with a unique vehicle_id (the same vehicle_id won't appear twice)
    - choose which row should be selected to appear: the most recent one in the dataset, for example, with the order_by clause
    - filter out the rows where the event_type is "removed" (where)
  - Concept explained here
{
"distinct_on": {
"fields": [
"vehicle_id"
],
"order_by": [
{
"ascending": false,
"field": "time"
}
],
"where": [
{
"condition": "not_equals",
"field": "event_type",
"value": "removed"
}
]
},
...
}
- SQL translation :
```sql
QUALIFY
ROW_NUMBER() OVER (
  PARTITION BY vehicle_id
  ORDER BY time desc
) = 1
AND event_type != 'removed'
```
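In plain Python terms, the distinct_on example behaves like the sketch below: keep one row per vehicle_id, preferring the most recent time, and skip "removed" events. The sample rows are made up for illustration:

```python
# Sketch of distinct_on semantics: one row per vehicle_id, most recent
# first, excluding event_type == "removed".
rows = [
    {"vehicle_id": "v1", "time": "2022-03-01", "event_type": "moved"},
    {"vehicle_id": "v1", "time": "2022-03-05", "event_type": "parked"},
    {"vehicle_id": "v2", "time": "2022-03-02", "event_type": "removed"},
    {"vehicle_id": "v2", "time": "2022-03-01", "event_type": "moved"},
]

latest = {}
for row in rows:
    if row["event_type"] == "removed":   # the "where" filter
        continue
    key = row["vehicle_id"]              # the field to be distinct on
    if key not in latest or row["time"] > latest[key]["time"]:
        latest[key] = row                # keep the row winning the order_by

print(sorted(r["time"] for r in latest.values()))  # ['2022-03-01', '2022-03-05']
```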
- joins
  - Enables you to join your main source of data with other streams by joining two columns together. You can also filter on values in the dataset to be joined
  - Like in the where param, it's mandatory to set the stream_primary_key of the stream you want to join on
  - You can join as many streams as you want, but be careful: the query might become slower
  - join_on is a list of where conditions. You'll use these conditions to choose which fields need to be joined together, and you'll refer to the main stream with the source param
  - The label field names the second stream, and we'll use it to refer to the fields owned by the second stream, as we'll see in the next example
{
"joins": [
{
"join_on": [
{
"field": "stream_primary_key",
"condition": "equals",
"value": "5eee21fc-aac8-43f3-8ac0-7d2d19b9ccdf"
},
{
"field": "name",
"condition": "equals",
"value": {
"source": "start_name"
}
},
{
"field": "tag",
"condition": "in_",
"value": [
"district"
]
}
],
"label": "districts"
}
],
...
}
- SQL translation :
```sql
FROM stream_records
INNER JOIN stream_records AS districts
ON districts.stream_primary_key = '5eee21fc-aac8-43f3-8ac0-7d2d19b9ccdf'
AND stream_records.start_name = districts.name
AND districts.tag in ('district')
```
- To select a field owned by the second stream as a column of the output results, you'll have to use a specific syntax reusing the label you set up (you can also reuse this syntax in order_by/where/aggregation/computed fields) :
{
"fields": [
"start_name",
"end_name",
[
"join",
"districts",
"geography"
]
],
... ,
"order_by" : [
[
"join",
"districts",
"name"
]
]
}
- SQL translation
```sql
SELECT start_name, end_name, districts.geography
ORDER BY districts.name
```
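The ["join", label, field] triple can be resolved mechanically. Here is a hedged sketch (the helper is illustrative, not part of the API) that renders such a reference the way the SQL translation above does:

```python
# Sketch: render a field reference, which is either a plain field name or a
# ["join", label, field] triple pointing at a joined stream's field.
def field_ref_to_sql(ref) -> str:
    if isinstance(ref, list) and ref and ref[0] == "join":
        _, label, field = ref
        return f"{label}.{field}"
    return str(ref)

print(field_ref_to_sql(["join", "districts", "geography"]))  # districts.geography
print(field_ref_to_sql("start_name"))  # start_name
```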