- pg_cron extension installed and enabled (superuser)
- your target tables have RLS policies using auth.uid() / auth.jwt()
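If pg_cron is not enabled yet, a superuser (or the Supabase dashboard's extension settings) can turn it on; a minimal sketch:

-- on self-hosted Postgres, pg_cron must also be listed in shared_preload_libraries (requires a restart)
create extension if not exists pg_cron;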
create schema if not exists ingest;
create table ingest.staging (
staging_id bigserial primary key,
ingest_id uuid not null default gen_random_uuid(),
ingest_jwt text not null,
target_schema text not null,
target_table text not null,
payload_csv text not null,
loaded_at timestamp not null default clock_timestamp()
);
- ingest_id isolates each upload batch
- ingest_jwt preserves the user's token for later RLS context
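One assumption worth making explicit: the batch processor below casts ingest_jwt with ::jsonb, so the column is expected to hold the decoded claims JSON rather than the raw three-part token. A hypothetical example of the shape it expects:

-- placeholder claims; the UUID is made up
select '{"sub": "8f7d1c2e-0000-4000-8000-000000000000",
         "role": "authenticated",
         "email": "user@example.com"}'::jsonb ->> 'sub';
-- returns 8f7d1c2e-0000-4000-8000-000000000000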
create or replace procedure ingest.load_csv(
p_table_name text,
p_csv_lines text[],
p_ingest_jwt text
)
language plpgsql
security invoker
as $$
declare
v_schema text;
v_table text;
begin
if strpos(p_table_name, '.') > 0 then
v_schema := split_part(p_table_name, '.', 1);
v_table := split_part(p_table_name, '.', 2);
else
v_schema := 'public';
v_table := p_table_name;
end if;
insert into ingest.staging(
ingest_id,
ingest_jwt,
target_schema,
target_table,
payload_csv
)
select
gen_random_uuid(),
p_ingest_jwt,
v_schema,
v_table,
line
from unnest(p_csv_lines) as line;
end;
$$;
- called from the client via the Supabase RPC interface, e.g. supabase.schema('ingest').rpc('load_csv', { p_table_name, p_csv_lines, p_ingest_jwt }); the ingest schema must be added to the API's exposed schemas, and since PostgREST's RPC endpoint is built around functions you may prefer to declare load_csv as a function returning void
- tags each row with both a batch UUID and the user's JWT
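For a quick test from SQL (psql or the dashboard SQL editor) the procedure can also be called directly; the table name, rows, and claims below are hypothetical:

call ingest.load_csv(
  'public.my_table',
  array[
    '1,8f7d1c2e-0000-4000-8000-000000000000,Alice',
    '2,8f7d1c2e-0000-4000-8000-000000000000,Bob'
  ],
  '{"sub": "8f7d1c2e-0000-4000-8000-000000000000", "role": "authenticated"}'
);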
create or replace procedure ingest.process_batches(batch_size int default 5000)
language plpgsql
as $$
declare
v_id uuid;
v_jwt text;
v_schema text;
v_table text;
v_cols text;
v_pk text;
v_set text;
v_sql text;
v_payload jsonb;
begin
loop
-- pick one batch (note: batch_size is currently unused; each iteration drains a whole ingest_id)
select ingest_id, ingest_jwt, target_schema, target_table
into v_id, v_jwt, v_schema, v_table
from ingest.staging
limit 1
for update skip locked;
exit when not found;
-- inject the JWT claims so auth.uid()/auth.jwt() resolve to the uploading user
-- (assumes ingest_jwt holds the decoded claims JSON; a raw token would fail the ::jsonb cast)
perform set_config('request.jwt.claim.sub',
(v_jwt::jsonb ->> 'sub'),
false);
perform set_config('request.jwt.claims',
v_jwt,
false);
-- dynamically fetch columns & pk
select string_agg(quote_ident(column_name), ', ')
into v_cols
from information_schema.columns
where table_schema = v_schema
and table_name = v_table;
select string_agg(quote_ident(a.attname), ', ')
into v_pk
from pg_index i
join pg_attribute a
on a.attrelid = i.indrelid
and a.attnum = any(i.indkey)
where i.indrelid = format('%I.%I', v_schema, v_table)::regclass
and i.indisprimary;
select string_agg(
format('%1$s = excluded.%1$s', col),
', '
)
into v_set
from unnest(string_to_array(v_cols, ', ')) as col
where col <> all(string_to_array(v_pk, ', '));  -- exclude primary-key columns from the update list
-- build the upsert SQL
v_sql := format($f$
insert into %I.%I (%s)
select %s
from jsonb_populate_recordset(
null::%I.%I,
$1
)
on conflict (%s) do update
set %s
$f$,
v_schema, v_table,
v_cols,
v_cols,
v_schema, v_table,
v_pk,
v_set
);
-- aggregate this batch's CSV payloads into a jsonb array
select jsonb_agg(
jsonb_build_object(
-- assume CSV columns match exactly; parse as needed, e.g.
-- 'col1', split_part(payload_csv, ',', 1)::type, ...
'csv', payload_csv
)
)
into v_payload
from ingest.staging
where ingest_id = v_id;
-- execute the upsert
execute v_sql using v_payload;
-- cleanup
delete from ingest.staging
where ingest_id = v_id;
end loop;
end;
$$;
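To make the dynamic SQL concrete: for a hypothetical table public.my_table (id bigint primary key, owner_id uuid, name text), the statement built into v_sql would expand to roughly:

insert into public.my_table (id, owner_id, name)
select id, owner_id, name
from jsonb_populate_recordset(null::public.my_table, $1)
on conflict (id) do update
set owner_id = excluded.owner_id, name = excluded.name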
- all in one proc: pick a batch, inject the JWT, introspect the schema, upsert, clean up
- no separate public.bulk_ingest needed
select cron.schedule(
'*/1 * * * *',
$$
call ingest.process_batches();
$$
);
- runs every minute; adjust as desired
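To see what is scheduled and how runs went, pg_cron exposes catalog tables (cron.job_run_details requires a reasonably recent pg_cron release):

-- list scheduled jobs; unschedule by id if needed
select jobid, schedule, command from cron.job;
-- select cron.unschedule(<jobid>);

-- recent run history
select jobid, status, return_message, start_time
from cron.job_run_details
order by start_time desc
limit 10;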
alter table public.my_table enable row level security;
create policy owners_only
on public.my_table
for all
using ( owner_id = auth.uid() )
with check ( owner_id = auth.uid() );
- because the batch processor sets request.jwt.claim.sub and request.jwt.claims, auth.uid() returns the right user for every row
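You can sanity-check this in a SQL session: after manually setting the same GUCs, auth.uid() should return the simulated user (the UUID below is a placeholder):

begin;
select set_config('request.jwt.claims',
  '{"sub": "8f7d1c2e-0000-4000-8000-000000000000", "role": "authenticated"}',
  true);
select set_config('request.jwt.claim.sub',
  '8f7d1c2e-0000-4000-8000-000000000000',
  true);
select auth.uid();  -- expected: 8f7d1c2e-0000-4000-8000-000000000000
rollback;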
- JWT storage size: full tokens in each row can bloat staging; consider storing only the sub claim if you prefer.
- session GUC limits: Postgres limits each setting's size (~2 KB); large JWTs might exceed it.
- schema introspection cost: querying information_schema / pg_index per batch has some overhead; cache if necessary.
- cron collisions: if process_batches takes longer than the interval, two workers might run simultaneously; use advisory locks or extend the cron interval (see the sketch after this list).
- error handling: an unhandled error in the upsert aborts the whole run and leaves the batch in staging to be retried on the next run; you may wish to catch the exception and move those rows to an error table instead (also sketched below).
- CSV parsing: above we treated each payload_csv line as a single JSON field; in reality you'd need to parse the CSV columns into their proper types, either in SQL or before loading (a sketch follows below).
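A sketch of the cron-collision and error-handling mitigations, under stated assumptions: the ingest.errors table is hypothetical and you would create it yourself, and the two fragments show only the lines that change inside process_batches.

-- hypothetical companion table for failed batches
create table if not exists ingest.errors (
  like ingest.staging,
  error_message text,
  failed_at timestamp not null default clock_timestamp()
);

-- fragment: at the top of process_batches, exit early if another run holds the lock
-- (a transaction-level advisory lock is released automatically when the run ends)
if not pg_try_advisory_xact_lock(hashtext('ingest.process_batches')) then
  return;
end if;

-- fragment: wrap the upsert so a bad batch is parked rather than aborting everything
begin
  execute v_sql using v_payload;
exception when others then
  insert into ingest.errors
  select s.*, sqlerrm, clock_timestamp()
  from ingest.staging s
  where s.ingest_id = v_id;
end;
delete from ingest.staging where ingest_id = v_id;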
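For the CSV-parsing point, a sketch again using the hypothetical public.my_table (id bigint, owner_id uuid, name text): build JSON keys that match the target column names so jsonb_populate_recordset can map them (note that a naive split_part does not handle quoted commas or escaped fields).

-- fragment: replaces the jsonb_agg aggregation inside process_batches
select jsonb_agg(
  jsonb_build_object(
    'id',       split_part(payload_csv, ',', 1)::bigint,
    'owner_id', split_part(payload_csv, ',', 2)::uuid,
    'name',     split_part(payload_csv, ',', 3)
  )
)
into v_payload
from ingest.staging
where ingest_id = v_id;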
With this single, self-contained procedure, you handle ingest, RLS-safe upsert, and batch cleanup in one place: no separate public.bulk_ingest is required, and you avoid long-lived external functions entirely.