Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
lizc9 authored Jan 16, 2025
2 parents 4cf35f0 + 6a1e477 commit 0953f6c
Show file tree
Hide file tree
Showing 218 changed files with 10,391 additions and 2,796 deletions.
15 changes: 14 additions & 1 deletion docs/content/append-table/streaming.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,20 @@ You can streaming write to the Append table in a very flexible way through Flink
Flink, using it like a queue. The only difference is that its latency is in minutes. Its advantages are very low cost
and the ability to push down filters and projection.

## Automatic small file merging
## Pre small files merging

Pre means that this compact occurs before committing files to the snapshot.

If Flink's checkpoint interval is short (for example, 30 seconds), each snapshot may produce lots of small changelog
files. Too many files may put a burden on the distributed storage cluster.

In order to compact small changelog files into large ones, you can set the table option `precommit-compact = true`.
Default value of this option is false, if true, it will add a compact coordinator and worker operator after the writer
operator, which copies changelog files into large ones.

## Post small files merging

Post means that this compact occurs after committing files to the snapshot.

In streaming writing job, without bucket definition, there is no compaction in writer, instead, will use
`Compact Coordinator` to scan the small files and pass compaction task to `Compact Worker`. In streaming mode, if you
Expand Down
42 changes: 21 additions & 21 deletions docs/content/concepts/system-tables.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,45 +279,45 @@ You can query all manifest files contained in the latest snapshot or the specifi
SELECT * FROM my_table$manifests;

/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
| manifest-f4dcab43-ef6b-4713... | 1648 | 1 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id | min_partition_stats | max_partition_stats |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 | {20230315, 00} | {20230315, 20} |
| manifest-f4dcab43-ef6b-4713... | 1648 | 1 | 0 | 0 | {20230115, 00} | {20230316, 23} |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
2 rows in set
*/

-- You can also query the manifest with specified snapshot
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.snapshot-id'='1') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id | min_partition_stats | max_partition_stats |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 | {20230315, 00} | {20230315, 20} |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
1 rows in set
*/

- You can also query the manifest with specified tagName
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.tag-name'='tag1') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id | min_partition_stats | max_partition_stats |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 | {20230315, 00} | {20230315, 20} |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
1 rows in set
*/

- You can also query the manifest with specified timestamp in unix milliseconds
SELECT * FROM my_table$manifests /*+ OPTIONS('scan.timestamp-millis'='1678883047356') */;
/*
+--------------------------------+-------------+------------------+-------------------+---------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id |
+--------------------------------+-------------+------------------+-------------------+---------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 |
+--------------------------------+-------------+------------------+-------------------+---------------+
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| file_name | file_size | num_added_files | num_deleted_files | schema_id | min_partition_stats | max_partition_stats |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
| manifest-f4dcab43-ef6b-4713... | 12365| 40 | 0 | 0 | {20230315, 00} | {20230315, 20} |
+--------------------------------+-------------+------------------+-------------------+---------------+---------------------+---------------------+
1 rows in set
*/
```
Expand Down
16 changes: 14 additions & 2 deletions docs/content/engines/doris.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ under the License.

This documentation is a guide for using Paimon in Doris.

> More details can be found in [Apache Doris Website](https://doris.apache.org/docs/lakehouse/datalake-analytics/paimon/)
> More details can be found in [Apache Doris Website](https://doris.apache.org/docs/dev/lakehouse/catalogs/paimon-catalog)
## Version

Expand Down Expand Up @@ -65,9 +65,21 @@ CREATE CATALOG `paimon_hms` PROPERTIES (
"hive.metastore.uris" = "thrift://172.21.0.44:7004",
"hadoop.username" = "hadoop"
);

-- Integrate with Aliyun DLF
CREATE CATALOG paimon_dlf PROPERTIES (
'type' = 'paimon',
'paimon.catalog.type' = 'dlf',
'warehouse' = 'oss://paimon-bucket/paimonoss/',
'dlf.proxy.mode' = 'DLF_ONLY',
'dlf.uid' = 'xxxxx',
'dlf.region' = 'cn-beijing',
'dlf.access_key' = 'ak',
'dlf.secret_key' = 'sk'
);
```

See [Apache Doris Website](https://doris.apache.org/docs/lakehouse/datalake-analytics/paimon/) for more examples.
See [Apache Doris Website](https://doris.apache.org/docs/dev/lakehouse/catalogs/paimon-catalog) for more examples.

## Access Paimon Catalog

Expand Down
2 changes: 1 addition & 1 deletion docs/content/engines/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ under the License.
| Flink | 1.15 - 1.20 |||| ✅(1.17+) |||| ✅(1.17+) |||
| Spark | 3.2 - 3.5 ||||| ✅(3.3+) | ✅(3.3+) |||| ✅(3.3+) |
| Hive | 2.1 - 3.1 |||||||||||
| Trino | 420 - 439 || ✅(427+) | ✅(427+) | ✅(427+) |||||||
| Trino | 420 - 440 || ✅(427+) | ✅(427+) | ✅(427+) |||||||
| Presto | 0.236 - 0.280 |||||||||||
| [StarRocks](https://docs.starrocks.io/docs/data_source/catalog/paimon_catalog/) | 3.1+ |||||||||||
| [Doris](https://doris.apache.org/docs/lakehouse/datalake-analytics/paimon) | 2.0.6+ |||||||||||
Expand Down
25 changes: 25 additions & 0 deletions docs/content/flink/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,31 @@ All available procedures are listed below.
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local')
</td>
</tr>
<tr>
<td>remove_unexisting_files</td>
<td>
-- Use named argument<br/>
CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier', dry_run => 'dryRun', parallelism => 'parallelism') <br/><br/>
-- Use indexed argument<br/>
CALL [catalog.]sys.remove_unexisting_files('identifier')<br/><br/>
CALL [catalog.]sys.remove_unexisting_files('identifier', 'dryRun', 'parallelism')
</td>
<td>
Procedure to remove unexisting data files from manifest entries. See <a href="https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.html">Java docs</a> for detailed use cases. Arguments:
<li>identifier: the target table identifier. Cannot be empty, you can use database_name.* to clean whole database.</li>
<li>dryRun (optional): only check what files will be removed, but not really remove them. Default is false.</li>
<li>parallelism (optional): number of parallelisms to check files in the manifests.</li>
<br>
Note that user is on his own risk using this procedure, which may cause data loss when used outside from the use cases listed in Java docs.
</td>
<td>
-- remove unexisting data files in the table `mydb.myt`
CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
<br>
-- only check what files will be removed, but not really remove them (dry run)
CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` = true)
</td>
</tr>
<tr>
<td>reset_consumer</td>
<td>
Expand Down
30 changes: 27 additions & 3 deletions docs/content/flink/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,13 @@ CREATE TABLE my_partitioned_table (
'partition.time-interval'='1 d',
'partition.idle-time-to-done'='15 m',
'partition.mark-done-action'='done-partition'
-- You can also customize a PartitionMarkDoneAction to mark the partition completed.
-- 'partition.mark-done-action'='done-partition,custom',
-- 'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction'
);
```

You can also customize a PartitionMarkDoneAction to mark the partition completed.
- partition.mark-done-action: custom
- partition.mark-done-action.custom.class: The partition mark done class for implement PartitionMarkDoneAction interface (e.g. org.apache.paimon.CustomPartitionMarkDoneAction).

Define a class CustomPartitionMarkDoneAction to implement the PartitionMarkDoneAction interface.
```java
package org.apache.paimon;
Expand All @@ -282,6 +284,28 @@ public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {
}
```

Paimon also support http-report partition mark done action, this action will report the partition to the remote http server.
- partition.mark-done-action: http-report
- partition.mark-done-action.http.url : Action will report the partition to the remote http server.
- partition.mark-done-action.http.timeout : Http client connection timeout and default is 5s.
- partition.mark-done-action.http.params : Http client request params in the request body json.

Http Post request body :
```json
{
"table": "table fullName",
"path": "table location path",
"partition": "mark done partition",
"params" : "custom params"
}
```
Http Response body :
```json
{
"result": "success"
}
```

1. Firstly, you need to define the time parser of the partition and the time interval between partitions in order to
determine when the partition can be properly marked done.
2. Secondly, you need to define idle-time, which determines how long it takes for the partition to have no new data,
Expand Down
2 changes: 1 addition & 1 deletion docs/content/maintenance/rescale-bucket.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ The pipeline has been running well for the past few weeks. However, the data vol
and the job's latency keeps increasing. To improve the data freshness, users can
- Suspend the streaming job with a savepoint ( see
[Suspended State](https://nightlies.apache.org/flink/flink-docs-stable/docs/internals/job_scheduling/) and
[Stopping a Job Gracefully Creating a Final Savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/cli/) )
[Stopping a Job Gracefully Creating a Final Savepoint](https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/cli/#terminating-a-job) )
```bash
$ ./bin/flink stop \
--savepointPath /tmp/flink-savepoints \
Expand Down
2 changes: 1 addition & 1 deletion docs/content/primary-key-table/changelog-producer.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@ For `input`, `lookup`, `full-compaction` 'changelog-producer'.
If Flink's checkpoint interval is short (for example, 30 seconds) and the number of buckets is large, each snapshot may
produce lots of small changelog files. Too many files may put a burden on the distributed storage cluster.

In order to compact small changelog files into large ones, you can set the table option `changelog.precommit-compact = true`.
In order to compact small changelog files into large ones, you can set the table option `precommit-compact = true`.
Default value of this option is false, if true, it will add a compact coordinator and worker operator after the writer
operator, which copies changelog files into large ones.
2 changes: 2 additions & 0 deletions docs/content/primary-key-table/merge-engine/aggregation.md
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ If you allow some functions to ignore retraction messages, you can configure:

The `last_value` and `last_non_null_value` just set field to null when accept retract messages.

The `product` will return null for retraction message when accumulator is null.

The `collect` and `merge_map` make a best-effort attempt to handle retraction messages, but the results are not
guaranteed to be accurate. The following behaviors may occur when processing retraction messages:

Expand Down
2 changes: 1 addition & 1 deletion docs/content/spark/auxiliary.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ The SHOW PARTITIONS statement is used to list partitions of a table. An optional
SHOW PARTITIONS my_table;

-- Lists partitions matching the supplied partition spec for my_table
SHOW PARTITIONS my_table PARTITION (dt=20230817);
SHOW PARTITIONS my_table PARTITION (dt='20230817');
```

## Show Table Extended
Expand Down
18 changes: 18 additions & 0 deletions docs/content/spark/procedures.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,24 @@ This section introduce all available spark procedures about paimon.
CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local')
</td>
</tr>
<tr>
<td>remove_unexisting_files</td>
<td>
Procedure to remove unexisting data files from manifest entries. See <a href="https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.html">Java docs</a> for detailed use cases. Arguments:
<li>identifier: the target table identifier. Cannot be empty, you can use database_name.* to clean whole database.</li>
<li>dryRun (optional): only check what files will be removed, but not really remove them. Default is false.</li>
<li>parallelism (optional): number of parallelisms to check files in the manifests.</li>
<br>
Note that user is on his own risk using this procedure, which may cause data loss when used outside from the use cases listed in Java docs.
</td>
<td>
-- remove unexisting data files in the table `mydb.myt`
CALL sys.remove_unexisting_files(table => 'mydb.myt')
<br>
-- only check what files will be removed, but not really remove them (dry run)
CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run = true)
</td>
</tr>
<tr>
<td>repair</td>
<td>
Expand Down
Loading

0 comments on commit 0953f6c

Please sign in to comment.