I've come back to this thread a lot and used adaptations of @Adib's code several times, but I recently put together a more complete version, because ColumnTransformer and/or Pipeline will (as far as I can tell) throw an error if you try to call get_feature_names_out() on the simpler implementations.

from sklearn.base import BaseEstimator, TransformerMixin
import numpy as np
import pandas as pd
from scipy import sparse

class MultiHotEncoder(BaseEstimator, TransformerMixin):
    """
    A custom transformer that encodes columns containing lists of categorical values
    into a multi-hot encoded format, compatible with ColumnTransformer.
    
    Parameters:
    -----------
    classes : list or None, default=None
        List of all possible classes. If None, classes will be determined from the data.
    sparse_output : bool, default=False
        If True, return a sparse matrix, otherwise return a dense array.
    """
    def __init__(self, classes=None, sparse_output=False):
        self.classes = classes
        self.sparse_output = sparse_output
        
    def fit(self, X, y=None):
        """
        Fit the transformer by determining all possible classes.
        
        Parameters:
        -----------
        X : array-like of shape (n_samples, n_features)
            Columns containing lists of values.
        
        Returns:
        --------
        self : object
            Returns self.
        """
        # Handle DataFrame input properly
        if isinstance(X, pd.DataFrame):
            X_processed = X.values
        else:
            X_processed = np.asarray(X)

        # A single column passed as a 1-D array/Series is treated as one feature
        if X_processed.ndim == 1:
            X_processed = X_processed.reshape(-1, 1)
            
        # Collect all unique classes
        if self.classes is None:
            unique_classes = set()
            for col in X_processed.T:
                for row in col:
                    if row is not None and hasattr(row, '__iter__') and not isinstance(row, (str, bytes)):
                        unique_classes.update(row)
            self.classes_ = sorted(list(unique_classes))
        else:
            self.classes_ = sorted(list(self.classes))
            
        # Create a dictionary for fast lookup
        self.class_dict_ = {cls: i for i, cls in enumerate(self.classes_)}
        return self
    
    def transform(self, X):
        """
        Transform lists to multi-hot encoding.
        
        Parameters:
        -----------
        X : array-like of shape (n_samples, n_features)
            Columns containing lists of values.
        
        Returns:
        --------
        X_transformed : array of shape (n_samples, n_features * n_classes)
            Transformed array with multi-hot encoding.
        """
        # Handle DataFrame input properly
        if isinstance(X, pd.DataFrame):
            X_processed = X.values
        else:
            X_processed = np.asarray(X)

        # Keep the shape consistent with fit(): a single column becomes (n_samples, 1)
        if X_processed.ndim == 1:
            X_processed = X_processed.reshape(-1, 1)
            
        n_samples, n_features = X_processed.shape
        n_classes = len(self.classes_)
        
        # Initialize the output array
        if self.sparse_output:
            rows = []
            cols = []
            for j, col in enumerate(X_processed.T):
                for i, row in enumerate(col):
                    if row is None:
                        continue
                    if not hasattr(row, '__iter__') or isinstance(row, (str, bytes)):
                        continue
                    for item in row:
                        if item in self.class_dict_:
                            rows.append(i)
                            cols.append(j * n_classes + self.class_dict_[item])
            data = np.ones(len(rows), dtype=int)
            result = sparse.csr_matrix((data, (rows, cols)), shape=(n_samples, n_features * n_classes))
        else:
            result = np.zeros((n_samples, n_features * n_classes), dtype=int)
            for j, col in enumerate(X_processed.T):
                for i, row in enumerate(col):
                    if row is None:
                        continue
                    if not hasattr(row, '__iter__') or isinstance(row, (str, bytes)):
                        continue
                    for item in row:
                        if item in self.class_dict_:
                            result[i, j * n_classes + self.class_dict_[item]] = 1
        
        return result
    
    def fit_transform(self, X, y=None):
        # Same as TransformerMixin's default fit_transform, kept here for explicitness
        return self.fit(X, y).transform(X)
    
    def get_feature_names_out(self, input_features=None):
        """
        Get output feature names for transformation.
        
        Parameters:
        -----------
        input_features : array-like of str or None, default=None
            Input features. Used as a prefix for output feature names.
        
        Returns:
        --------
        feature_names_out : ndarray of str objects
            Array of output feature names.
        """
        if input_features is None:
            input_features = [""]
        
        feature_names_out = []
        for feature in input_features:
            feature_names_out.extend([f"{feature}_{cls}" for cls in self.classes_])
        
        return np.array(feature_names_out)
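
Here's a quick usage sketch so you can see it working inside a ColumnTransformer; the toy DataFrame, the tags/genres column names, and the "multihot" transformer name are just made up for illustration:

import pandas as pd
from sklearn.compose import ColumnTransformer

# Toy data: each cell holds a list of categories
df = pd.DataFrame({
    "tags": [["red", "blue"], ["blue"], ["green", "red"]],
    "genres": [["rock"], ["rock", "jazz"], ["jazz"]],
})

ct = ColumnTransformer(
    transformers=[("multihot", MultiHotEncoder(), ["tags", "genres"])],
    remainder="drop",
)

encoded = ct.fit_transform(df)     # dense array of shape (3, 10): 2 columns x 5 pooled classes
print(ct.get_feature_names_out())  # e.g. ['multihot__tags_blue', ..., 'multihot__genres_rock']

One thing to keep in mind: a single MultiHotEncoder instance pools the classes it sees across every column it is given, so each input column gets one output column per class in that pooled set. If you want an independent class set per column, register one MultiHotEncoder per column in the ColumnTransformer.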