#!/usr/bin/env python3 # -*- encoding: utf-8 -*- from typing import Any from typing import List from typing import Dict from typing import Union from pathlib import Path import sqlite3 import json import csv CAPITAL_LVL = ['', 'minor', 'admin', 'primary'] CAPITAL_SEQ = ['City', 'District', 'State/Province', 'Country'] def parse_csv(fn: Union[str, Path]) -> List[List[str]]: with open(str(fn)) as f: r = csv.reader(f) return [list(row) for row in r] def tbl2dict(tbl: List[List[str]]) -> List[Dict[str, str]]: return [dict(zip(tbl[0], row)) for row in tbl[1:]] def sanitize2tree(tbl: List[Dict[str, str]]) -> Dict[str, Union[Any, Dict[str, Union[Any, Dict[str, Any]]]]]: for entry in tbl: entry['lat'] = float(entry['lat']) entry['lng'] = float(entry['lng']) entry['capital'] = CAPITAL_LVL.index(entry['capital']) del entry['population'] del entry['id'] names = set() countries = dict() for entry in tbl: if entry['country'] not in countries: countries[entry['country']] = { 'name': entry['country'], 'iso2': entry['iso2'], 'iso3': entry['iso3'], 'states': dict(), 'capital': None, } names.add(entry['country']) names.add(entry['iso2']) names.add(entry['iso3']) country = countries[entry['country']] states = country['states'] if entry['admin_name'] not in states: states[entry['admin_name']] = { 'name': entry['admin_name'], 'cities': dict(), 'capital': None, } names.add(entry['admin_name']) state = states[entry['admin_name']] cities = state['cities'] if entry['city'] not in cities: cities[entry['city']] = { 'name': entry['city'], 'name_ascii': entry['city_ascii'], 'lat': entry['lat'], 'lng': entry['lng'], 'capital_of': CAPITAL_SEQ[entry['capital']], 'capital_of_#': entry['capital'], } names.add(entry['city']) names.add(entry['city_ascii']) city = cities[entry['city']] if city['capital_of_#'] >= 2: state['capital'] = city['name'] if city['capital_of_#'] == 3: country['capital'] = state['name'] names.add(city['name']) names.add(state['name']) names.add(country['name']) for country in countries.values(): country_lats = list() country_lngs = list() if country['capital'] not in country['states']: country['capital'] = None if country['capital'] is None and len(country['states']) == 1: country['capital'] = list(country['states'].keys())[0] for state in country['states'].values(): state_lats = list() state_lngs = list() if state['capital'] not in state['cities']: state['capital'] = None if state['capital'] is None and len(state['cities']) == 1: state['capital'] = list(state['cities'].keys())[0] city = state['cities'][state['capital']] city['capital_of_#'] = 2 if country['capital'] == state['name']: city['capital_of_#'] = 3 city['capital_of'] = CAPITAL_SEQ[entry['capital']] for city in state['cities'].values(): country_lats.append(city['lat']) country_lngs.append(city['lng']) state_lats.append(city['lat']) state_lngs.append(city['lng']) state['lat'] = round((max(state_lats)+min(state_lats))/2, 6) state['lng'] = round((max(state_lngs)+min(state_lngs))/2, 6) country['lat'] = round((max(country_lats)+min(country_lats))/2, 6) country['lng'] = round((max(country_lngs)+min(country_lngs))/2, 6) return countries, names def populate_empty_sqlite(conn, countries): rf = conn.row_factory conn.row_factory = sqlite3.Row cur = conn.cursor() cur.execute('''CREATE TABLE cities ( parent_id INT, name VARCHAR(255), name_ascii VARCHAR(255), capital_of VARCHAR(255), capital_of_no INT, lat DOUBLE, lng DOUBLE, FOREIGN KEY(parent_id) REFERENCES states(rowid) );''') cur.execute('''CREATE TABLE states ( parent_id INT, name VARCHAR(255), capital VARCHAR(255), capital_id INTEGER, lat DOUBLE, lng DOUBLE, FOREIGN KEY(parent_id) REFERENCES countries(rowid), FOREIGN KEY(capital_id) REFERENCES cities(rowid) );''') cur.execute('''CREATE TABLE countries ( name VARCHAR(255), iso2 VARCHAR(2), iso3 VARCHAR(3), capital VARCHAR(255), capital_id INTEGER, lat DOUBLE, lng DOUBLE, FOREIGN KEY(capital_id) REFERENCES states(rowid) );''') for country in countries.values(): country_id = cur.execute( '''INSERT INTO countries(name,iso2,iso3,capital,lat,lng) VALUES (?,?,?,?,?,?)''', tuple( [country[key] for key in 'name,iso2,iso3,capital,lat,lng'.split(',')] ) ).lastrowid for state in country['states'].values(): state_id = cur.execute( '''INSERT INTO states(parent_id,name,capital,lat,lng) VALUES (?,?,?,?,?)''', tuple( [country_id] + [state[key] for key in 'name,capital,lat,lng'.split(',')] ) ).lastrowid for city in state['cities'].values(): city_id = cur.execute( '''INSERT INTO cities(parent_id,name,name_ascii,capital_of,capital_of_no,lat,lng) VALUES (?,?,?,?,?,?,?)''', tuple( [state_id] + [city[key] for key in 'name,name_ascii,capital_of,capital_of_#,lat,lng'.split(',')] ) ).lastrowid if city['capital_of_#'] >= 2: cur.execute( 'UPDATE states SET capital_id=? WHERE rowid=?', (city_id, state_id)) if city['capital_of_#'] >= 3: cur.execute( 'UPDATE countries SET capital_id=? WHERE rowid=?', (state_id, country_id)) cur.close() conn.commit() conn.row_factory = rf conn.close() def main(): countries, names = sanitize2tree( tbl2dict(parse_csv('simplemaps_worldcities_basic.csv'))) for todel in [Path('simplemaps_worldcities_basic.json'), Path('simplemaps_worldcities_basic.db')]: if todel.exists(): todel.unlink() Path('simplemaps_worldcities_basic.json').write_text( json.dumps(countries, sort_keys=True, indent=2)) Path('simplemaps_worldcities_basic_names.json').write_text( json.dumps(sorted(list(names)))) populate_empty_sqlite(sqlite3.connect( 'simplemaps_worldcities_basic.db'), countries) # print(countries) # print(countries['Bahamas, The']) if __name__ == '__main__': main()