From 7dbfd3491eac50d5b702e42552cb4f9b2e57a0a3 Mon Sep 17 00:00:00 2001 From: Andrew Rodgers Date: Sat, 5 Apr 2025 16:17:08 -0500 Subject: [PATCH 01/14] This is tested at one site --- services/core/PlatformDriverAgent/README.md | 3 + .../platform_driver/agent.py | 2 +- .../platform_driver/interfaces/bacnet.py | 142 ++++++++++-------- 3 files changed, 84 insertions(+), 63 deletions(-) diff --git a/services/core/PlatformDriverAgent/README.md b/services/core/PlatformDriverAgent/README.md index 6a8642656f..5b1d860895 100644 --- a/services/core/PlatformDriverAgent/README.md +++ b/services/core/PlatformDriverAgent/README.md @@ -77,6 +77,9 @@ to the device. Heart beats are triggered by the Actuator Agent which must be run ## Changes +- 4.5.0 (2025-04-05) + - Moved batching to driver for better control + - 4.4.2 (2024-12-04) - updated ethernetip driver to import from new platform_driver module - updated rainforest eagle driver with error handling during configuration diff --git a/services/core/PlatformDriverAgent/platform_driver/agent.py b/services/core/PlatformDriverAgent/platform_driver/agent.py index accade41fd..cb32723c79 100644 --- a/services/core/PlatformDriverAgent/platform_driver/agent.py +++ b/services/core/PlatformDriverAgent/platform_driver/agent.py @@ -58,7 +58,7 @@ utils.setup_logging() _log = logging.getLogger(__name__) -__version__ = '4.4.3' +__version__ = '4.5.0' PROMETHEUS_METRICS_FILE = "/opt/packages/prometheus_exporter/scrape_files/scrape_metrics.prom" diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py index 44b7b88a0a..9fca399772 100644 --- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py +++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py @@ -64,6 +64,7 @@ "binaryValue": bool, "binaryInput": bool, "binaryOutput": bool, + "schedule": bool, } @@ -112,7 +113,9 @@ def configure(self, config_dict, registry_config_str): self.max_per_request = config_dict.get("max_per_request", 24) self.use_read_multiple = config_dict.get("use_read_multiple", True) self.timeout = float(config_dict.get("timeout", 30.0)) - self.failover_bacnet_to_single = bool(config_dict.get("failover_bacnet_to_single", False)) + self.failover_bacnet_to_single = bool( + config_dict.get("failover_bacnet_to_single", True) + ) self.ping_retry_interval = timedelta( seconds=config_dict.get("ping_retry_interval", 5.0) @@ -212,6 +215,7 @@ def set_point(self, point_name, value, priority=None): def scrape_all(self): # TODO: support reading from an array. 
point_map = {} + point_names = [] read_registers = self.get_registers_by_type("byte", True) write_registers = self.get_registers_by_type("byte", False) @@ -222,6 +226,7 @@ def scrape_all(self): self.enable_collection = True for register in read_registers + write_registers: + point_names.append(register.point_name) point_map[register.point_name] = [ register.object_type, register.instance_number, @@ -229,70 +234,83 @@ def scrape_all(self): register.index, ] - while True: - try: - result = self.vip.rpc.call( - self.proxy_address, - "read_properties", - self.target_address, - point_map, - self.max_per_request, - self.use_read_multiple, - ).get(timeout=180) - - _log.debug(f"found {len(result)} results in platform driver") - except gevent.timeout.Timeout as exc: - _log.error(f"Timed out reading target {self.target_address}") - raise exc - except RemoteError as exc: - if "unknownProperty" in exc.message: - _log.debug(f"unknownProperty error: {exc.message}") - # self.vip.config.set("unknown_properties", exc.message) - if "noResponse" in exc.message and self.use_read_multiple and self.failover_bacnet_to_single is True: - _log.warning( - f"device {self.target_address} did not respond reading multiple" - ) - self.use_read_multiple = False - continue - elif "noResponse" in exc.message and not self.use_read_multiple: - # disable device for collection - self.enable_collection = False - break - if "segmentationNotSupported" in exc.message: - if self.max_per_request <= 1: + results = [] + for i in range(0, len(point_names), self.max_per_request): + use_read_multiple = self.use_read_multiple + batch = { + key: point_map[key] for key in point_names[i : i + self.max_per_request] + } + while True: + try: + batch_result = self.vip.rpc.call( + self.proxy_address, + "read_properties", + self.target_address, + batch, + self.max_per_request, + use_read_multiple, + ).get(timeout=180) + _log.debug(f"found {len(batch_result)} results in platform driver") + results.append(batch_result) + except gevent.timeout.Timeout as exc: + _log.error(f"Timed out reading target {self.target_address}") + raise exc + except RemoteError as exc: + if "unknownProperty" in exc.message: + _log.debug(f"unknownProperty error: {exc.message}") + # self.vip.config.set("unknown_properties", exc.message) + if ( + "noResponse" in exc.message + and self.use_read_multiple + and self.failover_bacnet_to_single is True + ): + _log.warning( + f"device {self.target_address} did not respond reading multiple" + ) + use_read_multiple = False + continue + elif "noResponse" in exc.message and not self.use_read_multiple: + # disable device for collection + self.enable_collection = False + break + if "segmentationNotSupported" in exc.message: + if self.max_per_request <= 1: + _log.error( + "Receiving a segmentationNotSupported error with 'max_per_request' setting of 1." + ) + raise + self.register_count_divisor += 1 + self.max_per_request = max( + int(self.register_count / self.register_count_divisor), 1 + ) + _log.info( + "Device requires a lower max_per_request setting. Trying: " + + str(self.max_per_request) + ) + continue + elif ( + exc.message.endswith("rejected the request: 9") + and self.use_read_multiple + ): + _log.info( + "Device rejected request with 'unrecognized-service' error, attempting to access with use_read_multiple false" + ) + self.use_read_multiple = False + continue + else: + trace = traceback.format_exc() _log.error( - "Receiving a segmentationNotSupported error with 'max_per_request' setting of 1." 
+ f"Error reading target {self.target_address}: {trace}" ) - raise - self.register_count_divisor += 1 - self.max_per_request = max( - int(self.register_count / self.register_count_divisor), 1 - ) - _log.info( - "Device requires a lower max_per_request setting. Trying: " - + str(self.max_per_request) - ) - continue - elif ( - exc.message.endswith("rejected the request: 9") - and self.use_read_multiple - ): - _log.info( - "Device rejected request with 'unrecognized-service' error, attempting to access with use_read_multiple false" - ) - self.use_read_multiple = False - continue + raise exc + except errors.Unreachable: + # If the Proxy is not running bail. + _log.warning("Unable to reach BACnet proxy.") + self.schedule_ping() + raise else: - trace = traceback.format_exc() - _log.error(f"Error reading target {self.target_address}: {trace}") - raise exc - except errors.Unreachable: - # If the Proxy is not running bail. - _log.warning("Unable to reach BACnet proxy.") - self.schedule_ping() - raise - else: - break + break + result = {k: v for d in results for k, v in d.items()} _log.debug(f"{self.target_address=}") return result From 098c1994aeaedf8af7947da0313549d491de48bf Mon Sep 17 00:00:00 2001 From: Andrew Rodgers Date: Sat, 5 Apr 2025 16:48:34 -0500 Subject: [PATCH 02/14] Added failing point cache --- services/core/PlatformDriverAgent/README.md | 5 +- .../platform_driver/agent.py | 421 ++++++++++++------ .../platform_driver/driver.py | 303 +++++++------ .../platform_driver/interfaces/bacnet.py | 9 + 4 files changed, 479 insertions(+), 259 deletions(-) diff --git a/services/core/PlatformDriverAgent/README.md b/services/core/PlatformDriverAgent/README.md index 5b1d860895..9e4f2fe2f1 100644 --- a/services/core/PlatformDriverAgent/README.md +++ b/services/core/PlatformDriverAgent/README.md @@ -76,9 +76,10 @@ to the device. 
Heart beats are triggered by the Actuator Agent which must be run ## Changes - +- 4.5.1 (2025-04-05) + - Added failing point cache with one hour default timeout - 4.5.0 (2025-04-05) - - Moved batching to driver for better control + - Moved batching to driver for better control - 4.4.2 (2024-12-04) - updated ethernetip driver to import from new platform_driver module diff --git a/services/core/PlatformDriverAgent/platform_driver/agent.py b/services/core/PlatformDriverAgent/platform_driver/agent.py index cb32723c79..d45caff87b 100644 --- a/services/core/PlatformDriverAgent/platform_driver/agent.py +++ b/services/core/PlatformDriverAgent/platform_driver/agent.py @@ -41,7 +41,13 @@ import gevent from collections import defaultdict -from prometheus_client import CollectorRegistry, Gauge, Counter, Histogram, write_to_textfile +from prometheus_client import ( + CollectorRegistry, + Gauge, + Counter, + Histogram, + write_to_textfile, +) from volttron.platform.vip.agent import Agent, RPC from volttron.platform.agent import utils from volttron.platform.agent import math_utils @@ -58,13 +64,17 @@ utils.setup_logging() _log = logging.getLogger(__name__) -__version__ = '4.5.0' +__version__ = "4.5.1" -PROMETHEUS_METRICS_FILE = "/opt/packages/prometheus_exporter/scrape_files/scrape_metrics.prom" +PROMETHEUS_METRICS_FILE = ( + "/opt/packages/prometheus_exporter/scrape_files/scrape_metrics.prom" +) + class OverrideError(DriverInterfaceError): """Error raised when the user tries to set/revert point when global override is set.""" + pass @@ -80,39 +90,46 @@ def get_config(name, default=None): # Increase open files resource limit to max or 8192 if unlimited system_socket_limit = None - + try: soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) except OSError: - _log.exception('error getting open file limits') + _log.exception("error getting open file limits") else: if soft != hard and soft != resource.RLIM_INFINITY: try: system_socket_limit = 8192 if hard == resource.RLIM_INFINITY else hard resource.setrlimit(resource.RLIMIT_NOFILE, (system_socket_limit, hard)) except OSError: - _log.exception('error setting open file limits') + _log.exception("error setting open file limits") else: - _log.debug('open file resource limit increased from %d to %d', - soft, system_socket_limit) + _log.debug( + "open file resource limit increased from %d to %d", + soft, + system_socket_limit, + ) if soft == hard: system_socket_limit = soft - max_open_sockets = get_config('max_open_sockets', None) + max_open_sockets = get_config("max_open_sockets", None) # TODO: update the default after scalability testing. - max_concurrent_publishes = get_config('max_concurrent_publishes', 10000) + max_concurrent_publishes = get_config("max_concurrent_publishes", 10000) - driver_config_list = get_config('driver_config_list') - - scalability_test = get_config('scalability_test', False) - scalability_test_iterations = get_config('scalability_test_iterations', 3) + driver_config_list = get_config("driver_config_list") - driver_scrape_interval = get_config('driver_scrape_interval', 0.02) + scalability_test = get_config("scalability_test", False) + scalability_test_iterations = get_config("scalability_test_iterations", 3) + + driver_scrape_interval = get_config("driver_scrape_interval", 0.02) if config.get("driver_config_list") is not None: - _log.warning("Platform driver configured with old setting. 
This is no longer supported.") - _log.warning('Use the script "scripts/update_platform_driver_config.py" to convert the configuration.') + _log.warning( + "Platform driver configured with old setting. This is no longer supported." + ) + _log.warning( + 'Use the script "scripts/update_platform_driver_config.py" to convert the configuration.' + ) publish_depth_first_all = bool(get_config("publish_depth_first_all", True)) publish_breadth_first_all = bool(get_config("publish_breadth_first_all", False)) @@ -121,33 +138,41 @@ def get_config(name, default=None): group_offset_interval = get_config("group_offset_interval", 0.0) - return PlatformDriverAgent(driver_config_list, scalability_test, - scalability_test_iterations, - driver_scrape_interval, - group_offset_interval, - max_open_sockets, - max_concurrent_publishes, - system_socket_limit, - publish_depth_first_all, - publish_breadth_first_all, - publish_depth_first, - publish_breadth_first, - heartbeat_autostart=True, **kwargs) + return PlatformDriverAgent( + driver_config_list, + scalability_test, + scalability_test_iterations, + driver_scrape_interval, + group_offset_interval, + max_open_sockets, + max_concurrent_publishes, + system_socket_limit, + publish_depth_first_all, + publish_breadth_first_all, + publish_depth_first, + publish_breadth_first, + heartbeat_autostart=True, + **kwargs, + ) class PlatformDriverAgent(Agent): - def __init__(self, driver_config_list, scalability_test = False, - scalability_test_iterations=3, - driver_scrape_interval=0.02, - group_offset_interval=0.0, - max_open_sockets=None, - max_concurrent_publishes=10000, - system_socket_limit=None, - publish_depth_first_all=True, - publish_breadth_first_all=False, - publish_depth_first=False, - publish_breadth_first=False, - **kwargs): + def __init__( + self, + driver_config_list, + scalability_test=False, + scalability_test_iterations=3, + driver_scrape_interval=0.02, + group_offset_interval=0.0, + max_open_sockets=None, + max_concurrent_publishes=10000, + system_socket_limit=None, + publish_depth_first_all=True, + publish_breadth_first_all=False, + publish_depth_first=False, + publish_breadth_first=False, + **kwargs, + ): super(PlatformDriverAgent, self).__init__(**kwargs) self.instances = {} self.scalability_test = scalability_test @@ -172,12 +197,55 @@ def __init__(self, driver_config_list, scalability_test = False, self.unresponsive_devices = {} self.collector_registry = CollectorRegistry() - new_buckets = (.005, .01, .025, .05, .075, .1, .25, .5, .75, 1.0, 2.5, 5.0, 7.5, 10.0, 30, float("inf")) - self.performance_histogram = Histogram("device_scrape_time_histogram", "Time taken to scrape given device - histogram", ['device'], registry=self.collector_registry, buckets=new_buckets) - self.performance_gauge = Gauge("device_scrape_time", "Time taken to scrape device", ['device'], registry=self.collector_registry) - self.error_counter = Counter("device_error_count", "Number of errors per device", ['device'], registry=self.collector_registry) - self.failed_point_scrape = Counter("failed_point_scrape", "Failed scrape for existing point", ['point', 'device'], registry=self.collector_registry) - self.point_count = Gauge("point_count", "Number of points per device", ['device'], registry=self.collector_registry) + new_buckets = ( + 0.005, + 0.01, + 0.025, + 0.05, + 0.075, + 0.1, + 0.25, + 0.5, + 0.75, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 30, + float("inf"), + ) + self.performance_histogram = Histogram( + "device_scrape_time_histogram", + "Time taken to scrape given device - 
histogram", + ["device"], + registry=self.collector_registry, + buckets=new_buckets, + ) + self.performance_gauge = Gauge( + "device_scrape_time", + "Time taken to scrape device", + ["device"], + registry=self.collector_registry, + ) + self.error_counter = Counter( + "device_error_count", + "Number of errors per device", + ["device"], + registry=self.collector_registry, + ) + self.failed_point_scrape = Counter( + "failed_point_scrape", + "Failed scrape for existing point", + ["point", "device"], + registry=self.collector_registry, + ) + self.point_count = Gauge( + "point_count", + "Number of points per device", + ["device"], + registry=self.collector_registry, + ) self.publish_depth_first_all = bool(publish_depth_first_all) self.publish_breadth_first_all = bool(publish_breadth_first_all) @@ -195,23 +263,31 @@ def __init__(self, driver_config_list, scalability_test = False, self.test_results = [] self.current_test_start = None - self.default_config = {"scalability_test": scalability_test, - "scalability_test_iterations": scalability_test_iterations, - "max_open_sockets": max_open_sockets, - "max_concurrent_publishes": max_concurrent_publishes, - "driver_scrape_interval": self.driver_scrape_interval, - "group_offset_interval": self.group_offset_interval, - "publish_depth_first_all": self.publish_depth_first_all, - "publish_breadth_first_all": self.publish_breadth_first_all, - "publish_depth_first": self.publish_depth_first, - "publish_breadth_first": self.publish_breadth_first} + self.default_config = { + "scalability_test": scalability_test, + "scalability_test_iterations": scalability_test_iterations, + "max_open_sockets": max_open_sockets, + "max_concurrent_publishes": max_concurrent_publishes, + "driver_scrape_interval": self.driver_scrape_interval, + "group_offset_interval": self.group_offset_interval, + "publish_depth_first_all": self.publish_depth_first_all, + "publish_breadth_first_all": self.publish_breadth_first_all, + "publish_depth_first": self.publish_depth_first, + "publish_breadth_first": self.publish_breadth_first, + } self.vip.config.set_default("config", self.default_config) - self.vip.config.subscribe(self.configure_main, actions=["NEW", "UPDATE"], pattern="config") - self.vip.config.subscribe(self.update_driver, actions=["NEW", "UPDATE"], pattern="devices/*") - self.vip.config.subscribe(self.remove_driver, actions="DELETE", pattern="devices/*") + self.vip.config.subscribe( + self.configure_main, actions=["NEW", "UPDATE"], pattern="config" + ) + self.vip.config.subscribe( + self.update_driver, actions=["NEW", "UPDATE"], pattern="devices/*" + ) + self.vip.config.subscribe( + self.remove_driver, actions="DELETE", pattern="devices/*" + ) # self.vip.pubsub.subscribe(peer="pubsub", callback=self.add_unresponsive_bacnet_device, prefix="errors/bacnet") - + @Core.periodic(10) def flush_metrics(self): if self.last_written < self.last_scraped: @@ -228,28 +304,43 @@ def configure_main(self, config_name, action, contents): if self.max_open_sockets is not None: max_open_sockets = int(self.max_open_sockets) configure_socket_lock(max_open_sockets) - _log.info("maximum concurrently open sockets limited to " + str(max_open_sockets)) + _log.info( + "maximum concurrently open sockets limited to " + + str(max_open_sockets) + ) elif self.system_socket_limit is not None: max_open_sockets = int(self.system_socket_limit * 0.8) - _log.info("maximum concurrently open sockets limited to " + str(max_open_sockets) + - " (derived from system limits)") + _log.info( + "maximum concurrently open sockets 
limited to " + + str(max_open_sockets) + + " (derived from system limits)" + ) configure_socket_lock(max_open_sockets) else: configure_socket_lock() - _log.warning("No limit set on the maximum number of concurrently open sockets. " - "Consider setting max_open_sockets if you plan to work with 800+ modbus devices.") + _log.warning( + "No limit set on the maximum number of concurrently open sockets. " + "Consider setting max_open_sockets if you plan to work with 800+ modbus devices." + ) - self.max_concurrent_publishes = config['max_concurrent_publishes'] + self.max_concurrent_publishes = config["max_concurrent_publishes"] max_concurrent_publishes = int(self.max_concurrent_publishes) if max_concurrent_publishes < 1: - _log.warning("No limit set on the maximum number of concurrent driver publishes. " - "Consider setting max_concurrent_publishes if you plan to work with many devices.") + _log.warning( + "No limit set on the maximum number of concurrent driver publishes. " + "Consider setting max_concurrent_publishes if you plan to work with many devices." + ) else: - _log.info("maximum concurrent driver publishes limited to " + str(max_concurrent_publishes)) + _log.info( + "maximum concurrent driver publishes limited to " + + str(max_concurrent_publishes) + ) configure_publish_lock(max_concurrent_publishes) self.scalability_test = bool(config["scalability_test"]) - self.scalability_test_iterations = int(config["scalability_test_iterations"]) + self.scalability_test_iterations = int( + config["scalability_test_iterations"] + ) if self.scalability_test: self.waiting_to_finish = set() @@ -258,31 +349,47 @@ def configure_main(self, config_name, action, contents): self.current_test_start = None except ValueError as e: - _log.error("ERROR PROCESSING STARTUP CRITICAL CONFIGURATION SETTINGS: {}".format(e)) + _log.error( + "ERROR PROCESSING STARTUP CRITICAL CONFIGURATION SETTINGS: {}".format( + e + ) + ) _log.error("Platform driver SHUTTING DOWN") sys.exit(1) else: if self.max_open_sockets != config["max_open_sockets"]: - _log.info("The platform driver must be restarted for changes to the max_open_sockets setting to take " - "effect") + _log.info( + "The platform driver must be restarted for changes to the max_open_sockets setting to take " + "effect" + ) if self.max_concurrent_publishes != config["max_concurrent_publishes"]: - _log.info("The platform driver must be restarted for changes to the max_concurrent_publishes setting to " - "take effect") + _log.info( + "The platform driver must be restarted for changes to the max_concurrent_publishes setting to " + "take effect" + ) if self.scalability_test != bool(config["scalability_test"]): if not self.scalability_test: _log.info( - "The platform driver must be restarted with scalability_test set to true in order to run a test.") + "The platform driver must be restarted with scalability_test set to true in order to run a test." + ) if self.scalability_test: - _log.info("A scalability test may not be interrupted. Restarting the driver is required to stop " - "the test.") + _log.info( + "A scalability test may not be interrupted. Restarting the driver is required to stop " + "the test." 
+ ) try: - if self.scalability_test_iterations != int(config["scalability_test_iterations"]) and \ - self.scalability_test: - _log.info("A scalability test must be restarted for the scalability_test_iterations setting to " - "take effect.") + if ( + self.scalability_test_iterations + != int(config["scalability_test_iterations"]) + and self.scalability_test + ): + _log.info( + "A scalability test must be restarted for the scalability_test_iterations setting to " + "take effect." + ) except ValueError: pass @@ -305,7 +412,11 @@ def configure_main(self, config_name, action, contents): # If end time > current time, set override with new duration if end_time > now: delta = end_time - now - self._set_override_on(pattern, delta.total_seconds(), from_config_store=True) + self._set_override_on( + pattern, + delta.total_seconds(), + from_config_store=True, + ) else: self._override_patterns = set() except KeyError: @@ -328,23 +439,34 @@ def configure_main(self, config_name, action, contents): # TODO: set a health status for the agent if self.scalability_test and action == "UPDATE": - _log.info("Running scalability test. Settings may not be changed without restart.") + _log.info( + "Running scalability test. Settings may not be changed without restart." + ) return - if (self.driver_scrape_interval != driver_scrape_interval or - self.group_offset_interval != group_offset_interval): + if ( + self.driver_scrape_interval != driver_scrape_interval + or self.group_offset_interval != group_offset_interval + ): self.driver_scrape_interval = driver_scrape_interval self.group_offset_interval = group_offset_interval - _log.info("Setting time delta between driver device scrapes to " + str(driver_scrape_interval)) + _log.info( + "Setting time delta between driver device scrapes to " + + str(driver_scrape_interval) + ) # Reset all scrape schedules self.freed_time_slots.clear() self.group_counts.clear() for driver in self.instances.values(): time_slot = self.group_counts[driver.group] - driver.update_scrape_schedule(time_slot, self.driver_scrape_interval, - driver.group, self.group_offset_interval) + driver.update_scrape_schedule( + time_slot, + self.driver_scrape_interval, + driver.group, + self.group_offset_interval, + ) self.group_counts[driver.group] += 1 self.publish_depth_first_all = bool(config["publish_depth_first_all"]) @@ -354,13 +476,15 @@ def configure_main(self, config_name, action, contents): # Update the publish settings on running devices. 
for driver in self.instances.values(): - driver.update_publish_types(self.publish_depth_first_all, - self.publish_breadth_first_all, - self.publish_depth_first, - self.publish_breadth_first) + driver.update_publish_types( + self.publish_depth_first_all, + self.publish_breadth_first_all, + self.publish_depth_first, + self.publish_breadth_first, + ) def derive_device_topic(self, config_name): - _, topic = config_name.split('/', 1) + _, topic = config_name.split("/", 1) return topic def stop_driver(self, device_topic): @@ -394,67 +518,76 @@ def update_driver(self, config_name, action, contents): slot = self.freed_time_slots[group].pop(0) _log.info("Starting driver: {}".format(topic)) - driver = DriverAgent(self, contents, slot, self.driver_scrape_interval, topic, - group, self.group_offset_interval, - self.publish_depth_first_all, - self.publish_breadth_first_all, - self.publish_depth_first, - self.publish_breadth_first) + driver = DriverAgent( + self, + contents, + slot, + self.driver_scrape_interval, + topic, + group, + self.group_offset_interval, + self.publish_depth_first_all, + self.publish_breadth_first_all, + self.publish_depth_first, + self.publish_breadth_first, + ) gevent.spawn(driver.core.run) self.instances[topic] = driver self.group_counts[group] += 1 self._name_map[topic.lower()] = topic - self._update_override_state(topic, 'add') + self._update_override_state(topic, "add") def remove_driver(self, config_name, action, contents): topic = self.derive_device_topic(config_name) self.stop_driver(topic) - self._update_override_state(topic, 'remove') + self._update_override_state(topic, "remove") # def device_startup_callback(self, topic, driver): # _log.debug("Driver hooked up for "+topic) # topic = topic.strip('/') # self.instances[topic] = driver - + def scrape_starting(self, topic): if not self.scalability_test: return - + if not self.waiting_to_finish: # Start a new measurement self.current_test_start = datetime.now() self.waiting_to_finish = set(self.instances.keys()) - + if topic not in self.waiting_to_finish: _log.warning( - f"{topic} started twice before test finished, increase the length of scrape interval and rerun test") + f"{topic} started twice before test finished, increase the length of scrape interval and rerun test" + ) def scrape_ending(self, topic): if not self.scalability_test: return - + try: self.waiting_to_finish.remove(topic) except KeyError: _log.warning( - f"{topic} published twice before test finished, increase the length of scrape interval and rerun test") + f"{topic} published twice before test finished, increase the length of scrape interval and rerun test" + ) if not self.waiting_to_finish: end = datetime.now() delta = end - self.current_test_start delta = delta.total_seconds() self.test_results.append(delta) - + self.test_iterations += 1 - + _log.info("publish {} took {} seconds".format(self.test_iterations, delta)) - + if self.test_iterations >= self.scalability_test_iterations: # Test is now over. Button it up and shutdown. 
- mean = math_utils.mean(self.test_results) - stdev = math_utils.stdev(self.test_results) - _log.info("Mean total publish time: "+str(mean)) - _log.info("Std dev publish time: "+str(stdev)) + mean = math_utils.mean(self.test_results) + stdev = math_utils.stdev(self.test_results) + _log.info("Mean total publish time: " + str(mean)) + _log.info("Std dev publish time: " + str(stdev)) sys.exit(0) # def add_unresponsive_bacnet_device(self, peer, sender, bus, topic, headers, message): @@ -504,7 +637,10 @@ def set_point(self, path, point_name, value, **kwargs): """ if path in self._override_devices: raise OverrideError( - "Cannot set point on device {} since global override is set".format(path)) + "Cannot set point on device {} since global override is set".format( + path + ) + ) else: return self.instances[path].set_point(point_name, value, **kwargs) @@ -538,10 +674,15 @@ def set_multiple_points(self, path, point_names_values, **kwargs): """ if path in self._override_devices: raise OverrideError( - "Cannot set point on device {} since global override is set".format(path)) + "Cannot set point on device {} since global override is set".format( + path + ) + ) else: - return self.instances[path].set_multiple_points(point_names_values, **kwargs) - + return self.instances[path].set_multiple_points( + point_names_values, **kwargs + ) + @RPC.export def heart_beat(self): """RPC method @@ -551,7 +692,7 @@ def heart_beat(self): _log.debug("sending heartbeat") for device in self.instances.values(): device.heart_beat() - + @RPC.export def revert_point(self, path, point_name, **kwargs): """RPC method @@ -567,7 +708,10 @@ def revert_point(self, path, point_name, **kwargs): """ if path in self._override_devices: raise OverrideError( - "Cannot revert point on device {} since global override is set".format(path)) + "Cannot revert point on device {} since global override is set".format( + path + ) + ) else: self.instances[path].revert_point(point_name, **kwargs) @@ -584,12 +728,15 @@ def revert_device(self, path, **kwargs): """ if path in self._override_devices: raise OverrideError( - "Cannot revert device {} since global override is set".format(path)) + "Cannot revert device {} since global override is set".format(path) + ) else: self.instances[path].revert_all(**kwargs) @RPC.export - def set_override_on(self, pattern, duration=0.0, failsafe_revert=True, staggered_revert=False): + def set_override_on( + self, pattern, duration=0.0, failsafe_revert=True, staggered_revert=False + ): """RPC method Turn on override condition on all the devices matching the pattern. @@ -610,8 +757,14 @@ def set_override_on(self, pattern, duration=0.0, failsafe_revert=True, staggered """ self._set_override_on(pattern, duration, failsafe_revert, staggered_revert) - def _set_override_on(self, pattern, duration=0.0, failsafe_revert=True, staggered_revert=False, - from_config_store=False): + def _set_override_on( + self, + pattern, + duration=0.0, + failsafe_revert=True, + staggered_revert=False, + from_config_store=False, + ): """Turn on override condition on all devices matching the pattern. It schedules an event to keep track of the duration over which override has to be applied. New override patterns and corresponding end times are stored in config store. 
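The duration bookkeeping described in this docstring is what lets overrides survive a restart: each pattern's end time is persisted to the config store, so configure_main (earlier in this diff) can re-arm any override still in effect with just its remaining duration and drop the ones that expired while the agent was down. A rough sketch of that logic, matching devices with fnmatch as _update_override_state does (the function name and data shapes here are illustrative, not the agent's actual API):

import fnmatch
from datetime import datetime

def reload_overrides(stored, devices, now=None):
    # stored: {pattern: end_time} as persisted in the config store
    now = now or datetime.utcnow()
    remaining, overridden = {}, set()
    for pattern, end_time in stored.items():
        if end_time <= now:
            continue  # expired while the agent was down; drop it
        # re-arm with only the seconds left, not the original duration
        remaining[pattern] = (end_time - now).total_seconds()
        overridden.update(d for d in devices if fnmatch.fnmatch(d, pattern))
    return remaining, overridden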
@@ -637,7 +790,9 @@ def _set_override_on(self, pattern, duration=0.0, failsafe_revert=True, staggere # If revert to default state is needed if failsafe_revert: if staggered_revert: - self.core.spawn_later(i*stagger_interval, self.instances[name].revert_all()) + self.core.spawn_later( + i * stagger_interval, self.instances[name].revert_all() + ) else: self.core.spawn(self.instances[name].revert_all()) # Set override @@ -700,7 +855,7 @@ def get_override_patterns(self): return list(self._override_patterns) @RPC.export - @RPC.allow('platform_driver_config') + @RPC.allow("platform_driver_config") def update_config_store(self, config_name, contents): self.vip.config.set(config_name, contents) @@ -734,7 +889,8 @@ def _set_override_off(self, pattern): else: _log.error("Override Pattern did not match!") raise OverrideError( - "Pattern {} does not exist in list of override patterns".format(pattern)) + "Pattern {} does not exist in list of override patterns".format(pattern) + ) def _update_override_interval(self, interval, pattern): """Schedules a new override event for the specified interval and pattern. If the pattern already exists and new @@ -805,7 +961,7 @@ def _update_override_state(self, device, state): """ device = device.lower() - if state == 'add': + if state == "add": # If device falls under the existing overridden patterns, then add it to list of overridden devices. for pattern in self._override_patterns: if fnmatch.fnmatch(device, pattern): @@ -832,11 +988,10 @@ def forward_bacnet_cov_value(self, source_address, point_name, point_values): def main(argv=sys.argv): """Main method called to start the agent.""" - utils.vip_main(platform_driver_agent, identity=PLATFORM_DRIVER, - version=__version__) + utils.vip_main(platform_driver_agent, identity=PLATFORM_DRIVER, version=__version__) -if __name__ == '__main__': +if __name__ == "__main__": # Entry point for script try: sys.exit(main()) diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py index 2c5ee72c4f..1b3cf97a81 100644 --- a/services/core/PlatformDriverAgent/platform_driver/driver.py +++ b/services/core/PlatformDriverAgent/platform_driver/driver.py @@ -44,10 +44,12 @@ import gevent import traceback from volttron.platform.messaging import headers as headers_mod -from volttron.platform.messaging.topics import (DRIVER_TOPIC_BASE, - DRIVER_TOPIC_ALL, - DEVICES_VALUE, - DEVICES_PATH) +from volttron.platform.messaging.topics import ( + DRIVER_TOPIC_BASE, + DRIVER_TOPIC_ALL, + DEVICES_VALUE, + DEVICES_PATH, +) from volttron.platform.vip.agent.errors import VIPError, Again from .driver_locks import publish_lock @@ -58,69 +60,103 @@ class DriverAgent(BasicAgent): - def __init__(self, parent, config, time_slot, driver_scrape_interval, device_path, - group, group_offset_interval, - default_publish_depth_first_all=True, - default_publish_breadth_first_all=True, - default_publish_depth_first=True, - default_publish_breadth_first=True, - **kwargs): + def __init__( + self, + parent, + config, + time_slot, + driver_scrape_interval, + device_path, + group, + group_offset_interval, + default_publish_depth_first_all=True, + default_publish_breadth_first_all=True, + default_publish_depth_first=True, + default_publish_breadth_first=True, + **kwargs, + ): super(DriverAgent, self).__init__(**kwargs) self.heart_beat_value = 0 - self.device_name = '' - #Use the parent's vip connection + self.device_name = "" + # Use the parent's vip connection self.parent = parent self.vip = 
parent.vip self.config = config self.device_path = device_path self.last_noresponse = datetime.datetime.now() - self.update_publish_types(default_publish_depth_first_all , - default_publish_breadth_first_all, - default_publish_depth_first, - default_publish_breadth_first) - + self.update_publish_types( + default_publish_depth_first_all, + default_publish_breadth_first_all, + default_publish_depth_first, + default_publish_breadth_first, + ) try: interval = int(config.get("interval", 60)) if interval < 1.0: raise ValueError except ValueError: - _log.warning("Invalid device scrape interval {}. Defaulting to 60 seconds.".format(config.get("interval"))) + _log.warning( + "Invalid device scrape interval {}. Defaulting to 60 seconds.".format( + config.get("interval") + ) + ) interval = 60 self.interval = interval self.periodic_read_event = None - self.update_scrape_schedule(time_slot, driver_scrape_interval, group, group_offset_interval) - - def update_publish_types(self, publish_depth_first_all, - publish_breadth_first_all, - publish_depth_first, - publish_breadth_first): + self.update_scrape_schedule( + time_slot, driver_scrape_interval, group, group_offset_interval + ) + + def update_publish_types( + self, + publish_depth_first_all, + publish_breadth_first_all, + publish_depth_first, + publish_breadth_first, + ): """Setup which publish types happen for a scrape. - Values passed in are overridden by settings in the specific device configuration.""" - self.publish_depth_first_all = bool(self.config.get("publish_depth_first_all", publish_depth_first_all)) - self.publish_breadth_first_all = bool(self.config.get("publish_breadth_first_all", publish_breadth_first_all)) - self.publish_depth_first = bool(self.config.get("publish_depth_first", publish_depth_first)) - self.publish_breadth_first = bool(self.config.get("publish_breadth_first", publish_breadth_first)) - - - def update_scrape_schedule(self, time_slot, driver_scrape_interval, group, group_offset_interval): - self.time_slot_offset = (time_slot * driver_scrape_interval) + (group * group_offset_interval) + Values passed in are overridden by settings in the specific device configuration. + """ + self.publish_depth_first_all = bool( + self.config.get("publish_depth_first_all", publish_depth_first_all) + ) + self.publish_breadth_first_all = bool( + self.config.get("publish_breadth_first_all", publish_breadth_first_all) + ) + self.publish_depth_first = bool( + self.config.get("publish_depth_first", publish_depth_first) + ) + self.publish_breadth_first = bool( + self.config.get("publish_breadth_first", publish_breadth_first) + ) + + def update_scrape_schedule( + self, time_slot, driver_scrape_interval, group, group_offset_interval + ): + self.time_slot_offset = (time_slot * driver_scrape_interval) + ( + group * group_offset_interval + ) self.time_slot = time_slot self.group = group - _log.debug("{} group: {}, time_slot: {}, offset: {}".format(self.device_path, group, - time_slot, self.time_slot_offset)) + _log.debug( + "{} group: {}, time_slot: {}, offset: {}".format( + self.device_path, group, time_slot, self.time_slot_offset + ) + ) if self.time_slot_offset >= self.interval: _log.warning( - "Scrape offset exceeds interval. Required adjustment will cause scrapes to double up with other devices.") + "Scrape offset exceeds interval. Required adjustment will cause scrapes to double up with other devices." + ) while self.time_slot_offset >= self.interval: self.time_slot_offset -= self.interval - #check weather or not we have run our starting method. 
+        # check whether or not we have run our starting method.
         if not self.periodic_read_event:
             return
 
@@ -128,9 +164,9 @@ def update_scrape_schedule(self, time_slot, driver_scrape_interval, group, group
 
         next_periodic_read = self.find_starting_datetime(utils.get_aware_utc_now())
 
-        self.periodic_read_event = self.core.schedule(next_periodic_read, self.periodic_read, next_periodic_read)
-
-
+        self.periodic_read_event = self.core.schedule(
+            next_periodic_read, self.periodic_read, next_periodic_read
+        )
 
     def find_starting_datetime(self, now):
         midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
@@ -146,13 +182,14 @@ def find_starting_datetime(self, now):
         next_in_seconds = previous_in_seconds + interval
         from_midnight = datetime.timedelta(seconds=next_in_seconds)
 
-        return midnight + from_midnight + datetime.timedelta(seconds=self.time_slot_offset)
-
+        return (
+            midnight + from_midnight + datetime.timedelta(seconds=self.time_slot_offset)
+        )
 
     def get_interface(self, driver_type, config_dict, config_string):
         """Returns an instance of the interface"""
         module_name = "platform_driver.interfaces." + driver_type
-        module = __import__(module_name,globals(),locals(),[], 0)
+        module = __import__(module_name, globals(), locals(), [], 0)
         interfaces = module.interfaces
         sub_module = getattr(interfaces, driver_type)
         klass = getattr(sub_module, "Interface")
@@ -160,7 +197,7 @@ def get_interface(self, driver_type, config_dict, config_string):
         interface.configure(config_dict, config_string)
         return interface
 
-    @Core.receiver('onstart')
+    @Core.receiver("onstart")
     def starting(self, sender, **kwargs):
         self.setup_device()
 
@@ -169,10 +206,13 @@ def starting(self, sender, **kwargs):
 
         next_periodic_read = self.find_starting_datetime(utils.get_aware_utc_now())
 
-        self.periodic_read_event = self.core.schedule(next_periodic_read, self.periodic_read, next_periodic_read)
-
-        self.all_path_depth, self.all_path_breadth = self.get_paths_for_point(DRIVER_TOPIC_ALL)
+        self.periodic_read_event = self.core.schedule(
+            next_periodic_read, self.periodic_read, next_periodic_read
+        )
 
+        self.all_path_depth, self.all_path_breadth = self.get_paths_for_point(
+            DRIVER_TOPIC_ALL
+        )
 
     def setup_device(self):
         config = self.config
@@ -186,44 +226,45 @@ def setup_device(self):
         self.meta_data = {}
         for point in self.interface.get_register_names():
             register = self.interface.get_register_by_name(point)
-            if register.register_type == 'bit':
-                ts_type = 'boolean'
+            if register.register_type == "bit":
+                ts_type = "boolean"
             else:
                 if register.python_type is int:
-                    ts_type = 'integer'
+                    ts_type = "integer"
                 elif register.python_type is float:
-                    ts_type = 'float'
+                    ts_type = "float"
                 elif register.python_type is bool:
-                    ts_type = 'boolean'
+                    ts_type = "boolean"
                 elif register.python_type is str:
-                    ts_type = 'string'
-                else:
-                    ts_type = 'string'
+                    ts_type = "string"
+                else:
+                    ts_type = "string"
             _log.debug(f"ts_type is of type {register.python_type}")
-            self.meta_data[point] = {'units': register.get_units(),
-                                     'type': ts_type,
-                                     'tz': config.get('timezone', '')}
-
-        self.base_topic = DEVICES_VALUE(campus='',
-                                        building='',
-                                        unit='',
-                                        path=self.device_path,
-                                        point=None)
-
-        self.device_name = DEVICES_PATH(base='',
-                                        node='',
-                                        campus='',
-                                        building='',
-                                        unit='',
-                                        path=self.device_path,
-                                        point='')
+            self.meta_data[point] = {
+                "units": register.get_units(),
+                "type": ts_type,
+                "tz": config.get("timezone", ""),
+            }
+
+        self.base_topic = DEVICES_VALUE(
+            campus="", building="", unit="", path=self.device_path, point=None
+        )
+
+        self.device_name = DEVICES_PATH(
+            base="",
+            node="",
+            campus="",
+            building="",
+            unit="",
+            path=self.device_path,
+            point="",
+        )
 
         # self.parent.device_startup_callback(self.device_name, self)
 
-    def periodic_read(self, now):
-        #we not use self.core.schedule to prevent drift.
+    def periodic_read(self, now):
+        # we do not use self.core.schedule, to prevent drift.
         next_scrape_time = now + datetime.timedelta(seconds=self.interval)
         # Sanity check now.
         # This is specifically for when this is running in a VM that gets
         # suspended and then resumed.
         # If we don't make this check a resumed VM will publish one event
         # per minute of time the VM was suspended for.
         test_now = utils.get_aware_utc_now()
         if test_now - next_scrape_time > datetime.timedelta(seconds=self.interval):
             next_scrape_time = self.find_starting_datetime(test_now)
 
-        _log.debug("{} next scrape scheduled: {}".format(self.device_path, next_scrape_time))
+        _log.debug(
+            "{} next scrape scheduled: {}".format(self.device_path, next_scrape_time)
+        )
 
-        self.periodic_read_event = self.core.schedule(next_scrape_time, self.periodic_read, next_scrape_time)
+        self.periodic_read_event = self.core.schedule(
+            next_scrape_time, self.periodic_read, next_scrape_time
+        )
 
         _log.debug("scraping device: " + self.device_name)
         start_time = time.time()
@@ -252,10 +297,14 @@ def periodic_read(self, now):
             self.parent.point_count.labels(device=self.device_name).set(len(results))
             _log.debug(f"{len(results)=}")
             register_names = self.interface.get_register_names_view()
-            for point in (register_names - results.keys()):
+            for point in register_names - results.keys():
+                if hasattr(self.interface, "failing_points"):
+                    self.interface.failing_points[point] = datetime.datetime.utcnow()
                 depth_first_topic = self.base_topic(point=point)
-                self.parent.failed_point_scrape.labels(point=depth_first_topic, device=self.device_name).inc()
-                _log.error("Failed to scrape point: "+depth_first_topic)
+                self.parent.failed_point_scrape.labels(
+                    point=depth_first_topic, device=self.device_name
+                ).inc()
+                _log.error("Failed to scrape point: " + depth_first_topic)
         except (Exception, gevent.Timeout) as exc:
             tb = traceback.format_exc()
             self.parent.error_counter.labels(device=self.device_name).inc()
@@ -266,11 +315,13 @@ def periodic_read(self, now):
             # self.parent.last_scraped = datetime.datetime.now()
             return
         end_time = time.time()
-        scrape_time = end_time-start_time
-        self.parent.performance_histogram.labels(device=self.device_name).observe(scrape_time)
+        scrape_time = end_time - start_time
+        self.parent.performance_histogram.labels(device=self.device_name).observe(
+            scrape_time
+        )
         self.parent.performance_gauge.labels(device=self.device_name).set(scrape_time)
         self.parent.last_scraped = datetime.datetime.now()
-        #temporarily moving return out of Excelt clause for testing
+        # temporarily moving return out of except clause for testing
         # XXX: Does a warning need to be printed?
if not results: _log.warning(f"no results for {self.device_name}") @@ -278,12 +329,14 @@ def periodic_read(self, now): utcnow = utils.get_aware_utc_now() utcnow_string = utils.format_timestamp(utcnow) - sync_timestamp = utils.format_timestamp(now - datetime.timedelta(seconds=self.time_slot_offset)) + sync_timestamp = utils.format_timestamp( + now - datetime.timedelta(seconds=self.time_slot_offset) + ) headers = { headers_mod.DATE: utcnow_string, headers_mod.TIMESTAMP: utcnow_string, - headers_mod.SYNC_TIMESTAMP: sync_timestamp + headers_mod.SYNC_TIMESTAMP: sync_timestamp, } if self.publish_depth_first or self.publish_breadth_first: @@ -292,25 +345,23 @@ def periodic_read(self, now): message = [value, self.meta_data[point]] if self.publish_depth_first: - self._publish_wrapper(depth_first_topic, - headers=headers, - message=message) + self._publish_wrapper( + depth_first_topic, headers=headers, message=message + ) if self.publish_breadth_first: - self._publish_wrapper(breadth_first_topic, - headers=headers, - message=message) + self._publish_wrapper( + breadth_first_topic, headers=headers, message=message + ) message = [results, self.meta_data] if self.publish_depth_first_all: - self._publish_wrapper(self.all_path_depth, - headers=headers, - message=message) + self._publish_wrapper(self.all_path_depth, headers=headers, message=message) if self.publish_breadth_first_all: - self._publish_wrapper(self.all_path_breadth, - headers=headers, - message=message) + self._publish_wrapper( + self.all_path_breadth, headers=headers, message=message + ) self.parent.scrape_ending(self.device_name) @@ -319,14 +370,13 @@ def _publish_wrapper(self, topic, headers, message): try: with publish_lock(): _log.debug("publishing: " + topic) - self.vip.pubsub.publish('pubsub', - topic, - headers=headers, - message=message).get(timeout=10.0) + self.vip.pubsub.publish( + "pubsub", topic, headers=headers, message=message + ).get(timeout=10.0) _log.debug("finish publishing: " + topic) except gevent.Timeout: - _log.warning("Did not receive confirmation of publish to "+topic) + _log.warning("Did not receive confirmation of publish to " + topic) break except Again: _log.warning("publish delayed: " + topic + " pubsub is busy") @@ -343,18 +393,20 @@ def heart_beat(self): self.heart_beat_value = int(not bool(self.heart_beat_value)) - _log.debug("sending heartbeat: " + self.device_name + ' ' + str(self.heart_beat_value)) + _log.debug( + "sending heartbeat: " + self.device_name + " " + str(self.heart_beat_value) + ) self.set_point(self.heart_beat_point, self.heart_beat_value) def get_paths_for_point(self, point): depth_first = self.base_topic(point=point) - parts = depth_first.split('/') + parts = depth_first.split("/") breadth_first_parts = parts[1:] breadth_first_parts.reverse() breadth_first_parts = [DRIVER_TOPIC_BASE] + breadth_first_parts - breadth_first = '/'.join(breadth_first_parts) + breadth_first = "/".join(breadth_first_parts) return depth_first, breadth_first @@ -368,14 +420,14 @@ def scrape_all(self): return self.interface.scrape_all() def get_multiple_points(self, point_names, **kwargs): - return self.interface.get_multiple_points(self.device_name, - point_names, - **kwargs) + return self.interface.get_multiple_points( + self.device_name, point_names, **kwargs + ) def set_multiple_points(self, point_names_values, **kwargs): - return self.interface.set_multiple_points(self.device_name, - point_names_values, - **kwargs) + return self.interface.set_multiple_points( + self.device_name, point_names_values, **kwargs + ) 
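A detail of this file that the refactor preserves: get_paths_for_point (just above) derives both publish topics from one depth-first topic by reversing the path segments under the driver base topic. A standalone sketch of that transformation, assuming DRIVER_TOPIC_BASE is "devices" (the example topic is hypothetical):

DRIVER_TOPIC_BASE = "devices"

def paths_for_point(depth_first):
    # e.g. "devices/campus/building/ahu1/ZoneTemp"
    #  ->  "devices/ZoneTemp/ahu1/building/campus" (breadth-first)
    parts = depth_first.split("/")
    breadth_parts = [DRIVER_TOPIC_BASE] + list(reversed(parts[1:]))
    return depth_first, "/".join(breadth_parts)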
def revert_point(self, point_name, **kwargs): self.interface.revert_point(point_name, **kwargs) @@ -402,24 +454,27 @@ def publish_cov_value(self, point_name, point_values): individual_point_message = [value, self.meta_data[point_name]] depth_first_topic, breadth_first_topic = self.get_paths_for_point( - point_name) + point_name + ) if self.publish_depth_first: - self._publish_wrapper(depth_first_topic, - headers=headers, - message=individual_point_message) + self._publish_wrapper( + depth_first_topic, headers=headers, message=individual_point_message + ) # if self.publish_breadth_first: - self._publish_wrapper(breadth_first_topic, - headers=headers, - message=individual_point_message) + self._publish_wrapper( + breadth_first_topic, + headers=headers, + message=individual_point_message, + ) if self.publish_depth_first_all: - self._publish_wrapper(self.all_path_depth, - headers=headers, - message=all_message) + self._publish_wrapper( + self.all_path_depth, headers=headers, message=all_message + ) if self.publish_breadth_first_all: - self._publish_wrapper(self.all_path_breadth, - headers=headers, - message=all_message) + self._publish_wrapper( + self.all_path_breadth, headers=headers, message=all_message + ) diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py index 9fca399772..2916279e66 100644 --- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py +++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py @@ -41,6 +41,7 @@ import gevent import traceback from datetime import datetime, timedelta +from typing import Dict from platform_driver.driver_exceptions import DriverConfigError from platform_driver.interfaces import BaseInterface, BaseRegister @@ -101,6 +102,8 @@ def __init__(self, **kwargs): self.use_read_multiple = True self.enable_collection = True self.collection_disabled_time = None + self.failing_points: Dict[str, datetime] = {} + self.fail_retry = timedelta(hours=1) # self.unresponsive_devices = {} def configure(self, config_dict, registry_config_str): @@ -214,6 +217,7 @@ def set_point(self, point_name, value, priority=None): def scrape_all(self): # TODO: support reading from an array. 
+        now = datetime.now()
         point_map = {}
         point_names = []
         read_registers = self.get_registers_by_type("byte", True)
@@ -226,6 +230,11 @@ def scrape_all(self):
         self.enable_collection = True
 
         for register in read_registers + write_registers:
+            if register.point_name in self.failing_points and self.failing_points[
+                register.point_name
+            ] > (now - self.fail_retry):
+                _log.debug(f"Skipping {register.point_name} due to recent failure.")
+                continue
             point_names.append(register.point_name)
             point_map[register.point_name] = [
                 register.object_type,

From 87e53c2746c3377a36ff08e0d5f196c1300d399c Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 17:29:00 -0500
Subject: [PATCH 03/14] Enabling retry after failure timeout

---
 .../platform_driver/interfaces/bacnet.py | 20 ++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
index 2916279e66..4ffb2097ee 100644
--- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
+++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
@@ -166,6 +166,9 @@ def ping_target(self):
         self.schedule_ping()
 
     def get_point(self, point_name, get_priority_array=False):
+        """
+        Concrete implementation of get_point for BACnet interface.
+        """
         register = self.get_register_by_name(point_name)
         property_name = "priorityArray" if get_priority_array else register.property
         register_index = None if get_priority_array else register.index
@@ -181,6 +184,9 @@ def get_point(self, point_name, get_priority_array=False):
         return result
 
     def set_point(self, point_name, value, priority=None):
+        """
+        Concrete implementation of set_point for BACnet interface.
+        """
         # TODO: support writing from an array.
         register = self.get_register_by_name(point_name)
         if register.read_only:
@@ -230,11 +236,15 @@ def scrape_all(self):
         self.enable_collection = True
 
         for register in read_registers + write_registers:
-            if register.point_name in self.failing_points and self.failing_points[
-                register.point_name
-            ] > (now - self.fail_retry):
-                _log.debug(f"Skipping {register.point_name} due to recent failure.")
-                continue
+            if register.point_name in self.failing_points:
+                if self.failing_points[register.point_name] > (now - self.fail_retry):
+                    _log.debug(f"Skipping {register.point_name} due to recent failure.")
+                    continue
+                else:
+                    _log.debug(
+                        f"Retrying {register.point_name} after failure period expired."
+ ) + del self.failing_points[register.point_name] point_names.append(register.point_name) point_map[register.point_name] = [ register.object_type, From 39671e56d3df8626eb626ee4e61e47c748eff8e1 Mon Sep 17 00:00:00 2001 From: Andrew Rodgers Date: Sat, 5 Apr 2025 17:36:24 -0500 Subject: [PATCH 04/14] Removed schedule as it was causing problems in BP, removed debug logging --- .../platform_driver/interfaces/bacnet.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py index 4ffb2097ee..2f2ea77fab 100644 --- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py +++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py @@ -65,7 +65,7 @@ "binaryValue": bool, "binaryInput": bool, "binaryOutput": bool, - "schedule": bool, + # "schedule": bool, } @@ -256,6 +256,7 @@ def scrape_all(self): results = [] for i in range(0, len(point_names), self.max_per_request): use_read_multiple = self.use_read_multiple + # generate a batch of reads equal to the max_per_request batch = { key: point_map[key] for key in point_names[i : i + self.max_per_request] } @@ -272,7 +273,9 @@ def scrape_all(self): _log.debug(f"found {len(batch_result)} results in platform driver") results.append(batch_result) except gevent.timeout.Timeout as exc: - _log.error(f"Timed out reading target {self.target_address}") + _log.error(f"Timed out reading target {self.target_address} with batch {batch}: {exc}") + if not use_read_multiple: + break raise exc except RemoteError as exc: if "unknownProperty" in exc.message: From ad545f07244acab766d6358925c405050526d00b Mon Sep 17 00:00:00 2001 From: Andrew Rodgers Date: Sat, 5 Apr 2025 17:42:23 -0500 Subject: [PATCH 05/14] Using gauge instead of counter for point failures, as all we care about is how many failed during each scrape, no matter the scrape rate. 
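This is the standard Counter-versus-Gauge distinction in Prometheus: a Counter is monotonic, so a point that failed once and then recovered would carry its cumulative failure count forever, and the total also grows with the scrape rate. A Gauge is set fresh on each pass, so it reflects only the most recent scrape. A minimal sketch of the difference with prometheus_client (the Gauge's name and labels follow the diff below; the Counter's name and the output path are illustrative):

from prometheus_client import CollectorRegistry, Counter, Gauge, write_to_textfile

registry = CollectorRegistry()

# Counter: monotonic; value depends on how long and how often we have scraped.
fail_total = Counter("failed_point_scrape_total", "Cumulative point failures",
                     ["point", "device"], registry=registry)

# Gauge: set per scrape, so it can drop back to zero when the point recovers.
fail_now = Gauge("failed_point_scrape", "Failed scrape for existing point",
                 ["point", "device"], registry=registry)

def record(point, device, failed):
    if failed:
        fail_total.labels(point=point, device=device).inc()
    fail_now.labels(point=point, device=device).set(1 if failed else 0)

write_to_textfile("/tmp/scrape_metrics.prom", registry)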
---
 services/core/PlatformDriverAgent/platform_driver/agent.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/agent.py b/services/core/PlatformDriverAgent/platform_driver/agent.py
index d45caff87b..ea0d388afe 100644
--- a/services/core/PlatformDriverAgent/platform_driver/agent.py
+++ b/services/core/PlatformDriverAgent/platform_driver/agent.py
@@ -234,7 +234,7 @@ def __init__(
             ["device"],
             registry=self.collector_registry,
         )
-        self.failed_point_scrape = Counter(
+        self.failed_point_scrape = Gauge(
            "failed_point_scrape",
            "Failed scrape for existing point",
            ["point", "device"],

From 7de96a1f3b31622ffd2af21d48de3842f846f6a8 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 17:49:15 -0500
Subject: [PATCH 06/14] Cleaning up debug

---
 .../PlatformDriverAgent/platform_driver/interfaces/bacnet.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
index 2f2ea77fab..ab404c1dea 100644
--- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
+++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
@@ -333,7 +333,6 @@ def scrape_all(self):
                 else:
                     break
         result = {k: v for d in results for k, v in d.items()}
-        _log.debug(f"{self.target_address=}")
         return result
 
     def revert_all(self, priority=None):

From 366176dfd55376ac847664236e92213f4d8a4420 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 17:55:05 -0500
Subject: [PATCH 07/14] Removing noisy debug statements

---
 .../PlatformDriverAgent/platform_driver/driver.py        | 12 +++++++-----
 .../platform_driver/interfaces/bacnet.py                 |  2 +-
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py
index 1b3cf97a81..ce68b47b95 100644
--- a/services/core/PlatformDriverAgent/platform_driver/driver.py
+++ b/services/core/PlatformDriverAgent/platform_driver/driver.py
@@ -295,16 +295,18 @@ def periodic_read(self, now):
             # return
             results = self.interface.scrape_all()
             self.parent.point_count.labels(device=self.device_name).set(len(results))
-            _log.debug(f"{len(results)=}")
             register_names = self.interface.get_register_names_view()
+            count_scrape_failed = 0
             for point in register_names - results.keys():
                 if hasattr(self.interface, "failing_points"):
-                    self.interface.failing_points[point] = datetime.datetime.utcnow()
+                    if point not in self.interface.failing_points:
+                        self.interface.failing_points[point] = datetime.datetime.utcnow()
                 depth_first_topic = self.base_topic(point=point)
-                self.parent.failed_point_scrape.labels(
-                    point=depth_first_topic, device=self.device_name
-                ).inc()
+                count_scrape_failed += 1
                 _log.error("Failed to scrape point: " + depth_first_topic)
+            self.parent.failed_point_scrape.labels(
+                point=depth_first_topic, device=self.device_name
+            ).set(count_scrape_failed)
         except (Exception, gevent.Timeout) as exc:
             tb = traceback.format_exc()
             self.parent.error_counter.labels(device=self.device_name).inc()
diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
index ab404c1dea..4f01f74197 100644
--- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
+++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
@@ -270,7 +270,7 @@ def scrape_all(self):
                         self.max_per_request,
                         use_read_multiple,
                     ).get(timeout=180)
-                    _log.debug(f"found {len(batch_result)} results in platform driver")
+                    # _log.debug(f"found {len(batch_result)} results in platform driver")
                     results.append(batch_result)
                 except gevent.timeout.Timeout as exc:
                     _log.error(f"Timed out reading target {self.target_address} with batch {batch}: {exc}")

From 21b1004ca40f6d9ebe648e5b01ae328b1de13343 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 17:57:18 -0500
Subject: [PATCH 08/14] Cleaning up more debug logging

---
 services/core/PlatformDriverAgent/platform_driver/driver.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py
index ce68b47b95..24bead2774 100644
--- a/services/core/PlatformDriverAgent/platform_driver/driver.py
+++ b/services/core/PlatformDriverAgent/platform_driver/driver.py
@@ -298,12 +298,12 @@ def periodic_read(self, now):
             register_names = self.interface.get_register_names_view()
             count_scrape_failed = 0
             for point in register_names - results.keys():
+                depth_first_topic = self.base_topic(point=point)
                 if hasattr(self.interface, "failing_points"):
                     if point not in self.interface.failing_points:
+                        _log.error("Failed to scrape point, adding to failing points: " + depth_first_topic)
                         self.interface.failing_points[point] = datetime.datetime.utcnow()
-                depth_first_topic = self.base_topic(point=point)
                 count_scrape_failed += 1
-                _log.error("Failed to scrape point: " + depth_first_topic)
             self.parent.failed_point_scrape.labels(
                 point=depth_first_topic, device=self.device_name
             ).set(count_scrape_failed)

From ef79b4bdc99bb8dc402dc79058cb3ac8abc3e2d3 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 18:00:08 -0500
Subject: [PATCH 09/14] Spread out retries so as not to load device up all at once

---
 services/core/PlatformDriverAgent/platform_driver/driver.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py
index 24bead2774..945615ea9d 100644
--- a/services/core/PlatformDriverAgent/platform_driver/driver.py
+++ b/services/core/PlatformDriverAgent/platform_driver/driver.py
@@ -302,7 +302,7 @@ def periodic_read(self, now):
                 if hasattr(self.interface, "failing_points"):
                     if point not in self.interface.failing_points:
                         _log.error("Failed to scrape point, adding to failing points: " + depth_first_topic)
-                        self.interface.failing_points[point] = datetime.datetime.utcnow()
+                        self.interface.failing_points[point] = datetime.datetime.utcnow() + datetime.timedelta(minutes=random.randint(1,30))
                 count_scrape_failed += 1
             self.parent.failed_point_scrape.labels(
                 point=depth_first_topic, device=self.device_name

From 8bf5e373a31f20472bca283cb12ee3726013c473 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 18:00:47 -0500
Subject: [PATCH 10/14] adding comments

---
 .../PlatformDriverAgent/platform_driver/driver.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py
index 945615ea9d..1f770506db 100644
--- a/services/core/PlatformDriverAgent/platform_driver/driver.py
+++ b/services/core/PlatformDriverAgent/platform_driver/driver.py
@@ -301,8 +301,15 @@ def periodic_read(self, now):
                 depth_first_topic = self.base_topic(point=point)
                 if hasattr(self.interface, "failing_points"):
                     if point not in self.interface.failing_points:
-                        _log.error("Failed to scrape point, adding to failing points: " + depth_first_topic)
-                        self.interface.failing_points[point] = datetime.datetime.utcnow() + datetime.timedelta(minutes=random.randint(1,30))
+                        _log.error(
+                            "Failed to scrape point, adding to failing points: "
+                            + depth_first_topic
+                        )
+                        self.interface.failing_points[
+                            point
+                        ] = datetime.datetime.utcnow() + datetime.timedelta(
+                            minutes=random.randint(1, 30)  # random backoff time
+                        )
                 count_scrape_failed += 1
             self.parent.failed_point_scrape.labels(
                 point=depth_first_topic, device=self.device_name

From 07c36ad956c135e4d9f006d5fd7ddae6ebc9e34b Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sat, 5 Apr 2025 19:35:33 -0500
Subject: [PATCH 11/14] Fixing issue with metrics, making sure successful reads
 reset error

---
 .../platform_driver/driver.py | 37 ++++++++++++++++++++-----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/driver.py b/services/core/PlatformDriverAgent/platform_driver/driver.py
index 1f770506db..d94fb7336d 100644
--- a/services/core/PlatformDriverAgent/platform_driver/driver.py
+++ b/services/core/PlatformDriverAgent/platform_driver/driver.py
@@ -296,24 +296,27 @@ def periodic_read(self, now):
             results = self.interface.scrape_all()
             self.parent.point_count.labels(device=self.device_name).set(len(results))
             register_names = self.interface.get_register_names_view()
-            count_scrape_failed = 0
-            for point in register_names - results.keys():
+            for point in register_names:
                 depth_first_topic = self.base_topic(point=point)
-                if hasattr(self.interface, "failing_points"):
-                    if point not in self.interface.failing_points:
-                        _log.error(
-                            "Failed to scrape point, adding to failing points: "
-                            + depth_first_topic
-                        )
-                        self.interface.failing_points[
-                            point
-                        ] = datetime.datetime.utcnow() + datetime.timedelta(
-                            minutes=random.randint(1, 30)  # random backoff time
-                        )
-                count_scrape_failed += 1
-            self.parent.failed_point_scrape.labels(
-                point=depth_first_topic, device=self.device_name
-            ).set(count_scrape_failed)
+                if point not in results.keys():
+                    self.parent.failed_point_scrape.labels(
+                        point=depth_first_topic, device=self.device_name
+                    ).set(1)
+                    if hasattr(self.interface, "failing_points"):
+                        if point not in self.interface.failing_points:
+                            _log.error(
+                                "Failed to scrape point, adding to failing points: "
+                                + depth_first_topic
+                            )
+                            self.interface.failing_points[
+                                point
+                            ] = datetime.datetime.utcnow() + datetime.timedelta(
+                                minutes=random.randint(1, 30)  # random backoff time
+                            )
+                else:
+                    self.parent.failed_point_scrape.labels(
+                        point=depth_first_topic, device=self.device_name
+                    ).set(0)
         except (Exception, gevent.Timeout) as exc:
             tb = traceback.format_exc()
             self.parent.error_counter.labels(device=self.device_name).inc()

From 731a65da3c40ed95019d9d065c19f621f5aee9c9 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Sun, 6 Apr 2025 16:03:50 +0000
Subject: [PATCH 12/14] Updated timeout defaults to be better for batch reads

---
 services/core/PlatformDriverAgent/README.md                 | 2 ++
 services/core/PlatformDriverAgent/platform_driver/agent.py  | 2 +-
 .../platform_driver/interfaces/bacnet.py                    | 6 +++++-
 3 files changed, 8 insertions(+), 2 deletions(-)

diff --git a/services/core/PlatformDriverAgent/README.md b/services/core/PlatformDriverAgent/README.md
index 9e4f2fe2f1..3750c2b540 100644
--- a/services/core/PlatformDriverAgent/README.md
+++ b/services/core/PlatformDriverAgent/README.md
@@ -76,6 +76,8 @@ to the device. Heart beats are triggered by the Actuator Agent which must be run
 
 ## Changes
 
+- 4.5.2 (2025-04-05)
+  - Using saner defaults for per-batch timeout
 - 4.5.1 (2025-04-05)
   - Added failing point cache with one hour default timeout
 - 4.5.0 (2025-04-05)
diff --git a/services/core/PlatformDriverAgent/platform_driver/agent.py b/services/core/PlatformDriverAgent/platform_driver/agent.py
index ea0d388afe..9b5bcf5828 100644
--- a/services/core/PlatformDriverAgent/platform_driver/agent.py
+++ b/services/core/PlatformDriverAgent/platform_driver/agent.py
@@ -64,7 +64,7 @@
 utils.setup_logging()
 _log = logging.getLogger(__name__)
 
-__version__ = "4.5.1"
+__version__ = "4.5.2"
 
 
 PROMETHEUS_METRICS_FILE = (
diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
index 4f01f74197..424f4a8340 100644
--- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
+++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
@@ -254,6 +254,10 @@ def scrape_all(self):
             ]
 
         results = []
+        if self.use_read_multiple:
+            timeout = min(self.timeout, max(30, self.max_per_request), 180)  # cap per-batch timeout at 3 minutes
+        else:
+            timeout = self.timeout
         for i in range(0, len(point_names), self.max_per_request):
             use_read_multiple = self.use_read_multiple
             # generate a batch of reads equal to the max_per_request
@@ -269,7 +273,7 @@ def scrape_all(self):
                         batch,
                         self.max_per_request,
                         use_read_multiple,
-                    ).get(timeout=180)
+                    ).get(timeout=timeout)
                     # _log.debug(f"found {len(batch_result)} results in platform driver")
                     results.append(batch_result)
                 except gevent.timeout.Timeout as exc:
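For intuition, the per-batch timeout that patch 12 introduces reduces to the small helper below. This is an illustrative sketch only: the helper name is ours, and the driver computes the same expression inline.

```python
# Sketch of the 4.5.2 per-batch timeout heuristic; not part of the patch itself.
def per_batch_timeout(configured_timeout: float, max_per_request: int,
                      use_read_multiple: bool) -> float:
    if use_read_multiple:
        # Smallest of: the configured timeout, a batch-size-derived value
        # floored at 30 seconds, and a hard 3-minute cap.
        return min(configured_timeout, max(30, max_per_request), 180)
    return configured_timeout

assert per_batch_timeout(30.0, 24, True) == 30.0   # stock defaults are unchanged
assert per_batch_timeout(600.0, 200, True) == 180  # large configs hit the cap
```

With the stock defaults (30 s timeout, 24 points per request) nothing changes; the cap only bites on unusually large or slow configurations, which is why it replaces the old hard-coded 180 s per batch.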

From 244dd3779691c58a6fb00f95e1253b394ac780b4 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Mon, 14 Apr 2025 11:43:39 -0500
Subject: [PATCH 13/14] updating default ping interval

---
 .../platform_driver/interfaces/bacnet.py | 253 ++++++++++++++++--
 1 file changed, 232 insertions(+), 21 deletions(-)

diff --git a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
index 424f4a8340..d6f63cd9b5 100644
--- a/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
+++ b/services/core/PlatformDriverAgent/platform_driver/interfaces/bacnet.py
@@ -36,23 +36,35 @@
 # under Contract DE-AC05-76RL01830
 # }}}
 
+"""
+BACnet Driver Interface
+
+This module provides an interface to communicate with BACnet devices
+using the BACnet/IP protocol. It handles device discovery, reading and writing
+properties, and supports Change of Value (COV) subscriptions for efficient
+data collection.
+"""
 import logging
-import gevent
 import traceback
 from datetime import datetime, timedelta
 from typing import Dict
 
+import gevent
 from platform_driver.driver_exceptions import DriverConfigError
 from platform_driver.interfaces import BaseInterface, BaseRegister
-from volttron.platform.vip.agent import errors
+
 from volttron.platform.jsonrpc import RemoteError
+from volttron.platform.vip.agent import errors
 
 # Logging is completely configured by now.
 _log = logging.getLogger(__name__)
 
+# Default lifetime for COV subscriptions in seconds
 DEFAULT_COV_LIFETIME = 180
+# Buffer time to renew COV subscriptions before they expire
 COV_UPDATE_BUFFER = 3
 
+# Mapping of BACnet object types to Python types for proper value conversion
 BACNET_TYPE_MAPPING = {
     "multiStateValue": int,
     "multiStateInput": int,
@@ -70,6 +82,13 @@
 
 
 class Register(BaseRegister):
+    """
+    BACnet Register Class
+
+    Represents a single BACnet point or property to be read from or written to.
+    Extends the BaseRegister class to include BACnet-specific attributes.
+    """
+
     def __init__(
         self,
         instance_number,
@@ -82,6 +101,20 @@ def __init__(
         priority=None,
         list_index=None,
     ):
+        """
+        Initialize a BACnet register.
+
+        Args:
+            instance_number: BACnet object instance number
+            object_type: BACnet object type (e.g., "analogValue", "binaryOutput")
+            property_name: BACnet property to read/write (usually "presentValue")
+            read_only: Boolean indicating if point is read-only
+            point_name: Volttron point name
+            units: Engineering units of the point
+            description: Optional description of the point
+            priority: BACnet priority for writing (1-16, None for read-only points)
+            list_index: Index for accessing array properties, None for non-array properties
+        """
         super(Register, self).__init__(
             "byte", read_only, point_name, units, description=description
         )
@@ -94,60 +127,116 @@ def __init__(
 
 
 class Interface(BaseInterface):
+    """
+    BACnet Interface Implementation
+
+    This class provides the interface for communicating with BACnet devices
+    through a BACnet proxy agent. It handles device configuration, point discovery,
+    reading/writing values, and optimizing communication with devices.
+    """
+
     def __init__(self, **kwargs):
+        """
+        Initialize the BACnet interface.
+
+        Args:
+            **kwargs: Keyword arguments passed to the base interface
+        """
         super(Interface, self).__init__(**kwargs)
+        # Initial maximum register count for batch operations
         self.register_count = 10000
+        # Used to reduce register count when segmentation issues occur
         self.register_count_divisor = 1
+        # List of points configured for COV (Change of Value) subscriptions
         self.cov_points = []
+        # Flag to enable/disable batch reads when possible
         self.use_read_multiple = True
+        # Flag to enable/disable data collection (disabled when device is unreachable)
         self.enable_collection = True
+        # Timestamp when collection was disabled
         self.collection_disabled_time = None
+        # Dictionary to track points that have failed and when they failed
         self.failing_points: Dict[str, datetime] = {}
+        # Time period to wait before retrying failed points
         self.fail_retry = timedelta(hours=1)
+        # Dictionary to track unresponsive devices (commented out)
         # self.unresponsive_devices = {}
 
     def configure(self, config_dict, registry_config_str):
+        """
+        Configure the BACnet interface with settings from the driver configuration.
+
+        Args:
+            config_dict: A dictionary of driver configuration settings
+            registry_config_str: A list of register definitions (points to be read/written)
+        """
+        # Minimum BACnet priority allowed for writing (1-16, with 1 being highest priority)
         self.min_priority = config_dict.get("min_priority", 8)
+        # Parse the registry configuration to create register objects
         self.parse_config(registry_config_str)
+        # BACnet device address (IP:Port for BACnet/IP)
         self.target_address = config_dict.get("device_address")
+        # BACnet device instance ID
         self.device_id = int(config_dict.get("device_id"))
+        # Lifetime for COV subscriptions in seconds
         self.cov_lifetime = config_dict.get("cov_lifetime", DEFAULT_COV_LIFETIME)
+        # VIP address of the BACnet proxy agent
         self.proxy_address = config_dict.get("proxy_address", "platform.bacnet_proxy")
+        # Maximum number of points to read in a single request
         self.max_per_request = config_dict.get("max_per_request", 24)
+        # Whether to use ReadPropertyMultiple service when possible
         self.use_read_multiple = config_dict.get("use_read_multiple", True)
+        # Timeout for BACnet requests in seconds
         self.timeout = float(config_dict.get("timeout", 30.0))
+        # Whether to attempt single-point reads if batch reads fail
         self.failover_bacnet_to_single = bool(
             config_dict.get("failover_bacnet_to_single", True)
         )
 
+        # How often to retry pinging an unresponsive device
         self.ping_retry_interval = timedelta(
-            seconds=config_dict.get("ping_retry_interval", 5.0)
+            seconds=config_dict.get("ping_retry_interval", 900)
        )
         self.scheduled_ping = None
 
+        # Initial ping to verify device is reachable
         self.ping_target()
 
-        # list of points to establish change of value subscriptions with, generated from the registry config
+        # Establish COV subscriptions for all points marked for COV in the registry
         for point_name in self.cov_points:
             self.establish_cov_subscription(point_name, self.cov_lifetime, True)
 
     def schedule_ping(self):
+        """
+        Schedules a ping attempt to the BACnet device after the retry interval.
+        Ensures we don't schedule multiple pings simultaneously.
+        """
         if self.scheduled_ping is None:
             now = datetime.now()
             next_try = now + self.ping_retry_interval
             self.scheduled_ping = self.core.schedule(next_try, self.ping_target)
 
     def ping_target(self):
+        """
+        Sends a directed WhoIsRequest to the BACnet device to verify connectivity.
+
+        For devices behind routers (especially RemoteStation addresses), this establishes
+        the network route to the device. If successful, enables data collection.
+        If unsuccessful, schedules a retry based on ping_retry_interval.
+        """
         # Some devices (mostly RemoteStation addresses behind routers) will not be reachable without
         # first establishing the route to the device. Sending a directed WhoIsRequest will
         # settle that for us when the response comes back.
         pinged = False
         try:
+            # Call the ping_device method on the BACnet proxy agent
             self.vip.rpc.call(
                 self.proxy_address, "ping_device", self.target_address, self.device_id
             ).get(timeout=self.timeout)
             pinged = True
+            # Device responded, enable data collection
+            self.enable_collection = True
 
         except errors.Unreachable:
             _log.warning("Unable to reach BACnet proxy.")
@@ -159,19 +248,32 @@ def ping_target(self):
                 f"Timeout trying to ping device {self.target_address}. Scheduling to retry"
             )
 
+        # Clear the scheduled ping flag
         self.scheduled_ping = None
-        # Schedule retry.
+        # Schedule retry if ping was unsuccessful
         if not pinged:
             self.schedule_ping()
 
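The retry machinery above amounts to a guard against double-scheduling plus a fixed retry interval. Below is a minimal standalone sketch of the same pattern, assuming a `try_ping` callable standing in for the proxy RPC and a `schedule_fn` standing in for `self.core.schedule` (both names are hypothetical, not driver API):

```python
from datetime import datetime, timedelta

class PingRetry:
    """Sketch of the ping/retry guard pattern; not the driver class itself."""

    def __init__(self, try_ping, schedule_fn, retry_interval=timedelta(seconds=900)):
        self.try_ping = try_ping          # callable returning True when the device answers
        self.schedule_fn = schedule_fn    # callable(when, fn), like self.core.schedule
        self.retry_interval = retry_interval
        self.scheduled = None             # guard so only one retry is pending at a time

    def schedule(self):
        if self.scheduled is None:
            self.scheduled = self.schedule_fn(
                datetime.now() + self.retry_interval, self.ping
            )

    def ping(self):
        pinged = self.try_ping()
        self.scheduled = None             # clear the guard before (maybe) rescheduling
        if not pinged:
            self.schedule()
```

The 900-second default matches the new `ping_retry_interval`; the old 5-second default hammered unreachable devices far too often.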
     def get_point(self, point_name, get_priority_array=False):
         """
-        Concrete imglementation of get_point for BACnet interface.
+        Concrete implementation of get_point for BACnet interface.
+
+        Args:
+            point_name: Name of the point/register to read
+            get_priority_array: If True, reads the priorityArray property instead of the
+                register's configured property (usually presentValue)
+
+        Returns:
+            The value of the requested point/property as the appropriate Python type
         """
+        # Get the register object for this point
         register = self.get_register_by_name(point_name)
+        # Determine which property to read - either priorityArray or the configured property
         property_name = "priorityArray" if get_priority_array else register.property
+        # For priorityArray, we don't use an index value
         register_index = None if get_priority_array else register.index
+        # Use the BACnet proxy to read the property
         result = self.vip.rpc.call(
             self.proxy_address,
             "read_property",
@@ -186,14 +288,30 @@ def get_point(self, point_name, get_priority_array=False):
 
     def set_point(self, point_name, value, priority=None):
         """
         Concrete implementation of set_point for BACnet interface.
+
+        Args:
+            point_name: Name of the point to write to
+            value: Value to write to the point
+            priority: BACnet priority to use (1-16, with 1 being highest priority)
+                If None, uses the register's configured priority
+
+        Returns:
+            Result from the BACnet write operation
+
+        Raises:
+            IOError: If trying to write to a read-only point or using a priority
+                lower than the configured minimum priority
         """
         # TODO: support writing from an array.
+        # Get the register object for this point
         register = self.get_register_by_name(point_name)
+        # Check if point is configured as read-only
         if register.read_only:
             raise IOError(
                 "Trying to write to a point configured read only: " + point_name
             )
+        # Validate the requested priority against minimum allowed
         if priority is not None and priority < self.min_priority:
             raise IOError(
                 "Trying to write with a priority lower than the minimum of "
@@ -201,6 +319,7 @@ def set_point(self, point_name, value, priority=None):
             )
 
         # We've already validated the register priority against the min priority.
+        # Prepare arguments for the BACnet write_property call
         args = [
             self.target_address,
             value,
@@ -210,6 +329,7 @@ def set_point(self, point_name, value, priority=None):
             priority if priority is not None else register.priority,
             register.index,
         ]
+        # Use the BACnet proxy to write the property
         result = self.vip.rpc.call(self.proxy_address, "write_property", *args).get(
             timeout=self.timeout
         )
@@ -222,19 +342,37 @@ def set_point(self, point_name, value, priority=None):
     #     self.unresponsive_devices[address] = datetime.now()
 
     def scrape_all(self):
+        """
+        Reads values for all configured points from the BACnet device.
+
+        This method implements an efficient batch-reading strategy with various
+        fallback mechanisms to handle device limitations:
+        1. Tries to read points in batches using ReadPropertyMultiple when possible
+        2. Falls back to single-point reads if batch reads fail
+        3. Skips points that have recently failed until their retry period expires
+        4. Handles various BACnet communication errors with appropriate strategies
+
+        Returns:
+            Dictionary mapping point names to their current values
+        """
         # TODO: support reading from an array.
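Items 2 and 3 of that strategy lean on the jittered backoff added in patches 09 through 13 (which assumes `random` is imported in driver.py): a failed point is parked with a randomized timestamp so a burst of failures does not retry all at once, skipped while inside the retry window, and retried afterwards. A standalone sketch of the cache semantics, with hypothetical helper names rather than driver methods:

```python
import random
from datetime import datetime, timedelta
from typing import Dict

# Sketch of the failing-point cache; mark_failed/should_skip are our names.
FAIL_RETRY = timedelta(hours=1)
failing_points: Dict[str, datetime] = {}

def mark_failed(point: str) -> None:
    if point not in failing_points:
        # 1-30 minutes of jitter spreads retries out instead of clustering them
        failing_points[point] = datetime.utcnow() + timedelta(
            minutes=random.randint(1, 30)
        )

def should_skip(point: str, now: datetime) -> bool:
    ts = failing_points.get(point)
    if ts is None:
        return False
    if ts > now - FAIL_RETRY:
        return True            # failed within the retry window; skip this cycle
    del failing_points[point]  # window elapsed; give the point another chance
    return False
```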
         now = datetime.now()
         point_map = {}
         point_names = []
 
+        # Get all registered points (both read-only and writable)
         read_registers = self.get_registers_by_type("byte", True)
         write_registers = self.get_registers_by_type("byte", False)
 
+        # Check if collection is disabled (due to previous communication failures)
         if self.enable_collection is False:
+            # If disabled for less than 24 hours, skip collection attempt
             if datetime.now() - self.collection_disabled_time < timedelta(hours=24):
                 return
             else:
+                # After 24 hours, try again
                 self.enable_collection = True
 
+        # Build list of points to read, skipping recently failed points
         for register in read_registers + write_registers:
             if register.point_name in self.failing_points:
                 if self.failing_points[register.point_name] > (now - self.fail_retry):
@@ -253,6 +391,28 @@ def scrape_all(self):
                 register.index,
             ]
 
+        # If no points to read, disable collection, record when, and schedule a ping
+        if len(point_map) == 0:
+            self.enable_collection = False
+            self.collection_disabled_time = datetime.now()  # required by the 24-hour check above
+            self.schedule_ping()
+
+        # Storage for batch read results
         results = []
+        # Process points in batches based on max_per_request setting
         if self.use_read_multiple:
             timeout = min(self.timeout, max(30, self.max_per_request), 180)  # cap per-batch timeout at 3 minutes
         else:
             timeout = self.timeout
         for i in range(0, len(point_names), self.max_per_request):
+            # Start with configured read_multiple setting, may change based on device capability
             use_read_multiple = self.use_read_multiple
-            # generate a batch of reads equal to the max_per_request
+            # Generate a batch of reads equal to the max_per_request
             batch = {
                 key: point_map[key] for key in point_names[i : i + self.max_per_request]
             }
             while True:
                 try:
+                    # Attempt to read properties through the BACnet proxy
                     batch_result = self.vip.rpc.call(
                         self.proxy_address,
                         "read_properties",
@@ -277,14 +424,21 @@ def scrape_all(self):
                     # _log.debug(f"found {len(batch_result)} results in platform driver")
                     results.append(batch_result)
                 except gevent.timeout.Timeout as exc:
-                    _log.error(f"Timed out reading target {self.target_address} with batch {batch}: {exc}")
+                    # Handle timeouts during reading
+                    _log.error(
+                        f"Timed out reading target {self.target_address} with batch {batch}: {exc}"
+                    )
                     if not use_read_multiple:
+                        # If already using single reads, give up on this batch
                         break
+                    # Otherwise propagate the error
                     raise exc
                 except RemoteError as exc:
+                    # Handle unknown property errors
                     if "unknownProperty" in exc.message:
                         _log.debug(f"unknownProperty error: {exc.message}")
                         # self.vip.config.set("unknown_properties", exc.message)
+                    # Handle no response from device
                     if (
                         "noResponse" in exc.message
                         and self.use_read_multiple
@@ -293,18 +447,24 @@ def scrape_all(self):
                         _log.warning(
                             f"device {self.target_address} did not respond reading multiple"
                         )
+                        # Fall back to single property reads
                         use_read_multiple = False
                         continue
                     elif "noResponse" in exc.message and not self.use_read_multiple:
-                        # disable device for collection
+                        # If still no response with single reads, disable collection and
+                        # schedule a ping to re-enable collection once the device responds
                         self.enable_collection = False
+                        self.collection_disabled_time = datetime.now()
+                        self.schedule_ping()
                         break
+                    # Handle segmentation not supported error
                     if "segmentationNotSupported" in exc.message:
                         if self.max_per_request <= 1:
                             _log.error(
                                 "Receiving a segmentationNotSupported error with 'max_per_request' setting of 1."
                             )
                             raise
+                        # Reduce number of points per request and try again
                         self.register_count_divisor += 1
                         self.max_per_request = max(
                             int(self.register_count / self.register_count_divisor), 1
@@ -314,6 +474,7 @@ def scrape_all(self):
                             + str(self.max_per_request)
                         )
                         continue
+                    # Handle unrecognized service error (device doesn't support ReadPropertyMultiple)
                     elif (
                         exc.message.endswith("rejected the request: 9")
                         and self.use_read_multiple
@@ -324,56 +485,87 @@ def scrape_all(self):
                         self.use_read_multiple = False
                         continue
                     else:
+                        # Log and re-raise other errors
                         trace = traceback.format_exc()
                         _log.error(
                             f"Error reading target {self.target_address}: {trace}"
                         )
                         raise exc
                 except errors.Unreachable:
-                    # If the Proxy is not running bail.
+                    # If the Proxy is not running bail
                     _log.warning("Unable to reach BACnet proxy.")
                     self.schedule_ping()
                     raise
                 else:
+                    # Successfully read this batch, break the retry loop
                     break
 
+        # Combine all batch results into a single dictionary
         result = {k: v for d in results for k, v in d.items()}
         return result
 
     def revert_all(self, priority=None):
         """
-        Revert entrire device to it's default state
+        Revert entire device to its default state by releasing all writable points.
+
+        Args:
+            priority: BACnet priority to use for the revert operation.
+                If None, uses each register's configured priority.
         """
-        # TODO: Add multipoint write support
+        # TODO: Add multipoint write support for more efficient reversion
+        # Get all writable registers
         write_registers = self.get_registers_by_type("byte", False)
+        # Revert each point individually
         for register in write_registers:
             self.revert_point(register.point_name, priority=priority)
 
     def revert_point(self, point_name, priority=None):
         """
-        Revert point to it's default state
+        Revert a specific point to its default state by writing NULL to its priority array slot.
+
+        Args:
+            point_name: Name of the point to revert
+            priority: BACnet priority to use for the revert operation.
+                If None, uses the register's configured priority.
         """
+        # Writing None (NULL) to a priority slot releases control at that priority
         self.set_point(point_name, None, priority=priority)
 
     def parse_config(self, configDict):
+        """
+        Parse the registry configuration for BACnet points.
+
+        Processes each row in the registry configuration to create Register objects
+        for each BACnet point, and identifies points that should use COV subscriptions.
+
+        Args:
+            configDict: List of dictionaries, each representing a BACnet point configuration
+
+        Raises:
+            DriverConfigError: If a point is configured with a priority lower than the minimum allowed
+        """
         if configDict is None:
             return
 
+        # Store total register count for calculating batch read sizes
         self.register_count = len(configDict)
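The per-row loop that follows normalizes each point's optional Write Priority column and validates it against `min_priority`. Condensed into a standalone helper for illustration (the helper name is ours, and a plain `ValueError` stands in for `DriverConfigError`):

```python
from typing import Optional

def parse_write_priority(raw: str, point: str, min_priority: int = 8) -> Optional[int]:
    """Sketch of the Write Priority normalization done per registry row."""
    raw = (raw or "").strip()
    if not raw:
        return None  # no priority configured; the register default applies
    priority = int(raw)
    # Numerically lower BACnet priorities take precedence, so a value below
    # min_priority would let this point override more than the config permits.
    if priority < min_priority:
        raise ValueError(
            f"{point} configured with a priority {priority} "
            f"which is lower than the minimum {min_priority}."
        )
    return priority
```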
 
         for regDef in configDict:
-            # Skip lines that have no address yet.
+            # Skip lines that have no point name defined
             if not regDef.get("Volttron Point Name"):
                 continue
 
+            # Extract point configuration from registry definition
             io_type = regDef.get("BACnet Object Type")
             read_only = regDef.get("Writable").lower() != "true"
             point_name = regDef.get("Volttron Point Name")
 
-            # checks if the point is flagged for change of value
+            # Check if the point is flagged for change of value subscriptions
             is_cov = regDef.get("COV Flag", "false").lower() == "true"
 
+            # BACnet object instance number
             index = int(regDef.get("Index"))
 
+            # Process array index if specified (for array properties)
             list_index = regDef.get("Array Index", "")
             list_index = list_index.strip()
@@ -382,6 +574,7 @@ def parse_config(self, configDict):
             else:
                 list_index = int(list_index)
 
+            # Process write priority if specified (for writable points)
             priority = regDef.get("Write Priority", "")
             priority = priority.strip()
             if not priority:
@@ -389,6 +582,7 @@ def parse_config(self, configDict):
             else:
                 priority = int(priority)
 
+                # Validate write priority against minimum allowed
                 if priority < self.min_priority:
                     message = "{point} configured with a priority {priority} which is lower than the minimum {min}."
                     raise DriverConfigError(
@@ -397,11 +591,13 @@ def parse_config(self, configDict):
                 )
             )
 
+            # Extract additional metadata
             description = regDef.get("Notes", "")
             units = regDef.get("Units")
             property_name = regDef.get("Property")
 
             try:
+                # Create register object for this point
                 register = Register(
                     index,
                     io_type,
@@ -414,23 +610,37 @@ def parse_config(self, configDict):
                     list_index=list_index,
                 )
 
+                # Add register to the interface's register map
                 self.insert_register(register)
             except Exception as exc:  # pylint: disable=broad-except
                 _log.error(f"Error parsing register definition: {regDef=} {exc=}")
 
+            # If point is flagged for COV, add to the COV subscription list
             if is_cov:
                 self.cov_points.append(point_name)
 
     def establish_cov_subscription(self, point_name, lifetime, renew=False):
         """
-        Asks the BACnet proxy to establish a COV subscription for the point via RPC.
-        If lifetime is specified, the subscription will live for that period, else the
-        subscription will last indefinitely. Default period of 3 minutes. If renew is
-        True, the the core scheduler will call this method again near the expiration
-        of the subscription.
-        """
+        Establishes a COV (Change of Value) subscription for a BACnet point.
+
+        COV subscriptions allow the BACnet device to push updates to VOLTTRON when
+        values change, rather than requiring constant polling. This can significantly
+        reduce network traffic and improve responsiveness to value changes.
+
+        Args:
+            point_name: Name of the point to establish COV subscription for
+            lifetime: Duration (in seconds) that the subscription should remain active
+                If None, subscription lasts indefinitely
+            renew: If True, automatically reschedule subscription renewal before expiration
+
+        Note:
+            The BACnet proxy agent must support COV subscriptions for this to work.
+            Not all BACnet devices support COV subscriptions for all point types.
+        """
+        # Get the register object for this point
         register = self.get_register_by_name(point_name)
         try:
+            # Request COV subscription through the BACnet proxy
             self.vip.rpc.call(
                 self.proxy_address,
                 "create_cov_subscription",
@@ -445,9 +655,10 @@ def establish_cov_subscription(self, point_name, lifetime, renew=False):
             _log.warning(
                 "Unable to establish a subscription via the bacnet proxy as it was unreachable."
             )
-        # Schedule COV resubscribe
+        # Schedule COV resubscribe before the subscription expires
         if renew and (lifetime > COV_UPDATE_BUFFER):
             now = datetime.now()
+            # Schedule renewal a few seconds before expiration
             next_sub_update = now + timedelta(seconds=(lifetime - COV_UPDATE_BUFFER))
             self.core.schedule(
                 next_sub_update,

From 02f4037774b9eead500a274a5a8d3e19014a0957 Mon Sep 17 00:00:00 2001
From: Andrew Rodgers
Date: Mon, 14 Apr 2025 11:54:27 -0500
Subject: [PATCH 14/14] updating version

---
 services/core/PlatformDriverAgent/README.md                | 2 ++
 services/core/PlatformDriverAgent/platform_driver/agent.py | 2 +-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/services/core/PlatformDriverAgent/README.md b/services/core/PlatformDriverAgent/README.md
index 3750c2b540..8710877fbf 100644
--- a/services/core/PlatformDriverAgent/README.md
+++ b/services/core/PlatformDriverAgent/README.md
@@ -76,6 +76,8 @@ to the device. Heart beats are triggered by the Actuator Agent which must be run
 
 ## Changes
 
+- 4.5.3 (2025-04-14)
+  - Added ping retry and full-device skipping when all points fail; improved code comments
 - 4.5.2 (2025-04-05)
   - Using saner defaults for per-batch timeout
 - 4.5.1 (2025-04-05)
diff --git a/services/core/PlatformDriverAgent/platform_driver/agent.py b/services/core/PlatformDriverAgent/platform_driver/agent.py
index 9b5bcf5828..003044d695 100644
--- a/services/core/PlatformDriverAgent/platform_driver/agent.py
+++ b/services/core/PlatformDriverAgent/platform_driver/agent.py
@@ -64,7 +64,7 @@
 utils.setup_logging()
 _log = logging.getLogger(__name__)
 
-__version__ = "4.5.2"
+__version__ = "4.5.3"
 
 
 PROMETHEUS_METRICS_FILE = (
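A closing note on the metrics thread running through this series: `failed_point_scrape` became a Gauge precisely so a recovering point can be driven back to 0 on the next successful scrape, something a monotonic Counter cannot express. A minimal sketch with prometheus_client, leaving out the custom registry wiring the agent uses:

```python
from prometheus_client import Gauge

# A Gauge (unlike a Counter) can be set to 0 when a point recovers, which is
# what patch 11 relies on to clear stale failure indications per point.
failed_point_scrape = Gauge(
    "failed_point_scrape",
    "Failed scrape for existing point",
    ["point", "device"],
)

def record_scrape(point: str, device: str, ok: bool) -> None:
    failed_point_scrape.labels(point=point, device=device).set(0 if ok else 1)
```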