= sql_select
:type: processor
:status: stable
:categories: ["Integration"]

////
     THIS FILE IS AUTOGENERATED!

     To make changes, edit the corresponding source file under:

     https://github.com/redpanda-data/connect/tree/main/internal/impl/.

     And:

     https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////

// © 2024 Redpanda Data Inc.

component_type_dropdown::[]

Runs an SQL select query against a database and returns the result as an array of objects, one for each row returned, containing a key for each column queried and its value.

Introduced in version 3.59.0.

[tabs]
======
Common::
+
--

```yml
# Common config fields, showing default values
label: ""
sql_select:
  driver: "" # No default (required)
  dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
  table: foo # No default (required)
  columns: [] # No default (required)
  where: meow = ? and woof = ? # No default (optional)
  args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
```

--
Advanced::
+
--

```yml
# All config fields, showing default values
label: ""
sql_select:
  driver: "" # No default (required)
  dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60 # No default (required)
  table: foo # No default (required)
  columns: [] # No default (required)
  where: meow = ? and woof = ? # No default (optional)
  args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ] # No default (optional)
  prefix: "" # No default (optional)
  suffix: "" # No default (optional)
  init_files: [] # No default (optional)
  init_statement: | # No default (optional)
    CREATE TABLE IF NOT EXISTS some_table (
      foo varchar(50) not null,
      bar integer,
      baz varchar(50),
      primary key (foo)
    ) WITHOUT ROWID;
  conn_max_idle_time: "" # No default (optional)
  conn_max_life_time: "" # No default (optional)
  conn_max_idle: 2
  conn_max_open: 0 # No default (optional)
```

--
======

If the query fails to execute, the message remains unchanged and the error can be caught using xref:configuration:error_handling.adoc[error handling methods].

== Examples

[tabs]
======
Table Query (PostgreSQL)::
+
--

Here we query the table `footable` for rows whose `user_id` matches the message field `user.id`. A xref:components:processors/branch.adoc[`branch` processor] is used to insert the resulting array into the original message at the path `foo_rows`:

```yaml
pipeline:
  processors:
    - branch:
        processors:
          - sql_select:
              driver: postgres
              dsn: postgres://foouser:foopass@localhost:5432/testdb?sslmode=disable
              table: footable
              columns: [ '*' ]
              where: user_id = ?
              args_mapping: '[ this.user.id ]'
        result_map: 'root.foo_rows = this'
```

--
======

== Fields

=== `driver`

A database <<drivers, driver>> to use.

*Type*: `string`

Options: `mysql`, `postgres`, `clickhouse`, `mssql`, `sqlite`, `oracle`, `snowflake`, `trino`, `gocosmos`.

=== `dsn`

A Data Source Name to identify the target database.

==== Drivers

:driver-support: mysql=certified, postgres=certified, clickhouse=community, mssql=community, sqlite=certified, oracle=certified, snowflake=community, trino=community, gocosmos=community

The following is a list of supported drivers and their respective DSN formats:

|===
| Driver | Data Source Name Format

| `clickhouse`
| https://github.com/ClickHouse/clickhouse-go#dsn[`clickhouse://[username[:password\]@\][netloc\][:port\]/dbname[?param1=value1&...&paramN=valueN\]`^]

| `mysql`
| `[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]`

| `postgres`
| `postgres://[user[:password]@][netloc][:port][/dbname][?param1=value1&...]`

| `mssql`
| `sqlserver://[user[:password]@][netloc][:port][?database=dbname&param1=value1&...]`

| `sqlite`
| `file:/path/to/filename.db[?param1=value1&...]`

| `oracle`
| `oracle://[username[:password]@][netloc][:port]/service_name?server=server2&server=server3`

| `snowflake`
| `username[:password]@account_identifier/dbname/schemaname[?param1=value&...&paramN=valueN]`

| `trino`
| https://github.com/trinodb/trino-go-client#dsn-data-source-name[`http[s\]://user[:pass\]@host[:port\][?parameters\]`^]

| `gocosmos`
| https://pkg.go.dev/github.com/microsoft/gocosmos#readme-example-usage[`AccountEndpoint=<cosmosdb-endpoint>;AccountKey=<cosmosdb-account-key>[;TimeoutMs=<timeout-in-ms>\][;Version=<cosmosdb-api-version>\][;DefaultDb/Db=<db-name>\][;AutoId=<true/false>\][;InsecureSkipVerify=<true/false>\]`^]
|===

Note that the `postgres` driver enforces SSL by default; you can override this with the parameter `sslmode=disable` if required.

The `snowflake` driver supports multiple DSN formats. Please consult https://pkg.go.dev/github.com/snowflakedb/gosnowflake#hdr-Connection_String[the docs^] for more details. For https://docs.snowflake.com/en/user-guide/key-pair-auth.html#configuring-key-pair-authentication[key pair authentication^], the DSN has the following format: `<snowflake_user>@<snowflake_account>/<db_name>/<schema_name>?warehouse=<warehouse>&role=<role>&authenticator=snowflake_jwt&privateKey=<base64_url_encoded_private_key>`, where the value for the `privateKey` parameter can be constructed from an unencrypted RSA private key file `rsa_key.p8` using `openssl enc -d -base64 -in rsa_key.p8 | basenc --base64url -w0` (you can use `gbasenc` instead of `basenc` on macOS if you install `coreutils` via Homebrew). If you have a password-encrypted private key, you can decrypt it using `openssl pkcs8 -in rsa_key_encrypted.p8 -out rsa_key.p8`. Also, make sure fields such as the username are URL-encoded.

The https://pkg.go.dev/github.com/microsoft/gocosmos[`gocosmos`^] driver is still experimental, but it has support for https://learn.microsoft.com/en-us/azure/cosmos-db/hierarchical-partition-keys[hierarchical partition keys^] as well as https://learn.microsoft.com/en-us/azure/cosmos-db/nosql/how-to-query-container#cross-partition-query[cross-partition queries^]. Please refer to the https://github.com/microsoft/gocosmos/blob/main/SQL.md[SQL notes^] for details.

*Type*: `string`

```yml
# Examples

dsn: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60

dsn: foouser:foopassword@tcp(localhost:3306)/foodb

dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable

dsn: oracle://foouser:foopass@localhost:1521/service_name
```

=== `table`

The table to query.

*Type*: `string`

```yml
# Examples

table: foo
```

=== `columns`

A list of columns to query.

*Type*: `array`

```yml
# Examples

columns:
  - '*'

columns:
  - foo
  - bar
  - baz
```

=== `where`

An optional where clause to add. Placeholder arguments are populated with the `args_mapping` field. Placeholders should always be question marks, and will automatically be converted to dollar syntax when the `postgres` or `clickhouse` drivers are used.

*Type*: `string`

```yml
# Examples

where: meow = ? and woof = ?

where: user_id = ?
```

=== `args_mapping`

An optional xref:guides:bloblang/about.adoc[Bloblang mapping] which should evaluate to an array of values whose length matches the number of placeholder arguments in the field `where`.

*Type*: `string`

```yml
# Examples

args_mapping: root = [ this.cat.meow, this.doc.woofs[0] ]

args_mapping: root = [ meta("user.id") ]
```

=== `prefix`

An optional prefix to prepend to the query (before SELECT).

*Type*: `string`

=== `suffix`

An optional suffix to append to the select query.

*Type*: `string`

=== `init_files`

An optional list of file paths containing SQL statements to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Glob patterns are supported, including super globs (double star).

Care should be taken to ensure that the statements are idempotent, and therefore would not cause issues when run multiple times after service restarts. If both `init_statement` and `init_files` are specified, the `init_statement` is executed _after_ the `init_files`.

If a statement fails for any reason, a warning log will be emitted but the operation of this component will not be stopped.

*Type*: `array`

Requires version 4.10.0 or newer

```yml
# Examples

init_files:
  - ./init/*.sql

init_files:
  - ./foo.sql
  - ./bar.sql
```

=== `init_statement`

An optional SQL statement to execute immediately upon the first connection to the target database. This is a useful way to initialise tables before processing data. Care should be taken to ensure that the statement is idempotent, and therefore would not cause issues when run multiple times after service restarts.

If both `init_statement` and `init_files` are specified, the `init_statement` is executed _after_ the `init_files`.

If the statement fails for any reason, a warning log will be emitted but the operation of this component will not be stopped.

*Type*: `string`

Requires version 4.10.0 or newer

```yml
# Examples

init_statement: |2
  CREATE TABLE IF NOT EXISTS some_table (
    foo varchar(50) not null,
    bar integer,
    baz varchar(50),
    primary key (foo)
  ) WITHOUT ROWID;
```

=== `conn_max_idle_time`

An optional maximum amount of time a connection may be idle. Expired connections may be closed lazily before reuse. If `value <= 0`, connections are not closed due to a connection's idle time.

*Type*: `string`

=== `conn_max_life_time`

An optional maximum amount of time a connection may be reused. Expired connections may be closed lazily before reuse. If `value <= 0`, connections are not closed due to a connection's age.

*Type*: `string`

=== `conn_max_idle`

An optional maximum number of connections in the idle connection pool. If `conn_max_open` is greater than 0 but less than the new `conn_max_idle`, then the new `conn_max_idle` will be reduced to match the `conn_max_open` limit. If `value <= 0`, no idle connections are retained. The default max idle connections is currently 2. This may change in a future release.

*Type*: `int`

*Default*: `2`

=== `conn_max_open`

An optional maximum number of open connections to the database. If `conn_max_idle` is greater than 0 and the new `conn_max_open` is less than `conn_max_idle`, then `conn_max_idle` will be reduced to match the new `conn_max_open` limit. If `value <= 0`, then there is no limit on the number of open connections. The default is 0 (unlimited).

*Type*: `int`
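
The four connection pool fields are easiest to read together. The following is a minimal sketch, not a recommended configuration: the DSN is borrowed from the examples earlier on this page, the pool sizes are arbitrary, and the duration strings (`5m`, `1h`) assume that these fields accept Go-style durations, which the field descriptions do not state explicitly.

```yml
pipeline:
  processors:
    - sql_select:
        driver: postgres
        dsn: postgres://foouser:foopass@localhost:5432/foodb?sslmode=disable
        table: footable
        columns: [ '*' ]
        where: user_id = ?
        args_mapping: 'root = [ this.user.id ]'
        conn_max_idle_time: 5m # Assumed duration format: close connections idle for 5 minutes
        conn_max_life_time: 1h # Assumed duration format: retire connections after 1 hour
        conn_max_idle: 5       # Keep at most 5 idle connections in the pool
        conn_max_open: 10      # Allow at most 10 open connections
```

Because `conn_max_open` (10) is not less than `conn_max_idle` (5) here, the idle limit is left as configured. Setting `conn_max_open: 3` instead would, per the field descriptions, reduce the effective idle limit to 3.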