1414Usage Example (Academic/Research):
1515 # Basic usage with region NORTH_AMERICA
1616 df = spark.readStream.format("opensky").load()
17-
17+
1818 # With specific region and authentication
1919 df = spark.readStream.format("opensky") \
2020 .option("region", "EUROPE") \
3737 When using this data in research or publications, please cite:
3838 "The OpenSky Network, https://opensky-network.org"
3939
40- Author: Frank Munz, Databricks - Example Only, No Warranty
40+ Author: Frank Munz, Databricks - Example Only, No Warranty
4141Purpose: Educational Example / Academic Research Tool
4242Version: 1.0
4343Last Updated: July-2025
5656"Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research".
5757In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing in Sensor Networks (IPSN), pages 83-94, April 2014.
5858
59-
6059DISCLAIMER & LIABILITY:
6160This code is provided "AS IS" for educational purposes only. The author and Databricks make no warranties, express or implied, and disclaim all liability for any damages, losses, or issues arising from the use of this code. Users assume full responsibility for compliance with all applicable terms of service, laws, and regulations. Use at your own risk.
6261
6362For commercial use, contact OpenSky Network directly.
6463================================================================================
6564
66-
6765"""
6866
6967
@@ -106,34 +104,34 @@ class RateLimitError(OpenSkyAPIError):
106104 pass
107105
108106class OpenSkyStreamReader (SimpleDataSourceStreamReader ):
109-
107+
110108 DEFAULT_REGION = "NORTH_AMERICA"
111109 MIN_REQUEST_INTERVAL = 5.0 # seconds between requests
112110 ANONYMOUS_RATE_LIMIT = 100 # calls per day
113111 AUTHENTICATED_RATE_LIMIT = 4000 # calls per day
114112 MAX_RETRIES = 3
115113 RETRY_BACKOFF = 2
116114 RETRY_STATUS_CODES = [429 , 500 , 502 , 503 , 504 ]
117-
115+
118116 def __init__ (self , schema : StructType , options : Dict [str , str ]):
119117 super ().__init__ ()
120118 self .schema = schema
121119 self .options = options
122120 self .session = self ._create_session ()
123121 self .last_request_time = 0
124-
122+
125123 region_name = options .get ('region' , self .DEFAULT_REGION ).upper ()
126124 try :
127125 self .bbox = Region [region_name ].value
128126 except KeyError :
129127 print (f"Invalid region '{ region_name } '. Defaulting to { self .DEFAULT_REGION } ." )
130128 self .bbox = Region [self .DEFAULT_REGION ].value
131-
129+
132130 self .client_id = options .get ('client_id' )
133131 self .client_secret = options .get ('client_secret' )
134132 self .access_token = None
135133 self .token_expires_at = 0
136-
134+
137135 if self .client_id and self .client_secret :
138136 self ._get_access_token () # OAuth2 authentication
139137 self .rate_limit = self .AUTHENTICATED_RATE_LIMIT
@@ -145,23 +143,23 @@ def _get_access_token(self):
145143 current_time = time .time ()
146144 if self .access_token and current_time < self .token_expires_at :
147145 return # Token still valid
148-
146+
149147 token_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"
150148 data = {
151149 "grant_type" : "client_credentials" ,
152150 "client_id" : self .client_id ,
153151 "client_secret" : self .client_secret
154152 }
155-
153+
156154 try :
157155 response = requests .post (token_url , data = data , timeout = 10 )
158156 response .raise_for_status ()
159157 token_data = response .json ()
160-
158+
161159 self .access_token = token_data ["access_token" ]
162160 expires_in = token_data .get ("expires_in" , 1800 )
163161 self .token_expires_at = current_time + expires_in - 300
164-
162+
165163 except requests .exceptions .RequestException as e :
166164 raise OpenSkyAPIError (f"Failed to get access token: { str (e )} " )
167165
@@ -185,45 +183,45 @@ def _handle_rate_limit(self):
185183 """Ensure e MIN_REQUEST_INTERVAL seconds between requests"""
186184 current_time = time .time ()
187185 time_since_last_request = current_time - self .last_request_time
188-
186+
189187 if time_since_last_request < self .MIN_REQUEST_INTERVAL :
190188 sleep_time = self .MIN_REQUEST_INTERVAL - time_since_last_request
191189 time .sleep (sleep_time )
192-
190+
193191 self .last_request_time = time .time ()
194192
195193 def _fetch_states (self ) -> requests .Response :
196194 """Fetch states from OpenSky API with error handling"""
197195 self ._handle_rate_limit ()
198-
196+
199197 if self .client_id and self .client_secret :
200198 self ._get_access_token ()
201-
199+
202200 params = {
203201 'lamin' : self .bbox .lamin ,
204202 'lamax' : self .bbox .lamax ,
205203 'lomin' : self .bbox .lomin ,
206204 'lomax' : self .bbox .lomax
207205 }
208-
206+
209207 headers = {}
210208 if self .access_token :
211209 headers ['Authorization' ] = f'Bearer { self .access_token } '
212-
210+
213211 try :
214212 response = self .session .get (
215213 "https://opensky-network.org/api/states/all" ,
216214 params = params ,
217215 headers = headers ,
218216 timeout = 10
219217 )
220-
218+
221219 if response .status_code == 429 :
222220 raise RateLimitError ("API rate limit exceeded" )
223221 response .raise_for_status ()
224-
222+
225223 return response
226-
224+
227225 except requests .exceptions .RequestException as e :
228226 error_msg = f"API request failed: { str (e )} "
229227 if isinstance (e , requests .exceptions .Timeout ):
@@ -236,7 +234,7 @@ def valid_state(self, state: List) -> bool:
236234 """Validate state data"""
237235 if not state or len (state ) < 17 :
238236 return False
239-
237+
240238 return (state [0 ] is not None and # icao24
241239 state [5 ] is not None and # longitude
242240 state [6 ] is not None ) # latitude
@@ -282,39 +280,160 @@ def safe_bool(value: Any) -> Optional[bool]:
282280 def readBetweenOffsets (self , start : Dict [str , int ], end : Dict [str , int ]) -> Iterator [Tuple ]:
283281 data , _ = self .read (start )
284282 return iter (data )
285-
283+
286284 def read (self , start : Dict [str , int ]) -> Tuple [List [Tuple ], Dict [str , int ]]:
287285 """Read states with error handling and backoff"""
288286 try :
289287 response = self ._fetch_states ()
290288 data = response .json ()
291-
289+
292290 valid_states = [
293291 self .parse_state (s , data ['time' ])
294292 for s in data .get ('states' , [])
295293 if self .valid_state (s )
296294 ]
297-
295+
298296 return (
299297 valid_states ,
300298 {'last_fetch' : data .get ('time' , int (time .time ()))}
301299 )
302-
300+
303301 except OpenSkyAPIError as e :
304302 print (f"OpenSky API Error: { str (e )} " )
305303 return ([], start )
306304 except Exception as e :
307305 print (f"Unexpected error: { str (e )} " )
308306 return ([], start )
309307
308+
310309class OpenSkyDataSource (DataSource ):
310+ """
311+ Apache Spark DataSource for streaming real-time aircraft tracking data from OpenSky Network.
312+
313+ This data source provides access to live aircraft position, velocity, and flight data
314+ from the OpenSky Network's REST API (https://opensky-network.org/). The OpenSky Network
315+ is a community-based receiver network that collects air traffic surveillance data using
316+ ADS-B transponders and makes it available as open data for research and educational purposes.
317+
318+ The data source supports streaming aircraft state vectors including position coordinates,
319+ altitude, velocity, heading, call signs, and various flight status information for aircraft
320+ within configurable geographic regions.
321+
322+ Parameters
323+ ----------
324+ options : Dict[str, str], optional
325+ Configuration options for the data source. Supported options:
326+
327+ region : str, default "NORTH_AMERICA"
328+ Geographic region to collect data from. Valid options:
329+ - "EUROPE": European airspace (35°N-72°N, 25°W-45°E)
330+ - "NORTH_AMERICA": North American airspace (7°N-72°N, 168°W-60°W)
331+ - "SOUTH_AMERICA": South American airspace (56°S-15°N, 90°W-30°W)
332+ - "ASIA": Asian airspace (10°S-82°N, 45°E-180°E)
333+ - "AUSTRALIA": Australian airspace (50°S-10°S, 110°E-180°E)
334+ - "AFRICA": African airspace (35°S-37°N, 20°W-52°E)
335+ - "GLOBAL": Worldwide coverage (90°S-90°N, 180°W-180°E)
336+
337+ client_id : str, optional
338+ OAuth2 client ID for authenticated access. Increases rate limit from
339+ 100 to 4000 API calls per day. Requires corresponding client_secret.
340+
341+ client_secret : str, optional
342+ OAuth2 client secret for authenticated access. Must be provided when
343+ client_id is specified.
344+
345+ Examples
346+ --------
347+ Basic usage with default North America region:
348+
349+ >>> df = spark.readStream.format("opensky").load()
350+ >>> query = df.writeStream.format("console").start()
351+
352+ Specify a different region:
353+
354+ >>> df = spark.readStream.format("opensky") \\
355+ ... .option("region", "EUROPE") \\
356+ ... .load()
357+
358+ Authenticated access for higher rate limits:
359+
360+ >>> df = spark.readStream.format("opensky") \\
361+ ... .option("region", "ASIA") \\
362+ ... .option("client_id", "your_research_client_id") \\
363+ ... .option("client_secret", "your_research_client_secret") \\
364+ ... .load()
365+
366+ Process aircraft data with filtering:
367+
368+ >>> df = spark.readStream.format("opensky").load()
369+ >>> commercial_flights = df.filter(df.callsign.isNotNull() & (df.geo_altitude > 10000))
370+ >>> query = commercial_flights.writeStream.format("delta").option("path", "/tmp/flights").start()
371+
372+ Schema
373+ ------
374+ The returned DataFrame contains the following columns:
375+
376+ - time_ingest (TimestampType): When the data was ingested
377+ - icao24 (StringType): Unique ICAO 24-bit address of the aircraft
378+ - callsign (StringType): Flight number or aircraft call sign
379+ - origin_country (StringType): Country where aircraft is registered
380+ - time_position (TimestampType): Last position update timestamp
381+ - last_contact (TimestampType): Last time aircraft was seen
382+ - longitude (DoubleType): Aircraft longitude in decimal degrees
383+ - latitude (DoubleType): Aircraft latitude in decimal degrees
384+ - geo_altitude (DoubleType): Aircraft altitude above sea level in meters
385+ - on_ground (BooleanType): Whether aircraft is on ground
386+ - velocity (DoubleType): Ground speed in m/s
387+ - true_track (DoubleType): Track angle in degrees (0° = north)
388+ - vertical_rate (DoubleType): Climb/descent rate in m/s
389+ - sensors (ArrayType[IntegerType]): Sensor IDs that detected aircraft
390+ - baro_altitude (DoubleType): Barometric altitude in meters
391+ - squawk (StringType): Transponder squawk code
392+ - spi (BooleanType): Special Position Identification flag
393+ - category (IntegerType): Aircraft category (0-15)
394+
395+ Rate Limits
396+ -----------
397+ - Anonymous access: 100 API calls per day
398+ - Authenticated access: 4000 API calls per day (research accounts)
399+ - Data contributors: 8000 API calls per day
400+ - Minimum 5-second interval between requests
401+
402+ Raises
403+ ------
404+ ValueError
405+ If client_id is provided without client_secret, or if an invalid region is specified.
406+
407+ Notes
408+ -----
409+ - This data source is intended for academic research and educational purposes only
410+ - Commercial use requires explicit permission from OpenSky Network
411+ - Users must comply with OpenSky Network Terms of Use
412+ - All timestamps are in UTC timezone
413+ - Data may have gaps due to receiver coverage limitations
414+ - API rate limits are enforced automatically with exponential backoff
415+
416+ References
417+ ----------
418+ OpenSky Network: https://opensky-network.org/
419+ API Documentation: https://opensky-network.org/apidoc/
420+ Terms of Use: https://opensky-network.org/about/terms-of-use
421+
422+ Citation
423+ --------
424+ When using this data in research, please cite:
425+ Matthias Schäfer, Martin Strohmeier, Vincent Lenders, Ivan Martinovic and Matthias Wilhelm.
426+ "Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research".
427+ In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing
428+ in Sensor Networks (IPSN), pages 83-94, April 2014.
429+ """
311430 def __init__ (self , options : Dict [str , str ] = None ):
312431 super ().__init__ (options or {})
313432 self .options = options or {}
314-
433+
315434 if 'client_id' in self .options and not self .options .get ('client_secret' ):
316435 raise ValueError ("client_secret must be provided when client_id is set" )
317-
436+
318437 if 'region' in self .options and self .options ['region' ].upper () not in Region .__members__ :
319438 raise ValueError (f"Invalid region. Must be one of: { ', ' .join (Region .__members__ .keys ())} " )
320439
@@ -346,5 +465,3 @@ def schema(self) -> StructType:
346465
347466 def simpleStreamReader (self , schema : StructType ) -> OpenSkyStreamReader :
348467 return OpenSkyStreamReader (schema , self .options )
349-
350- spark .dataSource .register (OpenSkyDataSource )
0 commit comments