furmeet-geoquery/makeDb.py

196 lines
7.2 KiB
Python
Executable File

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
from typing import Any
from typing import List
from typing import Dict
from typing import Union
from pathlib import Path
import sqlite3
import json
import csv
CAPITAL_LVL = ['', 'minor', 'admin', 'primary']
CAPITAL_SEQ = ['City', 'District', 'State/Province', 'Country']
def parse_csv(fn: Union[str, Path]) -> List[List[str]]:
with open(str(fn)) as f:
r = csv.reader(f)
return [list(row) for row in r]
def tbl2dict(tbl: List[List[str]]) -> List[Dict[str, str]]:
return [dict(zip(tbl[0], row)) for row in tbl[1:]]
def sanitize2tree(tbl: List[Dict[str, str]]) -> Dict[str, Union[Any, Dict[str, Union[Any, Dict[str, Any]]]]]:
for entry in tbl:
entry['lat'] = float(entry['lat'])
entry['lng'] = float(entry['lng'])
entry['capital'] = CAPITAL_LVL.index(entry['capital'])
del entry['population']
del entry['id']
names = set()
countries = dict()
for entry in tbl:
if entry['country'] not in countries:
countries[entry['country']] = {
'name': entry['country'],
'iso2': entry['iso2'],
'iso3': entry['iso3'],
'states': dict(),
'capital': None,
}
names.add(entry['country'])
names.add(entry['iso2'])
names.add(entry['iso3'])
country = countries[entry['country']]
states = country['states']
if entry['admin_name'] not in states:
states[entry['admin_name']] = {
'name': entry['admin_name'],
'cities': dict(),
'capital': None,
}
names.add(entry['admin_name'])
state = states[entry['admin_name']]
cities = state['cities']
if entry['city'] not in cities:
cities[entry['city']] = {
'name': entry['city'],
'name_ascii': entry['city_ascii'],
'lat': entry['lat'],
'lng': entry['lng'],
'capital_of': CAPITAL_SEQ[entry['capital']],
'capital_of_#': entry['capital'],
}
names.add(entry['city'])
names.add(entry['city_ascii'])
city = cities[entry['city']]
if city['capital_of_#'] >= 2:
state['capital'] = city['name']
if city['capital_of_#'] == 3:
country['capital'] = state['name']
names.add(city['name'])
names.add(state['name'])
names.add(country['name'])
for country in countries.values():
country_lats = list()
country_lngs = list()
if country['capital'] not in country['states']:
country['capital'] = None
if country['capital'] is None and len(country['states']) == 1:
country['capital'] = list(country['states'].keys())[0]
for state in country['states'].values():
state_lats = list()
state_lngs = list()
if state['capital'] not in state['cities']:
state['capital'] = None
if state['capital'] is None and len(state['cities']) == 1:
state['capital'] = list(state['cities'].keys())[0]
city = state['cities'][state['capital']]
city['capital_of_#'] = 2
if country['capital'] == state['name']:
city['capital_of_#'] = 3
city['capital_of'] = CAPITAL_SEQ[entry['capital']]
for city in state['cities'].values():
country_lats.append(city['lat'])
country_lngs.append(city['lng'])
state_lats.append(city['lat'])
state_lngs.append(city['lng'])
state['lat'] = round((max(state_lats)+min(state_lats))/2, 6)
state['lng'] = round((max(state_lngs)+min(state_lngs))/2, 6)
country['lat'] = round((max(country_lats)+min(country_lats))/2, 6)
country['lng'] = round((max(country_lngs)+min(country_lngs))/2, 6)
return countries, names
def populate_empty_sqlite(conn, countries):
rf = conn.row_factory
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute('''CREATE TABLE cities (
parent_id INT,
name VARCHAR(255),
name_ascii VARCHAR(255),
capital_of VARCHAR(255),
capital_of_no INT,
lat DOUBLE,
lng DOUBLE,
FOREIGN KEY(parent_id) REFERENCES states(rowid)
);''')
cur.execute('''CREATE TABLE states (
parent_id INT,
name VARCHAR(255),
capital VARCHAR(255),
capital_id INTEGER,
lat DOUBLE,
lng DOUBLE,
FOREIGN KEY(parent_id) REFERENCES countries(rowid),
FOREIGN KEY(capital_id) REFERENCES cities(rowid)
);''')
cur.execute('''CREATE TABLE countries (
name VARCHAR(255),
iso2 VARCHAR(2),
iso3 VARCHAR(3),
capital VARCHAR(255),
capital_id INTEGER,
lat DOUBLE,
lng DOUBLE,
FOREIGN KEY(capital_id) REFERENCES states(rowid)
);''')
for country in countries.values():
country_id = cur.execute(
'''INSERT INTO countries(name,iso2,iso3,capital,lat,lng) VALUES (?,?,?,?,?,?)''',
tuple(
[country[key] for key in 'name,iso2,iso3,capital,lat,lng'.split(',')]
)
).lastrowid
for state in country['states'].values():
state_id = cur.execute(
'''INSERT INTO states(parent_id,name,capital,lat,lng) VALUES (?,?,?,?,?)''',
tuple(
[country_id] +
[state[key] for key in 'name,capital,lat,lng'.split(',')]
)
).lastrowid
for city in state['cities'].values():
city_id = cur.execute(
'''INSERT INTO cities(parent_id,name,name_ascii,capital_of,capital_of_no,lat,lng) VALUES (?,?,?,?,?,?,?)''',
tuple(
[state_id] +
[city[key] for key in 'name,name_ascii,capital_of,capital_of_#,lat,lng'.split(',')]
)
).lastrowid
if city['capital_of_#'] >= 2:
cur.execute(
'UPDATE states SET capital_id=? WHERE rowid=?', (city_id, state_id))
if city['capital_of_#'] >= 3:
cur.execute(
'UPDATE countries SET capital_id=? WHERE rowid=?', (state_id, country_id))
cur.close()
conn.commit()
conn.row_factory = rf
conn.close()
def main():
countries, names = sanitize2tree(
tbl2dict(parse_csv('simplemaps_worldcities_basic.csv')))
for todel in [Path('simplemaps_worldcities_basic.json'), Path('simplemaps_worldcities_basic.db')]:
if todel.exists():
todel.unlink()
Path('simplemaps_worldcities_basic.json').write_text(
json.dumps(countries, sort_keys=True, indent=2))
Path('simplemaps_worldcities_basic_names.json').write_text(
json.dumps(sorted(list(names))))
populate_empty_sqlite(sqlite3.connect(
'simplemaps_worldcities_basic.db'), countries)
# print(countries)
# print(countries['Bahamas, The'])
if __name__ == '__main__':
main()