
Unified Full and Incremental Real-Time Lake Ingestion with Apache Flink Table Store

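Author | Chen Jingmin (陈婧敏)

This article briefly reviews the development stages of ingesting data into the data lake (warehouse). To address the problems faced when ingesting database data into the lake, it proposes Flink Table Store as a solution for unified full and incremental ingestion, and presents test results from an open-source demo. The article covers:

  1. Development stages and pain points of integrating database data into the lake (warehouse)

  2. Solving unified full and incremental lake ingestion with Apache Flink Table Store

  3. Summary and outlook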


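1. Development stages and pain points of data ingestion into the lake (warehouse)

Database-based data integration has, in brief, gone through the following stages.

1.1 Full load + periodic incremental ingestion into the warehouse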

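Fig.1 Warehouse ingestion: one-time full load + periodic increments

The full data set is imported once via bulk load. A scheduled incremental sync task then copies increments from the database into a temporary table, which is merged with the full data. This approach meets some business needs, but it also has the following problems.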

:::
⚠️ Complex pipeline, poor timeliness
:::

鍏ㄩ噺涓庡閲忛渶瑕佸畾鏈熺殑鍚堝苟浠ヨ幏鍙栨渶鏂扮殑鏁版嵁蹇収锛岀敱浜庝笉鏀寔璁板綍绾у埆鐨勬洿鏂帮紝鐢ㄦ埛闇€瑕侀澶栫殑 SQL 浠诲姟鍘昏绠楀幓閲嶏紱鏁版嵁鏂伴矞搴︿緷璧栦簬璋冨害锛岃嫢鏁版嵁瀛樺湪鏅氬埌锛屽垯杩橀渶瑕佸鐞嗘暟鎹紓绉绘儏鍐碉紝涓€绉嶅父瑙佸鐞嗘柟寮忔槸鍦?T +N 鏃跺埢锛坧rocessing time锛夎皟搴︿骇鐢?T 锛坋vent time锛夌殑鍚堝苟浠诲姟锛屽悓鏃跺彇 T ~ T + N 涓垎鍖猴紙processing time锛夛紝鍐嶄粠涓繃婊ゅ嚭涓氬姟鏃堕棿灏忎簬绛変簬 T 锛坋vent time锛夌殑鏁版嵁杩涜鍚堝苟锛岃繖浼氬鑷存暟鎹柊椴滃害杩涗竴姝ラ檷浣庛€?/p>

:::
⚠️ Slow detail queries, hard troubleshooting
:::

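Although downstream consumers use various interactive analytics engines to speed up queries, the detail data of base tables usually does not get this treatment for cost reasons. As a result, checking data correctness requires querying the detail data directly, and in particular comparing the full and incremental details before and after a merge to locate the problem. If a business change requires a batch of order data to be corrected and all metrics to be regenerated from the corrected data, the original table and its downstream dependent tables must be corrected manually in a cascading manner.

1.2 Full load + real-time incremental ingestion into the lake

Compared with traditional data warehouses, data lakes keep storage costs low while greatly improving data freshness. Take Apache Hudi as an example: it supports a one-time full bootstrap to build the base table, followed by real-time building on newly ingested CDC data [1], as shown in Fig.2.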


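Fig.2 Lake ingestion: one-time full load + real-time increments

Because record-level updates and deletes are supported, primary-key deduplication is done on the storage side and no extra merge task is needed. As for data freshness, the streaming job periodically triggers checkpoints that produce snapshots with full and incremental data already merged, so freshness is much better than with the first approach (merged snapshots scheduled daily or hourly).

On the other hand, we found that this approach has the following problems.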

:::
🤔 Bootstrap Index timeouts and state bloat
:::

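After the Flink incremental sync job is started in streaming mode, the system first loads the full table into Flink state to build an index from Hoodie key (that is, primary key + partition path) to the file group being written. This process blocks checkpoints from completing, and written data only becomes readable after a checkpoint succeeds. When the full data set is large, checkpoints may therefore keep timing out, leaving downstream readers without data. In addition, the index stays in state and is also updated by insert records during the incremental phase, so the state TTL must be evaluated carefully: too small a value may lose data, while too large a value may bloat the state.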

:::
🤔 The pipeline is still complex, incremental offsets are hard to align, and automated operations are costly
:::

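Full load + real-time increments does not reduce pipeline complexity, because it additionally brings in Kafka operations and requires manually aligning the incremental consumption offset to avoid data loss [2]. After starting the incremental CDC job, users have to wait and watch the job status; once the first checkpoint succeeds, they stop the job (stop-with-savepoint), change the configuration to disable the Bootstrap Index, and then restart the job from the savepoint (restore-from-savepoint). The whole procedure is complicated, and automating it is fairly costly.

In addition, we reviewed some industry practices with Hudi and found that users must pay close attention to many configuration options to meet different requirements, which hurts usability to some extent. For example, [1] mentions that the platform has to monitor users' CREATE TABLE statements to prevent tables in large-scale write scenarios from being configured in COW (Copy on Write) mode, and that users must watch the Kafka consumption offset closely when switching from full to incremental to guarantee data accuracy. Parameter settings greatly affect both the data accuracy and the performance of a job.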

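2. Unified full and incremental lake ingestion with Apache Flink Table Store

As log-based CDC gradually replaces query-based CDC, and especially now that Flink SQL CDC supports unified full and incremental sync on the source side, unified full and incremental lake ingestion (a single streaming job that completes the full sync and then keeps listening to the incremental changelog) has become a new direction to explore. This approach reduces pipeline complexity and hands the tedious manual offset alignment at the full-to-incremental switch over to Flink CDC and the checkpoint mechanism, so that the framework guarantees eventual consistency of the data. Our investigation found, however, that attempting this with Hudi runs into the following challenge.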

:::
🤔 Severely out-of-order data during the full sync phase makes write performance and stability hard to guarantee
:::

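One problem in the full sync phase is that reading chunks with multiple parallel readers produces severely out-of-order data, so many partitions are written at the same time. The large number of random writes causes performance regression and throughput spikes, and because the writer for each partition maintains its own buffer, OOM occurs easily and makes the job unstable. Hudi does support Rate Limit [3], which caps the amount of data written per minute and smooths the load to some degree, but tuning for a balance between job stability and throughput is a high bar for ordinary users.

2.1 Why Flink Table Store

Apache Flink Table Store (https://github.com/apache/flink-table-store) [4], open-sourced in early 2022 as a subproject of Apache Flink, aims to build an updatable data lake storage for real-time streaming changelog ingestion and high-performance queries.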

:::
🚀 High-throughput ingestion of update data with unified full and incremental lake ingestion, all in a single Flink job
:::


Fig.3 Flink Table Store: unified full + incremental sync

As discussed above, the main challenge of unified full and incremental ingestion is that the full sync phase produces heavily out-of-order data, and the resulting random writes cause performance regression, throughput spikes and instability. Table Store's storage format partitions first (Partition) and then buckets (Bucket), and each bucket maintains its own LSM (Log-structured Merge Tree), see Fig.4 and Fig.5. The write path (Directory) of a record is fixed as soon as its primary key is hashed into a bucket, and the record is written as a key-value pair into the MemTable (similar to a HashMap, where the key is the primary key and the value is the record). When the MemTable is flushed to disk, records are sorted and merged (deduplicated) by primary key and written to disk in append mode. The sort merge happens inside the buffer, which avoids the tagging step [5] that looks up an index to decide whether a record is an insert or an update and to find the file group to write to. In addition, a MemTable flush is triggered when the buffer is full, so no extra Auto-File Sizing [6] is needed to avoid small files (Auto-File Sizing slows down writes [7]). The whole write path is local and sequential [8], which avoids random IO.
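To make the MemTable and flush behavior concrete, here is a minimal single-bucket sketch. It is a toy model under stated assumptions, not Table Store's implementation: the class name `BucketWriter`, the `buffer_limit` parameter (counted in distinct keys rather than bytes) and the in-memory list standing in for files on disk are all hypothetical.

```python
class BucketWriter:
    """Toy model of one bucket: a MemTable plus sorted runs appended on flush."""

    def __init__(self, buffer_limit):
        self.buffer_limit = buffer_limit  # max number of distinct keys held in memory
        self.memtable = {}                # primary key -> latest record
        self.sorted_runs = []             # stands in for append-only files on disk

    def write(self, pk, record):
        # Writing the same key again overwrites it, so no index lookup is
        # needed to tell an insert from an update.
        self.memtable[pk] = record
        if len(self.memtable) >= self.buffer_limit:
            self.flush()

    def flush(self):
        if not self.memtable:
            return
        # Sort by primary key and append the run; existing runs are never rewritten.
        self.sorted_runs.append(sorted(self.memtable.items()))
        self.memtable = {}


writer = BucketWriter(buffer_limit=3)
for pk, record in [(5, "a"), (1, "b"), (5, "c"), (3, "d"), (2, "e")]:
    writer.write(pk, record)
writer.flush()
```

Records arrive out of order, yet each flushed run is sorted and deduplicated, and every flush is a pure append. Overlaps between runs (for example key 2 falling inside the range of the first run) are what compaction later resolves.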

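With Table Store as the lake storage, a single INSERT INTO statement is enough to complete the unified full and incremental sync. The following SQL shows one Flink streaming job that writes the orders table of a MySQL database into a Table Store table in streaming mode and keeps consuming incremental data.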

    -- Create and use the table store catalog
    CREATE CATALOG `table_store` WITH (
        'type' = 'table-store',
        'warehouse' = 'hdfs://foo/bar'
    );
    
    USE CATALOG `table_store`;
    
    -- Define the mysql-cdc source table
    CREATE TEMPORARY TABLE `orders_cdc` (
      order_id BIGINT NOT NULL,
      gmt_modified TIMESTAMP(3) NOT NULL,
      ...
      PRIMARY KEY (`order_id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql-cdc',
       ...
    );
    
    -- Define the table store ods table, partitioned by date
    CREATE TABLE IF NOT EXISTS `orders` (
      ...
      PRIMARY KEY (`dt`, `order_id`) NOT ENFORCED
    ) PARTITIONED BY (`dt`);
    
    -- Submit the job in streaming mode
    SET 'execution.runtime-mode' = 'streaming';
    
    -- Set a 1min checkpoint interval, which corresponds to 1min data freshness
    SET 'execution.checkpointing.interval' = '1min';
    
    -- One SQL statement syncs full + incremental data with dynamic partition writes
    INSERT INTO `orders`
    SELECT ..., DATE_FORMAT(gmt_modified, 'yyyy-MM-dd') AS dt
    FROM `orders_cdc`;

Fig.4 shows the storage layout of the orders table partitioned by dt. Once the user specifies the total number of buckets N, each partition gets the corresponding bucket-{n} directories, and each directory stores, in a columnar format (orc or parquet), the record files for which hash_func(pk) % N == {n}.


Fig.4 File directory of a Flink Table Store table
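The routing rule hash_func(pk) % N can be illustrated with a small sketch. The hash function actually used by Table Store is not given in this article, so `zlib.crc32` is only a stand-in, and the `dt=<value>` partition directory name and the helper `bucket_dir` are assumptions for illustration.

```python
import zlib


def bucket_dir(table_path, dt, pk, num_buckets):
    """Return the directory a record is written to: partition first, then bucket."""
    # Stand-in hash function; only determinism matters for this example.
    bucket = zlib.crc32(str(pk).encode("utf-8")) % num_buckets
    return f"{table_path}/dt={dt}/bucket-{bucket}"


first = bucket_dir("orders", "2022-01-01", 42, num_buckets=4)
again = bucket_dir("orders", "2022-01-01", 42, num_buckets=4)
bucket_index = int(first.rsplit("-", 1)[1])
```

Because the directory depends only on the partition value and the primary key, every version of a record lands in the same bucket, and the writer never has to search other buckets for an older copy.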

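Metadata is stored at the same directory level as the data of the table and consists of the manifest directory and the snapshot directory.

  • The manifest directory records the data file changes committed at each checkpoint, including added and deleted data files

  • The snapshot directory holds the snapshot file produced by each commit, which contains the manifests produced by the previous commit plus, as the delta, the manifests produced by this commit

Fig.5 shows how the LSM inside each bucket of Fig.4 is implemented. Taking a Flink streaming job as an example, Flink Table Store produces one commit at each checkpoint, which contains the following information

  • A snapshot of the current table is generated. The system tracks the earliest and the latest snapshot files through snapshot pointer files (similar to pointers)

  • The snapshot file records which manifest files this commit added and which it deleted

  • Each manifest file records which sst files were created and which were deleted, together with the primary-key range of the records in each sst file and statistics such as min/max/null count for each field

  • Each sst file contains records sorted by primary key in a columnar format. For Level 0 files, Table Store asynchronously triggers compaction threads to eliminate the read-side merge cost caused by overlapping primary-key ranges.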


Fig.5 LSM implementation of a Flink Table Store table
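The relationship between snapshots, manifests and sst files can be modeled with a few lines of code. This is a simplified sketch of the bookkeeping described above, not the on-disk format: the classes `Manifest` and `Snapshot`, the helpers `commit` and `live_files`, and the file names are hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class Manifest:
    added: set = field(default_factory=set)    # sst files created by this commit
    deleted: set = field(default_factory=set)  # sst files removed, e.g. by compaction


@dataclass
class Snapshot:
    snapshot_id: int
    manifests: list  # manifests of the previous commit plus this commit's delta


def commit(previous, delta, snapshot_id):
    """Create a new snapshot on top of the previous one (None for the first commit)."""
    base = previous.manifests if previous is not None else []
    return Snapshot(snapshot_id, base + [delta])


def live_files(snapshot):
    """Replay the manifests in commit order to get the sst files visible to readers."""
    files = set()
    for manifest in snapshot.manifests:
        files -= manifest.deleted
        files |= manifest.added
    return files


s1 = commit(None, Manifest(added={"sst-1", "sst-2"}), snapshot_id=1)
# A later commit in which compaction merged the two files into one.
s2 = commit(s1, Manifest(added={"sst-3"}, deleted={"sst-1", "sst-2"}), snapshot_id=2)
```

A reader that resolves snapshot 1 still sees `sst-1` and `sst-2`, while a reader of snapshot 2 sees only `sst-3`, which is how each commit yields a consistent view.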

When Flink Table Store 0.2.0 was released, we tested real-time update writes (inserts and updates) of 500 million records over 100 million primary keys and compared the results with Apache Hudi MOR and COW [9]. Table Store delivers good write performance in large-scale real-time update scenarios.

:::
🚀 Efficient data skipping with filter support, providing high-performance point and range queries
:::

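Although there is no extra index, thanks to the metadata management and the columnar format, the following information is available

  • The manifest stores the min/max of each file's primary key and statistics for each field, so some predicates can be evaluated without accessing the file

  • In the orc/parquet formats, the file footer records a sparse index with the statistics and offset of each chunk, so some predicates can be evaluated using only the footer

When data is read with a filter, this information is used to filter as follows

  1. Read the manifest: evaluate the partition and field predicates against each file's partition and min/max, and discard files that cannot match

  2. Read the file footer: use each chunk's min/max to skip chunks that do not need to be read

  3. Read the remaining files and the chunks left in them

Take the orders table above as an example. Suppose the user wants all orders in partition dt = 2022-01-01 whose order_id is between 100 and 200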

    SELECT * FROM orders WHERE dt = '2022-01-01' AND order_id >= 100 AND order_id <= 200;

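Flink Table Store first uses the LATEST-SNAPSHOT file to find the snapshot file of the most recent commit (read committed), then reads the corresponding manifest meta files from that snapshot. It applies the partition condition dt='2022-01-01' to select the statistics of the files in that partition, and because the statistics include the key range of each sst file, it goes on to apply the condition that order_id lies in [100, 200]. In the end only the matching sst files under the 2022-01-01 directory are read.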
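The two pruning levels in this walk-through can be sketched as a scan planner. This is an illustration of min/max-based data skipping in general, with hypothetical names (`SstMeta`, `plan_scan`) and made-up file paths and statistics; it does not reproduce Table Store's reader.

```python
from dataclasses import dataclass, field


@dataclass
class SstMeta:
    path: str
    partition: str
    min_key: int
    max_key: int
    # (min_key, max_key) of each chunk, as it would be recorded in the file footer.
    chunks: list = field(default_factory=list)


def overlaps(lo, hi, min_key, max_key):
    """True if the closed ranges [lo, hi] and [min_key, max_key] intersect."""
    return max_key >= lo and min_key <= hi


def plan_scan(manifest, partition, lo, hi):
    plan = []
    for sst in manifest:
        # Step 1: prune whole files using the partition and key range in the manifest.
        if sst.partition != partition or not overlaps(lo, hi, sst.min_key, sst.max_key):
            continue
        # Step 2: prune chunks inside a surviving file using footer statistics.
        chunk_ids = [i for i, (cmin, cmax) in enumerate(sst.chunks)
                     if overlaps(lo, hi, cmin, cmax)]
        plan.append((sst.path, chunk_ids))
    return plan


manifest = [
    SstMeta("dt=2022-01-01/bucket-0/sst-0", "2022-01-01", 1, 99, [(1, 50), (51, 99)]),
    SstMeta("dt=2022-01-01/bucket-0/sst-1", "2022-01-01", 90, 400,
            [(90, 150), (151, 250), (251, 400)]),
    SstMeta("dt=2022-01-02/bucket-0/sst-2", "2022-01-02", 100, 200, [(100, 200)]),
]
plan = plan_scan(manifest, "2022-01-01", 100, 200)
```

Only one of the three files survives, and within it the last chunk is skipped as well, so step 3 reads two chunks in total.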

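We also tested the query performance of Flink Table Store on the same data set [9]; it performs very well in point-query and range-query scenarios. By design, it is hard to avoid MOR having lower query performance than COW and COW having lower write performance than MOR. In practice, a MOR table built for a large-scale write scenario also cannot easily be switched to COW for reading. So, given that the tables being queried are write-heavy tables (MOR tables), the query performance of Flink Table Store is quite good.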

:::
🚀 The file format supports streaming reads
:::

Flink Table Store implements Incremental Scan. In streaming mode it continuously monitors file updates and keeps data freshness at the minute level, as shown below.

    -- Enter the SQL CLI, then create the catalog and the table
    CREATE CATALOG table_store WITH (
      'type' = 'table-store',
      'warehouse' = 'file://foo/bar/' -- or 'hdfs://foo/bar'
    );
    
    USE CATALOG table_store;
    
    CREATE TABLE IF NOT EXISTS my_table (
      f0 INT,
      f1 STRING,
      PRIMARY KEY(f0) NOT ENFORCED
    );
    
    -- Switch to batch mode and write data
    SET 'execution.runtime-mode' = 'batch';
    INSERT INTO my_table VALUES(1, 'Hello');
    
    -- Open a new SQL CLI, switch to streaming mode, and submit a streaming query
    SET 'execution.runtime-mode' = 'streaming';
    SET 'sql-client.execution.result-mode' = 'tableau';
    SELECT * FROM my_table; 
    -- 鍙互璇诲埌缁撴灉濡備笅
    +----+-------------+--------------------------------+
    | op |          f0 |                             f1 |
    +----+-------------+--------------------------------+
    | +I |           1 |                          Hello |
    
    -- 鍦ㄧ涓€涓?SQL CLI 涓紝缁х画鍐欏叆鏁版嵁
    INSERT INTO my_table VALUES(1, 'Bye'), (2, '浣犲ソ');
    
    -- 鍙互鍦ㄧ浜屼釜 SQL CLI 涓紝瑙傚療鍒版柊澧炶緭鍑?(-U, 1, Hello)锛?+U, 1, Bye)  鍜?(+I, 2, 浣犲ソ) 
    +----+-------------+--------------------------------+
    | op |          f0 |                             f1 |
    +----+-------------+--------------------------------+
    | +I |           1 |                          Hello |
    | -U |           1 |                          Hello |
    | +U |           1 |                            Bye |
    | +I |           2 |                           你好 |
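
The changelog rows in this output follow from upsert semantics on the primary key, which can be sketched as below. This only illustrates how +I, -U and +U rows relate to the written data; the function `apply_upserts` is hypothetical and says nothing about how Incremental Scan is implemented.

```python
def apply_upserts(state, rows):
    """Apply (pk, value) rows to state and return the changelog a streaming reader sees."""
    changelog = []
    for pk, value in rows:
        if pk in state:
            # An existing key is updated: retract the old value, then emit the new one.
            changelog.append(("-U", pk, state[pk]))
            changelog.append(("+U", pk, value))
        else:
            changelog.append(("+I", pk, value))
        state[pk] = value
    return changelog


state = {}
first_batch = apply_upserts(state, [(1, "Hello")])
second_batch = apply_upserts(state, [(1, "Bye"), (2, "你好")])
```

Concatenating the two batches gives the same four rows as the second SQL CLI output above.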

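2.2 Unified full and incremental lake ingestion demo on the TPC-H data set

The previous sections briefly analyzed how Flink Table Store addresses unified full and incremental lake ingestion. The following example demonstrates, in a local single-machine environment, how to sync nearly 60 million order records as full data from MySQL to Flink Table Store, keep consuming incremental updates (generated by TPC-H RF1 and RF2), and feed real-time aggregation and queries downstream.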


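The data source is generated automatically by TPC-H and imported into MySQL, which runs in a Docker container. Locally, only the Flink release package and the Flink Table Store dependency need to be downloaded.

The demo uses the ship date l_shipdate of the lineitem table as the business field to define the two-level partition l_year and l_month. The time span runs from 1992.1 to 1998.12, so 84 partitions are written dynamically. In our test on a single local machine with parallelism 2 and a checkpoint interval of 1 min (updates become visible every minute), 59.9 million records of full data were written within 46 min. The write performance per 10 min is shown in the table below; the average write rate is 1.3 million/min.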

[Table 1: write performance per 10 min]
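The partition count and the overall duration can be cross-checked with a few lines of arithmetic. The helper `partition_of` is a hypothetical illustration of deriving (l_year, l_month) from l_shipdate; the only inputs taken from the article are the 1992.1 to 1998.12 time span, the 59.9 million rows and the 1.3 million/min average rate.

```python
from datetime import date


def partition_of(l_shipdate):
    """Derive the two-level partition (l_year, l_month) from a ship date."""
    return (l_shipdate.year, l_shipdate.month)


# Every (year, month) combination from 1992.1 through 1998.12.
all_partitions = {(year, month) for year in range(1992, 1999) for month in range(1, 13)}
num_partitions = len(all_partitions)

# Duration implied by the reported total and average write rate.
total_rows_million = 59.9
avg_rate_million_per_min = 1.3
estimated_minutes = total_rows_million / avg_rate_million_per_min
```

Seven years of twelve months give the 84 partitions, and 59.9 million rows at 1.3 million/min works out to roughly 46 minutes.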

The detailed configuration is as follows

[Table 2: detailed configuration]

For the detailed steps, see "Apache Flink Table Store unified full and incremental CDC real-time lake ingestion" (Apache Flink Table Store 全增量一体 CDC 实时入湖).

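3. Summary and outlook

This article briefly reviewed the development stages of data ingestion into the lake (warehouse). To address the problems faced when ingesting database data into the lake, we proposed Flink Table Store as a solution for unified full and incremental ingestion and presented the test results of an open-source demo.

We look forward to first-hand feedback from users and in-depth exchanges with peers. You are welcome to scan the QR code and join the DingTalk group to explore together.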

[QR code of the DingTalk group]

4. References

[1] Practice of Hudi-based lakehouse technology at Shopee (基于 Hudi 的湖仓一体技术在 Shopee 的实践)

[2] Change Data Capture with Debezium and Apache Hudi

[3] Apache Hudi DeltaStreamer#Rate Limit

[4] https://github.com/apache/flink-table-store

[5] Apache Hudi Technical Specification#Writer Expectations

[6] Apache Hudi Write Operations#Writing path

[7] Apache Hudi File Sizing#Auto-Size During ingestion

[8] On Disk IO, Part 3: LSM Trees

[9] Apache Flink Table Store 0.2.0 release (Apache Flink Table Store 0.2.0 发布)

