From 10bf89832a03708a1c579478ebaa08ac56386996 Mon Sep 17 00:00:00 2001 From: fedeocire Date: Tue, 12 Dec 2023 19:40:49 +0100 Subject: [PATCH 1/2] Artnet Controller Add Artnet Controller --- .../controllers/_ArtnetDMXController.py | 255 ++++++++++++++++++ PyDMXControl/controllers/__init__.py | 51 ++-- 2 files changed, 281 insertions(+), 25 deletions(-) create mode 100644 PyDMXControl/controllers/_ArtnetDMXController.py diff --git a/PyDMXControl/controllers/_ArtnetDMXController.py b/PyDMXControl/controllers/_ArtnetDMXController.py new file mode 100644 index 0000000..afa1ce3 --- /dev/null +++ b/PyDMXControl/controllers/_ArtnetDMXController.py @@ -0,0 +1,255 @@ +""" + * PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX. + * Featuring fixture profiles, built-in effects and a web control panel. + * + * Copyright (C) 2022 Matt Cowley (MattIPv4) (me@mattcowley.co.uk) +""" +import socket +from typing import List + +from ._TransmittingController import TransmittingController + + +class ArtnetDMXController(TransmittingController): + + UDP_PORT = 6454 + + def __init__(self, *args, **kwargs): + # Device information + self.__target_ip = kwargs.pop("target_ip", "127.0.0.1") + self.__universe = kwargs.pop("universe", 0) + self.__subnet = 0 + self.__net = 0 + self.__sequence = 0 + self.__make_even = kwargs.pop("even_packet_size", True) + self.__packet_size = self.put_in_range(kwargs.pop("packet_size", 512), 2, 512, self.__make_even) + self.__packet_header = bytearray() + self.__buffer = bytearray(self.__packet_size) + self.__broadcast=kwargs.pop("broadcast", False) + + + + self.__is_simplified = True # simplify use of universe, net and subnet + + # UDP SOCKET + self.__socket_client = None + + # Create the parent controller + super().__init__(*args, **kwargs) + + def make_header(self): + """Make packet header.""" + # 0 - id (7 x bytes + Null) + self.__packet_header = bytearray() + self.__packet_header.extend(bytearray('Art-Net', 'utf8')) + self.__packet_header.append(0x0) + # 8 - opcode (2 x 8 low byte first) + self.__packet_header.append(0x00) + self.__packet_header.append(0x50) # ArtDmx data packet + # 10 - prototocol version (2 x 8 high byte first) + self.__packet_header.append(0x0) + self.__packet_header.append(14) + # 12 - sequence (int 8), NULL for not implemented + self.__packet_header.append(self.__sequence) + # 13 - physical port (int 8) + self.__packet_header.append(0x00) + # 14 - universe, (2 x 8 low byte first) + if self.__is_simplified: + # not quite correct but good enough for most cases: + # the whole net subnet is simplified + # by transforming a single uint16 into its 8 bit parts + # you will most likely not see any differences in small networks + msb, lsb = self.shift_this(self.__universe) # convert to MSB / LSB + self.__packet_header.append(lsb) + self.__packet_header.append(msb) + # 14 - universe, subnet (2 x 4 bits each) + # 15 - net (7 bit value) + else: + # as specified in Artnet 4 (remember to set the value manually after): + # Bit 3 - 0 = Universe (1-16) + # Bit 7 - 4 = Subnet (1-16) + # Bit 14 - 8 = Net (1-128) + # Bit 15 = 0 + # this means 16 * 16 * 128 = 32768 universes per port + # a subnet is a group of 16 Universes + # 16 subnets will make a net, there are 128 of them + self.__packet_header.append(self.__subnet << 4 | self.__universe) + self.__packet_header.append(self.__net & 0xFF) + # 16 - packet size (2 x 8 high byte first) + msb, lsb = self.shift_this(self.__packet_size) # convert to MSB / LSB + self.__packet_header.append(msb) + self.__packet_header.append(lsb) + + def _connect(self): + # Try to close if exists + if self.__socket_client is not None: + try: + self._close() + except Exception: + pass + + # Get new device + self.__socket_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + + if self.__broadcast: + self.__socket_client.setsockopt( + socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + + self.make_header() + + def _close(self): + self.__socket_client.close() + print("CLOSE: ArtnetDMX closed") + + def _transmit(self, frame: List[int], first: int): + # Convert to a bytearray and pad the start of the frame + # We're transmitting direct DMX data here, so a frame must start at channel 1, but can end early + + self.__buffer=bytearray(([0] * (first - 1)) + frame) + # Write + packet = bytearray() + packet.extend(self.__packet_header) + packet.extend(self.__buffer) + try: + self.__socket_client.sendto(packet, (self.__target_ip, self.UDP_PORT)) + except socket.error as error: + print(f"ERROR: Socket error with exception: {error}") + + """Provides common functions byte objects.""" + + def set_universe(self, universe): + """Setter for universe (0 - 15 / 256). + + Mind if protocol has been simplified + """ + # This is ugly, trying to keep interface easy + # With simplified mode the universe will be split into two + # values, (uni and sub) which is correct anyway. Net will always be 0 + if self.is_simplified: + self.universe = self.put_in_range(universe, 0, 255, False) + else: + self.universe = self.put_in_range(universe, 0, 15, False) + self.make_header() + + def set_subnet(self, sub): + """Setter for subnet address (0 - 15). + + Set simplify to false to use + """ + self.subnet = self.put_in_range(sub, 0, 15, False) + self.make_header() + + def set_net(self, net): + """Setter for net address (0 - 127). + + Set simplify to false to use + """ + self.net = self.put_in_range(net, 0, 127, False) + self.make_header() + + def set_packet_size(self, packet_size): + """Setter for packet size (2 - 512, even only).""" + self.packet_size = self.put_in_range(packet_size, 2, 512, self.make_even) + self.make_header() + + def shift_this(number, high_first=True): + """Utility method: extracts MSB and LSB from number. + + Args: + number - number to shift + high_first - MSB or LSB first (true / false) + + Returns: + (high, low) - tuple with shifted values + + """ + low = (number & 0xFF) + high = ((number >> 8) & 0xFF) + if high_first: + return((high, low)) + return((low, high)) + + + def clamp(number, min_val, max_val): + """Utility method: sets number in defined range. + + Args: + number - number to use + range_min - lowest possible number + range_max - highest possible number + + Returns: + number - number in correct range + """ + return max(min_val, min(number, max_val)) + + + def set_even(number): + """Utility method: ensures number is even by adding. + + Args: + number - number to make even + + Returns: + number - even number + """ + if number % 2 != 0: + number += 1 + return number + + + def put_in_range(self,number, range_min, range_max, make_even=True): + """Utility method: sets number in defined range. + DEPRECATED: this will be removed from the library + + Args: + number - number to use + range_min - lowest possible number + range_max - highest possible number + make_even - should number be made even + + Returns: + number - number in correct range + + """ + number = self.clamp(number, range_min, range_max) + if make_even: + number = self.set_even(number) + return number + + + def make_address_mask(self,universe, sub=0, net=0, is_simplified=True): + """Returns the address bytes for a given universe, subnet and net. + + Args: + universe - Universe to listen + sub - Subnet to listen + net - Net to listen + is_simplified - Whether to use nets and subnet or universe only, + see User Guide page 5 (Universe Addressing) + + Returns: + bytes - byte mask for given address + + """ + address_mask = bytearray() + + if is_simplified: + # Ensure data is in right range + universe = self.clamp(universe, 0, 32767) + + # Make mask + msb, lsb = self.shift_this(universe) # convert to MSB / LSB + address_mask.append(lsb) + address_mask.append(msb) + else: + # Ensure data is in right range + universe = self.clamp(universe, 0, 15) + sub = self.clamp(sub, 0, 15) + net = self.clamp(net, 0, 127) + + # Make mask + address_mask.append(sub << 4 | universe) + address_mask.append(net & 0xFF) + + return address_mask diff --git a/PyDMXControl/controllers/__init__.py b/PyDMXControl/controllers/__init__.py index 9c4a312..14fb88c 100644 --- a/PyDMXControl/controllers/__init__.py +++ b/PyDMXControl/controllers/__init__.py @@ -1,25 +1,26 @@ -""" - * PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX. - * Featuring fixture profiles, built-in effects and a web control panel. - * - * Copyright (C) 2022 Matt Cowley (MattIPv4) (me@mattcowley.co.uk) - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published - * by the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * You should have received a copy of the GNU General Public License - * along with this program. If not, please see - * or . -""" - -from ._Controller import Controller -from ._PrintController import PrintController -from ._TransmittingController import TransmittingController -from ._uDMXController import uDMXController -from ._OpenDMXController import OpenDMXController -from ._SerialController import SerialController +""" + * PyDMXControl: A Python 3 module to control DMX using OpenDMX or uDMX. + * Featuring fixture profiles, built-in effects and a web control panel. + * + * Copyright (C) 2022 Matt Cowley (MattIPv4) (me@mattcowley.co.uk) + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * You should have received a copy of the GNU General Public License + * along with this program. If not, please see + * or . +""" + +from ._Controller import Controller +from ._PrintController import PrintController +from ._TransmittingController import TransmittingController +from ._uDMXController import uDMXController +from ._OpenDMXController import OpenDMXController +from ._SerialController import SerialController +from ._ArtnetDMXController import ArtnetDMXController From 34abb6c85b5acd415e298abd59015d56f04742f6 Mon Sep 17 00:00:00 2001 From: fedeocire Date: Wed, 13 Dec 2023 20:08:42 +0100 Subject: [PATCH 2/2] Update _ArtnetDMXController.py Updated file --- .../controllers/_ArtnetDMXController.py | 173 +++++++++--------- 1 file changed, 89 insertions(+), 84 deletions(-) diff --git a/PyDMXControl/controllers/_ArtnetDMXController.py b/PyDMXControl/controllers/_ArtnetDMXController.py index afa1ce3..d45c9ef 100644 --- a/PyDMXControl/controllers/_ArtnetDMXController.py +++ b/PyDMXControl/controllers/_ArtnetDMXController.py @@ -22,7 +22,7 @@ def __init__(self, *args, **kwargs): self.__net = 0 self.__sequence = 0 self.__make_even = kwargs.pop("even_packet_size", True) - self.__packet_size = self.put_in_range(kwargs.pop("packet_size", 512), 2, 512, self.__make_even) + self.__packet_size = put_in_range(kwargs.pop("packet_size", 512), 2, 512, self.__make_even) self.__packet_header = bytearray() self.__buffer = bytearray(self.__packet_size) self.__broadcast=kwargs.pop("broadcast", False) @@ -59,7 +59,7 @@ def make_header(self): # the whole net subnet is simplified # by transforming a single uint16 into its 8 bit parts # you will most likely not see any differences in small networks - msb, lsb = self.shift_this(self.__universe) # convert to MSB / LSB + msb, lsb = shift_this(self.__universe) # convert to MSB / LSB self.__packet_header.append(lsb) self.__packet_header.append(msb) # 14 - universe, subnet (2 x 4 bits each) @@ -76,7 +76,7 @@ def make_header(self): self.__packet_header.append(self.__subnet << 4 | self.__universe) self.__packet_header.append(self.__net & 0xFF) # 16 - packet size (2 x 8 high byte first) - msb, lsb = self.shift_this(self.__packet_size) # convert to MSB / LSB + msb, lsb = shift_this(self.__packet_size) # convert to MSB / LSB self.__packet_header.append(msb) self.__packet_header.append(lsb) @@ -126,9 +126,9 @@ def set_universe(self, universe): # With simplified mode the universe will be split into two # values, (uni and sub) which is correct anyway. Net will always be 0 if self.is_simplified: - self.universe = self.put_in_range(universe, 0, 255, False) + self.universe = put_in_range(universe, 0, 255, False) else: - self.universe = self.put_in_range(universe, 0, 15, False) + self.universe = put_in_range(universe, 0, 15, False) self.make_header() def set_subnet(self, sub): @@ -136,7 +136,7 @@ def set_subnet(self, sub): Set simplify to false to use """ - self.subnet = self.put_in_range(sub, 0, 15, False) + self.subnet = put_in_range(sub, 0, 15, False) self.make_header() def set_net(self, net): @@ -144,112 +144,117 @@ def set_net(self, net): Set simplify to false to use """ - self.net = self.put_in_range(net, 0, 127, False) + self.net = put_in_range(net, 0, 127, False) self.make_header() def set_packet_size(self, packet_size): """Setter for packet size (2 - 512, even only).""" - self.packet_size = self.put_in_range(packet_size, 2, 512, self.make_even) + self.packet_size = put_in_range(packet_size, 2, 512, self.make_even) self.make_header() - def shift_this(number, high_first=True): - """Utility method: extracts MSB and LSB from number. + + +"""Provides common functions byte objects.""" - Args: - number - number to shift - high_first - MSB or LSB first (true / false) - Returns: - (high, low) - tuple with shifted values +def shift_this(number, high_first=True): + """Utility method: extracts MSB and LSB from number. - """ - low = (number & 0xFF) - high = ((number >> 8) & 0xFF) - if high_first: - return((high, low)) - return((low, high)) + Args: + number - number to shift + high_first - MSB or LSB first (true / false) + Returns: + (high, low) - tuple with shifted values - def clamp(number, min_val, max_val): - """Utility method: sets number in defined range. + """ + low = (number & 0xFF) + high = ((number >> 8) & 0xFF) + if high_first: + return((high, low)) + return((low, high)) - Args: - number - number to use - range_min - lowest possible number - range_max - highest possible number - Returns: - number - number in correct range - """ - return max(min_val, min(number, max_val)) +def clamp(number, min_val, max_val): + """Utility method: sets number in defined range. + Args: + number - number to use + range_min - lowest possible number + range_max - highest possible number - def set_even(number): - """Utility method: ensures number is even by adding. + Returns: + number - number in correct range + """ + return max(min_val, min(number, max_val)) - Args: - number - number to make even - Returns: - number - even number - """ - if number % 2 != 0: - number += 1 - return number +def set_even(number): + """Utility method: ensures number is even by adding. + Args: + number - number to make even - def put_in_range(self,number, range_min, range_max, make_even=True): - """Utility method: sets number in defined range. - DEPRECATED: this will be removed from the library + Returns: + number - even number + """ + if number % 2 != 0: + number += 1 + return number - Args: - number - number to use - range_min - lowest possible number - range_max - highest possible number - make_even - should number be made even - Returns: - number - number in correct range +def put_in_range(number, range_min, range_max, make_even=True): + """Utility method: sets number in defined range. + DEPRECATED: this will be removed from the library - """ - number = self.clamp(number, range_min, range_max) - if make_even: - number = self.set_even(number) - return number + Args: + number - number to use + range_min - lowest possible number + range_max - highest possible number + make_even - should number be made even + Returns: + number - number in correct range - def make_address_mask(self,universe, sub=0, net=0, is_simplified=True): - """Returns the address bytes for a given universe, subnet and net. + """ + number = clamp(number, range_min, range_max) + if make_even: + number = set_even(number) + return number - Args: - universe - Universe to listen - sub - Subnet to listen - net - Net to listen - is_simplified - Whether to use nets and subnet or universe only, - see User Guide page 5 (Universe Addressing) - Returns: - bytes - byte mask for given address +def make_address_mask(universe, sub=0, net=0, is_simplified=True): + """Returns the address bytes for a given universe, subnet and net. - """ - address_mask = bytearray() + Args: + universe - Universe to listen + sub - Subnet to listen + net - Net to listen + is_simplified - Whether to use nets and subnet or universe only, + see User Guide page 5 (Universe Addressing) - if is_simplified: - # Ensure data is in right range - universe = self.clamp(universe, 0, 32767) + Returns: + bytes - byte mask for given address - # Make mask - msb, lsb = self.shift_this(universe) # convert to MSB / LSB - address_mask.append(lsb) - address_mask.append(msb) - else: - # Ensure data is in right range - universe = self.clamp(universe, 0, 15) - sub = self.clamp(sub, 0, 15) - net = self.clamp(net, 0, 127) + """ + address_mask = bytearray() + + if is_simplified: + # Ensure data is in right range + universe = clamp(universe, 0, 32767) + + # Make mask + msb, lsb = shift_this(universe) # convert to MSB / LSB + address_mask.append(lsb) + address_mask.append(msb) + else: + # Ensure data is in right range + universe = clamp(universe, 0, 15) + sub = clamp(sub, 0, 15) + net = clamp(net, 0, 127) - # Make mask - address_mask.append(sub << 4 | universe) - address_mask.append(net & 0xFF) + # Make mask + address_mask.append(sub << 4 | universe) + address_mask.append(net & 0xFF) - return address_mask + return address_mask