Chapter 4 - Streaming

This tutorial is based on the streaming.js example, which can be found in the TRAC GitHub Repository under examples/apps/javascript.

Note

This example shows how to create and read a dataset using streaming upload and download operations. The same approach can be used to update datasets or to create, update and read files.

Data transport

The gRPC transport provided by Google in grpc-web does not yet support streaming uploads. Fortunately, TRAC provides a gRPC transport that does support streaming for both uploads and downloads, using a web sockets implementation based on the work by Improbable Eng.

The TRAC transport is available as an option in the transport setup:

// Create the Data API
const transportOptions = {transport: "trac"};
const dataTransport = tracdap.setup.transportForBrowser(tracdap.api.TracDataApi, transportOptions);
const dataApi = new tracdap.api.TracDataApi(dataTransport);

Or to run outside a browser:

// Create the Data API
const transportOptions = {transport: "trac"};
const dataTransport = tracdap.setup.transportForTarget(tracdap.api.TracDataApi, "http", "localhost", 8080, transportOptions);
const dataApi = new tracdap.api.TracDataApi(dataTransport);

The options for the transport parameter are trac or google; the default is google. It is fine to use the trac transport for the data service and the google transport for everything else, and this is the recommended approach. The trac transport is only needed for streaming upload calls, so if you are only downloading data you can use the default google transport.
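For example, a browser application might create the data API over the trac transport and other services over the default transport. This is a minimal sketch; the metadata API is used here just as an example of a non-streaming service:

// Data API over the trac transport, which supports streaming uploads
const dataTransport = tracdap.setup.transportForBrowser(tracdap.api.TracDataApi, {transport: "trac"});
const dataApi = new tracdap.api.TracDataApi(dataTransport);

// Metadata API over the default google transport
const metaTransport = tracdap.setup.transportForBrowser(tracdap.api.TracMetadataApi);
const metaApi = new tracdap.api.TracMetadataApi(metaTransport);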

Streaming upload

To upload a data stream we are going to send a series of messages in one streaming upload call. The first message contains all the settings needed in a DataWriteRequest, but no content. The following messages contain content only, with no other settings; this content will usually come from a streaming source.

First let’s create the initial request message. We are going to send data in CSV format, so that we can stream data straight from a file to the TRAC data API without any transformation. This example uses an embedded schema, but a schemaId for an external schema is also fine. Tag attributes can be set as normal. The initial request message goes through the same validation as a request to createSmallDataset(), except that the content can be empty.

// Create a request object to save the data, this is the first message that will be sent
// It is just like createSmallDataset, but without the content
const request0 = tracdap.api.DataWriteRequest.create({

    tenant: "ACME_CORP",
    schema: schema,
    format: "text/csv",

    tagUpdates: [
        { attrName: "schema_type", value: { stringValue: "large_test_data" } },
        { attrName: "business_division", value: { stringValue: "ACME Test Division" } },
        { attrName: "description", value: { stringValue: "A streaming sample data set" } },
    ]
});
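As mentioned above, an external schema can be referenced instead of embedding the definition. This is a sketch, assuming schemaId holds a TagSelector pointing at an existing SCHEMA object:

// The same request using an external schema, referenced by its object ID
const request0 = tracdap.api.DataWriteRequest.create({

    tenant: "ACME_CORP",
    schemaId: schemaId,   // a TagSelector for an existing SCHEMA object
    format: "text/csv",

    tagUpdates: [ /* tag attributes as before */ ]
});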

Now let’s create the streaming source. The example code uses the fs module from Node.js to create an input stream, then passes the stream into the upload function:

const csvStream = fs.createReadStream(LARGE_CSV_FILE);

In a browser application, your source is most likely to be an HTML file input control. The file input control supports streaming using the web streams API, which is different from the event streams used in Node.js and Protobuf.js. TRAC provides a utility function to create an event stream, using a web ReadableStream as the source.

const csvInput = document.getElementById("input_id");
const csvFile = csvInput.files[0];
const csvStream = tracdap.utils.streamToEmitter(csvFile.stream());

Note

To stream data from memory you can use Blob.stream() with streamToEmitter().
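For example, this minimal sketch streams an in-memory CSV string (the sample content here is made up):

// Wrap in-memory data in a Blob, then convert its web stream to an event stream
const csvData = "id,value\n1,hello\n2,world\n";
const csvBlob = new Blob([csvData], {type: "text/csv"});
const csvStream = tracdap.utils.streamToEmitter(csvBlob.stream());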

We’re going to create a promise for the stream, which will complete when the streaming upload finishes. Although we are sending a stream of messages to the server, there will only be a single reply, indicating either success or failure.

To set up the streaming call, we need to use the newStream() method in the web API setup module. It is important to call this method for every new streaming call, since each stream can only be used once; otherwise, messages from different calls would get mixed into a single stream.

// The upload stream is set up as a promise
// The stream will run until it either completes or fails, and the result will come back on the promise

return new Promise((resolve, reject) => {

    // You have to call newStream before using a streaming operation
    // This is needed so events on different streams don't get mixed up
    // TRAC will not let you run two streams on the same instance
    const stream = tracdap.setup.newStream(dataApi);

After creating the stream, start by sending the initial message. This will start the streaming upload operation. This initial API call returns a promise that resolves with the result of the whole operation, so we can use it to complete the outer promise.

    // To start the upload, call the API method as normal with your first request object
    // The success / failure of this call is passed back through resolve/reject on the promise

    stream.createDataset(request0)
        .then(resolve)
        .catch(reject);

Now that the upload stream is open, we need to relay data from the source stream. To do this we can handle the “data” event on the source stream, which supplies chunks of data from the input source. To send them to the upload stream, each chunk needs to be wrapped in a DataWriteRequest. The “end” event signals that the source stream is complete.

    // Now handle the events on your source stream, by forwarding them to the upload stream
    // In this example, csvStream is a stream of chunks loaded from the file system
    // Each chunk needs to be wrapped in a message, by setting the "content" field
    // All the other fields in the message should be left blank

    csvStream.on('data', chunk => {
        const msg = tracdap.api.DataWriteRequest.create({content: chunk});
        stream.createDataset(msg);
    });

    // Once the source stream completes, signal that the upload stream is also complete

    csvStream.on('end', () => stream.end());

The last thing is to handle any errors that occur on the source stream. These are different from errors in the upload stream, which were handled earlier by .catch(reject).

If there is an error in the source stream, we need to cancel the upload operation. Calling cancel() will eventually produce an error on the upload stream, but this will be an “operation cancelled” error with no information about what went wrong in the source. Instead we want to reject the promise explicitly, to pass on the error information from the source stream.

    // If there is an error reading the input data stream, we need to cancel the upload
    // This is to prevent a half-sent dataset from being saved
    // Calling .cancel() only produces a generic "operation cancelled" error on the stream,
    // so call reject() explicitly to report the original error from the source

    csvStream.on('error', err => {
        stream.cancel();
        reject(err);
    });

}); // End of streaming operation promise
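Putting it together, the upload can be awaited like any other promise. This is a sketch, where saveStreamingData is a hypothetical wrapper around the logic above and the result is the header of the newly created dataset:

// Hypothetical wrapper around the upload logic above, called inside an async function
const dataId = await saveStreamingData(schema, csvStream);
console.log(`Created dataset ${dataId.objectId} version ${dataId.objectVersion}`);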

Streaming download

To download a data stream we make a single request and get back a stream of messages. The first message in the stream will contain all the metadata and no content. Subsequent messages will contain only content.

Note

This example shows how to use a download stream and collect the result in memory. It is a useful approach for datasets that are too big to download with readSmallDataset(), but where you still want to keep the whole dataset to display, sort, filter etc.

To start you need to create a DataReadRequest. This is exactly the same as the request used to call readSmallDataset().

// Ask for the dataset in CSV format so we can easily count the rows
const request = tracdap.api.DataReadRequest.create({

    tenant: "ACME_CORP",
    selector: dataId,
    format: "text/csv"
});

Since we are going to collect the response data into a single message, we can set up the streaming operation as a promise just like the upload operation. The promise will complete once all the data is collected and aggregated. If there are any errors during the operation, the promise will be rejected.

// In this example, the messages from the download stream are aggregated into a single response message
// To display data in the browser, the entire dataset must be loaded in memory
// Aggregating the data and returning a single promise keeps the streaming logic contained

// Just like the upload method, set up the stream operation as a promise

return new Promise((resolve, reject) => {

    // You have to call newStream before using a streaming operation
    const stream = tracdap.setup.newStream(dataApi);

The next step is to set up event handlers for the download stream. There are three events to process: “data”, “end” and “error”. In this example we will just collect the response messages from the “data” events until they have all been received, and then use a TRAC utility function to aggregate them into a single DataReadResponse.

Note

The aggregateStreamContent() function works for both DataReadResponse and FileReadResponse messages.

    // Hold the responses here until the stream is complete
    const messages = [];

    // When messages come in, stash them until the stream is complete
    stream.on("data", msg => messages.push(msg));

    // Once the stream finishes we can aggregate the messages into a single response
    stream.on("end", () => {
        const response = tracdap.utils.aggregateStreamContent(messages);
        resolve(response);
    });

    // Handle the error signal to make sure errors are reported back through the promise
    stream.on("error", err => reject(err));

Now that everything is ready, the final step is to make the API call that starts the download stream.

    // Make the initial API call to start the download stream
    // It is not necessary to use a .then() or .catch() block,
    // because responses are processed on the stream
    stream.readDataset(request);

}); // End of streaming operation promise
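Once the promise resolves, the aggregated response holds the full dataset as CSV bytes in its content field. As a sketch (loadStreamingData is a hypothetical wrapper around the download logic above), the rows can be counted like this:

// Hypothetical wrapper around the download logic above
loadStreamingData(dataId).then(response => {

    // response.content holds the raw CSV data as a byte array
    const csvText = new TextDecoder().decode(response.content);

    // Subtract one for the header row; trim() removes any trailing newline
    const rowCount = csvText.trim().split("\n").length - 1;
    console.log(`Downloaded ${rowCount} rows`);
});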