Flink Table Store
Flink Table Store is a unified storage for building dynamic tables for both streaming and batch processing in Flink, supporting high-speed data ingestion and timely data queries.
Tip
This article assumes that you are familiar with the basic concepts and operation of Flink Table Store. For anything about Flink Table Store not covered in this article, refer to its Official Documentation.
By using Kyuubi, we can run SQL queries against Flink Table Store, which is more convenient, easier to understand, and easier to extend than using Flink directly to manipulate Flink Table Store.
Flink Table Store Integration
To enable the integration of the Kyuubi Flink SQL engine and Flink Table Store, you need to:
Reference the Flink Table Store dependencies
Dependencies
The classpath of the Kyuubi Flink SQL engine with Flink Table Store support consists of:
kyuubi-flink-sql-engine-1.6.0-incubating_2.12.jar, the engine jar deployed with Kyuubi distributions
a copy of the Flink distribution
flink-table-store-dist-<version>.jar (example: flink-table-store-dist-0.2.jar), which can be found in Maven Central
To make the Flink Table Store packages visible on the runtime classpath of the engines, do the following:
Put the Flink Table Store packages into $FLINK_HOME/lib directly
Set the HADOOP_CLASSPATH environment variable, or copy the Pre-bundled Hadoop Jar to flink/lib
Warning
Mind the compatibility between Flink Table Store and Flink versions, which can be confirmed on the Flink Table Store multi-engine support page.
Flink Table Store Operations
Taking CREATE CATALOG as an example,
CREATE CATALOG my_catalog WITH (
  'type'='table-store',
  'warehouse'='hdfs://nn:8020/warehouse/path' -- or 'file:///tmp/foo/bar'
);
USE CATALOG my_catalog;
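To check that the catalog was registered and is now the active one, a few standard Flink SQL SHOW statements can be run in the same session. This is a minimal sketch that assumes the my_catalog name from the statement above; the output depends on what already exists in your warehouse.

```sql
-- List all catalogs known to the session; my_catalog should appear.
SHOW CATALOGS;

-- Confirm which catalog is currently in use.
SHOW CURRENT CATALOG;

-- List databases and tables in the current catalog (may be empty for a new warehouse).
SHOW DATABASES;
SHOW TABLES;
```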
Taking CREATE TABLE as an example,
CREATE TABLE MyTable (
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING,
  dt STRING,
  PRIMARY KEY (dt, user_id) NOT ENFORCED
) PARTITIONED BY (dt) WITH (
  'bucket' = '4'
);
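Once the table exists, rows can be written with a regular INSERT INTO and read back in batch mode. The following is a minimal sketch: the sample values are hypothetical and only chosen to match the column types of MyTable above.

```sql
-- Hypothetical sample rows matching (user_id, item_id, behavior, dt).
INSERT INTO MyTable VALUES
  (1, 1001, 'pv',   '2022-01-01'),
  (2, 1002, 'buy',  '2022-01-01'),
  (1, 1003, 'cart', '2022-01-02');

-- Read one partition back in batch mode.
SET 'execution.runtime-mode' = 'batch';
SELECT user_id, item_id, behavior FROM MyTable WHERE dt = '2022-01-01';
```

Because dt is both the partition column and part of the primary key, each sample row here has a distinct key within its partition.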
Taking Query Table as an example,
SET 'execution.runtime-mode' = 'batch';
SELECT * FROM orders WHERE catalog_id=1025;
Taking Streaming Query as an example,
SET 'execution.runtime-mode' = 'streaming';
SELECT * FROM MyTable /*+ OPTIONS ('log.scan'='latest') */;
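Taking Rescale Bucket as an example,
ALTER TABLE my_table SET ('bucket' = '4');
-- INSERT OVERWRITE needs a source query; this one assumes my_table has the same columns as MyTable above.
INSERT OVERWRITE my_table PARTITION (dt = '2022-01-01')
SELECT user_id, item_id, behavior FROM my_table WHERE dt = '2022-01-01';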