Chapter 3 - Inputs & Outputs
This tutorial is based on example code which can be found in the TRAC GitHub Repository under examples/models/python.
Optional Inputs & Outputs
Optional inputs and outputs provide a way for a model to react to the available data. If an input is marked as optional it may not be supplied, so the model code must check at runtime whether it is available. When an output is marked as optional the model can choose whether or not to provide that output, for example in response to the input data or a boolean flag supplied as a model parameter.
Here is an example of defining an optional input, using schemas read from schema files:
    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:

        # Define an optional account filter input, using external schema files

        customer_loans = trac.load_schema(schemas, "customer_loans.csv")
        account_filter = trac.load_schema(schemas, "account_filter.csv")

        return {
            "customer_loans": trac.ModelInputSchema(customer_loans),
            "account_filter": trac.ModelInputSchema(account_filter, optional=True)
        }
Schemas defined in code can also be marked as optional. Let’s use that approach to define an optional output:
    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:

        # Define an optional output for stats on excluded accounts, using schema definitions in code

        profit_by_region = trac.define_output_table(
            trac.F("region", trac.STRING, label="Customer home region", categorical=True),
            trac.F("gross_profit", trac.DECIMAL, label="Total gross profit"))

        exclusions = trac.define_output_table(
            trac.F("reason", trac.STRING, label="Reason for exclusion"),
            trac.F("count", trac.INTEGER, label="Number of accounts"),
            optional=True)

        return {
            "profit_by_region": profit_by_region,
            "exclusions": exclusions
        }
Now let’s see how to use optional inputs and outputs in run_model(). Since the input is optional we will need to check if it is available before we can use it; TRAC provides the has_dataset() method for this purpose. If the optional dataset exists we will use it to apply some filtering to the customer accounts list, then produce the optional output dataset with some stats on the filtered accounts. Here is what that looks like:
        if ctx.has_dataset("account_filter"):

            # Filter out customer accounts with IDs in the filter set
            account_filter = ctx.get_pandas_table("account_filter")
            account_mask = customer_loans['id'].isin(account_filter["account_id"])
            customer_loans = customer_loans.loc[~account_mask]

            # Create an optional output with some stats about the excluded accounts
            exclusions = account_filter.groupby(["reason"]).size().to_frame(name="count").reset_index()
            ctx.put_pandas_table("exclusions", exclusions)
In this example the optional output is only produced when the optional input is supplied - that is not a requirement and the model can decide whether to provide optional outputs based on whatever criteria are appropriate. If an optional output is not going to be produced, then simply do not output the dataset and TRAC will understand it has been omitted. If an optional output is produced then it is subject to all the same validation rules as any other dataset.
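For example, a model could gate an optional output on a boolean parameter rather than on the presence of an input. Here is a minimal sketch, assuming a hypothetical BOOLEAN parameter called produce_exclusions has been declared in define_parameters():

        # Only produce the optional output if the (hypothetical) flag parameter is set
        if ctx.get_parameter("produce_exclusions") and ctx.has_dataset("account_filter"):
            account_filter = ctx.get_pandas_table("account_filter")
            exclusions = account_filter.groupby(["reason"]).size().to_frame(name="count").reset_index()
            ctx.put_pandas_table("exclusions", exclusions)
        # Otherwise, simply do not call put_pandas_table() and TRAC records the output as omitted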
See also
Full source code is available for the Optional IO example on GitHub
Dynamic Inputs & Outputs
Dynamic inputs and outputs allow a model to work with data when the schema is not known in advance. This allows one model to work with a wide range of data inputs, which can be very useful if you have common requirements that need to be satisfied across a large data estate. Common examples include scenario generation, post-processing of model outputs, model monitoring and data quality reporting.
Tip
Only use dynamic schemas when they are really needed. Static schemas are more robust and allow the platform to make several optimisations that are not possible with dynamic schemas.
Good to know
TRAC normally performs schema validation before a job is executed, to make sure the models and data are compatible. When models are defined with dynamic schemas, validation is delayed until runtime. This means validation errors will be reported when the job executes, rather than before it starts. For jobs with a mix of static and dynamic schemas, TRAC will still validate the static schemas up front; validation is only delayed for the dynamic schemas.
Schema Inspection
This example uses a dynamic input to get some information about the schema of an unknown dataset. This approach allows for building common reports that can run on a large collection of datasets with varied schemas.
Note
The TRAC metadata store already holds an entry for every dataset on the platform, so you don’t need to write a model just to get the schema of a dataset! However, the technique illustrated here can be used to build more detailed reports, such as data quality reports, model monitoring reports etc.
First let’s define the model. It should take a generic dataset as an input and produce some basic information about its schema and content:
class DynamicSchemaInspection(trac.TracModel):

    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:

        return dict()

    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:

        return { "source_data": trac.define_input_table(dynamic=True, label="Unknown source dataset") }

    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:

        source_data_schema = trac.define_output_table(
            trac.F("column_name", trac.STRING, label="Column name"),
            trac.F("column_type", trac.STRING, label="TRAC column type"),
            trac.F("contains_nulls", trac.BOOLEAN, label="Whether the source column contains nulls"),
            label="Schema information for the source dataset")

        return { "source_data_schema": source_data_schema }
The source data is defined as a dynamic input. Notice that there are no fields in the schema definition - dynamic inputs and outputs cannot define any fields, and doing so will result in a validation error. Since we know what data we want to collect about the incoming dataset, the output schema can be defined as normal. This is a common pattern for inspecting generalised data - the source schema will be dynamic but the expected output is known.
Now let’s see how to use these datasets in the model.
    def run_model(self, ctx: trac.TracContext):

        source_data = ctx.get_pandas_table("source_data")  # Source data as a regular dataframe
        source_schema = ctx.get_schema("source_data")      # TRAC schema for the source data

        # Get the column names and types discovered by TRAC
        columns = source_schema.table.fields
        column_names = [col.fieldName for col in columns]
        column_types = [col.fieldType.name for col in columns]

        # Use discovered column names to inspect the data
        contains_nulls = [source_data[col].isnull().values.any() for col in column_names]

        # Save results as a regular dataframe
        result = pd.DataFrame({
            "column_name": column_names,
            "column_type": column_types,
            "contains_nulls": contains_nulls})

        ctx.put_pandas_table("source_data_schema", result)
The model gets the source dataset as normal, but it also gets the schema of the dataset using get_schema(), which returns a TRAC SchemaDefinition object. The schema will agree precisely with the contents of the dataset, including the field order and casing, so we can use the information in the schema to operate on the dataset. In this example we just perform a simple null check on all the columns and produce the output as a normal Pandas dataframe.
You can use the information held in the schema to look for columns with particular attributes to decide how to process each column. Here are some examples of matching columns based on type, nullability and the categorical flag:
float_columns = [col.fieldName for col in columns if col.fieldType == trac.FLOAT]
nullable_columns = [col.fieldName for col in columns if col.notNull != True]
categorical_columns = [col.fieldName for col in columns if col.categorical]
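These lists can then drive per-column processing. For example, here is a short sketch (continuing the example above) that counts nulls in the nullable columns and distinct values in the categorical columns:

# Use the discovered column lists to drive per-column checks on the source data
null_counts = {col: int(source_data[col].isnull().sum()) for col in nullable_columns}
distinct_counts = {col: int(source_data[col].nunique()) for col in categorical_columns}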
Note
Calling get_schema() returns a copy of the TRAC schema object. If you want to manipulate it in your model code, for example to add or remove fields, that is perfectly fine and will not cause any adverse effects. This can be useful to create a dynamic output schema based on the contents of a dynamic input. Calling get_schema() a second time at some later point will return a new copy of the schema, without any modifications made by the model code.
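For example, a model could take the copy returned by get_schema(), extend it, and use the result as the schema for a dynamic output (setting output schemas is covered in the next example). A minimal sketch; the extra field here is purely illustrative:

        # Take a modifiable copy of the input schema and extend it
        working_schema = ctx.get_schema("source_data")
        working_schema.table.fields.append(
            trac.F("quality_flag", trac.BOOLEAN, label="Illustrative extra field"))
        # The modified copy can later be supplied as a dynamic output schema via put_schema()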
Using dynamic inputs locally
TRAC holds schema information for every dataset and passes this information on to models when they run on the platform, which is what provides the schema for a dynamic input. When models run locally in the IDE, TRAC has to infer the schema instead, so the input files need to hold schema information. Today, the most popular format for storing data files with schema information is Parquet.
Here is a sample job configuration for this model, using a Parquet file as the dynamic input. A small sample data file is included with the tutorial, but you can use any Parquet file and the model will tell you about its schema.
job:
  runModel:

    inputs:
      # Dynamic inputs for local jobs need a file format with schema information
      source_data: "inputs/loan_final313_100.parquet"

    outputs:
      source_data_schema: "outputs/dynamic_io/source_data_schema.csv"
Note
TRAC does not currently allow using CSV files as dynamic inputs when running locally. This is because of the need to do schema inference, which is not reliable for CSV files. You can create Parquet files to test dynamic inputs by running a model with the output defined as a .parquet file in the job configuration.
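If you have a CSV sample, another option is to convert it into a Parquet file outside of TRAC using Pandas. A minimal sketch, assuming a Parquet engine such as pyarrow is installed and using placeholder file names; note that the column types in the Parquet file will be whatever Pandas infers from the CSV:

import pandas as pd

# Convert a CSV sample into a Parquet file that can be used as a dynamic input
sample = pd.read_csv("inputs/my_sample.csv")      # placeholder path
sample.to_parquet("inputs/my_sample.parquet")     # requires a Parquet engine, e.g. pyarrow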
Data Generation
Another use for dynamic schemas is to generate datasets based on some criteria. In this example the model receives one input, which is a list of columns, and produces an output dataset containing those columns. Depending on the input that is supplied, the schema of the output will be different.
Let’s see how to define this model:
class DynamicGenerator(trac.TracModel):

    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:

        return trac.define_parameters(
            trac.P("number_of_rows", trac.INTEGER, label="How many rows of data to generate"))

    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:

        sample_columns = trac.define_input_table(
            trac.F("column_name", trac.STRING, label="Column name"),
            label="List of columns for generating sample data")

        return { "sample_columns": sample_columns }

    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:

        return { "dynamic_data_sample": trac.define_output_table(dynamic=True, label="Dynamically generated sample") }
In real life more parameters would often be needed to control the generated data, e.g. range limits or distribution parameters, but for this simple example those are not needed.
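For instance, the parameter block could be extended with range limits, following the same pattern as the existing definition. A sketch of what that might look like (the extra parameter names are hypothetical, not part of the example):

    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:

        # Hypothetical extra parameters to control the generated values
        return trac.define_parameters(
            trac.P("number_of_rows", trac.INTEGER, label="How many rows of data to generate"),
            trac.P("range_min", trac.INTEGER, label="Minimum generated value"),
            trac.P("range_max", trac.INTEGER, label="Maximum generated value"))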
Now let’s look at the model code:
    def run_model(self, ctx: trac.TracContext):

        sample_columns = ctx.get_pandas_table("sample_columns")
        number_of_rows = ctx.get_parameter("number_of_rows")

        sample_schema = trac.SchemaDefinition(trac.SchemaType.TABLE, table=trac.TableSchema())
        sample_data = dict()

        for column_index, column_name in enumerate(sample_columns["column_name"]):

            field_schema = trac.F(column_name, trac.INTEGER, label=f"Generated column {column_name}")
            sample_schema.table.fields.append(field_schema)

            offset = column_index * number_of_rows
            column_values = range(offset, offset + number_of_rows)
            sample_data[column_name] = column_values

        ctx.put_schema("dynamic_data_sample", sample_schema)
        ctx.put_pandas_table("dynamic_data_sample", pd.DataFrame(sample_data))
The model creates a SchemaDefinition and adds a FieldSchema for each column. The same helper functions that are available for defining static schemas can be used to build dynamic schemas; in this example we use trac.F() to define each field. You can also load schemas from schema files using load_schema(), then make changes to those schemas in code and use the result as your dynamic output schema.
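Here is a brief sketch of that approach, assuming a schemas package containing a hypothetical sample_data.csv schema file:

        # Load a schema file, adjust it in code, then use it as the dynamic output schema
        sample_schema = trac.load_schema(schemas, "sample_data.csv")
        sample_schema.table.fields.append(
            trac.F("extra_column", trac.INTEGER, label="Column added at runtime"))

        ctx.put_schema("dynamic_data_sample", sample_schema)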
The model creates a data dictionary with some generated values for each column, so the output dataset will match the generated schema. Before saving the dataset, the model calls put_schema(), which sets the schema for a dynamic output. Trying to save a dynamic output before its schema is set will cause a runtime validation error. Only dynamic outputs can have their schema set in this way, and each schema can only be set once. If the model is updating an existing dataset, the schema must be compatible. The schema will be validated as part of the call to put_schema().
Note
Calling put_schema() creates a copy of the TRAC schema object. Any changes made to the schema after it is saved will not be picked up by TRAC. Calling get_schema() after a schema has been set will always return the schema as it was saved.
Once the schema is set the output can be saved as normal and TRAC will validate against the new schema.
Dynamic Filtering
Lastly, let’s see how to use dynamic schemas to create a generic data filtering model. This model will exclude records from a dataset, based on some filter criteria passed in as parameters. There might be a lot of datasets we want to filter in this way, all with different schemas, and we want a single filtering model that works for all of them.
Let’s see an example model definition that can help us do that:
class DynamicDataFilter(trac.TracModel):

    def define_parameters(self) -> tp.Dict[str, trac.ModelParameter]:

        return trac.define_parameters(
            trac.P("filter_column", trac.STRING, label="Filter column"),
            trac.P("filter_value", trac.STRING, label="Filter value"))

    def define_inputs(self) -> tp.Dict[str, trac.ModelInputSchema]:

        return { "original_data": trac.define_input_table(dynamic=True, label="Original (unfiltered) data") }

    def define_outputs(self) -> tp.Dict[str, trac.ModelOutputSchema]:

        return { "filtered_data": trac.define_output_table(dynamic=True, label="Filtered (output) data") }
Here both the input and the output are dynamic, and the model is controlled only by the two filter parameters. Now let’s see the implementation:
    def run_model(self, ctx: trac.TracContext):

        original_schema = ctx.get_schema("original_data")
        original_data = ctx.get_pandas_table("original_data")

        filter_column = ctx.get_parameter("filter_column")
        filter_value = ctx.get_parameter("filter_value")

        filtered_data = original_data[original_data[filter_column] != filter_value]

        ctx.put_schema("filtered_data", original_schema)
        ctx.put_pandas_table("filtered_data", filtered_data)
The original input schema is used directly as the output schema, so the schema of the input and output will be the same.
See also
Full source code is available for the Dynamic IO examples on GitHub