Skip to content

Commit 83717af

Browse files
committed
Refactored the extractor segment and renamed it readFile. refactored field_segment to include an AbstractFieldSegment class.
1 parent a5f0515 commit 83717af

File tree

8 files changed

+88
-51
lines changed

8 files changed

+88
-51
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
- Pulls "Annotated" typing from parameter names to create the Parameters section of the documentation.
1919
Makes for cleaner, more consistently up to date documentation. The use of Annotated is optional.
2020
- Updated **isIn** and **isNotIn** to function like **isTrue** so that they need not always be filters.
21+
- Created an AbstractFieldSegment class and changed the field_segment decorator to use it. This makes it easier
22+
to create segments with consistent "field segment" behavior that require additional initialization.
23+
- Renamed ExtractFile to ReadFile and refactored it to be descended from AbstractFieldSegment
24+
- Renamed **extract** to **readFile** in chatterlang for consistency.
2125

2226
## 0.8.1
2327
### Improvements

src/talkpipe/data/extraction.py

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
from typing import Union, Iterable, Annotated
44
from pathlib import PosixPath
55
from docx import Document
6-
from talkpipe.pipe.core import segment, AbstractSegment, field_segment
7-
from talkpipe.chatterlang.registry import register_segment, register_source
6+
from talkpipe.pipe.core import segment, AbstractFieldSegment, field_segment
7+
from talkpipe.chatterlang.registry import register_segment
88
import logging
99
from pathlib import Path
1010
import glob
@@ -107,8 +107,8 @@ def listFiles(patterns: Annotated[Iterable[str], "Iterable of file patterns or p
107107
else:
108108
logging.debug(f"Skipping non-file: {match}")
109109

110-
@register_segment("extract")
111-
class FileExtractor(AbstractSegment):
110+
@register_segment("readFile")
111+
class ReadFile(AbstractFieldSegment):
112112
"""
113113
A class for extracting text content from different file types.
114114
@@ -121,22 +121,13 @@ class FileExtractor(AbstractSegment):
121121
122122
Methods:
123123
register_extractor(file_extension: str, extractor): Register a new file extractor for a specific extension.
124-
extract(file_path: Union[str, PosixPath]): Extract content from a single file.
125-
transform(input_iter): Transform an iterator of file paths into an iterator of their contents.
124+
ProcessItem(file_path: Union[str, PosixPath]): Extract content from a single file.
126125
127-
Example:
128-
>>> extractor = FileExtractor()
129-
>>> content = extractor.extract("document.txt")
130-
>>> for text in extractor.transform(["file1.txt", "file2.docx"]):
131-
... print(text)
132-
133-
Raises:
134-
Exception: When trying to extract content from a file with an unsupported extension.
135126
"""
136127
_extractors:dict
137128

138-
def __init__(self):
139-
super().__init__()
129+
def __init__(self, field: str = None, set_as: str = None):
130+
super().__init__(field=field, set_as=set_as)
140131
logging.debug("Initializing FileExtractor")
141132
self._extractors = {}
142133
self.register_extractor("txt", readtxt())
@@ -147,14 +138,11 @@ def register_extractor(self, file_extension:str, extractor):
147138
logging.debug(f"Registering extractor for extension: {file_extension}")
148139
self._extractors[file_extension] = extractor
149140

150-
def extract(self, file_path:Union[str, PosixPath]):
141+
def process_value(self, file_path:Union[str, PosixPath]):
151142
file_extension = file_path.split(".")[-1] if isinstance(file_path, str) else file_path.suffix[1:]
152143
if file_extension not in self._extractors:
153144
logging.error(f"Unsupported file extension: {file_extension}")
154145
raise Exception(f"File extension {file_extension} not supported")
155146
logging.debug(f"Extracting content from file: {file_path}")
156147
return next(self._extractors[file_extension]([file_path]))
157148

158-
def transform(self, input_iter):
159-
for file_path in input_iter:
160-
yield self.extract(file_path)

src/talkpipe/pipe/core.py

Lines changed: 56 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -398,27 +398,18 @@ def field_segment(*decorator_args, **decorator_kwargs):
398398
set_as: The field name to append the result as
399399
"""
400400
def decorator(func):
401-
class FieldSegment(AbstractSegment):
401+
class FieldSegment(AbstractFieldSegment):
402402
def __init__(self, *init_args, **init_kwargs):
403-
super().__init__()
404403
merged_kwargs = {**decorator_kwargs, **init_kwargs}
405-
self.field = merged_kwargs.get('field')
406-
self.set_as = merged_kwargs.get('set_as')
407-
merged_kwargs.pop('field', None)
408-
merged_kwargs.pop('set_as', None)
404+
field = merged_kwargs.pop('field', None)
405+
set_as = merged_kwargs.pop('set_as', None)
406+
super().__init__(field=field, set_as=set_as)
409407
self._func = lambda x: func(x, *init_args, **merged_kwargs)
410408
# Store reference to original function for documentation access
411409
self._original_func = func
412410

413-
def transform(self, input_iter):
414-
for item in input_iter:
415-
value = data_manipulation.extract_property(item, self.field) if self.field else item
416-
result = self._func(value)
417-
if self.set_as:
418-
item[self.set_as] = result
419-
yield item
420-
else:
421-
yield result
411+
def process_value(self, value):
412+
return self._func(value)
422413

423414
FieldSegment.__name__ = f"{func.__name__}FieldSegment"
424415
# Preserve original function's docstring and metadata
@@ -430,6 +421,56 @@ def transform(self, input_iter):
430421
return decorator(decorator_args[0])
431422
return decorator
432423

424+
class AbstractFieldSegment(AbstractSegment[T, U]):
425+
"""Abstract base class for segments that process a single field and optionally set results.
426+
427+
This class handles the 'field' and 'set_as' parameters that are commonly used
428+
in field-processing segments, making it easy for descendant classes to have
429+
their own constructors while still supporting field extraction and result setting.
430+
431+
Args:
432+
field: The field to extract from each item (optional)
433+
set_as: The field name to set/append the result as (optional)
434+
"""
435+
436+
def __init__(self, field: str = None, set_as: str = None):
437+
super().__init__()
438+
self.field = field
439+
self.set_as = set_as
440+
441+
@abstractmethod
442+
def process_value(self, value: Any) -> Any:
443+
"""Process the extracted field value or the entire item.
444+
445+
This method must be implemented by subclasses to define how to process
446+
the extracted field value (or entire item if no field is specified).
447+
448+
Args:
449+
value: The field value extracted from the item, or the entire item
450+
if no field was specified
451+
452+
Returns:
453+
Any: The processed result
454+
"""
455+
pass
456+
457+
def transform(self, input_iter: Iterable[T]) -> Iterator[U]:
458+
"""Transform input items by processing field values.
459+
460+
For each item:
461+
1. Extract the specified field value (or use entire item if no field)
462+
2. Process the value using process_value()
463+
3. Either yield the result directly or set it on the item and yield the item
464+
"""
465+
for item in input_iter:
466+
value = data_manipulation.extract_property(item, self.field) if self.field else item
467+
result = self.process_value(value)
468+
if self.set_as:
469+
item[self.set_as] = result
470+
yield item
471+
else:
472+
yield result
473+
433474
class Pipeline(AbstractSegment):
434475
"""A pipeline is a sequence of operations. Each operation draws from the output of the previous operation
435476
and yields items to the next operation. The pipeline can be executed by calling it with an input iterator.

src/talkpipe/search/abstract.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
1-
from typing import List, Dict, Any, Optional, Tuple, Union, Protocol
1+
from typing import List, Optional, Tuple, Union, Protocol
22
from pydantic import BaseModel
3-
import numpy as np
43

5-
# Type aliases
6-
VectorLike = Union[List[float], np.ndarray]
7-
Document = Dict[str, str]
8-
DocID = str
4+
from talkpipe.util.data_manipulation import VectorLike, Document, DocID
95

106
class SearchResult(BaseModel):
117
score: float

src/talkpipe/search/simplevectordb.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
from talkpipe.pipe.core import segment
1515
from talkpipe.pipe import field_segment
1616
from talkpipe.chatterlang import register_segment
17-
from .abstract import VectorLike, DocumentStore, VectorAddable, VectorSearchable, SearchResult, Document, DocID
18-
from talkpipe.util.data_manipulation import extract_property, toDict
17+
from .abstract import DocumentStore, VectorAddable, VectorSearchable, SearchResult
18+
from talkpipe.util.data_manipulation import DocID, Document, VectorLike, extract_property, toDict
1919

2020
logger = logging.getLogger(__name__)
2121

src/talkpipe/search/whoosh.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,15 @@
1010
from whoosh.writing import LockError
1111
from talkpipe.pipe import segment, field_segment
1212
from talkpipe.chatterlang import register_segment
13-
from talkpipe.util.data_manipulation import toDict, extract_property
13+
from talkpipe.util.data_manipulation import DocID, Document, toDict, extract_property
1414
from talkpipe.util.config import parse_key_value_str
1515
import time
1616

1717
from .abstract import (
1818
SearchResult,
1919
DocumentStore,
2020
MutableDocumentStore,
21-
TextSearchable,
22-
Document,
23-
DocID
21+
TextSearchable
2422
)
2523

2624
logger = logging.getLogger(__name__)

src/talkpipe/util/data_manipulation.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
1+
from typing import Union
12
import logging
23
import re
34
import inspect
45
import json
56
import textwrap
67
from types import MappingProxyType
7-
from typing import Any, Set
8+
from typing import Any, Dict, List, Set
9+
10+
import numpy as np
811
from talkpipe.util.config import parse_key_value_str
912

13+
# Type aliases
14+
VectorLike = Union[List[float], np.ndarray]
15+
Document = Dict[str, str]
16+
DocID = str
17+
1018
logger = logging.getLogger(__name__)
1119

1220
def get_all_attributes(obj: Any, skip_packages: tuple = ('pydantic',), visited: Set = None,
@@ -359,4 +367,6 @@ def lambda_function(item: Any) -> Any:
359367
raise
360368
return None
361369

362-
return lambda_function
370+
return lambda_function
371+
372+

tests/talkpipe/data/test_extraction.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import pytest
2-
from talkpipe.data.extraction import FileExtractor, readtxt, readdocx, listFiles
2+
from talkpipe.data.extraction import ReadFile, readtxt, readdocx, listFiles
33

44
def test_readdocx(tmp_path):
55
# Test reading individual docx file using existing test file
@@ -53,7 +53,7 @@ def test_readtxt(tmp_path):
5353
next(readtxt()([tmp_path / "nonexistent_dir"]))
5454

5555
def test_FileExtractor(tmp_path):
56-
fe = FileExtractor()
56+
fe = ReadFile()
5757

5858
with open(tmp_path / "test.txt", "w") as file:
5959
file.write("Hello World")

0 commit comments

Comments
 (0)