Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@
serialise_sv_detections,
)
from inference.core.workflows.errors import WorkflowSyntaxError
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
from inference.core.workflows.execution_engine.entities.base import (
VideoMetadata,
WorkflowImageData,
)


def ignore_signal(signal_number: int, frame: FrameType) -> None:
Expand Down Expand Up @@ -262,7 +265,9 @@ def start_loop(loop: asyncio.AbstractEventLoop):
webrtc_turn_config = parsed_payload.webrtc_turn_config
webcam_fps = parsed_payload.webcam_fps
to_inference_queue = SyncAsyncQueue(loop=loop, maxsize=10)
from_inference_queue = SyncAsyncQueue(loop=loop, maxsize=10)
from_inference_queue: "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]" = (
SyncAsyncQueue(loop=loop, maxsize=10)
)

stream_output = None
if parsed_payload.stream_output:
Expand Down Expand Up @@ -336,6 +341,16 @@ def webrtc_sink(
):
if peer_connection.data_output in prediction:
workflow_output = prediction[peer_connection.data_output]
json_data = {
peer_connection.data_output: None,
"_video_metadata": {
"frame_id": video_frame.frame_id,
"frame_timestamp": video_frame.frame_timestamp.isoformat(),
"measured_fps": video_frame.measured_fps,
"fps": video_frame.fps,
},
}

serialized_data = None
if isinstance(workflow_output, WorkflowImageData):
errors.append(
Expand All @@ -346,44 +361,50 @@ def webrtc_sink(
parsed_detections = serialise_sv_detections(
workflow_output
)
serialized_data = json.dumps(parsed_detections)
json_data[peer_connection.data_output] = (
parsed_detections
)
serialized_data = json.dumps(json_data)
except Exception as error:
errors.append(
f"Failed to serialise output: {peer_connection.data_output}"
)
elif isinstance(workflow_output, dict):
try:
serialized_data = json.dumps(workflow_output)
json_data[peer_connection.data_output] = workflow_output
serialized_data = json.dumps(json_data)
except Exception as error:
errors.append(
f"Failed to serialise output: {peer_connection.data_output}"
)
else:
serialized_data = str(workflow_output)
if serialized_data is not None:
peer_connection.data_channel.send(serialized_data)
json_data[peer_connection.data_output] = str(
workflow_output
)
serialized_data = json.dumps(json_data)
if serialized_data is None:
serialized_data = json.dumps(json_data)
peer_connection.data_channel.send(serialized_data)
else:
errors.append(
f"Selected data output '{peer_connection.data_output}' not found in workflow outputs"
)

if peer_connection.stream_output is not None:
frame: Optional[np.ndarray] = get_frame_from_workflow_output(
video_metadata, frame = get_frame_from_workflow_output(
workflow_output=prediction,
frame_output_key=peer_connection.stream_output,
)
if frame is None:
for k in prediction.keys():
frame = get_frame_from_workflow_output(
video_metadata, frame = get_frame_from_workflow_output(
workflow_output=prediction,
frame_output_key=k,
)
if frame is not None:
errors.append(
f"'{peer_connection.stream_output}' not found in workflow outputs, using '{k}' instead"
)
frame = frame.copy()
break
if frame is None:
errors.append("Visualisation blocks were not executed")
errors.append(
Expand All @@ -393,8 +414,14 @@ def webrtc_sink(
"Please try to adjust the scene so models detect objects"
)
errors.append("or stop preview, update workflow and try again.")
frame = video_frame.image.copy()

frame = video_frame.image
video_metadata = VideoMetadata(
frame_number=video_frame.frame_id,
frame_timestamp=video_frame.frame_timestamp,
fps=video_frame.fps,
measured_fps=video_frame.measured_fps,
video_identifier=video_frame.source_id,
)
for row, error in enumerate(errors):
frame = cv.putText(
frame,
Expand All @@ -405,7 +432,7 @@ def webrtc_sink(
(0, 255, 0),
2,
)
from_inference_queue.sync_put(frame)
from_inference_queue.sync_put((video_metadata, frame))

buffer_sink = InMemoryBufferSink.init(
queue_size=parsed_payload.sink_configuration.results_buffer_size,
Expand Down
24 changes: 17 additions & 7 deletions inference/core/interfaces/stream_manager/manager_app/webrtc.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
)
from inference.core.utils.async_utils import Queue as SyncAsyncQueue
from inference.core.utils.function import experimental
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
from inference.core.workflows.execution_engine.entities.base import (
VideoMetadata,
WorkflowImageData,
)

logging.getLogger("aiortc").setLevel(logging.WARNING)

Expand All @@ -61,9 +64,10 @@ def overlay_text_on_np_frame(frame: np.ndarray, text: List[str]):

def get_frame_from_workflow_output(
workflow_output: Dict[str, Union[WorkflowImageData, Any]], frame_output_key: str
) -> Optional[np.ndarray]:
) -> Optional[Tuple[VideoMetadata, np.ndarray]]:
latency: Optional[datetime.timedelta] = None
np_image: Optional[np.ndarray] = None
video_metadata: Optional[VideoMetadata] = None

step_output = workflow_output.get(frame_output_key)
if isinstance(step_output, WorkflowImageData):
Expand All @@ -76,6 +80,7 @@ def get_frame_from_workflow_output(
datetime.datetime.now() - step_output.video_metadata.frame_timestamp
)
np_image = step_output.numpy_image
video_metadata = step_output.video_metadata
elif isinstance(step_output, dict):
for frame_output in step_output.values():
if isinstance(frame_output, WorkflowImageData):
Expand All @@ -89,19 +94,20 @@ def get_frame_from_workflow_output(
- frame_output.video_metadata.frame_timestamp
)
np_image = frame_output.numpy_image
video_metadata = frame_output.video_metadata

# logger.warning since inference pipeline is noisy on INFO level
if DEBUG_WEBRTC_PROCESSING_LATENCY and latency is not None:
logger.warning("Processing latency: %ss", latency.total_seconds())

return np_image
return video_metadata, np_image


class VideoTransformTrack(VideoStreamTrack):
def __init__(
self,
to_inference_queue: "SyncAsyncQueue[VideoFrame]",
from_inference_queue: "SyncAsyncQueue[np.ndarray]",
from_inference_queue: "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]",
asyncio_loop: asyncio.AbstractEventLoop,
fps_probe_frames: int,
webcam_fps: Optional[float] = None,
Expand All @@ -122,7 +128,9 @@ def __init__(
self._processed = 0

self.to_inference_queue: "SyncAsyncQueue[VideoFrame]" = to_inference_queue
self.from_inference_queue: "SyncAsyncQueue[np.ndarray]" = from_inference_queue
self.from_inference_queue: (
"SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]"
) = from_inference_queue

self._fps_probe_frames = fps_probe_frames
self._probe_count: int = 0
Expand Down Expand Up @@ -261,7 +269,7 @@ async def recv(self):
self._processed += 1

np_frame: Optional[np.ndarray] = None
np_frame = await self.from_inference_queue.async_get()
video_metadata, np_frame = await self.from_inference_queue.async_get()

if np_frame is None:
if self._last_processed_frame:
Expand All @@ -280,6 +288,8 @@ async def recv(self):
pts, time_base = await self.next_timestamp()
new_frame.pts = pts
new_frame.time_base = time_base
if video_metadata is not None:
new_frame.opaque = video_metadata.frame_number

return new_frame

Expand Down Expand Up @@ -378,7 +388,7 @@ def on_status_update(self, status_update: StatusUpdate) -> None:
async def init_rtc_peer_connection(
webrtc_offer: WebRTCOffer,
to_inference_queue: "SyncAsyncQueue[VideoFrame]",
from_inference_queue: "SyncAsyncQueue[np.ndarray]",
from_inference_queue: "SyncAsyncQueue[Tuple[VideoMetadata, np.ndarray]]",
asyncio_loop: asyncio.AbstractEventLoop,
fps_probe_frames: int,
webrtc_turn_config: Optional[WebRTCTURNConfig] = None,
Expand Down