Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 246 additions & 1 deletion src/azure-cli/azure/cli/command_modules/acs/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -2147,10 +2147,19 @@ def k8s_install_kubectl(cmd, client_version='latest', install_location=None, sou
"""

if not source_url:
source_url = "https://dl.k8s.io/release"
cloud_name = cmd.cli_ctx.cloud.name
if cloud_name.lower() == 'azurechinacloud':
source_url = 'https://mirror.azure.cn/kubernetes/kubectl'
else:
# Only try Microsoft packages for Linux systems (packages are Linux-only)
system = platform.system()
if system == 'Linux':
try:
return _k8s_install_kubectl_from_microsoft_packages(cmd, client_version, install_location, arch)
except Exception as e: # pylint: disable=broad-except
logger.warning("Failed to install from Microsoft packages, falling back to Google Storage: %s", str(e))
# For non-Linux systems or fallback, use Google Storage
source_url = "https://storage.googleapis.com/kubernetes-release/release"

if client_version == 'latest':
latest_version_url = source_url + '/stable.txt'
Expand Down Expand Up @@ -2205,6 +2214,242 @@ def k8s_install_kubectl(cmd, client_version='latest', install_location=None, sou
install_dir, cli)


def _k8s_install_kubectl_from_microsoft_packages(cmd, client_version='latest', install_location=None, arch=None):
"""
Install kubectl from Microsoft packages repository by downloading and extracting .deb package.
Note: This method is only supported on Linux systems as Microsoft packages contain Linux binaries.
"""
system = platform.system()
if system != 'Linux':
raise CLIError(f"Microsoft packages method is only supported on Linux (current system: {system}). Use '--source-url' to specify an alternative source.")

if arch is None:
arch = get_arch_for_cli_binary()

# Map architecture to package architecture
if arch == 'amd64':
pkg_arch = 'amd64'
elif arch == 'arm64':
pkg_arch = 'arm64'
else:
raise CLIError(f"Unsupported architecture '{arch}' for Microsoft packages")

# Get available kubectl packages from Microsoft
packages_url = f"https://packages.microsoft.com/ubuntu/22.04/prod/dists/jammy/main/binary-{pkg_arch}/Packages.gz"

logger.warning('Fetching kubectl package information from Microsoft packages repository...')
try:
packages_data = _urlopen_read(packages_url)
import gzip
packages_text = gzip.decompress(packages_data).decode('utf-8')
except Exception as e:
raise CLIError(f'Failed to fetch package information from Microsoft: {e}')

# Parse packages and find kubectl
kubectl_packages = []
current_package = {}

for line in packages_text.split('\n'):
if line.startswith('Package: '):
if current_package.get('Package') == 'kubectl':
kubectl_packages.append(current_package)
current_package = {'Package': line.split(': ', 1)[1]}
elif line.startswith('Version: '):
current_package['Version'] = line.split(': ', 1)[1]
elif line.startswith('Filename: '):
current_package['Filename'] = line.split(': ', 1)[1]
elif line.startswith('SHA256: '):
current_package['SHA256'] = line.split(': ', 1)[1]
elif line == '':
if current_package.get('Package') == 'kubectl':
kubectl_packages.append(current_package)
current_package = {}

if not kubectl_packages:
raise CLIError('No kubectl packages found in Microsoft repository')

# Select package version
if client_version == 'latest':
# Sort by version and get latest (simple string sort should work for semver)
kubectl_packages.sort(key=lambda x: x.get('Version', ''), reverse=True)
selected_package = kubectl_packages[0]
logger.warning('Using latest kubectl version from Microsoft packages: %s', selected_package['Version'])
else:
# Find specific version
version_to_find = client_version.lstrip('v') # Remove 'v' prefix if present
selected_package = None
for pkg in kubectl_packages:
if pkg.get('Version', '').startswith(version_to_find):
selected_package = pkg
break
if not selected_package:
raise CLIError(f'kubectl version {client_version} not found in Microsoft packages')

# Download the .deb package
base_url = "https://packages.microsoft.com/ubuntu/22.04/prod/"
package_url = base_url + selected_package['Filename']

logger.warning('Downloading kubectl package from Microsoft: %s', package_url)

# Create temporary directory for package extraction
with tempfile.TemporaryDirectory() as temp_dir:
deb_path = os.path.join(temp_dir, 'kubectl.deb')

try:
_urlretrieve(package_url, deb_path)
except Exception as e:
raise CLIError(f'Failed to download kubectl package: {e}')

# Extract .deb package using ar and tar
try:
_extract_kubectl_from_deb(deb_path, temp_dir, install_location, system)
except Exception as e:
raise CLIError(f'Failed to extract kubectl from package: {e}')

# Handle post-installation
install_dir, cli = os.path.dirname(install_location), os.path.basename(install_location)
if system == 'Windows':
handle_windows_post_install(install_dir, cli)
else:
logger.warning('Please ensure that %s is in your search PATH, so the `%s` command can be found.',
install_dir, cli)

logger.warning('Successfully installed kubectl from Microsoft packages')


def _extract_kubectl_from_deb(deb_path, temp_dir, install_location, system):
"""
Extract kubectl binary from .deb package.
"""
# Ensure installation directory exists
install_dir = os.path.dirname(install_location)
if not os.path.exists(install_dir):
os.makedirs(install_dir)

# Determine binary name
if system == 'Windows':
binary_name = 'kubectl.exe'
else:
binary_name = 'kubectl'

# validate install location
validate_install_location(install_location, binary_name)

# Extract .deb using ar command or dpkg-deb if available
try:
# Try using dpkg-deb first (handles all compression formats)
if shutil.which('dpkg-deb'):
subprocess.run(['dpkg-deb', '-x', deb_path, temp_dir], check=True, capture_output=True)
elif shutil.which('ar'):
# Extract using ar command
subprocess.run(['ar', 'x', deb_path], cwd=temp_dir, check=True, capture_output=True)

# Find and extract the data archive
data_files = [f for f in os.listdir(temp_dir) if f.startswith('data.tar')]
if not data_files:
raise CLIError('No data archive found in .deb package')

data_file = os.path.join(temp_dir, data_files[0])

# Handle different compression formats
if data_files[0].endswith('.xz'):
import lzma
with lzma.open(data_file, 'rb') as f_in:
with open(os.path.join(temp_dir, 'data.tar'), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
data_file = os.path.join(temp_dir, 'data.tar')
elif data_files[0].endswith('.gz'):
import gzip
with gzip.open(data_file, 'rb') as f_in:
with open(os.path.join(temp_dir, 'data.tar'), 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
data_file = os.path.join(temp_dir, 'data.tar')
elif data_files[0].endswith('.zst'):
# Try using zstd command if available
if shutil.which('zstd'):
uncompressed_file = os.path.join(temp_dir, 'data.tar')
subprocess.run(['zstd', '-d', data_file, '-o', uncompressed_file], check=True, capture_output=True)
data_file = uncompressed_file
else:
logger.warning('zstd compression detected but zstd command not found. Please install zstd: brew install zstd (macOS) or apt install zstd (Ubuntu)')
raise CLIError('zstd compression not supported (zstd command not found)')

# Extract tar archive
import tarfile
with tarfile.open(data_file, 'r') as tar:
tar.extractall(temp_dir)
else:
# Fallback: Use Python to extract .deb (it's an ar archive)
_extract_ar_archive(deb_path, temp_dir)

# This method is complex for newer compression formats, so we'll try a simpler approach
raise CLIError('No suitable extraction tool found (dpkg-deb, ar). Please install dpkg-deb or ar.')

# Find kubectl binary in extracted files
kubectl_path = None
for root, _, files in os.walk(temp_dir):
for file in files:
if file == 'kubectl' and os.access(os.path.join(root, file), os.X_OK):
kubectl_path = os.path.join(root, file)
break
if kubectl_path:
break

if not kubectl_path:
raise CLIError('kubectl binary not found in extracted package')

# Copy to final location
shutil.copy2(kubectl_path, install_location)

# Set executable permissions
os.chmod(install_location,
os.stat(install_location).st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)

except subprocess.CalledProcessError as e:
stderr = e.stderr.decode('utf-8') if e.stderr else str(e)
raise CLIError(f'Failed to extract .deb package: {stderr}')
except Exception as e:
raise CLIError(f'Error during package extraction: {e}')


def _extract_ar_archive(ar_path, extract_dir):
"""
Simple Python implementation to extract .deb (ar archive) files.
"""
with open(ar_path, 'rb') as f:
# Check for ar archive signature
signature = f.read(8)
if signature != b'!<arch>\n':
raise CLIError('Invalid .deb file format')

while True:
# Read file header (60 bytes)
header = f.read(60)
if len(header) < 60:
break

# Parse header
filename = header[0:16].decode('ascii').strip()
size = int(header[48:58].decode('ascii').strip())

# Skip debian-binary
if filename == 'debian-binary':
f.read(size)
if size % 2: # ar archives are padded to even boundaries
f.read(1)
continue

# Read file content
content = f.read(size)
if size % 2: # ar archives are padded to even boundaries
f.read(1)

# Write to extract directory
output_path = os.path.join(extract_dir, filename.rstrip('/'))
with open(output_path, 'wb') as out_f:
out_f.write(content)


# install kubelogin
def k8s_install_kubelogin(cmd, client_version='latest', install_location=None, source_url=None, arch=None):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,17 +611,56 @@ def test_update_addons(self, rg_def, get_resource_groups_client, get_resources_c

@mock.patch('azure.cli.command_modules.acs.custom._urlretrieve')
@mock.patch('azure.cli.command_modules.acs.custom.logger')
def test_k8s_install_kubectl_emit_warnings(self, logger_mock, mock_url_retrieve):
@mock.patch('azure.cli.command_modules.acs.custom.platform.system')
def test_k8s_install_kubectl_emit_warnings(self, mock_platform_system, logger_mock, mock_url_retrieve):
mock_url_retrieve.side_effect = lambda _, install_location: open(install_location, 'a').close()

# Test for non-Linux systems (macOS) - uses Google Storage directly
mock_platform_system.return_value = 'Darwin'
try:
temp_dir = tempfile.mkdtemp() # tempfile.TemporaryDirectory() is no available on 2.7
test_location = os.path.join(temp_dir, 'kubectl')
k8s_install_kubectl(mock.MagicMock(), client_version='1.2.3', install_location=test_location)
self.assertEqual(mock_url_retrieve.call_count, 1)
# 3 warnings, 1st for arch, 2nd for download result, 3rd for updating PATH
self.assertEqual(logger_mock.warning.call_count, 3) # 3 warnings, one for download result
# 3 warnings for non-Linux: 1st for arch, 2nd for download result, 3rd for updating PATH
self.assertEqual(logger_mock.warning.call_count, 3)
finally:
shutil.rmtree(temp_dir)

# Reset mocks for Windows test
logger_mock.reset_mock()
mock_url_retrieve.reset_mock()

# Test for Windows systems - uses Google Storage directly (same as macOS)
mock_platform_system.return_value = 'Windows'
try:
temp_dir = tempfile.mkdtemp()
test_location = os.path.join(temp_dir, 'kubectl')
k8s_install_kubectl(mock.MagicMock(), client_version='1.2.3', install_location=test_location)
self.assertEqual(mock_url_retrieve.call_count, 1)
# 3 warnings for Windows: 1st for arch, 2nd for download result, 3rd for updating PATH
self.assertEqual(logger_mock.warning.call_count, 3)
finally:
shutil.rmtree(temp_dir)

# Reset mocks for Linux test
logger_mock.reset_mock()
mock_url_retrieve.reset_mock()

# Test for Linux systems - tries Microsoft packages first, but will fail in test env and fallback
mock_platform_system.return_value = 'Linux'
with mock.patch('azure.cli.command_modules.acs.custom._k8s_install_kubectl_from_microsoft_packages') as mock_ms_packages:
# Make Microsoft packages fail to test fallback behavior
mock_ms_packages.side_effect = Exception("Microsoft packages not available in test")
try:
temp_dir = tempfile.mkdtemp()
test_location = os.path.join(temp_dir, 'kubectl')
k8s_install_kubectl(mock.MagicMock(), client_version='1.2.3', install_location=test_location)
self.assertEqual(mock_url_retrieve.call_count, 1)
# 4 warnings for Linux with fallback: 1st for fallback warning, 2nd for arch, 3rd for download, 4th for PATH
self.assertEqual(logger_mock.warning.call_count, 4)
finally:
shutil.rmtree(temp_dir)

@mock.patch('azure.cli.command_modules.acs.custom._urlretrieve')
@mock.patch('azure.cli.command_modules.acs.custom.logger')
Expand Down