3 | 3 | import geocoder |
4 | 4 | from sqlalchemy import create_engine, MetaData, or_ |
5 | 5 | from sqlalchemy.orm import sessionmaker |
| 6 | +from time import sleep |
| 7 | +from copy import deepcopy |
6 | 8 |
7 | 9 |
8 | 10 | # attributes in entity details that might contain the full address in a single string |
16 | 18 | "country": ["country"], |
17 | 19 | "po_box": ["pob", "zipcode", "postal_code", "pob_postal_code"],} |
18 | 20 |
19 | | - |
20 | 21 | class GeoCodeEntities(object): |
21 | 22 |
| 23 | + PROVIDER_DEFAULT_PARAMS = { |
| 24 | + "timeout": 5, |
| 25 | + "sleep_seconds": 1 |
| 26 | + } |
| 27 | + |
| 29 | +    # the processor tries each provider in order, skipping providers whose env_vars are not set, until its limit is reached (see the standalone sketch after this list)
| 29 | + PROVIDERS = [ |
| 30 | + # google seems to provide the best results, so we try to use it first |
| 31 | + # https://developers.google.com/maps/documentation/geocoding/usage-limits |
| 32 | + # $0.50 USD / 1000 up to 100000 |
| 33 | + { |
| 34 | + "provider": "google", |
| 35 | + "env_vars": ["GOOGLE_API_KEY"], |
| 36 | + "limit": 50000 |
| 37 | + }, |
| 38 | + { |
| 39 | + "provider": "google", |
| 40 | + "env_vars": ["GOOGLE_CLIENT", "GOOGLE_CLIENT_SECRET"], |
| 41 | + "limit": 50000 |
| 42 | + }, |
| 44 | +        # bing allows up to 1M free geocoding requests per year
| 44 | + { |
| 45 | + "provider": "bing", |
| 46 | + "env_vars": ["BING_API_KEY"], |
| 47 | + "limit": 50000 |
| 48 | + }, |
| 50 | +        # up to 2500 free requests per day, which are worth using
| 50 | + # we could analyze the data later and decide which providers to use (we store the provider along with the lat / lng) |
| 51 | + { |
| 52 | + "provider": "google", |
| 53 | + "limit": 2500 |
| 54 | + }, |
| 55 | + # free and unlimited, but seems to provide lower quality results |
| 56 | + { |
| 57 | + "provider": "osm", |
| 58 | + "limit": 50000 |
| 59 | + } |
| 60 | + ] |
| 61 | + |
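
The fallback order configured above can be exercised outside the pipeline. A minimal standalone sketch, assuming the `geocoder` package is installed, with the chain trimmed to google (only when `GOOGLE_API_KEY` is set) and the free osm provider:

```python
import os
import geocoder

def geocode_with_fallback(location, timeout=5):
    # build the provider chain: google only when its API key is configured, then the free OSM provider
    providers = ["google"] if os.environ.get("GOOGLE_API_KEY") else []
    providers.append("osm")
    for provider in providers:
        g = geocoder.get(location, provider=provider, timeout=timeout)
        if g.ok:
            # got coordinates, no need to fall through to the next provider
            return g.lat, g.lng, provider
    # all providers exhausted without a usable result
    return None

print(geocode_with_fallback("Jerusalem, Israel"))
```
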
22 | 62 | def __init__(self, parameters, datapackage, resources): |
23 | 63 | self.parameters = parameters |
24 | 64 | self.datapackage = datapackage |
25 | 65 | self.resources = resources |
26 | 66 | self.locations_cache = {} |
|  68 | +        # providers that fail during this pipeline run are appended here, so we don't retry them
| 68 | + self.blacklist_providers = parameters.get("blacklist-providers", []) |
| 69 | + self.provider_request_counts = {provider["provider"]: 0 for provider in self.PROVIDERS} |
27 | 70 |
28 | 71 | @staticmethod |
29 | 72 | def get_schema(): |
30 | 73 | return {"fields": [{"name": "entity_id", "type": "string"}, |
31 | 74 | {"name": "location", "type": "string"}, |
32 | 75 | {"name": "lat", "type": "number"}, |
33 | | - {"name": "lng", "type": "number"}], |
| 76 | + {"name": "lng", "type": "number"}, |
| 77 | + {"name": "provider", "type": "string"}, |
| 78 | + {"name": "geojson", "type": "object"}], |
34 | 79 | "primaryKey": ["entity_id"]} |
35 | 80 |
36 | 81 | def initialize_db_session(self): |
@@ -66,54 +111,129 @@ def filter_resources(self): |
66 | 111 | else: |
67 | 112 | yield resource |
68 | 113 |
69 | | - def geocoder_google(self, location): |
70 | | - return geocoder.google(location) |
| 114 | + def geocoder_get(self, location, provider, timeout): |
| 115 | + return geocoder.get(location, provider=provider, session=self.requests_session, timeout=timeout) |
| 116 | + |
| 117 | + def get_entity_row(self, entity_id): |
| 118 | + if self.db_table is not None: |
| 119 | + rows = self.db_session.query(self.db_table).filter(self.db_table.c.entity_id==entity_id).all() |
| 120 | + if len(rows) > 0: |
| 121 | + return rows[0] |
| 122 | + return None |
| 123 | + |
| 124 | + def get_location_row(self, entity_id, entity_location): |
| 125 | + if self.db_table is not None: |
| 126 | + rows = self.db_session.query(self.db_table).filter(self.db_table.c.entity_id!=entity_id, |
| 127 | + self.db_table.c.location==entity_location, |
| 128 | + self.db_table.c.lat!=None, |
| 129 | + self.db_table.c.lng!=None).all() |
| 130 | + if len(rows) > 0: |
| 131 | + return rows[0] |
| 132 | + return None |
| 133 | + |
| 134 | + def is_update_needed(self, entity_row, location): |
| 135 | + if entity_row is None: |
| 136 | + # new entity - not previously geocoded, will be inserted |
| 137 | + return True |
| 138 | + else: |
| 139 | + # entity matched by entity_id to existing row in entities_geocode table |
| 140 | + # need to determine if we should update this entity's geocode data or not |
| 141 | + if entity_row.location and not location: |
| 142 | + # previously had a location, now doesn't - should update it in DB |
| 143 | + return True |
| 144 | + elif entity_row.location != location: |
| 145 | + # location changed for this entity |
| 146 | + return True |
| 147 | + else: |
| 148 | + # no change is needed |
| 149 | + return False |
| 150 | + |
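
The update decision above only compares the stored location string with the freshly extracted one. A minimal sketch of the three cases, assuming `GeoCodeEntities` has been imported from this module and using a namedtuple as a stand-in for the SQLAlchemy row:

```python
from collections import namedtuple

# hypothetical stand-in for a row from the geo table
Row = namedtuple("Row", ["entity_id", "location"])

gc = GeoCodeEntities(parameters={}, datapackage={}, resources=[])
assert gc.is_update_needed(None, "Tel Aviv")                        # new entity -> insert
assert gc.is_update_needed(Row("e1", "Tel Aviv"), "Jerusalem")      # location changed -> update
assert not gc.is_update_needed(Row("e1", "Tel Aviv"), "Tel Aviv")   # unchanged -> skip
```
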
| 151 | + def warn_once(self, msg): |
| 152 | + if not hasattr(self, "_warn_once_msgs"): |
| 153 | + self._warn_once_msgs = [] |
| 154 | + if msg not in self._warn_once_msgs: |
| 155 | + self._warn_once_msgs.append(msg) |
| 156 | + logging.warning(msg) |
71 | 157 |
72 | | - def get_entity_location_details_from_db(self, session, table, entity_id, location): |
73 | | - got_entity_row, old_location, location_lat, location_lng = False, None, None, None |
74 | | - if table is not None: |
75 | | - if location: |
76 | | - rows = session.query(table).filter(or_(table.c.entity_id == entity_id, table.c.location == location)) |
| 158 | + def geocode(self, location): |
| 159 | + for provider_params in self.PROVIDERS: |
| 160 | + provider = deepcopy(self.PROVIDER_DEFAULT_PARAMS) |
| 161 | + provider.update(provider_params) |
| 162 | + geocode_provider = provider["provider"] |
| 163 | + if geocode_provider in self.blacklist_providers: |
| 164 | + self.warn_once("provider {} is blacklisted".format(geocode_provider)) |
| 165 | + elif self.provider_request_counts[geocode_provider] >= provider["limit"]: |
| 166 | + self.warn_once("provider {} reached limit".format(geocode_provider)) |
| 167 | + elif "" in [os.environ.get(env_var, "") for env_var in provider.get("env_vars", [])]: |
| 168 | + self.warn_once("provider {} requires environment variables {}".format(geocode_provider, provider["env_vars"])) |
77 | 169 | else: |
78 | | - rows = session.query(table).filter(or_(table.c.entity_id == entity_id)) |
79 | | - got_location_row = False |
80 | | - for row in rows: |
81 | | - if row.entity_id == entity_id: |
82 | | - old_location = row.location |
83 | | - got_entity_row = True |
84 | | - if row.location == location: |
85 | | - location_lat, location_lng = row.lat, row.lng |
86 | | - got_location_row = True |
87 | | - if not got_entity_row and not got_location_row: |
88 | | - raise Exception("Unexpected row: {}".format(row)) |
89 | | - return got_entity_row, old_location, location_lat, location_lng |
| 170 | + # valid provider |
| 171 | + if provider["sleep_seconds"] > 0: |
| 172 | + sleep(provider["sleep_seconds"]) |
| 173 | + self.provider_request_counts[geocode_provider] += 1 |
| 174 | + try: |
| 175 | + g = self.geocoder_get(location, geocode_provider, provider["timeout"]) |
| 176 | + except Exception: |
|     | 177 | +                    logging.exception("geocoding exception, blacklisting this provider")
| 178 | + self.blacklist_providers.append(geocode_provider) |
| 179 | + else: |
| 180 | + if g.ok and g.confidence > 0: |
| 181 | + # got valid geo data |
| 182 | + return g.lat, g.lng, geocode_provider, g.geojson |
| 183 | + else: |
|     | 184 | +                        self.warn_once("provider {} returned no geo data for this location, not trying other providers".format(geocode_provider))
|     | 185 | +                        # return the empty response and provider so it can be inspected later in the DB
| 186 | + return None, None, geocode_provider, g.geojson |
| 187 | + self.warn_once("exhausted all providers, couldn't find any geo data") |
| 188 | + return None |
90 | 189 |
91 | 190 | def filter_resource(self, resource): |
92 | | - session = self.initialize_db_session() |
93 | | - meta = MetaData(bind=session.connection()) |
94 | | - meta.reflect() |
95 | | - table = meta.tables.get(self.parameters["geo-table"]) |
| 191 | + self.db_session = self.initialize_db_session() |
| 192 | + self.db_meta = MetaData(bind=self.db_session.connection()) |
| 193 | + self.db_meta.reflect() |
| 194 | + self.db_table = self.db_meta.tables.get(self.parameters["geo-table"]) |
| 195 | + self.requests_session = requests.session() |
96 | 196 | for row in resource: |
97 | | - entity_id, location = row["id"], self.get_location_string(row["details"]) |
98 | | - has_row, old_location, lat, lng = self.get_entity_location_details_from_db(session, table, entity_id, location) |
99 | | - if (not has_row # new entity - not geocoded, will be inserted |
100 | | - or (has_row and not location and old_location) # previously had a location, now doesn't |
101 | | - or (has_row and location != old_location)): # location changed |
102 | | - # need to update DB |
103 | | - if location and (not lat or not lng): |
104 | | - if location in self.locations_cache: |
105 | | - lat, lng = self.locations_cache[location] |
106 | | - else: |
107 | | - # new location, need to geocode |
108 | | - g = self.geocoder_google(location) |
109 | | - if g.ok: |
110 | | - lat, lng = g.latlng |
111 | | - else: |
112 | | - lat, lng = None, None |
113 | | - self.locations_cache[location] = lat, lng |
114 | | - # only yield items which need to be updated in DB |
115 | | - yield {"entity_id": entity_id, "location": location, |
116 | | - "lat": lat, "lng": lng} |
| 197 | + entity_id = row["id"] |
| 198 | + entity_location = self.get_location_string(row["details"]) |
| 199 | + if entity_location: |
| 200 | + entity_row = self.get_entity_row(entity_id) |
| 201 | + if self.is_update_needed(entity_row, entity_location): |
| 202 | + row = self.get_row(entity_id, entity_location) |
| 203 | + if row: |
| 204 | + yield row |
| 205 | + else: |
| 206 | + logging.info("no update needed: {}".format(entity_id)) |
| 207 | + self.requests_session.close() |
| 208 | + |
| 209 | + def get_row(self, entity_id, entity_location): |
| 210 | + # get DB row for this entity id and location |
| 211 | + # does the geocoding if needed |
| 212 | + location_cache = self.locations_cache.get(entity_location) |
| 213 | + if location_cache: |
| 214 | + # location was already processed in current run, we can reuse the result |
| 215 | + lat, lng, provider, geojson = location_cache |
| 216 | + else: |
| 217 | + lat, lng, provider, geojson = None, None, None, {} |
| 218 | + # check if there is another entity with the same location in DB |
| 219 | + # in that case we might not need to geocode again |
| 220 | + location_row = self.get_location_row(entity_id, entity_location) |
| 221 | + if location_row is not None: |
| 222 | + lat, lng = location_row.lat, location_row.lng |
| 223 | + provider, geojson = location_row.provider, location_row.geojson |
| 224 | + else: |
| 225 | + # need to geocode |
| 226 | + res = self.geocode(entity_location) |
| 227 | + if res is None: |
|     | 228 | +                # all providers were exhausted - skip this row and don't cache or yield it
| 229 | + return None |
| 230 | + else: |
| 231 | + lat, lng, provider, geojson = res |
|     | 232 | +        # store the result in the internal cache - whether it came from geocoding or from the DB
| 233 | + self.locations_cache[entity_location] = lat, lng, provider, geojson |
| 234 | + return {"entity_id": entity_id, "location": entity_location, |
| 235 | + "lat": lat, "lng": lng, "provider": provider, |
| 236 | + "geojson": geojson} |
117 | 237 |
118 | 238 |
119 | 239 | if __name__ == "__main__": |