Features
Support for Iceberg features depends upon the execution engine you choose.
Supported catalog backends
dagster-iceberg supports most catalog backends that are available through pyiceberg. See overview and configuration options here.
Configuration
dagster-iceberg supports setting configuration values using a .pyiceberg.yaml configuration file and environment variables. For more information, see the PyIceberg documentation.
You may also pass your catalog configuration through use of the IcebergCatalogConfig object, e.g:
from dagster_iceberg.config import IcebergCatalogConfig
from dagster_iceberg.io_manager.arrow import PyArrowIcebergIOManager
io_manager = PyArrowIcebergIOManager(
name="test",
config=IcebergCatalogConfig(
properties={
"uri": "postgresql+psycopg2://test:test@localhost:5432/test",
"warehouse": "file:///path/to/warehouse",
}
),
namespace="dagster",
)
Spark
Spark configuration can be set directly on the io_manager.spark.SparkIcebergIOManager or in the spark-defaults.conf file. Properties set directly on the I/O manager take precedence over those set in the spark-defaults.conf file. To set properties directly, pass a dictionary of configurations to set in the spark_config argument of the I/O manager:
from dagster_iceberg.io_manager.spark import SparkIcebergIOManager
SPARK_CONFIG = {
"spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.postgres": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.postgres.type": "jdbc",
"spark.sql.catalog.postgres.uri": "jdbc:postgresql://postgres:5432/test",
"spark.sql.catalog.postgres.jdbc.user": "test",
"spark.sql.catalog.postgres.jdbc.password": "test",
"spark.sql.catalog.postgres.warehouse": "/home/iceberg/warehouse",
"spark.sql.defaultCatalog": "postgres",
"spark.eventLog.enabled": "true",
"spark.eventLog.dir": "/home/iceberg/spark-events",
"spark.history.fs.logDirectory": "/home/iceberg/spark-events",
"spark.sql.catalogImplementation": "in-memory",
"spark.sql.execution.arrow.pyspark.enabled": "true",
}
io_manager = SparkIcebergIOManager(
catalog_name="test",
namespace="dagster",
spark_config=SPARK_CONFIG,
remote_url="sc://localhost",
)
Implemented engines
The following engines are currently implemented.
PyIceberg Features
The table below shows which PyIceberg features are currently available.
| Feature | Supported | Link | Comment |
|---|---|---|---|
| Add existing files | ❌ | https://py.iceberg.apache.org/api/#add-files | Useful for existing partitions that users don't want to re-materialize/re-compute. |
| Schema evolution | ✅ | https://py.iceberg.apache.org/api/#schema-evolution | More complicated than e.g. delta lake since updates require diffing input table with existing Iceberg table. This is implemented by checking the schema of incoming data, dropping any columns that no longer exist in the data schema, and then using the union_by_name() method to merge the current schema with the table schema. Current implementation has a chance of creating a race condition when e.g. partition A tries to write to a table that has not yet processed a schema update. Should be covered by retrying when writing. |
| Sort order | ❌ | https://shorturl.at/TycZN | Currently limited support in PyIceberg. Sort ordering is supported when creating a table from an Iceberg schema (one must pass the source_id which can be inferred from a PyArrow schema but this is shaky). However, we cannot simply update a sort ordering like a partition or schema spec. |
| PyIceberg commit retries | ✅ | https://github.com/apache/iceberg-python/pull/330 https://github.com/apache/iceberg-python/issues/269 | PR to add this to PyIceberg is open. Will probably be merged for an upcoming release. Added a custom retry function using Tenacity for the time being. |
| Partition evolution | ✅ | https://py.iceberg.apache.org/api/#partition-evolution | Create, Update, Delete partitions by updating the Dagster partitions definition. |
| Table properties | ✅ | https://py.iceberg.apache.org/api/#table-properties | Added as metadata on an asset. NB: config options are not checked explicitly because users can add any key-value pair to a table. Available properties here. |
| Snapshot properties | ✅ | https://py.iceberg.apache.org/api/#snapshot-properties | Useful for correlating Dagster runs to snapshots by adding tags to snapshot. Not configurable by end-user. |
| Upsert | ✅ | https://py.iceberg.apache.org/api/#upsert | Update existing rows and insert new rows in a single operation. Configure via write_mode: "upsert" and upsert_options in asset metadata. |