diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 4e97f4edb..d057d4dd5 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -6,6 +6,13 @@ from xcvrd.xcvrd_utilities import common from xcvrd.dom.dom_mgr import * from xcvrd.xcvrd import * +from xcvrd.cmis import CmisManagerTask +from xcvrd.xcvrd_utilities.common import ( + CMIS_STATE_UNKNOWN, CMIS_STATE_INSERTED, CMIS_STATE_DP_PRE_INIT_CHECK, + CMIS_STATE_DP_DEINIT, CMIS_STATE_AP_CONF, CMIS_STATE_DP_ACTIVATE, + CMIS_STATE_DP_INIT, CMIS_STATE_DP_TXON, CMIS_STATE_READY, + CMIS_STATE_REMOVED, CMIS_STATE_FAILED, is_syncd_warm_restore_complete +) from xcvrd.sff_mgr import * from xcvrd.xcvrd_utilities.xcvr_table_helper import * from xcvrd.dom.utilities.db.utils import DBUtils @@ -249,7 +256,7 @@ def test_SffManagerTask_task_run_with_exception(self): def test_CmisManagerTask_task_run_with_exception(self): port_mapping = PortMapping() stop_event = threading.Event() - cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) cmis_manager.wait_for_port_config_done = MagicMock(side_effect = NotImplementedError) exception_received = None trace = None @@ -263,12 +270,12 @@ def test_CmisManagerTask_task_run_with_exception(self): assert not cmis_manager.is_alive() assert(type(exception_received) == NotImplementedError) assert("NotImplementedError" in str(trace) and "effect" in str(trace)) - assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) + assert("sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py" in str(trace)) assert("wait_for_port_config_done" in str(trace)) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) - cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, 
MagicMock()) cmis_manager.wait_for_port_config_done = MagicMock() #no-op cmis_manager.update_port_transceiver_status_table_sw_cmis_state = MagicMock(side_effect = NotImplementedError) exception_received = None @@ -283,11 +290,11 @@ def test_CmisManagerTask_task_run_with_exception(self): assert not cmis_manager.is_alive() assert(type(exception_received) == NotImplementedError) assert("NotImplementedError" in str(trace) and "effect" in str(trace)) - assert("sonic-xcvrd/xcvrd/xcvrd.py" in str(trace)) + assert("sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py" in str(trace)) assert("update_port_transceiver_status_table_sw_cmis_state" in str(trace)) - @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) - @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) + @patch('xcvrd.cmis.cmis_manager_task.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + @patch('xcvrd.cmis.CmisManagerTask.wait_for_port_config_done', MagicMock()) @patch('xcvrd.xcvrd_utilities.common.is_fast_reboot_enabled', MagicMock(return_value=False)) @patch('xcvrd.xcvrd_utilities.common.get_cmis_application_desired', MagicMock(side_effect=KeyError)) @patch('xcvrd.xcvrd_utilities.common.log_exception_traceback') @@ -302,7 +309,7 @@ def test_CmisManagerTask_get_xcvr_api_exception(self, mock_platform_chassis, moc mock_platform_chassis.get_sfp = MagicMock(return_value=mock_sfp) port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, mock_platform_chassis) task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.get_cfg_port_tbl = MagicMock() task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) @@ -394,10 +401,10 @@ def test_SfpStateUpdateTask_task_run_with_exception(self): @patch('xcvrd.xcvrd.SfpStateUpdateTask.is_alive', MagicMock(return_value = False)) 
@patch('xcvrd.xcvrd.DomInfoUpdateTask.is_alive', MagicMock(return_value = False)) - @patch('xcvrd.xcvrd.CmisManagerTask.is_alive', MagicMock(return_value = False)) + @patch('xcvrd.cmis.CmisManagerTask.is_alive', MagicMock(return_value = False)) @patch('xcvrd.xcvrd.SffManagerTask.is_alive', MagicMock(return_value=False)) - @patch('xcvrd.xcvrd.CmisManagerTask.join', MagicMock(side_effect=NotImplementedError)) - @patch('xcvrd.xcvrd.CmisManagerTask.start', MagicMock()) + @patch('xcvrd.cmis.CmisManagerTask.join', MagicMock(side_effect=NotImplementedError)) + @patch('xcvrd.cmis.CmisManagerTask.start', MagicMock()) @patch('xcvrd.xcvrd.SffManagerTask.start', MagicMock()) @patch('xcvrd.xcvrd.DomInfoUpdateTask.start', MagicMock()) @patch('xcvrd.xcvrd.SfpStateUpdateTask.start', MagicMock()) @@ -466,6 +473,7 @@ def test_is_npu_si_settings_update_required(self): assert xcvr_table_helper.is_npu_si_settings_update_required("Ethernet0", port_mapping) assert not xcvr_table_helper.is_npu_si_settings_update_required("Ethernet0", port_mapping) + @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd_utilities.port_event_helper.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd_utilities.common._wrapper_get_transceiver_firmware_info', MagicMock(return_value={'active_firmware': '2.1.1', 'inactive_firmware': '1.2.4'})) @@ -480,18 +488,18 @@ def test_post_port_sfp_firmware_info_to_db(self, mock_get_presence): mock_cmis_manager = MagicMock() dom_info_update = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping, mock_sfp_obj_dict, stop_event, mock_cmis_manager) firmware_info_tbl = Table("STATE_DB", TRANSCEIVER_FIRMWARE_INFO_TABLE) - + # Test 1: stop_event is set - should not update table stop_event.set() dom_info_update.post_port_sfp_firmware_info_to_db(logical_port_name, port_mapping, firmware_info_tbl, stop_event) assert 
firmware_info_tbl.get_size() == 0 - + # Test 2: transceiver not present - should not update table stop_event.clear() mock_get_presence.return_value = False dom_info_update.post_port_sfp_firmware_info_to_db(logical_port_name, port_mapping, firmware_info_tbl, stop_event) assert firmware_info_tbl.get_size() == 0 - + # Test 3: transceiver present - should update table for both logical ports mock_get_presence.return_value = True dom_info_update.post_port_sfp_firmware_info_to_db(logical_port_name, port_mapping, firmware_info_tbl, stop_event) @@ -2029,13 +2037,15 @@ def test_remove_stale_transceiver_info(self, mock_get_presence, mock_del_port_sf else: assert (port, mock_port_mapping_data, [mock_intf_tbl]) not in mock_del_port_sfp_dom_info_from_db.call_args_list + @patch('xcvrd.cmis.CmisManagerTask.join') + @patch('xcvrd.cmis.CmisManagerTask.start') @patch('xcvrd.xcvrd.DaemonXcvrd.init') @patch('xcvrd.xcvrd.DaemonXcvrd.deinit') @patch('xcvrd.xcvrd.DomInfoUpdateTask.start') @patch('xcvrd.xcvrd.SfpStateUpdateTask.start') @patch('xcvrd.xcvrd.DomInfoUpdateTask.join') @patch('xcvrd.xcvrd.SfpStateUpdateTask.join') - def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init): + def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, mock_task_run2, mock_deinit, mock_init, mock_cmis_join, mock_cmis_start): mock_init.return_value = PortMapping() xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) xcvrd.load_feature_flags = MagicMock() @@ -2337,7 +2347,7 @@ def test_SffManagerTask_task_worker(self, mock_chassis): def test_CmisManagerTask_update_port_transceiver_status_table_sw_cmis_state(self): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) 
task.on_port_update_event(port_change_event) @@ -2355,7 +2365,7 @@ def test_CmisManagerTask_update_port_transceiver_status_table_sw_cmis_state(self def test_CmisManagerTask_handle_port_change_event(self): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) assert not task.isPortConfigDone port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) @@ -2395,7 +2405,7 @@ def test_CmisManagerTask_handle_port_change_event(self): def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) cfg_port_tbl = MagicMock() cfg_port_tbl.hget = MagicMock(return_value=(True, 193100)) mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) @@ -2407,7 +2417,7 @@ def test_CmisManagerTask_get_configured_freq(self, mock_table_helper): def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) cfg_port_tbl = MagicMock() cfg_port_tbl.hget = MagicMock(return_value=(True, -10)) mock_table_helper.get_cfg_port_tbl = MagicMock(return_value=cfg_port_tbl) @@ -2417,7 +2427,7 @@ def test_CmisManagerTask_get_configured_tx_power_from_db(self, mock_table_helper @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.common.is_fast_reboot_enabled', MagicMock(return_value=(False))) - @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + @patch('xcvrd.cmis.cmis_manager_task.PortChangeObserver', 
MagicMock(handle_port_update_event=MagicMock())) def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_object = MagicMock() mock_object.get_presence = MagicMock(return_value=True) @@ -2425,7 +2435,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): port_mapping = PortMapping() stop_event = threading.Event() - cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + cmis_manager = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) cmis_manager.wait_for_port_config_done = MagicMock() cmis_manager.start() cmis_manager.join() @@ -2443,7 +2453,7 @@ def get_application(lane): mock_xcvr_api.get_application = MagicMock(side_effect=get_application) port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) assert task.is_decommission_required(mock_xcvr_api, app_new) == expected DEFAULT_DP_STATE = { @@ -2501,7 +2511,7 @@ def get_application(lane): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) assert task.is_cmis_application_update_required(mock_xcvr_api, app_new, host_lanes_mask) == expected @@ -2570,7 +2580,7 @@ def get_host_lane_assignment_option_side_effect(app): mock_xcvr_api.get_host_lane_assignment_option = MagicMock(side_effect=get_host_lane_assignment_option_side_effect) port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) appl = common.get_cmis_application_desired(mock_xcvr_api, host_lane_count, speed) assert task.get_cmis_host_lanes_mask(mock_xcvr_api, appl, host_lane_count, subport) == expected @@ -2583,7 +2593,7 @@ def 
test_CmisManagerTask_post_port_active_apsel_to_db_error_cases(self, mock_fie port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) lport = "Ethernet0" host_lanes_mask = 0xff @@ -2645,7 +2655,7 @@ def test_CmisManagerTask_post_port_active_apsel_to_db(self): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=int_tbl) @@ -2742,7 +2752,7 @@ def test_CmisManagerTask_post_port_active_apsel_to_db(self): def test_CmisManagerTask_test_is_timer_expired(self, expired_time, current_time, expected_result): port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) # Call the is_timer_expired function result = task.is_timer_expired(expired_time, current_time) @@ -2753,10 +2763,10 @@ def test_CmisManagerTask_test_is_timer_expired(self, expired_time, current_time, @patch('xcvrd.xcvrd.XcvrTableHelper.get_status_sw_tbl') @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.common.is_fast_reboot_enabled', MagicMock(return_value=(False))) - @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + @patch('xcvrd.cmis.cmis_manager_task.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) - @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) - @patch('xcvrd.xcvrd.CmisManagerTask.is_decommission_required', MagicMock(return_value=False)) 
+ @patch('xcvrd.cmis.CmisManagerTask.wait_for_port_config_done', MagicMock()) + @patch('xcvrd.cmis.CmisManagerTask.is_decommission_required', MagicMock(return_value=False)) @patch('xcvrd.xcvrd_utilities.common.is_cmis_api', MagicMock(return_value=True)) @patch('xcvrd.xcvrd_utilities.optics_si_parser.optics_si_present', MagicMock(return_value=(True))) @patch('xcvrd.xcvrd_utilities.optics_si_parser.fetch_optics_si_setting', MagicMock()) @@ -2939,7 +2949,7 @@ def test_CmisManagerTask_task_worker(self, mock_chassis, mock_get_status_sw_tbl) port_mapping = PortMapping() port_mapping.handle_port_change_event(PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)) stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, mock_chassis) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_sw_tbl.return_value = mock_get_status_sw_tbl task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) @@ -3007,7 +3017,7 @@ def test_CmisManagerTask_task_worker(self, mock_chassis, mock_get_status_sw_tbl) port_mapping = PortMapping() port_mapping.handle_port_change_event(PortChangeEvent('Ethernet1', 1, 0, PortChangeEvent.PORT_ADD)) stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, mock_chassis) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_sw_tbl.return_value = mock_get_status_sw_tbl task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) @@ -3034,10 +3044,10 @@ def test_CmisManagerTask_task_worker(self, mock_chassis, mock_get_status_sw_tbl) @patch('xcvrd.xcvrd.XcvrTableHelper.get_status_sw_tbl') @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.common.is_fast_reboot_enabled', 
MagicMock(return_value=(True))) - @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + @patch('xcvrd.cmis.cmis_manager_task.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) - @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) - @patch('xcvrd.xcvrd.CmisManagerTask.is_decommission_required', MagicMock(return_value=False)) + @patch('xcvrd.cmis.CmisManagerTask.wait_for_port_config_done', MagicMock()) + @patch('xcvrd.cmis.CmisManagerTask.is_decommission_required', MagicMock(return_value=False)) @patch('xcvrd.xcvrd_utilities.common.is_cmis_api', MagicMock(return_value=True)) def test_CmisManagerTask_task_worker_fastboot(self, mock_chassis, mock_get_status_sw_tbl): mock_get_status_sw_tbl = Table("STATE_DB", TRANSCEIVER_STATUS_SW_TABLE) @@ -3138,7 +3148,7 @@ def test_CmisManagerTask_task_worker_fastboot(self, mock_chassis, mock_get_statu port_mapping = PortMapping() port_mapping.handle_port_change_event(PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)) stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, mock_chassis) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_sw_tbl.return_value = mock_get_status_sw_tbl task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) @@ -3173,10 +3183,10 @@ def test_CmisManagerTask_task_worker_fastboot(self, mock_chassis, mock_get_statu @patch('xcvrd.xcvrd.XcvrTableHelper.get_status_sw_tbl') @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.common.is_fast_reboot_enabled', MagicMock(return_value=(False))) - @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + @patch('xcvrd.cmis.cmis_manager_task.PortChangeObserver', 
MagicMock(handle_port_update_event=MagicMock())) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) - @patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) - @patch('xcvrd.xcvrd.CmisManagerTask.is_decommission_required', MagicMock(return_value=False)) + @patch('xcvrd.cmis.CmisManagerTask.wait_for_port_config_done', MagicMock()) + @patch('xcvrd.cmis.CmisManagerTask.is_decommission_required', MagicMock(return_value=False)) @patch('xcvrd.xcvrd_utilities.common.is_cmis_api', MagicMock(return_value=True)) def test_CmisManagerTask_task_worker_host_tx_ready_false_to_true(self, mock_chassis, mock_get_status_sw_tbl): mock_get_status_sw_tbl = Table("STATE_DB", TRANSCEIVER_STATUS_TABLE) @@ -3317,7 +3327,7 @@ def test_CmisManagerTask_task_worker_host_tx_ready_false_to_true(self, mock_chas port_mapping = PortMapping() port_mapping.handle_port_change_event(PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)) stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, mock_chassis) task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_sw_tbl.return_value = mock_get_status_sw_tbl task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) @@ -3378,9 +3388,9 @@ def test_CmisManagerTask_task_worker_host_tx_ready_false_to_true(self, mock_chas @patch('xcvrd.xcvrd.XcvrTableHelper.get_status_sw_tbl') @patch('xcvrd.xcvrd.platform_chassis') @patch('xcvrd.xcvrd_utilities.common.is_fast_reboot_enabled', MagicMock(return_value=(False))) - @patch('xcvrd.xcvrd.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) + @patch('xcvrd.cmis.cmis_manager_task.PortChangeObserver', MagicMock(handle_port_update_event=MagicMock())) @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) - 
@patch('xcvrd.xcvrd.CmisManagerTask.wait_for_port_config_done', MagicMock()) + @patch('xcvrd.cmis.CmisManagerTask.wait_for_port_config_done', MagicMock()) @patch('xcvrd.xcvrd_utilities.common.is_cmis_api', MagicMock(return_value=True)) @patch('xcvrd.xcvrd_utilities.common.get_cmis_application_desired', MagicMock(return_value=1)) def test_CmisManagerTask_task_worker_decommission(self, mock_chassis, mock_get_status_sw_tbl): @@ -3417,7 +3427,7 @@ def test_CmisManagerTask_task_worker_decommission(self, mock_chassis, mock_get_s port_mapping = PortMapping() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, mock_chassis) task.is_decommission_required = MagicMock(side_effect=[True]*2 + [False]*10) task.xcvr_table_helper.get_status_sw_tbl.return_value = mock_get_status_sw_tbl task.get_host_tx_status = MagicMock(return_value='true') @@ -3756,6 +3766,7 @@ def test_DomInfoUpdateTask_task_worker_vdm_failure(self, mock_post_pm_info): task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.DOM_INFO_UPDATE_PERIOD_SECS = 0 task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.port_mapping.logical_port_list = ['Ethernet0'] task.port_mapping.physical_to_logical = {'1': ['Ethernet0']} task.port_mapping.get_asic_id_for_logical_port = MagicMock(return_value=0) task.get_dom_polling_from_config_db = MagicMock(return_value='enabled') @@ -4688,7 +4699,7 @@ def test_CmisManagerTask_validate_frequency_and_grid(self, lport, freq, grid, ex mock_xcvr_api.get_supported_freq_config.return_value = (0x80, 0, 0, 191300, 196100) port_mapping = PortMapping() stop_event = threading.Event() - task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping, stop_event, MagicMock()) result = task.validate_frequency_and_grid(mock_xcvr_api, lport, freq, grid) assert result == expected @@ -4878,16 +4889,16 @@ class 
TestOpticSiParser(object): def test_match_optics_si_key_regex_error(self): """Test _match_optics_si_key with invalid regex pattern (lines 31-32, 34)""" from xcvrd.xcvrd_utilities.optics_si_parser import _match_optics_si_key - + # Test with invalid regex pattern that causes re.error dict_key = "[invalid regex" # Unclosed bracket causes regex error key = "VENDOR-1234" vendor_name_str = "VENDOR" - + # Should fall back to string comparison and return True for exact match result = _match_optics_si_key(dict_key, key, vendor_name_str) assert result == False # No exact string match - + # Test with exact string match after regex error result = _match_optics_si_key(dict_key, dict_key, vendor_name_str) assert result == True # Exact string match @@ -4895,20 +4906,20 @@ def test_match_optics_si_key_regex_error(self): def test_match_optics_si_key_fallback_string_match(self): """Test _match_optics_si_key fallback string comparison (line 37)""" from xcvrd.xcvrd_utilities.optics_si_parser import _match_optics_si_key - + # Test with invalid regex that falls back to string comparison dict_key = "[invalid" key = "VENDOR-1234" vendor_name_str = "VENDOR" - + # Test exact key match result = _match_optics_si_key(key, key, vendor_name_str) assert result == True - + # Test vendor name match result = _match_optics_si_key(vendor_name_str, key, vendor_name_str) assert result == True - + # Test split key match result = _match_optics_si_key("VENDOR", key, vendor_name_str) assert result == True @@ -4917,7 +4928,7 @@ def test_get_port_media_settings_speed_key_missing(self): """Test _get_port_media_settings when SPEED_KEY not in optics_si_dict (line 126)""" from xcvrd.xcvrd_utilities.optics_si_parser import _get_port_media_settings import xcvrd.xcvrd_utilities.optics_si_parser as parser - + original_dict = parser.g_optics_si_dict parser.g_optics_si_dict = { 'PORT_MEDIA_SETTINGS': { @@ -4926,7 +4937,7 @@ def test_get_port_media_settings_speed_key_missing(self): } } } - + try: result = 
_get_port_media_settings(5, 25, "VENDOR-1234", "VENDOR", {'default': 'value'}) assert result == {'default': 'value'} @@ -4936,38 +4947,38 @@ def test_get_port_media_settings_speed_key_missing(self): def test_get_module_vendor_key_api_none(self): """Test get_module_vendor_key when API is None (line 152)""" from xcvrd.xcvrd_utilities.optics_si_parser import get_module_vendor_key - + # Mock SFP with None API mock_sfp = MagicMock() mock_sfp.get_xcvr_api.return_value = None - + result = get_module_vendor_key(1, mock_sfp) assert result is None def test_get_module_vendor_key_vendor_name_none(self): """Test get_module_vendor_key when vendor name is None""" from xcvrd.xcvrd_utilities.optics_si_parser import get_module_vendor_key - + # Mock API with None vendor name mock_api = MagicMock() mock_api.get_manufacturer.return_value = None mock_sfp = MagicMock() mock_sfp.get_xcvr_api.return_value = mock_api - + result = get_module_vendor_key(1, mock_sfp) assert result is None def test_get_module_vendor_key_vendor_pn_none(self): """Test get_module_vendor_key when vendor part number is None""" from xcvrd.xcvrd_utilities.optics_si_parser import get_module_vendor_key - + # Mock API with None vendor part number mock_api = MagicMock() mock_api.get_manufacturer.return_value = "VENDOR" mock_api.get_model.return_value = None mock_sfp = MagicMock() mock_sfp.get_xcvr_api.return_value = mock_api - + result = get_module_vendor_key(1, mock_sfp) assert result is None @@ -5001,7 +5012,7 @@ def test_get_port_media_settings_no_values_with_empty_default(self): def test_load_optics_si_settings_no_file(self): """Test load_optics_si_settings when no file exists""" from xcvrd.xcvrd_utilities.optics_si_parser import load_optics_si_settings - + with patch('sonic_py_common.device_info.get_paths_to_platform_and_hwsku_dirs', return_value=('/nonexistent/platform', '/nonexistent/hwsku')): with patch('os.path.isfile', return_value=False): @@ -5012,10 +5023,10 @@ def test_optics_si_present_empty_dict(self): 
"""Test optics_si_present when global dict is empty""" from xcvrd.xcvrd_utilities.optics_si_parser import optics_si_present import xcvrd.xcvrd_utilities.optics_si_parser as parser - + original_dict = parser.g_optics_si_dict parser.g_optics_si_dict = {} - + try: result = optics_si_present() assert result == False @@ -5026,10 +5037,10 @@ def test_optics_si_present_with_data(self): """Test optics_si_present when global dict has data""" from xcvrd.xcvrd_utilities.optics_si_parser import optics_si_present import xcvrd.xcvrd_utilities.optics_si_parser as parser - + original_dict = parser.g_optics_si_dict parser.g_optics_si_dict = {'some': 'data'} - + try: result = optics_si_present() assert result == True diff --git a/sonic-xcvrd/xcvrd/cmis/__init__.py b/sonic-xcvrd/xcvrd/cmis/__init__.py new file mode 100644 index 000000000..97607c0cd --- /dev/null +++ b/sonic-xcvrd/xcvrd/cmis/__init__.py @@ -0,0 +1,3 @@ +from .cmis_manager_task import CmisManagerTask + +__all__ = ['CmisManagerTask'] diff --git a/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py b/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py new file mode 100644 index 000000000..1d5e8e15e --- /dev/null +++ b/sonic-xcvrd/xcvrd/cmis/cmis_manager_task.py @@ -0,0 +1,1109 @@ +#!/usr/bin/env python3 + +""" + cmis_manager_task + CMIS transceiver management task for SONiC +""" + +try: + import threading + import time + import datetime + from swsscommon import swsscommon + from sonic_py_common import syslogger, daemon_base + from sonic_platform_base.sonic_xcvr.api.public.c_cmis import CmisApi + + from ..xcvrd_utilities import common + from ..xcvrd_utilities.common import ( + CMIS_STATE_UNKNOWN, CMIS_STATE_INSERTED, CMIS_STATE_DP_PRE_INIT_CHECK, + CMIS_STATE_DP_DEINIT, CMIS_STATE_AP_CONF, CMIS_STATE_DP_ACTIVATE, + CMIS_STATE_DP_INIT, CMIS_STATE_DP_TXON, CMIS_STATE_READY, + CMIS_STATE_REMOVED, CMIS_STATE_FAILED, CMIS_TERMINAL_STATES + ) + from ..xcvrd_utilities.xcvr_table_helper import XcvrTableHelper + from ..xcvrd_utilities import 
port_event_helper + from ..xcvrd_utilities.port_event_helper import PortChangeObserver + from ..xcvrd_utilities import media_settings_parser + from ..xcvrd_utilities import optics_si_parser + from ..xcvrd_utilities import sfp_status_helper + +except ImportError as e: + raise ImportError(str(e) + " - required module not found") + +# Constants +SYSLOG_IDENTIFIER_CMIS = "CmisManagerTask" + +# Global logger instance +helper_logger = syslogger.SysLogger(SYSLOG_IDENTIFIER_CMIS, enable_runtime_config=True) + +# Thread wrapper class for CMIS transceiver management + +class CmisManagerTask(threading.Thread): + + CMIS_MAX_RETRIES = 3 + CMIS_DEF_EXPIRED = 60 # seconds, default expiration time + CMIS_MODULE_TYPES = ['QSFP-DD', 'QSFP_DD', 'OSFP', 'OSFP-8X', 'QSFP+C'] + CMIS_MAX_HOST_LANES = 8 + CMIS_EXPIRATION_BUFFER_MS = 2 + ALL_LANES_MASK = 0xff + + def __init__(self, namespaces, port_mapping, main_thread_stop_event, platform_chassis, skip_cmis_mgr=False): + threading.Thread.__init__(self) + self.name = "CmisManagerTask" + self.exc = None + self.task_stopping_event = threading.Event() + self.main_thread_stop_event = main_thread_stop_event + self.port_dict = {k: {"asic_id": v} for k, v in port_mapping.logical_to_asic.items()} + self.decomm_pending_dict = {} + self.isPortInitDone = False + self.isPortConfigDone = False + self.skip_cmis_mgr = skip_cmis_mgr + self.namespaces = namespaces + self.platform_chassis = platform_chassis + self.xcvr_table_helper = XcvrTableHelper(self.namespaces) + + def log_debug(self, message): + helper_logger.log_debug("CMIS: {}".format(message)) + + def log_notice(self, message): + helper_logger.log_notice("CMIS: {}".format(message)) + + def log_error(self, message): + helper_logger.log_error("CMIS: {}".format(message)) + + def get_asic_id(self, lport): + return self.port_dict.get(lport, {}).get("asic_id", -1) + + def update_port_transceiver_status_table_sw_cmis_state(self, lport, cmis_state_to_set): + status_table = 
self.xcvr_table_helper.get_status_sw_tbl(self.get_asic_id(lport)) + if status_table is None: + helper_logger.log_error("status_table is None while updating " + "sw CMIS state for lport {}".format(lport)) + return + + fvs = swsscommon.FieldValuePairs([('cmis_state', cmis_state_to_set)]) + status_table.set(lport, fvs) + + def on_port_update_event(self, port_change_event): + if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]: + return + + lport = port_change_event.port_name + pport = port_change_event.port_index + + if lport in ['PortInitDone']: + self.isPortInitDone = True + return + + if lport in ['PortConfigDone']: + self.isPortConfigDone = True + return + + # Skip if it's not a physical port + if not lport.startswith('Ethernet'): + return + + # Skip if the physical index is not available + if pport is None: + return + + if port_change_event.port_dict is None: + return + + if port_change_event.event_type == port_change_event.PORT_SET: + if lport not in self.port_dict: + self.port_dict[lport] = {"asic_id": port_change_event.asic_id, + "forced_tx_disabled": False} + if pport >= 0: + self.port_dict[lport]['index'] = pport + if 'speed' in port_change_event.port_dict and port_change_event.port_dict['speed'] != 'N/A': + self.port_dict[lport]['speed'] = port_change_event.port_dict['speed'] + if 'lanes' in port_change_event.port_dict: + self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes'] + if 'host_tx_ready' in port_change_event.port_dict: + self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready'] + if 'admin_status' in port_change_event.port_dict: + self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status'] + if 'laser_freq' in port_change_event.port_dict: + self.port_dict[lport]['laser_freq'] = int(port_change_event.port_dict['laser_freq']) + if 'tx_power' in port_change_event.port_dict: + self.port_dict[lport]['tx_power'] = 
float(port_change_event.port_dict['tx_power']) + if 'subport' in port_change_event.port_dict: + self.port_dict[lport]['subport'] = int(port_change_event.port_dict['subport']) + + self.force_cmis_reinit(lport, 0) + + elif port_change_event.event_type == port_change_event.PORT_DEL: + # In handling the DEL event, the following two scenarios must be considered: + # 1. PORT_DEL event due to transceiver plug-out + # 2. PORT_DEL event due to Dynamic Port Breakout (DPB) + # + # Scenario 1 is simple, as only a STATE_DB|TRANSCEIVER_INFO PORT_DEL event occurs, + # so we just need to set SW_CMIS_STATE to CMIS_STATE_REMOVED. + # + # Scenario 2 is a bit more complex. First, for the port(s) before DPB, a CONFIG_DB|PORT PORT_DEL + # and a STATE_DB|PORT_TABLE PORT_DEL event occur. Next, for the port(s) after DPB, + # a CONFIG_DB|PORT PORT_SET and a STATE_DB|PORT_TABLE PORT_SET event occur. + # After that (after a short delay), a STATE_DB|TRANSCEIVER_INFO PORT_DEL event + # occurs for the port(s) before DPB, and finally, a STATE_DB|TRANSCEIVER_INFO + # PORT_SET event occurs for the port(s) after DPB. + # + # Below is the event sequence when configuring Ethernet0 from "2x200G" to "1x400G" + # (based on actual logs). + # + # 1. SET Ethernet0 CONFIG_DB|PORT + # 2. DEL Ethernet2 CONFIG_DB|PORT + # 3. DEL Ethernet0 CONFIG_DB|PORT + # 4. DEL Ethernet0 STATE_DB|PORT_TABLE + # 5. DEL Ethernet2 STATE_DB|PORT_TABLE + # 6. SET Ethernet0 CONFIG_DB|PORT + # 7. SET Ethernet0 STATE_DB|PORT_TABLE + # 8. SET Ethernet0 STATE_DB|PORT_TABLE + # 9. DEL Ethernet2 STATE_DB|TRANSCEIVER_INFO + # 10. DEL Ethernet0 STATE_DB|TRANSCEIVER_INFO + # 11. SET Ethernet0 STATE_DB|TRANSCEIVER_INFO + # + # To handle both scenarios, if the lport exists in port_dict for any DEL EVENT, + # set SW_CMIS_STATE to REMOVED. Additionally, for DEL EVENTS from CONFIG_DB due to DPB, + # remove the lport from port_dict. 
+ if lport in self.port_dict: + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_REMOVED) + + if port_change_event.db_name == 'CONFIG_DB' and port_change_event.table_name == 'PORT': + self.clear_decomm_pending(lport) + self.port_dict.pop(lport) + + def get_cmis_dp_init_duration_secs(self, api): + return api.get_datapath_init_duration()/1000 + + def get_cmis_dp_deinit_duration_secs(self, api): + return api.get_datapath_deinit_duration()/1000 + + def get_cmis_dp_tx_turnoff_duration_secs(self, api): + return api.get_datapath_tx_turnoff_duration()/1000 + + def get_cmis_module_power_up_duration_secs(self, api): + return api.get_module_pwr_up_duration()/1000 + + def get_cmis_module_power_down_duration_secs(self, api): + return api.get_module_pwr_down_duration()/1000 + + def get_cmis_host_lanes_mask(self, api, appl, host_lane_count, subport): + """ + Retrieves mask of active host lanes based on appl, host lane count and subport + + Args: + api: + XcvrApi object + appl: + Integer, the transceiver-specific application code + host_lane_count: + Integer, number of lanes on the host side + subport: + Integer, 1-based logical port number of the physical port after breakout + 0 means port is a non-breakout port + + Returns: + Integer, a mask of the active lanes on the host side + e.g. 0x3 for lane 0 and lane 1. 
+ """ + host_lanes_mask = 0 + + if appl is None or host_lane_count <= 0 or subport < 0: + self.log_error("Invalid input to get host lane mask - appl {} host_lane_count {} " + "subport {}!".format(appl, host_lane_count, subport)) + return host_lanes_mask + + host_lane_assignment_option = api.get_host_lane_assignment_option(appl) + host_lane_start_bit = (host_lane_count * (0 if subport == 0 else subport - 1)) + if host_lane_assignment_option & (1 << host_lane_start_bit): + host_lanes_mask = ((1 << host_lane_count) - 1) << host_lane_start_bit + else: + self.log_error("Unable to find starting host lane - host_lane_assignment_option {}" + " host_lane_start_bit {} host_lane_count {} subport {} appl {}!".format( + host_lane_assignment_option, host_lane_start_bit, host_lane_count, + subport, appl)) + + return host_lanes_mask + + def get_cmis_media_lanes_mask(self, api, appl, lport, subport): + """ + Retrieves mask of active media lanes based on appl, lport and subport + + Args: + api: + XcvrApi object + appl: + Integer, the transceiver-specific application code + lport: + String, logical port name + subport: + Integer, 1-based logical port number of the physical port after breakout + 0 means port is a non-breakout port + + Returns: + Integer, a mask of the active lanes on the media side + e.g. 0xf for lane 0, lane 1, lane 2 and lane 3. 
+ """ + media_lanes_mask = 0 + media_lane_count = self.port_dict[lport]['media_lane_count'] + media_lane_assignment_option = self.port_dict[lport]['media_lane_assignment_options'] + + if appl < 1 or media_lane_count <= 0 or subport < 0: + self.log_error("Invalid input to get media lane mask - appl {} media_lane_count {} " + "lport {} subport {}!".format(appl, media_lane_count, lport, subport)) + return media_lanes_mask + + media_lane_start_bit = (media_lane_count * (0 if subport == 0 else subport - 1)) + if media_lane_assignment_option & (1 << media_lane_start_bit): + media_lanes_mask = ((1 << media_lane_count) - 1) << media_lane_start_bit + else: + self.log_error("Unable to find starting media lane - media_lane_assignment_option {}" + " media_lane_start_bit {} media_lane_count {} lport {} subport {} appl {}!".format( + media_lane_assignment_option, media_lane_start_bit, media_lane_count, + lport, subport, appl)) + + return media_lanes_mask + + def clear_decomm_pending(self, lport): + """ + Clear the decommission pending status for the entire physical port this logical port belongs to. + + Args: + lport: + String, logical port name + """ + self.decomm_pending_dict.pop(self.port_dict.get(lport, {}).get('index'), None) + + def set_decomm_pending(self, lport): + """ + Set the decommission pending status. + + Args: + lport: + String, logical port name + """ + physical_port_idx = self.port_dict[lport]['index'] + if physical_port_idx in self.decomm_pending_dict: + # only one logical port can be the lead logical port doing the + # decommission state machine. + return + self.decomm_pending_dict[physical_port_idx] = lport + self.log_notice("{}: DECOMMISSION: setting decomm_pending for physical port " + "{}".format(lport, physical_port_idx)) + + def is_decomm_lead_lport(self, lport): + """ + Check if this is the lead logical port doing the decommission state machine. 
+ + Args: + lport: + String, logical port name + Returns: + Boolean, True if decommission pending, False otherwise + """ + return self.decomm_pending_dict.get(self.port_dict[lport]['index']) == lport + + def is_decomm_pending(self, lport): + """ + Get the decommission pending status for the physical port the given logical port belongs to. + + Args: + lport: + String, logical port name + Returns: + Boolean, True if decommission pending, False otherwise + """ + return self.port_dict[lport]['index'] in self.decomm_pending_dict + + def is_decomm_failed(self, lport): + """ + Get the decommission failed status for the physical port the given logical port belongs to. + + Args: + lport: + String, logical port name + Returns: + Boolean, True if decommission failed, False otherwise + """ + physical_port_idx = self.port_dict[lport]['index'] + lead_logical_port = self.decomm_pending_dict.get(physical_port_idx) + if lead_logical_port is None: + return False + return ( + common.get_cmis_state_from_state_db( + lead_logical_port, + self.xcvr_table_helper.get_status_sw_tbl( + self.get_asic_id(lead_logical_port) + ) + ) + == CMIS_STATE_FAILED + ) + + def is_decommission_required(self, api, app_new): + """ + Check if the CMIS decommission (i.e. reset appl code to 0 for all lanes + of the entire physical port) is required + + Args: + api: + XcvrApi object + app_new: + Integer, the new desired appl code + Returns: + True, if decommission is required + False, if decommission is not required + """ + for lane in range(self.CMIS_MAX_HOST_LANES): + app_cur = api.get_application(lane) + if app_cur != 0 and app_cur != app_new: + return True + return False + + def is_cmis_application_update_required(self, api, app_new, host_lanes_mask): + """ + Check if the CMIS application update is required + + Args: + api: + XcvrApi object + app_new: + Integer, the transceiver-specific application code for the new application + host_lanes_mask: + Integer, a bitmask of the lanes on the host side + e.g. 
0x5 for lane 0 and lane 2. + + Returns: + Boolean, true if application update is required otherwise false + """ + if api.is_flat_memory() or app_new <= 0 or host_lanes_mask <= 0: + self.log_error("Invalid input while checking CMIS update required - is_flat_memory {}" + "app_new {} host_lanes_mask {}!".format( + api.is_flat_memory(), app_new, host_lanes_mask)) + return False + + app_old = 0 + for lane in range(self.CMIS_MAX_HOST_LANES): + if ((1 << lane) & host_lanes_mask) == 0: + continue + if app_old == 0: + app_old = api.get_application(lane) + elif app_old != api.get_application(lane): + self.log_notice("Not all the lanes are in the same application mode " + "app_old {} current app {} lane {} host_lanes_mask {}".format( + app_old, api.get_application(lane), lane, host_lanes_mask)) + self.log_notice("Forcing application update...") + return True + + if app_old == app_new: + skip = True + dp_state = api.get_datapath_state() + conf_state = api.get_config_datapath_hostlane_status() + for lane in range(self.CMIS_MAX_HOST_LANES): + if ((1 << lane) & host_lanes_mask) == 0: + continue + name = "DP{}State".format(lane + 1) + if dp_state[name] != 'DataPathActivated': + skip = False + break + name = "ConfigStatusLane{}".format(lane + 1) + if conf_state[name] != 'ConfigSuccess': + skip = False + break + return (not skip) + return True + + def force_cmis_reinit(self, lport, retries=0): + """ + Try to force the restart of CMIS state machine + """ + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_INSERTED) + self.port_dict[lport]['cmis_retries'] = retries + self.port_dict[lport]['cmis_expired'] = None # No expiration + + def check_module_state(self, api, states): + """ + Check if the CMIS module is in the specified state + + Args: + api: + XcvrApi object + states: + List, a string list of states + + Returns: + Boolean, true if it's in the specified state, otherwise false + """ + return api.get_module_state() in states + + def check_config_error(self, 
api, host_lanes_mask, states): + """ + Check if the CMIS configuration states are in the specified state + + Args: + api: + XcvrApi object + host_lanes_mask: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + states: + List, a string list of states + + Returns: + Boolean, true if all lanes are in the specified state, otherwise false + """ + done = True + cerr = api.get_config_datapath_hostlane_status() + for lane in range(self.CMIS_MAX_HOST_LANES): + if ((1 << lane) & host_lanes_mask) == 0: + continue + key = "ConfigStatusLane{}".format(lane + 1) + if cerr[key] not in states: + done = False + break + + return done + + def check_datapath_init_pending(self, api, host_lanes_mask): + """ + Check if the CMIS datapath init is pending + + Args: + api: + XcvrApi object + host_lanes_mask: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + + Returns: + Boolean, true if all lanes are pending datapath init, otherwise false + """ + pending = True + dpinit_pending_dict = api.get_dpinit_pending() + for lane in range(self.CMIS_MAX_HOST_LANES): + if ((1 << lane) & host_lanes_mask) == 0: + continue + key = "DPInitPending{}".format(lane + 1) + if not dpinit_pending_dict[key]: + pending = False + break + + return pending + + def check_datapath_state(self, api, host_lanes_mask, states): + """ + Check if the CMIS datapath states are in the specified state + + Args: + api: + XcvrApi object + host_lanes_mask: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. 
+ states: + List, a string list of states + + Returns: + Boolean, true if all lanes are in the specified state, otherwise false + """ + done = True + dpstate = api.get_datapath_state() + for lane in range(self.CMIS_MAX_HOST_LANES): + if ((1 << lane) & host_lanes_mask) == 0: + continue + key = "DP{}State".format(lane + 1) + if dpstate[key] not in states: + done = False + break + + return done + + def get_configured_laser_freq_from_db(self, lport): + """ + Return the Tx power configured by user in CONFIG_DB's PORT table + """ + port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(self.get_asic_id(lport)) + + found, laser_freq = port_tbl.hget(lport, 'laser_freq') + return int(laser_freq) if found else 0 + + def get_configured_tx_power_from_db(self, lport): + """ + Return the Tx power configured by user in CONFIG_DB's PORT table + """ + port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(self.get_asic_id(lport)) + + found, power = port_tbl.hget(lport, 'tx_power') + return float(power) if found else 0 + + def get_host_tx_status(self, lport): + state_port_tbl = self.xcvr_table_helper.get_state_port_tbl(self.get_asic_id(lport)) + + found, host_tx_ready = state_port_tbl.hget(lport, 'host_tx_ready') + return host_tx_ready if found else 'false' + + def get_port_admin_status(self, lport): + cfg_port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(self.get_asic_id(lport)) + + found, admin_status = cfg_port_tbl.hget(lport, 'admin_status') + return admin_status if found else 'down' + + def configure_tx_output_power(self, api, lport, tx_power): + min_p, max_p = api.get_supported_power_config() + if tx_power < min_p: + self.log_error("{} configured tx power {} < minimum power {} supported".format(lport, tx_power, min_p)) + if tx_power > max_p: + self.log_error("{} configured tx power {} > maximum power {} supported".format(lport, tx_power, max_p)) + return api.set_tx_power(tx_power) + + def validate_frequency_and_grid(self, api, lport, freq, grid=75): + supported_grid, _, _, lowf, 
highf = api.get_supported_freq_config() + if freq < lowf: + self.log_error("{} configured freq:{} GHz is lower than the supported freq:{} GHz".format(lport, freq, lowf)) + return False + if freq > highf: + self.log_error("{} configured freq:{} GHz is higher than the supported freq:{} GHz".format(lport, freq, highf)) + return False + if grid == 75: + if (supported_grid >> 7) & 0x1 != 1: + self.log_error("{} configured freq:{}GHz supported grid:{} 75GHz is not supported".format(lport, freq, supported_grid)) + return False + chan = int(round((freq - 193100)/25)) + if chan % 3 != 0: + self.log_error("{} configured freq:{}GHz is NOT in 75GHz grid".format(lport, freq)) + return False + elif grid == 100: + if (supported_grid >> 5) & 0x1 != 1: + self.log_error("{} configured freq:{}GHz 100GHz is not supported".format(lport, freq)) + return False + else: + self.log_error("{} configured freq:{}GHz {}GHz is not supported".format(lport, freq, grid)) + return False + return True + + def configure_laser_frequency(self, api, lport, freq, grid=75): + if api.get_tuning_in_progress(): + self.log_error("{} Tuning in progress, subport selection may fail!".format(lport)) + return api.set_laser_freq(freq, grid) + + def post_port_active_apsel_to_db(self, api, lport, host_lanes_mask, reset_apsel=False): + if reset_apsel == False: + try: + act_apsel = api.get_active_apsel_hostlane() + appl_advt = api.get_application_advertisement() + except NotImplementedError: + helper_logger.log_error("Required feature is not implemented") + return + + tuple_list = [] + for lane in range(self.CMIS_MAX_HOST_LANES): + if ((1 << lane) & host_lanes_mask) == 0: + tuple_list.append(('active_apsel_hostlane{}'.format(lane + 1), 'N/A')) + continue + if reset_apsel == False: + act_apsel_lane = act_apsel.get('ActiveAppSelLane{}'.format(lane + 1), 'N/A') + tuple_list.append(('active_apsel_hostlane{}'.format(lane + 1), + str(act_apsel_lane))) + else: + tuple_list.append(('active_apsel_hostlane{}'.format(lane + 1), 
'N/A')) + + # also update host_lane_count and media_lane_count + if len(tuple_list) > 0: + if reset_apsel == False: + appl_advt_act = appl_advt.get(act_apsel_lane) + host_lane_count = appl_advt_act.get('host_lane_count', 'N/A') if appl_advt_act else 'N/A' + tuple_list.append(('host_lane_count', str(host_lane_count))) + media_lane_count = appl_advt_act.get('media_lane_count', 'N/A') if appl_advt_act else 'N/A' + tuple_list.append(('media_lane_count', str(media_lane_count))) + else: + tuple_list.append(('host_lane_count', 'N/A')) + tuple_list.append(('media_lane_count', 'N/A')) + + intf_tbl = self.xcvr_table_helper.get_intf_tbl(self.get_asic_id(lport)) + if not intf_tbl: + helper_logger.log_warning("Active ApSel db update: TRANSCEIVER_INFO table not found for {}".format(lport)) + return + found, _ = intf_tbl.get(lport) + if not found: + helper_logger.log_warning("Active ApSel db update: {} not found in INTF_TABLE".format(lport)) + return + fvs = swsscommon.FieldValuePairs(tuple_list) + intf_tbl.set(lport, fvs) + self.log_notice("{}: updated TRANSCEIVER_INFO_TABLE {}".format(lport, tuple_list)) + + def wait_for_port_config_done(self, namespace): + # Connect to APPL_DB and subscribe to PORT table notifications + appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace) + + sel = swsscommon.Select() + port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) + sel.addSelectable(port_tbl) + + # Make sure this daemon started after all port configured + while not self.task_stopping_event.is_set(): + (state, c) = sel.select(port_event_helper.SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + continue + if state != swsscommon.Select.OBJECT: + self.log_warning("sel.select() did not return swsscommon.Select.OBJECT") + continue + + (key, op, fvp) = port_tbl.pop() + if key in ["PortConfigDone", "PortInitDone"]: + break + + def update_cmis_state_expiration_time(self, lport, duration_seconds): + """ + Set the CMIS expiration time for 
the given logical port + in the port dictionary. + Args: + lport: Logical port name + duration_seconds: Duration in seconds for the expiration + """ + self.port_dict[lport]['cmis_expired'] = datetime.datetime.now() + \ + datetime.timedelta(seconds=duration_seconds) + \ + datetime.timedelta(milliseconds=self.CMIS_EXPIRATION_BUFFER_MS) + + def is_timer_expired(self, expired_time, current_time=None): + """ + Check if the given expiration time has passed. + + Args: + expired_time (datetime): The expiration time to check. + current_time (datetime, optional): The current time. Defaults to now. + + Returns: + bool: True if expired_time is not None and has passed, False otherwise. + """ + if expired_time is None: + return False + + if current_time is None: + current_time = datetime.datetime.now() + + return expired_time <= current_time + + def task_worker(self): + is_fast_reboot = common.is_fast_reboot_enabled() + + # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal + port_change_observer = PortChangeObserver(self.namespaces, helper_logger, + self.task_stopping_event, + self.on_port_update_event) + + while not self.task_stopping_event.is_set(): + # Handle port change event from main thread + port_change_observer.handle_port_update_event() + + for lport, info in self.port_dict.items(): + if self.task_stopping_event.is_set(): + break + + if lport not in self.port_dict: + continue + + state = common.get_cmis_state_from_state_db(lport, self.xcvr_table_helper.get_status_sw_tbl(self.get_asic_id(lport))) + if state in CMIS_TERMINAL_STATES or state == CMIS_STATE_UNKNOWN: + if state != CMIS_STATE_READY: + self.port_dict[lport]['appl'] = 0 + self.port_dict[lport]['host_lanes_mask'] = 0 + continue + + # Handle the case when Xcvrd was NOT running when 'host_tx_ready' or 'admin_status' + # was updated or this is the first run so reconcile the above two attributes + if 'host_tx_ready' not in self.port_dict[lport]: + self.port_dict[lport]['host_tx_ready'] = 
self.get_host_tx_status(lport) + + if 'admin_status' not in self.port_dict[lport]: + self.port_dict[lport]['admin_status'] = self.get_port_admin_status(lport) + + pport = int(info.get('index', "-1")) + speed = int(info.get('speed', "0")) + lanes = info.get('lanes', "").strip() + subport = info.get('subport', 0) + if pport < 0 or speed == 0 or len(lanes) < 1 or subport < 0: + continue + + # Desired port speed on the host side + host_speed = speed + host_lane_count = len(lanes.split(',')) + + # double-check the HW presence before moving forward + sfp = self.platform_chassis.get_sfp(pport) + if not sfp.get_presence(): + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_REMOVED) + continue + + try: + # Skip if XcvrApi is not supported + api = sfp.get_xcvr_api() + if api is None: + self.log_error("{}: skipping CMIS state machine since no xcvr api!!!".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + continue + + # Skip if it's not a paged memory device + if api.is_flat_memory(): + self.log_notice("{}: skipping CMIS state machine for flat memory xcvr".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + continue + + # Skip if it's not a CMIS module + type = api.get_module_type_abbreviation() + if (type is None) or (type not in self.CMIS_MODULE_TYPES): + self.log_notice("{}: skipping CMIS state machine for non-CMIS module with type {}".format(lport, type)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + continue + + if api.is_coherent_module(): + if 'tx_power' not in self.port_dict[lport]: + self.port_dict[lport]['tx_power'] = self.get_configured_tx_power_from_db(lport) + if 'laser_freq' not in self.port_dict[lport]: + self.port_dict[lport]['laser_freq'] = self.get_configured_laser_freq_from_db(lport) + except AttributeError: + # Skip if these essential routines are not available + 
self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + continue + except Exception as e: + self.log_error("{}: Exception in xcvr api: {}".format(lport, e)) + common.log_exception_traceback() + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + continue + + # CMIS expiration and retries + # + # A retry should always start over at INSETRTED state, while the + # expiration will reset the state to INSETRTED and advance the + # retry counter + expired = self.port_dict[lport].get('cmis_expired') + retries = self.port_dict[lport].get('cmis_retries', 0) + host_lanes_mask = self.port_dict[lport].get('host_lanes_mask', 0) + appl = self.port_dict[lport].get('appl', 0) + # appl can be 0 if this lport is in decommission state machine, which should not be considered as failed case. + if state != CMIS_STATE_INSERTED and not self.is_decomm_lead_lport(lport) and (host_lanes_mask <= 0 or appl < 1): + self.log_error("{}: Unexpected value for host_lanes_mask {} or appl {} in " + "{} state".format(lport, host_lanes_mask, appl, state)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + continue + + self.log_notice("{}: {}G, lanemask=0x{:x}, CMIS state={}{}, Module state={}, DP state={}, appl {} host_lane_count {} " + "retries={}".format(lport, int(speed/1000), host_lanes_mask, state, + "(decommission" + ("*" if self.is_decomm_lead_lport(lport) else "") + ")" + if self.is_decomm_pending(lport) else "", + api.get_module_state(), api.get_datapath_state(), appl, host_lane_count, retries)) + if retries > self.CMIS_MAX_RETRIES: + self.log_error("{}: FAILED".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + continue + + try: + # CMIS state transitions + if state == CMIS_STATE_INSERTED: + self.port_dict[lport]['appl'] = common.get_cmis_application_desired(api, host_lane_count, host_speed) + if self.port_dict[lport]['appl'] is None: + 
self.log_error("{}: no suitable app for the port appl {} host_lane_count {} " + "host_speed {}".format(lport, appl, host_lane_count, host_speed)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + continue + appl = self.port_dict[lport]['appl'] + self.log_notice("{}: Setting appl={}".format(lport, appl)) + + self.port_dict[lport]['host_lanes_mask'] = self.get_cmis_host_lanes_mask(api, + appl, host_lane_count, subport) + if self.port_dict[lport]['host_lanes_mask'] <= 0: + self.log_error("{}: Invalid lane mask received - host_lane_count {} subport {} " + "appl {}!".format(lport, host_lane_count, subport, appl)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + continue + host_lanes_mask = self.port_dict[lport]['host_lanes_mask'] + self.log_notice("{}: Setting host_lanemask=0x{:x}".format(lport, host_lanes_mask)) + + self.port_dict[lport]['media_lane_count'] = int(api.get_media_lane_count(appl)) + self.port_dict[lport]['media_lane_assignment_options'] = int(api.get_media_lane_assignment_option(appl)) + media_lane_count = self.port_dict[lport]['media_lane_count'] + media_lane_assignment_options = self.port_dict[lport]['media_lane_assignment_options'] + self.port_dict[lport]['media_lanes_mask'] = self.get_cmis_media_lanes_mask(api, + appl, lport, subport) + if self.port_dict[lport]['media_lanes_mask'] <= 0: + self.log_error("{}: Invalid media lane mask received - media_lane_count {} " + "media_lane_assignment_options {} subport {}" + " appl {}!".format(lport, media_lane_count, media_lane_assignment_options, subport, appl)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + continue + media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] + self.log_notice("{}: Setting media_lanemask=0x{:x}".format(lport, media_lanes_mask)) + + if self.is_decommission_required(api, appl): + self.set_decomm_pending(lport) + + if self.is_decomm_lead_lport(lport): + # Set all the 
DP lanes AppSel to unused(0) when non default app code needs to be configured + self.port_dict[lport]['appl'] = appl = 0 + self.port_dict[lport]['host_lanes_mask'] = host_lanes_mask = self.ALL_LANES_MASK + self.port_dict[lport]['media_lanes_mask'] = self.ALL_LANES_MASK + self.log_notice("{}: DECOMMISSION: setting appl={} and " + "host_lanes_mask/media_lanes_mask={:#x}".format(lport, appl, self.ALL_LANES_MASK)) + # Skip rest of the deinit/pre-init when this is the lead logical port for decommission + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) + continue + elif self.is_decomm_pending(lport): + if self.is_decomm_failed(lport): + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) + decomm_status_str = "failed" + else: + decomm_status_str = "waiting for completion" + self.log_notice("{}: DECOMMISSION: decommission has already started for this physical port, " + "{}".format(lport, decomm_status_str)) + continue + + if self.port_dict[lport]['host_tx_ready'] != 'true' or \ + self.port_dict[lport]['admin_status'] != 'up': + if is_fast_reboot and self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): + self.log_notice("{} Skip datapath re-init in fast-reboot".format(lport)) + else: + self.log_notice("{} Forcing Tx laser OFF".format(lport)) + # Force DataPath re-init + api.tx_disable_channel(media_lanes_mask, True) + self.port_dict[lport]['forced_tx_disabled'] = True + txoff_duration = self.get_cmis_dp_tx_turnoff_duration_secs(api) + self.log_notice("{}: Tx turn off duration {} secs".format(lport, txoff_duration)) + self.update_cmis_state_expiration_time(lport, txoff_duration) + self.post_port_active_apsel_to_db(api, lport, host_lanes_mask, reset_apsel=True) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + continue + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_PRE_INIT_CHECK) + if state == CMIS_STATE_DP_PRE_INIT_CHECK: 
+ if self.port_dict[lport].get('forced_tx_disabled', False): + # Ensure that Tx is OFF + # Transceiver will remain in DataPathDeactivated state while it is in Low Power Mode (even if Tx is disabled) + # Transceiver will enter DataPathInitialized state if Tx was disabled after CMIS initialization was completed + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated', 'DataPathInitialized']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathDeactivated/DataPathInitialized'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + self.port_dict[lport]['forced_tx_disabled'] = False + self.log_notice("{}: Tx laser is successfully turned OFF".format(lport)) + + # Configure the target output power if ZR module + if api.is_coherent_module(): + tx_power = self.port_dict[lport]['tx_power'] + # Prevent configuring same tx power multiple times + if 0 != tx_power and tx_power != api.get_tx_config_power(): + if 1 != self.configure_tx_output_power(api, lport, tx_power): + self.log_error("{} failed to configure Tx power = {}".format(lport, tx_power)) + else: + self.log_notice("{} Successfully configured Tx power = {}".format(lport, tx_power)) + + need_update = self.is_cmis_application_update_required(api, appl, host_lanes_mask) + + # For ZR module, Datapath needes to be re-initlialized on new channel selection + if api.is_coherent_module(): + freq = self.port_dict[lport]['laser_freq'] + # If user requested frequency is NOT the same as configured on the module + # force datapath re-initialization + if 0 != freq and freq != api.get_laser_config_freq(): + if self.validate_frequency_and_grid(api, lport, freq) == True: + need_update = True + else: + # clear setting of invalid frequency config + self.port_dict[lport]['laser_freq'] = 0 + + if not need_update: + # No application updates + # As part of xcvrd restart, the TRANSCEIVER_INFO table is deleted and + # created with default value of 'N/A' for all the active 
apsel fields. + # The below (post_port_active_apsel_to_db) will ensure that the + # active apsel fields are updated correctly in the DB since + # the CMIS state remains unchanged during xcvrd restart + self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) + self.log_notice("{}: no CMIS application update required...READY".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) + continue + self.log_notice("{}: force Datapath reinit".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) + elif state == CMIS_STATE_DP_DEINIT: + # D.2.2 Software Deinitialization + api.set_datapath_deinit(host_lanes_mask) + + # D.1.3 Software Configuration and Initialization + media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] + if not api.tx_disable_channel(media_lanes_mask, True): + self.log_notice("{}: unable to turn off tx power with host_lanes_mask {}".format(lport, host_lanes_mask)) + self.port_dict[lport]['cmis_retries'] = retries + 1 + continue + + #Sets module to high power mode and doesn't impact datapath if module is already in high power mode + api.set_lpmode(False, wait_state_change = False) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_AP_CONF) + dpDeinitDuration = self.get_cmis_dp_deinit_duration_secs(api) + modulePwrUpDuration = self.get_cmis_module_power_up_duration_secs(api) + self.log_notice("{}: DpDeinit duration {} secs, modulePwrUp duration {} secs".format(lport, dpDeinitDuration, modulePwrUpDuration)) + self.update_cmis_state_expiration_time(lport, max(modulePwrUpDuration, dpDeinitDuration)) + + elif state == CMIS_STATE_AP_CONF: + # Explicit control bit to apply custom Host SI settings. 
+ # It will be set to 1 and applied via set_application if + # custom SI settings is applicable + ec = 0 + + # TODO: Use fine grained time when the CMIS memory map is available + if not self.check_module_state(api, ['ModuleReady']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathDeactivated state'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + # Skip rest if it's in decommission state machine + if not self.is_decomm_pending(lport): + if api.is_coherent_module(): + # For ZR module, configure the laser frequency when Datapath is in Deactivated state + freq = self.port_dict[lport]['laser_freq'] + if 0 != freq: + if 1 != self.configure_laser_frequency(api, lport, freq): + self.log_error("{} failed to configure laser frequency {} GHz".format(lport, freq)) + else: + self.log_notice("{} configured laser frequency {} GHz".format(lport, freq)) + + # Stage custom SI settings + if optics_si_parser.optics_si_present(): + optics_si_dict = {} + # Apply module SI settings if applicable + lane_speed = int(speed/1000)//host_lane_count + optics_si_dict = optics_si_parser.fetch_optics_si_setting(pport, lane_speed, sfp) + + self.log_debug("Read SI parameters for port {} from optics_si_settings.json vendor file:".format(lport)) + for key, sub_dict in optics_si_dict.items(): + self.log_debug("{}".format(key)) + for sub_key, value in sub_dict.items(): + self.log_debug("{}: {}".format(sub_key, str(value))) + + if optics_si_dict: + self.log_notice("{}: Apply Optics SI found for Vendor: {} PN: {} lane speed: {}G". 
+ format(lport, api.get_manufacturer(), api.get_model(), lane_speed)) + if not api.stage_custom_si_settings(host_lanes_mask, optics_si_dict): + self.log_notice("{}: unable to stage custom SI settings ".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + # Set Explicit control bit to apply Custom Host SI settings + ec = 1 + + # D.1.3 Software Configuration and Initialization + api.set_application(host_lanes_mask, appl, ec) + if not api.scs_apply_datapath_init(host_lanes_mask): + self.log_notice("{}: unable to set application and stage DP init".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_INIT) + elif state == CMIS_STATE_DP_INIT: + if not self.check_config_error(api, host_lanes_mask, ['ConfigSuccess']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'ConfigSuccess', current ConfigStatus: " + "{}".format(lport, list(api.get_config_datapath_hostlane_status().values()))) + self.force_cmis_reinit(lport, retries + 1) + continue + + # Clear decommission status and invoke CMIS reinit so that normal CMIS initialization can begin + if self.is_decomm_pending(lport): + self.log_notice("{}: DECOMMISSION: done for physical port {}".format(lport, self.port_dict[lport]['index'])) + self.clear_decomm_pending(lport) + self.force_cmis_reinit(lport) + continue + + if hasattr(api, 'get_cmis_rev'): + # Check datapath init pending on module that supports CMIS 5.x + majorRev = int(api.get_cmis_rev().split('.')[0]) + if majorRev >= 5 and not self.check_datapath_init_pending(api, host_lanes_mask): + self.log_notice("{}: datapath init not pending".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + # Ensure the Datapath is NOT Activated unless the host Tx siganl is good. 
+ # NOTE: Some CMIS compliant modules may have 'auto-squelch' feature where + # the module won't take datapaths to Activated state if host tries to enable + # the datapaths while there is no good Tx signal from the host-side. + if self.port_dict[lport]['admin_status'] != 'up' or \ + self.port_dict[lport]['host_tx_ready'] != 'true': + self.log_notice("{} waiting for host tx ready...".format(lport)) + continue + + # D.1.3 Software Configuration and Initialization + api.set_datapath_init(host_lanes_mask) + dpInitDuration = self.get_cmis_dp_init_duration_secs(api) + self.log_notice("{}: DpInit duration {} secs".format(lport, dpInitDuration)) + self.update_cmis_state_expiration_time(lport, dpInitDuration) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_TXON) + elif state == CMIS_STATE_DP_TXON: + if not self.check_datapath_state(api, host_lanes_mask, ['DataPathInitialized']): + if self.is_timer_expired(expired): + self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + # Turn ON the laser + media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] + api.tx_disable_channel(media_lanes_mask, False) + self.log_notice("{}: Turning ON tx power".format(lport)) + self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_ACTIVATE) + elif state == CMIS_STATE_DP_ACTIVATE: + # Use dpInitDuration instead of MaxDurationDPTxTurnOn because + # some modules rely on dpInitDuration to turn on the Tx signal. 
def run(self):
    """
    Thread entry point for the CMIS manager.

    Returns immediately (after a notice log) when no platform chassis is
    available or when the CMIS manager is configured to be skipped.
    Otherwise it waits for port configuration on every namespace, marks every
    known logical port as CMIS_STATE_UNKNOWN and enters task_worker().

    Any exception escaping that sequence is logged, stored in self.exc (join()
    re-raises it) and the main thread stop event is set.
    """
    bail_out_notice = None
    if self.platform_chassis is None:
        bail_out_notice = "Platform chassis is not available, stopping..."
    elif self.skip_cmis_mgr:
        bail_out_notice = "Skipping CMIS Task Manager"

    if bail_out_notice is not None:
        self.log_notice(bail_out_notice)
        return

    try:
        self.log_notice("Waiting for PortConfigDone...")
        for ns in self.namespaces:
            self.wait_for_port_config_done(ns)

        # Every port starts from UNKNOWN before the state machine runs
        for port_name in self.port_dict:
            self.update_port_transceiver_status_table_sw_cmis_state(port_name, CMIS_STATE_UNKNOWN)

        self.task_worker()
    except Exception as e:
        thread_name = threading.current_thread().name
        helper_logger.log_error("Exception occured at {} thread due to {}".format(thread_name, repr(e)))
        common.log_exception_traceback()
        # Keep the exception for join() and ask the main thread to stop
        self.exc = e
        self.main_thread_stop_event.set()
def is_fast_reboot_enabled():
    """
    Report whether fast-reboot is flagged as enabled in STATE_DB.

    Runs ``sonic-db-cli STATE_DB hget "FAST_RESTART_ENABLE_TABLE|system" enable``
    and checks whether the command output contains "true".

    Returns:
        Boolean, True if "true" appears in the command output, otherwise False

    Raises:
        subprocess.CalledProcessError: if sonic-db-cli exits with a non-zero status
    """
    # Pass an argument vector instead of a shell string: the key contains '|',
    # which only worked before because it was quoted for the shell. No shell is
    # needed to run a fixed command.
    cmd = ['sonic-db-cli', 'STATE_DB', 'hget', 'FAST_RESTART_ENABLE_TABLE|system', 'enable']
    fastboot_enabled = subprocess.check_output(cmd, universal_newlines=True)
    return "true" in fastboot_enabled
def is_syncd_warm_restore_complete():
    """
    Decide from STATE_DB whether a warm restart is in effect, so that xcvrd
    can avoid a premature config push that caused port flaps.

    Returns True when either
      - WARM_RESTART_TABLE|syncd 'restore_count' is a positive int, or a
        string of digits with a positive value, or
      - WARM_RESTART_ENABLE_TABLE|system 'enable' is the string "true"
        (case-insensitive, surrounding whitespace ignored).
    Returns False otherwise, including when evaluating the values raises; in
    that case a warning and the traceback are logged.
    """
    state_db = daemon_base.db_connect("STATE_DB")
    restore_count = state_db.hget("WARM_RESTART_TABLE|syncd", "restore_count")
    system_enabled = state_db.hget("WARM_RESTART_ENABLE_TABLE|system", "enable")
    try:
        # restore_count may come back as int, str or None; None matches neither branch
        if isinstance(restore_count, int):
            if restore_count > 0:
                return True
        elif isinstance(restore_count, str):
            count_text = restore_count.strip()
            if count_text.isdigit() and int(count_text) > 0:
                return True

        # Only the literal "true" matters for the system-wide enable flag
        if isinstance(system_enabled, str) and system_enabled.strip().lower() == "true":
            return True

    except Exception as e:
        helper_logger.log_warning(f"Unexpected value: restore_count={restore_count}, system_enabled={system_enabled}, error={e}")
        log_exception_traceback()
    return False
def on_port_update_event(self, port_change_event):
    """
    Apply a port change event to the per-port cache (self.port_dict).

    Only PORT_SET and PORT_DEL events are processed. The pseudo ports
    'PortInitDone' and 'PortConfigDone' just latch the matching flag. Events
    for names not starting with 'Ethernet', without a physical index, or
    without a port_dict are ignored.

    PORT_SET creates the cache entry if needed, copies the fields present in
    the event and restarts the CMIS state machine for the port.
    PORT_DEL marks a known port as REMOVED and, for CONFIG_DB|PORT deletions,
    drops the port from the cache.

    Args:
        port_change_event:
            Event object; this method reads event_type, port_name, port_index,
            port_dict, asic_id, db_name and table_name from it.
    """
    if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]:
        return

    lport = port_change_event.port_name
    pport = port_change_event.port_index

    if lport in ['PortInitDone']:
        self.isPortInitDone = True
        return

    if lport in ['PortConfigDone']:
        self.isPortConfigDone = True
        return

    # Skip if it's not a physical port
    if not lport.startswith('Ethernet'):
        return

    # Skip if the physical index is not available
    if pport is None:
        return

    event_fields = port_change_event.port_dict
    if event_fields is None:
        return

    if port_change_event.event_type == port_change_event.PORT_SET:
        if lport not in self.port_dict:
            self.port_dict[lport] = {"asic_id": port_change_event.asic_id,
                                     "forced_tx_disabled": False}
        entry = self.port_dict[lport]
        if pport >= 0:
            entry['index'] = pport
        if 'speed' in event_fields and event_fields['speed'] != 'N/A':
            entry['speed'] = event_fields['speed']
        if 'lanes' in event_fields:
            entry['lanes'] = event_fields['lanes']
        if 'host_tx_ready' in event_fields:
            entry['host_tx_ready'] = event_fields['host_tx_ready']
        if 'admin_status' in event_fields:
            entry['admin_status'] = event_fields['admin_status']
        if 'laser_freq' in event_fields:
            entry['laser_freq'] = int(event_fields['laser_freq'])
        if 'tx_power' in event_fields:
            entry['tx_power'] = float(event_fields['tx_power'])
        if 'subport' in event_fields:
            entry['subport'] = int(event_fields['subport'])

        self.force_cmis_reinit(lport, 0)

    elif port_change_event.event_type == port_change_event.PORT_DEL:
        # In handling the DEL event, the following two scenarios must be considered:
        # 1. PORT_DEL event due to transceiver plug-out
        # 2. PORT_DEL event due to Dynamic Port Breakout (DPB)
        #
        # Scenario 1 is simple, as only a STATE_DB|TRANSCEIVER_INFO PORT_DEL event occurs,
        # so we just need to set SW_CMIS_STATE to CMIS_STATE_REMOVED.
        #
        # Scenario 2 is a bit more complex. First, for the port(s) before DPB, a CONFIG_DB|PORT PORT_DEL
        # and a STATE_DB|PORT_TABLE PORT_DEL event occur. Next, for the port(s) after DPB,
        # a CONFIG_DB|PORT PORT_SET and a STATE_DB|PORT_TABLE PORT_SET event occur.
        # After that (after a short delay), a STATE_DB|TRANSCEIVER_INFO PORT_DEL event
        # occurs for the port(s) before DPB, and finally, a STATE_DB|TRANSCEIVER_INFO
        # PORT_SET event occurs for the port(s) after DPB.
        #
        # Below is the event sequence when configuring Ethernet0 from "2x200G" to "1x400G"
        # (based on actual logs).
        #
        # 1. SET Ethernet0 CONFIG_DB|PORT
        # 2. DEL Ethernet2 CONFIG_DB|PORT
        # 3. DEL Ethernet0 CONFIG_DB|PORT
        # 4. DEL Ethernet0 STATE_DB|PORT_TABLE
        # 5. DEL Ethernet2 STATE_DB|PORT_TABLE
        # 6. SET Ethernet0 CONFIG_DB|PORT
        # 7. SET Ethernet0 STATE_DB|PORT_TABLE
        # 8. SET Ethernet0 STATE_DB|PORT_TABLE
        # 9. DEL Ethernet2 STATE_DB|TRANSCEIVER_INFO
        # 10. DEL Ethernet0 STATE_DB|TRANSCEIVER_INFO
        # 11. SET Ethernet0 STATE_DB|TRANSCEIVER_INFO
        #
        # To handle both scenarios, if the lport exists in port_dict for any DEL EVENT,
        # set SW_CMIS_STATE to REMOVED. Additionally, for DEL EVENTS from CONFIG_DB due to DPB,
        # remove the lport from port_dict.
        if lport in self.port_dict:
            self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_REMOVED)

        if port_change_event.db_name == 'CONFIG_DB' and port_change_event.table_name == 'PORT':
            self.clear_decomm_pending(lport)
            # The port may never have been cached (or was already dropped by an
            # earlier DEL); a missing key must not raise out of the event handler.
            self.port_dict.pop(lport, None)
def get_cmis_media_lanes_mask(self, api, appl, lport, subport):
    """
    Compute the bitmask of active media lanes for a logical port.

    The media lane count and media lane assignment options are read from
    self.port_dict[lport] ('media_lane_count' and
    'media_lane_assignment_options').

    Args:
        api:
            XcvrApi object
        appl:
            Integer, the transceiver-specific application code
        lport:
            String, logical port name
        subport:
            Integer, 1-based logical port number of the physical port after breakout
            0 means port is a non-breakout port

    Returns:
        Integer, a mask of the active lanes on the media side
        e.g. 0xf for lane 0, lane 1, lane 2 and lane 3.
        0 is returned (and an error is logged) on invalid input or when the
        assignment options do not allow the computed starting lane.
    """
    port_info = self.port_dict[lport]
    lane_count = port_info['media_lane_count']
    assignment_options = port_info['media_lane_assignment_options']

    if appl < 1 or lane_count <= 0 or subport < 0:
        self.log_error("Invalid input to get media lane mask - appl {} media_lane_count {} "
                       "lport {} subport {}!".format(appl, lane_count, lport, subport))
        return 0

    # subport 0 (non-breakout) and subport 1 both start at lane 0
    subport_offset = 0 if subport == 0 else subport - 1
    start_bit = lane_count * subport_offset

    if not (assignment_options & (1 << start_bit)):
        self.log_error("Unable to find starting media lane - media_lane_assignment_option {}"
                       " media_lane_start_bit {} media_lane_count {} lport {} subport {} appl {}!".format(
                           assignment_options, start_bit, lane_count,
                           lport, subport, appl))
        return 0

    return ((1 << lane_count) - 1) << start_bit
def is_cmis_application_update_required(self, api, app_new, host_lanes_mask):
    """
    Check if the CMIS application update is required

    Args:
        api:
            XcvrApi object
        app_new:
            Integer, the transceiver-specific application code for the new application
        host_lanes_mask:
            Integer, a bitmask of the lanes on the host side
            e.g. 0x5 for lane 0 and lane 2.

    Returns:
        Boolean, true if application update is required otherwise false.
        False (with an error log) for flat-memory modules or non-positive
        app_new / host_lanes_mask. True when the selected lanes do not all
        report the same application, or report one different from app_new.
        When they already report app_new, True only if some selected lane is
        not 'DataPathActivated' or not 'ConfigSuccess'.
    """
    # Read once so the decision and the log line describe the same value
    flat_memory = api.is_flat_memory()
    if flat_memory or app_new <= 0 or host_lanes_mask <= 0:
        self.log_error("Invalid input while checking CMIS update required - is_flat_memory {} "
                       "app_new {} host_lanes_mask {}!".format(
                           flat_memory, app_new, host_lanes_mask))
        return False

    selected_lanes = [lane for lane in range(self.CMIS_MAX_HOST_LANES)
                      if (1 << lane) & host_lanes_mask]

    app_old = 0
    for lane in selected_lanes:
        # One module read per lane; compared and logged from the same value
        app_cur = api.get_application(lane)
        if app_old == 0:
            app_old = app_cur
        elif app_old != app_cur:
            self.log_notice("Not all the lanes are in the same application mode "
                            "app_old {} current app {} lane {} host_lanes_mask {}".format(
                                app_old, app_cur, lane, host_lanes_mask))
            self.log_notice("Forcing application update...")
            return True

    if app_old != app_new:
        return True

    # Same application already selected: an update is only needed when some
    # selected lane is not fully activated and successfully configured.
    dp_state = api.get_datapath_state()
    conf_state = api.get_config_datapath_hostlane_status()
    all_lanes_up = all(
        dp_state["DP{}State".format(lane + 1)] == 'DataPathActivated'
        and conf_state["ConfigStatusLane{}".format(lane + 1)] == 'ConfigSuccess'
        for lane in selected_lanes
    )
    return not all_lanes_up
def check_datapath_init_pending(self, api, host_lanes_mask):
    """
    Tell whether datapath init is pending on every selected host lane.

    Args:
        api:
            XcvrApi object
        host_lanes_mask:
            Integer, a bitmask of the lanes on the host side
            e.g. 0x5 for lane 0 and lane 2.

    Returns:
        Boolean, true if all lanes selected by the mask report a pending
        datapath init (also true when the mask selects no lane), otherwise false
    """
    pending_by_lane = api.get_dpinit_pending()
    # Lanes are 0-based in the mask but 1-based in the 'DPInitPending<N>' keys
    selected_lanes = (lane for lane in range(self.CMIS_MAX_HOST_LANES)
                      if host_lanes_mask & (1 << lane))
    return all(pending_by_lane["DPInitPending{}".format(lane + 1)]
               for lane in selected_lanes)
def validate_frequency_and_grid(self, api, lport, freq, grid=75):
    """
    Validate a laser frequency / grid spacing pair against the module limits
    returned by api.get_supported_freq_config().

    Args:
        api:
            XcvrApi object
        lport:
            String, logical port name (used in log messages only)
        freq:
            Configured frequency in GHz
        grid:
            Grid spacing in GHz; only 75 and 100 are accepted

    Returns:
        Boolean, True when freq lies within the supported range and the grid
        is supported (bit 7 of the supported-grid value for 75GHz, bit 5 for
        100GHz); for the 75GHz grid the 25GHz-step offset from 193100 GHz must
        also be a multiple of 3. Otherwise False, after logging the reason.
    """
    grid_caps, _, _, freq_min, freq_max = api.get_supported_freq_config()

    failure = None
    if freq < freq_min:
        failure = "{} configured freq:{} GHz is lower than the supported freq:{} GHz".format(lport, freq, freq_min)
    elif freq > freq_max:
        failure = "{} configured freq:{} GHz is higher than the supported freq:{} GHz".format(lport, freq, freq_max)
    elif grid == 75:
        if not (grid_caps >> 7) & 0x1:
            failure = "{} configured freq:{}GHz supported grid:{} 75GHz is not supported".format(lport, freq, grid_caps)
        elif int(round((freq - 193100)/25)) % 3 != 0:
            failure = "{} configured freq:{}GHz is NOT in 75GHz grid".format(lport, freq)
    elif grid == 100:
        if not (grid_caps >> 5) & 0x1:
            failure = "{} configured freq:{}GHz 100GHz is not supported".format(lport, freq)
    else:
        failure = "{} configured freq:{}GHz {}GHz is not supported".format(lport, freq, grid)

    if failure is None:
        return True
    self.log_error(failure)
    return False
def wait_for_port_config_done(self, namespace):
    """
    Block until the APPL_DB PORT table of the given namespace delivers a
    'PortConfigDone' or 'PortInitDone' key, or until task_stopping_event is set.

    Args:
        namespace:
            Namespace passed to daemon_base.db_connect() for APPL_DB
    """
    # Connect to APPL_DB and subscribe to PORT table notifications
    appl_db = daemon_base.db_connect("APPL_DB", namespace=namespace)

    sel = swsscommon.Select()
    port_tbl = swsscommon.SubscriberStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME)
    sel.addSelectable(port_tbl)

    # Make sure this daemon started after all port configured
    while not self.task_stopping_event.is_set():
        (state, c) = sel.select(port_event_helper.SELECT_TIMEOUT_MSECS)
        if state == swsscommon.Select.TIMEOUT:
            continue
        if state != swsscommon.Select.OBJECT:
            # Log through helper_logger directly: the logging wrappers visible on
            # this class are log_debug/log_notice/log_error only, so calling
            # self.log_warning here would raise AttributeError instead of warning.
            helper_logger.log_warning("CMIS: sel.select() did not return swsscommon.Select.OBJECT")
            continue

        (key, op, fvp) = port_tbl.pop()
        if key in ["PortConfigDone", "PortInitDone"]:
            break
def is_timer_expired(self, expired_time, current_time=None):
    """
    Tell whether an expiration timestamp has been reached.

    Args:
        expired_time (datetime): The expiration time to check; None means
            no expiration is armed.
        current_time (datetime, optional): Reference time to compare against.
            Defaults to datetime.datetime.now().

    Returns:
        bool: False when expired_time is None, otherwise True if
        expired_time is at or before the reference time.
    """
    if expired_time is None:
        return False

    reference = datetime.datetime.now() if current_time is None else current_time
    return expired_time <= reference
self.get_host_tx_status(lport) - - if 'admin_status' not in self.port_dict[lport]: - self.port_dict[lport]['admin_status'] = self.get_port_admin_status(lport) - - pport = int(info.get('index', "-1")) - speed = int(info.get('speed', "0")) - lanes = info.get('lanes', "").strip() - subport = info.get('subport', 0) - if pport < 0 or speed == 0 or len(lanes) < 1 or subport < 0: - continue - - # Desired port speed on the host side - host_speed = speed - host_lane_count = len(lanes.split(',')) - - # double-check the HW presence before moving forward - sfp = platform_chassis.get_sfp(pport) - if not sfp.get_presence(): - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_REMOVED) - continue - - try: - # Skip if XcvrApi is not supported - api = sfp.get_xcvr_api() - if api is None: - self.log_error("{}: skipping CMIS state machine since no xcvr api!!!".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - - # Skip if it's not a paged memory device - if api.is_flat_memory(): - self.log_notice("{}: skipping CMIS state machine for flat memory xcvr".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - - # Skip if it's not a CMIS module - type = api.get_module_type_abbreviation() - if (type is None) or (type not in self.CMIS_MODULE_TYPES): - self.log_notice("{}: skipping CMIS state machine for non-CMIS module with type {}".format(lport, type)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - - if api.is_coherent_module(): - if 'tx_power' not in self.port_dict[lport]: - self.port_dict[lport]['tx_power'] = self.get_configured_tx_power_from_db(lport) - if 'laser_freq' not in self.port_dict[lport]: - self.port_dict[lport]['laser_freq'] = self.get_configured_laser_freq_from_db(lport) - except AttributeError: - # Skip if these essential routines are not available - 
self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - except Exception as e: - self.log_error("{}: Exception in xcvr api: {}".format(lport, e)) - common.log_exception_traceback() - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - - # CMIS expiration and retries - # - # A retry should always start over at INSETRTED state, while the - # expiration will reset the state to INSETRTED and advance the - # retry counter - expired = self.port_dict[lport].get('cmis_expired') - retries = self.port_dict[lport].get('cmis_retries', 0) - host_lanes_mask = self.port_dict[lport].get('host_lanes_mask', 0) - appl = self.port_dict[lport].get('appl', 0) - # appl can be 0 if this lport is in decommission state machine, which should not be considered as failed case. - if state != CMIS_STATE_INSERTED and not self.is_decomm_lead_lport(lport) and (host_lanes_mask <= 0 or appl < 1): - self.log_error("{}: Unexpected value for host_lanes_mask {} or appl {} in " - "{} state".format(lport, host_lanes_mask, appl, state)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - - self.log_notice("{}: {}G, lanemask=0x{:x}, CMIS state={}{}, Module state={}, DP state={}, appl {} host_lane_count {} " - "retries={}".format(lport, int(speed/1000), host_lanes_mask, state, - "(decommission" + ("*" if self.is_decomm_lead_lport(lport) else "") + ")" - if self.is_decomm_pending(lport) else "", - api.get_module_state(), api.get_datapath_state(), appl, host_lane_count, retries)) - if retries > self.CMIS_MAX_RETRIES: - self.log_error("{}: FAILED".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - - try: - # CMIS state transitions - if state == CMIS_STATE_INSERTED: - self.port_dict[lport]['appl'] = common.get_cmis_application_desired(api, host_lane_count, host_speed) - if self.port_dict[lport]['appl'] is None: - 
self.log_error("{}: no suitable app for the port appl {} host_lane_count {} " - "host_speed {}".format(lport, appl, host_lane_count, host_speed)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - appl = self.port_dict[lport]['appl'] - self.log_notice("{}: Setting appl={}".format(lport, appl)) - - self.port_dict[lport]['host_lanes_mask'] = self.get_cmis_host_lanes_mask(api, - appl, host_lane_count, subport) - if self.port_dict[lport]['host_lanes_mask'] <= 0: - self.log_error("{}: Invalid lane mask received - host_lane_count {} subport {} " - "appl {}!".format(lport, host_lane_count, subport, appl)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - host_lanes_mask = self.port_dict[lport]['host_lanes_mask'] - self.log_notice("{}: Setting host_lanemask=0x{:x}".format(lport, host_lanes_mask)) - - self.port_dict[lport]['media_lane_count'] = int(api.get_media_lane_count(appl)) - self.port_dict[lport]['media_lane_assignment_options'] = int(api.get_media_lane_assignment_option(appl)) - media_lane_count = self.port_dict[lport]['media_lane_count'] - media_lane_assignment_options = self.port_dict[lport]['media_lane_assignment_options'] - self.port_dict[lport]['media_lanes_mask'] = self.get_cmis_media_lanes_mask(api, - appl, lport, subport) - if self.port_dict[lport]['media_lanes_mask'] <= 0: - self.log_error("{}: Invalid media lane mask received - media_lane_count {} " - "media_lane_assignment_options {} subport {}" - " appl {}!".format(lport, media_lane_count, media_lane_assignment_options, subport, appl)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - continue - media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] - self.log_notice("{}: Setting media_lanemask=0x{:x}".format(lport, media_lanes_mask)) - - if self.is_decommission_required(api, appl): - self.set_decomm_pending(lport) - - if self.is_decomm_lead_lport(lport): - # Set all the 
DP lanes AppSel to unused(0) when non default app code needs to be configured - self.port_dict[lport]['appl'] = appl = 0 - self.port_dict[lport]['host_lanes_mask'] = host_lanes_mask = self.ALL_LANES_MASK - self.port_dict[lport]['media_lanes_mask'] = self.ALL_LANES_MASK - self.log_notice("{}: DECOMMISSION: setting appl={} and " - "host_lanes_mask/media_lanes_mask={:#x}".format(lport, appl, self.ALL_LANES_MASK)) - # Skip rest of the deinit/pre-init when this is the lead logical port for decommission - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) - continue - elif self.is_decomm_pending(lport): - if self.is_decomm_failed(lport): - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - decomm_status_str = "failed" - else: - decomm_status_str = "waiting for completion" - self.log_notice("{}: DECOMMISSION: decommission has already started for this physical port, " - "{}".format(lport, decomm_status_str)) - continue - - if self.port_dict[lport]['host_tx_ready'] != 'true' or \ - self.port_dict[lport]['admin_status'] != 'up': - if is_fast_reboot and self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): - self.log_notice("{} Skip datapath re-init in fast-reboot".format(lport)) - else: - self.log_notice("{} Forcing Tx laser OFF".format(lport)) - # Force DataPath re-init - api.tx_disable_channel(media_lanes_mask, True) - self.port_dict[lport]['forced_tx_disabled'] = True - txoff_duration = self.get_cmis_dp_tx_turnoff_duration_secs(api) - self.log_notice("{}: Tx turn off duration {} secs".format(lport, txoff_duration)) - self.update_cmis_state_expiration_time(lport, txoff_duration) - self.post_port_active_apsel_to_db(api, lport, host_lanes_mask, reset_apsel=True) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_PRE_INIT_CHECK) - if state == CMIS_STATE_DP_PRE_INIT_CHECK: 
- if self.port_dict[lport].get('forced_tx_disabled', False): - # Ensure that Tx is OFF - # Transceiver will remain in DataPathDeactivated state while it is in Low Power Mode (even if Tx is disabled) - # Transceiver will enter DataPathInitialized state if Tx was disabled after CMIS initialization was completed - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated', 'DataPathInitialized']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathDeactivated/DataPathInitialized'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - self.port_dict[lport]['forced_tx_disabled'] = False - self.log_notice("{}: Tx laser is successfully turned OFF".format(lport)) - - # Configure the target output power if ZR module - if api.is_coherent_module(): - tx_power = self.port_dict[lport]['tx_power'] - # Prevent configuring same tx power multiple times - if 0 != tx_power and tx_power != api.get_tx_config_power(): - if 1 != self.configure_tx_output_power(api, lport, tx_power): - self.log_error("{} failed to configure Tx power = {}".format(lport, tx_power)) - else: - self.log_notice("{} Successfully configured Tx power = {}".format(lport, tx_power)) - - need_update = self.is_cmis_application_update_required(api, appl, host_lanes_mask) - - # For ZR module, Datapath needes to be re-initlialized on new channel selection - if api.is_coherent_module(): - freq = self.port_dict[lport]['laser_freq'] - # If user requested frequency is NOT the same as configured on the module - # force datapath re-initialization - if 0 != freq and freq != api.get_laser_config_freq(): - if self.validate_frequency_and_grid(api, lport, freq) == True: - need_update = True - else: - # clear setting of invalid frequency config - self.port_dict[lport]['laser_freq'] = 0 - - if not need_update: - # No application updates - # As part of xcvrd restart, the TRANSCEIVER_INFO table is deleted and - # created with default value of 'N/A' for all the active 
apsel fields. - # The below (post_port_active_apsel_to_db) will ensure that the - # active apsel fields are updated correctly in the DB since - # the CMIS state remains unchanged during xcvrd restart - self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) - self.log_notice("{}: no CMIS application update required...READY".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - continue - self.log_notice("{}: force Datapath reinit".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_DEINIT) - elif state == CMIS_STATE_DP_DEINIT: - # D.2.2 Software Deinitialization - api.set_datapath_deinit(host_lanes_mask) - - # D.1.3 Software Configuration and Initialization - media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] - if not api.tx_disable_channel(media_lanes_mask, True): - self.log_notice("{}: unable to turn off tx power with host_lanes_mask {}".format(lport, host_lanes_mask)) - self.port_dict[lport]['cmis_retries'] = retries + 1 - continue - - #Sets module to high power mode and doesn't impact datapath if module is already in high power mode - api.set_lpmode(False, wait_state_change = False) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_AP_CONF) - dpDeinitDuration = self.get_cmis_dp_deinit_duration_secs(api) - modulePwrUpDuration = self.get_cmis_module_power_up_duration_secs(api) - self.log_notice("{}: DpDeinit duration {} secs, modulePwrUp duration {} secs".format(lport, dpDeinitDuration, modulePwrUpDuration)) - self.update_cmis_state_expiration_time(lport, max(modulePwrUpDuration, dpDeinitDuration)) - - elif state == CMIS_STATE_AP_CONF: - # Explicit control bit to apply custom Host SI settings. 
- # It will be set to 1 and applied via set_application if - # custom SI settings is applicable - ec = 0 - - # TODO: Use fine grained time when the CMIS memory map is available - if not self.check_module_state(api, ['ModuleReady']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathDeactivated']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathDeactivated state'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Skip rest if it's in decommission state machine - if not self.is_decomm_pending(lport): - if api.is_coherent_module(): - # For ZR module, configure the laser frequency when Datapath is in Deactivated state - freq = self.port_dict[lport]['laser_freq'] - if 0 != freq: - if 1 != self.configure_laser_frequency(api, lport, freq): - self.log_error("{} failed to configure laser frequency {} GHz".format(lport, freq)) - else: - self.log_notice("{} configured laser frequency {} GHz".format(lport, freq)) - - # Stage custom SI settings - if optics_si_parser.optics_si_present(): - optics_si_dict = {} - # Apply module SI settings if applicable - lane_speed = int(speed/1000)//host_lane_count - optics_si_dict = optics_si_parser.fetch_optics_si_setting(pport, lane_speed, sfp) - - self.log_debug("Read SI parameters for port {} from optics_si_settings.json vendor file:".format(lport)) - for key, sub_dict in optics_si_dict.items(): - self.log_debug("{}".format(key)) - for sub_key, value in sub_dict.items(): - self.log_debug("{}: {}".format(sub_key, str(value))) - - if optics_si_dict: - self.log_notice("{}: Apply Optics SI found for Vendor: {} PN: {} lane speed: {}G". 
- format(lport, api.get_manufacturer(), api.get_model(), lane_speed)) - if not api.stage_custom_si_settings(host_lanes_mask, optics_si_dict): - self.log_notice("{}: unable to stage custom SI settings ".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Set Explicit control bit to apply Custom Host SI settings - ec = 1 - - # D.1.3 Software Configuration and Initialization - api.set_application(host_lanes_mask, appl, ec) - if not api.scs_apply_datapath_init(host_lanes_mask): - self.log_notice("{}: unable to set application and stage DP init".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_INIT) - elif state == CMIS_STATE_DP_INIT: - if not self.check_config_error(api, host_lanes_mask, ['ConfigSuccess']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'ConfigSuccess', current ConfigStatus: " - "{}".format(lport, list(api.get_config_datapath_hostlane_status().values()))) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Clear decommission status and invoke CMIS reinit so that normal CMIS initialization can begin - if self.is_decomm_pending(lport): - self.log_notice("{}: DECOMMISSION: done for physical port {}".format(lport, self.port_dict[lport]['index'])) - self.clear_decomm_pending(lport) - self.force_cmis_reinit(lport) - continue - - if hasattr(api, 'get_cmis_rev'): - # Check datapath init pending on module that supports CMIS 5.x - majorRev = int(api.get_cmis_rev().split('.')[0]) - if majorRev >= 5 and not self.check_datapath_init_pending(api, host_lanes_mask): - self.log_notice("{}: datapath init not pending".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Ensure the Datapath is NOT Activated unless the host Tx siganl is good. 
- # NOTE: Some CMIS compliant modules may have 'auto-squelch' feature where - # the module won't take datapaths to Activated state if host tries to enable - # the datapaths while there is no good Tx signal from the host-side. - if self.port_dict[lport]['admin_status'] != 'up' or \ - self.port_dict[lport]['host_tx_ready'] != 'true': - self.log_notice("{} waiting for host tx ready...".format(lport)) - continue - - # D.1.3 Software Configuration and Initialization - api.set_datapath_init(host_lanes_mask) - dpInitDuration = self.get_cmis_dp_init_duration_secs(api) - self.log_notice("{}: DpInit duration {} secs".format(lport, dpInitDuration)) - self.update_cmis_state_expiration_time(lport, dpInitDuration) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_TXON) - elif state == CMIS_STATE_DP_TXON: - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathInitialized']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - # Turn ON the laser - media_lanes_mask = self.port_dict[lport]['media_lanes_mask'] - api.tx_disable_channel(media_lanes_mask, False) - self.log_notice("{}: Turning ON tx power".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_DP_ACTIVATE) - elif state == CMIS_STATE_DP_ACTIVATE: - # Use dpInitDuration instead of MaxDurationDPTxTurnOn because - # some modules rely on dpInitDuration to turn on the Tx signal. 
- # This behavior deviates from the CMIS spec but is honored - # to prevent old modules from breaking with new sonic - if not self.check_datapath_state(api, host_lanes_mask, ['DataPathActivated']): - if self.is_timer_expired(expired): - self.log_notice("{}: timeout for 'DataPathActivated'".format(lport)) - self.force_cmis_reinit(lport, retries + 1) - continue - - self.log_notice("{}: READY".format(lport)) - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_READY) - self.post_port_active_apsel_to_db(api, lport, host_lanes_mask) - - except Exception as e: - self.log_error("{}: internal errors due to {}".format(lport, e)) - common.log_exception_traceback() - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_FAILED) - - self.log_notice("Stopped") - - def run(self): - if platform_chassis is None: - self.log_notice("Platform chassis is not available, stopping...") - return - - if self.skip_cmis_mgr: - self.log_notice("Skipping CMIS Task Manager") - return - - try: - - self.log_notice("Waiting for PortConfigDone...") - for namespace in self.namespaces: - self.wait_for_port_config_done(namespace) - - for lport in self.port_dict.keys(): - self.update_port_transceiver_status_table_sw_cmis_state(lport, CMIS_STATE_UNKNOWN) - - self.task_worker() - except Exception as e: - helper_logger.log_error("Exception occured at {} thread due to {}".format(threading.current_thread().name, repr(e))) - common.log_exception_traceback() - self.exc = e - self.main_thread_stop_event.set() - - def join(self): - self.task_stopping_event.set() - if not self.skip_cmis_mgr: - threading.Thread.join(self) - if self.exc: - raise self.exc - # Thread wrapper class to update sfp state info periodically @@ -1442,7 +311,7 @@ def _post_port_sfp_info_and_dom_thr_to_db_once(self, port_mapping, xcvr_table_he transceiver_dict = {} retry_eeprom_set = set() - is_warm_start = is_syncd_warm_restore_complete() + is_warm_start = common.is_syncd_warm_restore_complete() # 
Post all the current interface sfp/dom threshold info to STATE_DB logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: @@ -2173,7 +1042,7 @@ def init(self): def deinit(self): self.log_info("Start daemon deinit...") - is_warm_fast_reboot = is_syncd_warm_restore_complete() or common.is_fast_reboot_enabled() + is_warm_fast_reboot = common.is_syncd_warm_restore_complete() or common.is_fast_reboot_enabled() # Delete all the information from DB and then exit port_mapping_data = port_event_helper.get_port_mapping(self.namespaces) @@ -2239,7 +1108,7 @@ def run(self): # Start the CMIS manager cmis_manager = None if not self.skip_cmis_mgr: - cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.skip_cmis_mgr) + cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.stop_event, self.skip_cmis_mgr, platform_chassis) cmis_manager.start() self.threads.append(cmis_manager) diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/common.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/common.py index 7a8255aa2..65360173b 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/common.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/common.py @@ -11,7 +11,7 @@ import traceback import threading from swsscommon import swsscommon - from sonic_py_common import syslogger + from sonic_py_common import syslogger, daemon_base from . 
import sfp_status_helper from sonic_platform_base.sonic_xcvr.api.public.c_cmis import CmisApi @@ -19,6 +19,25 @@ raise ImportError(str(e) + " - required module not found") +# CMIS States +CMIS_STATE_UNKNOWN = 'UNKNOWN' +CMIS_STATE_INSERTED = 'INSERTED' +CMIS_STATE_DP_PRE_INIT_CHECK = 'DP_PRE_INIT_CHECK' +CMIS_STATE_DP_DEINIT = 'DP_DEINIT' +CMIS_STATE_AP_CONF = 'AP_CONFIGURED' +CMIS_STATE_DP_ACTIVATE = 'DP_ACTIVATION' +CMIS_STATE_DP_INIT = 'DP_INIT' +CMIS_STATE_DP_TXON = 'DP_TXON' +CMIS_STATE_READY = 'READY' +CMIS_STATE_REMOVED = 'REMOVED' +CMIS_STATE_FAILED = 'FAILED' + +CMIS_TERMINAL_STATES = { + CMIS_STATE_FAILED, + CMIS_STATE_READY, + CMIS_STATE_REMOVED +} + # Global variables that will be injected from the parent module platform_chassis = None platform_sfputil = None @@ -113,6 +132,34 @@ def is_warm_reboot_enabled(): is_warm_start = warmstart.isWarmStart() return is_warm_start +def is_syncd_warm_restore_complete(): + """ + This function determines whether syncd's restore count is not 0, which indicates warm-reboot + to avoid premature config push by xcvrd that caused port flaps.
+ """ + state_db = daemon_base.db_connect("STATE_DB") + restore_count = state_db.hget("WARM_RESTART_TABLE|syncd", "restore_count") + system_enabled = state_db.hget("WARM_RESTART_ENABLE_TABLE|system", "enable") + try: + # --- Handle restore_count (could be int, str, or None) --- + if restore_count is not None: + if isinstance(restore_count, int): + if restore_count > 0: + return True + elif isinstance(restore_count, str): + if restore_count.strip().isdigit() and int(restore_count.strip()) > 0: + return True + + # --- Handle system_enabled (only care about "true"/"false"/None) --- + if isinstance(system_enabled, str): + if system_enabled.strip().lower() == "true": + return True + + except Exception as e: + helper_logger.log_warning(f"Unexpected value: restore_count={restore_count}, system_enabled={system_enabled}, error={e}") + log_exception_traceback() + return False + # # CMIS Helper Functions ======================================================== #