Chapter 3 - Inputs & Outputs

This tutorial is based on example code which can be found in the TRAC GitHub Repository under examples/models/python.

Optional Inputs & Outputs

Optional inputs and outputs provide a way for a model to react to the available data. If an input is marked as optional then it may not be supplied, the model code must check at runtime to see if it is available. When an output is marked as optional the model can choose whether to provide that output or not, for example in response to the input data or a boolean flag supplied as a model parameter.

Here is an example of defining an optional input, using schemas read from schema files:

src/tutorial/optional_io.py
39    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:
40
41        # Define an optional account filter input, using external schema files
42
43        customer_loans = trac.load_schema(schemas, "customer_loans.csv")
44        account_filter = trac.load_schema(schemas, "account_filter.csv")
45
46        return {
47            "customer_loans": trac.ModelInputSchema(customer_loans),
48            "account_filter": trac.ModelInputSchema(account_filter, optional=True)
49        }

Schemas defined in code can also be marked as optional, let’s use that approach to define an optional output:

51    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:
52
53        # Define an optional output for stats on excluded accounts, using schema definitions in code
54
55        profit_by_region = trac.define_output_table(
56            trac.F("region", trac.STRING, label="Customer home region", categorical=True),
57            trac.F("gross_profit", trac.DECIMAL, label="Total gross profit"))
58
59        exclusions = trac.define_output_table(
60            trac.F("reason", trac.STRING, "Reason for exclusion"),
61            trac.F("count", trac.INTEGER, "Number of accounts"),
62            optional=True)
63
64        return {
65            "profit_by_region": profit_by_region,
66            "exclusions": exclusions
67        }

Now let’s see how to use optional inputs and outputs in run_model(). Since the input is optional we will need to check if it is available before we can use it. TRAC provides the has_dataset() method for this purpose. If the optional dataset exists we will use it to apply some filtering to the customer accounts list, then produce the optional output dataset with some stats on the filtered accounts. Here is what that looks like:

77        if ctx.has_dataset("account_filter"):
78
79            # Filter out customer accounts with IDs in the filter set
80            account_filter = ctx.get_pandas_table("account_filter")
81            account_mask = customer_loans['id'].isin(account_filter["account_id"])
82            customer_loans = customer_loans.loc[~account_mask]
83
84            # Create an optional output with some stats about the excluded accounts
85            exclusions = account_filter.groupby(["reason"]).size().to_frame(name="count").reset_index()
86            ctx.put_pandas_table("exclusions", exclusions)

In this example the optional output is only produced when the optional input is supplied - that is not a requirement and the model can decide whether to provide optional outputs based on whatever criteria are appropriate. If an optional output is not going to be produced, then simply do not output the dataset and TRAC will understand it has been omitted. If an optional output is produced then it is subject to all the same validation rules as any other dataset.

See also

Full source code is available for the Optional IO example on GitHub

Dynamic Inputs & Outputs

Dynamic inputs and outputs allow a model to work with data when the schema is not known in advance. This allows one model to work with a wide range of data inputs, which can be very useful if you have common requirements that need to be satisfied across a large data estate. Common examples include scenario generation, post-processing of model outputs, model monitoring and data quality reporting.

Tip

Only use dynamic schemas when they are really needed. Static schemas are more robust and allow the platform to make several optimisations that are not possible with dynamic schemas.

Good to know

TRAC normally performs schema validation before a job is executed, to make sure the models and data are compatible. When models are defined with dynamic schemas validation is delayed until runtime. This means validation errors will be reported when the job executes, rather than before it starts. For jobs with a mix of static and dynamic schemas TRAC will still validate the static schemas up front, validation is only delayed for the dynamic schemas.

Schema Inspection

This example use a dynamic input to get some information about the schema of an unknown dataset. This approach allows for building common reports that can run on a large collection of datasets with varied schemas.

Note

The TRAC metadata store already holds an entry for every dataset on the platform, you don’t need to write a model just to get the schema of a dataset! However, the technique illustrated here can be used to build more detailed reports, such as data quality reports, model monitoring reports etc.

First let’s define the model, it should take a generic dataset as an input and produce some basic information about the schema and content:

src/tutorial/dynamic_io.py
22class DynamicSchemaInspection(trac.TracModel):
23
24    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:
25
26        return dict()
27
28    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:
29
30        return { "source_data": trac.define_input_table(dynamic=True, label="Unknown source dataset") }
31
32    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:
33
34        source_data_schema = trac.define_output_table(
35            trac.F("column_name", trac.STRING, label="Column name"),
36            trac.F("column_type", trac.STRING, label="TRAC column type"),
37            trac.F("contains_nulls", trac.BOOLEAN, label="Whether the source column contains nulls"),
38            label="Schema information for the source dataset")
39
40        return { "source_data_schema": source_data_schema }

The source data is defined as a dynamic input. Notice that there are no fields in the schema definition - dynamic inputs or outputs cannot define any fields, doing so will result in a validation error. Since we know what data we want to collect about the incoming dataset, the output schema can be defined as normal. This is a common pattern for inspecting generalised data - the source schema will be dynamic but the expected output is known.

Now let’s see how to use these datasets in the model.

42    def run_model(self, ctx: trac.TracContext):
43
44        source_data = ctx.get_pandas_table("source_data")   # Source data as a regular dataframe
45        source_schema = ctx.get_schema("source_data")       # TRAC schema for the source data
46
47        # Get the column names and types discovered by TRAC
48        columns = source_schema.table.fields
49        column_names = [col.fieldName for col in columns]
50        column_types = [col.fieldType.name for col in columns]
51
52        # Use discovered column names to inspect the data
53        contains_nulls = [source_data[col].isnull().values.any() for col in column_names]
54
55        # Save results as a regular dataframe
56        result = pd.DataFrame({
57            "column_name": column_names,
58            "column_type": column_types,
59            "contains_nulls": contains_nulls})
60
61        ctx.put_pandas_table("source_data_schema", result)

The model gets the source dataset as normal, but it also gets the schema of the dataset using get_schema() which returns aTRAC SchemaDefinition object. The schema will agree precisely with the contents of the dataset, including the field order and casing, so we can use the information in the schema to operate on the dataset. In this example we just perform a simple null check on all the columns and produce the output as a normal Pandas dataframe.

You can use the information held in the schema to look for columns with particular attributes to decide how to process each column. Here are some examples of matching columns based on type, nullability and the categorical flag:

float_columns = [col.fieldName for col in columns if col.fieldType == trac.FLOAT]
nullable_columns = [col.fieldName for col in columns if col.notNull != True]
categorical_columns = [col.fieldName for col in columns if col.categorical]

Note

Calling get_schema() returns a copy of the TRAC schema object. If you want to manipulate it in your model code, for example to add or remove fields, that is perfectly fine and will not cause any adverse effects. This can be useful to create a dynamic output schema based on the contents of a dynamic input. Calling get_schema() a second time at some later point will return a new copy of the schema, without any modifications made by the model code.

Using dynamic inputs locally

TRAC holds schema information for every dataset and passes this information on to models when they run on the platform, which provides the schema for a dynamic input. When models run locally in the IDE, TRAC has to do schema inference so the input files need to hold schema information. Today, the most popular format for storing data files with schema information is Parquet.

Here is a sample job configuration for this model, using a Parquet file as the dynamic input. A small sample data file is included with the tutorial, but you can use any Parquet file and the model will tell you about its schema.

config/dynamic_io.yaml
1job:
2  runModel:
3
4    inputs:
5      # Dynamic inputs for local jobs need a file format with schema information
6      source_data: "inputs/loan_final313_100.parquet"
7
8    outputs:
9      source_data_schema: "outputs/dynamic_io/source_data_schema.csv"

Note

TRAC does not currently allow using CSV files as dynamic inputs when running locally. This is because of the need to do schema inference, which is not reliable for CSV files. You can create Parquet files to test dynamic inputs by running a model with the output defined as a .parquet file in the job configuration.

Data Generation

Another use for dynamic schemas is to generate datasets based on some criteria. In this example the model receives one input which is a list of columns, and produces a output dataset which contains those columns. Depending on the input that is supplied, the schema of the output will be different.

Let’s see how to define this model:

64class DynamicGenerator(trac.TracModel):
65
66    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:
67
68        return trac.define_parameters(
69            trac.P("number_of_rows", trac.INTEGER, label="How many rows of data to generate"))
70
71    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:
72
73        sample_columns = trac.define_input_table(
74            trac.F("column_name", trac.STRING, label="Column name"),
75            label="List of columns for generating sample data")
76
77        return { "sample_columns": sample_columns }
78
79    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:
80
81        return { "dynamic_data_sample": trac.define_output_table(dynamic=True, label="Dynamically generated sample") }

In real life more parameters would often be needed to control the generated data, e.g. range limits or distribution parameters, but for this simple example those are not needed.

Now let’s look at the model code:

 83    def run_model(self, ctx: trac.TracContext):
 84
 85        sample_columns = ctx.get_pandas_table("sample_columns")
 86        number_of_rows = ctx.get_parameter("number_of_rows")
 87
 88        sample_schema = trac.SchemaDefinition(trac.SchemaType.TABLE, table=trac.TableSchema())
 89        sample_data = dict()
 90
 91        for column_index, column_name in enumerate(sample_columns["column_name"]):
 92
 93            field_schema = trac.F(column_name, trac.INTEGER, label=f"Generated column {column_name}")
 94            sample_schema.table.fields.append(field_schema)
 95
 96            offset = column_index * number_of_rows
 97            column_values = range(offset, offset + number_of_rows)
 98            sample_data[column_name] = column_values
 99
100        ctx.put_schema("dynamic_data_sample", sample_schema)
101        ctx.put_pandas_table("dynamic_data_sample", pd.DataFrame(sample_data))

The model creates a SchemaDefinition and adds a FieldSchema for each column. The same helper functions that are available for defining static schemas can be used to build dynamic schemas, in this example we use trac.F() to define each field. You can also load schemas from schema files using load_schema(), then make changes to those schemas in code and use the result as your dynamic output schema.

The model creates a data dictionary with some generated values for each column, so the output dataset will match the generated schema. Before saving the dataset, the model calls put_schema() which sets the schema for a dynamic output. Trying to save a dynamic output before its schema is set will cause a runtime validation error. Only dynamic outputs can have their schema set in this way and each schema can only be set once. If the model is updating an existing dataset, the schema must be compatible. The schema will be validated as part of the call to put_schema().

Note

Calling put_schema() creates a copy of the TRAC schema object. Any changes made to the schema after it is saved will not be picked up by TRAC. Calling get_schema() after a schema has been set will always return the schema as it was saved.

Once the schema is set the output can be saved as normal and TRAC will validate against the new schema.

Dynamic Filtering

Lastly, let’s see how to use dynamic schemas to create a generic data filtering model. This model will exclude records from a dataset, based on some filter criteria passed in as parameters. There might be a lot of datasets we want to filter in this way, all with different schemas and we want a single filtering model that will work for all of them.

Let’s see an example model definition that can help us do that:

104class DynamicDataFilter(trac.TracModel):
105
106    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:
107
108        return trac.define_parameters(
109            trac.P("filter_column", trac.STRING, label="Filter colum"),
110            trac.P("filter_value", trac.STRING, label="Filter value"))
111
112    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:
113
114        return { "original_data": trac.define_input_table(dynamic=True, label="Original (unfiltered) data") }
115
116    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:
117
118        return { "filtered_data": trac.define_output_table(dynamic=True, label="Filtered (output) data") }

Here both the input and the output are dynamic, and the model is controlled only by the two filter parameters. Now let’s see the implementation:

120    def run_model(self, ctx: trac.TracContext):
121
122        original_schema = ctx.get_schema("original_data")
123        original_data = ctx.get_pandas_table("original_data")
124
125        filter_column = ctx.get_parameter("filter_column")
126        filter_value = ctx.get_parameter("filter_value")
127
128        filtered_data = original_data[original_data[filter_column] != filter_value]
129
130        ctx.put_schema("filtered_data", original_schema)
131        ctx.put_pandas_table("filtered_data", filtered_data)

The original input schema is used directly as the output schema, so the schema of the input and output will be the same.

See also

Full source code is available for the Dynamic IO examples on GitHub