DFP Azure Training
Digital Fingerprinting (DFP) with Morpheus - Azure Training
Introduction
In this notebook, we will be building and running a DFP pipeline that performs training on Azure logs. The goal is to train an autoencoder PyTorch model to recognize the patterns of users in the sample data. The model will then be used by a second Morpheus pipeline to generate anomaly scores for each individual log. These anomaly scores can be used by security teams to detect abnormal behavior when it happens so the proper action can be taken.
High Level Configuration
The following options significantly alter the functionality of the pipeline. These options are separated from the individual stage options since they may affect more than one stage. Additionally, the matching Python script for this notebook, dfp_azure_pipeline.py, configures these options via command line arguments.
Options
| Name | Type | Description |
|---|---|---|
train_users | One of ["all", "generic", "individual"] | Indicates which users to train for this pipeline: "generic" combines all users' logs and trains a single model for the generic_user account, "individual" trains a separate model per user, and "all" does both. |
skip_users | List of strings | Any user in this list will be dropped from the pipeline. Useful for debugging to remove automated accounts with many logs. |
cache_dir | string | The location to store cached files. To aid with development and reduce bandwidth, the Morpheus pipeline will cache data from several stages of the pipeline. This option configures the location for those caches. |
input_files | List of strings | List of files to process. Can specify multiple arguments for multiple files. Also accepts glob (*) wildcards and schema prefixes such as s3://. For example, to make a local cache of an s3 bucket, use filecache::s3://mybucket/*. Refer to fsspec documentation for list of possible options. |
model_name_formatter | string | A format string to use when building the model name. The model name is constructed by calling model_name_formatter.format(user_id=user_id). For example, model_name_formatter="my_model-{user_id}" with a user ID of "first:last" would result in the model name "my_model-first:last". This should match the value used in DFPMLFlowModelWriterStage. Available keyword arguments: user_id, user_md5. |
experiment_name_formatter | string | A format string (without the f) that will be used when creating an experiment in ML Flow. Available keyword arguments: user_id, user_md5, reg_model_name. |
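The two formatter options are plain Python format strings. The following sketch shows how they are applied, using the formatter values this notebook configures (visible in the pipeline build log below); the `user_md5` keyword is omitted for brevity:

```python
# How the pipeline applies the name formatters (illustration).
model_name_formatter = "DFP-azure-{user_id}"
experiment_name_formatter = "dfp/azure/training/{reg_model_name}"

user_id = "generic_user"
model_name = model_name_formatter.format(user_id=user_id)
experiment_name = experiment_name_formatter.format(reg_model_name=model_name)

print(model_name)       # DFP-azure-generic_user
print(experiment_name)  # dfp/azure/training/DFP-azure-generic_user
```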
Set MLFlow Tracking URI
Set the MLflow tracking URI so the pipeline can connect to the MLflow server where trained models will be registered.
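For example (the default server address here is an assumption matching the http://mlflow:5000 URLs in this notebook's output; adjust it for your own deployment):

```python
import os

# Assumed default address; override with the MLFLOW_TRACKING_URI
# environment variable for your deployment.
tracking_uri = os.environ.get("MLFLOW_TRACKING_URI", "http://mlflow:5000")

try:
    import mlflow
    mlflow.set_tracking_uri(tracking_uri)
except ImportError:
    # mlflow ships with the Morpheus DFP environment; skip if unavailable.
    pass
```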
Global Config Object
Before creating the pipeline, we need to set up logging and set the parameters for the Morpheus config object. This config object is responsible for the following:
- Indicating whether to use C++ or Python stages
  - C++ stages are not supported for the DFP pipeline. This should always be False.
- Setting the number of threads to use in the pipeline. Defaults to the thread count of the OS.
- Sets the feature column names that will be used in model training
- This option allows extra columns to be used in the pipeline that will not be part of the training algorithm.
- The final features that the model will be trained on will be an intersection of this list with the log columns.
- The column name that indicates the user's unique identifier
- It is required for DFP to have a user ID column
- The column name that indicates the timestamp for the log
- It is required for DFP to know when each log occurred
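The points above can be sketched as follows, assuming the Morpheus configuration API as used in the companion dfp_azure_pipeline.py script (a sketch, not a complete cell; the feature columns shown match the schema in the build log below):

```python
import os

from morpheus.config import Config, ConfigAutoEncoder, CppConfig, PipelineModes

# C++ stages are not supported for DFP; always run in Python mode.
CppConfig.set_should_use_cpp(False)

config = Config()
config.mode = PipelineModes.AE
config.num_threads = len(os.sched_getaffinity(0))  # default: OS thread count

config.ae = ConfigAutoEncoder()
# The model trains on the intersection of this list with the log columns;
# extra columns can flow through the pipeline without being trained on.
config.ae.feature_columns = [
    "appDisplayName", "clientAppUsed", "deviceDetailbrowser",
    "deviceDetaildisplayName", "deviceDetailoperatingSystem",
    "statusfailureReason", "logcount", "locincrement", "appincrement",
]
config.ae.userid_column_name = "username"      # required user ID column
config.ae.timestamp_column_name = "timestamp"  # required timestamp column
```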
====Pipeline Pre-build====
====Pre-Building Segment: linear_segment_0====
====Pre-Building Segment Complete!====
====Pipeline Pre-build Complete!====
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
====Registering Pipeline Complete!====
====Starting Pipeline====
====Building Segment: linear_segment_0====
Added source: <from-multi-file-0; MultiFileSource(filenames=['/workspace/examples/data/dfp/azure-training-data/AZUREAD_2022-08-0*.json'], watch=False, watch_interval=1.0)>
└─> fsspec.OpenFiles
Added stage: <dfp-file-batcher-1; DFPFileBatcherStage(date_conversion_func=functools.partial(<function date_extractor at 0x7fd60dd7a710>, filename_regex=re.compile('(?P<year>\\d{4})-(?P<month>\\d{1,2})-(?P<day>\\d{1,2})(?:T(?P<hour>\\d{1,2})(?::|_|\\.)(?P<minute>\\d{1,2})(?::|_|\\.)(?P<second>\\d{1,2})(?:\\.(?P<microsecond>\\d{0,6}))?)?(?P<zulu>Z)?')), period=D, sampling_rate_s=None, start_time=None, end_time=None, sampling=None)>
└─ fsspec.OpenFiles -> Tuple[fsspec.core.OpenFiles, int]
Added stage: <dfp-file-to-df-2; DFPFileToDataFrameStage(schema=DataFrameInputSchema(json_columns=['properties'], column_info=[DateTimeColumn(name='timestamp', dtype='datetime64[ns]', input_name='time'), RenameColumn(name='username', dtype='str', input_name='properties.userPrincipalName'), RenameColumn(name='appDisplayName', dtype='str', input_name='properties.appDisplayName'), ColumnInfo(name='category', dtype='str'), RenameColumn(name='clientAppUsed', dtype='str', input_name='properties.clientAppUsed'), RenameColumn(name='deviceDetailbrowser', dtype='str', input_name='properties.deviceDetail.browser'), RenameColumn(name='deviceDetaildisplayName', dtype='str', input_name='properties.deviceDetail.displayName'), RenameColumn(name='deviceDetailoperatingSystem', dtype='str', input_name='properties.deviceDetail.operatingSystem'), StringCatColumn(name='location', dtype='str', input_columns=['properties.location.city', 'properties.location.countryOrRegion'], sep=', '), RenameColumn(name='statusfailureReason', dtype='str', input_name='properties.status.failureReason')], preserve_columns=None, row_filter=None), filter_null=True, file_type=FileTypes.JSON, parser_kwargs={'lines': False, 'orient': 'records'}, cache_dir=/workspace/.cache/dfp)>
└─ Tuple[fsspec.core.OpenFiles, int] -> pandas.DataFrame
Added stage: <dfp-split-users-3; DFPSplitUsersStage(include_generic=True, include_individual=True, skip_users=[], only_users=None)>
└─ pandas.DataFrame -> dfp.DFPMessageMeta
Added stage: <dfp-rolling-window-4; DFPRollingWindowStage(min_history=300, min_increment=300, max_history=60d, cache_dir=/workspace/.cache/dfp)>
└─ dfp.DFPMessageMeta -> dfp.MultiDFPMessage
Added stage: <dfp-preproc-5; DFPPreprocessingStage(input_schema=DataFrameInputSchema(json_columns=[], column_info=[ColumnInfo(name='timestamp', dtype='datetime64[ns]'), ColumnInfo(name='username', dtype='str'), ColumnInfo(name='appDisplayName', dtype='str'), ColumnInfo(name='clientAppUsed', dtype='str'), ColumnInfo(name='deviceDetailbrowser', dtype='str'), ColumnInfo(name='deviceDetaildisplayName', dtype='str'), ColumnInfo(name='deviceDetailoperatingSystem', dtype='str'), ColumnInfo(name='statusfailureReason', dtype='str'), IncrementColumn(name='logcount', dtype='int', input_name='timestamp', groupby_column='username', period='D'), DistinctIncrementColumn(name='locincrement', dtype='int', input_name='location', groupby_column='username', period='D', timestamp_column='timestamp'), DistinctIncrementColumn(name='appincrement', dtype='int', input_name='appDisplayName', groupby_column='username', period='D', timestamp_column='timestamp')], preserve_columns=re.compile('(_batch_id)'), row_filter=None))>
└─ dfp.MultiDFPMessage -> dfp.MultiDFPMessage
Added stage: <dfp-training-6; DFPTraining(model_kwargs=None, epochs=30, validation_size=0.1)>
└─ dfp.MultiDFPMessage -> morpheus.MultiAEMessage
Added stage: <dfp-mlflow-model-writer-7; DFPMLFlowModelWriterStage(model_name_formatter=DFP-azure-{user_id}, experiment_name_formatter=dfp/azure/training/{reg_model_name}, databricks_permissions=None, timeout=1.0)>
└─ morpheus.MultiAEMessage -> morpheus.MultiAEMessage
====Pipeline Started====
====Building Segment Complete!====
Creating dask cluster...
Creating dask cluster... Done. Dashboard: http://127.0.0.1:8787/status
S3 objects to DF complete. Rows: 88, Cache: miss, Duration: 7419.959545135498 ms, Rate: 11.859902936760959 rows/s
Batch split users complete. Input: 88 rows from 2022-08-01 00:03:56.207532 to 2022-08-01 23:54:11.248402. Output: 20 users, rows/user min: 1, max: 88, avg: 8.80. Duration: 6.49 ms
S3 objects to DF complete. Rows: 110, Cache: miss, Duration: 568.488597869873 ms, Rate: 193.4955255253492 rows/s
Batch split users complete. Input: 110 rows from 2022-08-02 00:03:57.781586 to 2022-08-02 23:58:42.803775. Output: 19 users, rows/user min: 1, max: 110, avg: 11.58. Duration: 6.61 ms
S3 objects to DF complete. Rows: 97, Cache: miss, Duration: 739.6926879882812 ms, Rate: 131.13553990077668 rows/s
Batch split users complete. Input: 97 rows from 2022-08-03 00:10:42.770060 to 2022-08-03 23:23:43.932133. Output: 16 users, rows/user min: 1, max: 97, avg: 12.12. Duration: 7.85 ms
S3 objects to DF complete. Rows: 126, Cache: miss, Duration: 551.0175228118896 ms, Rate: 228.66786405813593 rows/s
Batch split users complete. Input: 126 rows from 2022-08-04 00:47:51.564611 to 2022-08-04 23:50:26.072379. Output: 21 users, rows/user min: 1, max: 126, avg: 12.00. Duration: 6.17 ms
Rolling window complete for generic_user in 41.16 ms. Input: 126 rows from 2022-08-04 00:47:51.564611 to 2022-08-04 23:50:26.072379. Output: 421 rows from 2022-08-01 00:03:56.207532 to 2022-08-04 23:50:26.072379
S3 objects to DF complete. Rows: 109, Cache: miss, Duration: 543.3297157287598 ms, Rate: 200.6148326597598 rows/s
Batch split users complete. Input: 109 rows from 2022-08-05 00:14:48.503160 to 2022-08-05 23:45:07.826898. Output: 16 users, rows/user min: 1, max: 109, avg: 13.62. Duration: 5.29 ms
S3 objects to DF complete. Rows: 107, Cache: miss, Duration: 774.3010520935059 ms, Rate: 138.18914453325385 rows/s
Batch split users complete. Input: 107 rows from 2022-08-06 00:08:48.348649 to 2022-08-06 23:53:00.392382. Output: 17 users, rows/user min: 1, max: 107, avg: 12.59. Duration: 4.40 ms
Preprocessed 421 data for logs in 2022-08-01 00:03:56.207532 to 2022-08-04 23:50:26.072379 in 1860.1598739624023 ms
Training AE model for user: 'generic_user'...
S3 objects to DF complete. Rows: 107, Cache: miss, Duration: 775.076150894165 ms, Rate: 138.0509513504701 rows/s
Batch split users complete. Input: 107 rows from 2022-08-07 00:00:23.959795 to 2022-08-07 23:56:24.809043. Output: 17 users, rows/user min: 1, max: 107, avg: 12.59. Duration: 4.35 ms
Rolling window complete for generic_user in 13.92 ms. Input: 107 rows from 2022-08-07 00:00:23.959795 to 2022-08-07 23:56:24.809043. Output: 744 rows from 2022-08-01 00:03:56.207532 to 2022-08-07 23:56:24.809043
S3 objects to DF complete. Rows: 119, Cache: miss, Duration: 679.426908493042 ms, Rate: 175.14761118887697 rows/s
Batch split users complete. Input: 119 rows from 2022-08-08 00:16:13.439012 to 2022-08-08 23:58:43.815912. Output: 18 users, rows/user min: 1, max: 119, avg: 13.22. Duration: 6.34 ms
S3 objects to DF complete. Rows: 102, Cache: miss, Duration: 906.3472747802734 ms, Rate: 112.53964439262859 rows/s
Batch split users complete. Input: 102 rows from 2022-08-09 00:23:17.790393 to 2022-08-09 23:59:49.626250. Output: 16 users, rows/user min: 2, max: 102, avg: 12.75. Duration: 4.25 ms
Preprocessed 744 data for logs in 2022-08-01 00:03:56.207532 to 2022-08-07 23:56:24.809043 in 1868.3342933654785 ms
Training AE model for user: 'generic_user'... Complete.
Training AE model for user: 'generic_user'...
Training AE model for user: 'generic_user'... Complete.
Successfully registered model 'DFP-azure-generic_user'.
ML Flow model upload complete: generic_user:DFP-azure-generic_user:1
ML Flow model upload complete: generic_user:DFP-azure-generic_user:2
====Pipeline Complete====
Pipeline Construction
From this point on, we begin constructing the stages that make up the pipeline. To make testing easier, constructing the pipeline object, adding the stages, and running the pipeline are all provided in a single cell. The cell below can be rerun as many times as needed for debugging.
Source Stage (MultiFileSource)
This pipeline reads input logs from one or more input files. The source stage constructs a list of files to be processed and passes it to downstream stages. It is capable of reading files from many different source types, both local and remote, by utilizing the fsspec library (similar to pandas). Refer to the fsspec documentation for more information on the supported file types. Once all of the logs have been read, the source completes.
| Name | Type | Default | Description |
|---|---|---|---|
filenames | List of strings | | Any files to read into the pipeline. All files will be combined into a single DataFrame. |
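As an illustration of the wildcard expansion, the sketch below uses the standard library's glob as a stand-in for fsspec (which additionally supports remote schemes such as s3:// and caching prefixes such as filecache::); the filenames follow the pattern seen in the build log below:

```python
import glob
import json
import os
import tempfile

# Create a few sample log files, then expand a wildcard pattern into the
# concrete file list the source stage would pass downstream.
with tempfile.TemporaryDirectory() as tmp:
    for day in ("01", "02"):
        path = os.path.join(tmp, f"AZUREAD_2022-08-{day}.json")
        with open(path, "w") as f:
            json.dump([{"time": f"2022-08-{day}T00:00:00Z"}], f)

    filenames = sorted(glob.glob(os.path.join(tmp, "AZUREAD_2022-08-0*.json")))
    print(len(filenames))  # 2
```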
File Batcher Stage (DFPFileBatcherStage)
To improve performance, multiple small input files can be batched together into a single DataFrame for processing. This stage is responsible for determining the timestamp of input files, grouping input files into batches by time, and sending each batch downstream to be processed into a single DataFrame. Repeated batches of files will be loaded from cache, resulting in increased performance. For example, when performing a 60-day training run with a period of "D" and retraining once per day, 59 of the 60 days can be loaded from cache.
| Name | Type | Default | Description |
|---|---|---|---|
period | str | "D" | The period to create batches. Refer to pandas windowing frequency documentation for available options. |
date_conversion_func | typing.Callable[[fsspec.core.OpenFile], datetime] | | A callback which is responsible for determining the date for a specified file. |
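A minimal sketch of a date_conversion_func, using a simplified filename regex (the pipeline's actual regex, visible in the build log above, also handles time-of-day components, and real callbacks receive an fsspec OpenFile rather than a plain string):

```python
import re
from datetime import datetime

# Simplified date extraction from a filename; illustration only.
FILENAME_RE = re.compile(r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})")

def date_from_filename(path: str) -> datetime:
    match = FILENAME_RE.search(path)
    if match is None:
        raise ValueError(f"No date found in {path!r}")
    return datetime(int(match["year"]), int(match["month"]), int(match["day"]))

print(date_from_filename("AZUREAD_2022-08-03.json"))  # 2022-08-03 00:00:00
```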
File to DataFrame Stage (DFPFileToDataFrameStage)
After files have been batched into groups, this stage is responsible for reading the files and converting them into a DataFrame. The specified input schema converts the raw DataFrame into one suitable for caching and processing. Any columns that are not needed should be excluded from the schema.
| Name | Type | Default | Description |
|---|---|---|---|
schema | DataFrameInputSchema | | After the raw DataFrame is read from each file, this schema will be applied to ensure a consistent output from the source. |
file_type | FileTypes | FileTypes.Auto | Allows overriding the file type. When set to Auto, the file extension will be used. Options are CSV, JSON, PARQUET, Auto. |
parser_kwargs | dict | {} | This dictionary will be passed to the DataFrame parser class. Allows for customization of log parsing. |
cache_dir | str | ./.cache/dfp | The location to write cached input files to. |
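A rough sketch of what the schema application does, in plain pandas: flatten the nested properties object, rename raw fields to the column names used later, and parse the timestamp. This is an illustration of the transformation, not the DataFrameInputSchema implementation, and the sample records are hypothetical:

```python
import pandas as pd

# Two simplified raw Azure AD records with a nested 'properties' object
# (hypothetical sample data).
raw = [
    {"time": "2022-08-01T00:03:56Z",
     "properties": {"userPrincipalName": "alice@domain.com",
                    "appDisplayName": "Azure Portal"}},
    {"time": "2022-08-01T01:15:00Z",
     "properties": {"userPrincipalName": "bob@domain.com",
                    "appDisplayName": "Office 365"}},
]

df = pd.json_normalize(raw)  # flattens 'properties.*' into columns
df = df.rename(columns={
    "properties.userPrincipalName": "username",
    "properties.appDisplayName": "appDisplayName",
})
df["timestamp"] = pd.to_datetime(df["time"])
df = df[["timestamp", "username", "appDisplayName"]]  # keep only needed columns
print(df["username"].tolist())
```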
Split Users Stage (DFPSplitUsersStage)
Once the input logs have been read into a DataFrame, this stage is responsible for breaking that single DataFrame containing many users into a separate DataFrame per user. This is also where the pipeline chooses whether to train individual users or the generic user (or both).
| Name | Type | Default | Description |
|---|---|---|---|
include_generic | bool | | Whether or not to combine all user logs into a single DataFrame with the username 'generic_user'. |
include_individual | bool | | Whether or not to output individual DataFrame objects for each user. |
skip_users | List of str | [] | Any users to remove from the DataFrame. Useful for debugging to remove automated accounts with many logs. Mutually exclusive with only_users. |
only_users | List of str | [] | Only allow these users in the final DataFrame. Useful for debugging to focus on specific users. Mutually exclusive with skip_users. |
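The split behavior can be sketched in plain pandas (an illustration of the options above, not the stage's implementation):

```python
import pandas as pd

# Hypothetical sample logs from two users.
df = pd.DataFrame({
    "username": ["alice", "bob", "alice"],
    "appDisplayName": ["Azure Portal", "Office 365", "Azure Portal"],
})

include_generic = True
include_individual = True
skip_users = []

user_frames = {}
if include_generic:
    # All logs combined under a single synthetic user.
    generic = df.copy()
    generic["username"] = "generic_user"
    user_frames["generic_user"] = generic
if include_individual:
    for user_id, user_df in df.groupby("username"):
        if user_id not in skip_users:
            user_frames[user_id] = user_df

print(sorted(user_frames))  # ['alice', 'bob', 'generic_user']
```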
Rolling Window Stage (DFPRollingWindowStage)
The Rolling Window Stage performs several key pieces of functionality for DFP.
- This stage keeps a moving window of logs on a per user basis
- These logs are saved to disk to reduce memory requirements between logs from the same user
- It only emits logs when the window history requirements are met
- Until all of the window history requirements are met, no messages will be sent to the rest of the pipeline.
- Configuration options for defining the window history requirements are detailed below.
- It repeats the necessary logs to properly calculate log dependent features.
- To support all column feature types, incoming log messages can be combined with existing history and sent to downstream stages.
- For example, to calculate a feature that increments a counter for the number of logs a particular user has generated in a single day, we must have the user's log history for the past 24 hours. To support this, this stage will combine new logs with existing history into a single DataFrame.
- It is the responsibility of downstream stages to distinguish between new logs and existing history.
| Name | Type | Default | Description |
|---|---|---|---|
min_history | int | 300 | The minimum number of logs a user must have before emitting any messages. Logs below this threshold will be saved to disk. |
min_increment | int or str | 300 | Once the min history requirement is met, this stage must receive min_increment new logs before emitting another message. Logs received before this threshold is met will be saved to disk. Can be specified as an integer count or a string duration. |
max_history | int or str | "60d" | Once min_history and min_increment requirements have been met, this puts an upper bound on the maximum number of messages to forward into the pipeline and also the maximum number of messages to retain in the history. Can be specified as an integer count or a string duration. |
cache_dir | str | ./.cache/dfp | The location to write log history to disk. |
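The emission logic above can be sketched with integer thresholds (a pure-Python illustration of min_history, min_increment, and max_history, not the stage's implementation; thresholds are scaled down from the notebook's values for readability):

```python
from collections import defaultdict

# Scaled-down thresholds for illustration; this notebook's pipeline uses
# min_history=300, min_increment=300, and max_history="60d" (a duration).
MIN_HISTORY = 3
MIN_INCREMENT = 2
MAX_HISTORY = 5

history = defaultdict(list)   # retained logs, per user
received = defaultdict(int)   # total logs ever received, per user
last_emit = defaultdict(int)  # 'received' count at the last emission

def on_logs(user, logs):
    """Append new logs; return the combined window to emit, or None."""
    history[user].extend(logs)
    history[user] = history[user][-MAX_HISTORY:]  # trim to max_history
    received[user] += len(logs)
    if received[user] < MIN_HISTORY:
        return None                               # min_history not met
    if last_emit[user] and received[user] - last_emit[user] < MIN_INCREMENT:
        return None                               # min_increment not met
    last_emit[user] = received[user]
    return list(history[user])

print(on_logs("alice", [1, 2]))  # None: below min_history
print(on_logs("alice", [3, 4]))  # [1, 2, 3, 4]
print(on_logs("alice", [5]))     # None: only 1 new log since last emit
print(on_logs("alice", [6]))     # [2, 3, 4, 5, 6]: trimmed to max_history
```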
Preprocessing Stage (DFPPreprocessingStage)
This stage performs the final, row-dependent feature calculations as specified by the input schema object. Once calculated, this stage can forward on all received logs, or optionally forward only new logs, removing any history information.
| Name | Type | Default | Description |
|---|---|---|---|
input_schema | DataFrameInputSchema | | The final, row-dependent schema to apply to the incoming columns. |
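The derived features in this notebook's schema (logcount, locincrement, appincrement) can be sketched in pandas: logcount is a running per-user, per-day log counter, and the increment columns count the distinct values seen so far that day. This is an illustration of the IncrementColumn/DistinctIncrementColumn semantics, not the actual implementation:

```python
import pandas as pd

# Hypothetical single-user sample logs.
df = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2022-08-01 01:00", "2022-08-01 02:00", "2022-08-01 03:00",
    ]),
    "username": ["alice", "alice", "alice"],
    "location": ["Paris, FR", "Paris, FR", "Sydney, AU"],
})
day = df["timestamp"].dt.floor("D")

# logcount: running count of logs per user per day.
df["logcount"] = df.groupby(["username", day]).cumcount() + 1

# locincrement: running count of *distinct* locations per user per day.
def distinct_increment(s):
    seen, out = set(), []
    for value in s:
        seen.add(value)
        out.append(len(seen))
    return pd.Series(out, index=s.index)

df["locincrement"] = df.groupby(["username", day])["location"].transform(distinct_increment)

print(df["logcount"].tolist())      # [1, 2, 3]
print(df["locincrement"].tolist())  # [1, 1, 2]
```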
Training Stage (DFPTraining)
This stage is responsible for performing the actual training calculations. Training is performed on all received data. The resulting message contains the input data paired with the trained model.
| Name | Type | Default | Description |
|---|---|---|---|
model_kwargs | dict | {} | The options to use when creating a new model instance. Refer to DFPAutoEncoder for information on the available options. |
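The stage also accepts a validation_size option (0.1 in the build log above), which carves a holdout fraction from the received data before fitting. A minimal sketch of such a split (illustrative only; the stage delegates the actual fitting to the autoencoder's training loop):

```python
import random

def train_validation_split(rows, validation_size=0.1, seed=42):
    """Shuffle rows and split off a validation fraction."""
    rows = list(rows)
    rng = random.Random(seed)  # fixed seed for reproducibility
    rng.shuffle(rows)
    n_val = int(len(rows) * validation_size)
    return rows[n_val:], rows[:n_val]

train, val = train_validation_split(range(100), validation_size=0.1)
print(len(train), len(val))  # 90 10
```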
MLFlow Model Writer Stage (DFPMLFlowModelWriterStage)
This stage is the last step in training. It uploads the trained model from the previous stage to MLflow. The tracking URI for the MLflow instance is configured using the static method mlflow.set_tracking_uri().
| Name | Type | Default | Description |
|---|---|---|---|
model_name_formatter | str | "" | A format string to use when building the model name. The model name is constructed by calling model_name_formatter.format(user_id=user_id). For example, model_name_formatter="my_model-{user_id}" with a user ID of "first:last" would result in the model name "my_model-first:last". |
experiment_name | str | | All models are created inside of an experiment to allow metrics to be saved with each model. This option specifies the experiment name. The final experiment name for each model will be in the form of {experiment_name}/{model_name}. |
/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
2024/08/27 03:40:53 INFO mlflow.tracking.fluent: Experiment with name 'dfp/azure/training/DFP-azure-generic_user' does not exist. Creating a new experiment.
2024/08/27 03:40:56 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-azure-generic_user, version 1
2024/08/27 03:40:56 INFO mlflow.tracking._tracking_service.client: 🏃 View run autoencoder model training run at: http://mlflow:5000/#/experiments/1/runs/c275f703344f40d2aa391c59cf5e2144.
2024/08/27 03:40:56 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://mlflow:5000/#/experiments/1.
2024/08/27 03:40:56 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: DFP-azure-generic_user, version 2
2024/08/27 03:40:56 INFO mlflow.tracking._tracking_service.client: 🏃 View run autoencoder model training run at: http://mlflow:5000/#/experiments/1/runs/83636971f2d1433ea7316af1bfb56c3a.
2024/08/27 03:40:56 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://mlflow:5000/#/experiments/1.