|
3 | 3 | import logging |
4 | 4 | import keyword |
5 | 5 | import inspect |
| 6 | +import re |
6 | 7 | from enum import Enum |
7 | 8 | from typing import Callable, Any |
| 9 | +from zoneinfo import ZoneInfo |
| 10 | +from datetime import datetime |
8 | 11 |
|
9 | 12 | __all__ = [ |
10 | 13 | 'if_cond_fn', 'Multiprop', 'rune_any_elements', 'rune_get_only_element', |
11 | 14 | 'rune_filter', 'rune_all_elements', 'rune_contains', 'rune_disjoint', |
12 | 15 | 'rune_join', 'rune_flatten_list', 'rune_resolve_attr', |
13 | 16 | 'rune_resolve_deep_attr', 'rune_count', 'rune_attr_exists', |
14 | 17 | '_get_rune_object', 'rune_set_attr', 'rune_add_attr', |
15 | | - 'rune_check_cardinality', 'rune_str', 'rune_check_one_of' |
| 18 | + 'rune_check_cardinality', 'rune_str', 'rune_check_one_of','rune_zoned_date_time' |
16 | 19 | ] |
17 | 20 |
|
18 | 21 |
|
@@ -133,6 +136,47 @@ def rune_str(x: Any) -> str: |
133 | 136 | return str(x) |
134 | 137 |
|
135 | 138 |
|
| 139 | +def rune_zoned_date_time(x: str): |
| 140 | + """ |
| 141 | + Parse a datetime string with optional offset and/or named time zone |
| 142 | + """ |
| 143 | + #Separate str in parts (date, time , offset, zone) |
| 144 | + parts = x.strip().split() |
| 145 | + |
| 146 | + # Try parsing last part as a zone name |
| 147 | + possible_tz = parts[-1] |
| 148 | + try: |
| 149 | + zone = ZoneInfo(possible_tz) |
| 150 | + parts = parts[:-1] # Remove the zone name from the string |
| 151 | + except: |
| 152 | + zone = None |
| 153 | + |
| 154 | + # Datetime object with date, time and offset |
| 155 | + cleaned_input = " ".join(parts) |
| 156 | + |
| 157 | + # Parse datetime |
| 158 | + try: |
| 159 | + dt = datetime.strptime(cleaned_input, "%Y-%m-%d %H:%M:%S %z") |
| 160 | + has_offset = True |
| 161 | + except ValueError: |
| 162 | + dt = datetime.strptime(cleaned_input, "%Y-%m-%d %H:%M:%S") |
| 163 | + has_offset = False |
| 164 | + |
| 165 | + # Apply zone if valid |
| 166 | + if zone: |
| 167 | + if has_offset: |
| 168 | + input_offset = dt.utcoffset() |
| 169 | + zone_offset = dt.replace(tzinfo=zone).utcoffset() |
| 170 | + if input_offset != zone_offset: |
| 171 | + raise ValueError( |
| 172 | + f"Offset {input_offset} does not match zone '{zone.key}' ({zone_offset})" |
| 173 | + ) |
| 174 | + dt = dt.replace(tzinfo=zone) |
| 175 | + |
| 176 | + return dt |
| 177 | + |
| 178 | + |
| 179 | + |
136 | 180 | def _get_rune_object(base_model: str, attribute: str, value: Any) -> Any: |
137 | 181 | model_class = globals()[base_model] |
138 | 182 | instance_kwargs = {attribute: value} |
|
0 commit comments