# Source code for chemtorch.components.data_pipeline.simple_data_pipeline
from typing import Optional
import pandas as pd
from chemtorch.components.data_pipeline.column_mapper.abstract_column_mapper import AbstractColumnMapper
from chemtorch.components.data_pipeline.data_source.abstract_data_source import AbstractDataSource
from chemtorch.components.data_pipeline.data_splitter.abstract_data_splitter import AbstractDataSplitter
from chemtorch.utils import DataSplit
# [docs]  (Sphinx viewcode anchor — not executable Python)
class SimpleDataPipeline:
    """
    A simple data pipeline that orchestrates data loading, column mapping,
    and data splitting.

    The ingestion process is as follows:

    1. Load data using the `data_source`. This can result in a single
       DataFrame or an already split `DataSplit` object.
    2. Apply column transformations (filtering, renaming) using the
       `column_mapper`. This mapper can operate on both single DataFrames
       and `DataSplit` objects.
    3. If the data after mapping is a single DataFrame, split it using the
       `data_splitter`. If it's already a `DataSplit`, this step is skipped.
    """

    def __init__(
        self,
        data_source: AbstractDataSource,
        column_mapper: AbstractColumnMapper,
        data_splitter: Optional[AbstractDataSplitter] = None,
    ):
        """
        Initializes the SimpleDataPipeline.

        Args:
            data_source (AbstractDataSource): The component responsible for
                loading the initial data.
            column_mapper (AbstractColumnMapper): The component for transforming
                columns. It should handle both pd.DataFrame and DataSplit inputs.
            data_splitter (Optional[AbstractDataSplitter]): The component for
                splitting a single DataFrame into train, validation, and test
                sets. This is not used if data_source already provides split
                data (supplying one in that case raises at call time).
        """
        self.data_source = data_source
        self.column_mapper = column_mapper
        self.data_splitter = data_splitter

    def __call__(self) -> pd.DataFrame | DataSplit:
        """
        Executes the data ingestion pipeline with validation.

        Returns:
            pd.DataFrame | DataSplit: The final processed data, either as a
                single DataFrame or a DataSplit object containing train,
                validation, and test sets.

        Raises:
            ValueError: If there is a configuration mismatch, such as:
                - A `data_splitter` is provided for a pre-split dataset.
            TypeError: If the column mapper returns an unexpected type.
        """
        # load data
        raw_data = self.data_source.load()

        # map columns (may return a single DataFrame or a pre-split DataSplit)
        processed_data = self.column_mapper(raw_data)

        if isinstance(processed_data, pd.DataFrame):
            # case: data is a single DataFrame; split it if a splitter was
            # configured, otherwise return it unsplit.
            # NOTE: use an explicit `is not None` check (consistent with the
            # branch below) so a falsy-but-valid splitter object is not skipped.
            if self.data_splitter is not None:
                return self.data_splitter(processed_data)
            else:
                return processed_data
        elif isinstance(processed_data, DataSplit):
            # case: data is already split. a splitter is redundant
            if self.data_splitter is not None:
                raise ValueError(
                    "The data is already split (presplit dataset), but a 'data_splitter' "
                    "was also provided. Please provide one or the other, not both."
                )
            return processed_data
        else:
            # case: the data is of an unexpected type
            raise TypeError(
                f"The data after column mapping has an unexpected type: {type(processed_data).__name__}. "
                f"Expected a pandas DataFrame or a DataSplit object."
            )