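Author | 陈婧敏
This article briefly reviews the stages through which data ingestion into lakes (and warehouses) has evolved, examines the problems that arise when ingesting database data into a lake, and proposes Flink Table Store as a solution for unified full and incremental ingestion, illustrated with test results from an open-source demo. The article covers:
Stages of database data integration into the lake (warehouse) and their pain points
Unified full and incremental lake ingestion with Apache Flink Table Store
Summary and outlook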
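1. Stages of data ingestion into the lake (warehouse) and their pain points
Data integration from databases has, broadly speaking, gone through the following stages.
1.1 Full load plus periodic incremental loads into the warehouse
Fig.1 Warehouse ingestion: one-off full load + periodic incremental loads
The full data set is imported once via bulk load. A scheduled incremental sync job then copies increments from the database into a temporary table, which is merged with the full data. This approach meets some business needs, but it has the following problems.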
:::
⚠️ Complex pipeline, poor timeliness
:::
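Full and incremental data must be merged periodically to obtain the latest snapshot. Because record-level updates are not supported, users need an extra SQL job to deduplicate. Data freshness depends on the schedule, and if data can arrive late, data drift must also be handled. A common approach is to schedule the merge job for T (event time) at T + N (processing time), read the partitions from T to T + N (processing time), and keep only the records whose business time is less than or equal to T (event time) before merging. This lowers data freshness even further.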
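The late-data handling described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: the function name `merge_snapshot`, the row fields `id`, `event_day`, and `modified`, and the in-memory dictionaries are all hypothetical stand-ins for warehouse tables and partitions, not part of any real pipeline.

```python
from datetime import date, timedelta


def merge_snapshot(full, increments, t, n):
    """Build the snapshot for business day t from partitions t .. t+n.

    full:       {primary_key: row}, the snapshot before day t
    increments: {processing_day: [row, ...]}, one list per arrival-day partition
    Each row is a dict with 'id', 'event_day', 'modified' and payload fields
    (hypothetical field names).
    """
    snapshot = dict(full)
    # Read every processing-time partition from T through T + N.
    for offset in range(n + 1):
        for row in increments.get(t + timedelta(days=offset), []):
            # Keep only rows whose business (event) time is <= T.
            if row["event_day"] > t:
                continue
            current = snapshot.get(row["id"])
            # Deduplicate by primary key, keeping the most recent version.
            if current is None or row["modified"] >= current["modified"]:
                snapshot[row["id"]] = row
    return snapshot


t = date(2022, 1, 1)
full = {1: {"id": 1, "event_day": date(2021, 12, 31), "modified": 1, "amount": 10}}
increments = {
    t: [{"id": 1, "event_day": t, "modified": 2, "amount": 12}],
    t + timedelta(days=1): [
        # Belongs to business day T but arrived one day late.
        {"id": 2, "event_day": t, "modified": 3, "amount": 7},
        # Belongs to T + 1, so it is filtered out of the snapshot for T.
        {"id": 3, "event_day": t + timedelta(days=1), "modified": 4, "amount": 9},
    ],
}
snapshot = merge_snapshot(full, increments, t, n=1)
```

The snapshot for T cannot be produced before T + N, which is why this scheme makes freshness worse.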
:::
⚠️ Slow detail queries, hard troubleshooting
:::
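Although downstream consumers use various interactive analytics engines to speed up queries, the underlying detail tables usually do not get this treatment for cost reasons. As a result, checking data correctness requires querying the detail data directly, in particular comparing full and incremental details before and after a merge to locate a problem. If a business change means a batch of order data must be corrected and the derived metrics regenerated, the original table and all of its downstream dependent tables have to be corrected manually in cascade.
1.2 Full load plus real-time incremental loads into the lake
Compared with traditional data warehouses, data lakes store data at low cost while greatly improving data freshness. Take Apache Hudi as an example: it supports building a base table with a one-off full bootstrap and then building on it in real time from newly ingested CDC data (The Practice of Hudi-Based Lakehouse Technology at Shopee [1]), as shown in Fig.2.
Fig.2 Lake ingestion: one-off full load + real-time incremental loads
Because record-level updates and deletes are supported, primary-key deduplication is done on the storage side and no extra merge job is needed. As for freshness, the streaming job periodically triggers checkpoints that produce a snapshot merging full and incremental data, so freshness is much better than with the first approach, where merged snapshots are produced on a daily or hourly schedule.
On the other hand, we also found the following problems with this approach.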
:::
🤔 Bootstrap Index timeouts and state bloat
:::
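After the Flink incremental sync job is started in streaming mode, the system first loads the full table into Flink state to build an index from the Hoodie key (primary key + partition path) to the file group being written. This process blocks checkpoints from completing, and written data becomes readable only after a checkpoint succeeds. When the full data set is large, checkpoints may therefore keep timing out and downstream readers see no data. In addition, the index stays in state and is also updated by insert records during the incremental phase, so the state TTL has to be sized carefully: too small and data may be lost, too large and state may bloat.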
:::
🤔 The pipeline is still complex, incremental offsets are hard to align, and automated operations are costly
:::
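The full plus real-time incremental approach does not reduce pipeline complexity: it adds Kafka to operate, and the incremental consumption offset must be aligned by hand to avoid data loss (Change Data Capture with Debezium and Apache Hudi [2]). After starting the incremental CDC job, the user has to wait and watch the job, suspend it after the first successful checkpoint (stop-with-savepoint), change the configuration to disable the Bootstrap Index, and then restart the job from the savepoint (restore-from-savepoint). The whole procedure is complicated, and automating it is fairly expensive.
We also reviewed some industry practice with Hudi and found that users must pay close attention to many configuration options to meet different needs, which hurts usability. For example, The Practice of Hudi-Based Lakehouse Technology at Shopee [1] mentions that users' CREATE TABLE statements have to be monitored at the platform level to prevent COW (Copy on Write) mode from being configured for large-scale write scenarios. When switching from full to incremental sync, users must watch the Kafka consumption offset closely to keep the data accurate. Parameter settings strongly affect both the correctness and the performance of the job.
2. Unified full and incremental lake ingestion with Apache Flink Table Store
As log-based CDC gradually replaces query-based CDC, and especially now that Flink SQL CDC supports unified full and incremental sync on the source side, unified full and incremental lake ingestion (a single streaming job that completes the full sync and then keeps listening to the incremental changelog) has become a new direction to explore. It reduces pipeline complexity and hands the tedious manual offset alignment at the full-to-incremental switch over to Flink CDC and the checkpoint mechanism, so that the framework guarantees eventual consistency. Our investigation found, however, that attempting this with Hudi runs into the following challenge.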
:::
🤔 Severe out-of-order data during the full sync phase makes write performance and stability hard to guarantee
:::
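One problem in the full sync phase is that reading chunks with multiple parallel readers produces badly out-of-order data, so many partitions are written at the same time. The large volume of random writes degrades performance and causes throughput jitter, and since the writer for each partition maintains its own buffer, OOM errors occur easily and make the job unstable. Hudi can smooth writes to some extent by limiting the data written per minute with Rate Limit (Apache Hudi DeltaStreamer#Rate Limit [3]), but tuning for a balance between job stability and throughput is a high bar for ordinary users.
2.1 Why Flink Table Store
Apache Flink Table Store (https://github.com/apache/flink-table-store [4]), open-sourced in early 2022 as a subproject of Apache Flink, aims to be an updatable data lake storage for real-time streaming changelog ingestion and high-performance queries.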
:::
🚀 High-throughput ingestion of updates, with unified full and incremental lake ingestion handled by a single Flink job
:::
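Fig.3 Flink Table Store: unified full + incremental sync
As discussed above, the main challenge of unified full and incremental ingestion is that the full sync phase produces a large amount of out-of-order data, and the resulting random writes cause performance regression, throughput jitter, and instability. Table Store's storage format partitions first (Partition), then buckets (Bucket), and maintains one LSM (Log-structured Merge Tree) per bucket (see Fig.4 and Fig.5). Hashing a record's primary key into a bucket determines its write path (Directory), and the record is written as a key-value pair into a MemTable (similar to a HashMap, where the key is the primary key and the value is the record). On flush, records are sorted and merged (deduplicated) by primary key and appended to disk. Because the sort-merge happens in the buffer, there is no need for tagging, that is, looking up an index to decide whether a record is an insert or an update in order to find the file group to write to (Apache Hudi Technical Specification#Writer Expectations [5]). In addition, a MemTable flush is triggered when the buffer is full, so no extra Auto-File Sizing (Apache Hudi Write Operations#Writing path [6]) is needed to avoid small files (Auto-File Sizing affects write speed, see Apache Hudi File Sizing#Auto-Size During ingestion [7]). The whole write path is local and sequential (On Disk IO, Part 3: LSM Trees [8]), which avoids random IO.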
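The write path just described (partition, then bucket by primary-key hash, then an in-memory buffer flushed as a sorted run) can be sketched as a toy model. This is not Table Store's implementation: the class names, the record-count `capacity`, and the use of `zlib.crc32` as the hash are assumptions chosen only to keep the example deterministic and short.

```python
import zlib


class Bucket:
    """One bucket: a tiny LSM with an in-memory buffer and sorted runs."""

    def __init__(self, capacity):
        self.capacity = capacity   # hypothetical buffer size, in records
        self.memtable = {}         # primary key -> latest record
        self.sorted_runs = []      # each flush appends one sorted run

    def write(self, key, record):
        # An upsert simply overwrites the key in the buffer; no index lookup
        # is needed to decide whether this is an insert or an update.
        self.memtable[key] = record
        if len(self.memtable) >= self.capacity:
            self.flush()

    def flush(self):
        if self.memtable:
            # Sort by primary key and append the run; nothing is rewritten.
            self.sorted_runs.append(sorted(self.memtable.items()))
            self.memtable = {}


class Table:
    def __init__(self, num_buckets, capacity):
        self.num_buckets = num_buckets
        self.capacity = capacity
        self.buckets = {}          # (partition, bucket id) -> Bucket

    def write(self, partition, key, record):
        # Partition first, then hash the primary key into a bucket.
        # crc32 is an assumption made for this sketch.
        bucket_id = zlib.crc32(str(key).encode()) % self.num_buckets
        path = (partition, bucket_id)
        if path not in self.buckets:
            self.buckets[path] = Bucket(self.capacity)
        self.buckets[path].write(key, record)
        return path


table = Table(num_buckets=2, capacity=3)
for key, value in [(5, "a"), (1, "b"), (5, "c"), (3, "d"), (2, "e")]:
    table.write("dt=2022-01-01", key, value)
for bucket in table.buckets.values():
    bucket.flush()
runs = [run for b in table.buckets.values() for run in b.sorted_runs]
```

Key 5 is written twice but reaches disk only once, with its latest value, because the second write overwrote the first inside the buffer before any flush.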
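With Table Store as the lake storage, a single INSERT INTO statement completes the full and incremental sync. The following SQL shows one Flink streaming job writing the orders table from a MySQL database into a Table Store table in streaming mode and continuing to consume incremental data.
-- Create and use the table store catalog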
CREATE CATALOG `table_store` WITH (
'type' = 'table-store',
'warehouse' = 'hdfs://foo/bar'
);
USE CATALOG `table_store`;
-- Define the mysql-cdc source table
CREATE TEMPORARY TABLE `orders_cdc` (
order_id BIGINT NOT NULL,
gmt_modified TIMESTAMP(3) NOT NULL,
...
PRIMARY KEY (`order_id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
...
);
-- Define the table store ODS table, partitioned by date
CREATE TABLE IF NOT EXISTS `orders` (
...
PRIMARY KEY (`dt`, `order_id`) NOT ENFORCED
) PARTITIONED BY (`dt`);
-- Submit the job in streaming mode
SET 'execution.runtime-mode' = 'streaming';
-- Set a 1 min checkpoint interval, which corresponds to 1 min data freshness
SET 'execution.checkpointing.interval' = '1min';
-- One SQL statement syncs full + incremental data with dynamic partition writes
INSERT INTO `orders`
SELECT ..., DATE_FORMAT(gmt_modified, 'yyyy-MM-dd') AS dt
FROM `orders_cdc`;
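Fig.4 shows the storage layout of the orders table partitioned by dt. Once the user specifies the total number of buckets N, record files for the corresponding bucket-{n} are generated under each partition.
Fig.4 File layout of a Flink Table Store table
Metadata is stored at the same directory level of the table as the data, and consists of the manifest directory and the snapshot directory.
The manifest directory records the data file changes committed at each checkpoint, including added and deleted data files
The snapshot directory records the snapshot file produced by each commit. Its content comprises the manifests produced by the previous commit, plus the manifest produced by this commit as the delta
Fig.5 shows how the LSM inside each bucket in Fig.4 works. Taking a Flink streaming job as an example, Flink Table Store produces one commit at each checkpoint, which involves the following
A snapshot of the current table is generated. The system tracks the earliest and the latest snapshot files through snapshot pointer files (similar to pointers)
The snapshot file records which manifest files this commit added and which it deleted
Each manifest file records which sst files were added and which were deleted, together with the primary key range of the records in each sst file and statistics such as min/max/null count for each field
Each sst file contains records sorted by primary key in a columnar format. For Level 0 files, Table Store asynchronously triggers a compaction thread to eliminate the read-side merge cost caused by overlapping primary key ranges.
Fig.5 LSM implementation of a Flink Table Store table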
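To make the relationship between commits, snapshots, and manifests concrete, here is a minimal in-memory sketch. It only models the bookkeeping described above. The class `MetaStore`, the file names, and the dictionary layout are hypothetical and do not reflect Table Store's actual file formats.

```python
class MetaStore:
    """In-memory stand-in for the manifest and snapshot directories."""

    def __init__(self):
        self.manifests = {}    # manifest name -> {"added": [...], "deleted": [...]}
        self.snapshots = {}    # snapshot id -> list of manifest names
        self.earliest = None   # pointer to the oldest snapshot
        self.latest = None     # pointer to the newest snapshot

    def commit(self, added, deleted=()):
        """Record one commit, as would happen at a checkpoint."""
        snapshot_id = 1 if self.latest is None else self.latest + 1
        name = f"manifest-{snapshot_id}"
        self.manifests[name] = {"added": list(added), "deleted": list(deleted)}
        # New snapshot = manifests of the previous snapshot + this commit's delta.
        base = self.snapshots.get(self.latest, [])
        self.snapshots[snapshot_id] = base + [name]
        if self.earliest is None:
            self.earliest = snapshot_id
        self.latest = snapshot_id
        return snapshot_id

    def live_files(self, snapshot_id=None):
        """Replay manifests in commit order to find the visible data files."""
        snapshot_id = self.latest if snapshot_id is None else snapshot_id
        files = set()
        for name in self.snapshots[snapshot_id]:
            files |= set(self.manifests[name]["added"])
            files -= set(self.manifests[name]["deleted"])
        return files


store = MetaStore()
store.commit(added=["sst-0", "sst-1"])
# A later commit, for example after compaction, replaces two files with one.
store.commit(added=["sst-2"], deleted=["sst-0", "sst-1"])
```

Because each snapshot lists its manifests explicitly, a reader pinned to snapshot 1 still sees `sst-0` and `sst-1`, while a reader following the latest pointer sees only `sst-2`.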
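When Flink Table Store 0.2.0 was released, we tested real-time update writes (inserts and updates) of 500 million records over 100 million primary keys and compared the results with Apache Hudi MOR and COW (Apache Flink Table Store 0.2.0 Release Announcement [9]). Table Store showed good write performance in this large-scale real-time update scenario.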
:::
🚀 Efficient data skipping with filter support, for high-performance point and range queries
:::
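Although there is no additional index, Table Store benefits from its metadata management and columnar format
The manifest stores the primary key min/max and per-field statistics of each file, so some predicates can be evaluated without accessing the file
In the orc/parquet formats, the file footer records a sparse index with per-chunk statistics and offsets, so some predicates can be evaluated from the footer alone
When data is read with a filter, this information is used as follows
Read the manifest: apply the partition and field predicates using each file's min/max and partition, and discard the files that are not needed
Read the file footer: use each chunk's min/max to filter out the chunks that do not need to be read
Read the remaining files and the chunks in them
Take the orders table above as an example. Suppose a user wants all orders in partition dt = 2022-01-01 with order_id between 100 and 200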
SELECT * FROM orders WHERE dt = '2022-01-01' AND order_id >= 100 AND order_id <= 200;
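Flink Table Store first follows the LATEST-SNAPSHOT file to the snapshot file of the most recent commit (read committed), then reads the corresponding manifest meta files from that snapshot. It applies the partition condition dt='2022-01-01' to select the statistics of the files in that partition. Because the statistics include the key range of every sst file, it then applies the order_id in [100, 200] condition, so that only the matching sst files under the 2022-01-01 directory are read.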
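The manifest-level pruning step for this query can be sketched as a range-overlap test. The `FileMeta` record, the file names, and the key ranges below are made-up illustrative values, and the sketch covers only file-level pruning, not chunk-level filtering from the file footer.

```python
from dataclasses import dataclass


@dataclass
class FileMeta:
    """Per-file statistics as a manifest might hold them (hypothetical shape)."""
    name: str
    partition: str
    min_key: int
    max_key: int


def prune(files, partition, low, high):
    """Return names of files that may contain keys in [low, high]."""
    return [
        f.name
        for f in files
        if f.partition == partition
        # Two closed ranges overlap unless one ends before the other starts.
        and f.min_key <= high
        and f.max_key >= low
    ]


manifest = [
    FileMeta("sst-0", "2022-01-01", 1, 99),
    FileMeta("sst-1", "2022-01-01", 90, 150),
    FileMeta("sst-2", "2022-01-01", 201, 300),
    FileMeta("sst-3", "2022-01-02", 100, 200),
]
selected = prune(manifest, "2022-01-01", 100, 200)
```

Only `sst-1` survives: `sst-0` and `sst-2` fall outside the key range and `sst-3` is in another partition, so three of the four files are skipped without being opened.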
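We also tested Flink Table Store's query performance on the same data set (Apache Flink Table Store 0.2.0 Release Announcement [9]), and it performed very well in point and range query scenarios. By design, it is hard to avoid MOR having lower query performance than COW and COW having lower write performance than MOR. In practice, a MOR table built for a large-scale write scenario also cannot easily be converted to COW just for reading. So, on the premise of querying a write-heavy table (a MOR table), Flink Table Store's query performance holds up well.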
:::
🚀 The file format supports streaming reads
:::
Flink Table Store implements Incremental Scan. In streaming mode it continuously monitors file updates and keeps data freshness at the minute level, as shown below.
-- Enter the SQL CLI, then create the catalog and table
CREATE CATALOG table_store WITH (
'type' = 'table-store',
'warehouse' = 'file://foo/bar/' -- or 'hdfs://foo/bar'
);
CREATE TABLE IF NOT EXISTS my_table (
f0 INT,
f1 STRING,
PRIMARY KEY(f0) NOT ENFORCED
);
-- Switch to batch mode and write data
SET 'execution.runtime-mode' = 'batch';
INSERT INTO my_table VALUES(1, 'Hello');
-- Open a new SQL CLI, switch to streaming mode, and submit a streaming query
SET 'execution.runtime-mode' = 'streaming';
SET 'sql-client.execution.result-mode' = 'tableau';
SELECT * FROM my_table;
-- The following result can be read
+----+-------------+--------------------------------+
| op | f0 | f1 |
+----+-------------+--------------------------------+
| +I | 1 | Hello |
-- In the first SQL CLI, continue writing data
INSERT INTO my_table VALUES(1, 'Bye'), (2, '你好');
-- In the second SQL CLI, the new output (-U, 1, Hello), (+U, 1, Bye) and (+I, 2, 你好) can be observed
+----+-------------+--------------------------------+
| op | f0 | f1 |
+----+-------------+--------------------------------+
| +I | 1 | Hello |
| -U | 1 | Hello |
| +U | 1 | Bye |
| +I | 2 | 你好 |
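The changelog rows above follow from upsert semantics on the primary key f0. The sketch below reproduces the same sequence from a plain dictionary. It illustrates the +I / -U / +U convention only and makes no claim about how Table Store's Incremental Scan is implemented; the function name `changelog` is hypothetical.

```python
def changelog(table, rows):
    """Apply upserts to `table` (a dict keyed by primary key) and return
    the changelog entries a streaming reader would observe."""
    out = []
    for key, value in rows:
        if key in table:
            out.append(("-U", key, table[key]))  # retract the old version
            out.append(("+U", key, value))       # emit the new version
        else:
            out.append(("+I", key, value))       # first time this key is seen
        table[key] = value
    return out


state = {}
first = changelog(state, [(1, "Hello")])
second = changelog(state, [(1, "Bye"), (2, "你好")])
```

Here `first` corresponds to the initial `+I` row, and `second` to the three rows that appear after the second INSERT.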
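2.2 Demo: unified full and incremental lake ingestion on the TPC-H data set
The previous sections briefly analyzed how Flink Table Store addresses unified full and incremental lake ingestion. The following example shows how, in a local single-machine environment, nearly 60 million order records are synced as full data from MySQL to Flink Table Store, incremental updates (generated by TPC-H RF1 and RF2) are continuously consumed, and real-time aggregation and queries run downstream.
The data source is generated automatically by TPC-H, imported into MySQL, and runs in a Docker container. Locally, only the Flink release package and the Flink Table Store dependency need to be downloaded.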
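The demo uses the ship date l_shipdate of the lineitem table as the business field to define two-level partitions l_year and l_month. The time span runs from 1992.1 to 1998.12, so 84 partitions are written dynamically. In our test, on a local single machine with parallelism 2 and a checkpoint interval of 1 min (updates become visible every minute), 59.9 million rows of full data were written within 46 min. The write performance per 10 min is shown in the table below, and the average write throughput was about 1.3 million/min.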
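The figures in this paragraph can be cross-checked with a little arithmetic. The snippet below uses only the numbers quoted above. The variable names are ours.

```python
# One partition per (l_year, l_month) pair from 1992-01 through 1998-12.
years = range(1992, 1999)
partitions = [(y, m) for y in years for m in range(1, 13)]

total_rows_million = 59.9        # full data volume quoted above
avg_rate_million_per_min = 1.3   # average write throughput quoted above

# Time implied by the average throughput, in minutes.
minutes = total_rows_million / avg_rate_million_per_min
print(len(partitions), round(minutes, 1))
```

Seven years of twelve months give the 84 partitions, and the implied load time of roughly 46 minutes is consistent with the reported duration of the full sync.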
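The detailed configuration is as follows
For detailed steps, see Apache Flink Table Store 全增量一体 CDC 实时入湖 (Unified Full and Incremental CDC Real-Time Lake Ingestion with Apache Flink Table Store).
3. Summary and outlook
This article briefly reviewed the stages through which data ingestion into lakes (and warehouses) has evolved. To address the problems that arise when ingesting database data into a lake, we proposed Flink Table Store as a solution for unified full and incremental ingestion and illustrated it with test results from an open-source demo.
We look forward to first-hand feedback from users and in-depth discussion with peers, and welcome everyone to join the DingTalk group to explore together.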
4. Reference
[1] 基于 Hudi 的湖仓一体技术在 Shopee 的实践 (The Practice of Hudi-Based Lakehouse Technology at Shopee)
[2] Change Data Capture with Debezium and Apache Hudi
[3] Apache Hudi DeltaStreamer#Rate Limit
[4] https://github.com/apache/flink-table-store
[5] Apache Hudi Technical Specification#Writer Expectations
[6] Apache Hudi Write Operations#Writing path
[7] Apache Hudi File Sizing#Auto-Size During ingestion
[8] On Disk IO, Part 3: LSM Trees
[9] Apache Flink Table Store 0.2.0 发布 (Apache Flink Table Store 0.2.0 Release Announcement)