Last active
June 1, 2026 00:52
-
-
Save ykessler/d8efd685036d6a60a94b04c7b177a9e3 to your computer and use it in GitHub Desktop.
Temporary ACRIS legals `addr_unit` backfill pull script.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby
# Temporary Render-run ACRIS legals unit pull. No repo dependency.
# Requires DATABASE_URL in env. Writes only to _backfill_acris_legals_unit.
#
# Optional env: SODA_DATASET, SODA_APP_TOKEN, BATCH_SIZE, MAX_DOC_IDS,
# SLEEP_BETWEEN, LOG_PREFIX, INSERT_CHUNK_SIZE.
require 'pg'
require 'net/http'
require 'json'
require 'uri'
require 'time'

# Read an environment variable, treating unset and blank (whitespace-only)
# values alike.
#
# @param name [String] environment variable name
# @return [String, nil] the stripped value, or nil when unset/blank
def env_str(name)
  value = ENV[name].to_s.strip
  value.empty? ? nil : value
end

# Read a base-10 integer environment variable, failing fast at startup on
# values that String#to_i would silently turn into 0 (e.g. "abc") or that are
# out of range.
#
# @param name [String] environment variable name
# @param default [Integer, nil] returned when the variable is unset/blank
# @param min [Integer, nil] smallest accepted value (inclusive)
# @param max [Integer, nil] largest accepted value (inclusive)
# @return [Integer, nil]
# @raise [ArgumentError] when the value is not an integer or is out of range
def env_int(name, default, min: nil, max: nil)
  raw = env_str(name)
  return default if raw.nil?

  value = begin
    Integer(raw, 10)
  rescue ArgumentError
    raise ArgumentError, "#{name} must be an integer (got #{raw.inspect})"
  end
  raise ArgumentError, "#{name} must be >= #{min} (got #{value})" if min && value < min
  raise ArgumentError, "#{name} must be <= #{max} (got #{value})" if max && value > max

  value
end

DATABASE_URL = ENV.fetch('DATABASE_URL')
DATASET = ENV.fetch('SODA_DATASET', '8h5j-fqxa')
# Optional SODA app token, read only from the environment so no credential is
# committed with the script. nil => requests carry no X-App-Token header
# (Net::HTTP drops a header whose value is nil).
SODA_TOKEN = env_str('SODA_APP_TOKEN')
# document_ids looked up per SODA request; must be positive (it is a slice size
# and a divisor for the batch count).
BATCH_SIZE = env_int('BATCH_SIZE', 500, min: 1)
# Optional cap on todo document_ids per run; unset or blank means no LIMIT.
MAX_DOC_IDS = env_int('MAX_DOC_IDS', nil, min: 0)
# Seconds to sleep between batches.
SLEEP_BETWEEN = ENV.fetch('SLEEP_BETWEEN', '0.5').to_f
LOG_PREFIX = ENV.fetch('LOG_PREFIX', '[render-acris-pull] ')
# Rows per multi-row INSERT. PostgreSQL accepts at most 65_535 bind parameters
# per statement and each row binds 5, so larger values could never succeed.
INSERT_CHUNK_SIZE = env_int('INSERT_CHUNK_SIZE', 5000, min: 1, max: 65_535 / 5)
# Emit one log line on STDOUT as "<LOG_PREFIX>[<UTC ISO-8601 time>] <msg>".
# STDOUT is flushed after every line, so output is not held in the IO buffer.
#
# @param msg [#to_s] message text
# @return [IO] STDOUT (the value of the trailing flush)
def log(msg)
  stamp = Time.now.utc.iso8601
  STDOUT.puts(format('%s[%s] %s', LOG_PREFIX, stamp, msg))
  STDOUT.flush
end
# Build a 10-digit NYC BBL integer (1 borough digit, 5 block digits,
# 4 lot digits) from its parts.
#
# Parts may be Strings (as they arrive in SODA JSON) or Integers. Strings are
# parsed strictly in base 10, so zero-padded values such as "0012" mean 12.
# Kernel#Integer without an explicit base treats a leading "0" as an octal
# prefix ("0012" => 10, "08" => error).
#
# @param borough [String, Integer] 1..5
# @param block [String, Integer] 1..99_999
# @param lot [String, Integer] 1..9_999
# @return [Integer] e.g. bbl_from_parts('1', '12', '34') #=> 1_000_120_034
# @raise [ArgumentError] when a part is not a base-10 integer (including nil/blank)
# @raise [RuntimeError] when a part is out of range
def bbl_from_parts(borough, block, lot)
  b = Integer(borough.to_s, 10)
  bl = Integer(block.to_s, 10)
  lt = Integer(lot.to_s, 10)
  raise 'bad borough' unless (1..5).cover?(b)
  raise 'bad block' unless (1..99_999).cover?(bl)
  raise 'bad lot' unless (1..9_999).cover?(lt)

  # Same value as "#{b}#{bl padded to 5}#{lt padded to 4}".to_i, without the string round-trip.
  (b * 1_000_000_000) + (bl * 10_000) + lt
end
# GET rows from a Socrata (SODA) dataset on data.cityofnewyork.us.
#
# @param dataset [String] dataset id, e.g. '8h5j-fqxa'
# @param params [Hash{String=>Object}] SoQL query parameters ('$where', '$limit', ...)
# @param token [String, nil] app token; the X-App-Token header is omitted when nil/blank
# @param read_timeout [Numeric] seconds to wait for the response
# @param open_timeout [Numeric] seconds to wait for the connection to open
# @return [Object] the parsed JSON body (presumably an Array of row Hashes;
#   the caller iterates it as such)
# @raise [RuntimeError] on a non-2xx response, with up to 300 chars of the body
def soda_get(dataset, params, token: SODA_TOKEN, read_timeout: 180, open_timeout: 30)
  uri = URI("https://data.cityofnewyork.us/resource/#{dataset}.json")
  uri.query = URI.encode_www_form(params)
  req = Net::HTTP::Get.new(uri)
  req['X-App-Token'] = token if token && !token.empty?

  Net::HTTP.start(uri.host, uri.port, use_ssl: true,
                                      read_timeout: read_timeout, open_timeout: open_timeout) do |http|
    res = http.request(req)
    # body may be nil on some error responses; to_s keeps the intended message
    # instead of raising NoMethodError here.
    raise "SODA HTTP #{res.code}: #{res.body.to_s[0, 300]}" unless res.is_a?(Net::HTTPSuccess)

    JSON.parse(res.body)
  end
end
# Open the database connection. libpq accepts postgres:// / postgresql:// URIs,
# so a leading postgis scheme is rewritten first. \A anchors to the start of
# the string (^ would also match after any embedded newline).
conn = PG.connect(DATABASE_URL.sub(/\Apostgis/, 'postgres'))
log "connected; batch_size=#{BATCH_SIZE}; max_doc_ids=#{MAX_DOC_IDS || 'ALL'}"
# to_i keeps the interpolated LIMIT strictly numeric.
limit_clause = MAX_DOC_IDS ? "LIMIT #{MAX_DOC_IDS.to_i}" : ''

# Distinct document_ids that have legals rows with good_through_date on/after
# 2019-01-01 and an empty addr_unit, and that are not yet present in
# _backfill_acris_legals_unit. The NOT EXISTS lets a rerun resume where a
# previous run stopped.
todo_sql = <<~SQL
  SELECT DISTINCT l.document_id
  FROM nycod_acris_real_property_legals l
  WHERE l.good_through_date >= DATE '2019-01-01'
    AND (l.addr_unit IS NULL OR l.addr_unit = '')
    AND NOT EXISTS (
      SELECT 1 FROM _backfill_acris_legals_unit s
      WHERE s.document_id = l.document_id
    )
  ORDER BY l.document_id
  #{limit_clause}
SQL

log 'computing todo list...'
todo = conn.exec(todo_sql).map { |r| r['document_id'] }
log "todo distinct doc_ids=#{todo.size}"
if todo.empty?
  conn.close
  exit 0
end
# Quote a value as a SoQL string literal; an embedded ' is escaped as ''.
#
# @param value [#to_s]
# @return [String]
def soql_quote(value)
  "'#{value.to_s.gsub("'", "''")}'"
end

# Convert one SODA legals row into the staging-table tuple
# [document_id, bbl, old_address, new_unit, new_address].
# Returns nil when borough/block/lot cannot form a valid BBL.
# Addresses are "street_number street_name" (blank parts dropped); new_address
# appends "APT <unit>" when a unit is present; either becomes 'NA' when empty.
#
# @param r [Hash{String=>Object}] one row as returned by soda_get
# @return [Array, nil]
def legal_row(r)
  bbl = begin
    bbl_from_parts(r['borough'], r['block'], r['lot'])
  rescue ArgumentError, RuntimeError # the only errors bbl_from_parts raises
    nil
  end
  return unless bbl

  unit = r['unit'].to_s.strip
  unit = nil if unit.empty?
  base = [r['street_number'], r['street_name']].map { |v| v.to_s.strip }.reject(&:empty?)
  new_parts = unit ? base + ["APT #{unit}"] : base
  old_address = base.empty? ? 'NA' : base.join(' ')
  new_address = new_parts.empty? ? 'NA' : new_parts.join(' ')
  [r['document_id'].to_s, bbl, old_address, unit, new_address]
end

# Running totals across all batches, reported in the final DONE line.
total_soda = 0
total_inserted = 0
total_skipped = 0
batch_idx = 0
batch_total = (todo.size.to_f / BATCH_SIZE).ceil

# Rows per SODA request. A batch whose result fills a page is re-queried with
# $offset, so large batches are no longer silently truncated at this size.
soda_page_size = 50_000
# PostgreSQL allows at most 65_535 bind parameters per statement; each row binds 5.
insert_chunk_size = [INSERT_CHUNK_SIZE, 65_535 / 5].min

todo.each_slice(BATCH_SIZE) do |ids|
  batch_idx += 1
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  where = "document_id in (#{ids.map { |x| soql_quote(x) }.join(',')})"

  # Best-effort: on any SODA failure the whole batch is skipped. Its ids are
  # still absent from the staging table, so a rerun picks them up again.
  items = []
  begin
    loop do
      # Ordering by the :id system field keeps $offset paging stable.
      page = soda_get(DATASET, { '$where' => where, '$order' => ':id',
                                 '$limit' => soda_page_size, '$offset' => items.size })
      items.concat(page)
      break if page.size < soda_page_size
    end
  rescue => e
    log "batch #{batch_idx}/#{batch_total} SODA ERROR #{e.class}: #{e.message}; sleeping 10s, continuing"
    sleep 10
    next
  end

  rows = items.map { |r| legal_row(r) }.compact

  inserted = 0
  rows.each_slice(insert_chunk_size) do |chunk|
    # One "($1,$2,$3,$4,$5)" group per row, numbered consecutively across the chunk.
    placeholders = chunk.size.times.map do |i|
      base_i = i * 5
      "($#{base_i + 1},$#{base_i + 2},$#{base_i + 3},$#{base_i + 4},$#{base_i + 5})"
    end
    sql = <<~SQL
      INSERT INTO _backfill_acris_legals_unit
        (document_id, bbl, old_address, new_unit, new_address)
      VALUES #{placeholders.join(',')}
      ON CONFLICT (document_id, bbl, old_address) DO NOTHING
    SQL
    inserted += conn.exec_params(sql, chunk.flatten(1)).cmd_tuples
  end

  # Rows not inserted hit the ON CONFLICT key (already present or duplicated in this batch).
  skipped = rows.size - inserted
  total_soda += items.size
  total_inserted += inserted
  total_skipped += skipped
  elapsed = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started).round(2)
  log "batch #{batch_idx}/#{batch_total} ids=#{ids.size} soda_rows=#{items.size} inserted=#{inserted} skipped=#{skipped} elapsed=#{elapsed}s"
  sleep SLEEP_BETWEEN if SLEEP_BETWEEN > 0
end
log "DONE batches=#{batch_idx} soda_rows=#{total_soda} inserted=#{total_inserted} skipped=#{total_skipped}"
conn.close
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment