Source code for unistat.formula_regression

import ast
import re
from abc import ABC, abstractmethod
from collections import namedtuple
import numpy as np
import pandas as pd
import patsy
import statsmodels.api as sm
from statsmodels.stats.outliers_influence import variance_inflation_factor


# For functions giving separate results for Patsy formula LHS & RHS
FormulaSides = namedtuple('FormulaSides', ['lhs', 'rhs'])


[docs] class FormulaRegression(ABC): r"""Abstract base class for formula-based regression statistics. Provides common functionality for regression models, including data preparation, standardization, VIF calculation, and properties for regression results. Uses Wilkinson formulae (akin to R-style formulae), which are implemented in `statsmodels <statsmodels-homepage_>`_ via the ``patsy`` package. Parameters ---------- formula : str ``statsmodels``/``patsy``/``R``-style formula defining the regression model. data : pd.DataFrame Data for the model. ``data`` must contain (at a minimum) all variables referenced by ``formula``. Attributes ---------- data : pd.DataFrame Input DataFrame after filtering for columns in formula, dropping NaNs, and converting all columns to `float`. """ def __init__(self, formula: str, data: pd.DataFrame): self.formula = formula self._formula_cols = self.extract_formula_cols(self.formula) self.data = data[self._formula_cols.lhs + self._formula_cols.rhs] self.data = self._standardize_dtypes() self._coerce_occult_bools() def __str__(self): """ String representation of the regression results. Returns ------- str Formatted string with VIF (if applicable) and regression summaries. """ if len(self.X.columns) >= 2: print_string = (f'{str(self.vif_matrix())}\n' f'{self.reg.summary2().as_text()}\n') else: print_string = f'{self.reg.summary2().as_text()}\n' if hasattr(self, 'std_reg'): print_string += f'{self.std_reg.summary2().as_text()}\n' return print_string @property def _X_num_names(self) -> list[str]: return ( self.data[self._formula_cols.rhs] .select_dtypes(include='number') .columns.tolist() ) def _standardize_dtypes(self): df = self.data.dropna() X_bool_cols: list[str] = ( df[self._formula_cols.rhs] .select_dtypes(include=['bool', 'boolean']) .columns.tolist() ) X_num_cols: list[str] = self._X_num_names # Ensure y is float for col in self._formula_cols.lhs: df[col] = df[col].astype(float) # Ensure all bools are boolean, nums are floats df[X_bool_cols] = df[X_bool_cols].astype(bool) df[X_num_cols] = df[X_num_cols].astype(float) return df def _coerce_occult_bools(self) -> None: for col in self._X_num_names: # If only values in column are 0 & 1: # - coerce column to bool # - edit formula to add C() if self.data[col].astype(float).isin([0, 1]).all(): self.data[col] = self.data[col].astype(bool) self.formula = self.formula.replace( col, f'C({col}, Treatment(False))' ) @staticmethod def _extract_cols_from_termlist(termlist: list[str]) -> list[str]: """Extract unique column names from a patsy termlist. `termlist` can derive from the left- (LHS) or right-hand (RHS) side of a `patsy` formula, but not both. This function parses each term in the termlist using patsy, then processes each factor to identify underlying column names, handling special cases like C(), Q(), I(), and function calls (e.g., np.log). Parameters ---------- termlist : list[str] `patsy`-derived list of terms from LHS/RHS of a `patsy` formula. Returns ------- list[str] Sorted list of unique column names extracted from the formula. Examples -------- >>> extract_cols_from_termlist([ ... 'ca_grams_4h', 'C(ca_gluconate_bool)', 'ca_grams_4h', ... 'C(ca_gluconate_bool)', 'I(age ** 2)', 'np.log(iss)', 'C(moi_pen)', ... 'C(sex_female)', 'mortality_24h' ... ]) [ 'age', 'ca_gluconate_bool', 'ca_grams_4h', 'iss', 'moi_pen', 'mortality_24h', 'sex_female', ] """ cols = set() for term in termlist: for factor in term.factors: code = factor.code.strip() if code.startswith('C(') or code.startswith('Q('): inner = code[code.find('(') + 1:code.rfind(')')].strip() if ',' in inner: inner = inner.split(',', 1)[0].strip() if inner.startswith(("'", '"')) and inner.endswith(("'", '"')): inner = inner[1:-1] cols.add(inner) elif code.startswith('I('): expr = code[2:-1].strip() tree = ast.parse(expr) for node in ast.walk(tree): if isinstance(node, ast.Name): cols.add(node.id) elif '.' in code: # e.g., np.log(col) func, arg = code.rsplit('.', 1) if arg.endswith(')'): arg = arg[arg.find('(') + 1: arg.rfind(')')].strip() cols.add(arg) else: cols.add(code) return list(cols)
[docs] @staticmethod def extract_formula_cols(formula: str) -> FormulaSides: """Extract unique column names from a `patsy`/`R` formula string. This function parses the formula using `patsy`, then processes each factor to identify underlying column names, handling special cases like C(), Q(), I(), and function calls (e.g., np.log). Parameters ---------- formula : str The patsy-compatible formula string (e.g., 'y ~ x + C(cat) + np.log(z)'). Returns ------- list[str] Sorted list of unique column names extracted from the formula. Examples -------- >>> extract_columns_from_formula( ... 'mortality_24h ~ ca_grams_4h*C(ca_gluconate_bool) + I(age**2) ' ... '+ np.log(iss) + C(moi_pen) + C(sex_female)' ... ) [ 'age', 'ca_gluconate_bool', 'ca_grams_4h', 'iss', 'moi_pen', 'mortality_24h', 'sex_female' ] """ # Use Patsy ModelDesc to structure the formula model_desc = patsy.ModelDesc.from_formula(formula) result = FormulaSides( lhs=FormulaRegression._extract_cols_from_termlist( model_desc.lhs_termlist ), rhs=FormulaRegression._extract_cols_from_termlist( model_desc.rhs_termlist ) ) return result
@abstractmethod def _run_regression(self, standardize: bool = False): if not standardize: model = sm.OLS.from_formula(self.formula, self.data) self._reg_cache = model.fit() return self._reg_cache elif self._formula_std is None or self._data_std is None: return None else: model = sm.OLS.from_formula(self._formula_std, self._data_std) self._std_reg_cache = model.fit() return self._std_reg_cache @property def reg(self): r"""Fitted `statsmodels <statsmodels-homepage_>`_ regression model. Returns ------- Fitted model. """ # Run regression if never done prior (which defines self._reg_cache) if not hasattr(self, '_reg_cache'): self._run_regression(standardize=False) return self._reg_cache @property def std_reg(self): r"""Fitted ``statsmodels` standardized regression model. Returns ------- Fitted model. """ # Run regression if never done prior (which defines self._reg_cache) if not hasattr(self, '_std_reg_cache'): self._run_regression(standardize=True) return self._std_reg_cache @property def _X_num_std_names(self) -> list[str]: """ Lazily computed list of standardized numeric predictor column names. Returns the original numeric RHS column names with '_std' appended. Empty list if no numeric predictors. Cached for efficiency; invalidate with ``del self._X_num_std_names_cache``. """ if not hasattr(self, '_X_num_std_names_cache'): num_cols = self._X_num_names self._X_num_std_names_cache = ( [col + '_std' for col in num_cols] if num_cols else [] ) return self._X_num_std_names_cache @property def _data_std(self) -> pd.DataFrame | None: """ Lazily computed version of ``self.data`` with numeric RHS columns standardized. Numeric columns are centered and scaled (ddof=1) via ``patsy.standardize``, renamed with '_std' suffix, and appended after non-numeric columns. Non-numeric columns are unchanged. Returns ``None`` if no numeric predictors. Cached for efficiency; invalidate with ``del self._data_std_cache``. """ if not hasattr(self, '_data_std_cache'): num_cols = self._X_num_names std_names = self._X_num_std_names if not num_cols: self._data_std_cache = None else: std_df = self.data.copy() # Standardize only numeric RHS columns standardized = patsy.standardize( self.data[num_cols], center=True, rescale=True, ddof=1 ) # Rename to *_std standardized = standardized.rename( columns=dict(zip(num_cols, std_names)) ) # Drop original numerics and append standardized versions std_df = std_df.drop(columns=num_cols) std_df = pd.concat([std_df, standardized], axis=1) self._data_std_cache = std_df return self._data_std_cache @property def _formula_std(self) -> str | None: """ Lazily computed formula where numeric RHS columns are replaced with their _std versions. Returns None if no numeric columns to standardize. """ if not hasattr(self, '_formula_std_cache'): if not self._X_num_names: self._formula_std_cache = None else: f = self.formula for orig, std in zip(self._X_num_names, self._X_num_std_names): f = re.sub(r'\b' + re.escape(orig) + r'\b', std, f) self._formula_std_cache = f return self._formula_std_cache @property def _dmatrices(self): dmatrices = patsy.highlevel.dmatrices( self.formula, self.data, return_type='dataframe' ) # Make LHS a series if only single outcome column lhs = ( dmatrices[0].iloc[:, 0] if len(dmatrices[0]) == 1 else dmatrices[0] ) return FormulaSides( lhs=lhs, rhs=dmatrices[1] ) @property def X(self): r"""Predictor ``pd.DataFrame`` used in model. Has ``patsy`` transformations applied. Includes ``'Intercept'`` column. Returns ------- pd.DataFrame """ return self._dmatrices.rhs @property def y(self): r"""Outcome ``pd.Series`` used in model. Has ``patsy`` transformations applied. Includes ``'Intercept'`` column. Returns ------- pd.Series """ return self._dmatrices.lhs @property def _dmatrices_std(self): dmatrices = patsy.highlevel.dmatrices( self._formula_std, self._data_std, return_type='dataframe' ) # Make LHS a series if only single outcome column lhs = ( dmatrices[0].iloc[:, 0] if len(dmatrices[0]) == 1 else dmatrices[0] ) return FormulaSides( lhs=lhs, rhs=dmatrices[1] ) @property def X_std(self): r"""Standardized (Z-scored) version of :attr:X used in model. Has ``patsy`` transformations applied. Includes ``'Intercept'`` column. Returns ------- pd.DataFrame """ return self._dmatrices_std.rhs @property def _y_std(self): return self._dmatrices_std.lhs
[docs] def vif_matrix(self, drop_ints: bool = False): """Calculate variance inflation factors (VIF) for feature DataFrame. Returns ------- pd.Series VIF values for each feature. Raises ------ ValueError If fewer than 2 columns in X. """ if len(self.X) < 2: raise ValueError('VIF requires at least 2 modelled X columns') X = self.X if drop_ints: X = X.drop(columns=[col for col in X.columns if ':' in col]) vif_matrix = pd.Series( [variance_inflation_factor(X.values, i) for i in range(X.shape[1])], name='vif', index=X.columns, ) return vif_matrix
[docs] class FormulaLogit(FormulaRegression): def __init__(self, formula: str, data: pd.DataFrame): super().__init__(formula, data) def __str__(self): """String representation with VIF, summary, and odds ratios. Returns ------- str Formatted results. """ with pd.option_context('display.max_colwidth', None, 'max_colwidth', None, 'display.width', 256): if len(self.X.columns) >= 2: print_string = ( f'{str(self.vif_matrix())}\n' f'{self.reg.summary2().as_text()}\n' f'{self.logit_or().to_string()}\n' ) else: print_string = ( f'{self.reg.summary2().as_text()}\n' f'{self.logit_or().to_string()}\n' ) if hasattr(self, 'std_reg'): print_string += ( f'{self.std_reg.summary2().as_text()}\n' f'{self.logit_or(standardize=True).to_string()}\n' ) return print_string def _run_regression(self, standardize: bool = False): if not standardize: model = sm.Logit.from_formula(self.formula, self.data) self._reg_cache = model.fit() return self._reg_cache elif self._formula_std is None or self._data_std is None: return None else: model = sm.Logit.from_formula(self._formula_std, self._data_std) self._std_reg_cache = model.fit() return self._std_reg_cache
[docs] def logit_or(self, standardize: bool = False): r"""Odds ratios (ORs) for each feature in :attr:`X`. Parameters ---------- standardize : bool, default False Whether crude or standardized ORs calculated. Returns ------- pd.DataFrame ``columns=['OR', '95% CI lower', '95% CI upper']``; ``index`` is each column in :attr:`X`/:attr:`X_std`, including the intercept. """ if not standardize: model = self.reg elif self.std_reg is None: raise ValueError('Standardized logit cannot be run when all ' 'predictors are categorical/boolean.') else: model = self.std_reg output = pd.DataFrame(model.params, columns=['OR']) output[['95% CI lower', '95% CI upper']] = model.conf_int() return output.map(np.exp)
[docs] class FormulaLinReg(FormulaRegression): def __init__(self, formula: str, data: pd.DataFrame): super().__init__(formula, data) def _run_regression(self, standardize: bool = False): if not standardize: model = sm.OLS.from_formula(self.formula, self.data) self._reg_cache = model.fit() return self._reg_cache elif self._formula_std is None or self._data_std is None: return None else: model = sm.OLS.from_formula(self._formula_std, self._data_std) self._std_reg_cache = model.fit() return self._std_reg_cache