TL;DR version: Add the "COMPUPDATE OFF" and "STATUPDATE OFF" options to "COPY" when loading data into a staging table. With these options, you can skip the post-processing steps of "COPY", which may take a long time even when the number of loaded records is small.

What is “UPSERT”?

"UPSERT" is the operation to merge new records with existing records using primary keys on a table. While some RDBMSs support a single “UPSERT” statement, Amazon Redshift does not support it. Instead, they recommend to use a staging table for merging records as they outline in the Amazon Redshift development guide.

Here is an example of an "UPSERT" statement for Amazon Redshift.

- An example table("users") table schema

CREATE TABLE users (
  id int primary key,
  name varchar(255),  -- column types are assumed for this example
  city varchar(255)
);

- "UPSERT" queries with "COPY" command

-- Start transaction
BEGIN;

-- Create a staging table
CREATE TABLE users_staging (LIKE users);

-- Load data into the staging table
COPY users_staging (id, name, city)
FROM 's3://.......'
CREDENTIALS 'aws_access_key_id=xxxxxxx;aws_secret_access_key=xxxxxxx';

-- Update records
UPDATE users
SET name = s.name, city = s.city
FROM users_staging s
WHERE users.id = s.id;

-- Insert records
INSERT INTO users
SELECT s.* FROM users_staging s LEFT JOIN users
ON s.id = users.id
WHERE users.id IS NULL;

-- Drop the staging table
DROP TABLE users_staging;

-- End transaction
END;
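The dev guide also describes a replace-style merge: delete the matching rows, then insert everything from the staging table. Here is a minimal sketch, assuming the staging table is created and loaded as above and every column of "users" should be replaced.

-- Alternative merge: replace matching rows wholesale
BEGIN;

-- Remove existing rows that will be replaced
DELETE FROM users
USING users_staging s
WHERE users.id = s.id;

-- Insert all staged rows (replacements and new rows alike)
INSERT INTO users
SELECT * FROM users_staging;

DROP TABLE users_staging;
END;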

Performance issue with continuous "UPSERT"s

Recently we encountered a performance issue with continuous "UPSERT"s, especially when the CPU load of the Redshift cluster was high. For example, one "UPSERT" transaction took about 10 minutes even though it loaded just a single record.

Through our investigation, we found that the bottleneck was the "COPY ANALYZE" and "ANALYZE COMPRESSION" commands, which we had not specified anywhere in the "UPSERT" statement. The following query result shows that a single "UPSERT" transaction included a large number of these analyze commands, and that those unexpected commands took a surprising 8 minutes in total.

=> select xid,substring(querytxt,0,32),starttime,endtime from stl_query where xid = 7345326 order by starttime sac;
xid | substring | starttime | endtime
--------+----------------------------------------------------+---------------------
7345326 | COPY ANALYZE users_staging | 2014-07-31 18:19:55 | 2014-07-31 18:19:57
7345326 | analyze compression phase 1 | 2014-07-31 18:19:57 | 2014-07-31 18:19:57
7345326 | analyze compression phase 2 | 2014-07-31 18:19:57 | 2014-07-31 18:20:09
7345326 | analyze compression phase 1 | 2014-07-31 18:20:09 | 2014-07-31 18:20:09
7345326 | analyze compression phase 2 | 2014-07-31 18:27:43 | 2014-07-31 18:27:44
...(repeat analyze compression phase 1 & 2)...
7345326 | copy users_staging("id"," | 2014-07-31 18:27:44 | 2014-07-31 18:27:45
7345326 | padb_fetch_sample: select co | 2014-07-31 18:27:45 | 2014-07-31 18:27:45
7345326 | padb_fetch_sample: select * | 2014-07-31 18:27:45 | 2014-07-31 18:27:45
7345326 | UPDATE users SET "id" = s.id | 2014-07-31 18:27:45 | 2014-07-31 18:27:48
7345326 | INSERT INTO users SELECT s.* | 2014-07-31 18:27:48 | 2014-07-31 18:27:48
(170 rows)
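To quantify how much of the transaction those analyze steps consumed, a query along these lines can sum their elapsed time in stl_query (7345326 is the transaction from the log above; substitute your own xid):

-- Seconds spent in COPY ANALYZE / ANALYZE COMPRESSION within one transaction
SELECT SUM(datediff(seconds, starttime, endtime)) AS analyze_seconds
FROM stl_query
WHERE xid = 7345326
  AND (querytxt ILIKE 'COPY ANALYZE%' OR querytxt ILIKE 'analyze compression%');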

Solution

The reason why "COPY ANALYZE" was called was because that was the default behavior of a "COPY" against empty tables. If the table was empty, "COPY" commands run "COPY ANALYZE" and "ANALYZE COMMAND" automatically, in order to analyze the table and determine the compression type.

Although these features are useful during an initial bulk load, they are not necessary when loading data into a temporary staging table. We therefore added "COMPUPDATE OFF" and "STATUPDATE OFF" to the "COPY" commands to disable them during our "UPSERT"s. Here is an example of a "COPY" command with these options set.

-- Load data into the staging table
COPY users_staging (id, name, city)
FROM 's3://.......'
CREDENTIALS 'aws_access_key_id=xxxxxxx;aws_secret_access_key=xxxxxxx'
COMPUPDATE OFF STATUPDATE OFF;

Here is the result of the same query after applying this change.

=> select xid,substring(querytxt,0,32),starttime,endtime from stl_query where xid = 8624724 order by starttime sac;

xid | substring | starttime | endtime
--------+------------------------------+---------------------+---------------------
8624724 | copy users_staging("id"," | 2014-08-01 22:43:30 | 2014-08-01 22:43:32
8624724 | UPDATE userslSETn"id"a=is.id | 2014-08-01 22:43:32 | 2014-08-01 22:43:48
8624724 | INSERT INTO userslSELECT.s.* | 2014-08-01 22:43:48 | 2014-08-01 22:44:38
(3 rows)

Now we have gotten rid of the unnecessary "COPY ANALYZE" and "ANALYZE COMPRESSION" queries, and as the log above shows, one "UPSERT" transaction completes in about a minute rather than roughly 10 minutes. It's a drastic performance improvement for our use case.
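One caveat: with "STATUPDATE OFF", "COPY" no longer refreshes table statistics, so it is worth running "ANALYZE" on the target table yourself from time to time, for example after a large merge:

-- Refresh optimizer statistics on the target table periodically
ANALYZE users;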

We hope this information will help you when you run your “UPSERT”s!

Reference

Here are documents that may help you implement "UPSERT"s.

  • Amazon Redshift Dev Guide -- Use a staging table to perform a merge (upsert) -- http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html

  • Amazon Redshift Dev Guide -- COPY command -- http://docs.aws.amazon.com/redshift/latest/dg/r_COPY.html