ClickHouse uses a clustered index design, meaning that data within a Part is ordered according to the table's `ORDER BY` keys. If an operator in the query plan can effectively utilize the orderedness of its input data, its execution performance improves significantly. This article discusses how ClickHouse optimizes query operators based on indexes.

Compared with other operators such as filtering and projection, the Sort, Distinct, and Aggregation operators in a query plan have the following characteristics:

1. They need to maintain state in memory, leading to high memory costs;
2. They have high computational costs;
3. They block the execution pipeline, outputting data downstream only after all computation is complete.

Therefore, these operators are often the bottlenecks of the entire query.

This article discusses in detail the impact on computation, memory, and pipeline blocking before and after the query optimization of the three operators based on indexes.

Last updated: 2024-04-08

**Preparation Before Experiments**

The subsequent discussion is primarily based on experiments with the following table:

```
CREATE TABLE test_in_order
(
`a` UInt64,
`b` UInt64,
`c` UInt64,
`d` UInt64
)
ENGINE = MergeTree
ORDER BY (a, b);
```

There are a total of 3 parts in the table, with each part containing 4 rows of data.

P.S.: Users can disable background merges before inserting data to prevent the parts from being merged into one. If the parts are merged into one, it will reduce the query parallelism, which might affect the experiment. The following statement disables background merges: `system stop merges test_in_order`.

## 1. Sort Operator

If the `ORDER BY` fields in a query match the prefix columns of the table's `ORDER BY` keys, the query can be optimized based on the ordered nature of the data.

### 1.1 Implementation of the Sort Operator

First, let's look at scenarios where the primary key's order cannot be utilized, i.e., when the `ORDER BY` fields in the query do not match the prefix columns of the table's `ORDER BY` keys. For example, consider the following query:

`query_1: EXPLAIN PIPELINE SELECT b FROM test_in_order ORDER BY b ASC`

The query plan is as follows:

```
┌─explain───────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 3 → 1 │
│ MergeSortingTransform × 3 │
│ LimitsCheckingTransform × 3 │
│ PartialSortingTransform × 3 │
│ (Expression) │
│ ExpressionTransform × 3 │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 3 0 → 1 │
└───────────────────────────────────────┘
```

The sorting algorithm consists of three transforms:

- `PartialSortingTransform` sorts individual Chunks;
- `MergeSortingTransform` sorts a single stream;
- `MergingSortedTransform` merges multiple ordered streams to perform a global merge sort.
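The three stages can be illustrated with a minimal Python sketch (an illustration only, not ClickHouse's actual implementation; the function names simply mirror the transforms):

```python
import heapq

def partial_sorting(chunks):
    # PartialSortingTransform: sort each Chunk independently.
    return [sorted(chunk) for chunk in chunks]

def merge_sorting(sorted_chunks):
    # MergeSortingTransform: merge the sorted Chunks of one stream
    # into a single fully sorted stream.
    return list(heapq.merge(*sorted_chunks))

def merging_sorted(streams):
    # MergingSortedTransform: k-way merge of the per-stream results
    # into one globally sorted output.
    return list(heapq.merge(*streams))

# Three streams (one per part), each carrying unsorted Chunks.
streams = [[[3, 1], [2, 4]], [[6, 5]], [[0, 7]]]
per_stream = [merge_sorting(partial_sorting(chunks)) for chunks in streams]
result = merging_sorted(per_stream)
# result == [0, 1, 2, 3, 4, 5, 6, 7]
```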

If the `ORDER BY` fields in the query match the prefix columns of the table's `ORDER BY` keys, the query can be optimized based on the data's ordered characteristics. The optimization switch for this is `optimize_read_in_order`.

### 1.2 Queries Matching Index Columns

The following query's `ORDER BY` fields match the prefix columns of the table's `ORDER BY` keys.

`query_3: EXPLAIN PIPELINE SELECT b FROM test_in_order ORDER BY a ASC, b ASC SETTINGS optimize_read_in_order = 0 -- disable read_in_order`

Let's examine the pipeline execution plan for this `ORDER BY` statement.

```
┌─explain───────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 3 → 1 │
│ MergeSortingTransform × 3 │
│ (Expression) │
│ ExpressionTransform × 3 │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 3 0 → 1 │
└───────────────────────────────────┘
```

The `ORDER BY` operator's algorithm in this case is as follows:

- First, `MergeSortingTransform` sorts each input stream.
- Then, `MergingSortedTransform` merges the multiple sorted streams and outputs a single, globally sorted stream, which is the final sorted result.

A question arises about this query plan: even though the `read_in_order` optimization is disabled, there is no `PartialSortingTransform`, and the system directly assumes that the input to `MergeSortingTransform` is already ordered within each Chunk. This is actually an implicit optimization: because the `ORDER BY` fields in the query match the prefix columns of the table's `ORDER BY` keys, the data within each Chunk is inherently ordered.

### 1.3 Enable optimize_read_in_order

Running the same query with `optimize_read_in_order = 1` produces the following pipeline:

```
┌─explain──────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Sorting) │
│ MergingSortedTransform 3 → 1 │
│ (Expression) │
│ ExpressionTransform × 3 │
│ (ReadFromMergeTree) │
│ MergeTreeInOrder × 3 0 → 1 │
└──────────────────────────────────┘
```

### 1.4 Analysis

After enabling `optimize_read_in_order`:

- **Computation**: the algorithm includes only one `MergingSortedTransform`, omitting the step of sorting within individual streams.
- **Memory usage**: since `MergeSortingTransform` is the most memory-intensive step, the optimization saves a significant amount of memory.
- **Pipeline blocking**: `MergeSortingTransform` can block the entire pipeline, so the optimization also eliminates this blocking.

## 2. Distinct Operator

If the distinct fields in a `DISTINCT` query match the prefix columns of the table's `ORDER BY` keys, the query can be optimized based on the ordered nature of the data. The optimization can be enabled using the `optimize_distinct_in_order` setting. This can be demonstrated through the following experiment:

### 2.1 Implementation of the Distinct Operator

Let's view the pipeline execution plan for a `DISTINCT` query:

`query_2: EXPLAIN PIPELINE SELECT DISTINCT * FROM test_in_order SETTINGS optimize_distinct_in_order = 0 -- disable optimize_distinct_in_order`

```
┌─explain─────────────────────────────┐
│ (Expression) │
│ ExpressionTransform │
│ (Distinct) │
│ DistinctTransform │
│ Resize 3 → 1 │
│ (Distinct) │
│ DistinctTransform × 3 │
│ (Expression) │
│ ExpressionTransform × 3 │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 3 0 → 1 │
└─────────────────────────────────────┘
```

The DISTINCT operator uses a two-stage approach. First, the initial `DistinctTransform` performs a preliminary distinct operation with a parallelism of 3, i.e., you can think of it as 3 threads executing simultaneously. Then, the second `DistinctTransform` performs the final distinct operation.

Each `DistinctTransform` computes as follows: first, a `HashSet` data structure is constructed; then, based on the `HashSet`, a filter mask is built (if the current key already exists in the `HashSet`, the row is marked as a duplicate); finally, the duplicate rows are filtered out.
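As a rough illustration of this hash-based approach (a simplified sketch, not the actual `DistinctTransform` code; a Python set stands in for the `HashSet`):

```python
def distinct_transform(chunk, seen):
    # Build a filter mask from the hash set: a row survives only the
    # first time its key is seen by this transform.
    mask = []
    for key in chunk:
        if key in seen:
            mask.append(False)  # duplicate: filter out
        else:
            seen.add(key)
            mask.append(True)
    return [key for key, keep in zip(chunk, mask) if keep]

seen = set()  # per-transform state, standing in for the HashSet
out = [distinct_transform(chunk, seen) for chunk in [[1, 2, 1], [2, 3]]]
# out == [[1, 2], [3]]
```

The second-stage `DistinctTransform` runs the same logic once more over the merged output of the parallel first stage.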

### 2.2 Enable optimize_distinct_in_order

After enabling the optimization, you can see that the preliminary distinct and final distinct operations use different transforms, namely `DistinctSortedStreamTransform` and `DistinctTransform`.

**DistinctSortedStreamTransform:** This transform performs the distinct operation on the data within a single stream. Since the distinct columns match the prefix columns of the table's `ORDER BY` keys, and the scan operator reads data such that each stream reads from only one part, the input to each distinct transform is ordered. The distinct algorithm is therefore as follows:

**DistinctSortedStreamTransform Algorithm 1:**

In the `Transform`, the last input value is kept as state. For each new input value: if it equals the retained state, it is ignored; if it differs, the previous state is emitted downstream and the current value is retained as the new state. This algorithm greatly reduces both the time and space complexity of deduplicating an entire stream.

**DistinctSortedStreamTransform Algorithm 2 (Used by ClickHouse):**

In this algorithm, the `Transform` processes each chunk (the basic unit of data processing in ClickHouse, typically around 65,000 rows). The steps are as follows:

- **Divide data into ranges:** first, the transform identifies ranges of identical values within the chunk.
- **Set a mask array:** a mask array is set up to mark which rows are duplicates.
- **Remove duplicate data:** using the mask array, the transform removes the duplicate rows from the chunk.
- **Return the processed chunk:** finally, the transform returns the chunk with the duplicates removed.

This method ensures efficient deduplication within each chunk, leveraging the ordered nature of the data to streamline the process.
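The four steps can be sketched in Python (illustrative only; it deduplicates within a single chunk and ignores the boundary between consecutive chunks):

```python
def distinct_within_chunk(chunk):
    # Step 1: divide the chunk into ranges of identical consecutive keys.
    ranges, start = [], 0
    for i in range(1, len(chunk) + 1):
        if i == len(chunk) or chunk[i] != chunk[start]:
            ranges.append((start, i))
            start = i
    # Step 2: a mask array marks the first row of each range as kept.
    mask = [False] * len(chunk)
    for begin, _ in ranges:
        mask[begin] = True
    # Steps 3 and 4: drop the masked-out duplicates and return the chunk.
    return [key for key, keep in zip(chunk, mask) if keep]

deduped = distinct_within_chunk([1, 1, 2, 2, 2, 3])
# deduped == [1, 2, 3]
```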

### 2.3 Analysis

Enabling `optimize_distinct_in_order` optimizes the first stage of the DISTINCT operation by switching from a hash-based filtering algorithm to one that handles consecutive identical values.

- **Computation:** the optimized algorithm eliminates hash calculations but introduces equality checks instead; depending on the dataset's cardinality, each approach has its pros and cons.
- **Memory usage:** the optimized algorithm does not need to store a `HashSet`.
- **Pipeline blocking:** the pipeline is not blocked either before or after the optimization.

## 3. Aggregating Operator

If the `GROUP BY` fields in a query match the prefix columns of the table's `ORDER BY` keys, the query can be optimized based on the ordered nature of the data. The optimization switch for this is `optimize_aggregation_in_order`.

### 3.1 Implementation of the Aggregating Operator

Let's view the pipeline execution plan of a `GROUP BY` statement:

`query_4: EXPLAIN PIPELINE SELECT a FROM test_in_order GROUP BY a SETTINGS optimize_aggregation_in_order = 0 -- disable aggregation_in_order optimization`

```
┌─explain─────────────────────────────┐
│ (Expression) │
│ ExpressionTransform × 8 │
│ (Aggregating) │
│ Resize 3 → 8 │
│ AggregatingTransform × 3 │
│ StrictResize 3 → 3 │
│ (Expression) │
│ ExpressionTransform × 3 │
│ (ReadFromMergeTree) │
│ MergeTreeThread × 3 0 → 1 │
└─────────────────────────────────────┘
```

The execution plan does not fully display the aggregation operator's algorithm. At a macro level, it employs a two-stage aggregation algorithm. The complete algorithm is as follows:

- **AggregatingTransform** performs the initial aggregation. This step can be executed in parallel.
- **ConvertingAggregatedToChunksTransform** performs the second stage of aggregation.

(P.S.: For simplicity, the explanation ignores the two-level HashMap and spill-to-disk mechanisms.)
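The two-stage scheme can be sketched as follows (an illustration, not the actual implementation; `collections.Counter` stands in for the aggregation HashMap, with `count()` as the aggregate function):

```python
from collections import Counter

def aggregating_transform(stream):
    # First stage: each stream builds its own partial hash map
    # (key -> count, standing in for arbitrary aggregate state).
    # These calls can run in parallel, one per stream.
    return Counter(stream)

def converting_aggregated_to_chunks(partials):
    # Second stage: merge the partial hash maps into the final result.
    total = Counter()
    for partial in partials:
        total.update(partial)
    return dict(total)

streams = [[1, 2, 2], [2, 3], [1, 3]]
final = converting_aggregated_to_chunks(aggregating_transform(s) for s in streams)
# final == {1: 2, 2: 3, 3: 2}
```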

### 3.2 Enable optimize_aggregation_in_order

The following is the query plan:

```
┌─explain───────────────────────────────────────┐
│ (Expression) │
│ ExpressionTransform × 8 │
│ (Aggregating) │
│ MergingAggregatedBucketTransform × 8 │
│ Resize 1 → 8 │
│ FinishAggregatingInOrderTransform 3 → 1 │
│ AggregatingInOrderTransform × 3 │
│ (Expression) │
│ ExpressionTransform × 3 │
│ (ReadFromMergeTree) │
│ MergeTreeInOrder × 3 0 → 1 │
└───────────────────────────────────────────────┘
```

When the `optimize_aggregation_in_order` setting is enabled, the aggregation algorithm consists of three steps:

- **AggregatingInOrderTransform** performs preliminary aggregation by grouping consecutive identical keys within a stream. After this pre-aggregation, there is only one entry per unique key within the current stream.
- **FinishAggregatingInOrderTransform** re-groups the data received from multiple streams to ensure that the data in the output chunks is ordered. For instance, if the maximum `group by` key in the previous output chunk is 5, the chunk currently being output will only contain keys greater than 5.
- **MergingAggregatedBucketTransform** performs the final merge aggregation.
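The first step, pre-aggregation over runs of identical keys, can be sketched as follows (illustrative Python, with `count()` standing in for an arbitrary aggregate function):

```python
def aggregating_in_order(stream):
    # Pre-aggregate runs of identical keys in a sorted stream.
    # Each output entry is [key, aggregate_state]; here the state
    # is a running count.
    result = []
    for key in stream:
        if result and result[-1][0] == key:
            result[-1][1] += 1  # same key as previous: update state
        else:
            result.append([key, 1])  # new key: open a new group
    return result

groups = aggregating_in_order([1, 1, 2, 2, 2, 3])
# groups == [[1, 2], [2, 3], [3, 1]]
```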

The grouping algorithm of **FinishAggregatingInOrderTransform** works as follows. Assume there are 3 input streams; the operator maintains one current Chunk per stream:

- First, the operator looks at the last entry of each of the 3 Chunks and finds the minimum value among them; suppose it is 5.
- The operator then extracts from the 3 Chunks all entries less than or equal to this minimum value (here, 5); these entries are processed and output together.
- This process repeats. Whenever a Chunk is exhausted (all of its entries have been processed), a new Chunk is fetched from the corresponding stream.

This algorithm ensures that the data in the output chunks remains ordered: the maximum key of each Chunk output by FinishAggregatingInOrderTransform is less than the minimum key of the next Chunk, which facilitates optimization in subsequent steps.
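A simplified sketch of this grouping loop (illustrative only; `streams` is a list of iterators yielding sorted chunks of keys, and the boundary is taken inclusively so that the stream holding the minimum can make progress):

```python
def finish_aggregating_in_order(streams):
    # Buffer one current chunk per stream.
    chunks = [next(s, []) for s in streams]
    out = []
    while any(chunks):
        # Minimum of the last keys of the non-empty buffered chunks.
        threshold = min(c[-1] for c in chunks if c)
        emitted = []
        for i, c in enumerate(chunks):
            emitted.extend(k for k in c if k <= threshold)
            chunks[i] = [k for k in c if k > threshold]
            if not chunks[i]:
                # Chunk exhausted: fetch the next one from this stream.
                chunks[i] = next(streams[i], [])
        out.append(sorted(emitted))  # one ordered output chunk
    return out

streams = [iter([[1, 3, 5], [7]]), iter([[2, 4], [6, 8]])]
out_chunks = finish_aggregating_in_order(streams)
# out_chunks == [[1, 2, 3, 4], [5], [6, 7], [8]]
```

Note how the maximum of each output chunk is smaller than the minimum of the next, which is exactly the guarantee the downstream merge relies on.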

### 3.3 Analysis

Enabling `optimize_aggregation_in_order` primarily optimizes the first stage of aggregation, transitioning from a HashMap-based algorithm to one based on consecutive identical values.

- **Computation**: the optimized algorithm reduces the number of hash calculations but increases the number of equality checks; depending on the size and distribution of the data set, this can help or hurt performance.
- **Memory usage**: there is no significant difference before and after the optimization.
- **Pipeline blocking**: there is no significant difference before and after the optimization.

## 4. Summary of In-order Optimization

In the entire query plan, the Sort, Distinct, and Aggregation operators are often the bottlenecks of the query, making them worthy of deep optimization. ClickHouse optimizes the algorithms of these operators by leveraging the ordered nature of the input data, achieving varying degrees of optimization in terms of computation, memory usage, and pipeline blocking.

The viewpoints expressed in this article are based on reading the ClickHouse source code and the author’s understanding. If you have any questions, feel free to reach out for discussion.

This work is licensed under a Creative Commons Attribution 4.0 International License. When redistributing, please include the original link.