Iceberg从入门到精通系列之二十四:Spark Structured Streaming

慈云数据 8个月前 (03-13) 技术支持 60 0

Iceberg从入门到精通系列之二十四:Spark Structured Streaming

  • 一、Streaming Reads
  • 二、Streaming Writes
  • 三、Partitioned table
  • 四、流表的维护

    Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。 Spark DSv2 是一个不断发展的 API,在 Spark 版本中提供不同级别的支持。

    Iceberg从入门到精通系列之二十四:Spark Structured Streaming
    (图片来源网络,侵删)

    一、Streaming Reads

    Iceberg 支持处理从历史时间戳开始的 Spark 结构化流作业中的增量数据:

    val df = spark.readStream
        .format("iceberg")
        .option("stream-from-timestamp", Long.toString(streamStartTimestamp))
        .load("database.table_name")
    

    Iceberg 仅支持从追加快照中读取数据。覆盖快照无法处理,默认会引发异常。通过设置streaming-skip-overwrite-snapshots=true 可以忽略覆盖。同样,删除快照默认会引发异常,通过设置streaming-skip-delete-snapshots=true可以忽略删除。

    Iceberg从入门到精通系列之二十四:Spark Structured Streaming
    (图片来源网络,侵删)

    二、Streaming Writes

    要将流式查询中的值写入 Iceberg 表,请使用 DataStreamWriter

    data.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("checkpointLocation", checkpointPath)
        .toTable("database.table_name")
    

    如果您使用的是 Spark 3.0 或更早版本,则需要使用 .option(“path”, “database.table_name”).start(),而不是 .toTable(“database.table_name”)。

    对于基于目录的 Hadoop 目录:

    data.writeStream
        .format("iceberg")
        .outputMode("append")
        .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
        .option("path", "hdfs://nn:8020/path/to/table") 
        .option("checkpointLocation", checkpointPath)
        .start()
    

    Iceberg 支持追加和完整输出模式:

    • append:将每个微批次的行追加到表中
    • complete:替换每个微批次的表内容

      在开始流式查询之前,请确保您创建了表。请参阅 SQL 创建表文档以了解如何创建 Iceberg 表。

      Iceberg 不支持实验性连续处理,因为它不提供“提交”输出的接口。

      三、Partitioned table

      Iceberg 需要在写入数据之前按每个任务的分区对数据进行排序。在 Spark 中,任务按 Spark 分区进行分割。针对分区表。对于批量查询,建议您进行显式排序来满足要求(请参阅此处),但该方法会带来额外的延迟,因为重新分区和排序被视为流工作负载的繁重操作。为了避免额外的延迟,您可以启用扇出编写器来消除这一要求。

      data.writeStream
          .format("iceberg")
          .outputMode("append")
          .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
          .option("fanout-enabled", "true")
          .option("checkpointLocation", checkpointPath)
          .toTable("database.table_name")
      

      扇出写入器打开每个分区值的文件,并且在写入任务完成之前不会关闭这些文件。避免使用扇出写入器进行批量写入,因为对输出行进行显式排序对于批量工作负载来说成本较低。

      四、流表的维护

      流式写入可以快速创建新的表版本,创建大量表元数据来跟踪这些版本。强烈建议通过调整提交率、使旧快照过期以及自动清理元数据文件来维护元数据。

      调整提交率

      高提交率会产生数据文件、清单和快照,从而导致额外的维护。建议触发间隔至少为 1 分钟,并根据需要增加间隔。

      使旧快照过期

      写入表的每个批次都会生成一个新快照。 Iceberg 跟踪表元数据中的快照,直到它们过期。快照会随着频繁提交而快速积累,因此强烈建议定期维护流式查询写入的表。快照过期是删除元数据和任何不再需要的数据文件的过程。默认情况下,该过程将使超过五天的快照过期。

      压缩数据文件

      从流处理写入的数据量通常很小,这可能会导致表元数据跟踪大量小文件。将小文件压缩大文件可以减少表所需的元数据,并提高查询效率。 Iceberg 和 Spark 附带了 rewrite_data_files 过程。

      重写清单

      为了优化流工作负载的写入延迟,Iceberg 可以使用不会自动压缩清单的“快速”附加写入新快照。这可能会导致大量小的清单文件。 Iceberg可以重写清单文件的数量来提高查询性能。 Iceberg 和 Spark 附带了 rewrite_manifests 过程。

微信扫一扫加客服

微信扫一扫加客服

点击启动AI问答
Draggable Icon