---
title: Create a custom job to downsample and compress chunks
description: Downsample and convert hypertable chunks to the columnstore by combining a continuous aggregate refresh policy with hypercore
---

TimescaleDB lets you downsample and convert chunks to the columnstore by combining a [continuous aggregate refresh policy](/docs/build/continuous-aggregates/refresh-policies/index.md) with [hypercore](/docs/learn/columnar-storage/understand-hypercore/index.md). If you want to implement features not supported by those policies, you can write a job to downsample and convert chunks to the columnstore instead.
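
For a simple hourly-average case, the policy-based setup looks roughly like the following sketch. The `metrics` hypertable, its columns, and the chosen intervals are assumptions for illustration, and the hypertable must already have the columnstore enabled:

```sql
-- hourly averages maintained as a continuous aggregate
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT time_bucket('1h', time) AS bucket,
       device_id,
       avg(value) AS value
FROM metrics
GROUP BY bucket, device_id;

-- refresh the aggregate regularly as raw data arrives
SELECT add_continuous_aggregate_policy('metrics_hourly',
  start_offset      => INTERVAL '3 days',
  end_offset        => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour');

-- convert raw chunks to the columnstore once they are a year old
CALL add_columnstore_policy('metrics', after => INTERVAL '12 months');
```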

The following example downsamples raw data to hourly averages. This simple case could be handled by a continuous aggregate policy alone, but the downsampling query in a custom job can be arbitrarily complex. The sketch below shows the minimal schema the example assumes.
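
This setup is a minimal sketch; the `metrics` table name, its columns, and the `segmentby` choice are assumptions for illustration:

```sql
-- hypothetical raw-data hypertable used throughout this example
CREATE TABLE metrics (
  time      TIMESTAMPTZ NOT NULL,
  device_id INTEGER,
  value     DOUBLE PRECISION
);
SELECT create_hypertable('metrics', by_range('time'));

-- enable the columnstore so chunks can be converted later
ALTER TABLE metrics SET (
  timescaledb.enable_columnstore = true,
  timescaledb.segmentby = 'device_id'
);
```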

1. **Create a procedure to downsample chunks and convert them to columnstore**

   This procedure queries the hypertable, named `metrics` in this example, for chunks that are older than the `lag` parameter and not yet in the columnstore. For each such chunk, it downsamples the raw data to hourly averages, then converts the chunk to the columnstore. A temporary table holds the downsampled data while the original chunk is truncated and refilled.

   ```sql
   CREATE OR REPLACE PROCEDURE downsample_compress (job_id int, config jsonb)
   LANGUAGE PLPGSQL
   AS $$
   DECLARE
     lag interval;
     chunk REGCLASS;
     tmp_name name;
   BEGIN
     -- read the lag interval from the job config
     SELECT jsonb_object_field_text(config, 'lag')::interval INTO STRICT lag;

     IF lag IS NULL THEN
       RAISE EXCEPTION 'Config must have lag';
     END IF;

     -- find chunks of the metrics hypertable older than lag
     -- that are not yet in the columnstore
     FOR chunk IN
       SELECT show.oid
       FROM show_chunks('metrics', older_than => lag) SHOW (oid)
         INNER JOIN pg_class pgc ON pgc.oid = show.oid
         INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
         INNER JOIN timescaledb_information.chunks info ON info.chunk_name = pgc.relname
           AND info.chunk_schema = pgns.nspname
       WHERE info.is_compressed::bool = FALSE
     LOOP
       RAISE NOTICE 'Processing chunk: %', chunk::text;

       -- build name for temp table
       SELECT '_tmp' || relname
       FROM pg_class
       WHERE oid = chunk INTO STRICT tmp_name;

       -- copy downsampled chunk data into temp table
       EXECUTE format($sql$ CREATE UNLOGGED TABLE %I AS
         SELECT time_bucket('1h', time), device_id, avg(value) FROM %s GROUP BY 1, 2;
       $sql$, tmp_name, chunk);

       -- clear original chunk
       EXECUTE format('TRUNCATE %s;', chunk);

       -- copy downsampled data back into chunk
       EXECUTE format('INSERT INTO %s(time, device_id, value) SELECT * FROM %I;', chunk, tmp_name);

       -- drop temp table
       EXECUTE format('DROP TABLE %I;', tmp_name);

       -- convert_to_columnstore is a procedure, so invoke it with CALL
       CALL convert_to_columnstore(chunk);

       COMMIT;
     END LOOP;
   END
   $$;
   ```
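
   To sanity-check the procedure before scheduling it, call it once by hand and inspect the chunk state. Passing `NULL` for the job id is fine because the procedure does not use it:

   ```sql
   -- run the procedure once outside the job scheduler
   CALL downsample_compress(NULL, '{"lag":"12 month"}');

   -- verify which chunks are now in the columnstore
   SELECT chunk_name, is_compressed
   FROM timescaledb_information.chunks
   WHERE hypertable_name = 'metrics';
   ```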

2. **Register the job to run daily**

   In the `config`, set `lag` to 12 months so the job downsamples and converts chunks containing data older than 12 months to the columnstore.

   ```sql
   SELECT add_job('downsample_compress','1d', config => '{"lag":"12 month"}');
   ```
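
   To confirm the registration, query the jobs view. You can also trigger a run immediately with `run_job` instead of waiting for the schedule; the job id shown is hypothetical, so use the one returned by `add_job`:

   ```sql
   -- look up the id, schedule, and config of the new job
   SELECT job_id, schedule_interval, config
   FROM timescaledb_information.jobs
   WHERE proc_name = 'downsample_compress';

   -- run the job immediately (replace 1000 with your job_id)
   CALL run_job(1000);
   ```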
