Created
October 8, 2014 15:22
-
-
Save thatandyrose/e766fbd6e9ba0d39316d to your computer and use it in GitHub Desktop.
GTFS importer snippets
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'csv' | |
require 'open-uri' | |
module GTFS
  # Imports GTFS agency.txt rows into the Agency collection.
  #
  # With no ids given, every agency in the feed is imported; with a list
  # of ids, only matching rows are kept (ids are compared as strings,
  # since CSV cells arrive as strings).
  class AgencyImporter < BaseImporter
    def import(agency_ids = [])
      build_document = lambda do |row|
        {
          agency_id: row[:agency_id],
          agency_timezone: row[:agency_timezone],
          agency_name: row[:agency_name]
        }
      end
      opts = { document_build: build_document }
      if agency_ids.empty?
        log("Importing all Agencies.")
      else
        log("Importing Agencies for ids: #{agency_ids}.")
        opts[:condition_field] = :agency_id
        opts[:condition_array] = agency_ids.map(&:to_s)
      end
      generic_import(opts)
    end
  end
end
module GTFS
  # Imports GTFS shapes.txt rows, restricted to shape ids actually
  # referenced by already-imported Trip documents.
  class ShapeImporter < BaseImporter
    # Bulk-inserts every shape point, then (via enforce_condition_delete
    # in BaseImporter) removes points whose shape_id is not referenced by
    # any Trip — insert-then-prune is faster than filtering row by row.
    def import
      generic_import({
        condition_array: Trip.col.distinct(:shape_id),
        enforce_condition_delete: true,
        condition_field: :shape_id,
        document_build: ->(row) {
          {
            shape_id: row[:shape_id],
            shape_pt_lat: row[:shape_pt_lat].to_f,
            shape_pt_lon: row[:shape_pt_lon].to_f,
            shape_pt_sequence: row[:shape_pt_sequence].to_i,
            # GTFS defines shape_dist_traveled as a non-negative float;
            # the previous .to_i silently truncated fractional distances.
            shape_dist_traveled: row[:shape_dist_traveled].to_f
          }
        }
      })
    end
  end
end
module GTFS
  # Imports GTFS calendar_dates.txt rows, keeping only service ids that
  # are referenced by already-imported Trip documents.
  class CalendarDateImporter < BaseImporter
    def import
      build_document = lambda do |row|
        {
          service_id: row[:service_id],
          # NOTE(review): assumes Time.parse understands the GTFS
          # YYYYMMDD date format and that 'time' is loaded — confirm.
          effective_on: Time.parse(row[:date]),
          exception_type: row[:exception_type].to_i
        }
      end
      generic_import({
        condition_array: Trip.col.distinct(:service_id),
        condition_field: :service_id,
        document_build: build_document
      })
    end
  end
end
module GTFS
  # Shared plumbing for the GTFS CSV importers.
  #
  # Subclasses are named after the model they import into (AgencyImporter
  # imports Agency, etc.); the model class and its Mongo collection are
  # resolved from the subclass name. Each importer streams a CSV file
  # (local path or URL) line by line and bulk-inserts documents in
  # batches, optionally filtered by a condition on one field.
  class BaseImporter
    # file_uri - local path or URL of the CSV file to import.
    # options  - :log_lines_threshold (Integer) log progress every N rows
    #            (default 10,000).
    def initialize(file_uri, options = {})
      @file_uri = file_uri
      @log_lines_threshold = options.fetch(:log_lines_threshold, 10_000)
      collection_name = self.class.to_s.gsub('GTFS::', '').gsub('Importer', '')
      if collection_name != 'Base'
        # constantize comes from ActiveSupport; resolves e.g. "Agency".
        @model_connection = collection_name.constantize
        @mongo_col = @model_connection.col
      end
    end

    # Opens the CSV source and returns the IO object.
    # Kernel#open no longer handles URLs on Ruby 3+, so URL-looking
    # strings go through URI.open (open-uri) and everything else is
    # treated as a local file path.
    def open_file
      log("Opening file: #{@file_uri}.")
      if @file_uri.to_s =~ %r{\A[a-z][a-z0-9+.-]*://}i
        URI.open(@file_uri)
      else
        File.open(@file_uri)
      end
    end

    # Streams the CSV, yielding each data row as a symbol-keyed Hash.
    # The first line is treated as the header and is not yielded.
    # The block's truthy/falsy return is counted as imported/skipped.
    def iterate_csv(&block)
      file = open_file
      log("Iterating file. Size: #{file.size / 1024 / 1024} MB")
      row_count = 0
      import_count = 0
      header = nil
      file.each_line do |line|
        row = sanitize_csv_row_values(line.parse_csv)
        if row_count > 0 # we don't want to import the header row.
          import_count += 1 if block.call(create_row_hash(header, row))
        else
          header = row
          log("First line read. Next log will be in #{@log_lines_threshold} lines")
        end
        row_count += 1
        log("#{row_count} row(s) iterated. #{import_count} imported.") if is_a_nth(row_count, @log_lines_threshold)
      end
      log("#{import_count} row(s) out of #{row_count - 1} imported.")
    end

    # Drops the target collection and re-imports it from the CSV.
    # o - :document_build (required lambda row -> document),
    #     :condition_field / :condition_array (optional row filter),
    #     :enforce_condition_delete (insert all, prune afterwards).
    # NOTE: the collection is dropped even when the condition array is
    # empty and nothing will be imported — existing data is lost either way.
    def generic_import(o = {})
      log("Importing for #{@model_connection.col_name}.")
      if o[:condition_array]
        condition_block = ->(row) { o[:condition_array].include?(row[o[:condition_field]]) }
      else
        condition_block = ->(row) { true }
      end
      log("Delete documents for #{@model_connection.col_name} before we start.")
      @mongo_col.drop
      if !o[:condition_array] || o[:condition_array].any?
        log("Importer will Iterate CSV")
        import_by_iterating(o.merge(condition_block: condition_block))
      else
        log("No imports for #{@model_connection.col_name} as condition array was empty")
      end
    end

    # Normalizes a parsed CSV row: nils become "", everything is
    # stringified and stripped of surrounding whitespace.
    def sanitize_csv_row_values(row_array)
      row_array.map { |val| val.to_s.strip }
    end

    # Zips a header row and a values row into a symbol-keyed Hash.
    # A single leading non-identifier character on a header cell (e.g. a
    # UTF-8 BOM on the first column) is stripped before symbolizing.
    def create_row_hash(header_array, values_array)
      row_hash = {}
      header_array.each_with_index do |key, i|
        # Guard key[0] — an empty header cell would otherwise raise.
        key = key[1..-1] if key[0] && !key[0].match(/^[a-zA-Z0-9_-]+$/)
        row_hash[key.to_sym] = values_array[i]
      end
      row_hash
    end

    # True when count is an exact multiple of n (used for periodic logging).
    def is_a_nth(count, n)
      (count % n).zero?
    end

    # Timestamped console logging. strftime replaces ActiveSupport's
    # DateTime#to_s(:short), keeping the same "08 Oct 15:22" format.
    def log(msg)
      puts "#{Time.now.strftime('%d %b %H:%M')}: #{msg}"
    end

    # Inserts documents built from matching rows in batches of 30,000.
    # With :enforce_condition_delete every row is inserted first and the
    # non-matching documents are removed in one query afterwards.
    def import_by_iterating(options)
      document_build_block = options[:document_build]
      enforce_condition_delete = options[:enforce_condition_delete]
      condition_field = options[:condition_field]
      condition_array = options[:condition_array]
      condition_block = options[:condition_block]
      batch = []
      batch_size = 30_000
      inserts = 0
      iterate_csv do |row|
        if enforce_condition_delete || condition_block.call(row)
          batch << document_build_block.call(row)
          if batch.size >= batch_size
            inserts += batch.size
            @mongo_col.insert(batch)
            batch = []
          end
          true
        end
      end
      @mongo_col.insert(batch) if batch.size > 0
      inserts += batch.size
      batch = nil
      actual_count = @mongo_col.count
      log("Checking reported count was inserted. reported:#{inserts}, actual:#{actual_count}")
      if enforce_condition_delete
        log("enforce_condition_delete is true. Lets delete some data.")
        @mongo_col.remove(condition_field => { '$nin' => condition_array })
        removed_count = actual_count - @mongo_col.count
        log("#{removed_count} documents removed from #{@model_connection.col_name}")
      end
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment