#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
|
|
|
|
import csv
import json
import sqlite3
from pathlib import Path
from typing import Any
from typing import Dict
from typing import List
from typing import Set
from typing import Tuple
from typing import Union
|
|
|
|
# Values accepted in the CSV ``capital`` column. A value's position in this
# list is the numeric capital level stored on each city as ``capital_of_#``.
CAPITAL_LVL = [
    '',
    'minor',
    'admin',
    'primary',
]

# Human-readable label for each numeric capital level; stored on each city
# as ``capital_of``.
CAPITAL_SEQ = [
    'City',
    'District',
    'State/Province',
    'Country',
]
|
|
|
|
|
|
def parse_csv(fn: Union[str, Path]) -> List[List[str]]:
    """Read a CSV file and return all of its rows.

    Args:
        fn: Path of the CSV file to read.

    Returns:
        One list of string fields per CSV row, in file order (the header row,
        if any, is returned like every other row).

    Raises:
        OSError: the file cannot be opened.
        UnicodeDecodeError: the file is not valid UTF-8.
    """
    # newline='' is what the csv module requires so that line breaks embedded
    # in quoted fields survive untouched; the explicit encoding keeps
    # non-ASCII names independent of the machine's locale.
    with open(fn, encoding='utf-8', newline='') as f:
        return list(csv.reader(f))
|
|
|
|
|
|
def tbl2dict(tbl: List[List[str]]) -> List[Dict[str, str]]:
    """Convert a table whose first row is a header into a list of records.

    Every row after the first is paired positionally with the header row;
    pairing stops at the shorter of the two, so extra cells or extra header
    names are ignored. A table with no data rows yields an empty list.
    """
    records = []
    for values in tbl[1:]:
        # The header is only looked up when there is at least one data row.
        record = dict(zip(tbl[0], values))
        records.append(record)
    return records
|
|
|
|
|
|
def sanitize2tree(tbl: List[Dict[str, str]]) -> Tuple[Dict[str, Any], Set[str]]:
    """Turn flat city records into a country -> state -> city tree.

    The records in *tbl* are normalised **in place**: ``lat`` and ``lng``
    become floats, ``capital`` becomes its index in ``CAPITAL_LVL``, and the
    ``population`` and ``id`` columns are removed when present.

    Args:
        tbl: Records keyed by column name. The columns read are ``country``,
            ``iso2``, ``iso3``, ``admin_name``, ``city``, ``city_ascii``,
            ``lat``, ``lng`` and ``capital``.

    Returns:
        A ``(countries, names)`` pair.

        ``countries`` maps a country name to a dict with the keys ``name``,
        ``iso2``, ``iso3``, ``capital`` (name of the state holding the
        national capital, or ``None``), ``lat``, ``lng`` and ``states``.
        ``states`` maps a state name to a dict with ``name``, ``capital``
        (name of its capital city, or ``None``), ``lat``, ``lng`` and
        ``cities``. ``cities`` maps a city name to a dict with ``name``,
        ``name_ascii``, ``lat``, ``lng``, ``capital_of`` (label from
        ``CAPITAL_SEQ``) and ``capital_of_#`` (numeric level).

        ``names`` is the set of every country name, ISO code, state name,
        city name and ASCII city name stored in the tree.

    Raises:
        KeyError: a record lacks one of the columns listed above.
        ValueError: ``lat``/``lng`` is not a number, or ``capital`` is not
            one of the values in ``CAPITAL_LVL``.
    """
    # Pass 1: normalise every record in place.
    for entry in tbl:
        entry['lat'] = float(entry['lat'])
        entry['lng'] = float(entry['lng'])
        entry['capital'] = CAPITAL_LVL.index(entry['capital'])
        entry.pop('population', None)
        entry.pop('id', None)

    names: Set[str] = set()
    countries: Dict[str, Any] = {}

    # Pass 2: build the tree. The first record seen for a given
    # country / state / city wins; later duplicates are ignored.
    for entry in tbl:
        if entry['country'] not in countries:
            countries[entry['country']] = {
                'name': entry['country'],
                'iso2': entry['iso2'],
                'iso3': entry['iso3'],
                'states': {},
                'capital': None,
            }
            names.add(entry['country'])
            names.add(entry['iso2'])
            names.add(entry['iso3'])
        country = countries[entry['country']]

        states = country['states']
        if entry['admin_name'] not in states:
            states[entry['admin_name']] = {
                'name': entry['admin_name'],
                'cities': {},
                'capital': None,
            }
            names.add(entry['admin_name'])
        state = states[entry['admin_name']]

        cities = state['cities']
        if entry['city'] not in cities:
            cities[entry['city']] = {
                'name': entry['city'],
                'name_ascii': entry['city_ascii'],
                'lat': entry['lat'],
                'lng': entry['lng'],
                'capital_of': CAPITAL_SEQ[entry['capital']],
                'capital_of_#': entry['capital'],
            }
            names.add(entry['city'])
            names.add(entry['city_ascii'])
        city = cities[entry['city']]

        # Level 2 and above marks the state's capital; level 3 additionally
        # marks this state as the one holding the national capital.
        if city['capital_of_#'] >= 2:
            state['capital'] = city['name']
        if city['capital_of_#'] == 3:
            country['capital'] = state['name']

    # Pass 3: fill in missing capitals and compute representative coordinates.
    for country in countries.values():
        states = country['states']
        if country['capital'] not in states:
            country['capital'] = None
        # A country with a single state can only have its capital there.
        if country['capital'] is None and len(states) == 1:
            country['capital'] = next(iter(states))

        country_lats = []
        country_lngs = []
        for state in states.values():
            cities = state['cities']
            if state['capital'] not in cities:
                state['capital'] = None
            # A state with a single city: promote that city to capital.
            if state['capital'] is None and len(cities) == 1:
                state['capital'] = next(iter(cities))
                city = cities[state['capital']]
                level = 3 if country['capital'] == state['name'] else 2
                city['capital_of_#'] = level
                # The label must follow the city's own new level, not the
                # level of whichever record the previous loop ended on.
                city['capital_of'] = CAPITAL_SEQ[level]

            state_lats = [c['lat'] for c in cities.values()]
            state_lngs = [c['lng'] for c in cities.values()]
            country_lats.extend(state_lats)
            country_lngs.extend(state_lngs)
            # Midpoint of the bounding box of the state's cities.
            state['lat'] = round((max(state_lats) + min(state_lats)) / 2, 6)
            state['lng'] = round((max(state_lngs) + min(state_lngs)) / 2, 6)

        # Midpoint of the bounding box of all cities in the country.
        country['lat'] = round((max(country_lats) + min(country_lats)) / 2, 6)
        country['lng'] = round((max(country_lngs) + min(country_lngs)) / 2, 6)

    return countries, names
|
|
|
|
|
|
def populate_empty_sqlite(conn: sqlite3.Connection, countries: Dict[str, Any]) -> None:
    """Create the ``cities``/``states``/``countries`` tables and fill them.

    The database must not already contain these tables. The connection is
    committed on success and is **always closed** before returning or
    raising; when an error occurs nothing is committed.

    Args:
        conn: Open SQLite connection to an empty database. It is closed by
            this function.
        countries: Mapping of country name to country dict. Each country
            needs ``name``, ``iso2``, ``iso3``, ``capital``, ``lat``, ``lng``
            and ``states``; each state needs ``name``, ``capital``, ``lat``,
            ``lng`` and ``cities``; each city needs ``name``, ``name_ascii``,
            ``capital_of``, ``capital_of_#``, ``lat`` and ``lng``.

    Raises:
        KeyError: a country, state or city dict lacks a required key.
        sqlite3.Error: a statement fails (e.g. a table already exists).
    """
    saved_row_factory = conn.row_factory
    conn.row_factory = sqlite3.Row
    try:
        cur = conn.cursor()
        try:
            cur.execute('''CREATE TABLE cities (
                parent_id INT,
                name VARCHAR(255),
                name_ascii VARCHAR(255),
                capital_of VARCHAR(255),
                capital_of_no INT,
                lat DOUBLE,
                lng DOUBLE,
                FOREIGN KEY(parent_id) REFERENCES states(rowid)
            );''')
            cur.execute('''CREATE TABLE states (
                parent_id INT,
                name VARCHAR(255),
                capital VARCHAR(255),
                capital_id INTEGER,
                lat DOUBLE,
                lng DOUBLE,
                FOREIGN KEY(parent_id) REFERENCES countries(rowid),
                FOREIGN KEY(capital_id) REFERENCES cities(rowid)
            );''')
            cur.execute('''CREATE TABLE countries (
                name VARCHAR(255),
                iso2 VARCHAR(2),
                iso3 VARCHAR(3),
                capital VARCHAR(255),
                capital_id INTEGER,
                lat DOUBLE,
                lng DOUBLE,
                FOREIGN KEY(capital_id) REFERENCES states(rowid)
            );''')

            for country in countries.values():
                country_id = cur.execute(
                    '''INSERT INTO countries(name,iso2,iso3,capital,lat,lng) VALUES (?,?,?,?,?,?)''',
                    (
                        country['name'],
                        country['iso2'],
                        country['iso3'],
                        country['capital'],
                        country['lat'],
                        country['lng'],
                    ),
                ).lastrowid
                for state in country['states'].values():
                    state_id = cur.execute(
                        '''INSERT INTO states(parent_id,name,capital,lat,lng) VALUES (?,?,?,?,?)''',
                        (
                            country_id,
                            state['name'],
                            state['capital'],
                            state['lat'],
                            state['lng'],
                        ),
                    ).lastrowid
                    for city in state['cities'].values():
                        city_id = cur.execute(
                            '''INSERT INTO cities(parent_id,name,name_ascii,capital_of,capital_of_no,lat,lng) VALUES (?,?,?,?,?,?,?)''',
                            (
                                state_id,
                                city['name'],
                                city['name_ascii'],
                                city['capital_of'],
                                city['capital_of_#'],
                                city['lat'],
                                city['lng'],
                            ),
                        ).lastrowid
                        # The capital links can only be written once the
                        # city's rowid is known, hence UPDATE after INSERT.
                        if city['capital_of_#'] >= 2:
                            cur.execute(
                                'UPDATE states SET capital_id=? WHERE rowid=?',
                                (city_id, state_id))
                        if city['capital_of_#'] >= 3:
                            # A country's capital_id points at the *state*
                            # that holds the national capital.
                            cur.execute(
                                'UPDATE countries SET capital_id=? WHERE rowid=?',
                                (state_id, country_id))
        finally:
            cur.close()
        # Only reached when every statement succeeded.
        conn.commit()
    finally:
        # Release the connection on every path, including errors.
        conn.row_factory = saved_row_factory
        conn.close()
|
|
|
|
|
|
def main():
    """Convert the world-cities CSV into JSON files and an SQLite database.

    Reads ``simplemaps_worldcities_basic.csv`` from the current directory and
    writes the country tree to ``simplemaps_worldcities_basic.json``, the
    sorted name list to ``simplemaps_worldcities_basic_names.json`` and the
    relational form to ``simplemaps_worldcities_basic.db``.
    """
    rows = parse_csv('simplemaps_worldcities_basic.csv')
    records = tbl2dict(rows)
    countries, names = sanitize2tree(records)

    tree_json = Path('simplemaps_worldcities_basic.json')
    database = Path('simplemaps_worldcities_basic.db')
    # Remove output left over from a previous run before regenerating it.
    for stale in (tree_json, database):
        if stale.exists():
            stale.unlink()

    tree_text = json.dumps(countries, sort_keys=True, indent=2)
    tree_json.write_text(tree_text)

    names_text = json.dumps(sorted(names))
    Path('simplemaps_worldcities_basic_names.json').write_text(names_text)

    connection = sqlite3.connect('simplemaps_worldcities_basic.db')
    populate_empty_sqlite(connection, countries)
|
|
|
|
|
|
# Run the conversion only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|