20 min read

Apache Iceberg : Compaction, Time Travel et Optimisation des Performances à l'Échelle

Apache Iceberg : Compaction, Time Travel et Optimisation des Performances à l'Échelle

En tant que senior data engineer ayant travaillé sur plusieurs lakehouses en production, je peux affirmer qu'Apache Iceberg représente une révolution dans la gestion des données analytiques. Certaine de ses fonctionnalités les plus puissantes, sont souvent sous-exploitée.

Prérequis

Avant de commencer ce lab sur Apache Iceberg et la gestion avancée des snapshots, assurez-vous de disposer des éléments suivants :

Environnement

  • Apache Spark (version 3.x recommandée) avec support Iceberg
  • Java 8, 11 ou 17
  • Python 3.8+, PySpark 3.5+
  • Hive Metastore ou un catalogue compatible Iceberg
  • Trino

Stockage

  • Accès à un S3, MinIO ou tout autre stockage compatible pour héberger les données
  • Permissions pour créer, lire et écrire des tables Iceberg

Outils supplémentaires

  • IDE ou notebook (ex : Jupyter, vscode, Zeppelin) pour exécuter les scripts

Connaissances préalables

  • Bases de SQL et de Spark SQL
  • Concepts de data lake et lakehouse
  • Notions de partitionnement, snapshots et time travel
💡 Astuce : Préparer un environnement isolé est fortement recommandé pour tester les rollbacks et l’expiration des snapshots sans impacter les données de production.

Pourquoi les Snapshots sont Critiques ?
Dans les architectures de données modernes, nous faisons face à plusieurs défis :

- Auditabilité : Besoin de tracer l'évolution des données dans le temps
- Conformité réglementaire : RGPD, SOX, HIPAA exigent la traçabilité
- Correction d'erreurs : Rollback rapide en cas de mauvaise insertion
- Reproductibilité : Garantir des résultats identiques pour les analyses
- A/B Testing : Comparer différentes versions de données

Iceberg résout ces problèmes de manière élégante grâce à son architecture de métadonnées.

  1. Architecture des Snapshots Iceberg

Anatomie d'un Snapshot
Un snapshot dans Iceberg est un état immutable et complet de la table à un instant T. Contrairement aux systèmes traditionnels, Iceberg ne copie pas les données mais maintient des pointeurs vers les fichiers de données.

Snapshot (version 1)
├── Manifest List (pointeur vers manifests)
├── Manifest Files (liste des fichiers de données)
│ ├── data_file_1.parquet
│ ├── data_file_2.parquet
│ └── data_file_3.parquet
└── Metadata (timestamp, operation, schema)

Points clés à comprendre :
Immutabilité :
Une fois créé, un snapshot ne change jamais
Copy-on-Write vs Merge-on-Read : Iceberg supporte les deux stratégies
Métadonnées légères : Seules les métadonnées sont versionnées, pas les donnée
Isolation ACID : Chaque snapshot garantit une vue consistante

📊 Différences entre Hive/Parquet, Delta Lake et Iceberg

Caractéristique Hive/Parquet Delta Lake Iceberg
Versioning ❌ Aucun ✅ Transaction log ✅ Snapshots natifs
Time Travel ❌ Non supporté ✅ Limité ✅ Complet et performant
Overhead (Coût de gestion) Moyen Minimal
Hidden Partitioning ❌ Non ❌ Non ✅ Oui
Schema Evolution ❌ Complexe ✅ Supporté ✅ Avancé

2. Mise en Place de l'Environnement
   Configuration Spark + Iceberg

import socket
import os
import time
import random
from datetime import datetime, timedelta
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
print(f"PySpark version: {pyspark.__version__}")

Output: PySpark version: 3.5.0

METASTORE = "thrift://164.xxxx.xxx.xx:9083"
MINIO = "https://xxxx.xxxxxxx.dev"
MINIO_KEY = "xxxx"
MINIO_SECRET = "xxxxx"

💡 Note

Apache Iceberg évolue rapidement, et chaque version apporte des changements dans la compatibilité avec les moteurs de calcul comme Spark, Flink ou Trino.
Avant toute mise à jour, il est recommandé de vérifier la correspondance exacte des versions dans la documentation officielle Iceberg.

jars = [
    "iceberg-spark-runtime-3.5_2.12-1.5.0.jar",
    "hadoop-aws-3.3.4.jar",
    "aws-java-sdk-bundle-1.12.262.jar"
]
jars_dir = os.path.abspath("./jars")
jars_paths = [os.path.join(jars_dir, jar) for jar in jars]

print(f"📦 Chargement des JARs depuis: {jars_dir}")
for jar in jars_paths:
    if os.path.exists(jar):
        print(f"  ✓ {os.path.basename(jar)}")
    else:
        print(f"  ✗ MANQUANT: {jar}")
📦 Chargement des JARs depuis: /home/coder/workspace/jars
  ✓ iceberg-spark-runtime-3.5_2.12-1.5.0.jar
  ✓ hadoop-aws-3.3.4.jar
  ✓ aws-java-sdk-bundle-1.12.262.jar
print("="*60)
print("🎯 TEST LAKEHOUSE PRODUCTION")
print("="*60)
print(f"Metastore: {METASTORE}")
print(f"S3 Store:  {MINIO}")
print(f"Hostname:  {socket.gethostname()}")
print("="*60 + "\n")
============================================================
🎯 TEST LAKEHOUSE PRODUCTION
============================================================
Metastore: thrift://164.xxxx.xxx.xx:9083
S3 Store:  https://xxxxx.xxxxxxxxx.dev
Hostname:  xxxxxxxxxxxxx
============================================================
spark = SparkSession.builder
    .appName("Exercies")
    .master("local[2]")
    .config("spark.jars", ",".join(jars_paths))
.config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.iceberg.type", "hive")
    .config("spark.sql.catalog.iceberg.uri", METASTORE)
    .config("spark.sql.catalog.iceberg.warehouse", "s3a://lakehouse/warehouse")
    .config("spark.hadoop.fs.s3a.endpoint", MINIO)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

Explications des configurations clés

  • IcebergSparkSessionExtensions : Active les extensions SQL spécifiques à Iceberg
  • SparkCatalog : Permet l’intégration avec Hive Metastore
  • warehouse : Définit la localisation physique des données sur S3 ou MinIO
  • path.style.access : Paramètre important pour la compatibilité S3/MinIO

Création de la base de données

spark.sql("CREATE DATABASE IF NOT EXISTS iceberg.exercice1")
DataFrame[]

3. Time Travel : Concepts et Cas d'Usage

Le Time Travel est une fonctionnalité essentielle des formats de tables modernes comme Apache Iceberg, Delta Lake ou Apache Hudi.
Elle permet de revenir à l’état passé d’une table — comme un “retour dans le temps” — pour rejouer des analyses, comparer des versions de données, ou restaurer un état cohérent après une erreur.
Contrairement aux systèmes de fichiers traditionnels, ces formats maintiennent un historique transactionnel complet des modifications, grâce à des snapshots ou des transaction logs.

# Table avec partitionnement avancé pour optimiser le time travel
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg.exercice1.events_advanced (
    event_id STRING,
    user_id BIGINT,
    event_type STRING,
    event_timestamp TIMESTAMP,
    amount DECIMAL(10,2),
    country STRING,
    session_id STRING
) USING iceberg
PARTITIONED BY (
    days(event_timestamp),      -- Partition journalière (hidden partition)
    bucket(16, user_id),         -- Distribution par hash pour éviter le skew
    truncate(country, 2)         -- Regroupement géographique
)
""")
DataFrame[]

Pourquoi ce partitionnement ?

days(event_timestamp) : Permet un partition pruning temporel efficace

bucket(16, user_id) : Répartit uniformément les données sans générer trop de petits fichiers

truncate(country, 2) : Regroupe les pays similaires (US, UK, etc.)


Génération de données réalistes

def generate_events(num_records=100, base_date=None):
    """
    Génère des événements réalistes pour simuler un système de tracking
    
    Args:
        num_records: Nombre d'enregistrements à générer
        base_date: Date de base pour les événements
    
    Returns:
        Liste de dictionnaires représentant les événements
    """
    if base_date is None:
        base_date = datetime(2024, 1, 1)
    
    countries = ['US', 'FR', 'DE', 'UK', 'JP', 'CA', 'AU', 'BR']
    event_types = ['purchase', 'view', 'click', 'add_cart', 'checkout']
    
    data = []
    for i in range(num_records):
        data.append({
            'event_id': f'evt_{i}',
            'user_id': random.randint(1, 10000),
            'event_type': random.choice(event_types),
            'event_timestamp': base_date + timedelta(days=random.randint(0, 90)),
            'amount': int(random.uniform(10, 1000)),
            'country': random.choice(countries),
            'session_id': f'session_{random.randint(1, 1000)}'
        })
    return data
# Insertion initiale - Snapshot 1
print("📸 Création du Snapshot 1...")
events_data = generate_events(100)
df_events = spark.createDataFrame(events_data)
df_events.writeTo("iceberg.exercice1.events_advanced").append()
print("✅ Snapshot 1 créé avec 100 enregistrements")
📸 Création du Snapshot 1...
                                                                                
✅ Snapshot 1 créé avec 100 enregistrements
# Deuxième insertion - Snapshot 2
print("\n📸 Création du Snapshot 2...")
events_data_2 = generate_events(500, base_date=datetime(2024, 2, 1))
df_events_2 = spark.createDataFrame(events_data_2)
df_events_2.writeTo("iceberg.exercice1.events_advanced").append()
print("✅ Snapshot 2 créé avec 500 enregistrements supplémentaires")
📸 Création du Snapshot 2...
                                                                                
✅ Snapshot 2 créé avec 500 enregistrements supplémentaires
# Troisième insertion - Snapshot 3
print("\n📸 Création du Snapshot 3...")
events_data_3 = generate_events(300, base_date=datetime(2024, 3, 1))
df_events_3 = spark.createDataFrame(events_data_3)
df_events_3.writeTo("iceberg.exercice1.events_advanced").append()
print("✅ Snapshot 3 créé avec 300 enregistrements supplémentaires")
📸 Création du Snapshot 3...
                                                                                
✅ Snapshot 3 créé avec 300 enregistrements supplémentaires

4. Gestion avancée des snapshots

Dans les architectures modernes de lakehouses, les snapshots jouent un rôle crucial pour garantir intégrité, auditabilité et reproductibilité des données. Apache Iceberg offre un système de snapshots léger et performant qui permet de :

  • Conserver l’historique complet des modifications sans dupliquer les données
  • Effectuer des rollbacks rapides vers un état précédent en cas d’erreur
  • Exécuter du time travel pour des besoins d’audit, de conformité ou d’analyse rétrospective
  • Optimiser les pipelines en lecture incrémentale grâce aux métadonnées et aux statistiques de colonnes

Cette approche transforme la manière dont les équipes gèrent les données : ce n’est plus uniquement la table ou le fichier qui compte, mais la version exacte et traçable des données. Dans les sections suivantes, nous explorerons les patterns avancés, les best practices, et les métriques essentielles pour maîtriser pleinement la gestion des snapshots dans Iceberg.

a. Lister et analyser les snapshots

# Visualiser l'historique complet des snapshots
print("\n" + "="*80)
print("📋 HISTORIQUE DES SNAPSHOTS")
print("="*80)

snapshot_history = spark.sql("""
SELECT 
    snapshot_id,
    parent_id,
    operation,
    committed_at,
    summary
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at
""")

snapshot_history.show(truncate=False)
================================================================================
📋 HISTORIQUE DES SNAPSHOTS
================================================================================

+-------------------+-------------------+---------+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|snapshot_id        |parent_id          |operation|committed_at           |summary                                                                                                                                                                                                                                                                                                            |
+-------------------+-------------------+---------+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|9165404464823178155|NULL               |append   |2025-11-03 15:40:23.392|{spark.app.id -> local-1762180764265, added-data-files -> 100, added-records -> 100, added-files-size -> 209478, changed-partition-count -> 100, total-records -> 100, total-files-size -> 209478, total-data-files -> 100, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}     |
|6604306536571122135|9165404464823178155|append   |2025-11-04 14:35:39.982|{spark.app.id -> local-1762263234886, added-data-files -> 971, added-records -> 1000, added-files-size -> 2041957, changed-partition-count -> 971, total-records -> 1100, total-files-size -> 2251435, total-data-files -> 1071, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}|
|4505195276291351018|6604306536571122135|append   |2025-11-04 14:35:52.455|{spark.app.id -> local-1762263234886, added-data-files -> 98, added-records -> 100, added-files-size -> 205494, changed-partition-count -> 98, total-records -> 1200, total-files-size -> 2456929, total-data-files -> 1169, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}    |
|8469158781690803317|4505195276291351018|append   |2025-11-04 14:38:57.342|{spark.app.id -> local-1762263234886, added-data-files -> 487, added-records -> 500, added-files-size -> 1023658, changed-partition-count -> 487, total-records -> 1700, total-files-size -> 3480587, total-data-files -> 1656, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0} |
|8457227996744290751|8469158781690803317|append   |2025-11-04 14:42:09.062|{spark.app.id -> local-1762263234886, added-data-files -> 296, added-records -> 300, added-files-size -> 621774, changed-partition-count -> 296, total-records -> 2000, total-files-size -> 4102361, total-data-files -> 1952, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0}  |
+-------------------+-------------------+---------+-----------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
# Détails des snapshots avec métriques
print("\n" + "="*80)
print("📊 MÉTRIQUES DÉTAILLÉES DES SNAPSHOTS")
print("="*80)

snapshot_details = spark.sql("""
SELECT 
    snapshot_id,
    operation,
    committed_at,
    summary['added-data-files'] AS files_added,
    summary['added-records'] AS records_added,
    summary['total-data-files'] AS total_files,
    summary['total-records'] AS total_records,
    summary['total-files-size'] AS total_size_bytes
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at
""")
snapshot_details.show(truncate=False)

================================================================================
📊 MÉTRIQUES DÉTAILLÉES DES SNAPSHOTS
================================================================================
+-------------------+---------+-----------------------+-----------+-------------+-----------+-------------+----------------+
|snapshot_id        |operation|committed_at           |files_added|records_added|total_files|total_records|total_size_bytes|
+-------------------+---------+-----------------------+-----------+-------------+-----------+-------------+----------------+
|9165404464823178155|append   |2025-11-03 15:40:23.392|100        |100          |100        |100          |209478          |
|6604306536571122135|append   |2025-11-04 14:35:39.982|971        |1000         |1071       |1100         |2251435         |
|4505195276291351018|append   |2025-11-04 14:35:52.455|98         |100          |1169       |1200         |2456929         |
|8469158781690803317|append   |2025-11-04 14:38:57.342|487        |500          |1656       |1700         |3480587         |
|8457227996744290751|append   |2025-11-04 14:42:09.062|296        |300          |1952       |2000         |4102361         |
+-------------------+---------+-----------------------+-----------+-------------+-----------+-------------+----------------+

Ce qu’on observe

  • snapshot_id : Identifiant unique du snapshot
  • parent_id : Chaînage des snapshots (arborescence des versions)
  • operation : Type d’opération (append, overwrite, delete, etc.)
  • committed_at : Timestamp de création — crucial pour le time travel
  • summary : Métadonnées détaillées (nombre de fichiers, taille totale, etc.)

b. Time travel par timestamp

# Requête AS OF TIMESTAMP - Voir la table à un moment précis
print("\n" + "="*80)
print("⏰ TIME TRAVEL PAR TIMESTAMP")
print("="*80)

# Récupérer le timestamp du deuxième snapshot
timestamp_snapshot_2 = spark.sql("""
SELECT committed_at 
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at
LIMIT 1 OFFSET 1
""").collect()[0][0]

print(f"\n🕐 Interrogation de la table au timestamp: {timestamp_snapshot_2}")

df_at_timestamp = spark.sql(f"""
SELECT COUNT(*) as record_count, 
       MIN(event_timestamp) as min_date,
       MAX(event_timestamp) as max_date
FROM iceberg.exercice1.events_advanced
FOR SYSTEM_TIME AS OF '{timestamp_snapshot_2}'
""")
df_at_timestamp.show()

# Comparer avec l'état actuel
print("\n📊 Comparaison avec l'état actuel:")
df_current = spark.sql("""
SELECT COUNT(*) as record_count,
       MIN(event_timestamp) as min_date,
       MAX(event_timestamp) as max_date
FROM iceberg.exercice1.events_advanced
""")
df_current.show()
================================================================================
⏰ TIME TRAVEL PAR TIMESTAMP
================================================================================

🕐 Interrogation de la table au timestamp: 2025-11-04 14:35:39.982000

+------------+-------------------+-------------------+
|record_count|           min_date|           max_date|
+------------+-------------------+-------------------+
|        1100|2024-01-01 00:00:00|2024-03-31 01:00:00|
+------------+-------------------+-------------------+

📊 Comparaison avec l'état actuel:
+------------+-------------------+-------------------+
|record_count|           min_date|           max_date|
+------------+-------------------+-------------------+
|        2000|2024-01-01 00:00:00|2024-05-30 00:00:00|
+------------+-------------------+-------------------+

c. Time Travel par Snapshot ID

print("\n" + "="*80)
print("🔍 TIME TRAVEL PAR SNAPSHOT ID")
print("="*80)

# Récupérer le premier snapshot ID
first_snapshot_id = spark.sql("""
SELECT snapshot_id 
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at
LIMIT 1
""").collect()[0][0]

print(f"\n📸 Interrogation du snapshot: {first_snapshot_id}")

df_snapshot_1 = spark.sql(f"""
SELECT event_type, COUNT(*) as count
FROM iceberg.exercice1.events_advanced
FOR SYSTEM_VERSION AS OF {first_snapshot_id}
GROUP BY event_type
ORDER BY count DESC
""")
df_snapshot_1.show()
================================================================================
🔍 TIME TRAVEL PAR SNAPSHOT ID
================================================================================

📸 Interrogation du snapshot: 9165404464823178155

[Stage 29:=====================================================>  (24 + 1) / 25]
+----------+-----+
|event_type|count|
+----------+-----+
|  checkout|   27|
|  purchase|   23|
|      view|   22|
|     click|   15|
|  add_cart|   13|
+----------+-----+

Quand utiliser TIMESTAMP vs SNAPSHOT_ID

  • TIMESTAMP : À privilégier pour les besoins d’audit, de conformité ou de débogage (« que s’est-il passé le 15 mars à 14h ? »)
  • SNAPSHOT_ID : Idéal pour la reproductibilité exacte, les tests ou un rollback précis

d. Rollback à un snapshot précédent

print("\n" + "="*80)
print("↩️  ROLLBACK À UN SNAPSHOT PRÉCÉDENT")
print("="*80)

# Identifier le snapshot cible
target_snapshot = spark.sql("""
SELECT snapshot_id, committed_at, summary['total-records'] as records
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at
LIMIT 1 OFFSET 1
""")
target_snapshot.show()

# Effectuer le rollback
print(f"\n🔄 Rollback au snapshot {target_snapshot_id}...")
spark.sql(f"""
CALL iceberg.system.rollback_to_snapshot(
    'exercice1.events_advanced',
    {target_snapshot_id}
)
""")

print("✅ Rollback effectué avec succès!")
🔄 Rollback au snapshot 6604306536571122135...
✅ Rollback effectué avec succès!

📊 État de la table après rollback:
+-------------+
|total_records|
+-------------+
|         1100|
+-------------+
⚠️ Attention : Le rollback crée un nouveau snapshot qui pointe vers un ancien état — les données ne sont pas supprimées physiquement.

e. Expiration des snapshots

print("\n" + "="*80)
print("🗑️  EXPIRATION DES SNAPSHOTS ANCIENS")
print("="*80)

# Expirer les snapshots de plus de 7 jours
# ATTENTION : Cette opération est IRREVERSIBLE
print("⚠️  Expiration des snapshots de plus de 7 jours...")

spark.sql("""
CALL iceberg.system.expire_snapshots(
    table => 'exercice1.events_advanced',
    older_than => TIMESTAMP '2024-01-08 00:00:00',
    retain_last => 3  -- Garder au minimum les 3 derniers snapshots
)
""")

print("✅ Expiration terminée")

# Vérifier les snapshots restants
print("\n📋 Snapshots restants:")
spark.sql("""
SELECT snapshot_id, committed_at
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at
""").show()
================================================================================
🗑️  EXPIRATION DES SNAPSHOTS ANCIENS
================================================================================
⚠️  Expiration des snapshots de plus de 7 jours...
[Stage 48:====================================================> (200 + 2) / 204]
✅ Expiration terminée

📋 Snapshots restants:
+-------------------+--------------------+
|        snapshot_id|        committed_at|
+-------------------+--------------------+
|9165404464823178155|2025-11-03 15:40:...|
|6604306536571122135|2025-11-04 14:35:...|
|4505195276291351018|2025-11-04 14:35:...|
|8469158781690803317|2025-11-04 14:38:...|
|8457227996744290751|2025-11-04 14:42:...|
+-------------------+--------------------+

Best practices pour l’expiration

  • Toujours conserver : Au moins 3 à 5 snapshots récents
  • Période de rétention : À définir selon les besoins métier (7, 30 ou 90 jours)
  • Conformité : Vérifier les exigences légales avant toute expiration
  • Automatisation : Programmer l’expiration automatique (ex. via Airflow ou un job planifié)

5. Partitionnement et optimisation

Dans les grands ensembles de données analytiques, le partitionnement et l’optimisation des tables sont des leviers essentiels pour améliorer les performances et réduire les coûts. Apache Iceberg fournit des outils puissants pour :

  • Distribuer les données intelligemment afin de limiter le nombre de fichiers petits et inefficaces
  • Accélérer les requêtes grâce au partition pruning et à l’utilisation des statistiques de colonnes
  • Assurer une lecture équilibrée entre partitions, évitant les goulets d’étranglement
  • Faciliter la maintenance et l’optimisation continue des tables dans des environnements volumineux

Bien conçu, le partitionnement permet non seulement des gains de performance significatifs, mais aussi une agilité accrue pour les pipelines ETL, les analyses ad hoc et le time travel. Dans cette section, nous explorerons les métriques clés à surveiller, ainsi que les erreurs courantes à éviter pour tirer pleinement parti des capacités d’Iceberg.

Analyse de distribution des partitions

print("\n" + "="*80)
print("📂 ANALYSE DE LA DISTRIBUTION DES PARTITIONS")
print("="*80)

partition_analysis = spark.sql("""
SELECT 
    partition,
    COUNT(*) AS file_count,
    SUM(record_count) AS total_records,
    ROUND(SUM(file_size_in_bytes) / 1024 / 1024, 2) AS size_mb,
    ROUND(AVG(file_size_in_bytes) / 1024 / 1024, 2) AS avg_file_size_mb
FROM iceberg.exercice1.events_advanced.files
GROUP BY partition
ORDER BY total_records DESC
""")
partition_analysis.show(50, False)
================================================================================
📂 ANALYSE DE LA DISTRIBUTION DES PARTITIONS
================================================================================
[Stage 50:>                                                         (0 + 1) / 1]
+--------------------+----------+-------------+-------+----------------+
|partition           |file_count|total_records|size_mb|avg_file_size_mb|
+--------------------+----------+-------------+-------+----------------+
|{2024-01-20, 3, CA} |1         |3            |0.0    |0.0             |
|{2024-02-15, 15, UK}|1         |2            |0.0    |0.0             |
|{2024-03-19, 4, UK} |1         |2            |0.0    |0.0             |
|{2024-03-16, 4, JP} |1         |2            |0.0    |0.0             |
|{2024-01-19, 8, DE} |1         |2            |0.0    |0.0             |
|{2024-03-20, 3, DE} |1         |2            |0.0    |0.0             |
|{2024-01-10, 15, DE}|1         |2            |0.0    |0.0             |
|{2024-03-12, 4, AU} |1         |2            |0.0    |0.0             |
|{2024-01-06, 2, CA} |1         |2            |0.0    |0.0             |
|{2024-03-01, 4, DE} |1         |2            |0.0    |0.0             |
|{2024-01-05, 1, AU} |1         |2            |0.0    |0.0             |
|{2024-02-07, 6, DE} |1         |2            |0.0    |0.0             |
|{2024-03-07, 1, UK} |1         |2            |0.0    |0.0             |
|{2024-03-26, 14, JP}|2         |2            |0.0    |0.0             |
|{2024-03-15, 11, AU}|1         |2            |0.0    |0.0             |
|{2024-02-20, 11, CA}|1         |2            |0.0    |0.0             |
|{2024-03-30, 2, FR} |1         |2            |0.0    |0.0             |
|{2024-03-20, 10, FR}|1         |2            |0.0    |0.0             |
|{2024-01-28, 6, DE} |1         |2            |0.0    |0.0             |
|{2024-02-14, 15, UK}|1         |2            |0.0    |0.0             |
|{2024-03-29, 7, AU} |2         |2            |0.0    |0.0             |
|{2024-02-19, 13, AU}|1         |2            |0.0    |0.0             |
...
|{2024-02-17, 9, US} |1         |1            |0.0    |0.0             |
+--------------------+----------+-------------+-------+----------------+
only showing top 50 rows

Métriques clés à surveiller

  • file_count : Détecter s’il existe trop de petits fichiers (small files problem)
  • avg_file_size_mb : Taille moyenne idéale entre 128 MB et 1 GB
  • total_records : Vérifier une distribution équilibrée entre les partitions

Comparaison de performance : avec et sans partition pruning

print("\n" + "="*80)
print("⚡ BENCHMARK : PARTITION PRUNING")
print("="*80)

# Test 1 : Sans partition pruning (scan complet)
print("\n❌ Test 1 : Requête sans partition pruning (inefficace)")
start = time.time()
df_no_prune = spark.sql("""
    SELECT COUNT(*) as total, AVG(amount) as avg_amount
    FROM iceberg.exercice1.events_advanced
""")
df_no_prune.show()
time_no_prune = time.time() - start
print(f"⏱️  Temps d'exécution: {time_no_prune:.3f}s")

# Analyser les fichiers scannés
files_scanned = spark.sql("""
    SELECT COUNT(*) as files_scanned
    FROM iceberg.exercice1.events_advanced.files
""").collect()[0][0]
print(f"📁 Fichiers scannés: {files_scanned}")
================================================================================
⚡ BENCHMARK : PARTITION PRUNING
================================================================================

❌ Test 1 : Requête sans partition pruning (inefficace)
                                                                                
+-----+----------+
|total|avg_amount|
+-----+----------+
| 1100|514.754545|
+-----+----------+

⏱️  Temps d'exécution: 39.189s
📁 Fichiers scannés: 1071

Analyse du Plan d'Exécution

print("\n" + "="*80)
print("🔍 ANALYSE DU PLAN D'EXÉCUTION PHYSIQUE")
print("="*80)

spark.sql("""
    SELECT * 
    FROM iceberg.exercice1.events_advanced
    WHERE event_timestamp >= '2024-01-15' 
    AND user_id = 5000
""").explain(mode="extended")

================================================================================
🔍 ANALYSE DU PLAN D'EXÉCUTION PHYSIQUE
================================================================================
== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('event_timestamp >= 2024-01-15) AND ('user_id = 5000))
   +- 'UnresolvedRelation [iceberg, exercice1, events_advanced], [], false

== Analyzed Logical Plan ==
event_id: string, user_id: bigint, event_type: string, event_timestamp: timestamp, amount: decimal(10,2), country: string, session_id: string
Project [event_id#1099, user_id#1100L, event_type#1101, event_timestamp#1102, amount#1103, country#1104, session_id#1105]
+- Filter ((event_timestamp#1102 >= cast(2024-01-15 as timestamp)) AND (user_id#1100L = cast(5000 as bigint)))
   +- SubqueryAlias iceberg.exercice1.events_advanced
      +- RelationV2[event_id#1099, user_id#1100L, event_type#1101, event_timestamp#1102, amount#1103, country#1104, session_id#1105] iceberg.exercice1.events_advanced iceberg.exercice1.events_advanced

== Optimized Logical Plan ==
Filter ((event_timestamp#1102 >= 2024-01-15 00:00:00) AND (user_id#1100L = 5000))
+- RelationV2[event_id#1099, user_id#1100L, event_type#1101, event_timestamp#1102, amount#1103, country#1104, session_id#1105] iceberg.exercice1.events_advanced

== Physical Plan ==
*(1) Project [event_id#1099, user_id#1100L, event_type#1101, event_timestamp#1102, amount#1103, country#1104, session_id#1105]
+- *(1) Filter ((event_timestamp#1102 >= 2024-01-15 00:00:00) AND (user_id#1100L = 5000))
   +- BatchScan iceberg.exercice1.events_advanced[event_id#1099, user_id#1100L, event_type#1101, event_timestamp#1102, amount#1103, country#1104, session_id#1105] iceberg.exercice1.events_advanced (branch=null) [filters=event_timestamp IS NOT NULL, user_id IS NOT NULL, event_timestamp >= 1705273200000000, user_id = 5000, groupedBy=] RuntimeFilters: []

Ce qui est excellent ici

  • ✅ Les filtres sont appliqués au niveau du BatchScan
  • Iceberg lit uniquement les partitions concernées
  • ✅ Les statistiques min/max des colonnes sont exploitées pour optimiser la lecture

6. Patterns avancés et best practices

a. Branching et tagging

print("\n" + "="*80)
print("🌿 BRANCHING POUR DÉVELOPPEMENT ISOLÉ")
print("="*80)

# Créer une branche pour tester des modifications
spark.sql("""
ALTER TABLE iceberg.exercice1.events_advanced 
CREATE BRANCH dev_branch
""")

print("✅ Branche 'dev_branch' créée")

# Travailler sur la branche sans affecter la production
spark.sql("""
INSERT INTO iceberg.exercice1.events_advanced.branch_dev_branch
SELECT * FROM iceberg.exercice1.events_advanced
WHERE amount > 500
""")

# Créer un tag pour marquer une version stable
spark.sql("""
ALTER TABLE iceberg.exercice1.events_advanced 
CREATE TAG `v1.0-stable`
""")
print("✅ Tag 'v1.0-stable' créé")

================================================================================
🌿 BRANCHING POUR DÉVELOPPEMENT ISOLÉ
================================================================================
✅ Branche 'dev_branch' créée

DataFrame[]

✅ Tag 'v1.0-stable' créé

Cas d’usage des branches

  • Tests de nouvelles transformations
  • Développement de features sans impacter la production
  • Isolation des environnements (dev, staging, prod)

b. Audit trail complet

print("\n" + "="*80)
print("📜 AUDIT TRAIL COMPLET")
print("="*80)

audit_trail = spark.sql("""
SELECT 
    committed_at as change_time,
    snapshot_id,
    operation,
    summary['added-records'] as records_added,
    summary['deleted-records'] as records_deleted,
    summary['changed-partition-count'] as partitions_changed
FROM iceberg.exercice1.events_advanced.snapshots 
ORDER BY committed_at DESC;
""")
audit_trail.show(truncate=False)
================================================================================
📜 AUDIT TRAIL COMPLET
================================================================================
25/11/05 06:43:11 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
+-----------------------+-------------------+---------+-------------+---------------+------------------+
|change_time            |snapshot_id        |operation|records_added|records_deleted|partitions_changed|
+-----------------------+-------------------+---------+-------------+---------------+------------------+
|2025-11-04 19:08:00.727|7950110395698150392|append   |574          |NULL           |568               |
|2025-11-04 14:42:09.062|8457227996744290751|append   |300          |NULL           |296               |
|2025-11-04 14:38:57.342|8469158781690803317|append   |500          |NULL           |487               |
|2025-11-04 14:35:52.455|4505195276291351018|append   |100          |NULL           |98                |
|2025-11-04 14:35:39.982|6604306536571122135|append   |1000         |NULL           |971               |
|2025-11-03 15:40:23.392|9165404464823178155|append   |100          |NULL           |100               |
+-----------------------+-------------------+---------+-------------+---------------+------------------+
# Tracer qui a fait quelle modification (nécessite metadata tracking)
spark.sql("""
SELECT 
    file_path,
    file_format,
    partition,
    record_count,
    file_size_in_bytes
FROM iceberg.exercice1.events_advanced.files
WHERE partition.event_timestamp_day = DATE '2024-01-15';
""").show(truncate=False)
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------+------------+------------------+
|file_path                                                                                                                                                                                 |file_format|partition           |record_count|file_size_in_bytes|
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------+------------+------------------+
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=12/country_trunc=FR/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00365.parquet|PARQUET    |{2024-01-15, 12, FR}|2           |2150              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=1/country_trunc=UK/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00353.parquet |PARQUET    |{2024-01-15, 1, UK} |1           |2085              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=2/country_trunc=UK/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00285.parquet |PARQUET    |{2024-01-15, 2, UK} |1           |2112              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=13/country_trunc=JP/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00035.parquet|PARQUET    |{2024-01-15, 13, JP}|1           |2078              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=0/country_trunc=BR/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00211.parquet |PARQUET    |{2024-01-15, 0, BR} |1           |2113              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=4/country_trunc=CA/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00430.parquet |PARQUET    |{2024-01-15, 4, CA} |1           |2113              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=5/country_trunc=BR/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00950.parquet |PARQUET    |{2024-01-15, 5, BR} |1           |2076              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=3/country_trunc=BR/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00691.parquet |PARQUET    |{2024-01-15, 3, BR} |1           |2113              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=7/country_trunc=DE/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00406.parquet |PARQUET    |{2024-01-15, 7, DE} |1           |2112              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=7/country_trunc=AU/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00180.parquet |PARQUET    |{2024-01-15, 7, AU} |1           |2112              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=0/country_trunc=JP/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00873.parquet |PARQUET    |{2024-01-15, 0, JP} |1           |2113              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=1/country_trunc=JP/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00363.parquet |PARQUET    |{2024-01-15, 1, JP} |1           |2092              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=7/country_trunc=BR/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00381.parquet |PARQUET    |{2024-01-15, 7, BR} |1           |2085              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=8/country_trunc=FR/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00959.parquet |PARQUET    |{2024-01-15, 8, FR} |1           |2106              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=8/country_trunc=DE/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00841.parquet |PARQUET    |{2024-01-15, 8, DE} |1           |2092              |
|s3a://lakehouse/warehouse/exercice1.db/events_advanced/data/event_timestamp_day=2024-01-15/user_id_bucket=4/country_trunc=JP/00000-2-056e9490-7bef-4e3e-b451-56569913394d-0-00579.parquet |PARQUET    |{2024-01-15, 4, JP} |1           |2112              |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------+--------------------+------------+------------------+

c. Incremental Read Pattern


print("\n" + "="*80)
print("📖 LECTURE INCRÉMENTALE ENTRE DEUX SNAPSHOTS")
print("="*80)

# Récupérer les deux derniers snapshots
snapshots = spark.sql("""
SELECT snapshot_id, parent_id, committed_at
FROM iceberg.exercice1.events_advanced.snapshots
ORDER BY committed_at DESC
LIMIT 2
""").collect()

# snapshot récent
newer_snapshot_id = snapshots[0]['snapshot_id']      # ou snapshots[0][0]
newer_parent_id   = snapshots[0]['parent_id']       # ou snapshots[0][1]

# snapshot juste avant
older_snapshot_id = snapshots[1]['snapshot_id']     # ou snapshots[1][0]
older_parent_id   = snapshots[1]['parent_id']      # ou snapshots[1][1]

print(f"🔄 Lecture des changements entre snapshots:")
print(f"Newer snapshot: {newer_snapshot_id}, parent: {newer_parent_id}")
print(f"Older snapshot: {older_snapshot_id}, parent: {older_parent_id}")



print(f"   Ancien: {older_snapshot}")
print(f"   Nouveau: {newer_snapshot}")

# Lire uniquement les nouvelles données
incremental_df = spark.read \
    .format("iceberg") \
    .option("scan-incrementally", "true") \
    .option("start-snapshot-id", newer_parent_id) \
    .option("end-snapshot-id", newer_snapshot_id) \
    .load("iceberg.exercice1.events_advanced")

print(f"\n📊 Nombre de nouveaux enregistrements: {incremental_df.count()}")
incremental_df.show(10)

================================================================================
📖 LECTURE INCRÉMENTALE ENTRE DEUX SNAPSHOTS
================================================================================
🔄 Lecture des changements entre snapshots:
Newer snapshot: 7950110395698150392, parent: 6604306536571122135
Older snapshot: 8457227996744290751, parent: 8469158781690803317
   Ancien: 8457227996744290751
   Nouveau: 7950110395698150392                                                                        
📊 Nombre de nouveaux enregistrements: 574
+--------+-------+----------+-------------------+------+-------+-----------+
|event_id|user_id|event_type|    event_timestamp|amount|country| session_id|
+--------+-------+----------+-------------------+------+-------+-----------+
| evt_819|   9974|  checkout|2024-02-28 00:00:00|842.00|     UK|session_674|
|  evt_37|   7430|  add_cart|2024-03-29 01:00:00|691.00|     AU| session_43|
| evt_224|   1552|  add_cart|2024-03-03 00:00:00|532.00|     FR|session_176|
| evt_261|   4134|     click|2024-03-28 00:00:00|970.00|     FR|session_185|
|  evt_39|   1494|  add_cart|2024-03-18 01:00:00|830.00|     US|session_170|
| evt_753|   4756|     click|2024-03-29 00:00:00|879.00|     FR|session_277|
| evt_361|   4710|  checkout|2024-02-03 00:00:00|902.00|     US|session_917|
| evt_941|   4459|  add_cart|2024-02-26 00:00:00|506.00|     BR|session_517|
| evt_703|   7294|      view|2024-03-14 00:00:00|586.00|     AU|session_199|
| evt_689|   3007|     click|2024-02-20 00:00:00|923.00|     UK|session_649|
+--------+-------+----------+-------------------+------+-------+-----------+
only showing top 10 rows                                                                            

Pourquoi c’est puissant ?

  • Évite de relire toutes les données
  • ✅ Idéal pour les pipelines incrémentaux
  • Réduit drastiquement les coûts de compute et d’I/O

d. Schema Evolution avec Snapshots

print("\n" + "="*80)
print("🔄 ÉVOLUTION DE SCHÉMA AVEC TRAÇABILITÉ")
print("="*80)

# Ajouter une colonne sans casser les snapshots existants
spark.sql("""
ALTER TABLE iceberg.exercice1.events_advanced
ADD COLUMN device_type STRING
""")
print("✅ Colonne 'device_type' ajoutée")

# Les anciens snapshots restent accessibles !
# Iceberg gère automatiquement l'évolution de schéma
df_old_schema = spark.sql(f"""
SELECT * FROM iceberg.exercice1.events_advanced
FOR SYSTEM_VERSION AS OF {newer_parent_id}
""")
print("\n📸 Snapshot ancien (sans device_type):")
df_old_schema.printSchema()

df_new_schema.show(5)

================================================================================
🔄 ÉVOLUTION DE SCHÉMA AVEC TRAÇABILITÉ
================================================================================
✅ Colonne 'device_type' ajoutée

📸 Snapshot ancien (sans device_type):
root
 |-- event_id: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- amount: decimal(10,2) (nullable = true)
 |-- country: string (nullable = true)
 |-- session_id: string (nullable = true)
df_new_schema = spark.sql("""
SELECT * FROM iceberg.exercice1.events_advanced
""")
print("\n📸 Snapshot actuel (avec device_type):")
df_new_schema.printSchema()
📸 Snapshot actuel (avec device_type):
root
 |-- event_id: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- event_type: string (nullable = true)
 |-- event_timestamp: timestamp (nullable = true)
 |-- amount: decimal(10,2) (nullable = true)
 |-- country: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- device_type: string (nullable = true)

+--------+-------+----------+-------------------+------+-------+-----------+-----------+
|event_id|user_id|event_type|    event_timestamp|amount|country| session_id|device_type|
+--------+-------+----------+-------------------+------+-------+-----------+-----------+
|  evt_35|   2069|  add_cart|2024-03-11 01:00:00|481.00|     CA|session_564|       NULL|
|   evt_6|   1621|     click|2024-03-28 01:00:00|962.00|     JP|session_906|       NULL|
|  evt_37|   7430|  add_cart|2024-03-29 01:00:00|691.00|     AU| session_43|       NULL|
|  evt_70|   4027|  checkout|2024-03-26 01:00:00|102.00|     AU|session_806|       NULL|
|  evt_39|   1494|  add_cart|2024-03-18 01:00:00|830.00|     US|session_170|       NULL|
+--------+-------+----------+-------------------+------+-------+-----------+-----------+
only showing top 5 rows

e. Optimisations et Tuning
   i. Compaction des Small Files

print("\n" + "="*80)
print("🗜️  COMPACTION DES FICHIERS")
print("="*80)

# Identifier les partitions avec trop de petits fichiers
spark.sql("""
SELECT 
    partition,
    COUNT(*) as file_count,
    ROUND(AVG(file_size_in_bytes) / 1024 / 1024, 2) as avg_size_mb
FROM iceberg.exercice1.events_advanced.files
GROUP BY partition
HAVING COUNT(*) > 10
""").show()

# Compacter les partitions problématiques
spark.sql("""
CALL iceberg.system.rewrite_data_files(
    table => 'exercice1.events_advanced',
    strategy => 'binpack',
    options => map(
        'target-file-size-bytes', '536870912',  -- 512 MB
        'min-file-size-bytes', '134217728'      -- 128 MB
    )
)
""")
print("✅ Compaction terminée")
================================================================================
🗜️  COMPACTION DES FICHIERS
================================================================================
25/11/05 12:54:29 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
[Stage 0:>                                                          (0 + 1) / 1]
+---------+----------+-----------+
|partition|file_count|avg_size_mb|
+---------+----------+-----------+
+---------+----------+-----------+

✅ Compaction terminée

ii. Snapshot Expiration Automatisée

print("\n" + "="*80)
print("⏰ EXPIRATION AUTOMATIQUE DES SNAPSHOTS")
print("="*80)

# Définir une politique d'expiration
spark.sql("""
ALTER TABLE iceberg.exercice1.events_advanced
SET TBLPROPERTIES (
    'history.expire.max-snapshot-age-ms' = '604800000',  -- 7 jours
    'history.expire.min-snapshots-to-keep' = '5'
)
""")
print("✅ Politique d'expiration configurée (7 jours, min 5 snapshots)")

================================================================================
⏰ EXPIRATION AUTOMATIQUE DES SNAPSHOTS
================================================================================
✅ Politique d'expiration configurée (7 jours, min 5 snapshots)
print("\n" + "="*80)
print("📊 MÉTRIQUES DE SANTÉ DE LA TABLE")
print("="*80)

health_metrics = spark.sql("""
SELECT 
    'Nombre de snapshots' AS metric,
    COUNT(*) AS value
FROM iceberg.exercice1.events_advanced.snapshots

UNION ALL
SELECT 
    'Nombre de fichiers de données' AS metric,
    COUNT(*) AS value
FROM iceberg.exercice1.events_advanced.files

UNION ALL
SELECT 
    'Taille totale (GB)' AS metric,
    ROUND(SUM(file_size_in_bytes) / 1024 / 1024 / 1024, 2) AS value
FROM iceberg.exercice1.events_advanced.files

UNION ALL
SELECT
    'Nombre d_enregistrements' AS metric,
    SUM(record_count) AS value
FROM iceberg.exercice1.events_advanced.files
""")

health_metrics.show(truncate=False)

================================================================================
📊 MÉTRIQUES DE SANTÉ DE LA TABLE
================================================================================
+-----------------------------+------+
|metric                       |value |
+-----------------------------+------+
|Nombre de snapshots          |6.0   |
|Nombre de fichiers de données|1071.0|
|Taille totale (GB)           |0.0   |
|Nombre d_enregistrements     |1100.0|
+-----------------------------+------+

f. Best Practices en Production
i. Stratégie de Rétention

# Configuration recommandée pour la production
retention_policy = {
    'compliance_tables': {
        'min_snapshots': 10,
        'retention_days': 2555,  # 7 ans
    },
    'operational_tables': {
        'min_snapshots': 5,
        'retention_days': 30,
    },
    'dev_tables': {
        'min_snapshots': 3,
        'retention_days': 7,
    }
}

ii. Patterns / Anti-Patterns

À FAIRE

  • Utiliser des tags pour marquer les versions stables
  • Expirer régulièrement les anciens snapshots
  • Monitorer la croissance des métadonnées
  • Tester les rollbacks sur des tables de dev
  • Documenter les snapshots importants

À ÉVITER

  • Garder tous les snapshots indéfiniment
  • Faire des rollbacks en production sans validation
  • Ignorer les politiques de conformité
  • Oublier de compacter les fichiers

iii. Checklist de Production

Checklist Snapshots Iceberg en Production

  • Politique de rétention configurée
  • Expiration automatique activée
  • Monitoring des métriques en place
  • Tags pour versions stables
  • Documentation des snapshots critiques
  • Tests de rollback réguliers
  • Audit trail configuré
  • Alertes sur anomalies

7. Conclusion

La gestion des snapshots et du time travel dans Apache Iceberg représente un changement de paradigme dans la gestion des données analytiques. Ce n’est pas simplement une fonctionnalité de plus, mais une fondation qui permet :

Bénéfices tangibles

  1. Réduction des incidents en production : Rollback en secondes au lieu d’heures
  2. Conformité garantie : Audit trail complet et immuable
  3. Économies de coûts : Lectures incrémentales, pas de copies complètes
  4. Agilité accrue : Tests sur branches sans impact production

Points clés à retenir

  • Les snapshots Iceberg sont légers (métadonnées uniquement)
  • Le time travel est performant grâce au partition pruning
  • L’architecture est ACID-compliant par design
  • La rétention doit être équilibrée entre compliance et coûts

Prochaines étapes

Pour aller plus loin dans votre maîtrise d’Iceberg :

  1. Implémenter les patterns présentés dans cet article sur vos tables
  2. Monitorer les métriques de snapshots en production
  3. Automatiser l’expiration et la compaction
  4. Former vos équipes aux bonnes pratiques

Resources complémentaires


Cet article est le fruit de plusieurs années d’expérience en production sur des lakehouses gérant plusieurs pétaoctets de données. N’hésitez pas à adapter ces patterns à votre contexte spécifique.