DataWorks Data Integration supports reading data from and writing data to Doris. This topic describes the data synchronization capabilities for Doris in DataWorks.
Supported Doris versions
Doris Writer uses MySQL Driver 5.1.47. The following table lists the supported Doris versions. For more information about the driver capabilities, see the official Doris documentation.
| Doris version | Supported |
| --- | --- |
| 0.x.x | Supported |
| 1.1.x | Supported |
| 1.2.x | Supported |
| 2.x | Supported |
Limits
Data Integration supports only offline writes to Doris.
Supported field types
Different Doris versions support different data types and aggregation models. For a complete list of field types for each Doris version, see the official Doris documentation. The following table describes the support for major Doris field types.
| Type | Supported models | Doris version | Offline write (Doris Writer) |
| --- | --- | --- | --- |
| SMALLINT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| INT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| BIGINT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| LARGEINT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| FLOAT | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| DOUBLE | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| DECIMAL | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| DECIMALV3 | Aggregate, Unique, Duplicate | 1.2.1+, 2.x | Supported |
| DATE | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| DATETIME | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| DATEV2 | Aggregate, Unique, Duplicate | 1.2.x, 2.x | Supported |
| DATETIMEV2 | Aggregate, Unique, Duplicate | 1.2.x, 2.x | Supported |
| CHAR | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| VARCHAR | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| STRING | Aggregate, Unique, Duplicate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| ARRAY | Duplicate | 1.2.x, 2.x | Supported |
| JSONB | Aggregate, Unique, Duplicate | 1.2.x, 2.x | Supported |
| HLL | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| BITMAP | Aggregate | 0.x.x, 1.1.x, 1.2.x, 2.x | Supported |
| QUANTILE_STATE | Aggregate | 1.2.x, 2.x | Supported |
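For context, the following is a minimal sketch of a Unique-model table that uses several of the version-gated types from the table above. The table name, column names, and bucket count are placeholders, and the statement assumes Doris 1.2.1 or later:

-- Hypothetical table: DATETIMEV2 requires 1.2.x or 2.x; DECIMALV3 requires 1.2.1+ or 2.x.
CREATE TABLE example_orders (
    order_id BIGINT,
    order_time DATETIMEV2,
    amount DECIMALV3(16, 2),
    remark STRING
) ENGINE = OLAP
UNIQUE KEY(order_id)
DISTRIBUTED BY HASH(order_id) BUCKETS 8;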
Implementation
Doris Writer uses the native StreamLoad feature of Doris to import data. Doris Writer caches the data read from the reader in memory, concatenates the data into a text format, and then imports the data in batches into the Doris database. For more information, see the official Doris documentation.
Prerequisites
Before you synchronize data in DataWorks, you must prepare the Doris environment as described in this topic. This ensures that the Doris data synchronization task can be configured and run properly in DataWorks. The following sections describe the required preparations.
Confirm the Doris version
Data Integration has version requirements for Doris. Check whether your Doris version is supported by referring to the Supported Doris versions section. You can download and install the required version from the official Doris website.
Create an account and configure permissions
You must create a logon account and set a password to connect to the data warehouse. If you want to log on as the default root user, you must first set a password for it, because the root user has no password by default. You can run the following SQL statement in Doris to set the password:
SET PASSWORD FOR 'root' = PASSWORD('your_password');
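If you prefer a dedicated synchronization account instead of root, the following sketch shows one way to create the account and grant the privileges that reads and StreamLoad-based writes need. The account name, host pattern, and database name are placeholders, not values from this topic:

-- Hypothetical account and database names; adjust to your environment.
CREATE USER 'dataworks_sync'@'%' IDENTIFIED BY 'your_password';
-- SELECT_PRIV covers reads; LOAD_PRIV covers StreamLoad imports.
GRANT SELECT_PRIV, LOAD_PRIV ON example_db.* TO 'dataworks_sync'@'%';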
Configure network connectivity for Doris
To import data by using StreamLoad, you must access the private endpoint of a frontend (FE) node. If you access the public endpoint of an FE node, the request is redirected to the private IP address of a backend (BE) node (see Data Operation FAQs). Therefore, you must establish network connectivity between Doris and the serverless resource group or exclusive resource group for Data Integration that you use, so that Doris can be accessed through its private endpoint. For more information about how to establish network connectivity, see Network connectivity solutions.
Add a data source
Before you develop a synchronization task in DataWorks, you must add the required data source to DataWorks by following the instructions in Data Source Management. You can view the infotips of parameters in the DataWorks console to understand the meanings of the parameters when you add a data source.
The following items describe the configuration parameters for a Doris data source:
JdbcUrl: Enter the JDBC connection string, which includes the IP address, port number, database, and connection parameters. Both public and private IP addresses are supported. If you use a public IP address, make sure that the Data Integration resource group can access the host where Doris resides.
FE endpoint: Enter the IP addresses and ports of the FE nodes. If your cluster has multiple FE nodes, you can enter multiple endpoints separated with commas (,), for example, ip1:port1,ip2:port2. When you test the connectivity, all specified FE endpoints are tested.
Username: Enter the username for the Doris database.
Password: Enter the password that corresponds to the username.
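For reference, the following sketch shows the shape of the two endpoints with placeholder hosts and database name. In open source Doris, the FE MySQL protocol port defaults to 9030 and the FE HTTP port used by StreamLoad defaults to 8030; verify the ports of your own deployment:

JdbcUrl: jdbc:mysql://192.168.0.10:9030/example_db
FE endpoint: 192.168.0.10:8030,192.168.0.11:8030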
Develop a data synchronization task
For information about the entry point and the procedure for configuring a synchronization task, see the following configuration guides.
For more information about the procedure, see Configure a task in the codeless UI and Configure a task in the code editor.
For a complete list of parameters and a script demo for the code editor, see Appendix: Script demo and parameters.
Appendix: Script demo and parameters
Configure a batch synchronization task by using the code editor
If you want to configure a batch synchronization task by using the code editor, you must configure the related parameters in the script based on the unified script format requirements. For more information, see Configure a batch synchronization task by using the code editor. The following information describes the parameters that you must configure for data sources when you configure a batch synchronization task by using the code editor.
Reader script demo
{
    "type": "job",
    "version": "2.0", // The version number.
    "steps": [
        {
            "stepType": "doris", // The plugin name.
            "parameter": {
                "column": [ // The column names.
                    "id"
                ],
                "connection": [
                    {
                        "querySql": [
                            "select a,b from join1 c join join2 d on c.id = d.id;"
                        ],
                        "datasource": "" // The data source name.
                    }
                ],
                "where": "", // The filter condition.
                "splitPk": "", // The shard key.
                "encoding": "UTF-8" // The encoding format.
            },
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "stream",
            "parameter": {},
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": "0" // The number of error records allowed.
        },
        "speed": {
            "throttle": true, // If you set throttle to false, the mbps parameter does not take effect, which indicates that no rate limit is imposed. If you set throttle to true, a rate limit is imposed.
            "concurrent": 1, // The number of concurrent jobs.
            "mbps": "12" // The rate limit. 1 mbps is equal to 1 MB/s.
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}
Reader script parameters
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. | Yes | None |
| table | The name of the source table from which you want to synchronize data. A data integration task can read data from only one table, but the table parameter also supports a table name range for advanced use; see the sketch after this table. Note: The task reads data from all matched tables, and only from the columns specified by the column parameter. If a table or a specified column does not exist, the task fails. | Yes | None |
| column | The columns in the source table from which you want to synchronize data. Use a JSON array to describe the field information, for example, ["id", "name"]. By default, all columns are used. | Yes | None |
| splitFactor | The sharding factor, which determines the number of shards for data synchronization. If you configure multiple concurrent threads, the data is sharded into Concurrency × splitFactor shards. For example, if Concurrency is 5 and splitFactor is 5, the data is sharded into 5 × 5 = 25 shards and processed by five concurrent threads. Note: The recommended value range is 1 to 100. An excessively large value may cause an out-of-memory (OOM) error. | No | 5 |
| splitPk | The shard key. If you specify splitPk, Doris Reader shards the data based on the specified field, which enables concurrent data synchronization and improves efficiency. | No | None |
| where | The filter condition. In business scenarios, you often want to synchronize only the data of the current day; in that case, you can specify a condition such as gmt_create > $bizdate. | No | None |
| querySql (available only in the code editor) | A custom filter SQL statement for scenarios in which the where parameter cannot express the filter conditions. If you configure this parameter, the data synchronization system ignores the table, column, and splitPk parameters and uses this SQL statement to filter data, for example, select a,b from join1 c join join2 d on c.id = d.id to join multiple tables before synchronization. Note: The querySql parameter is case-sensitive. For example, if you enter querysql, the parameter does not take effect. | No | None |
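A minimal sketch of the table-range usage mentioned above, assuming the common DataWorks range notation in which table_[0-99] matches table_0 through table_99; verify the exact pattern against your DataWorks version:

"connection": [
    {
        "table": [
            "table_[0-99]" // Assumed range notation: matches table_0 through table_99.
        ],
        "datasource": "doris_datasource" // Hypothetical data source name.
    }
]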
Writer script demo
{
    "stepType": "doris", // The plugin name.
    "parameter": {
        "postSql": [], // The SQL statement that is executed after the data synchronization task is run.
        "preSql": [], // The SQL statement that is executed before the data synchronization task is run.
        "datasource": "doris_datasource", // The data source name.
        "table": "doris_table_name", // The table name.
        "column": [
            "id",
            "table_id",
            "table_no",
            "table_name",
            "table_status"
        ],
        "loadProps": {
            "column_separator": "\\x01", // The column delimiter for the CSV format.
            "line_delimiter": "\\x02" // The row delimiter for the CSV format.
        }
    },
    "name": "Writer",
    "category": "writer"
}
Writer script parameters
| Parameter | Description | Required | Default value |
| --- | --- | --- | --- |
| datasource | The name of the data source. You can add a data source in the code editor. The value of this parameter must be the same as the name of the added data source. | Yes | None |
| table | The name of the destination table to which you want to synchronize data. | Yes | None |
| column | The columns in the destination table to which you want to write data. Separate the column names with commas (,), for example, ["id", "name"]. | Yes | None |
| preSql | The SQL statement that is executed before the data synchronization task is run. In the codeless UI, you can execute only one SQL statement. In the code editor, you can execute multiple SQL statements. For example, you can clear old data from a table before the task runs. | No | None |
| postSql | The SQL statement that is executed after the data synchronization task is run. In the codeless UI, you can execute only one SQL statement. In the code editor, you can execute multiple SQL statements. For example, you can add a timestamp after the task runs. | No | None |
| maxBatchRows | The maximum number of rows in each batch of imported data. This parameter and batchSize together control the size of each batch: the import of a batch starts when the data reaches either threshold. | No | 500000 |
| batchSize | The maximum data volume, in bytes, of each batch of imported data. This parameter and maxBatchRows together control the size of each batch: the import of a batch starts when the data reaches either threshold. | No | 104857600 |
| maxRetries | The number of retries after a batch import fails. | No | 3 |
| labelPrefix | The label prefix for each uploaded file. The final label is composed of this prefix and a generated unique ID, which keeps the label globally unique. | No | datax_doris_writer_ |
| loadProps | The request parameters for StreamLoad, which are mainly used to configure the format of the imported data. If you do not configure this parameter, data is imported in the default CSV format, with \t as the column delimiter and \n as the row delimiter. To import data in JSON format instead, see the sketch after this table. | No | None |
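If you want to import data in JSON format, the following loadProps sketch shows one plausible configuration, based on the Stream Load options used by the open source DataX doriswriter; verify the option names against your Doris version:

"loadProps": {
    "format": "json", // Import data as JSON instead of the default CSV.
    "strip_outer_array": true // Tells StreamLoad that the payload is a top-level JSON array.
}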
Write data of aggregation types by using Doris Writer
Doris Writer supports writing data of aggregation types. However, you must perform additional configurations in the code editor. The following example shows the required configurations.
For example, consider the following Doris table. The uuid column is of the bitmap type (an aggregation type), and the sex column is of the HLL type (an aggregation type).
CREATE TABLE `example_table_1` (
    `user_id` int(11) NULL,
    `date` varchar(10) NULL DEFAULT "10.5",
    `city` varchar(10) NULL,
    `uuid` bitmap BITMAP_UNION NULL, -- Aggregation type
    `sex` HLL HLL_UNION -- Aggregation type
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `city`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`user_id`) BUCKETS 32;
Insert the following raw data into the table:
user_id,date,city,uuid,sex
0,T0S4Pb,abc,43,'54'
1,T0S4Pd,fsd,34,'54'
2,T0S4Pb,fa3,53,'64'
4,T0S4Pb,fwe,87,'64'
5,T0S4Pb,gbr,90,'56'
2,iY3GiHkLF,234,100,'54'
When you use Doris Writer to write data of aggregation types, you must specify the columns in writer.parameter.column and configure the aggregate functions in writer.parameter.loadProps.columns. For example, you can use the bitmap_hash aggregate function for the uuid column and the hll_hash aggregate function for the sex column.
The following code provides a script example:
{
    "stepType": "doris", // The plugin name.
    "writer": {
        "parameter": {
            "column": [
                "user_id",
                "date",
                "city",
                "uuid", // The BITMAP aggregation type.
                "sex" // The HLL aggregation type.
            ],
            "loadProps": {
                "format": "csv",
                "column_separator": "\\x01",
                "line_delimiter": "\\x02",
                "columns": "user_id,date,city,k1,uuid=bitmap_hash(k1),k2,sex=hll_hash(k2)" // You must specify the aggregate functions.
            },
            "postSql": [
                "select count(1) from example_table_1"
            ],
            "preSql": [],
            "datasource": "doris_datasource", // The data source name.
            "table": "doris_table_name" // The table name.
        },
        "name": "Writer",
        "category": "writer"
    }
}
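After the task runs, one way to spot-check the aggregated columns is to query them with the matching aggregate functions. The following is a minimal sketch against the example_table_1 schema above; BITMAP_UNION_COUNT and HLL_UNION_AGG are standard Doris aggregate functions:

-- Exact distinct count of uuid and approximate distinct count of sex per city.
SELECT
    city,
    BITMAP_UNION_COUNT(uuid) AS uuid_distinct,
    HLL_UNION_AGG(sex) AS sex_approx_distinct
FROM example_table_1
GROUP BY city;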