Cron CSV procedure

1. prerequisites

  • pg_cron extension installed and enabled (superuser; see the snippet below)
  • your target tables have RLS policies using auth.uid()/auth.jwt()
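
If pg_cron is not yet enabled, a minimal sketch (pg_cron must also be listed in shared_preload_libraries; managed platforms such as Supabase usually expose this as a dashboard toggle):

create extension if not exists pg_cron;

-- gen_random_uuid(), used by the staging table below, is built in on
-- Postgres 13+; on older versions it is provided by pgcrypto
create extension if not exists pgcrypto;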

2. create the staging table

create schema if not exists ingest;

create table ingest.staging (
  staging_id    bigserial    primary key,
  ingest_id     uuid         not null default gen_random_uuid(),
  ingest_jwt    text         not null,
  target_schema text         not null,
  target_table  text         not null,
  payload_csv   text         not null,
  loaded_at     timestamp    not null default clock_timestamp()
);
  • ingest_id isolates each upload batch
  • ingest_jwt preserves the user’s JWT claims (stored as the decoded JSON payload) so the processor can restore RLS context later
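
The processor in step 4 selects and deletes staging rows by ingest_id, so an index on that column keeps batch pickup and cleanup cheap (a small, optional addition):

create index if not exists staging_ingest_id_idx
  on ingest.staging (ingest_id);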

3. loader RPC: shove CSV + JWT into staging

create or replace procedure ingest.load_csv(
  p_table_name  text,
  p_csv_lines   text[],
  p_ingest_jwt  text
)
language plpgsql
security invoker
as $$
declare
  v_schema  text;
  v_table   text;
begin
  if strpos(p_table_name, '.') > 0 then
    v_schema := split_part(p_table_name, '.', 1);
    v_table  := split_part(p_table_name, '.', 2);
  else
    v_schema := 'public';
    v_table  := p_table_name;
  end if;

  insert into ingest.staging(
    ingest_id,
    ingest_jwt,
    target_schema,
    target_table,
    payload_csv
  )
  select
    gen_random_uuid(),
    p_ingest_jwt,
    v_schema,
    v_table,
    line
  from unnest(p_csv_lines) as line;
end;
$$;
  • called from supabase-js, e.g. supabase.schema('ingest').rpc('load_csv', { p_table_name, p_csv_lines, p_ingest_jwt }) (the ingest schema must be added to the API's exposed schemas)
  • tags each row with both a batch UUID and the user’s JWT
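
A quick way to exercise the loader from SQL directly (public.my_table is the hypothetical target reused in step 6, and the third argument is the decoded JWT claims JSON rather than the raw signed token):

call ingest.load_csv(
  'public.my_table',
  array['1,alice', '2,bob'],
  '{"sub":"00000000-0000-0000-0000-000000000000","role":"authenticated"}'
);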

4. single combined processor: pick, inject, upsert, clean

create or replace procedure ingest.process_batches()
language plpgsql
as $$
declare
  v_id       uuid;
  v_jwt      text;
  v_schema   text;
  v_table    text;
  v_cols     text;
  v_pk       text;
  v_set      text;
  v_sql      text;
  v_payload  jsonb;
begin
  loop
    -- pick one batch
    select ingest_id, ingest_jwt, target_schema, target_table
      into v_id, v_jwt, v_schema, v_table
      from ingest.staging
     limit 1
     for update skip locked;

    exit when not found;

    -- inject JWT so auth.uid()/auth.jwt() work
    perform set_config('request.jwt.claim.sub',
                       (v_jwt::jsonb ->> 'sub'),
                       false);
    perform set_config('request.jwt.claims',
                       v_jwt,
                       false);

    -- dynamically fetch columns & pk
    select string_agg(quote_ident(column_name), ', ')
      into v_cols
      from information_schema.columns
     where table_schema = v_schema
       and table_name   = v_table;

    select string_agg(quote_ident(a.attname), ', ')
      into v_pk
      from pg_index i
      join pg_attribute a
        on a.attrelid = i.indrelid
       and a.attnum   = any(i.indkey)
     where i.indrelid = format('%I.%I', v_schema, v_table)::regclass
       and i.indisprimary;

    select string_agg(
             format('%1$s = excluded.%1$s', col),
             ', '
           )
      into v_set
      from unnest(string_to_array(v_cols, ', ')) as col
     where col <> all(string_to_array(v_pk, ', '));

    -- build the upsert SQL
    v_sql := format($f$
      insert into %I.%I (%s)
      select %s
        from jsonb_populate_recordset(
               null::%I.%I,
               $1
             )
      on conflict (%s) do update
        set %s
    $f$,
      v_schema, v_table,
      v_cols,
      v_cols,
      v_schema, v_table,
      v_pk,
      v_set
    );

    -- aggregate this batch's CSV lines into a jsonb array
    -- (placeholder: each line is kept as a single 'csv' field; a real setup
    --  would parse the CSV columns into proper keys and types here)
    select jsonb_agg(
             jsonb_build_object('csv', payload_csv)
           )
      into v_payload
      from ingest.staging
     where ingest_id = v_id;

    -- execute the upsert, binding the jsonb array as $1
    execute v_sql using v_payload;

    -- cleanup
    delete from ingest.staging
     where ingest_id = v_id;
  end loop;
end;
$$;
  • all in one proc: pick, inject JWT, introspect schema, upsert, cleanup
  • no separate public.bulk_ingest needed
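
For a hypothetical table public.my_table (id primary key, owner_id, name), the format() call above generates roughly the following statement, with the jsonb batch bound to $1:

insert into public.my_table (id, owner_id, name)
select id, owner_id, name
  from jsonb_populate_recordset(
         null::public.my_table,
         $1
       )
on conflict (id) do update
  set owner_id = excluded.owner_id, name = excluded.name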

5. schedule it via pg_cron

select cron.schedule(
  '*/1 * * * *',
  $$
    call ingest.process_batches();
  $$
);
  • runs every minute; adjust as desired
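
To inspect or remove the job later (cron.job_run_details is available in newer pg_cron releases):

-- list scheduled jobs
select jobid, schedule, command from cron.job;

-- recent executions
select jobid, status, return_message, start_time
  from cron.job_run_details
 order by start_time desc
 limit 10;

-- drop the job created above
select cron.unschedule(jobid)
  from cron.job
 where command like '%ingest.process_batches%';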

6. RLS policies on production tables

alter table public.my_table enable row level security;

create policy owners_only
  on public.my_table
  for all
  using ( owner_id = auth.uid() )
  with check ( owner_id = auth.uid() );
  • because the batch processor sets request.jwt.claim.sub and request.jwt.claims, auth.uid() returns the right user for every row (see the check below)
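
A quick sanity check of the claim injection outside the cron job (assumes Supabase's auth.uid() helper, which reads these settings):

begin;
select set_config('request.jwt.claims',
                  '{"sub":"00000000-0000-0000-0000-000000000000","role":"authenticated"}',
                  true);   -- true = local to this transaction
select set_config('request.jwt.claim.sub',
                  '00000000-0000-0000-0000-000000000000',
                  true);
select auth.uid();          -- should return the uuid above
rollback;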

7. potential concerns

  • JWT storage size: full tokens in each row can bloat staging; consider storing only the sub claim if you prefer.
  • session GUC limits: Postgres limits each setting’s size (~2 KB); large JWTs might exceed it.
  • schema introspection cost: querying information_schema/pg_index per batch has some overhead; cache if necessary.
  • cron collisions: if process_batches takes longer than the interval, two workers might run simultaneously; use advisory locks (see the sketch after this list) or extend the cron interval.
  • error handling: currently a failing upsert means the batch rows are deleted; you may wish to catch exceptions and move those rows to an error table instead.
  • CSV parsing: above we treated each payload_csv as a single JSON field; in reality you’d need to parse CSV columns into their proper types—either in SQL or before loading.
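
One way to handle the collision concern is to schedule a small wrapper instead of the bare call from step 5, so an overlapping run exits immediately; the lock key 42 is arbitrary:

select cron.schedule(
  '*/1 * * * *',
  $$
    do $body$
    begin
      if pg_try_advisory_lock(42) then
        call ingest.process_batches();
        perform pg_advisory_unlock(42);
      end if;
    end;
    $body$;
  $$
);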

With this single, self-contained procedure, you handle ingest, RLS-safe upsert, and batch cleanup in one place—no separate public.bulk_ingest required, and you avoid long-lived external functions entirely.
