Streaming directly into Postgres with Hapi.js and pg-copy-streams


When implementing the Good Enough Recommendations (GER) engine, a core requirement was to let users insert large amounts of data quickly in order to bootstrap the recommendations engine. Additionally, this bootstrapping should be available over HTTP, as this will become the primary channel for interaction with GER.

Postgres (which GER uses) has the COPY command, which is "optimised for loading large numbers of rows" in various formats, and npm has the package pg-copy-streams, which pipes Node.js streams into COPY. This works well with the Hapi.js web application framework, which can turn an uploaded file into a Node.js stream without holding the entire file in memory or creating a temporary file on disk.

In this post I will describe how to upload a file and directly insert its data into Postgres using Node.js streams with Hapi.js and pg-copy-streams.

#Streaming Data Directly into Postgres with Bootstrap

GER implements a function bootstrap that takes a csv_stream, inserts its data into Postgres, and returns a Q promise that resolves when it is finished (or is rejected on error). bootstrap can be broken down into two parts: getting the connection and inserting the data.

##Getting the Connection from Knex

GER uses the query builder Knex to communicate with and manage connections to the database. The first step for the bootstrap function is to get a connection to the database from Knex:

```
runner = new knex.client.Runner(knex.client)
runner.ensureConnection()
.then( (connection) =>
  runner.connection = connection
  #Use connection
)
.finally( -> runner.cleanupConnection())
```

As with Upsert, this is a bit of a roundabout way of dealing with connections. Knex is great, but working around the edges of its functionality can be difficult.
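
If the connection did not need to come through Knex, the same raw connection could be obtained directly from node-postgres. The following is only a sketch, not GER's code, and the connection string is an assumption:

```
# A sketch of getting a connection directly from node-postgres rather than
# through Knex (not GER's code; the connection string is an assumption)
pg = require('pg')

pg.connect 'postgres://localhost/ger_test', (err, client, done) ->
  throw err if err
  # `client` plays the same role as `connection` above;
  # call done() to return the client to the pool when finished
  done()
```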

##Inserting the Data with pg-copy-streams

The query that is used to insert the data is:

query = "COPY events (person, action, thing) FROM STDIN CSV"

This query COPYs data into the events table, filling the columns person, action, and thing from the standard input (STDIN) stream in comma-separated values (CSV) format. For example, if the stream's data were:

```
bob, views, product1
alice, buys, product2
```

the query would insert two events, one for each row.
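
For reference, a minimal events table that would accept this COPY could be created through Knex roughly as follows; this is only a sketch, and GER's actual schema contains more columns than the three used here:

```
# A minimal events table for the COPY above (a sketch;
# GER's actual schema contains additional columns)
knex.schema.createTable('events', (table) ->
  table.string 'person'
  table.string 'action'
  table.string 'thing'
).then( -> console.log 'created events table')
```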

Using the from function from pg-copy-streams (copyFrom = require('pg-copy-streams').from), the query is wrapped and sent to the Postgres connection:

```
copy = connection.query(copyFrom(query))
```

The returned copy stream is a writable stream into which the input csv_stream is piped:

```
csv_stream.pipe(copy)
```

To notify the caller that bootstrap has finished or errored, a Q deferred is created that listens to the stream's end and error events:

```
deferred = q.defer()
```

```
csv_stream.pipe(copy)
.on('end', -> deferred.resolve())
.on('error', (err) -> deferred.reject(err))
```

All together, this data insertion looks like:

query = "COPY events (person, action, thing) FROM STDIN CSV" copy = connection.query(copyFrom(query)); deferred = q.defer() csv_stream.pipe(copy) .on('end', -> deferred.resolve()) .on('error', (err) -> deferred.reject(err)) return deferred.promise

#Hapi.js and Streams

By hooking a Hapi.js server up to GER's bootstrap function, a file can be uploaded and streamed directly into Postgres. As described in the Hapi.js docs, a route can be set up to output a Node.js stream.

To implement this, a Hapi.js server must be created with:

```
Hapi = require('hapi')
server = new Hapi.Server('localhost', 8000)
```

A route that takes a file upload and turns it into a Node.js stream is added to the Hapi.js server with:

```
server.route
  method: 'POST'
  path: '/event/bootstrap'
  config:
    payload:
      maxBytes: 209715200
      output: 'stream'
      parse: true
  handler: (request, reply) ->
    #do things...
```

The handler option is the function that handles the request. It can access the uploaded file's stream in the request's payload, e.g. request.payload["events"]. This stream is passed to GER's bootstrap function, i.e.

```
handler: (request, reply) =>
  ger.bootstrap(request.payload["events"])
  .then( -> reply({finished: true}))
  .fail((err) -> reply({error: err}).code(500))
```

The final part is to start the Hapi.js server with server.start().
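
For completeness, the pieces above combine into one small server script, roughly as in the following sketch (it assumes the same Hapi.js API used above and an already initialised ger instance):

```
# A sketch combining the server pieces above; `ger` is assumed to be an
# already initialised GER instance
Hapi = require('hapi')
server = new Hapi.Server('localhost', 8000)

server.route
  method: 'POST'
  path: '/event/bootstrap'
  config:
    payload:
      maxBytes: 209715200
      output: 'stream'
      parse: true
  handler: (request, reply) ->
    ger.bootstrap(request.payload["events"])
    .then( -> reply({finished: true}))
    .fail((err) -> reply({error: err}).code(500))

server.start ->
  console.log "Server running at #{server.info.uri}"
```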

##Testing the Server

To test the server and the route, curl can be used to upload a file, e.g.

```
curl -i -F events=@data.csv http://localhost:8000/event/bootstrap
```

curl can also take a standard stream and upload that, e.g.

```
head data.csv | curl -i -F events=@- http://localhost:8000/event/bootstrap
```

I would just like to take the time to examine how awesome this is: head creates a standard stream and pipes it to curl, which turns it into an HTTP multipart request; Hapi.js turns that request into a Node.js stream, which GER's bootstrap function then pipes into Postgres as a standard stream. That is just cool!

#Performance Metrics

I wrote a small mocha test that compared inserting 10,000 events into GER one event at a time with inserting the same 10,000 events using the bootstrap function.

The results were:

  1. 0.7297ms per event when each event was inserted one at a time
  2. 0.0696ms per event when the events were inserted using bootstrap

That is roughly a 10 times performance improvement when inserting events.

The difference is even more pronounced once the overhead of HTTP is added, as each individual insert also requires its own HTTP request, whereas one uploaded file is only one request.
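
A timing comparison along these lines can be written as an ordinary mocha test, roughly like the sketch below (the CSV file name is hypothetical, and this is not the actual GER test suite):

```
# A sketch of a mocha timing test (the CSV file name is hypothetical;
# this is not the actual GER test suite)
fs = require('fs')

describe 'bootstrap performance', ->
  it 'inserts 10,000 events via bootstrap', (done) ->
    @timeout(60000) # allow plenty of time for the insert
    start = Date.now()
    ger.bootstrap(fs.createReadStream('10000_events.csv'))
    .then( ->
      console.log "#{(Date.now() - start) / 10000} ms per event"
      done()
    )
    .fail(done)
```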

#Further Reading

Substack's Stream Handbook

