Skip to content

Instantly share code, notes, and snippets.

@ykessler
Last active June 1, 2026 00:52
Show Gist options
  • Select an option

  • Save ykessler/d8efd685036d6a60a94b04c7b177a9e3 to your computer and use it in GitHub Desktop.

Select an option

Save ykessler/d8efd685036d6a60a94b04c7b177a9e3 to your computer and use it in GitHub Desktop.
temporary acris legals addr_unit backfill pull script
#!/usr/bin/env ruby
# Temporary Render-run ACRIS legals unit pull. No repo dependency.
# Requires DATABASE_URL in env. Writes only to _backfill_acris_legals_unit.
require 'pg'
require 'net/http'
require 'json'
require 'uri'
require 'time'
# --- Runtime configuration (all read from the environment) ---

# Connection string for the target database. Required: ENV.fetch raises
# KeyError when the variable is unset.
DATABASE_URL = ENV.fetch('DATABASE_URL')
# Socrata dataset id interpolated into the resource URL by soda_get.
DATASET = ENV.fetch('SODA_DATASET', '8h5j-fqxa')
# Value sent as the X-App-Token header on every SODA request.
# NOTE(review): the fallback is a literal token committed in source; consider
# requiring SODA_APP_TOKEN from the environment instead — confirm with owner.
SODA_TOKEN = ENV.fetch('SODA_APP_TOKEN', 'r2HNtbneZzNpphofxtuawl7S7')
# Number of document ids sent to SODA per request (slice size of the todo list).
BATCH_SIZE = ENV.fetch('BATCH_SIZE', '500').to_i
# Optional cap on how many document ids are selected (applied as SQL LIMIT);
# nil when the variable is unset, meaning no limit.
MAX_DOC_IDS = ENV['MAX_DOC_IDS']&.to_i
# Seconds to sleep after each batch; values <= 0 disable the pause.
SLEEP_BETWEEN = ENV.fetch('SLEEP_BETWEEN', '0.5').to_f
# Text prepended to every line written by log.
LOG_PREFIX = ENV.fetch('LOG_PREFIX', '[render-acris-pull] ')
# Maximum rows per INSERT statement; each row consumes five bind parameters.
INSERT_CHUNK_SIZE = ENV.fetch('INSERT_CHUNK_SIZE', '5000').to_i
# Writes one log line to standard output and flushes immediately.
#
# The line is LOG_PREFIX, then the current UTC time in ISO 8601 form wrapped
# in square brackets, then the message.
#
# @param msg [#to_s] text to append after the timestamp
# @return [IO] the result of STDOUT.flush
def log(msg)
  stamp = Time.now.utc.iso8601
  line = "#{LOG_PREFIX}[#{stamp}] #{msg}"
  STDOUT.puts(line)
  STDOUT.flush
end
# Builds the 10-digit numeric BBL (borough-block-lot) from its three parts.
#
# Each part is converted with to_s and parsed strictly as base 10. Without an
# explicit radix, Kernel#Integer honours prefixes, so a zero-padded value such
# as "0016" would be read as octal 14 and "00829" would be rejected outright.
#
# @param borough [#to_s] borough code, must be within 1..5
# @param block [#to_s] block number, must be within 1..99999
# @param lot [#to_s] lot number, must be within 1..9999
# @return [Integer] borough digit, 5-digit zero-padded block, 4-digit zero-padded lot
# @raise [ArgumentError] if a part is not a valid base-10 integer string
# @raise [RuntimeError] if a part falls outside its allowed range
def bbl_from_parts(borough, block, lot)
  b = Integer(borough.to_s, 10)
  bl = Integer(block.to_s, 10)
  lt = Integer(lot.to_s, 10)
  raise 'bad borough' unless (1..5).cover?(b)
  raise 'bad block' unless (1..99_999).cover?(bl)
  raise 'bad lot' unless (1..9_999).cover?(lt)
  format('%d%05d%04d', b, bl, lt).to_i
end
# Performs a GET against the NYC Open Data SODA JSON endpoint for a dataset.
#
# The request carries SODA_TOKEN in the X-App-Token header and uses a
# 30 second open timeout and a 180 second read timeout over TLS.
#
# @param dataset [String] dataset id placed in the resource URL
# @param params [Hash] query parameters, form-encoded into the query string
# @return [Object] the parsed JSON response body
# @raise [RuntimeError] when the response is not a Net::HTTPSuccess; the
#   message carries the status code and the first 300 characters of the body
def soda_get(dataset, params)
  endpoint = URI("https://data.cityofnewyork.us/resource/#{dataset}.json")
  endpoint.query = URI.encode_www_form(params)

  request = Net::HTTP::Get.new(endpoint)
  request['X-App-Token'] = SODA_TOKEN

  options = { use_ssl: true, read_timeout: 180, open_timeout: 30 }
  # Without a block, Net::HTTP#request reads the whole body before returning,
  # so the response is safe to inspect after the connection is closed.
  response = Net::HTTP.start(endpoint.host, endpoint.port, **options) do |http|
    http.request(request)
  end

  unless response.is_a?(Net::HTTPSuccess)
    raise "SODA HTTP #{response.code}: #{response.body[0, 300]}"
  end

  JSON.parse(response.body)
end
# Connect to the database; a leading "postgis" in the URL is rewritten to
# "postgres" before it is handed to PG.connect.
conn = PG.connect(DATABASE_URL.sub(/^postgis/, 'postgres'))
log "connected; batch_size=#{BATCH_SIZE}; max_doc_ids=#{MAX_DOC_IDS || 'ALL'}"

# Optional LIMIT, present only when MAX_DOC_IDS was supplied.
limit_clause =
  if MAX_DOC_IDS
    "LIMIT #{MAX_DOC_IDS.to_i}"
  else
    ""
  end

# Distinct document ids that have a blank addr_unit, are dated 2019-01-01 or
# later, and have no row yet in the staging table.
todo_sql = <<~SQL
  SELECT DISTINCT l.document_id
  FROM nycod_acris_real_property_legals l
  WHERE l.good_through_date >= DATE '2019-01-01'
  AND (l.addr_unit IS NULL OR l.addr_unit = '')
  AND NOT EXISTS (
  SELECT 1 FROM _backfill_acris_legals_unit s
  WHERE s.document_id = l.document_id
  )
  ORDER BY l.document_id
  #{limit_clause}
SQL

log 'computing todo list...'
todo = []
conn.exec(todo_sql).each { |record| todo << record['document_id'] }
log "todo distinct doc_ids=#{todo.size}"
exit(0) if todo.empty?

# Number of slices the main loop will process, plus running totals for the
# final summary line.
batch_total = (todo.size.to_f / BATCH_SIZE).ceil
batch_idx = 0
total_soda = 0
total_inserted = 0
total_skipped = 0
# Main pull loop. For each slice of BATCH_SIZE document ids:
#   1. fetch every matching legals row from SODA (paged, so results larger
#      than one page are not silently truncated),
#   2. derive (document_id, bbl, old_address, new_unit, new_address) tuples,
#   3. insert them into _backfill_acris_legals_unit, ignoring conflicts.
# A SODA failure skips the whole batch after a 10 second pause; nothing from
# that batch is inserted, so a later run will pick the ids up again.

# Rows requested per SODA page.
soda_page_limit = 50_000
# PostgreSQL allows at most 65,535 bind parameters per statement and each row
# uses five, so cap the configured chunk size accordingly.
insert_chunk_size = [INSERT_CHUNK_SIZE, 65_535 / 5].min

todo.each_slice(BATCH_SIZE) do |ids|
  batch_idx += 1
  # Monotonic clock: elapsed time is unaffected by wall-clock adjustments.
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  # SoQL string literals escape an embedded single quote by doubling it.
  quoted_ids = ids.map { |id| "'#{id.to_s.gsub("'", "''")}'" }
  where = "document_id in (#{quoted_ids.join(',')})"

  items = []
  begin
    offset = 0
    loop do
      # A stable $order is required for $offset paging to be consistent.
      page = soda_get(DATASET, {
        '$where' => where,
        '$order' => ':id',
        '$limit' => soda_page_limit,
        '$offset' => offset
      })
      items.concat(page)
      break if page.size < soda_page_limit
      offset += soda_page_limit
    end
  rescue => e
    log "batch #{batch_idx}/#{batch_total} SODA ERROR #{e.class}: #{e.message}; sleeping 10s, continuing"
    sleep 10
    next
  end

  rows = []
  items.each do |r|
    # Rows whose borough/block/lot cannot form a valid BBL are dropped.
    bbl = begin
      bbl_from_parts(r['borough'], r['block'], r['lot'])
    rescue StandardError
      nil
    end
    next unless bbl

    unit = r['unit'].to_s.strip
    unit = nil if unit.empty?

    # Street number and street name, with blank parts removed.
    base = [r['street_number'], r['street_name']].map { |part| part.to_s.strip }.reject(&:empty?)
    new_parts = unit ? base + ["APT #{unit}"] : base
    old_address = base.any? ? base.join(' ') : 'NA'
    new_address = new_parts.any? ? new_parts.join(' ') : 'NA'

    rows << [r['document_id'].to_s, bbl, old_address, unit, new_address]
  end

  inserted = 0
  rows.each_slice(insert_chunk_size) do |chunk|
    # One "($n,$n+1,...)" group of five numbered placeholders per row.
    values = chunk.each_index.map do |i|
      first = i * 5 + 1
      "(#{(first..first + 4).map { |n| "$#{n}" }.join(',')})"
    end
    sql = <<~SQL
      INSERT INTO _backfill_acris_legals_unit
      (document_id, bbl, old_address, new_unit, new_address)
      VALUES #{values.join(',')}
      ON CONFLICT (document_id, bbl, old_address) DO NOTHING
    SQL
    inserted += conn.exec_params(sql, chunk.flatten(1)).cmd_tuples
  end

  # "skipped" counts candidate rows the INSERT did not store (conflicts).
  skipped = rows.size - inserted
  total_soda += items.size
  total_inserted += inserted
  total_skipped += skipped
  elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started).round(2)
  log "batch #{batch_idx}/#{batch_total} ids=#{ids.size} soda_rows=#{items.size} inserted=#{inserted} skipped=#{skipped} elapsed=#{elapsed}s"
  sleep SLEEP_BETWEEN if SLEEP_BETWEEN > 0
end
log "DONE batches=#{batch_idx} soda_rows=#{total_soda} inserted=#{total_inserted} skipped=#{total_skipped}"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment