Connectors

The following connectors are available for reading and writing data.

IO

Read and write data through standard input and output.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | io | io |
| metadata | meta | Override metadata information | none | Metadata |

examples:

[
    {
        "type": "reader",
        "connector":{
            "type": "io"
        }
    }
]

In Memory

Read and write data through memory. Use this connector to inject constants into your flow.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | in_memory | in_memory / mem |
| metadata | meta | Override metadata information | none | Metadata |
| memory | value / doc / data | Memory value | none | String |

examples:

[
    {
        "type": "reader",
        "connector":{
            "type": "in_memory",
            "memory": "{\"username\": \"{{ MY_USERNAME }}\",\"password\": \"{{ MY_PASSWORD }}\"}"
        }
    }
]
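Escaping the "memory" string by hand is error-prone. The sketch below is one way to generate the configuration, assuming you build it from Python: json.dumps turns the inner document into the escaped string the field expects. Only the configuration keys come from the example above; the script itself is illustrative.

```python
import json

# The document the in_memory connector should emit (placeholders kept as-is).
document = {"username": "{{ MY_USERNAME }}", "password": "{{ MY_PASSWORD }}"}

# json.dumps turns the document into a string and escapes its inner quotes,
# which is the form the "memory" field expects in the configuration.
config = [
    {
        "type": "reader",
        "connector": {
            "type": "in_memory",
            "memory": json.dumps(document),
        },
    }
]

# Serializing the whole configuration escapes the inner quotes again,
# producing \" sequences like the ones in the example above.
print(json.dumps(config, indent=4))
```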

Local

Read and write data in local files. Multiple files can be read at once with the * wildcard. To write dynamically into different files, use mustache variables in the path; they are replaced with values from the input data.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | local | local |
| metadata | meta | Override metadata information | none | Metadata |
| path | - | Path of a file or list of files. Allows the wildcard character * and mustache variables | none | String |
| parameters | - | Variables that can be used in the path. The connector's parameters are merged with the current data | none | List of key and value |

examples:

[
    {
        "type": "reader",
        "connector":{
            "type": "local",
            "path": "./{{ folder }}/*.json",
            "metadata": {
                "content-type": "application/json; charset=utf-8"
            },
            "parameters": {
                "folder": "my_folder"
            }
        }
    }
]
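The path resolution described above can be sketched as two steps: substitute the mustache variables, then expand the wildcard. This is a minimal illustration, not the connector's code; render and resolve_files are hypothetical helpers, and the rule that input data wins over connector parameters on a key conflict is an assumption.

```python
import glob
import re
from typing import Optional

# Matches mustache-style placeholders such as {{ folder }}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, values: dict) -> str:
    """Replace {{ name }} with values[name]; unknown placeholders are left untouched."""
    def substitute(match):
        name = match.group(1)
        return str(values[name]) if name in values else match.group(0)

    return PLACEHOLDER.sub(substitute, template)


def resolve_files(path_template: str, parameters: dict, data: Optional[dict] = None) -> list:
    # Assumption: connector parameters are merged with the current data,
    # with the data taking precedence on key conflicts.
    values = {**parameters, **(data or {})}
    # Expand the * wildcard against the local file system.
    return sorted(glob.glob(render(path_template, values)))
```

For the example configuration, resolve_files("./{{ folder }}/*.json", {"folder": "my_folder"}) would list the JSON files under ./my_folder.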

Curl

Read and write data through HTTP(S) requests.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | curl | curl |
| metadata | meta | Override metadata information | none | Metadata |
| authenticator | auth | Define the authentication that secures the HTTP(S) call | none | Authenticator |
| endpoint | - | The HTTP endpoint of the URL | none | String |
| path | uri | The path of the resource | none | String |
| method | - | The HTTP method to use | get | HTTP methods |
| headers | - | The HTTP headers to override | none | List of key/value |
| parameters | - | Parameters used in the path that can be overridden | none | Object or Array of objects |
| limit | - | Limit value used by the pagination | 100 | Unsigned number |
| skip | - | Skip value used by the pagination | 0 | Unsigned number |
| paginator | - | Paginator parameters | none | Paginator |

examples:

[
    {
        "type": "read",
        "connector":{
            "type": "curl",
            "endpoint": "{{ CURL_ENDPOINT }}",
            "path": "/post?skip={{ SKIP }}&limit={{ LIMIT }}&cache={{ cache }}",
            "method": "post",
            "authenticator": {
                "type": "basic",
                "username": "{{ BASIC_USERNAME }}",
                "password": "{{ BASIC_PASSWORD }}"
            },
            "headers": {
                "Accept": "application/json"
            },
            "parameters": [
                { "cache": false }
            ],
            "limit": 1000,
            "skip": 0,
            "paginator": {
                "limit": "LIMIT",
                "skip": "SKIP"
            }
        }
    }
]
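The skip and limit settings suggest offset pagination: each request fills the SKIP and LIMIT placeholders of the path, then skip moves forward by limit. The sketch below models that loop with an injected fetch function instead of a real HTTP call. The stop rule (a page shorter than limit ends the loop) is an assumption, and page_path and paginate are hypothetical helpers, not the connector's API.

```python
from typing import Callable, Iterator, List


def page_path(template: str, skip: int, limit: int) -> str:
    # Fill the {{ SKIP }} and {{ LIMIT }} placeholders used in the example path.
    return template.replace("{{ SKIP }}", str(skip)).replace("{{ LIMIT }}", str(limit))


def paginate(
    fetch_page: Callable[[int, int], List[dict]], limit: int = 100, skip: int = 0
) -> Iterator[dict]:
    """Offset pagination: request pages of `limit` records, moving `skip` forward each time.

    Assumption: the last page is the first one holding fewer than `limit` records.
    """
    while True:
        page = fetch_page(skip, limit)
        yield from page
        if len(page) < limit:
            return
        skip += limit
```

With the example settings (limit 1000, skip 0), the second request would target page_path("/post?skip={{ SKIP }}&limit={{ LIMIT }}", 1000, 1000), that is /post?skip=1000&limit=1000.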

Paginator

Used to override the default pagination configuration of the Curl connector.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| limit_name | limit | Name of the limit field used in the query parameters | limit | String |
| skip_name | skip | Name of the skip field used in the query parameters | skip | String |

examples:

[
    {
        "type": "read",
        "connector":{
            "type": "curl",
            "endpoint": "{{ CURL_ENDPOINT }}",
            "path": "/resource",
            "method": "get",
            "paginator": {
                "limit": "my_limit_field",
                "skip": "my_skip_field"
            }
        }
    }
]
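As the table describes, the paginator only renames the fields that carry the pagination values. Assuming those names end up as query-string keys, the effect can be sketched with the standard library's urlencode; pagination_query is a hypothetical helper, not part of the connector.

```python
from urllib.parse import urlencode


def pagination_query(
    skip: int, limit: int, skip_name: str = "skip", limit_name: str = "limit"
) -> str:
    # The field names default to "skip" and "limit"; the paginator renames them.
    return urlencode({limit_name: limit, skip_name: skip})
```

With the example paginator, a first page of 100 records would be requested as "/resource?" + pagination_query(0, 100, skip_name="my_skip_field", limit_name="my_limit_field"), giving /resource?my_limit_field=100&my_skip_field=0.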

Authenticator

Basic

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this authentication | basic | basic |
| username | user / usr | Username to use for the authentication | none | String |
| password | pass / pwd | Password to use for the authentication | none | String |

examples:

[
    {
        "type": "read",
        "connector":{
            "type": "curl",
            "endpoint": "{{ CURL_ENDPOINT }}",
            "path": "/get",
            "method": "get",
            "authenticator": {
                "type": "basic",
                "username": "{{ BASIC_USERNAME }}",
                "password": "{{ BASIC_PASSWORD }}"
            }
        }
    }
]
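HTTP Basic authentication (RFC 7617) sends the base64 encoding of "username:password" in the Authorization header. The sketch below shows the header that such a username and password produce; basic_auth_header is a hypothetical helper, and how the connector builds the header internally is not documented here.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    # RFC 7617: base64("username:password"), prefixed with the "Basic " scheme.
    credentials = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(credentials).decode("ascii")}
```

Using the example from RFC 7617, basic_auth_header("Aladdin", "open sesame") yields the header value Basic QWxhZGRpbjpvcGVuIHNlc2FtZQ==.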

Bearer

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this authentication | bearer | bearer |
| token | - | The bearer token | none | String |
| is_base64 | - | Specify whether the bearer token is encoded in base64 | false | false / true |
| parameters | - | Used to replace variables in the token with dynamic values from the connector's input | none | List of key/value strings |

examples:

[
    {
        "type": "write",
        "connector":{
            "type": "curl",
            "endpoint": "{{ CURL_ENDPOINT }}",
            "path": "/post",
            "method": "post",
            "authenticator": {
                "type": "bearer",
                "token": "{{ token }}",
                "is_base64": false,
                "parameters": {
                    "token": "my_token"
                }
            }
        }
    }
]
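In the example, the token "{{ token }}" is filled from the parameters before being sent. A minimal sketch of that substitution followed by the standard Bearer header (RFC 6750) is shown below; bearer_header is a hypothetical helper, and the is_base64 option is deliberately not modeled because its exact handling is not described here.

```python
import re

# Matches mustache-style placeholders such as {{ token }}.
PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def bearer_header(token_template: str, parameters: dict) -> dict:
    # Replace {{ name }} placeholders with values from parameters; unknown names are kept.
    token = PLACEHOLDER.sub(
        lambda m: str(parameters.get(m.group(1), m.group(0))), token_template
    )
    # RFC 6750: the token is sent in the Authorization header with the Bearer scheme.
    return {"Authorization": f"Bearer {token}"}
```

For the example configuration, bearer_header("{{ token }}", {"token": "my_token"}) gives {"Authorization": "Bearer my_token"}.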

(JWT) JSON Web Token

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this authentication | jwt | jwt |
| algorithm | algo | The algorithm used to sign the token | HS256 | String |
| refresh_connector | - | The connector used to refresh the token | none | See Connectors |
| refresh_token | - | The name used to identify the token in the refresh connector | token | String |
| jwk | - | The JSON Web Key used to sign | none | Object |
| format | - | Define the type of the key used for the signing | secret | secret / base64secret / rsa_pem / rsa_components / ec_pem / rsa_der / ec_der |
| key | - | Key used for the signing | none | String |
| payload | - | The JWT payload | none | Object or Array of objects |
| parameters | - | The parameters used to replace variables in the payload | none | Object or Array of objects |
| token | - | The token, which can be overridden if necessary | none | String |

examples:

[
    {
        "type": "write",
        "connector":{
            "type": "curl",
            "endpoint": "{{ CURL_ENDPOINT }}",
            "path": "/post",
            "method": "post",
            "authenticator": {
                "type": "jwt",
                "refresh_connector": {
                    "type": "curl",
                    "endpoint": "http://my_api.com",
                    "path": "/tokens",
                    "method": "post"
                },
                "refresh_token":"token",
                "key": "my_key",
                "payload": {
                    "alg":"HS256",
                    "claims":{"GivenName":"Johnny","username":"{{ username }}","password":"{{ password }}","iat":1599462755,"exp":33156416077}
                },
                "parameters": {
                    "username": "my_username",
                    "password": "my_password"
                }
            }
        }
    }
]
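To see what an HS256 token made from a payload and a key looks like, here is a standard-library sketch of JWS compact serialization (RFC 7515/7519): base64url header, base64url claims, and an HMAC-SHA256 signature over both. It assumes the "secret" format means the key is used as raw UTF-8 bytes and it does not model the connector's "alg"/"claims" payload wrapper; b64url and sign_hs256 are hypothetical helpers.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    # Base64url without padding, as required by JWS (RFC 7515).
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign_hs256(claims: dict, key: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    segments = [
        b64url(json.dumps(header, separators=(",", ":")).encode("utf-8")),
        b64url(json.dumps(claims, separators=(",", ":")).encode("utf-8")),
    ]
    # The signature covers "<header>.<claims>" exactly as transmitted.
    signing_input = ".".join(segments).encode("ascii")
    signature = hmac.new(key.encode("utf-8"), signing_input, hashlib.sha256).digest()
    return ".".join(segments + [b64url(signature)])
```

For instance, sign_hs256({"username": "my_username", "iat": 1599462755}, "my_key") returns a three-part token that any HS256-capable JWT library can verify with the same key.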

Mongodb

Read and write data in a MongoDB database.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | mongodb | mongodb / mongo |
| endpoint | - | Endpoint of the connector | none | String |
| database | db | The database name | none | String |
| collection | col | The collection name | none | String |
| query | - | Query to find an element in the collection | none | Object |
| find_options | projection | Specifies the fields to return in the documents that match the query filter. To return all fields in the matching documents, omit this parameter. For details, see Projection | none | Object |
| update_options | - | Options applied during the update | none | Object |

examples:

[
    {
        "type": "w",
        "connector":{
            "type": "mongodb",
            "endpoint": "mongodb://admin:admin@localhost:27017",
            "db": "tests",
            "collection": "test",
            "update_options": {
                "upsert": true
            }
        },
        "thread_number":3
    }
]
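The example sets "upsert": true in update_options. In MongoDB, an update with upsert enabled modifies the matching document or, when none matches, inserts a new one. The in-memory model below illustrates those semantics for plain equality queries, roughly like the driver call update_one(query, {"$set": fields}, upsert=True). It is a simplification for illustration, not the connector's code, and how the connector derives the query from each document is not covered.

```python
def upsert(collection: list, query: dict, fields: dict) -> str:
    """Simplified in-memory model of update_one(query, {"$set": fields}, upsert=True)."""
    for document in collection:
        # Only plain equality matches on top-level fields are modeled here.
        if all(document.get(key) == value for key, value in query.items()):
            document.update(fields)
            return "updated"
    # No match: MongoDB inserts a new document built from the query's equality
    # fields plus the updated fields.
    collection.append({**query, **fields})
    return "inserted"
```

Calling upsert twice with the same query first inserts the document, then updates it in place.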

Bucket

Read and write data in an S3/Minio bucket.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | bucket | bucket |
| metadata | meta | Override metadata information | none | Metadata |
| endpoint | - | Endpoint of the connector | none | String |
| access_key_id | - | The access key used for the authentication | none | String |
| secret_access_key | - | The secret access key used for the authentication | none | String |
| region | - | The bucket's region | us-east-1 | String |
| bucket | - | The bucket name | none | String |
| path | key | The path of the resource. Can use * to read multiple files | none | String |
| parameters | params | The parameters used to replace variables in the path | none | Object or Array of objects |
| limit | - | Limit the number of files to read | none | Unsigned number |
| skip | - | Skip N files before starting to read the next files | none | Unsigned number |
| version | - | Read a specific version of a file | none | String |
| tags | - | List of tags to apply to the file. Used to give more context to a file | (service:writer:name,chewdata) | List of key/value |
| cache_control | - | Override the file's cache control | none | String |
| expires | - | Override the file's expiration date | none | String |

examples:

[
    {
        "type": "r",
        "connector": {
            "type": "bucket",
            "bucket": "my-bucket",
            "path": "data/*.json*",
            "endpoint":"{{ BUCKET_ENDPOINT }}",
            "access_key_id": "{{ BUCKET_ACCESS_KEY_ID }}",
            "secret_access_key": "{{ BUCKET_SECRET_ACCESS_KEY }}",
            "region": "{{ BUCKET_REGION }}",
            "limit": 10,
            "skip": 0,
            "tags": {
                "service:writer": "my_service",
                "service:writer:owner": "my_team_name",
                "service:writer:env": "dev",
                "service:writer:context": "example"
            }
        }
    }
]
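With a wildcard path, skip and limit act on the list of matching files. The sketch below shows one plausible selection over object keys, using fnmatch for the * pattern and lexicographic order (the order in which S3 lists keys). Note that fnmatch's * also matches "/", and whether the connector behaves the same way is an assumption; select_keys is a hypothetical helper.

```python
from fnmatch import fnmatchcase
from typing import Iterable, List, Optional


def select_keys(
    keys: Iterable[str], pattern: str, skip: int = 0, limit: Optional[int] = None
) -> List[str]:
    # Keep the keys matching the wildcard pattern, in lexicographic order.
    matched = sorted(key for key in keys if fnmatchcase(key, pattern))
    # Skip the first N matches, then keep at most `limit` of the rest.
    end = None if limit is None else skip + limit
    return matched[skip:end]
```

For example, against the keys data/a.json, data/b.jsonl, data/c.csv and other/d.json, the pattern "data/*.json*" selects data/a.json and data/b.jsonl; adding skip=1 and limit=1 keeps only data/b.jsonl.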

Bucket Select

Filter data files with S3 Select queries and read the data from an AWS S3/Minio bucket. Use the Bucket connector to write into the bucket.

| key | alias | Description | Default Value | Possible Values |
| --- | --- | --- | --- | --- |
| type | - | Required in order to use this connector | bucket_select | bucket_select |
| metadata | meta | Override metadata information | none | Metadata |
| endpoint | - | Endpoint of the connector | none | String |
| access_key_id | - | The access key used for the authentication | none | String |
| secret_access_key | - | The secret access key used for the authentication | none | String |
| region | - | The bucket's region | us-east-1 | String |
| bucket | - | The bucket name | none | String |
| path | key | The path of the resource. Can use * to read multiple files with the same content type | none | String |
| parameters | params | The parameters used to replace variables in the path | none | Object or Array of objects |
| query | - | S3 Select query | select * from s3object | See AWS S3 Select |
| limit | - | Limit the number of files to read when the path uses a wildcard | none | Unsigned number |
| skip | - | Skip N files before starting to read the next files when the path uses a wildcard | none | Unsigned number |

examples:

[
    {
        "type": "r",
        "connector": {
            "type": "bucket_select",
            "bucket": "my-bucket",
            "path": "data/my_file.jsonl",
            "endpoint": "{{ BUCKET_ENDPOINT }}",
            "access_key_id": "{{ BUCKET_ACCESS_KEY_ID }}",
            "secret_access_key": "{{ BUCKET_SECRET_ACCESS_KEY }}",
            "region": "{{ BUCKET_REGION }}",
            "query": "select * from s3object[*].results[*] r where r.number = 20"
        },
        "document" : {
            "type": "jsonl"
        }
    }
]
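To clarify what the example query returns, here is a rough in-memory equivalent. It assumes each JSON Lines record holds a "results" array and keeps the array elements whose "number" field equals 20. The filtering really runs server-side in S3 Select, so this is only an approximation of its semantics; select_results is a hypothetical helper.

```python
import json
from typing import Iterator


def select_results(jsonl_text: str, number: int) -> Iterator[dict]:
    # Approximates: select * from s3object[*].results[*] r where r.number = <number>
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue  # ignore blank lines between records
        record = json.loads(line)
        for result in record.get("results", []):
            if result.get("number") == number:
                yield result
```

Each yielded element corresponds to one row the query would emit for the matching records.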

Metadata

By default, the metadata is managed by the document's metadata, but you can override it if necessary.