Iceberg 存储结构

2024/01/27 LakeHouse 共 2488 字,约 8 分钟

Iceberg 是数据湖三剑客之一,以下简要分析 Iceberg 的元数据、存储结构及读写流程

背景

与 hudi 类似,面向海量数据分析场景的高效存储格式。支持 Full schema evolution,time travel 和事务性管理。以下简要分析 Iceberg 的元数据、存储结构和读写流程

元数据

  • metadata: 提交级别的元数据,包含 snapshots, schema, statics
  • snapshot: 某个时刻的 snapshot,每次提交都会产生新的 snapshot,包含一组 manifest
    • Sequence Nubmer:每次提交都会产生,相当于 version
    • manifest: 每次提交的一组文件
      • data file: parquet
      • delete file
        • Position deletes: 文件名, row offset
        • Equality deletes: 匹配条件

读写流程

以下 SQL 为例,iceberg 两种表格式 COW 和 MOR 的读写流程

CREATE TABLE testhive.default.table (
  id INT, 
  dep STRING
) USING iceberg
TBLPROPERTIES (
  'write.format.default' = 'parquet',
  'write.delete.mode' = 'copy-on-write',
  'write.update.mode' = 'copy-on-write',
  'write.delete.mode' = 'merge-on-read',
  'write.update.mode' = 'merge-on-read',
  'format-version' = '2'
);

INSERT INTO TABLE testhive.default.table VALUES (1, 'software'), (2, 'hr'), (3, 'hehe');

UPDATE testhive.default.table SET id = -1 WHERE dep = 'hr';

DELETE FROM testhive.default.table WHERE id = 3;

Copy On Write

  1. 第一次写入生成 snapshot0,新增了两个 parquet 文件
  2. 第二次写入生成 snapshot1,全量读 snapshot1 根据 update filter 过滤后,重写 data0-1 为 data1-0
  3. 第三次写入生成 snapshot2,全量读 snapshot2 根据 delete filter 过滤后,重写 data1-0 为 data2-0
  4. 查询时读取 snapshot2 的 metafestList(manifest1-m0, manifest2-m0, manifest2-m1),读取 data0-0 和 data2-0 两个 parquet 文件

Merge On Read

  1. 第一次写入生成 snapshot0,新增了两个 parquet 文件
  2. 第二次写入生成 snapshot1,全量读 snapshot1 根据 update filter 过滤后,新增 pos-delete1-1 删除更新前的数据,新增 data1-0 保存更新后的数据
  3. 第三次写入生成 snapshot2,全量读 snapshot2 根据 delete filter 过滤后,新增 pos-delete2-0 删除数据
  4. 查询时读取 snapshot2 的 metafestList(manifest0-m0, manifest1-m0, manifest1-m1, manifest2-mo),读取 data0-0 和 data1-0 两个 parquet 文件,合并 data0-1, pos-delete1-1, pos-delete2-0 三个 parquet 文件。合并时如果数据较少,在内存中构建 bitmap 根据位置进行过滤;数据较大,则使用 sortMerge 方式

Delete file

写入流程

Spark 的 MOR 表会写入 pos-delete,Flink 会缓存一个 batch 的数据,根据缓存判断写 pos-delete 还是 eq-delete

  1. 判断 rowkind 为 insert/update-after
    1. 放入 insertRowMap Map<data, pos>
    2. 写 dataFile
  2. 判断 rowkind 为 delete/update-before
    1. insertRowMap 中存在,写 pos-delete
    2. insertRowMap 中不存在,写 eq-delete

读取流程

Position deletes 文件会进行以下合并

  1. 合并时如果数据较少,在内存中构建 bitmap 根据位置进行过滤
  2. 数据较大,则使用 sortMerge 方式

Equality deletes 文件会进行以下合并

  1. 找到所有 seq-num 小于 deleteFile 的 dataFile
  2. 进行 anti-join

分区

DDL 中定义了普通列和分区列的映射关系

CREATE TABLE testhive.default.table (
    id bigint,
    date string)
USING iceberg
PARTITIONED BY (date, bucket(2, id));

INSERT INTO TABLE testhive.default.table VALUES 
  (0, '2022-01-01'), 
  (1, '2022-01-01'), 
  (2, '2022-01-01'), 
  (3, '2022-01-01'), 
  (0, '2022-01-02'), 
  (1, '2022-01-02');

实际存储上产生了类似 parition by date + clustered by (id) into 2 buckets 的效果

├── date=2022-01-01
│   ├── id_bucket=0
│   │   └── 00000-0-1f12309b-9c48-4fcf-916a-09094cbe52c0-00001.parquet
│   └── id_bucket=1
│       └── 00001-1-1df1c41b-1f28-4512-889f-9ed6b9a7b4b0-00001.parquet
└── date=2022-01-02
    └── id_bucket=0
        └── 00001-1-1df1c41b-1f28-4512-889f-9ed6b9a7b4b0-00002.parquet

文档信息

Search

    Table of Contents