Tutorials

xwings edited this page Jul 6, 2025 · 2 revisions

Comprehensive step-by-step tutorials for mastering Qiling Framework, from beginner concepts to advanced security research techniques.

Learning Path

🟢 Beginner Level

🟡 Intermediate Level

🔴 Advanced Level

🚀 Expert Level


Hello World Series

Tutorial 1: Your First Qiling Script

Objective: Learn the basics of emulating a simple binary.

Prerequisites: Qiling Framework installed

Example Binary: We'll use a simple "Hello World" program for Linux x86_64.

# tutorial_01_hello_world.py
from qiling import Qiling
from qiling.const import QL_VERBOSE

def basic_hello_world():
    """Basic binary emulation - your first Qiling script."""
    
    # Step 1: Initialize Qiling with binary and rootfs
    ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'], 
               'examples/rootfs/x8664_linux',
               verbose=QL_VERBOSE.DEFAULT)
    
    # Step 2: Run the binary
    print("[+] Starting emulation...")
    ql.run()
    print("[+] Emulation complete!")

if __name__ == "__main__":
    basic_hello_world()

What happens here:

  1. Qiling() creates an emulation instance and loads the binary
  2. First parameter: The argv list; its first element is the path to the binary to emulate
  3. Second parameter: Root filesystem containing the libraries the binary needs
  4. verbose: Controls output detail level
  5. ql.run(): Starts the emulation

Expected Output (Qiling's own log lines omitted):

[+] Starting emulation...
Hello World!
[+] Emulation complete!

Tutorial 2: Cross-Architecture Hello World

Objective: Run the same program on different architectures.

# tutorial_02_cross_arch.py
from qiling import Qiling
from qiling.const import QL_VERBOSE

def cross_architecture_demo():
    """Demonstrate cross-architecture emulation."""
    
    architectures = [
        {
            'name': 'Linux x86_64',
            'binary': 'examples/rootfs/x8664_linux/bin/x8664_hello',
            'rootfs': 'examples/rootfs/x8664_linux'
        },
        {
            'name': 'Linux x86',
            'binary': 'examples/rootfs/x86_linux/bin/x86_hello', 
            'rootfs': 'examples/rootfs/x86_linux'
        },
        {
            'name': 'Linux ARM',
            'binary': 'examples/rootfs/arm_linux/bin/arm_hello',
            'rootfs': 'examples/rootfs/arm_linux'
        },
        {
            'name': 'Windows x86',
            'binary': 'examples/rootfs/x86_windows/bin/x86_hello.exe',
            'rootfs': 'examples/rootfs/x86_windows'
        }
    ]
    
    for arch in architectures:
        print(f"\n[+] Testing {arch['name']}...")
        try:
            ql = Qiling([arch['binary']], arch['rootfs'],
                       verbose=QL_VERBOSE.OFF)  # Quiet mode
            ql.run()
            print(f"    ✓ {arch['name']} executed successfully")
        except Exception as e:
            print(f"    ✗ {arch['name']} failed: {e}")

if __name__ == "__main__":
    cross_architecture_demo()

Key Learning Points:

  • Qiling automatically detects architecture and OS
  • Same API works across different platforms
  • Rootfs must match the target architecture and operating system
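
Automatic detection is possible because executable formats describe their own target. The sketch below is independent of Qiling and is not Qiling's implementation; it only illustrates how the ELF header's class, byte-order and e_machine fields identify a Linux binary's architecture. The helper name describe_elf and the hand-built sample header are made up for this illustration.

```python
import struct

# e_machine values from the ELF specification (small subset)
ELF_MACHINES = {0x03: "x86", 0x08: "mips", 0x28: "arm", 0x3E: "x86_64", 0xB7: "arm64"}

def describe_elf(header: bytes) -> dict:
    """Decode word size, byte order and machine from the first 20 bytes of an ELF file."""
    if len(header) < 20 or header[:4] != b"\x7fELF":
        raise ValueError("not an ELF header")
    bits = {1: 32, 2: 64}[header[4]]       # EI_CLASS
    endian = {1: "<", 2: ">"}[header[5]]   # EI_DATA: 1 = little, 2 = big
    (machine,) = struct.unpack_from(endian + "H", header, 18)  # e_machine
    return {
        "bits": bits,
        "endian": "little" if endian == "<" else "big",
        "machine": ELF_MACHINES.get(machine, f"unknown(0x{machine:x})"),
    }

# Hand-built header of a 64-bit little-endian x86_64 executable
sample = b"\x7fELF" + bytes([2, 1, 1, 0]) + bytes(8) + struct.pack("<HH", 2, 0x3E)
print(describe_elf(sample))
```

To inspect a real file, pass its first 20 bytes, for example open(path, 'rb').read(20).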

Basic Binary Analysis

Tutorial 3: Instruction-Level Analysis

Objective: Monitor execution at the instruction level.

# tutorial_03_instruction_analysis.py
from qiling import Qiling
from qiling.const import QL_VERBOSE

def instruction_level_analysis():
    """Monitor and analyze every executed instruction."""
    
    ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
               'examples/rootfs/x8664_linux',
               verbose=QL_VERBOSE.OFF)
    
    # Counters for analysis
    instruction_count = 0
    instruction_types = {}
    
    def count_instructions(ql, address, size):
        """Hook function called for every instruction."""
        nonlocal instruction_count
        instruction_count += 1
        
        # Read the instruction bytes
        try:
            code = ql.mem.read(address, size)
            
            # Simple instruction classification by first byte
            first_byte = code[0]
            instruction_types[first_byte] = instruction_types.get(first_byte, 0) + 1
            
            # Print every 100th instruction
            if instruction_count % 100 == 0:
                print(f"[{instruction_count:05d}] 0x{address:016x}: {code.hex()}")
                
        except Exception as e:
            print(f"Error reading instruction at 0x{address:x}: {e}")
    
    # Set up the hook
    print("[+] Setting up instruction monitoring...")
    ql.hook_code(count_instructions)
    
    # Run the binary
    print("[+] Starting monitored execution...")
    ql.run()
    
    # Print analysis results
    print(f"\n[+] Execution Analysis:")
    print(f"    Total instructions executed: {instruction_count}")
    print(f"    Unique first opcode bytes: {len(instruction_types)}")
    print(f"    Most common instruction bytes:")
    
    # Show top 5 most common instruction types
    sorted_types = sorted(instruction_types.items(), 
                         key=lambda x: x[1], reverse=True)
    for opcode, count in sorted_types[:5]:
        print(f"      0x{opcode:02x}: {count} times ({count/instruction_count*100:.1f}%)")

if __name__ == "__main__":
    instruction_level_analysis()

Learning Points:

  • Code hooks receive the Qiling instance, the address of the current instruction, and its size in bytes
  • You can read memory and examine instructions in real-time
  • Hooks enable powerful analysis capabilities
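
The first-byte statistics can also be computed after the run from a list of recorded instruction encodings. This is a minimal sketch using collections.Counter; summarize_first_bytes and the sample trace are hypothetical and stand in for bytes collected by a code hook.

```python
from collections import Counter

def summarize_first_bytes(instructions, top=3):
    """Return (total, [(first_byte, count, percent), ...]) for the most common first bytes."""
    counts = Counter(code[0] for code in instructions if code)
    total = sum(counts.values())
    rows = [(byte, n, 100.0 * n / total) for byte, n in counts.most_common(top)]
    return total, rows

# Made-up x86_64 encodings standing in for a recorded trace
trace = [b"\x55", b"\x48\x89\xe5", b"\x48\x83\xec\x10", b"\xc3", b"\x48\x31\xc0"]
total, rows = summarize_first_bytes(trace)
for byte, n, pct in rows:
    print(f"0x{byte:02x}: {n} times ({pct:.1f}%)")
```

On x86_64 the first byte is often a prefix rather than the opcode (0x48 is the REX.W prefix), so this histogram is a rough signal, not a substitute for disassembly.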

Tutorial 4: Memory Access Monitoring

Objective: Track memory reads and writes.

# tutorial_04_memory_monitoring.py
from qiling import Qiling
from qiling.const import QL_VERBOSE

def memory_access_monitoring():
    """Monitor all memory access operations."""
    
    ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
               'examples/rootfs/x8664_linux',
               verbose=QL_VERBOSE.OFF)
    
    # Memory access tracking
    memory_reads = []
    memory_writes = []
    
    def track_memory_read(ql, access, address, size, value):
        """Track memory read operations (for reads, `value` does not hold the data read)."""
        memory_reads.append({
            'address': address,
            'size': size,
            'pc': ql.arch.regs.rip  # Program counter
        })
        
        # Print interesting reads (every 50th)
        if len(memory_reads) % 50 == 0:
            print(f"[READ {len(memory_reads):04d}] 0x{address:016x} "
                  f"({size} bytes) from PC 0x{ql.arch.regs.rip:016x}")
    
    def track_memory_write(ql, access, address, size, value):
        """Track memory write operations."""
        # The value may arrive as a signed integer; mask it to the access width
        value &= (1 << (size * 8)) - 1
        
        memory_writes.append({
            'address': address,
            'size': size,
            'value': value,
            'pc': ql.arch.regs.rip
        })
        
        print(f"[WRITE] 0x{address:016x} = 0x{value:x} "
              f"({size} bytes) from PC 0x{ql.arch.regs.rip:016x}")
    
    # Set up memory hooks
    print("[+] Setting up memory monitoring...")
    ql.hook_mem_read(track_memory_read)
    ql.hook_mem_write(track_memory_write)
    
    # Run with monitoring
    print("[+] Starting memory-monitored execution...")
    ql.run()
    
    # Analysis
    print(f"\n[+] Memory Access Analysis:")
    print(f"    Total memory reads: {len(memory_reads)}")
    print(f"    Total memory writes: {len(memory_writes)}")
    
    if memory_writes:
        print(f"    Write locations:")
        for write in memory_writes[-5:]:  # Last 5 writes
            print(f"      0x{write['address']:016x}: 0x{write['value']:x}")

if __name__ == "__main__":
    memory_access_monitoring()
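
Raw access lists grow quickly, so it often helps to aggregate them. The following sketch groups records shaped like the dictionaries collected above by 4 KiB page; accesses_by_page and the sample records are hypothetical, and the page size is an assumption that fits common x86_64 Linux setups.

```python
from collections import defaultdict

PAGE_SIZE = 0x1000  # assumed 4 KiB pages

def accesses_by_page(records):
    """Map page base address -> number of recorded accesses touching that page."""
    pages = defaultdict(int)
    for rec in records:
        first = rec["address"] & ~(PAGE_SIZE - 1)
        last = (rec["address"] + rec["size"] - 1) & ~(PAGE_SIZE - 1)
        # An access that straddles a boundary counts once for each page it touches
        for page in range(first, last + PAGE_SIZE, PAGE_SIZE):
            pages[page] += 1
    return dict(pages)

# Made-up records in the same shape as memory_reads / memory_writes
records = [
    {"address": 0x1000, "size": 8},
    {"address": 0x1FFC, "size": 8},      # crosses from page 0x1000 into 0x2000
    {"address": 0x7FFF0000, "size": 4},
]
for page, count in sorted(accesses_by_page(records).items()):
    print(f"0x{page:x}: {count}")
```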

Simple API Hooking

Tutorial 5: Basic API Interception

Objective: Intercept and modify system calls.

# tutorial_05_api_hooking.py
from qiling import Qiling
from qiling.const import QL_INTERCEPT, QL_VERBOSE

def basic_api_hooking():
    """Learn to intercept system calls and modify their arguments."""
    
    ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
               'examples/rootfs/x8664_linux',
               verbose=QL_VERBOSE.DEBUG)
    
    # Track API calls
    api_calls = []
    
    # On-enter hook: runs before Qiling's own write() handler
    def my_write(ql, fd, buf, count):
        """Log the write() system call and patch its buffer in place."""
        
        try:
            # Read the data being written
            data = bytes(ql.mem.read(buf, count))
            content = data.decode('utf-8', errors='ignore')
            
            # Log the API call
            api_calls.append({
                'function': 'write',
                'fd': fd,
                'content': content.strip(),
                'count': count
            })
            
            print(f"[API HOOK] write(fd={fd}, count={count})")
            print(f"[API HOOK] Content: '{content.strip()}'")
            
            # Modify the content. The replacement has the same length as the
            # text it replaces, so the buffer size and `count` remain valid.
            # Adjust `needle` to match what the binary actually prints.
            needle = b"Hello World"
            replacement = b"Hi, Qiling!"
            if needle in data:
                patched = data.replace(needle, replacement)
                patched_text = patched.decode('utf-8', errors='ignore').strip()
                print(f"[API HOOK] Modified to: '{patched_text}'")
                
                # Write modified content back to buffer
                ql.mem.write(buf, patched)
            
        except Exception as e:
            print(f"[API HOOK] Error processing write: {e}")
        
        # Returning None keeps the arguments as they are; Qiling's own
        # write() handler then runs on the patched buffer
    
    # On-enter hook for open(). Programs built against newer libcs may
    # call openat() instead, in which case this hook never fires.
    def my_open(ql, filename_ptr, flags, mode):
        """Log the open() system call."""
        
        try:
            filename = ql.os.utils.read_cstring(filename_ptr)
            
            api_calls.append({
                'function': 'open',
                'filename': filename,
                'flags': flags,
                'mode': mode
            })
            
            print(f"[API HOOK] open('{filename}', flags=0x{flags:x}, mode=0x{mode:x})")
            
        except Exception as e:
            print(f"[API HOOK] Error processing open: {e}")
    
    # Set up syscall hooks. ql.os.set_syscall() targets system calls, and
    # QL_INTERCEPT.ENTER leaves the original handler in place.
    print("[+] Setting up API hooks...")
    ql.os.set_syscall("write", my_write, QL_INTERCEPT.ENTER)
    ql.os.set_syscall("open", my_open, QL_INTERCEPT.ENTER)
    
    # Run with API interception
    print("[+] Starting execution with API hooks...")
    ql.run()
    
    # Show API call summary
    print(f"\n[+] API Call Summary:")
    for call in api_calls:
        if call['function'] == 'write':
            print(f"    write(): '{call['content']}'")
        elif call['function'] == 'open':
            print(f"    open(): '{call['filename']}'")

if __name__ == "__main__":
    basic_api_hooking()

Key Concepts:

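  • Syscall hooks installed with ql.os.set_syscall() can run before (QL_INTERCEPT.ENTER), instead of (QL_INTERCEPT.CALL), or after (QL_INTERCEPT.EXIT) Qiling's own handler
  • You can read, modify, or completely replace syscall behavior
  • With an ENTER or EXIT hook, the original handler still runs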
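
When a hook rewrites a buffer in place, the byte count the program passes to write() does not change, so a replacement of a different length either overflows the buffer or gets cut off. One way to avoid that is to force the replacement to the length of the text it replaces. The helper below is a hypothetical sketch in plain Python, not part of Qiling.

```python
def patch_same_length(data: bytes, needle: bytes, replacement: bytes, pad: bytes = b" ") -> bytes:
    """Replace needle in data, truncating or padding the replacement so len(data) is unchanged."""
    if len(pad) != 1:
        raise ValueError("pad must be a single byte")
    fitted = replacement[:len(needle)].ljust(len(needle), pad)
    return data.replace(needle, fitted)

original = b"Hello World!\n"
patched = patch_same_length(original, b"World", b"Qiling Framework")
print(patched)                         # the replacement is cut to 5 bytes
print(len(patched) == len(original))   # the buffer size is preserved
```

The result could then be written back with ql.mem.write(buf, patched) inside a hook without touching the count argument.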

Reverse Engineering with Qiling

Tutorial 6: Solving a Crackme

Objective: Use Qiling to solve a reverse engineering challenge.

# tutorial_06_crackme_solver.py
from qiling import Qiling
from qiling.const import QL_VERBOSE
import string

def automated_crackme_solver():
    """Automatically solve a crackme using Qiling."""
    
    # This example is based on examples/crackme_x86_linux.py
    
    def brute_force_flag():
        """Brute force the flag character by character."""
        
        # Known flag format (from analysis)
        flag = ""
        charset = string.ascii_letters + string.digits + "{}_"
        
        print("[+] Starting automated crackme solving...")
        
        for position in range(20):  # Assume max 20 character flag
            found = False
            
            for char in charset:
                test_flag = flag + char
                
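                # Test this prefix. This only works if the target reaches the
                # success address for a correct prefix, not just the full flag.
                if test_flag_candidate(test_flag):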
                    flag += char
                    print(f"[+] Found character {position + 1}: '{char}' -> '{flag}'")
                    found = True
                    break
            
            if not found:
                print(f"[+] No more characters found. Final flag: '{flag}'")
                break
        
        return flag
    
    def test_flag_candidate(test_flag):
        """Test if a flag candidate is correct."""
        
        # Set up Qiling instance
        ql = Qiling(['examples/rootfs/x8664_linux/bin/crackme'], 
                   'examples/rootfs/x8664_linux',
                   verbose=QL_VERBOSE.OFF)
        
        # Patch the input to our test flag
        input_address = 0x601050  # Address where input is stored
        
        def patch_input(ql):
            """Patch the input buffer with our test flag."""
            ql.mem.write(input_address, test_flag.encode('utf-8') + b'\x00')
        
        # Success indicator (a list so the nested hook can mutate it)
        success = [False]
        
        def check_success(ql):
            """Record that the success code path was reached, then stop."""
            success[0] = True
            ql.emu_stop()
        
        # Set up hooks (hook_address callbacks receive only the Qiling instance)
        ql.hook_address(patch_input, 0x400580)  # Input reading function
        ql.hook_address(check_success, 0x400696)  # Success code address
        
        try:
            ql.run(timeout=1000000)  # Timeout in microseconds (1 second)
            return success[0]
        except Exception:
            return False
    
    # Start brute forcing
    final_flag = brute_force_flag()
    print(f"\n[+] Candidate flag: '{final_flag}'")
    
    # Verify the solution
    print("[+] Verifying solution...")
    if test_flag_candidate(final_flag):
        print("[+] ✓ Solution verified!")
    else:
        print("[+] ✗ Solution verification failed")

if __name__ == "__main__":
    automated_crackme_solver()
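
The search strategy itself does not depend on the emulator. The sketch below separates it from Qiling by taking any oracle callable that reports whether a prefix is accepted; recover_by_prefix, demo_oracle and SECRET are made up for this illustration, and in practice the oracle would wrap a function like test_flag_candidate.

```python
import string
from typing import Callable

def recover_by_prefix(oracle: Callable[[str], bool], charset: str, max_len: int = 20) -> str:
    """Extend a candidate one character at a time while the oracle accepts the prefix."""
    flag = ""
    for _ in range(max_len):
        for char in charset:
            if oracle(flag + char):
                flag += char
                break
        else:
            break  # no character extended the prefix, so stop
    return flag

SECRET = "flag{qiling}"  # made-up value for the demonstration

def demo_oracle(candidate: str) -> bool:
    """Stand-in for an emulated check that accepts correct prefixes."""
    return SECRET.startswith(candidate)

charset = string.ascii_letters + string.digits + "{}_"
print(recover_by_prefix(demo_oracle, charset))
```

This needs at most len(charset) oracle calls per position, but only when the target leaks prefix correctness; a check that compares the whole input at once gives the oracle nothing to work with.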

Tutorial 7: Function Hooking and Analysis

Objective: Hook specific functions to understand program behavior.

# tutorial_07_function_hooking.py
from qiling import Qiling
from qiling.const import QL_VERBOSE

def function_hooking_analysis():
    """Advanced function hooking for binary analysis."""
    
    ql = Qiling(['examples/rootfs/x8664_linux/bin/x8664_hello'],
               'examples/rootfs/x8664_linux',
               verbose=QL_VERBOSE.OFF)
    
    # Function call tracking
    call_stack = []
    function_stats = {}
    max_depth = 0
    
    def hook_function_entry(ql, address, size):
        """Code hook: detect likely function entry points."""
        nonlocal max_depth
        
        # Heuristic function detection based on a common prologue
        try:
            code = ql.mem.read(address, 2)
            
            # push rbp (0x55) followed by a REX.W-prefixed instruction (0x48),
            # as in the common x86_64 prologue: push rbp; mov rbp, rsp
            if code == b'\x55\x48':
                
                # The hook fires before the instruction executes, so the
                # return address is still at the top of the stack
                return_addr = ql.mem.read_ptr(ql.arch.regs.rsp)
                
                call_stack.append({
                    'function_addr': address,
                    'return_addr': return_addr,
                    'stack_depth': len(call_stack)
                })
                max_depth = max(max_depth, len(call_stack))
                
                # Update statistics
                if address not in function_stats:
                    function_stats[address] = {
                        'call_count': 0,
                        'name': f"func_0x{address:x}"
                    }
                
                function_stats[address]['call_count'] += 1
                
                print(f"[FUNC] Entering function 0x{address:x} "
                      f"(depth: {len(call_stack)}, "
                      f"calls: {function_stats[address]['call_count']})")
                
        except Exception:
            pass  # Ignore errors in analysis
    
    def hook_function_return(ql, address, size):
        """Code hook: detect function returns."""
        
        # Check for return instruction
        try:
            code = ql.mem.read(address, 1)
            if code[0] == 0xc3:  # RET instruction
                
                if call_stack:
                    func_info = call_stack.pop()
                    print(f"[FUNC] Returning from function 0x{func_info['function_addr']:x} "
                          f"to 0x{func_info['return_addr']:x} "
                          f"(depth: {len(call_stack)})")
                
        except Exception:
            pass
    
    # Set up function tracking (hook_code callbacks receive ql, address, size)
    print("[+] Setting up function call tracking...")
    ql.hook_code(hook_function_entry)
    ql.hook_code(hook_function_return)
    
    # Run analysis
    print("[+] Starting function analysis...")
    ql.run()
    
    # Generate function analysis report
    print(f"\n[+] Function Analysis Report:")
    print(f"    Maximum call stack depth: {max_depth}")
    print(f"    Unique functions called: {len(function_stats)}")
    
    # Show most frequently called functions
    sorted_funcs = sorted(function_stats.items(), 
                         key=lambda x: x[1]['call_count'], reverse=True)
    
    print(f"    Most frequently called functions:")
    for addr, stats in sorted_funcs[:5]:
        print(f"      0x{addr:x}: {stats['call_count']} calls")

if __name__ == "__main__":
    function_hooking_analysis()
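
The deepest nesting has to be recorded while the trace runs, because the stack is usually empty again by the time the program exits. A minimal sketch of that bookkeeping on a recorded event list follows; max_call_depth and the event tuples are hypothetical and not a Qiling API.

```python
def max_call_depth(events):
    """Replay ('call', addr) / ('ret', addr) events; return (max_depth, unmatched_returns)."""
    stack, max_depth, unmatched = [], 0, 0
    for kind, addr in events:
        if kind == "call":
            stack.append(addr)
            max_depth = max(max_depth, len(stack))
        elif kind == "ret":
            if stack:
                stack.pop()
            else:
                unmatched += 1  # a return with no tracked call (missed prologue)
    return max_depth, unmatched

# Made-up trace: main -> a, then main -> b -> c, plus one stray return
events = [
    ("call", 0x1000), ("call", 0x1100), ("ret", 0x11FF),
    ("call", 0x1200), ("call", 0x1300), ("ret", 0x13FF),
    ("ret", 0x12FF), ("ret", 0x10FF), ("ret", 0x0FFF),
]
print(max_call_depth(events))
```

Counting unmatched returns gives a quick sense of how often a prologue heuristic missed a function entry.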

Malware Analysis Techniques

Tutorial 8: Basic Malware Behavior Analysis

Objective: Analyze malware behavior using Qiling.

# tutorial_08_malware_analysis.py
from qiling import Qiling
from qiling.const import QL_VERBOSE
import json
import time

def malware_behavior_analysis():
    """Analyze malware behavior patterns."""
    
    # This example analyzes a Windows malware sample
    # Note: Replace with actual malware path and appropriate rootfs
    
    sample_path = "examples/rootfs/x86_windows/bin/malware_sample.exe"
    rootfs_path = "examples/rootfs/x86_windows"
    
    # Behavioral indicators
    behavior_indicators = {
        'file_operations': [],
        'registry_operations': [],
        'network_operations': [],
        'process_operations': [],
        'suspicious_activities': []
    }
    
    def setup_behavioral_monitoring(ql):
        """Set up comprehensive behavioral monitoring."""
        
        from qiling.const import QL_INTERCEPT
        
        def as_text(value):
            """String parameters usually arrive decoded; handle NULL and raw pointers too."""
            if isinstance(value, str):
                return value
            return "Unknown" if not value else f"<pointer 0x{value:x}>"
        
        # On-enter hooks receive the call address and a dict of parameters
        # keyed by name; Qiling's own implementation of the API runs afterwards.
        # (This assumes Qiling provides an implementation for each hooked API.)
        
        # File system monitoring
        def hook_createfile(ql, address, params):
            try:
                filename = as_text(params.get('lpFileName'))
                
                behavior_indicators['file_operations'].append({
                    'operation': 'CreateFile',
                    'filename': filename,
                    'access': params.get('dwDesiredAccess'),
                    'disposition': params.get('dwCreationDisposition'),
                    'timestamp': time.time()
                })
                
                print(f"[FILE] CreateFile: {filename}")
                
                # Check for suspicious file operations
                suspicious_paths = ['system32', 'startup', 'temp', 'programdata']
                if any(path in filename.lower() for path in suspicious_paths):
                    behavior_indicators['suspicious_activities'].append({
                        'type': 'suspicious_file_access',
                        'details': f"Access to {filename}"
                    })
                
            except Exception as e:
                print(f"Error in CreateFile hook: {e}")
        
        # Registry monitoring
        def hook_regcreatekey(ql, address, params):
            try:
                subkey = as_text(params.get('lpSubKey'))
                
                behavior_indicators['registry_operations'].append({
                    'operation': 'RegCreateKey',
                    'key': subkey,
                    'timestamp': time.time()
                })
                
                print(f"[REGISTRY] RegCreateKey: {subkey}")
                
                # Check for persistence mechanisms
                persistence_keys = ['run', 'runonce', 'startup', 'winlogon']
                if any(key in subkey.lower() for key in persistence_keys):
                    behavior_indicators['suspicious_activities'].append({
                        'type': 'persistence_mechanism',
                        'details': f"Registry key: {subkey}"
                    })
                
            except Exception as e:
                print(f"Error in RegCreateKey hook: {e}")
        
        # Network monitoring
        def hook_internetopen(ql, address, params):
            try:
                user_agent = as_text(params.get('lpszAgent'))
                
                behavior_indicators['network_operations'].append({
                    'operation': 'InternetOpen',
                    'user_agent': user_agent,
                    'timestamp': time.time()
                })
                
                print(f"[NETWORK] InternetOpen: {user_agent}")
                
            except Exception as e:
                print(f"Error in InternetOpen hook: {e}")
        
        # Process monitoring
        def hook_createprocess(ql, address, params):
            try:
                app_name = as_text(params.get('lpApplicationName'))
                cmd_line = as_text(params.get('lpCommandLine'))
                creation_flags = params.get('dwCreationFlags') or 0
                
                behavior_indicators['process_operations'].append({
                    'operation': 'CreateProcess',
                    'application': app_name,
                    'command_line': cmd_line,
                    'creation_flags': creation_flags,
                    'timestamp': time.time()
                })
                
                print(f"[PROCESS] CreateProcess: {app_name}")
                print(f"[PROCESS] Command line: {cmd_line}")
                
                # Check for process injection indicators
                if creation_flags & 0x4:  # CREATE_SUSPENDED
                    behavior_indicators['suspicious_activities'].append({
                        'type': 'process_injection_indicator',
                        'details': f"Process created suspended: {app_name}"
                    })
                
            except Exception as e:
                print(f"Error in CreateProcess hook: {e}")
        
        # Set up all hooks as on-enter hooks so the emulated APIs still run
        ql.os.set_api("CreateFileW", hook_createfile, QL_INTERCEPT.ENTER)
        ql.os.set_api("RegCreateKeyW", hook_regcreatekey, QL_INTERCEPT.ENTER)
        ql.os.set_api("InternetOpenW", hook_internetopen, QL_INTERCEPT.ENTER)
        ql.os.set_api("CreateProcessW", hook_createprocess, QL_INTERCEPT.ENTER)
        
        print("[+] Behavioral monitoring hooks installed")
    
    def generate_behavior_report():
        """Generate comprehensive behavior report."""
        
        report = {
            'analysis_timestamp': time.time(),
            'sample_path': sample_path,
            'behavioral_indicators': behavior_indicators,
            'threat_assessment': {
                'persistence_mechanisms': len([a for a in behavior_indicators['suspicious_activities'] 
                                             if a['type'] == 'persistence_mechanism']),
                'suspicious_file_access': len([a for a in behavior_indicators['suspicious_activities'] 
                                             if a['type'] == 'suspicious_file_access']),
                'process_injection_indicators': len([a for a in behavior_indicators['suspicious_activities'] 
                                                   if a['type'] == 'process_injection_indicator']),
                'network_connections': len(behavior_indicators['network_operations'])
            }
        }
        
        return report
    
    # Main analysis
    try:
        print(f"[+] Starting malware analysis: {sample_path}")
        
        ql = Qiling([sample_path], rootfs_path, 
                   verbose=QL_VERBOSE.DEBUG)
        
        # Set up monitoring
        setup_behavioral_monitoring(ql)
        
        # Run with timeout
        print("[+] Starting monitored execution...")
        ql.run(timeout=30000000)  # 30 second timeout
        
    except Exception as e:
        print(f"[!] Analysis completed with exception: {e}")
    
    # Generate and save report
    report = generate_behavior_report()
    
    print(f"\n[+] Analysis Complete!")
    print(f"    File operations: {len(behavior_indicators['file_operations'])}")
    print(f"    Registry operations: {len(behavior_indicators['registry_operations'])}")
    print(f"    Network operations: {len(behavior_indicators['network_operations'])}")
    print(f"    Process operations: {len(behavior_indicators['process_operations'])}")
    print(f"    Suspicious activities: {len(behavior_indicators['suspicious_activities'])}")
    
    # Save detailed report
    with open("malware_analysis_report.json", "w") as f:
        json.dump(report, f, indent=2, default=str)
    
    print(f"[+] Detailed report saved to: malware_analysis_report.json")

if __name__ == "__main__":
    malware_behavior_analysis()
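
Substring checks such as 'run' in subkey.lower() are simple but also match unrelated names like "Runtime". A stricter variant compares whole path components. The sketch below is a hypothetical helper in plain Python; the key names in PERSISTENCE_KEY_NAMES are a small illustrative subset, not a complete list of autostart locations.

```python
PERSISTENCE_KEY_NAMES = {"run", "runonce", "winlogon"}  # illustrative subset

def is_persistence_key(subkey: str) -> bool:
    """True if any path component of the registry subkey is a known autostart key name."""
    parts = [part.lower() for part in subkey.replace("/", "\\").split("\\") if part]
    return any(part in PERSISTENCE_KEY_NAMES for part in parts)

autostart = r"Software\Microsoft\Windows\CurrentVersion\Run"
harmless = r"Software\Vendor\Runtime\Settings"

print(is_persistence_key(autostart))   # component "Run" matches
print(is_persistence_key(harmless))    # "Runtime" is not an exact match
print("run" in harmless.lower())       # the substring test would flag it
```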

IoT Firmware Analysis

Tutorial 9: Router Firmware Emulation

Objective: Analyze IoT firmware using Qiling.

# tutorial_09_iot_firmware.py
from qiling import Qiling
from qiling.const import QL_VERBOSE
import socket
import threading

def iot_firmware_analysis():
    """Analyze IoT router firmware."""
    
    # Based on examples/netgear_6220.py
    firmware_path = "examples/rootfs/netgear_r6220/bin/mini_httpd"
    rootfs_path = "examples/rootfs/netgear_r6220"
    
    # Network simulation
    network_connections = []
    http_requests = []
    
    def setup_network_environment(ql):
        """Set up realistic network environment for firmware."""
        
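        # Environment variables for network configuration.
        # Note: the environment is normally supplied through the env= argument
        # of Qiling(); values assigned after construction may not reach the
        # emulated process.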
        ql.env["REMOTE_ADDR"] = "192.168.1.100"
        ql.env["HTTP_HOST"] = "192.168.1.1"
        ql.env["REQUEST_METHOD"] = "GET"
        ql.env["REQUEST_URI"] = "/cgi-bin/status.cgi"
        ql.env["QUERY_STRING"] = "action=get_status"
        
        # Simulate network interface
        def hook_socket(ql, domain, sock_type, protocol):
            # The parameter is named sock_type to avoid shadowing the built-in type()
            print(f"[NETWORK] Socket created: domain={domain}, type={sock_type}")
            network_connections.append({
                'domain': domain,
                'type': sock_type,
                'protocol': protocol
            })
            return len(network_connections)  # Return socket descriptor
        
        def hook_bind(ql, sockfd, addr_ptr, addrlen):
            print(f"[NETWORK] Bind called on socket {sockfd}")
            return 0  # Success
        
        def hook_listen(ql, sockfd, backlog):
            print(f"[NETWORK] Listen on socket {sockfd}, backlog={backlog}")
            return 0  # Success
        
        def hook_accept(ql, sockfd, addr_ptr, addrlen_ptr):
            print(f"[NETWORK] Accept on socket {sockfd}")
            return sockfd + 100  # Return new connection descriptor
        
        # HTTP request simulation
        def hook_recv(ql, sockfd, buf, length, flags):
            """Simulate receiving HTTP request."""
            
            http_request = (
                b"GET /cgi-bin/status.cgi?action=get_status HTTP/1.1\r\n"
                b"Host: 192.168.1.1\r\n"
                b"User-Agent: Mozilla/5.0\r\n"
                b"Accept: text/html\r\n"
                b"\r\n"
            )
            
            # `length` is the size of the caller's buffer. It is not named
            # `len` because that would shadow the built-in used below.
            if length >= len(http_request):
                ql.mem.write(buf, http_request)
                
                http_requests.append({
                    'method': 'GET',
                    'uri': '/cgi-bin/status.cgi',
                    'query': 'action=get_status'
                })
                
                print(f"[HTTP] Simulated HTTP request: GET /cgi-bin/status.cgi")
                return len(http_request)
            
            return 0
        
        def hook_send(ql, sockfd, buf, length, flags):
            """Capture HTTP response."""
            try:
                response = ql.mem.read(buf, length)
                response_str = response.decode('utf-8', errors='ignore')
                
                print(f"[HTTP] Response sent ({length} bytes):")
                print(f"[HTTP] {response_str[:200]}...")  # First 200 chars
                
            except Exception as e:
                print(f"[HTTP] Error reading response: {e}")
            
            return length  # Pretend all data sent
        
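        # Set up network hooks. These are system calls, so they are installed
        # with ql.os.set_syscall(); the default intercept mode replaces
        # Qiling's own handler with the function given here.
        ql.os.set_syscall("socket", hook_socket)
        ql.os.set_syscall("bind", hook_bind)
        ql.os.set_syscall("listen", hook_listen)
        ql.os.set_syscall("accept", hook_accept)
        ql.os.set_syscall("recv", hook_recv)
        ql.os.set_syscall("send", hook_send)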
        
        print("[+] Network environment configured")
    
    def setup_firmware_environment(ql):
        """Set up firmware-specific environment."""
        
        # Common IoT firmware environment variables (see the env= argument of
        # Qiling() if these need to be visible to the emulated process)
        ql.env["PATH"] = "/bin:/sbin:/usr/bin:/usr/sbin"
        ql.env["HOME"] = "/tmp"
        ql.env["SHELL"] = "/bin/sh"
        ql.env["USER"] = "admin"
        
        # Router-specific variables
        ql.env["MODEL"] = "R6220"
        ql.env["FIRMWARE_VERSION"] = "1.1.0.86"
        ql.env["LAN_IP"] = "192.168.1.1"
        ql.env["LAN_NETMASK"] = "255.255.255.0"
        
        # File system mappings for common router paths
        ql.add_fs_mapper("/proc/version", "/dev/null")
        ql.add_fs_mapper("/proc/cpuinfo", "/dev/null")
        ql.add_fs_mapper("/etc/passwd", "/dev/null")
        
        print("[+] Firmware environment configured")
    
    def analyze_firmware_behavior():
        """Analyze firmware behavior and vulnerabilities."""
        
        vulnerabilities = []
        
        # Check for command injection vulnerabilities
        for request in http_requests:
            if 'query' in request and ';' in request['query']:
                vulnerabilities.append({
                    'type': 'potential_command_injection',
                    'location': request['uri'],
                    'details': 'Semicolon in query parameter'
                })
        
        # Check for buffer overflow indicators
        # (This would need more sophisticated analysis in practice)
        
        return vulnerabilities
    
    # Main firmware analysis
    print(f"[+] Starting IoT firmware analysis: {firmware_path}")
    
    try:
        # Command line arguments for the firmware
        argv = [
            firmware_path,
            "-d", "/www",           # Document root
            "-r", "NETGEAR R6220",  # Server name
            "-c", "**.cgi",         # CGI pattern
            "-t", "300"             # Timeout
        ]
        
        ql = Qiling(argv, rootfs_path, 
                   verbose=QL_VERBOSE.DEBUG)
        
        # Set up environments
        setup_firmware_environment(ql)
        setup_network_environment(ql)
        
        # Run firmware
        print("[+] Starting firmware emulation...")
        ql.run(timeout=30000000)  # 30 second timeout
        
    except Exception as e:
        print(f"[!] Firmware analysis completed: {e}")
    
    # Analyze results
    vulnerabilities = analyze_firmware_behavior()
    
    print(f"\n[+] Firmware Analysis Results:")
    print(f"    Network connections: {len(network_connections)}")
    print(f"    HTTP requests processed: {len(http_requests)}")
    print(f"    Potential vulnerabilities: {len(vulnerabilities)}")
    
    for vuln in vulnerabilities:
        print(f"      {vuln['type']}: {vuln['details']}")

if __name__ == "__main__":
    iot_firmware_analysis()
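
The semicolon check in analyze_firmware_behavior only sees raw characters, so a percent-encoded payload such as %3B slips through. A slightly more careful sketch decodes the query string first and looks at each value. The helper name suspicious_params and the metacharacter set are assumptions made for this illustration; a hit is only a hint that a parameter deserves a closer look, not proof of a vulnerability.

```python
from urllib.parse import parse_qsl

SHELL_METACHARS = set(";|&`$")  # illustrative, not exhaustive

def suspicious_params(query: str):
    """Return (name, value) pairs whose decoded value contains a shell metacharacter."""
    pairs = parse_qsl(query, keep_blank_values=True)
    return [(name, value) for name, value in pairs if SHELL_METACHARS & set(value)]

print(suspicious_params("action=get_status"))
print(suspicious_params("ip=127.0.0.1%3Breboot&count=1"))
```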

These tutorials provide a comprehensive learning path from basic concepts to advanced security research techniques. Each tutorial builds upon previous knowledge while introducing new concepts and real-world applications.

The tutorials are designed to be:

  • Progressive: Start simple and build complexity
  • Practical: Based on real examples from the Qiling repository
  • Educational: Include detailed explanations and comments
  • Actionable: Provide working code that can be run and modified
