Change Data Capture Queries

Note:

This is an enterprise-only feature. Request a 30-day trial license to try it out.

Change data capture queries allow you to define the change data emitted to your sink when you create a changefeed. The expression syntax provides a way to select columns and apply filters to further restrict or transform the data in your changefeed messages.

You can use CDC queries to do the following:

  • Filter out specific rows and columns from changefeed messages to decrease the load on your downstream sink and support outbox workloads.
  • Modify data before it is emitted to reduce the time and operational burden of filtering or transforming data downstream.
  • Stabilize or customize the schema of your changefeed messages for increased compatibility with external systems.

You can use any CockroachDB-supported SQL expression syntax that is not listed under Limitations to build a changefeed query.

See the Examples section for further use cases.

Syntax

There are two possible components to CDC queries:

  • Projections select the columns that you want to emit data from.
  • Predicates restrict which rows' change data is emitted, based on the filters you apply.
CREATE CHANGEFEED [INTO sink] [WITH options] AS SELECT projection FROM table [WHERE predicate];
Parameter Description
sink Specify the sink URL to emit change data to. See Changefeed Sinks for a list of supported sinks. You can also run a changefeed without a sink (CREATE CHANGEFEED WITH...), which sends changes to the active SQL session.
options Set options on the changefeed. See the Options table for a full list.
projection Select the columns from which to emit data.
table Define the table to which the columns belong.
predicate Apply optional filters with a WHERE clause.

For a SQL diagram of the CDC query syntax, see the CREATE CHANGEFEED page.

New in v23.1: To emit different properties for a row, specify the following explicitly in CDC queries:

  • cdc_prev: A tuple-typed column that gives changefeeds access to the previous state of a row. See the Emit the previous state of a row example for more detail.
  • CDC queries support system columns, for example:
    • crdb_internal_mvcc_timestamp: Records the MVCC timestamp of the most recent write to each row in a table. If you do not have a timestamp column in the target table, you can access crdb_internal_mvcc_timestamp in a changefeed. See the Determine the age of a row example.

Limitations

  • You can only apply CDC queries to a single table in each statement.
  • Some stable functions, notably functions that return MVCC timestamps, are overridden to return the MVCC timestamp of the event, for example, transaction_timestamp() or statement_timestamp(). Additionally, some time-based functions, such as now(), are not supported. We recommend using the transaction_timestamp() function or the crdb_internal_mvcc_timestamp column instead.
  • You cannot alter a changefeed that uses CDC queries. Tracking GitHub issue
  • The following are not permitted in CDC queries:
  • Changefeed events for deletes do not contain any content in the output message. Tracking GitHub issue

CDC query function support

New in v23.1: The following table outlines functions that are useful with CDC queries:

Function Description
changefeed_creation_timestamp() Returns the decimal MVCC timestamp when the changefeed was created.
event_op() Returns a string describing the type of event. If a changefeed is running with the diff option, then this function returns 'insert', 'update', or 'delete'. If a changefeed is running without the diff option, it is not possible to determine an update from an insert, so event_op() returns 'upsert' or 'delete'.
event_schema_timestamp() Returns the timestamp of the schema change event.
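To make the event_op() rules in the table concrete, here is a small Python model of the values it is documented to return. It is a hypothetical helper that only restates the table, not CockroachDB's implementation; the prev_exists flag stands for whether the row existed before the event, which a changefeed can only know when it runs with the diff option.

```python
def event_op_model(diff: bool, is_delete: bool, prev_exists: bool) -> str:
    """Hypothetical model of the documented event_op() return values.

    Not CockroachDB code: it restates the rules in the table above.
    prev_exists says whether the row existed before this event; only a
    changefeed running with the diff option can use it.
    """
    if is_delete:
        return "delete"
    if not diff:
        # Without diff, an insert and an update are indistinguishable.
        return "upsert"
    return "update" if prev_exists else "insert"


# The same insert event, with and without the diff option.
print(event_op_model(diff=True, is_delete=False, prev_exists=False))   # insert
print(event_op_model(diff=False, is_delete=False, prev_exists=False))  # upsert
```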

You can also use the following functions in CDC queries:

  • Functions marked as "Immutable" on the Functions and Operators page.
  • New in v23.1: Non-volatile user-defined functions. See the Queries and user-defined functions example.
  • Functions that rely on session data. When a changefeed is created, information about the current session is saved, and a CDC query that calls one of these functions evaluates it against the saved session data.
  • The following "Stable" functions:
    • age()
    • array_to_json()
    • array_to_string()
    • crdb_internal.cluster_id()
    • date_part()
    • date_trunc()
    • extract()
    • format()
    • jsonb_build_array()
    • jsonb_build_object()
    • to_json()
    • to_jsonb()
    • row_to_json()
    • overlaps()
    • pg_collation_for()
    • pg_typeof()
    • quote_literal()
    • quote_nullable()

Unsupported functions

You cannot use the following functions with CDC queries:

Examples

CDC queries allow you to customize your changefeed for particular scenarios. This section outlines several possible use cases for CDC queries.

See CREATE CHANGEFEED for examples on using the foundational syntax to create a changefeed. For information on sinks, see the Changefeed Sinks page.

Tip:

To optimize the SELECT query you run in your changefeed statement, use the EXPLAIN statement to view a statement plan.

Note that EXPLAIN does not have access to cdc_prev, so you will receive an error if your SELECT query contains cdc_prev.

Filter columns

To emit data from only specific columns in a table, list those columns in the SELECT clause.

As an example, using the users table from the movr database, you can create a changefeed that will emit messages including only the name and city column data:

CREATE CHANGEFEED INTO 'scheme://sink-URI' WITH updated AS SELECT name, city FROM users;
{"record":{"users":{"name":{"string":"Steven Lara"},"city":{"string":"los angeles"}}}}
{"record":{"users":{"city":{"string":"los angeles"},"name":{"string":"Carl Russell"}}}}
{"record":{"users":{"name":{"string":"Brett Porter"},"city":{"string":"boston"}}}}
{"record":{"users":{"name":{"string":"Vanessa Rivera"},"city":{"string":"los angeles"}}}}
{"record":{"users":{"name":{"string":"Tony Henderson"},"city":{"string":"los angeles"}}}}
{"record":{"users":{"city":{"string":"boston"},"name":{"string":"Emily Hill"}}}}
{"record":{"users":{"name":{"string":"Dustin Kramer"},"city":{"string":"boston"}}}}
{"record":{"users":{"name":{"string":"Dawn Roman"},"city":{"string":"boston"}}}}
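In the output above, each field value is wrapped in a single-key object naming its type, such as {"string": "boston"}, which matches Avro's JSON encoding of union values. A minimal Python sketch of a downstream consumer that unwraps one such message, assuming exactly the shape shown above; the function name and its table parameter are hypothetical.

```python
import json
from typing import Any, Dict


def flatten_record(line: str, table: str = "users") -> Dict[str, Any]:
    """Unwrap one message shaped like the output above (hypothetical helper).

    Assumes each field value is a single-key object naming its type, e.g.
    {"string": "boston"}; any other value (such as null) is kept unchanged.
    """
    fields = json.loads(line)["record"][table]
    flat: Dict[str, Any] = {}
    for name, wrapped in fields.items():
        if isinstance(wrapped, dict) and len(wrapped) == 1:
            (flat[name],) = wrapped.values()  # take the single wrapped value
        else:
            flat[name] = wrapped
    return flat


print(flatten_record('{"record":{"users":{"name":{"string":"Steven Lara"},"city":{"string":"los angeles"}}}}'))
# {'name': 'Steven Lara', 'city': 'los angeles'}
```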

Filter delete messages

To remove the delete messages from a changefeed stream, use the event_op() function:

CREATE CHANGEFEED INTO sink AS SELECT * FROM table WHERE NOT event_op() = 'delete';

Filtering delete messages from your changefeed is helpful for certain outbox table use cases. See Queries and the outbox pattern for further detail.

Emit the previous state of a row

New in v23.1: Changefeeds can access the cdc_prev hidden column on a table to emit the previous state of a row or column. cdc_prev is a tuple-typed column that contains the table's columns.

To emit the previous state of a row, you must explicitly select cdc_prev:

CREATE CHANGEFEED INTO 'external://sink' AS SELECT *, cdc_prev FROM movr.rides;

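To reference the previous value of a single column, access it as a named field of the cdc_prev tuple with the (cdc_prev).column syntax. For example, this changefeed emits changes only for vehicles whose previous status was in_use: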

CREATE CHANGEFEED INTO 'external://sink' AS SELECT *, cdc_prev FROM movr.vehicles WHERE (cdc_prev).status = 'in_use';

Geofilter a changefeed

When you are working with a REGIONAL BY ROW table, you can filter the changefeed on the crdb_region column to create a region-specific changefeed:

CREATE CHANGEFEED INTO sink AS SELECT * FROM table WHERE crdb_region = 'europe-west2';

For more detail on targeting REGIONAL BY ROW tables with changefeeds, see Changefeeds in Multi-Region Deployments.

Stabilize the changefeed message schema

As a changefeed emits messages, their format can change when the table undergoes schema changes. You can cast the selected columns to explicit types to prevent message fields from changing during the changefeed's lifecycle:

CREATE CHANGEFEED INTO sink AS SELECT id::int, name::varchar, admin::bool FROM users;

Shard changefeed messages

CDC queries allow you to emit changefeed messages from the same table to different endpoints. As a result, you can use queries to load balance messages across changefeed sinks without the need for an intermediate system.

In this example, the query uses the first character of the ride_id UUID to shard the messages. The query casts ride_id to a string, uses the left() function to extract its first character, and uses an IN clause to check whether that character is one of the specified values. Because the first character of a UUID is one of the 16 hexadecimal digits 0 through f, running four changefeeds on the same table, each matching four of those digits, divides the table's messages across four sinks.

The first changefeed matches the digits 0 through 3:

CREATE CHANGEFEED INTO 'scheme://sink-URI-1' 
AS SELECT * FROM movr.vehicle_location_histories 
WHERE left(ride_id::string, 1) IN ('0','1','2','3');

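The second and third changefeeds follow the same pattern for the digits 4 through 7 and 8 through b. The final changefeed matches c through f: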

CREATE CHANGEFEED INTO 'scheme://sink-URI-4' 
AS SELECT * FROM movr.vehicle_location_histories 
WHERE left(ride_id::string, 1) IN ('c','d','e','f');
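The sharding scheme only works if every possible first character is matched by exactly one changefeed. The Python sketch below checks that property and shows which sink a given ride_id would reach. The routing table is hypothetical: its first and last groups come from the example, and the two middle groups are assumed to follow the same pattern.

```python
import uuid

# Hypothetical routing table mirroring the IN (...) predicates. The first and
# last groups come from the example; the middle two are assumed.
SHARDS = {
    "scheme://sink-URI-1": ("0", "1", "2", "3"),
    "scheme://sink-URI-2": ("4", "5", "6", "7"),
    "scheme://sink-URI-3": ("8", "9", "a", "b"),
    "scheme://sink-URI-4": ("c", "d", "e", "f"),
}


def sink_for(ride_id: uuid.UUID) -> str:
    """Return the sink whose predicate matches this ride_id."""
    # str(UUID) is lowercase hex, like ride_id::string in the query.
    first = str(ride_id)[0]  # equivalent of left(ride_id::string, 1)
    matches = [sink for sink, chars in SHARDS.items() if first in chars]
    if len(matches) != 1:
        raise ValueError(f"{first!r} matches {len(matches)} shards")
    return matches[0]


# Each hex digit must belong to exactly one shard, or rows are lost or duplicated.
all_chars = [c for chars in SHARDS.values() for c in chars]
assert sorted(all_chars) == sorted("0123456789abcdef")

print(sink_for(uuid.UUID("ff9df988-ebda-4066-b0fc-ecbc45f8d12b")))  # scheme://sink-URI-4
```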

View recent changes to a row

You can use CDC queries as a tool for debugging or investigating issues from the SQL shell.

For example, you may need to identify what recently changed in a specific row. You can use the cursor option with the desired start time and a WHERE clause describing the row in question. A "sinkless" changefeed sends its results to the SQL shell instead of a sink, so you can view them directly.

  1. Find the start time. The cluster_logical_timestamp() function returns the current logical timestamp in nanoseconds; subtracting 3600000000000 (one hour in nanoseconds) gives the logical timestamp for one hour before the statement runs:

    SELECT cluster_logical_timestamp() - 3600000000000;
    
                 ?column?
    ----------------------------------
      1663938662092036106.0000000000
    (1 row)
    
  2. Run the changefeed without a sink and pass the start time to the cursor option:

    CREATE CHANGEFEED WITH cursor='1663938662092036106.0000000000' 
    AS SELECT * FROM vehicle_location_histories 
    WHERE ride_id::string LIKE 'f2616bb3%';
    
  3. To find changes within a time period, use cursor with the end_time option:

    CREATE CHANGEFEED WITH cursor='1663938662092036106.0000000000', end_time='1663942405825479261.0000000000' 
    AS SELECT * FROM vehicle_location_histories 
    WHERE ride_id::string LIKE 'f2616bb3%';
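The timestamp arithmetic in these steps can be reproduced outside the database. The Python sketch below assumes, as the output in step 1 suggests, that a cursor value is nanoseconds since the Unix epoch followed by a 10-digit decimal fraction for the logical component (taken as zero here). Building a cursor from the client's clock, as cursor_one_hour_ago() does, only approximates cluster_logical_timestamp(); all function names are hypothetical.

```python
import time
from decimal import Decimal

ONE_HOUR_NS = 3_600_000_000_000  # 3600 s x 10^9 ns/s, the value subtracted in step 1


def cursor_from_ns(wall_ns: int) -> str:
    """Format nanoseconds like cluster_logical_timestamp() output,
    assuming a logical component of 0."""
    return f"{wall_ns}.0000000000"


def cursor_one_hour_ago() -> str:
    # Uses the client's clock, which only approximates the cluster's clock.
    return cursor_from_ns(time.time_ns() - ONE_HOUR_NS)


def window_seconds(cursor: str, end_time: str) -> Decimal:
    """Length of a cursor/end_time window, in seconds."""
    return (Decimal(end_time) - Decimal(cursor)) / Decimal(10**9)


# The window used in step 3: roughly 3744 s, about 62 minutes.
print(window_seconds("1663938662092036106.0000000000",
                     "1663942405825479261.0000000000"))
```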
    

Determine the age of a row

New in v23.1: You can determine the age of a row by using the crdb_internal_mvcc_timestamp system column and cdc_prev to access the row's previous state:

CREATE CHANGEFEED INTO 'external://sink' 
AS SELECT crdb_internal_mvcc_timestamp - (cdc_prev).crdb_internal_mvcc_timestamp 
AS age 
FROM movr.rides;
{"age": 1679504962492204986.0000000000}
{"age": 1679577387885735266.0000000000}
{"age": 1679504962492204986.0000000000}
{"age": 1679578262568913199.0000000000}
{"age": 1679504962492381317.0000000000}
{"age": 1679579853238534524.0000000000}
{"age": 1679578374708255008.0000000000}
{"age": 1679504962492381317.0000000000}
{"age": 1679578344852201733.0000000000}
{"age": 1679578242116550285.0000000000}
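An age value is the difference between two decimal MVCC timestamps, so it is expressed in nanoseconds. A minimal Python sketch for turning such values into readable durations and dates, assuming the nanoseconds-since-epoch format used throughout this section; the helper names are hypothetical, and because datetime has only microsecond resolution, sub-microsecond digits are dropped.

```python
from datetime import datetime, timedelta, timezone
from decimal import Decimal

NS_PER_S = 1_000_000_000


def mvcc_to_datetime(ts: str) -> datetime:
    """Convert a decimal MVCC timestamp (ns since the Unix epoch) to UTC."""
    ns = int(Decimal(ts))  # int() truncates the logical (fractional) part
    seconds, rem_ns = divmod(ns, NS_PER_S)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        microseconds=rem_ns // 1000)  # datetime stops at microseconds


def age_to_timedelta(age: str) -> timedelta:
    """Convert an age value (a difference in ns) to a timedelta."""
    return timedelta(microseconds=int(Decimal(age)) // 1000)


print(age_to_timedelta("3600000000000.0000000000"))  # 1:00:00
```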

Recover lost messages

If a downstream incident has affected some rows, you may need a way to recover or evaluate those specific rows. Create a new changefeed that watches only the affected row(s). This example filters on ride_id, which is part of the table's primary key:

CREATE CHANGEFEED INTO 'scheme://sink-URI' 
AS SELECT * FROM movr.vehicle_location_histories 
WHERE ride_id = 'ff9df988-ebda-4066-b0fc-ecbc45f8d12b';

The changefeed will return messages for the specified rows:

{"key":"[\"washington dc\", \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"2022-09-22T20:10:05.405737\"]","table":"vehicle_location_histories","value":"{\"city\": \"washington dc\", \"lat\": 128, \"long\": 11, \"ride_id\": \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"timestamp\": \"2022-09-22T20:10:05.405737\"}"}
{"key":"[\"washington dc\", \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"2022-09-22T20:10:05.478217\"]","table":"vehicle_location_histories","value":"{\"city\": \"washington dc\", \"lat\": 45, \"long\": -66, \"ride_id\": \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"timestamp\": \"2022-09-22T20:10:05.478217\"}"}
{"key":"[\"washington dc\", \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"2022-09-22T20:10:05.487198\"]","table":"vehicle_location_histories","value":"{\"city\": \"washington dc\", \"lat\": -34, \"long\": -49, \"ride_id\": \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"timestamp\": \"2022-09-22T20:10:05.487198\"}"}
{"key":"[\"washington dc\", \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"2022-09-22T20:10:05.535764\"]","table":"vehicle_location_histories","value":"{\"city\": \"washington dc\", \"lat\": 1E+1, \"long\": -27, \"ride_id\": \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"timestamp\": \"2022-09-22T20:10:05.535764\"}"}
{"key":"[\"washington dc\", \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"2022-09-22T20:10:05.575483\"]","table":"vehicle_location_histories","value":"{\"city\": \"washington dc\", \"lat\": 83, \"long\": 84, \"ride_id\": \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"timestamp\": \"2022-09-22T20:10:05.575483\"}"}
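In these messages, key and value are themselves JSON documents encoded as strings, so a consumer has to decode them a second time. A minimal Python sketch, assuming exactly the message shape shown above; parse_message is a hypothetical helper. Note that a value such as 1E+1 is valid JSON and decodes to the float 10.0.

```python
import json
from typing import Any, Dict, List, Tuple


def parse_message(line: str) -> Tuple[List[Any], Dict[str, Any]]:
    """Decode one message shaped like the output above.

    Assumes "key" and "value" are JSON documents encoded as strings, so each
    needs a second json.loads() after the outer message is parsed.
    """
    msg = json.loads(line)
    return json.loads(msg["key"]), json.loads(msg["value"])


SAMPLE = r'{"key":"[\"washington dc\", \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"2022-09-22T20:10:05.535764\"]","table":"vehicle_location_histories","value":"{\"city\": \"washington dc\", \"lat\": 1E+1, \"long\": -27, \"ride_id\": \"ff9df988-ebda-4066-b0fc-ecbc45f8d12b\", \"timestamp\": \"2022-09-22T20:10:05.535764\"}"}'

key, value = parse_message(SAMPLE)
print(key[1], value["lat"], value["long"])  # ff9df988-ebda-4066-b0fc-ecbc45f8d12b 10.0 -27
```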

The output includes only changes to the row made within the garbage collection window; changes made outside that window are not returned. See Garbage collection and changefeeds for more detail on how the garbage collection window interacts with changefeeds.

Customize changefeed messages

You can adapt your changefeed messages by filtering the columns, but it is also possible to build message fields with SQL expressions.

In this example, the query adds a summary field to the changefeed message:

CREATE CHANGEFEED AS SELECT *, owner_id::string || ' takes passengers by ' || type || '. They are currently ' || status AS summary FROM vehicles;
{"key":"[\"seattle\", \"e88af90e-1212-4d10-ad13-5b30cfe3bd16\"]","table":"vehicles","value":"{\"city\": \"seattle\", \"creation_time\": \"2019-01-02T03:04:05\", \"current_location\": \"49128 Gerald Mall\", \"ext\": {\"color\": \"yellow\"}, \"id\": \"e88af90e-1212-4d10-ad13-5b30cfe3bd16\", \"owner_id\": \"28df0fab-cde9-4bc1-a11e-769f4b915171\", \"status\": \"in_use\", \"summary\": \"28df0fab-cde9-4bc1-a11e-769f4b915171 takes passengers by skateboard. They are currently in_use\", \"type\": \"skateboard\"}"}
{"key":"[\"seattle\", \"f00edb5f-d951-4dfd-8c1c-d0b34ebc38be\"]","table":"vehicles","value":"{\"city\": \"seattle\", \"creation_time\": \"2019-01-02T03:04:05\", \"current_location\": \"2521 Jaclyn Place Apt. 68\", \"ext\": {\"color\": \"yellow\"}, \"id\": \"f00edb5f-d951-4dfd-8c1c-d0b34ebc38be\", \"owner_id\": \"d4b05249-afed-4f89-b7a9-d5533f687a13\", \"status\": \"available\", \"summary\": \"d4b05249-afed-4f89-b7a9-d5533f687a13 takes passengers by scooter. They are currently available\", \"type\": \"scooter\"}"}

Create a scheduled changefeed to export filtered data

This example creates a nightly export of some filtered table data with a scheduled changefeed that will run just after midnight every night. The changefeed uses CDC queries to query the table and filter the data it will send to the sink:

CREATE SCHEDULE sf_skateboard FOR CHANGEFEED INTO 'external://cloud-sink' WITH format=csv 
  AS SELECT current_location AS sf_address, id, type, status FROM vehicles 
  WHERE city = 'san francisco' AND type = 'skateboard' 
  RECURRING '1 0 * * *' WITH SCHEDULE OPTIONS on_execution_failure=retry, on_previous_running=start;

The schedule options control the schedule's behavior:

  • If it runs into a failure, on_execution_failure=retry will ensure that the schedule retries the changefeed immediately.
  • If the previous scheduled changefeed is still running, on_previous_running=start will start a new changefeed at the defined cadence.
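The cron expression '1 0 * * *' means minute 1 of hour 0 on every day, which is why the export runs just after midnight. A small Python sketch that computes the next firing time for this one expression; it is a hypothetical helper, not a general cron parser, and it ignores time zones, which is an assumption of the sketch.

```python
from datetime import datetime, timedelta


def next_run_after(now: datetime) -> datetime:
    """Next firing time of the cron expression '1 0 * * *' after `now`.

    '1 0 * * *' is minute 1 of hour 0 on every day, i.e. 00:01 daily.
    Times are treated as naive; time zones are out of scope here.
    """
    candidate = now.replace(hour=0, minute=1, second=0, microsecond=0)
    if candidate <= now:
        candidate += timedelta(days=1)
    return candidate


print(next_run_after(datetime(2023, 3, 22, 23, 59)))  # 2023-03-23 00:01:00
```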

Queries and the outbox pattern

The transactional outbox pattern provides a way to publish events reliably through an outbox table before sending to the messaging system. CDC queries can help to streamline this process by eliminating the need for an outbox table in the database. If you also have a requirement to transform the data or remove delete messages from the changefeed payload, queries can achieve this.

For example, suppose you have three tables: users, accounts, and dogs. You need to send all changes to any of those tables to a single Kafka endpoint using a specific structure, namely a JSON object like the following:

{
  "event_timestamp": 1663698160437524000,
  "table": "dogs",
  "type": "create",
  "data": "{ \"good_boy\": true }"
}

To achieve this, you create changefeeds directly on the tables and transform the result into the required format.

For the previous JSON example:

CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT
event_schema_timestamp()::int AS event_timestamp,
'dogs' AS table,
event_op() AS type,
jsonb_build_object('good_boy',good_boy) AS data
FROM dogs;

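This statement does the following:

  • Casts event_schema_timestamp() to an integer and emits it as the event_timestamp field.
  • Emits the literal 'dogs' as the table field.
  • Emits the result of event_op() as the type field.
  • Builds the data field as a JSONB object from the good_boy column with jsonb_build_object().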

For the remaining tables, you use the same statement structure to create changefeeds that will send messages to the Kafka endpoint:

CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT
event_schema_timestamp()::int AS event_timestamp,
'users' AS table,
event_op() AS type,
jsonb_build_object('email', email, 'admin', admin) AS data
FROM users;
CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT
event_schema_timestamp()::int AS event_timestamp,
'accounts' AS table,
event_op() AS type,
jsonb_build_object('owner', owner) AS data
FROM accounts;
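Because all three changefeeds write to the same topic, a downstream consumer uses the table field to tell the events apart. A minimal Python sketch of that routing step, assuming the message value has the JSON shape shown earlier in this section; Kafka client code is omitted, and the HANDLERS table and its output strings are hypothetical. The sketch accepts data either as a nested object or, as in the JSON example, as a JSON-encoded string.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical per-table handlers; a real consumer would do its own work here.
HANDLERS: Dict[str, Callable[[str, Dict[str, Any]], str]] = {
    "dogs": lambda op, data: f"dog {op}: good_boy={data.get('good_boy')}",
    "users": lambda op, data: f"user {op}: {data.get('email')}",
    "accounts": lambda op, data: f"account {op}: owner={data.get('owner')}",
}


def handle_event(raw: str) -> str:
    """Route one message from the shared events topic by its table field."""
    event = json.loads(raw)
    data = event["data"]
    if isinstance(data, str):  # the JSON example encodes data as a string
        data = json.loads(data)
    return HANDLERS[event["table"]](event["type"], data)


print(handle_event('{"event_timestamp": 1663698160437524000, "table": "dogs", '
                   '"type": "create", "data": "{ \\"good_boy\\": true }"}'))
# dog create: good_boy=True
```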

For a different usage of the outbox pattern, you may still want an events table to track and manage the lifecycle of an event. You can also use CDC queries in this case to filter the event management metadata out of a message.

For example, if you delete a message from your outbox table after processing it (or remove it with row-level TTL), you can filter the delete messages out of your changefeed:

CREATE CHANGEFEED INTO 'kafka://endpoint?topic_name=events' AS SELECT * FROM outbox WHERE event_op() != 'delete';

Similarly, if you have a status column in your outbox table tracking its lifecycle, you can filter out updates as well so that only the initial insert sends a message:

CREATE CHANGEFEED INTO 'scheme://sink-URI' AS SELECT status, cdc_prev FROM outbox WHERE (cdc_prev).status IS NULL;

Because all non-primary key columns are NULL in cdc_prev for an insert, insert messages pass the filter and are sent. Updates are not sent, as long as status was not previously NULL.
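To see how the two outbox filters treat one row's lifecycle, here is a small Python model of their WHERE clauses. It is a conceptual sketch, not CockroachDB code: the status values 'pending' and 'processed' are hypothetical, None stands for SQL NULL, op holds what event_op() returns without the diff option, and it assumes that for a delete, cdc_prev holds the row's last state before deletion.

```python
from typing import Callable, List, NamedTuple, Optional


class OutboxEvent(NamedTuple):
    kind: str                   # what happened to the row (for readability)
    op: str                     # event_op() value without the diff option
    prev_status: Optional[str]  # (cdc_prev).status; None stands for SQL NULL


# One hypothetical outbox row: inserted, marked processed, then deleted.
LIFECYCLE = [
    OutboxEvent("insert", "upsert", None),
    OutboxEvent("update", "upsert", "pending"),
    OutboxEvent("delete", "delete", "processed"),
]


def not_delete(e: OutboxEvent) -> bool:
    return e.op != "delete"  # models WHERE event_op() != 'delete'


def first_insert_only(e: OutboxEvent) -> bool:
    return e.prev_status is None  # models WHERE (cdc_prev).status IS NULL


def emitted(pred: Callable[[OutboxEvent], bool]) -> List[str]:
    """Which lifecycle events a changefeed with this predicate would send."""
    return [e.kind for e in LIFECYCLE if pred(e)]


print(emitted(not_delete))         # ['insert', 'update']
print(emitted(first_insert_only))  # ['insert']
```

The model also shows the caveat above: an update whose previous status was NULL would have prev_status None and would pass first_insert_only.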

Queries and user-defined functions

New in v23.1: You can create CDC queries that include user-defined functions.

The following CREATE FUNCTION statement creates the doubleRevenue() function at the database level:

CREATE FUNCTION doubleRevenue(r int)
RETURNS INT IMMUTABLE LEAKPROOF LANGUAGE SQL AS 
$$ SELECT 2 * r $$;

You can then use this function within a CDC query targeting a table in the same database:

CREATE CHANGEFEED INTO 'external://sink' AS SELECT rider_id, doubleRevenue(rides.revenue::int) FROM rides WHERE revenue < 30;
