What is SKIP LOCKED for in PostgreSQL 9.5?

PostgreSQL 9.5 introduces a new SKIP LOCKED option to SELECT ... FOR [KEY] UPDATE|SHARE. It’s used in the same place as NOWAIT and, like NOWAIT, affects behaviour when the tuple is locked by another transaction.

The main utility of SKIP LOCKED is for building simple, reliable and efficient concurrent work queues.

“How do I find the first row (by some given ordering) in a queue table that nobody else has claimed and claim it for myself? It needs to automatically revert to being unclaimed again if I crash or exit for any reason. Many other workers will be doing the same thing at the same time. It is vital that each item get processed exactly once; none may be skipped and none may be processed more than once.”

This is harder than you’d think because SQL statements do not execute atomically. A subquery might run before the outer query, depending on how the planner/optimizer does things. Many of the race conditions that can affect a series of statements can also affect single statements with CTEs and subqueries, but the window in which they occur is narrower because the parts of the statement usually run closer together. So lots of code that looks right proves not to be: it’s right 99.95% of the time, or it’s always right until the day your business gets a big surge in traffic and concurrency goes up. Sometimes that’s good enough. Often it isn’t.

How SKIP LOCKED helps

SKIP LOCKED tries to make this easier by letting you use normal SQL to write efficient, safe queue systems. You don’t need to import a large and complex third-party app or library to implement a queue, and you don’t need to deal with the key mapping and namespace issues that come with advisory locking.

Given a trivial queue:

CREATE TABLE queue(
  itemid INTEGER PRIMARY KEY,
  is_done BOOLEAN NOT NULL DEFAULT 'f'
);

INSERT INTO queue(itemid)
SELECT x FROM generate_series(1,20) x;

an application can grab a single queue item safely while holding an open transaction with:

DELETE FROM queue
WHERE itemid = (
  SELECT itemid
  FROM queue
  ORDER BY itemid
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
RETURNING *;

This:

  • Scans the queue table in itemid order
  • Tries to acquire a lock on each row. If it fails to acquire the lock, it ignores the row as if it wasn’t in the table at all and carries on.
  • Stops scanning once it’s locked one item
  • Returns the itemid of the locked item
  • Looks up the found itemid in the index to get its physical location
  • Marks the tuple as deleted (but this doesn’t take effect until commit)
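
The requirement that an item return to the queue if the worker crashes follows from running the claim inside a transaction. A minimal worker sketch in Python is shown below. It assumes a DB-API 2.0 connection (for example one from psycopg2) that is not in autocommit mode; process_next_item and handler are hypothetical names introduced here for illustration.

```python
from typing import Any, Callable, Tuple

# The claim query from the text, unchanged.
CLAIM_SQL = """
DELETE FROM queue
WHERE itemid = (
  SELECT itemid
  FROM queue
  ORDER BY itemid
  FOR UPDATE SKIP LOCKED
  LIMIT 1
)
RETURNING *;
"""


def process_next_item(connection: Any, handler: Callable[[Tuple], None]) -> bool:
    """Claim one row, run handler on it, then commit.

    Returns False when no unlocked row is available. Assumes `connection`
    is a DB-API 2.0 connection with autocommit off (an assumption of this
    sketch), so the DELETE only becomes permanent at commit().
    """
    cursor = connection.cursor()
    try:
        cursor.execute(CLAIM_SQL)
        row = cursor.fetchone()
        if row is None:
            # Queue is empty, or every remaining row is locked by another worker.
            connection.rollback()
            return False
        handler(row)
        connection.commit()
        return True
    except Exception:
        # Rolling back undoes the DELETE, so the item becomes claimable again.
        connection.rollback()
        raise
    finally:
        cursor.close()
```

If the worker process dies instead of raising, the server ends the transaction when the connection drops, which has the same effect as the rollback branch.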

Generated columns in PostgreSQL 12

Here’s a new feature coming in Postgres 12: the ability to have a non-updatable column that is generated by a function.

CREATE OR REPLACE FUNCTION my_concat(text, VARIADIC text[])
RETURNS TEXT AS 'text_concat_ws' LANGUAGE internal immutable;

CREATE TABLE addresses (
    id bigserial primary key,
    address1 text,
    address2 text,
    address3 text,
    city text,
    state text,
    zip text,
    delivery_address text generated always as 
        (my_concat(E'\n',address1,address2,address3,my_concat(' ',my_concat(',', city, state),zip)) ) stored
);

INSERT INTO addresses (address1,address3,city,state,zip) 
    VALUES ('105 Live Oak Street','c/o Somebody, Somewhere','Live Oak Village','TX','78039');

SELECT delivery_address FROM addresses;
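
To see what the generated column should contain, it helps to remember that concat_ws skips NULL arguments. The Python sketch below is only a model of that behaviour, not PostgreSQL code; the function names mirror the SQL above, and the handling of a NULL separator is left out.

```python
from typing import Optional


def concat_ws(sep: str, *parts: Optional[str]) -> str:
    """Join the non-None parts with sep, mimicking how concat_ws skips NULLs."""
    return sep.join(p for p in parts if p is not None)


def delivery_address(address1, address2, address3, city, state, zip_code):
    # Same nesting as the generated column expression in the table definition.
    city_state = concat_ws(',', city, state)
    last_line = concat_ws(' ', city_state, zip_code)
    return concat_ws('\n', address1, address2, address3, last_line)


# The row inserted above leaves address2 unset (NULL in SQL, None here).
print(delivery_address(
    '105 Live Oak Street', None, 'c/o Somebody, Somewhere',
    'Live Oak Village', 'TX', '78039',
))
```

Because address2 is skipped rather than rendered as an empty line, the result has three lines: the street, the c/o line, and "Live Oak Village,TX 78039".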

Fastest Way to Load Data Into PostgreSQL Using Python

From two minutes to less than half a second!

Data written to an unlogged table is not written to the write-ahead log (WAL), which makes unlogged tables ideal for intermediate data. Note that the contents of an UNLOGGED table are not crash-safe (the table is truncated after a crash) and are not replicated.

Copy Data From a String Iterator with Buffer Size

In an attempt to squeeze out one final drop of performance, we notice that, much like page_size, the copy_from function accepts an argument called size:

size – size of the buffer used to read from the file.

Let’s add a size argument to the function:

from typing import Any, Dict, Iterator

@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'beers', sep='|', size=size)
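
The function above relies on two helpers, StringIteratorIO and clean_csv_value, that are not defined in this section. The following is a minimal sketch of what they could look like, not necessarily the versions used for the measurements. It assumes copy_from only needs a read method on the file-like object, and it does not escape backslashes or the '|' delimiter inside values.

```python
import io
from typing import Any, Iterator, Optional


class StringIteratorIO(io.TextIOBase):
    """Read-only file-like view over an iterator of strings (sketch)."""

    def __init__(self, iterator: Iterator[str]) -> None:
        self._iterator = iterator
        self._buffer = ''

    def readable(self) -> bool:
        return True

    def _read_chunk(self, n: Optional[int] = None) -> str:
        # Refill the buffer from the iterator until it holds some text
        # or the iterator is exhausted.
        while not self._buffer:
            try:
                self._buffer = next(self._iterator)
            except StopIteration:
                break
        chunk = self._buffer[:n]
        self._buffer = self._buffer[len(chunk):]
        return chunk

    def read(self, n: Optional[int] = None) -> str:
        parts = []
        if n is None or n < 0:
            # Read everything that is left.
            while True:
                chunk = self._read_chunk()
                if not chunk:
                    break
                parts.append(chunk)
        else:
            # Read at most n characters, pulling more items as needed.
            while n > 0:
                chunk = self._read_chunk(n)
                if not chunk:
                    break
                n -= len(chunk)
                parts.append(chunk)
        return ''.join(parts)


def clean_csv_value(value: Optional[Any]) -> str:
    """Render a value for COPY text format: None becomes \\N, newlines are escaped."""
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')
```

With a file-like object of this kind, each read(size) call pulls only as many rows from the generator as it needs to fill the buffer, so the full data set never has to be held in memory as one string.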

The default value for size is 8192, which is 2 ** 13, so we will keep sizes in powers of 2:

>>> copy_string_iterator(connection, iter(beers), size=1024)
copy_string_iterator(size=1024)
Time   0.4536
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=8192)
copy_string_iterator(size=8192)
Time   0.4596
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=16384)
copy_string_iterator(size=16384)
Time   0.4649
Memory 0.0

>>> copy_string_iterator(connection, iter(beers), size=65536)
copy_string_iterator(size=65536)
Time   0.6171
Memory 0.0
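
The @profile decorator that produces the output above is not defined in this section. A rough stand-in built only on the standard library might look like the sketch below. It is an assumption, not the decorator used for the measurements: it reports peak Python allocations in MiB via tracemalloc, and tracing adds overhead, so its numbers are not directly comparable with the ones shown.

```python
import time
import tracemalloc
from functools import wraps


def profile(fn):
    """Print the call, its wall-clock time, and peak traced memory (sketch)."""
    @wraps(fn)
    def inner(*args, **kwargs):
        kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'{fn.__name__}({kwargs_str})')

        tracemalloc.start()
        started = time.perf_counter()
        try:
            result = fn(*args, **kwargs)
        finally:
            elapsed = time.perf_counter() - started
            # get_traced_memory() returns (current, peak) in bytes.
            _, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()

        print(f'Time   {elapsed:.4f}')
        print(f'Memory {peak / 2**20:.1f}')
        return result

    return inner
```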

EXPLAIN EXTENDED: INNER JOIN vs. CROSS APPLY

CROSS APPLY is Microsoft’s extension to SQL, originally intended for use with table-valued functions (TVFs).

The query above would look like this:

SELECT  *
FROM    table1
CROSS APPLY
        (
        SELECT  TOP (table1.rowcount) *
        FROM    table2
        ORDER BY
                id
        ) t2
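
What sets CROSS APPLY apart from a plain join is that the right-hand subquery is evaluated once per left-hand row and may refer to that row's columns. The Python sketch below models only these semantics, with made-up sample data; cross_apply and top_n_of_table2 are hypothetical names, not part of any library.

```python
from typing import Callable, Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, int]


def cross_apply(
    left_rows: Iterable[Row],
    right_for: Callable[[Row], Iterable[Row]],
) -> Iterator[Tuple[Row, Row]]:
    """Pair each left row with every row its own right-hand subquery yields."""
    for left in left_rows:
        for right in right_for(left):
            yield left, right


# Made-up sample data for illustration only.
table1: List[Row] = [
    {'id': 1, 'rowcount': 2},
    {'id': 2, 'rowcount': 0},
    {'id': 3, 'rowcount': 1},
]
table2: List[Row] = [{'id': 30}, {'id': 10}, {'id': 20}]


def top_n_of_table2(left: Row) -> List[Row]:
    # Models: SELECT TOP (table1.rowcount) * FROM table2 ORDER BY id
    return sorted(table2, key=lambda r: r['id'])[:left['rowcount']]


pairs = [(l['id'], r['id']) for l, r in cross_apply(table1, top_n_of_table2)]
print(pairs)
```

The table1 row with rowcount 0 produces no output at all, which is how CROSS APPLY behaves: left rows whose subquery returns nothing are dropped, as with an inner join.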

Postgres equivalent to CROSS APPLY in SQL Server