|
5 | 5 | import shutil
|
6 | 6 | from datetime import datetime
|
7 | 7 | from tempfile import mkdtemp
|
8 |
| -from typing import Dict, List, Optional |
| 8 | +from typing import Dict, Iterable, List, Optional, Union |
9 | 9 |
|
10 | 10 | from dateutil.parser import parse as parse_date
|
11 | 11 | from distro import distro
|
@@ -385,6 +385,57 @@ def process(self, output):
|
385 | 385 | return sysctls
|
386 | 386 |
|
387 | 387 |
|
| 388 | +class GroupInfo(TypedDict): |
| 389 | + name: str |
| 390 | + password: str |
| 391 | + gid: int |
| 392 | + user_list: list[str] |
| 393 | + |
| 394 | + |
| 395 | +def _group_info_from_group_str(info: str) -> GroupInfo: |
| 396 | + """ |
| 397 | + Parses an entry from /etc/group or a similar source, e.g. |
| 398 | + 'plugdev:x:46:sysadmin,user2' into a GroupInfo dict object |
| 399 | + """ |
| 400 | + |
| 401 | + fields = info.split(":") |
| 402 | + |
| 403 | + if len(fields) != 4: |
| 404 | + raise ValueError(f"Error parsing group '{info}', expected exactly 4 fields separated by :") |
| 405 | + |
| 406 | + return { |
| 407 | + "name": fields[0], |
| 408 | + "password": fields[1], |
| 409 | + "gid": int(fields[2]), |
| 410 | + "user_list": fields[3].split(","), |
| 411 | + } |
| 412 | + |
| 413 | + |
| 414 | +class Group(FactBase[GroupInfo]): |
| 415 | + """ |
| 416 | + Returns information on a specific group on the system. |
| 417 | + """ |
| 418 | + |
| 419 | + def command(self, group): |
| 420 | + # FIXME: the '|| true' ensures 'process' is called, even if |
| 421 | + # getent was unable to find information on the group |
| 422 | + # There must be a better way to do this ! |
| 423 | + # e.g. allow facts 'process' method access to the process |
| 424 | + # return code ? |
| 425 | + return f"getent group {group} || true" |
| 426 | + |
| 427 | + default = None |
| 428 | + |
| 429 | + def process(self, output: Iterable[str]) -> str: |
| 430 | + group_string = next(iter(output), None) |
| 431 | + |
| 432 | + if group_string is None: |
| 433 | + # This will happen if the group was simply not found |
| 434 | + return None |
| 435 | + |
| 436 | + return _group_info_from_group_str(group_string) |
| 437 | + |
| 438 | + |
388 | 439 | class Groups(FactBase[List[str]]):
|
389 | 440 | """
|
390 | 441 | Returns a list of groups on the system.
|
@@ -417,7 +468,20 @@ def process(self, output) -> list[str]:
|
417 | 468 | Crontab = crontab.Crontab
|
418 | 469 |
|
419 | 470 |
|
420 |
| -class Users(FactBase): |
| 471 | +class UserInfo(TypedDict): |
| 472 | + name: str |
| 473 | + comment: str |
| 474 | + home: str |
| 475 | + shell: str |
| 476 | + group: str |
| 477 | + groups: list[str] |
| 478 | + uid: int |
| 479 | + gid: int |
| 480 | + lastlog: str |
| 481 | + password: str |
| 482 | + |
| 483 | + |
| 484 | +class Users(FactBase[dict[str, UserInfo]]): |
421 | 485 | """
|
422 | 486 | Returns a dictionary of users -> details.
|
423 | 487 |
|
@@ -457,7 +521,7 @@ def command(self) -> str:
|
457 | 521 |
|
458 | 522 | default = dict
|
459 | 523 |
|
460 |
| - def process(self, output): |
| 524 | + def process(self, output: Iterable[str]) -> dict[str, UserInfo]: |
461 | 525 | users = {}
|
462 | 526 | rex = r"[A-Z][a-z]{2} [A-Z][a-z]{2} {1,2}\d+ .+$"
|
463 | 527 |
|
|
0 commit comments