đ Pipelines de donnĂ©es fonctionnels avec Python¶
Implémenter des pipelines de traitement de données grùce aux concepts de programmation fonctionnelle inclus nativement avec Python
Romain Clement - Meetup Python Grenoble - 23/11/2023
đ€·ââïž Contexte¶
- Essor du Data Engineering
- Paradigme de graphes orientés acycliques (DAGs)
- Programmation fonctionnelle en Python
- Modularité, déterminisme, testabilité
â ïž Remarques¶
- Proposition de patterns
- Small / medium data
- Programmation fonctionnelle light
âïž DAG ?¶
Directed Acyclic Graph
- Graphe : noeuds + arĂȘtes
- OrientĂ© : ~arĂȘtes~ arcs
- Acyclique : pas de circuits
âïž DAG - ReprĂ©sentation¶
graph LR; A-->B; A-->C; B-->D; C-->D;
âïž DAG - Avantages¶
- Dépendances et ordre d'exécution
- Modularité, réutilisabilité, testabilité
- Pipelines et algorithmes
âïž DAG - Exemple 1¶
graph LR; A[Compute some stuff];
âïž DAG - Exemple 2 - Nettoyer un fichier CSV¶
graph LR; A[Load CSV] -- Dataframe --> B[Clean Dataframe]; B -- Dataframe --> C[Save to CSV];
âïž DAG - Exemple 2bis - Nettoyer un fichier CSV¶
graph LR; subgraph F[Process] direction LR A[Load CSV] -- Dataframe --> B[Clean Dataframe]; B -- Dataframe --> C[Save to CSV]; end START[ ] -- Path --> A; C -- Path --> END[ ]; style START fill:#FFFFFF00, stroke:#FFFFFF00; style END fill:#FFFFFF00, stroke:#FFFFFF00;
âïž DAG - Exemple 3 - Web-scraping ETL¶
graph LR; A[Extract data] -- Dataframe --> B[Transform data]; A -- Dataframe --> C[Compute metadata]; B -- Dataframe --> D[Load data]; C -- Dict[str, Any] --> D;
âïž DAG - Exemple 4 - Traitement de fichiers (streaming)¶
graph LR; subgraph B[Per file process] direction LR B1[Read file] --> B2[Process file] end A[List files] -- files --> B1
âïž DAG - Exemple 5 - Machine Learning¶
graph LR; A[Load dataset] --> B[Train / test split]; B -- train set --> C[Train model]; C -- model --> D[Evaluate model] & E[Register]; B -- test set --> D; D -- metrics --> F[Log];
Æ Programmation Fonctionnelle¶
Concepts utiles :
- Tout est fonction
- Immutabilité
- Composition
- Réutilisabilité (Curryfication)
- Evaluation paresseuse
Æ Concepts fonctionnels en Python¶
Disponible nativement :
- Fonctions:
def
- Fonctions d'ordre supérieur:
map()
,filter()
,itertools.reduce()
,lambda
- Réutilisabilité:
functools.partial()
- Evaluation paresseuse:
yield
,itertools.tee()
- Typage (faible):
typing
Æ Concepts fonctionnels en Python¶
Non disponible nativement :
- Composition de fonctions
- Structures de données complexes immutables
- Typage fort (~)
đšâđ» Mise en pratique¶
Essayons de fusionner les concepts de DAGs et programmation fonctionnelle !
⻠Réutilisabilité¶
- Fonctions paramétrables
- Curryfication
functools.partial
In [2]:
def load_csv(filename: str, separator: str = ",") -> pd.DataFrame:
return pd.read_csv(filename, sep=separator)
In [3]:
def simple_process_csv() -> None:
load_csv("data.csv")
In [4]:
load_tsv = functools.partial(load_csv, separator="\t")
In [5]:
def simple_process_tsv() -> None:
load_tsv("data.tsv")
â Composabilité¶
- ChaĂźnage de fonctions
functools.partial
functools.reduce
In [6]:
def load_csv(filename: str, separator: str = ",") -> pd.DataFrame:
return pd.read_csv(filename)
def clean_csv(data: pd.DataFrame) -> pd.DataFrame:
return data.dropna()
def save_csv(filename: str, data: pd.DataFrame) -> None:
data.to_csv(filename)
In [7]:
def pipeline_imperative(input_csv: Path, output_csv: Path) -> None:
input_data = load_csv(input_csv)
clean_data = clean_csv(input_data)
save_csv(output_csv, clean_data)
In [8]:
def pipeline_functional(input_csv: Path, output_csv: Path) -> None:
save_csv(output_csv, clean_csv(load_csv(input_csv)))
In [9]:
def compose(*functions):
return functools.reduce(
lambda f, g: lambda x: f(g(x)),
functions,
lambda x: x,
)
def pipeline_compose(input_csv: Path, output_csv: Path) -> None:
dag = compose(
functools.partial(save_csv, output_csv),
clean_csv,
load_csv,
)
dag(input_csv)
đ€ Evaluation paresseuse¶
- Streaming
- Générateurs
map
itertools.tee
list
In [10]:
def list_files() -> Iterable[Path]:
return Path().glob("*.png")
def open_file(filepath: Path) -> bytes:
return filepath.read_bytes()
def process_data(data: bytes) -> int:
return len(data)
In [11]:
def streaming_imperative() -> None:
files = list_files()
files_bytes = map(open_file, files)
files_len = map(process_data, files_bytes)
print(list(files_len))
In [12]:
def streaming_functional() -> None:
files_len = map(process_data, map(open_file, list_files()))
print(list(files_len))
In [13]:
def streaming_multiple() -> None:
files1, files2 = itertools.tee(list_files(), 2)
list(map(process_data, map(open_file, files1)))
list(map(print, files2))
đ Avantages¶
- Fonctions Python pures
- Graphes de traitement avec style fonctionnel
- Force une conception générique
- Unités de traitement paramétrables et réutilisables
- Données volumineuses bénéficient du streaming avec les générateurs
- Traitement des générateurs par évaluation paresseuse
- Test facilité
- Traitement concurrent / parallĂšle possible (
concurrent.futures
)
đ Limitations¶
- Courbe d'apprentissage
- Balance de performance
- Composition de fonctions
- Matérialisation des générateurs
- Résultats intermédiaires
- Introspection