Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 1 addition & 198 deletions qiskit_experiments/database_service/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
from abc import ABC, abstractmethod
from collections import OrderedDict
from datetime import datetime, timezone
from typing import Callable, Tuple, List, Dict, Any, Union, Type, Optional
from typing import Callable, Tuple, Dict, Any, Union, Type, Optional
import json

import pandas as pd
import dateutil.parser
import pkg_resources
from dateutil import tz
Expand Down Expand Up @@ -278,199 +277,3 @@ def append(self, value):
"""Append to the list."""
with self._lock:
self._container.append(value)


class ThreadSafeDataFrame(ThreadSafeContainer):
"""Thread safe data frame.

This class wraps pandas dataframe with predefined column labels,
which is specified by the class method `_default_columns`.
Subclass can override this method to provide default labels specific to its data structure.

This object is expected to be used internally in the ExperimentData.
"""

def __init__(self, init_values=None):
"""ThreadSafeContainer constructor."""
self._columns = self._default_columns()
self._extra = []
super().__init__(init_values)

@classmethod
def _default_columns(cls) -> List[str]:
return []

def _init_container(self, init_values: Optional[Union[Dict, pd.DataFrame]] = None):
"""Initialize the container."""
if init_values is None:
return pd.DataFrame(columns=self.get_columns())
if isinstance(init_values, pd.DataFrame):
input_columns = list(init_values.columns)
if input_columns != self.get_columns():
raise ValueError(
f"Input data frame contains unexpected columns {input_columns}. "
f"{self.__class__.__name__} defines {self.get_columns()} as default columns."
)
return init_values
if isinstance(init_values, dict):
return pd.DataFrame.from_dict(
data=init_values,
orient="index",
columns=self.get_columns(),
)
raise TypeError(f"Initial value of {type(init_values)} is not valid data type.")

def get_columns(self) -> List[str]:
"""Return current column names.

Returns:
List of column names.
"""
with self._lock:
return self._columns.copy()

def add_columns(self, *new_columns: str, default_value: Any = None):
"""Add new columns to the table.

This operation mutates the current container.

Args:
new_columns: Name of columns to add.
default_value: Default value to fill added columns.
"""
with self._lock:
# Order sensitive
new_columns = [c for c in new_columns if c not in self.get_columns()]
if len(new_columns) == 0:
return

# Update columns
for new_column in new_columns:
self._container.insert(len(self._container.columns), new_column, default_value)
self._columns.extend(new_columns)
self._extra.extend(new_columns)

def clear(self):
"""Remove all elements from this container."""
with self._lock:
self._container = self._init_container()
self._columns = self._default_columns()
self._extra = []

def container(
self,
collapse_extra: bool = True,
) -> pd.DataFrame:
"""Return bare pandas dataframe.

Args:
collapse_extra: Set True to show only default columns.

Returns:
Bare pandas dataframe. This object is no longer thread safe.
"""
with self._lock:
container = self._container.copy()

if collapse_extra:
return container[self._default_columns()]
return container

def drop_entry(
self,
index: str,
):
"""Drop entry from the dataframe.

Args:
index: Name of entry to drop.

Raises:
ValueError: When index is not in this table.
"""
with self._lock:
if index not in self._container.index:
raise ValueError(f"Table index {index} doesn't exist in this table.")
self._container.drop(index, inplace=True)

def get_entry(
self,
index: str,
) -> pd.Series:
"""Get entry from the dataframe.

Args:
index: Name of entry to acquire.

Returns:
Pandas Series of acquired entry. This doesn't mutate the table.

Raises:
ValueError: When index is not in this table.
"""
with self._lock:
if index not in self._container.index:
raise ValueError(f"Table index {index} doesn't exist in this table.")

return self._container.loc[index]

def add_entry(
self,
index: str,
**kwargs,
) -> pd.Series:
"""Add new entry to the dataframe.

Args:
index: Name of this entry. Must be unique in this table.
kwargs: Description of new entry to register.

Returns:
Pandas Series of added entry. This doesn't mutate the table.

Raises:
ValueError: When index is not unique in this table.
"""
with self._lock:
if index in self._container.index:
raise ValueError(f"Table index {index} already exists in the table.")

if kwargs.keys() - set(self.get_columns()):
self.add_columns(*kwargs.keys())

template = dict.fromkeys(self.get_columns())
template.update(kwargs)

if not isinstance(index, str):
index = str(index)
self._container.loc[index] = list(template.values())

return self._container.iloc[-1]

def _repr_html_(self) -> Union[str, None]:
"""Return HTML representation of this dataframe."""
with self._lock:
# Remove underscored columns.
return self._container._repr_html_()

def __json_encode__(self) -> Dict[str, Any]:
with self._lock:
return {
"class": "ThreadSafeDataFrame",
"data": self._container.to_dict(orient="index"),
"columns": self._columns,
"extra": self._extra,
}

@classmethod
def __json_decode__(cls, value: Dict[str, Any]) -> "ThreadSafeDataFrame":
if not value.get("class", None) == "ThreadSafeDataFrame":
raise ValueError("JSON decoded value for ThreadSafeDataFrame is not valid class type.")

instance = object.__new__(cls)
# Need to update self._columns first to set extra columns in the dataframe container.
instance._columns = value.get("columns", cls._default_columns())
instance._extra = value.get("extra", [])
instance._lock = threading.RLock()
instance._container = instance._init_container(init_values=value.get("data", {}))
return instance
Loading