# Source code for aijack.defense.kanonymity.wrapper

import pandas as pd

from aijack_cpp_core import DataFrame as AnoDataFrame
from aijack_cpp_core import Mondrian as MondrianCore


def convert_pddataframe_to_anodataframe(pd_df, is_continuous_map):
    """Copy a pandas DataFrame into an ``AnoDataFrame`` column by column.

    Args:
        pd_df: pandas DataFrame whose columns are copied.
        is_continuous_map: mapping indexed by column name; a truthy value
            routes the column through ``insert_continuous_column``, a falsy
            value through ``insert_categorical_column``.

    Returns:
        The populated ``AnoDataFrame``.
    """
    column_names = pd_df.columns.tolist()
    # NOTE(review): the meaning of the trailing ``0`` is defined by the
    # ``aijack_cpp_core.DataFrame`` constructor and is not visible here.
    converted = AnoDataFrame(column_names, is_continuous_map, 0)

    for name in column_names:
        # Pick the inserter first so a missing map entry fails before the
        # column values are materialised.
        if is_continuous_map[name]:
            insert_column = converted.insert_continuous_column
        else:
            insert_column = converted.insert_categorical_column
        insert_column(name, pd_df[name].tolist())

    return converted
def convert_anodataframe_to_pddataframe(ano_df, columns, is_continuous_map):
    """Build a pandas DataFrame from the columns stored in ``ano_df``.

    Args:
        ano_df: object exposing ``get_data_continuous()`` and
            ``get_data_categorical()``; each result is indexed by column
            name to obtain that column's values.
        columns: column names to copy, in the order they should appear in
            the result.
        is_continuous_map: mapping indexed by column name; a truthy value
            reads the column from the continuous data, a falsy value from
            the categorical data.

    Returns:
        A new pandas DataFrame containing the requested columns.
    """
    continuous_values = ano_df.get_data_continuous()
    categorical_values = ano_df.get_data_categorical()

    frame = pd.DataFrame()
    for name in columns:
        source = continuous_values if is_continuous_map[name] else categorical_values
        frame[name] = source[name]
    return frame
class Mondrian:
    """Mondrian multidimensional k-anonymity.

    Implementation of K. LeFevre, D. J. DeWitt and R. Ramakrishnan,
    'Mondrian Multidimensional K-Anonymity,' 22nd International Conference
    on Data Engineering (ICDE'06), Atlanta, GA, USA, 2006, pp. 25-25,
    doi: 10.1109/ICDE.2006.101.

    Our implementation is based on Nuclearstar/K-Anonymity
    (https://github.com/Nuclearstar/K-Anonymity)

    Args:
        k: value forwarded to the ``MondrianCore`` constructor (presumably
            the anonymity parameter k of the paper). Defaults to 3.
    """

    def __init__(self, k=3):
        self.api = MondrianCore(k)

    def get_final_partitions(self):
        """Return the final partitions reported by the underlying core object."""
        return self.api.get_final_partitions()

    def anonymize(
        self,
        df,
        quasiid_columns,
        sensitive_column,
        is_continuous_map,
        ignore_unused_features=True,
    ):
        """Anonymize ``df`` and return the result as a pandas DataFrame.

        Args:
            df: pandas DataFrame to anonymize.
            quasiid_columns: names of the quasi-identifier columns.
            sensitive_column: name of the sensitive column.
            is_continuous_map: mapping indexed by column name; truthy for
                continuous columns, falsy for categorical ones.
            ignore_unused_features: when True (default) only the
                quasi-identifier and sensitive columns are returned. When
                False, the remaining columns of ``df`` are appended and the
                result is indexed by the flattened final partitions.

        Returns:
            A pandas DataFrame with the anonymized quasi-identifier and
            sensitive columns, optionally followed by the other columns of
            ``df`` in their original column order.
        """
        kept_columns = [*quasiid_columns, sensitive_column]

        ano_df = convert_pddataframe_to_anodataframe(df, is_continuous_map)
        ano_anonymized_df = self.api.anonymize(
            ano_df, quasiid_columns, sensitive_column
        )
        pd_df_anonymized = convert_anodataframe_to_pddataframe(
            ano_anonymized_df, kept_columns, is_continuous_map
        )

        if ignore_unused_features:
            return pd_df_anonymized

        # Keep the original column order; a set difference would make the
        # order of the appended columns vary between runs.
        kept = set(kept_columns)
        unused_columns = [col for col in df.columns if col not in kept]

        # Flatten the partitions into one list of row positions.
        # NOTE(review): this assumes row i of the anonymized frame
        # corresponds to position idx[i] of ``df`` -- confirm against
        # aijack_cpp_core.
        idx = [
            position
            for partition in self.get_final_partitions()
            for position in partition
        ]

        # pd.concat(axis=1) aligns rows by index label, so the reordered
        # slice must drop its original labels; otherwise the reordering done
        # by ``iloc`` is undone (or rows are duplicated with NaNs when ``df``
        # does not use a 0..n-1 index).
        pd_df_unused = df[unused_columns].iloc[idx].reset_index(drop=True)

        result_df = pd.concat([pd_df_anonymized, pd_df_unused], axis=1)
        result_df.index = idx
        return result_df