Chapter 4 - Streaming

This tutorial is based on the streaming.js example, which can be found in the TRAC GitHub Repository under examples/apps/javascript.

Note

This example shows how to create and read a dataset using streaming upload and download operations. The same approach can be used to update datasets or to create, update and read files.

Data transport

The gRPC transport provided by Google in grpc-web does not yet support streaming uploads. Fortunately, TRAC provides a gRPC transport that does support streaming for both uploads and downloads, using a web sockets implementation based on the work by Improbable Eng.

The TRAC transport is available as an option in the transport setup:

// Create the Data API
const transportOptions = {transport: "trac"};
const dataTransport = tracdap.setup.transportForBrowser(tracdap.api.TracDataApi, transportOptions);
const dataApi = new tracdap.api.TracDataApi(dataTransport);

Or to run outside a browser:

// Create the Data API
const transportOptions = {transport: "trac"};
const dataTransport = tracdap.setup.transportForTarget(tracdap.api.TracDataApi, "http", "localhost", 8080, transportOptions);
const dataApi = new tracdap.api.TracDataApi(dataTransport);

The options for the transport parameter are trac or google; the default is google. It is fine to use the trac transport for the data service and the google transport for everything else, and this is the recommended approach. The trac transport is only needed for streaming upload calls, so if you are only downloading data you can use the default google transport.
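For example, a setup following the recommended approach might look like this (a minimal sketch, assuming the metadata API is created with the same setup calls used in the earlier chapters):

// Use the trac transport for the data service, which supports streaming uploads
const dataTransport = tracdap.setup.transportForBrowser(tracdap.api.TracDataApi, {transport: "trac"});
const dataApi = new tracdap.api.TracDataApi(dataTransport);

// Use the default google transport for everything else, e.g. the metadata service
const metaTransport = tracdap.setup.transportForBrowser(tracdap.api.TracMetadataApi);
const metaApi = new tracdap.api.TracMetadataApi(metaTransport);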

Streaming upload

To upload a data stream we are going to send a series of messages in one streaming upload call. The first message contains all the settings needed in a DataWriteRequest, but no content. The following messages contain content only, with no other settings; this content will usually come from a streaming source.

First let’s create the initial request message. We are going to send data in CSV format, so that we can stream data straight from a file to the TRAC data API without any transformation. This example uses an embedded schema, but a schemaId for an external schema is also fine. Tag attributes can be set as normal. The initial request message goes through the same validation as a request to createSmallDataset(), except that the content can be empty.

    // Create a request object to save the data, this is the first message that will be sent
    // It is just like createSmallDataset, but without the content
    const request0 = tracdap.api.DataWriteRequest.create({

        tenant: "ACME_CORP",
        schema: schema,
        format: "text/csv",

        tagUpdates: [
            { attrName: "schema_type", value: { stringValue: "large_test_data" } },
            { attrName: "business_division", value: { stringValue: "ACME Test Division" } },
            { attrName: "description", value: { stringValue: "A streaming sample data set" } },
        ]
    });
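The schema variable used here is a regular TRAC schema definition, as introduced in the earlier tutorials. As a reminder, a minimal sketch of an embedded table schema might look like this (the field names are purely illustrative, not part of the example):

const schema = tracdap.metadata.SchemaDefinition.create({

    schemaType: tracdap.metadata.SchemaType.TABLE,

    table: {
        fields: [
            { fieldName: "customer_id", fieldType: tracdap.metadata.BasicType.STRING, businessKey: true },
            { fieldName: "loan_amount", fieldType: tracdap.metadata.BasicType.DECIMAL },
            { fieldName: "date_of_loan", fieldType: tracdap.metadata.BasicType.DATE }
        ]
    }
});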

Now let’s create the streaming source. The example code uses the fs module from Node.js to create an input stream, then passes the stream into the upload function:

    const csvStream = fs.createReadStream(LARGE_CSV_FILE);
    const dataId = await saveStreamingData(csvStream);

In a browser application, your source is most likely to be an HTML file input control. The file input control supports streaming using the web streams API, which is different from the event streams used in Node.js and Protobuf.js. TRAC provides a utility function to create an event stream, using a web ReadableStream as the source.

    const csvInput = document.getElementById("input_id");
    const csvFile = csvInput.files[0];
    const csvStream = tracdap.utils.streamToEmitter(csvFile.stream());
    const dataId = await saveStreamingData(csvStream);

Note

To stream data from memory you can use Blob.stream() with streamToEmitter().
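For example, if the CSV content is already held in memory as a string (csvText here is an assumed variable):

const csvBlob = new Blob([csvText], { type: "text/csv" });
const csvStream = tracdap.utils.streamToEmitter(csvBlob.stream());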

We’re going to create a promise for the stream, which will complete when the streaming upload finishes. Although we are sending a stream of messages to the server, there will only be a single reply, indicating either success or failure.

To set up the streaming call, we need to use the newStream() method in the web API setup module. It is important to call this method for every new streaming call, and each stream can only be used once; otherwise, messages from different calls will be mixed in a single stream.

    // The upload stream is set up as a promise
    // The stream will run until it either completes or fails, and the result will come back on the promise

    return new Promise((resolve, reject) => {

        // You have to call newStream before using a streaming operation
        // This is needed so events on different streams don't get mixed up
        // TRAC will not let you run two streams on the same instance
        const stream = tracdap.setup.newStream(dataApi);

After creating the stream, start by sending the initial message. This will start the streaming upload operation. This initial API call returns a future which holds the result of the whole operation, so we can use this to complete the promise.

        // To start the upload, call the API method as normal with your first request object
        // The success / failure of this call is passed back through resolve/reject on the promise

        stream.createDataset(request0)
            .then(resolve)
            .catch(reject);

Now that the upload stream is open, we need to relay data from the source stream. To do this we can handle the “data” event on the source stream, which supplies chunks of data from the input source. To send them to the upload stream, each chunk needs to be wrapped in a DataWriteRequest. The “end” event signals that the source stream is complete.

        // Now handle the events on your source stream, by forwarding them to the upload stream
        // In this example, csvStream is a stream of chunks loaded from the file system
        // Each chunk needs to be wrapped in a message, by setting the "content" field
        // All the other fields in the message should be left blank

        csvStream.on('data', chunk => {
            const msg = tracdap.api.DataWriteRequest.create({content: chunk});
            stream.createDataset(msg);
        });

        // Once the source stream completes, signal that the upload stream is also complete

        csvStream.on('end', () => stream.end());

The last thing is to handle any errors that occur on the source stream. These are different from errors in the upload stream, which were handled earlier by .catch(reject).

If there is an error in the source stream, we need to cancel the upload operation. Calling cancel() will eventually produce an error on the upload stream, but this will be an “operation cancelled” error with no information about what went wrong in the source. Instead we want to reject the promise explicitly, to pass on the error information from the source stream.

        // If there is an error reading the input data stream, we need to cancel the upload
        // This is to prevent a half-sent dataset from being saved
        // Calling .cancel() only produces a generic "operation cancelled" error on the stream,
        // so call reject() explicitly to pass on the error from the source

        csvStream.on('error', err => {
            stream.cancel();
            reject(err);
        });

    }); // End of streaming operation promise

Streaming download

To download a data stream we make a single request and get back a stream of messages. The first message in the stream will contain all the metadata and no content. Subsequent messages will contain only content.

Note

This example shows how to use a download stream and collect the result in memory. It is a useful approach for datasets that are too big to download with readSmallDataset(), but where you still want to keep the whole dataset to display, sort, filter etc.

To start, you need to create a DataReadRequest. This is exactly the same as the request used to call readSmallDataset().

    // Ask for the dataset in CSV format so we can easily count the rows
    const request = tracdap.api.DataReadRequest.create({

        tenant: "ACME_CORP",
        selector: dataId,
        format: "text/csv"
    });

Since we are going to collect the response data into a single message, we can set up the streaming operation as a promise just like the upload operation. The promise will complete once all the data is collected and aggregated. If there are any errors during the operation, the promise will be rejected.

    // Just like the upload method, set up the stream operation as a promise

    return new Promise((resolve, reject) => {

        // You have to call newStream before using a streaming operation
        const stream = tracdap.setup.newStream(dataApi);

The next step is to set up event handlers for the download stream. There are three events to process: “data”, “end” and “error”. In this example we will just collect the response messages from the “data” events until they have all been received, and then use a TRAC utility function to aggregate them into a single DataReadResponse.

Note

The aggregateStreamContent() function works for both DataReadResponse and FileReadResponse messages.

        // Hold the responses here until the stream is complete
        const messages = [];

        // When messages come in, stash them until the stream is complete
        stream.on("data", msg => messages.push(msg));

        // Once the stream finishes we can aggregate the messages into a single response
        stream.on("end", () => {
            const response = tracdap.utils.aggregateStreamContent(messages);
            resolve(response);
        });

        // Handle the error signal to make sure errors are reported back through the promise
        stream.on("error", err => reject(err));

Now that everything is ready, the final step is to make an API call to start the download stream. Since we are using stream event processing, we need to turn off processing of future results or callbacks by supplying no-op handlers, to prevent JavaScript warnings about unhandled results / errors.

        // Make the initial API call to start the download stream
        // Explicitly disable future result processing, we are using stream events instead
        stream.readDataset(request)
            .then(_ => {})
            .catch(_ => {});

        // The equivalent API call using the callback style would be:
        // stream.readDataset(request, /* no-op callback */ _ => {});

    }); // End of streaming operation promise
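As a hypothetical usage example, suppose the promise above is returned from a function called loadStreamingData() (the function name and the row-counting logic are illustrative, not part of the example code). The aggregated response holds the CSV content as bytes, so the rows can be counted like this:

const response = await loadStreamingData(dataId);

// Decode the CSV bytes and count the data rows, subtracting one for the header
const csvText = new TextDecoder().decode(response.content);
const nRows = csvText.trim().split("\n").length - 1;

console.log(`Downloaded dataset with ${nRows} rows`);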

Note

The future / callback style of processing results works for streaming upload calls, because there is only a single response message for the whole operation. Download operations produce a stream of messages, so a single handler is not sufficient and stream events are needed.